9 个不稳定版本 (3 个破坏性更新)
0.4.0 | 2023 年 5 月 29 日 |
---|---|
0.3.2 | 2022 年 12 月 20 日 |
0.2.2 | 2022 年 11 月 6 日 |
0.2.0 | 2022 年 4 月 11 日 |
0.1.1 | 2022 年 1 月 18 日 |
#248 in 机器学习
每月 66 次下载
用于 dataflow_nlp
42KB
1K SLoC
数据流
数据流是一个数据处理库,提供可组合的基本工具来构建灵活、快速且静态类型的数据处理管道。管道是一个有向无环数据流图,数据加载器可以在单独的线程上运行,以向对数据需求大的应用程序提供数据。
使用方法
要构建管道,首先从加载器节点开始
use dataflow::prelude::*;
fn main() {
let pipeline = FileLoader::from_directory("my_data_directory");
}
FileLoader 按随机顺序从目录中加载文件。接下来,使用 map()
函数向其添加转换
let pipeline = FileLoader::from_directory("my_data_directory")
.map(|(_, text)| format!("Hello {}", text)) // Add hello to each file
map()
接收一个每次处理一个样本的节点。如果我们想进行批处理,我们可以使用 .chain()
,它接收一次处理一个批次的节点。
重要提示:所有函数和闭包也都是节点! 这意味着每当我们要添加无状态的转换时,我们只需使用一个函数即可。在这种情况下,闭包接收一个数据点并输出一个数据点。
现在我们已经在每行添加了 "Hello ",让我们在管道中使用来自 dataflow_nlp
的分词器
// Our tokenizer
let tokenizer = WordpieceTokenizer::load();
// Our pipeline
let pipeline = FileLoader::from_directory("my_data_directory")
.map(|(_, text)| format!("Hello {}", text)) // Add hello to each file
.chain(tokenizer); // Tokenize the lines
太好了!现在我们的数据在批量中有效地分词了。现在,我们将一次从管道中获取一个分词后的句子。但如果我们想获取批量呢?让我们使用一个批处理节点
// Our tokenizer
let tokenizer = dataflow_nlp::tokenization::WordpieceTokenizer::load();
// Our pipeline
let pipeline = FileLoader::from_directory("my_data_directory")
.map(|(_, text)| format!("Hello {}", text)) // Add hello to each file
.chain(tokenizer) // Tokenize the files
.chain(Batch::new(64)); // Create batches of 64
这就完成了!现在我们将获得 64 个分词句子的批量。
加载器节点
如前所述,管道中的所有内容都实现了 Node
特性。RandomLoader 也是一个节点!因此,由于数据来自它,并且节点需要一个 输入 和一个 输出,它接收什么作为输入?很简单,它接收 Vec<()> 作为输入,这是管道将开始的,并产生数据 (Vec) 以通过管道发送。这种模式在所有从数据源开始的节点中都是相同的。
自定义节点
实际上,您也可以通过实现 Node
特性来实现您自己的节点!
pub trait Node<Input> {
type Output;
/// Process a batch of data
fn process(&mut self, input: Input) -> Self::Output;
/// Reset signal propogates through pipeline
fn reset(&mut self) {}
/// Get number of examples left
fn data_remaining(&self, before: usize) -> usize {
before // Defaults to same as previous remaining data
}
}
然后,您可以直接将自定义节点插入到管道中!
数据加载器
既然我们构建了这个酷炫的管道,我们能用它做什么呢?首先,我们可以简单地调用 process() 函数并传入一些数据
// The RandomLoader takes in a () for each sample, so we pass in a batch as Vec<()>
let output: Vec<Vec<Vec<String>>> = pipeline.process(vec![(); 128])
// Output should now contain 2 batches of 64 tokenized sentences from our files with "Hello" prepended.
让我们来做点更酷的事情。让我们将它放入数据加载器中,并在机器学习训练循环中使用它
// Make the dataloader
let mut dataloader = Dataloader(pipeline);
// Training loop
for example in &mut dataloader {
// Now example is a vector of tokenized strings!
// Do with them what you please...
}
待办事项
- 让数据加载器使用多队列,而不是将所有示例都排空到主线程的缓冲区
- 使用 rayon 制作自动并行管道节点
- 添加异步功能和远程来源。(由稳定的异步特性阻止)
依赖关系
~775KB
~14K SLoC