1 个不稳定版本
0.13.0-dev.1 | 2023年12月15日 |
---|
在 #timely 中排名第 5
每月下载量 2,322
在 4 个crate中使用(其中2个直接使用)
9KB
70 行
Timely Dataflow
Timely dataflow 是一种低延迟循环数据流计算模型,由论文 Naiad: a timely dataflow system 提出。该项目是 Rust 中 timely dataflow 的扩展和更模块化的实现。
该项目类似于一个分布式数据并行计算引擎,可以将相同的程序从笔记本电脑上的单个线程扩展到集群中的分布式执行。主要目标是表达能力和高性能。假设您还没有使用 timely dataflow,它可能比您目前使用的任何东西都更加表达丰富和快速。
请务必阅读 timely dataflow 的文档。它仍在进行中,但大部分都在改进。在 mdbook
格式中有更多 长文本,其中包含针对当前构建进行测试的示例。还有一系列博客文章(第一部分、第二部分、第三部分),以不同的方式介绍 timely dataflow,但请注意,那里的示例可能需要调整才能与当前代码兼容。
示例
要使用 timely dataflow,请将以下内容添加到项目 Cargo.toml
文件的依赖部分
[dependencies]
timely="*"
这将从 crates.io 引入 timely
crate,这将允许您开始编写类似这样的 timely dataflow 程序(也可在 timely/examples/simple.rs 中找到)
extern crate timely;
use timely::dataflow::operators::*;
fn main() {
timely::example(|scope| {
(0..10).to_stream(scope)
.inspect(|x| println!("seen: {:?}", x));
});
}
您可以通过在 timely-dataflow
仓库的根目录中输入以下内容来运行此示例
% cargo run --example simple
Running `target/debug/examples/simple`
seen: 0
seen: 1
seen: 2
seen: 3
seen: 4
seen: 5
seen: 6
seen: 7
seen: 8
seen: 9
这是一个非常简单的示例(名字已经说明了这一点),它只是暗示了您可能如何编写数据流程序。
做更多的事情
对于更复杂的示例,可以考虑非常相似的(但更明确的)examples/hello.rs,它分别创建和驱动数据流
extern crate timely;
use timely::dataflow::{InputHandle, ProbeHandle};
use timely::dataflow::operators::{Input, Exchange, Inspect, Probe};
fn main() {
// initializes and runs a timely dataflow.
timely::execute_from_args(std::env::args(), |worker| {
let index = worker.index();
let mut input = InputHandle::new();
let mut probe = ProbeHandle::new();
// create a new input, exchange data, and inspect its output
worker.dataflow(|scope| {
scope.input_from(&mut input)
.exchange(|x| *x)
.inspect(move |x| println!("worker {}:\thello {}", index, x))
.probe_with(&mut probe);
});
// introduce data and watch!
for round in 0..10 {
if index == 0 {
input.send(round);
}
input.advance_to(round + 1);
while probe.less_than(input.time()) {
worker.step();
}
}
}).unwrap();
}
此示例做了相当多的事情,以展示 timely 可以为您做更多的事情。
我们首先构建一个数据流图,创建一个输入流(使用 input_from
),然后将其输出 exchange
来驱动记录在工作者之间的传输(使用数据本身来指示路由到哪个工作者)。我们 inspect
数据并打印工作者索引,以指示哪个工作者接收了哪些数据,然后 probe
结果,以便每个工作者都能看到一轮给定数据何时已处理完毕。
然后,我们通过重复引入数据轮次来驱动计算,其中 round
本身用作数据。在每一轮中,每个工作者引入相同的数据,然后重复执行数据流步骤,直到 probe
显示所有工作者都已处理完该轮次的所有工作,此时计算继续进行。
对于两个工作者,输出如下
% cargo run --example hello -- -w2
Running `target/debug/examples/hello -w2`
worker 0: hello 0
worker 1: hello 1
worker 0: hello 2
worker 1: hello 3
worker 0: hello 4
worker 1: hello 5
worker 0: hello 6
worker 1: hello 7
worker 0: hello 8
worker 1: hello 9
注意,尽管工作者零引入了数据 (0..10)
,但每个元素都路由到特定的工作者,正如我们所期望的那样。
执行
上面的 hello.rs
程序默认使用单个工作者线程。要在一个进程中使用多个线程,请使用 -w
或 --workers
选项,后跟您希望使用的线程数。(注意:simple.rs
程序始终使用一个工作者线程;它使用 timely::example
,该程序忽略用户提供的输入)。
要使用多个进程,您需要使用 -h
或 --hostfile
选项来指定一个文本文件,该文件的行是 hostname:port
条目,对应于您计划启动进程的位置。您需要使用 -n
或 --processes
参数来指示您将启动多少个进程(主机文件的前缀),并且每个进程必须使用 -p
或 --process
参数来指示它们在这个数字中的索引。
换句话说,您想要的主机文件如下所示,
% cat hostfile.txt
host0:port
host1:port
host2:port
host3:port
...
然后以这种方式启动进程
host0% cargo run -- -w 2 -h hostfile.txt -n 4 -p 0
host1% cargo run -- -w 2 -h hostfile.txt -n 4 -p 1
host2% cargo run -- -w 2 -h hostfile.txt -n 4 -p 2
host3% cargo run -- -w 2 -h hostfile.txt -n 4 -p 3
每个进程的工作者数应相同。
生态系统
Timely数据流旨在支持多个抽象级别,从最低级别的手动数据流组装,到高级“声明式”抽象。
目前有一些编写Timely数据流程序的选择。理想情况下,随着时间的推移,这个集会不断扩展,因为感兴趣的人会编写自己的层(或基于他人的层)。
-
Timely数据流:Timely数据流包括几个原始运算符,包括标准运算符如
map
、filter
和concat
。它还包括用于进入和退出循环(enter
和leave
)等任务的一些更特殊的运算符,以及可以使用闭包提供实现的通用运算符(unary
和binary
)。 -
差分数据流:Differential数据流是基于Timely数据流的高级语言,包括
group
、join
和iterate
等运算符。其实现完全增量化,细节非常酷(如果有些神秘)。
还有几个基于实时数据流的应用程序,包括一个流式最坏情况最优连接实现和PageRank实现,这两个都应该为编写实时数据流程序提供有用的示例。
贡献
如果您对与实时数据流合作或提供帮助感兴趣,太好了!
有一些工作类别对我们有帮助,也可能对您感兴趣。有几个广泛的类别,然后是一系列不断变化的问题,这些问题具有不同的复杂度。
-
如果您想使用实时数据流编写程序,这对我们来说非常有趣。理想情况下,实时数据流旨在成为一种对非平凡类数据流计算的人性化方法。随着人们使用它并报告他们的经验,我们了解他们发现的bug类别、人体工程学痛点以及其他我们没有提前想象到的事情。了解实时数据流、尝试使用它并报告回信息是有帮助的!
-
如果您喜欢编写小程序或文档测试,在实时数据流中有很多地方示例相对较少,或者实际上并没有测试演示的功能。这些通常很容易上手、充实并推进,而不需要大量的前期承诺。这也许也是我们中的一个详细解释某事的绝佳方式,如果您正在寻找这个的话。
-
如果您喜欢在实时数据流中亲自动手,问题跟踪器有各种问题,涉及堆栈的不同层次。例如
-
为了直接满足Rust的所有权纪律,Timely目前比必须的多做数据复制。其中一些复制可以通过更仔细的资源管理来消除(例如,使用类似于bytes crate的共享区域)。这里不是所有的事情都那么明显,因此这里还有一点设计工作的机会。
-
我们最近实施了一系列日志更改,但仍有一些想要的功能尚未实现。如果您对通过探索记录它所做事情的底层基础设施来了解实时数据流的工作方式感兴趣,这可能是一个不错的选择!它还有一个额外的好处,即日志本身就是实时流,因此您甚至可以在实时上进行一些日志处理。哇...
-
关于将Rust的所有权惯例集成到实时数据流中有一个开放的问题。目前,实时流是可克隆的对象,当流被重用时,项目将被克隆。我们可以使这更加明确,并要求调用一个
.cloned()
方法来获取所有权的对象,就像迭代器需要它一样。同时,使用对流的引用而不是获取所有权应该给您机会查看经过的记录,而不需要所有权(也不需要克隆,就像目前所做的那样)。这通常对需要序列化数据且无法充分利用所有权的交换通道来说已经足够了。 -
在调度及时数据流算子方面有很多有趣的研究工作。当我们有机会调度许多算子时,我们可能会稍作思考,意识到其中一些算子没有工作要做,可以跳过。更好的是,我们可以维护一个待执行算子的列表,对于没有工作的算子则不采取任何操作。
-
还有一些更大的工作主题,它们的解决方案并不明显,每个都有解决各种性能问题的潜力。
输出速率控制
目前,unary
和 binary
算子的实现允许它们的闭包发送无界数量的输出。这可能导致不希望的资源耗尽,并且在需要分配大量新内存来缓冲数据时,如果运行时没有机会消化这些数据,通常会表现较差的性能。通常情况下,当产生大量数据时,如果有机会,它们最终会被减少。
在当前接口中,没有太多可以做的事情。一个可能的变化是让 input
和 notificator
对象分别从一个输入消息或时间戳请求一个闭包到输出迭代器。这给系统提供了以他们认为合适的方式播放迭代器的机会。由于许多算子产生基于独立键的数据并行输出,构建这样的迭代器可能并不那么负担重。
缓冲区管理
及时通信层目前丢弃它通过交换通道移动的大多数缓冲区,因为它没有合理的方法来控制输出速率,也没有合理的方法来确定应该缓存多少个缓冲区。如果这两个问题中的任何一个得到解决,就有意义地回收缓冲区以避免随机分配,特别是对于小批量来说。这些更改在 dataflow-join
三角计算工作量中大约有 10%-20% 的性能影响。
对不可序列化类型的支持
通信层基于一个类型 Content<T>
,它可以由类型化或二进制数据支持。因此,它要求它所支持的类型必须是可序列化的,因为它需要处理数据是二进制的情况的逻辑,即使这种情况没有使用。看起来 Stream
类型应该是可扩展的,以便在使用数据存储类型时是参数化的,这样我们就可以表达一些类型不可序列化的事实,并且这是可以接受的。
注意:差分数据流在 operators/arrange.rs
中演示了如何在用户级别做这件事,虽然有点草率(使用一个包装器来隐瞒它传输的类型属性)。
这将允许我们安全地传递 Rc
类型,只要我们使用 Pipeline
并行化合同。
粗粒度与细粒度时间戳
进度跟踪机制在每个时间戳上都有一些不平凡的开销。这意味着使用非常细粒度的时间戳,例如处理记录的纳秒,可能会淹没进度跟踪逻辑。相比之下,日志基础设施将纳秒降级为数据,成为日志负载的一部分,并用批次中最小的时间戳来近似事件批次。这在进度跟踪方面不够精确,但性能更好。可能可以将这一点推广,让用户编写程序时不必考虑时间戳的粒度,系统在可能的情况下自动细化(本质上是对时间进行 boxcar-ing)。
注意:差分数据流在 collection.rs
中展示了如何在用户级别进行此操作。由于缺乏系统支持,用户最终需要指定粒度,这虽然不是什么大问题,但可能可以通过改进。此外,让用户控制粒度可能使他们能够更好地控制延迟/吞吐量之间的权衡,这对系统来说可能是有益的。