23 个不稳定版本

0.12.0 2021年3月10日
0.11.1 2019年11月18日
0.10.0 2019年7月9日
0.9.0 2019年3月31日
0.0.8 2015年7月30日

#421算法

Download history 363/week @ 2024-04-20 280/week @ 2024-04-27 282/week @ 2024-05-04 298/week @ 2024-05-11 316/week @ 2024-05-18 320/week @ 2024-05-25 341/week @ 2024-06-01 305/week @ 2024-06-08 294/week @ 2024-06-15 218/week @ 2024-06-22 143/week @ 2024-06-29 187/week @ 2024-07-06 255/week @ 2024-07-13 240/week @ 2024-07-20 260/week @ 2024-07-27 221/week @ 2024-08-03

1,000 每月下载量
12 crates 中使用

MIT 许可证

585KB
8K SLoC

Timely Dataflow

Timely dataflow 是一个低延迟的循环数据流计算模型,在论文 Naiad: a timely dataflow system 中提出。该项目是 Rust 中 timely dataflow 的扩展和更模块化的实现。

该项目类似于一个分布式数据并行计算引擎,它将单个线程在您的笔记本电脑上的程序扩展到计算机集群的分布式执行。主要目标是表达能力和高性能。如果您尚未使用 timely dataflow,那么它可能比您当前使用的任何东西都更加严格和快速。

请务必阅读 timely dataflow 的文档。它是一个正在进行中的作品,但主要是在改进。在 mdbook 格式的 长文本 中有更多示例,这些示例已通过当前构建进行测试。还有一系列博客文章(第一部分第二部分第三部分),以不同的方式介绍 timely dataflow,但请注意,那里的示例可能需要调整才能与当前代码一起构建。

示例

要使用 timely dataflow,请将以下内容添加到项目 Cargo.toml 文件的依赖项部分

[dependencies]
timely="*"

这将从 crates.io 引入 timely crate,这应该允许您开始编写类似于以下(也可在 timely/examples/simple.rs)的 timely dataflow 程序

extern crate timely;

use timely::dataflow::operators::*;

fn main() {
    timely::example(|scope| {
        (0..10).to_stream(scope)
               .inspect(|x| println!("seen: {:?}", x));
    });
}

您可以通过在 timely-dataflow 仓库的根目录中键入以下内容来运行此示例

% cargo run --example simple
Running `target/debug/examples/simple`
seen: 0
seen: 1
seen: 2
seen: 3
seen: 4
seen: 5
seen: 6
seen: 7
seen: 8
seen: 9

这是一个非常简单的示例(名字就是一切),它只是展示了您如何编写数据流程序。

做更多事情

对于更复杂的一个示例,考虑非常相似但更明确的 examples/hello.rs,它分别创建和驱动数据流

extern crate timely;

use timely::dataflow::{InputHandle, ProbeHandle};
use timely::dataflow::operators::{Input, Exchange, Inspect, Probe};

fn main() {
    // initializes and runs a timely dataflow.
    timely::execute_from_args(std::env::args(), |worker| {

        let index = worker.index();
        let mut input = InputHandle::new();
        let mut probe = ProbeHandle::new();

        // create a new input, exchange data, and inspect its output
        worker.dataflow(|scope| {
            scope.input_from(&mut input)
                 .exchange(|x| *x)
                 .inspect(move |x| println!("worker {}:\thello {}", index, x))
                 .probe_with(&mut probe);
        });

        // introduce data and watch!
        for round in 0..10 {
            if index == 0 {
                input.send(round);
            }
            input.advance_to(round + 1);
            while probe.less_than(input.time()) {
                worker.step();
            }
        }
    }).unwrap();
}

此示例做了很多工作,以展示 timely 可以为您做什么。

我们首先构建一个数据流图,创建一个输入流(使用 input_from),并将它的输出 exchange 以驱动记录在工作者之间(使用数据本身来指示要路由到哪个工作者)。我们 inspect 数据并打印工作者索引以指示哪个工作者接收了哪些数据,然后 probe 结果,以便每个工作者可以看到给定轮次的所有数据是否都已处理。

