5个版本 (重大变更)
0.12.0 | 2021年3月10日 |
---|---|
0.11.0 | 2019年11月18日 |
0.10.0 | 2019年7月9日 |
0.9.0 | 2019年3月31日 |
0.7.0 | 2018年9月17日 |
#515 in Rust模式
1,060 每月下载量
在 13 个crate中使用了 (2个直接使用)
9KB
70 行
Timely Dataflow
Timely dataflow 是一种低延迟循环数据流计算模型,在论文 Naiad: a timely dataflow system 中引入。该项目是在Rust中对timely dataflow的扩展和更模块化的实现。
该项目类似于分布式数据并行计算引擎,可以将同一程序从笔记本电脑上的单个线程扩展到计算机集群的分布式执行。主要目标是表达能力和高性能。假设您还没有使用timely dataflow,它可能比您目前正在使用的任何东西都要严格得多,并且更快。
请务必阅读 timely dataflow 的文档。这是一个正在进行的工作,但大多数都在改进。在 mdbook
格式的 长篇文本 中有更多内容,其中包含针对当前构建测试的示例。还有一系列博客文章 (第一部分,第二部分,第三部分),以不同的方式介绍timely dataflow,但请注意,那里的示例可能需要调整以与当前代码构建。
一个示例
要使用timely dataflow,请在您的项目 Cargo.toml
文件的依赖关系部分添加以下内容
[dependencies]
timely="*"
这将从 crates.io 引入 timely
crate,这将允许您开始编写类似以下这样的timely dataflow程序(也可在 timely/examples/simple.rs 中找到)
extern crate timely;
use timely::dataflow::operators::*;
fn main() {
timely::example(|scope| {
(0..10).to_stream(scope)
.inspect(|x| println!("seen: {:?}", x));
});
}
您可以通过在 timely-dataflow
仓库的根目录中键入以下内容来运行此示例
% cargo run --example simple
Running `target/debug/examples/simple`
seen: 0
seen: 1
seen: 2
seen: 3
seen: 4
seen: 5
seen: 6
seen: 7
seen: 8
seen: 9
这是一个非常简单的示例(其名称中就有体现),仅展示了如何编写数据流程序。
进行更多操作
对于更复杂的示例,可以考虑非常相似的(但更明确的)examples/hello.rs,该示例分别创建和驱动数据流。
extern crate timely;
use timely::dataflow::{InputHandle, ProbeHandle};
use timely::dataflow::operators::{Input, Exchange, Inspect, Probe};
fn main() {
// initializes and runs a timely dataflow.
timely::execute_from_args(std::env::args(), |worker| {
let index = worker.index();
let mut input = InputHandle::new();
let mut probe = ProbeHandle::new();
// create a new input, exchange data, and inspect its output
worker.dataflow(|scope| {
scope.input_from(&mut input)
.exchange(|x| *x)
.inspect(move |x| println!("worker {}:\thello {}", index, x))
.probe_with(&mut probe);
});
// introduce data and watch!
for round in 0..10 {
if index == 0 {
input.send(round);
}
input.advance_to(round + 1);
while probe.less_than(input.time()) {
worker.step();
}
}
}).unwrap();
}
这个示例做了更多的事情,以展示Timely能为您做的更多功能。
我们首先构建一个数据流图,创建一个输入流(使用input_from
),我们将输出exchange
以驱动记录在工作之间(使用数据本身来指示哪个工作应该路由到)。我们inspect
数据并打印工作索引以指示哪个工作接收了哪些数据,然后probe
结果,以便每个工作都可以看到一轮给定数据已经被处理。
然后通过重复引入数据轮来驱动计算,其中round
本身被用作数据。在每一轮中,每个工作都引入相同的数据,然后重复进行数据流步骤,直到probe
显示所有工作都已处理完该周期的所有工作,此时计算继续。
对于两个工作,输出如下
% cargo run --example hello -- -w2
Running `target/debug/examples/hello -w2`
worker 0: hello 0
worker 1: hello 1
worker 0: hello 2
worker 1: hello 3
worker 0: hello 4
worker 1: hello 5
worker 0: hello 6
worker 1: hello 7
worker 0: hello 8
worker 1: hello 9
请注意,尽管工作0引入了数据(0..10)
,但每个元素都被路由到特定的工作,正如我们预期的那样。
执行
上面的hello.rs
程序默认使用单个工作线程。要在一个进程中使用多个线程,请使用-w
或--workers
选项后跟您希望使用的线程数。(注意:simple.rs
程序始终使用一个工作线程;它使用timely::example
,该示例忽略用户提供的输入)。
要使用多个进程,您需要使用-h
或--hostfile
选项指定一个文本文件,该文件的行是hostname:port
条目,对应于您计划启动进程的位置。您需要使用-n
或--processes
参数来指示您将启动多少进程(主机文件的起始部分),并且每个进程都必须使用-p
或--process
参数来指示其在该数字中的索引。
换句话说,您希望主机文件看起来像这样
% cat hostfile.txt
host0:port
host1:port
host2:port
host3:port
...
然后按照以下方式启动进程
host0% cargo run -- -w 2 -h hostfile.txt -n 4 -p 0
host1% cargo run -- -w 2 -h hostfile.txt -n 4 -p 1
host2% cargo run -- -w 2 -h hostfile.txt -n 4 -p 2
host3% cargo run -- -w 2 -h hostfile.txt -n 4 -p 3
每个进程的工作者数量应相同。
生态系统
Timely数据流旨在支持多个抽象级别,从最低级别的手动数据流组装到更高层次的“声明性”抽象。
目前有几种编写Timely数据流程序的方法。理想情况下,随着感兴趣的人编写他们自己的层(或建立在他人的基础上),这个集合将随着时间的推移而扩展。
-
Timely数据流:Timely数据流包括几个原始操作符,包括标准操作符如
map
、filter
和concat
。它还包括用于进入和退出循环(enter
和leave
)等任务的更奇特的操作符,以及可以使用闭包提供实现的通用操作符(unary
和binary
)。 -
微分数据流:建立在及时数据流之上的高级语言,微分数据流包括
group
、join
和iterate
等运算符。其实现完全增量化,细节相当酷(如果有点神秘)。
还有一些基于及时数据流的应用程序,包括 一个流式最坏情况最优连接实现 和一个 PageRank 实现,这两个都应该提供编写及时数据流程序的有帮助示例。
贡献
如果您对与及时数据流合作或提供帮助感兴趣,那太好了!
有一些工作类别对我们很有帮助,也许对您也很有趣。有几个广泛的类别,然后是一系列不断变化的、各种复杂程度的问题。
-
如果您想用及时数据流编写程序,这对我们来说非常有趣。理想情况下,及时数据流旨在成为一种非平凡类数据流计算的人性化方法。随着人们使用它并报告他们的经验,我们了解他们发现的错误类别、人体工程学的痛点以及其他我们事先甚至没有想象到的事情。了解及时数据流、尝试使用它并反馈是有帮助的!
-
如果您喜欢编写小型示例程序或文档测试,及时数据流中的示例相对较少,或者实际上并没有测试所展示的功能。这些通常可以轻松上手、充实并推动,而无需大量前期承诺。这可能也是我们中的一位详细向您解释某些内容的绝佳方式,如果您需要的话。
-
如果您喜欢在及时数据流中“亲自动手”,问题跟踪器 有各种问题,涉及堆栈的不同级别。例如
-
及时当前 比它必须的复制了更多的数据,这是为了最直接地取悦 Rust 的所有权纪律。其中一些复制可以通过在资源管理中更加小心地省略(例如,使用与 bytes crate 相同方式的一个
Vec<u8>
的共享区域)来省略。这里不是所有的事情都显而易见,所以这里也有一些设计工作。 -
我们最近实施了一系列日志更改,但仍有一个 想拥有的功能列表 还没有实现。如果您对通过探索记录它所做事情的基礎设施来了解及时的工作方式感兴趣,这可能是一个不错的选择!它还有一个额外的好处,即日志本身就是及时流,您甚至可以在及时上进行一些日志处理。哇...
-
有一个关于 将 Rust 所有权习惯用法集成到及时数据流中 的开放问题。目前,及时流是可复制的对象,当流被重用时,项目将被复制。我们可以使这更加明确,并要求调用一个
.cloned()
方法来获取所有权的对象,就像迭代器需要它一样。同时,在不获取所有权的情况下使用流引用应该让您有机会查看经过的记录,而不需要复制(并且不需要像目前那样进行复制)。这对于可能需要序列化数据但不能充分利用所有权的交换通道来说已经足够了。 -
在及时数据流操作符的调度方面有许多有趣的工作,当有机会调度许多操作符时,我们可能会稍作思考,意识到其中一些没有工作可做,可以跳过。更好的做法是维护一个有待处理操作符的列表,对于那些没有工作可做的操作符,则什么都不做。
-
还有一些更大的工作主题,其解决方案并不明显,每个都有解决各种性能问题的潜力。
输出速率控制
目前,一元操作符和二元操作符的实现允许它们的闭包发送未限定的输出量。这可能导致资源耗尽,性能下降,如果运行时需要分配大量新内存来缓冲大量发送的数据而没有机会消化它。通常情况下,当产生大量数据时,它们最终会在有机会的情况下减少。
在当前接口中,可以做的事情不多。一个可能的变化是让 input
和 notificator
对象分别从输入消息或时间戳请求一个闭包到一个输出迭代器。这给了系统以它们认为合适的方式来运行迭代器的机会。由于许多操作符产生基于独立键的数据并行输出,构建这样的迭代器可能并不会带来太大的负担。
缓冲区管理
及时通信层目前在交换通道中丢弃它移动的大多数缓冲区,因为它没有合理的输出速率控制方法,也没有合理的确定应该缓存多少缓冲区的方法。如果这两个问题中的任何一个得到解决,那么回收缓冲区以避免随机分配就很有意义,特别是在小批量中。这些更改对 dataflow-join
三角形计算工作负载有大约10%-20%的性能影响。
支持不可序列化类型
通信层基于一个类型 Content<T>
,它可以由类型化或二进制数据支持。因此,它要求它支持的类型是可序列化的,因为它需要为数据是二进制的情况提供逻辑,即使这个情况没有被使用。看起来 Stream
类型应该可以扩展为在用于数据的存储类型上参数化,这样我们就可以表达某些类型不可序列化,这是可以接受的。
注意:差分数据流在其 operators/arrange.rs
中展示了如何在用户级别做这件事,尽管方法有些简略(通过一个封装器来欺骗传输类型的属性)。
这将使我们能够在使用 Pipeline
并行化合同的情况下安全地传递 Rc
类型。
粗粒度与细粒度时间戳
进度跟踪机制涉及每个时间戳的一些非平凡开销。这意味着使用非常细粒度的时间戳,例如处理记录时的纳秒,可能会淹没进度跟踪逻辑。相比之下,日志基础设施将纳秒降低为数据,成为日志有效负载的一部分,并使用批次中最小的时间戳来近似事件批次。这在进度跟踪方面不太准确,但性能更好。可能可以将这一点推广,使得用户可以编写无需考虑时间戳粒度的程序,并且当可能时系统会自动粗化(本质上是对时间进行箱形编码)。
注意:差分数据流在它的 collection.rs
中展示了如何在用户级别实现这一点。由于缺乏系统支持,用户最终需要指定粒度,这虽然不是什么大问题,但可能可以进一步改进。也可能是因为留给用户对粒度的控制,使他们能够在延迟/吞吐量权衡方面有更多的控制,这对系统来说可能是一件好事。
lib.rs
:
对 bytes
crate 的简化实现,具有不同的特性,安全性较低。
示例
use timely_bytes::arc::Bytes;
let bytes = vec![0u8; 1024];
let mut shared1 = Bytes::from(bytes);
let mut shared2 = shared1.extract_to(100);
let mut shared3 = shared1.extract_to(100);
let mut shared4 = shared2.extract_to(60);
assert_eq!(shared1.len(), 824);
assert_eq!(shared2.len(), 40);
assert_eq!(shared3.len(), 100);
assert_eq!(shared4.len(), 60);
for byte in shared1.iter_mut() { *byte = 1u8; }
for byte in shared2.iter_mut() { *byte = 2u8; }
for byte in shared3.iter_mut() { *byte = 3u8; }
for byte in shared4.iter_mut() { *byte = 4u8; }
// memory in slabs [4, 2, 3, 1]: merge back in arbitrary order.
shared2.try_merge(shared3).ok().expect("Failed to merge 2 and 3");
shared2.try_merge(shared1).ok().expect("Failed to merge 23 and 1");
shared4.try_merge(shared2).ok().expect("Failed to merge 4 and 231");
assert_eq!(shared4.len(), 1024);