#data-processing #data-pipeline #machine-learning #directed-acyclic-graph #data-loader #node #composable

dataflow

数据流是一个数据处理库,主要用于机器学习

9 个不稳定版本 (3 个破坏性更新)

0.4.0 2023 年 5 月 29 日
0.3.2 2022 年 12 月 20 日
0.2.2 2022 年 11 月 6 日
0.2.0 2022 年 4 月 11 日
0.1.1 2022 年 1 月 18 日

#248 in 机器学习

Download history 5/week @ 2024-03-09 1/week @ 2024-03-16 14/week @ 2024-03-30 5/week @ 2024-04-06

每月 66 次下载
用于 dataflow_nlp

MIT/Apache

42KB
1K SLoC

数据流

image

CI Status Current Crates.io Version Documentation

数据流是一个数据处理库,提供可组合的基本工具来构建灵活、快速且静态类型的数据处理管道。管道是一个有向无环数据流图,数据加载器可以在单独的线程上运行,以向对数据需求大的应用程序提供数据。

使用方法

要构建管道,首先从加载器节点开始

use dataflow::prelude::*;

fn main() {
  let pipeline = FileLoader::from_directory("my_data_directory");
}

FileLoader 按随机顺序从目录中加载文件。接下来,使用 map() 函数向其添加转换

let pipeline = FileLoader::from_directory("my_data_directory")
      .map(|(_, text)| format!("Hello {}", text)) // Add hello to each file

map() 接收一个每次处理一个样本的节点。如果我们想进行批处理,我们可以使用 .chain(),它接收一次处理一个批次的节点。

重要提示:所有函数和闭包也都是节点! 这意味着每当我们要添加无状态的转换时,我们只需使用一个函数即可。在这种情况下,闭包接收一个数据点并输出一个数据点。

现在我们已经在每行添加了 "Hello ",让我们在管道中使用来自 dataflow_nlp 的分词器

// Our tokenizer
let tokenizer = WordpieceTokenizer::load();

// Our pipeline
let pipeline = FileLoader::from_directory("my_data_directory")
      .map(|(_, text)| format!("Hello {}", text)) // Add hello to each file
      .chain(tokenizer); // Tokenize the lines

太好了!现在我们的数据在批量中有效地分词了。现在,我们将一次从管道中获取一个分词后的句子。但如果我们想获取批量呢?让我们使用一个批处理节点


// Our tokenizer
let tokenizer = dataflow_nlp::tokenization::WordpieceTokenizer::load();

// Our pipeline
let pipeline = FileLoader::from_directory("my_data_directory")
      .map(|(_, text)| format!("Hello {}", text)) // Add hello to each file
      .chain(tokenizer) // Tokenize the files
      .chain(Batch::new(64)); // Create batches of 64

这就完成了!现在我们将获得 64 个分词句子的批量。

加载器节点

如前所述,管道中的所有内容都实现了 Node 特性。RandomLoader 也是一个节点!因此,由于数据来自它,并且节点需要一个 输入 和一个 输出,它接收什么作为输入?很简单,它接收 Vec<()> 作为输入,这是管道将开始的,并产生数据 (Vec) 以通过管道发送。这种模式在所有从数据源开始的节点中都是相同的。

自定义节点

实际上,您也可以通过实现 Node 特性来实现您自己的节点!

pub trait Node<Input> {
    type Output;

    /// Process a batch of data
    fn process(&mut self, input: Input) -> Self::Output;
    /// Reset signal propogates through pipeline
    fn reset(&mut self) {}
    /// Get number of examples left
    fn data_remaining(&self, before: usize) -> usize {
        before // Defaults to same as previous remaining data
    }
}

然后,您可以直接将自定义节点插入到管道中!

数据加载器

既然我们构建了这个酷炫的管道,我们能用它做什么呢?首先,我们可以简单地调用 process() 函数并传入一些数据

// The RandomLoader takes in a () for each sample, so we pass in a batch as Vec<()>
let output: Vec<Vec<Vec<String>>> = pipeline.process(vec![(); 128])

// Output should now contain 2 batches of 64 tokenized sentences from our files with "Hello" prepended.

让我们来做点更酷的事情。让我们将它放入数据加载器中,并在机器学习训练循环中使用它

// Make the dataloader
let mut dataloader = Dataloader(pipeline);

// Training loop
for example in &mut dataloader {
   // Now example is a vector of tokenized strings!
   // Do with them what you please...
}

待办事项

  • 让数据加载器使用多队列,而不是将所有示例都排空到主线程的缓冲区
  • 使用 rayon 制作自动并行管道节点
  • 添加异步功能和远程来源。(由稳定的异步特性阻止)

依赖关系

~775KB
~14K SLoC