然后通过反复引入数据轮次来驱动计算,其中 round 本身被用作数据。在每一轮中,每个工作者引入相同的数据,然后反复进行数据流步骤,直到 probe 揭示所有工作者都已处理了该轮次的所有工作,此时计算继续进行。

对于两个工作者,输出看起来像

% cargo run --example hello -- -w2
Running `target/debug/examples/hello -w2`
worker 0:   hello 0
worker 1:   hello 1
worker 0:   hello 2
worker 1:   hello 3
worker 0:   hello 4
worker 1:   hello 5
worker 0:   hello 6
worker 1:   hello 7
worker 0:   hello 8
worker 1:   hello 9

请注意,尽管工作者零引入了数据 (0..10),但每个元素都路由到特定的工作者,正如我们所期望的。

执行

上面的 hello.rs 程序默认使用单个工作者线程。要在一个进程中使用多个线程,请使用 -w--workers 选项后跟您想要使用的线程数。(注意:simple.rs 程序始终使用一个工作者线程;它使用 timely::example,该示例忽略了用户提供的输入)。

要使用多个进程,您需要使用 -h--hostfile 选项来指定一个文本文件,该文件的行是 hostname:port 条目,对应于您计划在其中生成进程的位置。您需要使用 -n--processes 参数来指示您将生成多少进程(主机文件的子序列),并且每个进程必须使用 -p--process 参数来指示它们在这数量中的索引。

换句话说,您想要一个如下所示的宿主机文件,

% cat hostfile.txt
host0:port
host1:port
host2:port
host3:port
...

然后按照如下方式启动进程

host0% cargo run -- -w 2 -h hostfile.txt -n 4 -p 0
host1% cargo run -- -w 2 -h hostfile.txt -n 4 -p 1
host2% cargo run -- -w 2 -h hostfile.txt -n 4 -p 2
host3% cargo run -- -w 2 -h hostfile.txt -n 4 -p 3

每个进程的工作者数应该相同。

生态系统

Timely数据流旨在支持多个抽象级别,从最低级别的手动数据流组装到更高级别的“声明性”抽象。

目前有一些编写Timely数据流程序的选择。理想情况下,这个集合将随着时间的推移而扩展,因为有兴趣的人会编写他们自己的层(或基于他人的层)。

  • Timely数据流:Timely数据流包括几个原始操作符,包括标准操作符如 mapfilterconcat。它还包括更奇特的操作符,如进入和退出循环(enterleave),以及可以使用闭包提供实现的通用操作符(unarybinary)。

  • 差分数据流:在Timely数据流之上构建的高级语言,差分数据流包括 groupjoiniterate 等操作符。它的实现是完全增量化的,细节相当酷(如果有些神秘)。

还有一些基于及时数据流的程序,包括一个流式最坏情况最优连接实现PageRank实现,这两个示例都应该能帮助您编写及时数据流程序。

贡献

如果您对与及时数据流合作或提供帮助感兴趣,那真是太好了!

