5 个稳定版本

2.0.0 2020年1月8日
1.0.3 2019年8月6日
1.0.1 2019年8月5日

#502数据库接口


3 crates 使用

MIT/Apache

39KB
602

rettle

LICENSE License Build Status Crates.io Version Minimum rustc version

这个库是一个受 Keras 启发的多线程 ETL(提取、转换、加载),允许“调酒师”定义数据转换和输出的任何顺序。

类型

rettle 提供以下类型,以便在任何项目中“酿造”数据

  • 锅:包含数据源、接收器和转换指令的容器(见下面的成分类型
  • 酿酒厂:包含酿酒师并给他们发送任务和要处理的初始茶的状态的管理器
  • 酿酒师:酿造茶的工作者

特质

  • 成分:定义可以包含在 ETL 菜谱中的步骤
  • 参数:定义一个成分操作可以使用的附加参数(可选)

成分类型

  • 填充:数据输入源
  • 输血:组合在此步骤之前定义的多个源的数据 尚未实现
  • 浸泡:数据转换步骤
  • 撇去:移除一个字段(或茶对象) 尚未实现
  • 倒出:数据输出目标

使用 rettle

在您的自定义项目中,您首先需要定义由 Fill 成分创建的“茶”结构。

示例

pub struct TextTea {
    pub x: i32,
    pub str_val: String,
    pub y: bool,
}

接下来,您可以创建一个新的 Pot 结构,并为其提供源和成分,然后在调用它的 brew() 方法之前,启动酿造过程。成分可以与可选的 Argument 特质结构一起提供,以传递由您的自定义过滤器使用的附加运行时参数。

可选浸泡参数示例

pub struct SteepArgs {
    pub increment: i32,
}

impl Argument for SteepArgs {
    fn as_any(&self) -> &dyn Any {
        self
    }
}

最后,必须创建一个 Brewery 结构来指定要运行的 Brewers(线程)数量,并提供一个 start_time 值以提供运行时间指标。

Fill 操作收集并传递要处理的 Tea 对象到 Brewery,由 Brewers 处理。

示例项目代码

fn main() {
    // Initialize variables
    let mut new_pot = Pot::new();
    let brewery = Brewery::new(2);
    let steep_args = SteepArgs { increment: 10000 };
    
    // Add source to pot
    new_pot = new_pot.add_source(Box::new(Fill{
        name: String::from("fake_tea"),
        source: String::from("hardcoded"),
        computation: Box::new(|_args, brewery, recipe| {
            let total_data = 1000000;
            let batch_size = 200;
            let num_iterations = total_data / batch_size;
            println!("Testing {} iterations", total_data);
            for _ in 0 .. num_iterations {
                let mut tea_batch = Vec::with_capacity(batch_size);
                for _ in 0 .. batch_size {
                    tea_batch.push(Box::new(TextTea::default()) as Box<dyn Tea + Send>);
                }
                let recipe = Arc::clone(&recipe);
                brewery.take_order(|| {
                    make_tea(tea_batch, recipe);
                });
            }
        }),
        params: None,
    }));
    
    // Add ingredients to pot
    new_pot = new_pot.add_ingredient(Box::new(Steep{
        name: String::from("steep1"),
        computation: Box::new(|tea_batch, args| {
            tea_batch.into_iter()
                .map(|tea| {
                    let tea = tea.as_any().downcast_ref::<TextTea>().unwrap();
                    let mut new_tea = tea.clone();
                    match args {
                        None => panic!("No params passed, not editing object!"),
                        Some(box_args) => {
                            let box_args = box_args.as_any().downcast_ref::<SteepArgs>().unwrap();
                            new_tea.x = new_tea.x - box_args.increment;
                        }
                    }
                    Box::new(new_tea) as Box<dyn Tea + Send>
                })
                .collect()
        }),
        params: Some(Box::new(steep_args)),
    }))

    new_pot = new_pot.add_ingredient(Box::new(Pour{
        name: String::from("pour1"),
        computation: Box::new(|tea_batch, _args| {
            tea_batch.into_iter()
                .map(|tea| {
                    //println!("Final Tea: {:?}", tea.as_any().downcast_ref::<TextTea>().unwrap());
                    let tea = tea.as_any().downcast_ref::<TextTea>().unwrap();
                    let same_tea = TextTea { x: tea.x, str_val: String::from(&tea.str_val[..]), y: tea.y };
                    Box::new(same_tea) as Box<dyn Tea + Send>
                })
                .collect()
        }),
        params: None,
    }));
    
    // Process Tea
    new_pot.brew(&brewery);
    
    // Display information
    brewery.get_brewer_info();
    println!("Number of sources: {}", new_pot.get_sources().len());
    println!("Number of steps: {}", new_pot.get_recipe().read().unwrap().len());
}

配料箱

社区可以添加配料箱,这些配料箱可以与该箱一起使用,以简化添加常见集成或转换的配料。一些示例配料箱包括

  • cstea:csv文件填充/倒出集成
  • elastictea:Elasticsearch填充/倒出集成
  • logtea:日志文件填充集成

依赖项

~0.7–1.4MB
~33K SLoC