9 个版本
新版本 0.1.8 | 2024 年 8 月 24 日 |
---|---|
0.1.7 | 2024 年 8 月 19 日 |
0.1.2 | 2024 年 4 月 2 日 |
#10 在 #项目
658 每月下载
64KB
1K SLoC
批量处理
并行处理项目列表的库。
简介
该想法是使用低内存使用和高性能处理大量数据。该概念涉及并行处理项目列表,同时处理的项数最多。
示例
这是一个使用异步编程进行批量处理的简单示例,处理 CSV 文件并将数据插入数据库。
let step_builder: AsyncComplexStepBuilder<Result<StringRecord, csv_async::Error>, CarPrice> = AsyncComplexStepBuilder::get("csv_transfer".to_string())
.reader(
Box::new(move || {
let csv_file = csv_path.clone();
return Box::pin(async move {
let csv_file = tokio::fs::File::open(csv_file).await.expect("Error opening file");
let reader = csv_async::AsyncReader::from_reader(csv_file);
let records = reader.into_records();
let stream: Box<dyn Stream<Item=Result<StringRecord, csv_async::Error>> + Send + Unpin>
= Box::new(records);
stream
});
})
).processor(
Box::new(
|csv_line: Result<StringRecord, csv_async::Error>| {
let car_price = csv_line.unwrap();
return Box::pin(
async move {
let car_price = CarPrice {
id: None,
year: car_price.get(1).unwrap().parse::<i32>().ok(),
make: car_price.get(2).map(|s| Some(s.to_string())).unwrap_or(None),
model: car_price.get(3).map(|s| Some(s.to_string())).unwrap_or(None),
trim: car_price.get(4).map(|s| Some(s.to_string())).unwrap_or(None),
body: car_price.get(5).map(|s| Some(s.to_string())).unwrap_or(None),
transmission: car_price.get(6).map(|s| Some(s.to_string())).unwrap_or(None),
vin: car_price.get(7).map(|s| Some(s.to_string())).unwrap_or(None),
state: car_price.get(8).map(|s| Some(s.to_string())).unwrap_or(None),
condition: car_price.get(9).map(|s| s.parse::<i32>().ok()).unwrap_or(None),
odometer: car_price.get(10).map(|s| s.parse::<i32>().ok()).unwrap_or(None),
color: car_price.get(11).map(|s| Some(s.to_string())).unwrap_or(None),
interior: car_price.get(12).map(|s| Some(s.to_string())).unwrap_or(None),
seller: car_price.get(13).map(|s| Some(s.to_string())).unwrap_or(None),
nmr: car_price.get(14).map(|s| s.parse::<i32>().ok()).unwrap_or(None),
sellingprice: car_price.get(15).map(|s| s.parse::<i32>().ok()).unwrap_or(None),
saledate: None,
};
car_price
}
);
}
)
).writer(
Box::new(
move |vec_car_price: Vec<CarPrice>| {
let pool = Arc::clone(&pool);
let all_memory_usage = Arc::clone(&all_memory_usage);
return Box::pin(
async move {
let mut conn = pool.get().await.expect("Error getting connection");
insert_into(car_prices::table)
.values(vec_car_price)
.execute(&mut conn)
.await
.expect("Error inserting data");
let current_mem = PEAK_ALLOC.current_usage_as_mb() as i32;
all_memory_usage.lock().await.push(current_mem);
}
);
}
)
).chunk_size(2000);
let step = step_builder.build();
step.run().await;
运行集成测试的步骤
- 安装 docker 和 docker-compose
- 运行命令
docker-compose up -d
启动数据库 - 运行命令
cargo test -- --ignored
运行集成测试 - 运行命令
docker-compose down
停止数据库
依赖项
~0–1.7MB
~30K SLoC