有一些工作类别对我们有帮助,也可能对您来说很有趣。有几个广泛的类别,然后是一个不断变化的、各种复杂性的问题堆。

  • 如果您想使用及时数据流编写程序,这对我们来说非常有趣。理想情况下,及时数据流旨在成为一种非平凡类数据流计算的便捷方法。随着人们使用它并反馈他们的经验,我们了解他们发现的bug类别、人体工程痛点,以及其他我们事先甚至没有想象到的事情。了解及时数据流、尝试使用它并反馈信息是有帮助的!

  • 如果您喜欢编写小示例程序或文档测试,及时数据流中的许多地方示例相对稀少,或者实际上并没有测试演示的功能。这些通常很容易上手、完善并推送,而无需大量的前期义务。如果这也是您想要的一件事,那么这可能是一个很好的方法,让我们中的某个人详细地向您解释某件事。

  • 如果您喜欢在及时数据流中“动手”,问题跟踪器有许多涉及堆栈不同级别的问题。例如

    • Timely目前比它必须的多复制数据,这是为了直接取悦Rust的所有权纪律。其中一些复制可以通过在资源管理中更加谨慎地进行删除(例如,使用一个Vec<u8>的共享区域,就像bytes crate所做的那样)。这里不是所有的事情都很明显,所以这里也有一定的设计空间。

    • 我们最近实施了一系列日志更改,但仍然有一个希望拥有的功能列表尚未实现。如果您对通过探索记录它所做的事情的基础设施来了解Timely是如何工作的感兴趣,这可能是一个很好的选择!它还有一个额外的好处,即日志本身就是及时流,您甚至可以在Timely上进行一些日志处理。哇...

    • 关于将Rust所有权习惯语集成到及时数据流中有一个开放的问题。目前,及时流是可克隆对象,当流被重用时,项目将被克隆。我们可以使这一点更加明确,并要求调用一个.cloned()方法来获取所有权的对象,就像迭代器需要的那样。同时,使用不获取所有权的流引用应该给您一个查看经过的记录的机会,而不需要获取所有权(也不需要克隆,就像目前所做的那样)。这对于可能需要序列化数据但不能充分利用所有权的交换通道来说通常是足够的。

    • 在调度及时数据流操作符方面,有许多有趣的工作。当有机会调度许多操作符时,我们可能会稍作思考,意识到其中一些操作符没有工作要做,可以跳过。更好的方法是维护一个有工作要做操作符的列表,对于没有工作的操作符,我们则不采取任何行动。

还有一些较大的工作主题,其解决方案并不明显,每个都有可能解决各种性能问题。

输出速率控制

目前,unarybinary操作符的实现允许它们的闭包发送无界数量的输出。这可能导致不欢迎的资源耗尽,以及如果运行时需要分配大量新内存来缓冲大量发送的数据而没有机会消化它,通常会导致性能不佳。通常情况下,当产生大量数据时,它们最终会有机会被减少。

在当前接口中,我们做不了太多。一个可能的变化是让inputnotificator对象分别从输入消息或时间戳请求一个闭包到输出迭代器。这给了系统以合适速度播放迭代器的机会。由于许多操作符产生基于独立键的数据并行输出,构建这样的迭代器可能并不那么负担重。

缓冲区管理

及时通信层目前丢弃通过交换通道移动的大多数缓冲区,因为它没有合理的方式来控制输出速率,也没有合理的方式来确定应该缓存多少缓冲区。如果这两个问题中的任何一个得到解决,那么回收缓冲区以避免随机分配将是有意义的,尤其是对于小批量。这些更改在dataflow-join三角计算工作负载中有大约10%-20%的性能影响。

对非序列化类型的支持

通信层基于一个类型Content<T>,它可以由类型化或二进制数据支持。因此,它要求它所支持的类型必须是可序列化的,因为它需要具有处理数据为二进制的情况的逻辑,即使这种情况没有使用。看起来Stream类型应该可以扩展为在数据使用的类型上参数化,这样我们就可以表达某些类型不可序列化的事实,并且这是可以接受的。

注意:差分数据流在operators/arrange.rs中展示了如何在用户级别做这件事,尽管有些简略(使用一个包装器,该包装器对它所传输的类型属性撒谎)。

这将使我们能够安全地传递Rc类型,只要我们使用Pipeline并行化合同。

粗粒度与细粒度时间戳

进度跟踪机制每个时间戳都会产生一些非平凡的开销。这意味着使用非常细粒度的时间戳,例如处理记录的纳秒,可能会淹没进度跟踪逻辑。相比之下,日志基础设施将纳秒降级为数据,作为日志有效负载的一部分,并使用批中最小的时间戳来近似事件批。这在进度跟踪方面不太准确,但性能更好。可能可以将这一点推广,让用户能够编写不需要考虑时间戳粒度的程序,而系统在可能的情况下会自动粗化(本质上是对时间进行boxcar-ing)。

注意:差分数据流在 collection.rs 中展示了在用户级别如何实现这一功能。由于系统支持不足,用户最终需要指定粒度,这并不糟糕,但可能可以得到改进。也可能是因为将粒度控制权留给用户,使他们能够更好地控制延迟/吞吐量之间的权衡,这对系统来说可能是个好事。

依赖项

~3.5MB
~66K SLoC