5 个稳定版本
2.0.0 | 2020年1月8日 |
---|---|
1.0.3 | 2019年8月6日 |
1.0.1 | 2019年8月5日 |
#502 在 数据库接口
被 3 crates 使用
39KB
602 行
rettle
这个库是一个受 Keras 启发的多线程 ETL(提取、转换、加载),允许“调酒师”定义数据转换和输出的任何顺序。
类型
rettle 提供以下类型,以便在任何项目中“酿造”数据
- 锅:包含数据源、接收器和转换指令的容器(见下面的成分类型)
- 酿酒厂:包含酿酒师并给他们发送任务和要处理的初始茶的状态的管理器
- 酿酒师:酿造茶的工作者
特质
- 成分:定义可以包含在 ETL 菜谱中的步骤
- 参数:定义一个成分操作可以使用的附加参数(可选)
成分类型
- 填充:数据输入源
- 输血:组合在此步骤之前定义的多个源的数据 尚未实现
- 浸泡:数据转换步骤
- 撇去:移除一个字段(或茶对象) 尚未实现
- 倒出:数据输出目标
使用 rettle
在您的自定义项目中,您首先需要定义由 Fill
成分创建的“茶”结构。
示例
pub struct TextTea {
pub x: i32,
pub str_val: String,
pub y: bool,
}
接下来,您可以创建一个新的 Pot
结构,并为其提供源和成分,然后在调用它的 brew()
方法之前,启动酿造过程。成分可以与可选的 Argument
特质结构一起提供,以传递由您的自定义过滤器使用的附加运行时参数。
可选浸泡参数示例
pub struct SteepArgs {
pub increment: i32,
}
impl Argument for SteepArgs {
fn as_any(&self) -> &dyn Any {
self
}
}
最后,必须创建一个 Brewery
结构来指定要运行的 Brewers
(线程)数量,并提供一个 start_time
值以提供运行时间指标。
Fill
操作收集并传递要处理的 Tea
对象到 Brewery
,由 Brewers
处理。
示例项目代码
fn main() {
// Initialize variables
let mut new_pot = Pot::new();
let brewery = Brewery::new(2);
let steep_args = SteepArgs { increment: 10000 };
// Add source to pot
new_pot = new_pot.add_source(Box::new(Fill{
name: String::from("fake_tea"),
source: String::from("hardcoded"),
computation: Box::new(|_args, brewery, recipe| {
let total_data = 1000000;
let batch_size = 200;
let num_iterations = total_data / batch_size;
println!("Testing {} iterations", total_data);
for _ in 0 .. num_iterations {
let mut tea_batch = Vec::with_capacity(batch_size);
for _ in 0 .. batch_size {
tea_batch.push(Box::new(TextTea::default()) as Box<dyn Tea + Send>);
}
let recipe = Arc::clone(&recipe);
brewery.take_order(|| {
make_tea(tea_batch, recipe);
});
}
}),
params: None,
}));
// Add ingredients to pot
new_pot = new_pot.add_ingredient(Box::new(Steep{
name: String::from("steep1"),
computation: Box::new(|tea_batch, args| {
tea_batch.into_iter()
.map(|tea| {
let tea = tea.as_any().downcast_ref::<TextTea>().unwrap();
let mut new_tea = tea.clone();
match args {
None => panic!("No params passed, not editing object!"),
Some(box_args) => {
let box_args = box_args.as_any().downcast_ref::<SteepArgs>().unwrap();
new_tea.x = new_tea.x - box_args.increment;
}
}
Box::new(new_tea) as Box<dyn Tea + Send>
})
.collect()
}),
params: Some(Box::new(steep_args)),
}))
new_pot = new_pot.add_ingredient(Box::new(Pour{
name: String::from("pour1"),
computation: Box::new(|tea_batch, _args| {
tea_batch.into_iter()
.map(|tea| {
//println!("Final Tea: {:?}", tea.as_any().downcast_ref::<TextTea>().unwrap());
let tea = tea.as_any().downcast_ref::<TextTea>().unwrap();
let same_tea = TextTea { x: tea.x, str_val: String::from(&tea.str_val[..]), y: tea.y };
Box::new(same_tea) as Box<dyn Tea + Send>
})
.collect()
}),
params: None,
}));
// Process Tea
new_pot.brew(&brewery);
// Display information
brewery.get_brewer_info();
println!("Number of sources: {}", new_pot.get_sources().len());
println!("Number of steps: {}", new_pot.get_recipe().read().unwrap().len());
}
配料箱
社区可以添加配料箱,这些配料箱可以与该箱一起使用,以简化添加常见集成或转换的配料。一些示例配料箱包括
- cstea:csv文件填充/倒出集成
- elastictea:Elasticsearch填充/倒出集成
- logtea:日志文件填充集成
依赖项
~0.7–1.4MB
~33K SLoC