#timely-dataflow #timely #dataflow

timely-communication-master

及时数据流通信层

1 个不稳定版本

0.13.0-dev.12023 年 12 月 15 日

233并发 中排名 #233

Download history 425/week @ 2024-03-13 432/week @ 2024-03-20 467/week @ 2024-03-27 610/week @ 2024-04-03 614/week @ 2024-04-10 562/week @ 2024-04-17 386/week @ 2024-04-24 425/week @ 2024-05-01 413/week @ 2024-05-08 483/week @ 2024-05-15 576/week @ 2024-05-22 632/week @ 2024-05-29 500/week @ 2024-06-05 540/week @ 2024-06-12 539/week @ 2024-06-19 527/week @ 2024-06-26

2,304 每月下载量
3 个 Crates 中使用(通过 timely-master

MIT 许可证

135KB
2K SLoC

及时数据流

及时数据流是一种低延迟的循环数据流计算模型,由论文 Naiad: a timely dataflow system 提出。本项目是及时数据流在 Rust 中的扩展和更模块化的实现。

本项目类似于一个分布式数据并行计算引擎,可以将相同的程序从您笔记本电脑上的单个线程扩展到跨计算机集群的分布式执行。主要目标是表达能力和高性能。如果您还没有使用及时数据流,那么它可能比您目前使用的任何东西都更加表达性强和运行速度快。

请务必阅读 及时数据流的文档。它仍在进行中,但大部分都在改进。在 mdbook 格式的 长文本 中有更多内容,包括针对当前构建测试的示例。还有一个介绍及时数据流的博客文章系列(第 1 部分第 2 部分第 3 部分),尽管警告,那里的示例可能需要调整才能与当前代码兼容。

示例

要使用及时数据流,请将以下内容添加到项目 Cargo.toml 文件中的依赖项部分

[dependencies]
timely="*"

这将从 crates.io 引入 timely crate,这应该允许您开始编写类似这样的及时数据流程序(也在 timely/examples/simple.rs 中提供)

extern crate timely;

use timely::dataflow::operators::*;

fn main() {
    timely::example(|scope| {
        (0..10).to_stream(scope)
               .inspect(|x| println!("seen: {:?}", x));
    });
}

您可以通过在 timely-dataflow 仓库的根目录中键入以下内容来运行此示例

% cargo run --example simple
Running `target/debug/examples/simple`
seen: 0
seen: 1
seen: 2
seen: 3
seen: 4
seen: 5
seen: 6
seen: 7
seen: 8
seen: 9

这是一个非常简单的示例(这已经在名称中说明了),它只是表明您可以如何编写数据流程序。

做更多的事情

对于更复杂的示例,可以考虑非常相似但更明确的 examples/hello.rs,它分别创建和驱动数据流

extern crate timely;

use timely::dataflow::{InputHandle, ProbeHandle};
use timely::dataflow::operators::{Input, Exchange, Inspect, Probe};

fn main() {
    // initializes and runs a timely dataflow.
    timely::execute_from_args(std::env::args(), |worker| {

        let index = worker.index();
        let mut input = InputHandle::new();
        let mut probe = ProbeHandle::new();

        // create a new input, exchange data, and inspect its output
        worker.dataflow(|scope| {
            scope.input_from(&mut input)
                 .exchange(|x| *x)
                 .inspect(move |x| println!("worker {}:\thello {}", index, x))
                 .probe_with(&mut probe);
        });

        // introduce data and watch!
        for round in 0..10 {
            if index == 0 {
                input.send(round);
            }
            input.advance_to(round + 1);
            while probe.less_than(input.time()) {
                worker.step();
            }
        }
    }).unwrap();
}

此示例做了很多,以展示及时可以为您做些什么。

我们首先构建一个数据流图,创建一个输入流(使用 input_from),然后将输出交换(使用 exchange)来驱动记录在工人之间(使用数据本身来指示要路由到哪个工人)。我们 inspect 数据并打印工人索引,以指示哪个工人接收了哪些数据,然后 probe 结果,以便每个工人都可以看到给定一轮数据是否已经处理完毕。

然后,通过反复引入数据轮次来驱动计算,其中 round 本身用作数据。在每一轮中,每个工人引入相同的数据,然后反复执行数据流步骤,直到 probe 显示所有工人已经处理了该时代的所有工作,此时计算继续。

对于两个工人,输出看起来像

% cargo run --example hello -- -w2
Running `target/debug/examples/hello -w2`
worker 0:   hello 0
worker 1:   hello 1
worker 0:   hello 2
worker 1:   hello 3
worker 0:   hello 4
worker 1:   hello 5
worker 0:   hello 6
worker 1:   hello 7
worker 0:   hello 8
worker 1:   hello 9

请注意,尽管工人类 0 引入了数据 (0..10),但每个元素都路由到特定的工人,正如我们预期的。

执行

上面的 hello.rs 程序默认使用单个工人线程。要在一个进程中使用多个线程,请使用 -w--workers 选项,后跟您希望使用的线程数。(注意:simple.rs 程序始终使用一个工人线程;它使用 timely::example,该示例忽略用户提供的输入)。

要使用多个进程,您需要使用 -h--hostfile 选项来指定一个文本文件,其行是 hostname:port 条目,对应于您计划启动进程的位置。您需要使用 -n--processes 参数来指示您将启动多少进程(主机文件的子串),并且每个进程必须使用 -p--process 参数来指示它们在这数量中的索引。

换句话说,您希望主机文件看起来像这样,

% cat hostfile.txt
host0:port
host1:port
host2:port
host3:port
...

然后像这样启动进程

host0% cargo run -- -w 2 -h hostfile.txt -n 4 -p 0
host1% cargo run -- -w 2 -h hostfile.txt -n 4 -p 1
host2% cargo run -- -w 2 -h hostfile.txt -n 4 -p 2
host3% cargo run -- -w 2 -h hostfile.txt -n 4 -p 3

每个进程的工人数量应该相同。

生态系统

Timely 数据流旨在支持多个抽象级别,从最低级的手动数据流组装到高级的“声明式”抽象。

目前有一些用于编写 timely 数据流程序的选项。理想情况下,随着感兴趣的人编写他们自己的层(或基于他人的层),这个集合会随着时间的推移而扩展。

  • Timely 数据流:Timely 数据流包括几个原始操作员,包括标准操作员如 mapfilterconcat。它还包括用于进入和退出循环(enterleave)等任务的更奇特的操作员,以及可以使用闭包提供实现的通用操作员(unarybinary)。

  • 微分数据流:建立在 timely 数据流之上的高级语言,微分数据流包括 groupjoiniterate 等操作员。其实现完全增量化,而且细节相当酷(如果神秘的话)。

还有一些基于 timely 数据流的应用程序,包括 一个流式最坏情况最优连接实现 和一个 PageRank 实现,这两者都应提供编写 timely 数据流程序的有用示例。

贡献

如果您对参与或协助及时数据流感兴趣,那太棒了!

有一些对我们有帮助的工作类别,也可能对您感兴趣。有几个广泛的类别,然后是一堆不断变化的、各种复杂性的问题。

  • 如果您想使用及时数据流编写程序,这对我们来说非常有趣。理想情况下,及时数据流旨在成为一种非平凡类数据流计算的人体工程学方法。随着人们使用并及时反馈他们的经验,我们了解了他们发现的错误类别、人体工程学的痛点,以及其他我们事先甚至没有想象到的事情。了解及时数据流、尝试使用它并及时反馈是有帮助的!

  • 如果您喜欢编写小的示例程序或文档测试,及时数据流中有很多地方示例相对稀疏,或者实际上没有测试展示的功能。这些通常很容易上手,完善,并推动前进,而不需要大量的前期义务。这可能也是让我们中的某位为您详细解释某事的好方法。

  • 如果您喜欢在及时数据流中亲自动手,问题跟踪器有各种问题,涉及堆栈的不同层次。例如

    • 及时当前做的数据复制比必要的多,直接满足Rust的所有权纪律。其中一些复制可以通过更仔细的资源管理来省略(例如,使用与bytes crate类似的方式共享一个Vec<u8>的区域)。这里并不是所有的事情都一目了然,所以这里也有一些设计工作的机会。

    • 我们最近实施了一系列日志更改,但仍有一些想要的功能列表尚未实现。如果您对通过探索记录其行为的底层基础设施来了解及时的工作方式感兴趣,这可能是一个不错的选择!它还有额外的优势,即日志本身就是及时流,您甚至可以在及时上执行一些日志处理。哇...

    • 有一个关于将Rust的所有权惯例集成到及时数据流中的开放问题。目前,及时流是可克隆对象的,当一个流被重新使用时,项目将被克隆。我们可以使这更明确,并要求调用一个.cloned()方法来获取所有权的对象,就像迭代器需要它一样。同时,使用不获取所有权的流引用应该让您有机会查看经过的记录,而无需获取所有权(也不需要克隆,如当前所做的那样)。这对于可能需要序列化数据但不能充分利用所有权的交换通道来说已经足够了。

    • 在调度及时数据流算子方面也有一些有趣的工作,当有机会调度许多算子时,我们可能会思考一下,并意识到其中几个没有工作要做,可以跳过。更好的是,我们可以维护一个有工作要做的算子列表,并对那些没有工作的算子什么都不做。

还有一些更大的工作主题,它们的解决方案并不明显,并且每个都可能有解决各种性能问题的潜力

输出速率控制

目前,unarybinary 运算符的实现允许它们的闭包发送无界数量的输出。这可能导致资源耗尽,并且如果运行时需要分配大量新内存来缓冲发送的大量数据而没有机会处理它,通常会导致性能下降。通常情况下,当产生大量数据时,如果有机会,最终会减小。

使用当前的接口,没有太多可做的事情。一个可能的改变是让 inputnotificator 对象分别从一个输入消息或时间戳请求闭包到输出迭代器。这给系统一个机会以他们认为合适的速度播放迭代器。由于许多运算符产生基于独立键的数据并行输出,构建这样的迭代器可能并不会带来太多负担。

缓冲区管理

及时通信层目前丢弃了它通过交换通道移动的大多数缓冲区,因为它没有一种合理的方式来控制输出速率,也没有一种合理的方式来确定应该缓存多少个缓冲区。如果这两个问题中的任何一个得到解决,那么回收缓冲区以避免随机分配就很有意义,特别是对于小批量。这些更改对 dataflow-join 三角计算工作负载的影响大约是 10%-20%。

非可序列化类型的支持

通信层基于一个类型 Content<T>,它可以由类型化数据或二进制数据支持。因此,它要求它支持的类型是可序列化的,因为它需要为数据是二进制的情况编写逻辑,即使这个情况没有使用。看起来 Stream 类型应该可以扩展到在数据使用的存储类型上参数化,这样我们就可以表达一些类型不可序列化,这是可以接受的。

注意:Differential dataflow 在其 operators/arrange.rs 中展示了如何在用户级别做这件事,尽管有些草率(使用一个包装器来误导它所传输类型的属性)。

只要我们使用 Pipeline 并行化合同,这就可以让我们安全地传递 Rc 类型。

粗粒度与细粒度时间戳

进度跟踪机制在每个时间戳上都有一些非平凡的开销。这意味着使用非常细粒度的时间戳,例如处理记录的纳秒,可能会淹没进度跟踪逻辑。相比之下,日志基础设施将纳秒降低到数据,成为已记录的有效负载的一部分,并用批中最小的时间戳来近似事件批次。这在进度跟踪方面不够准确,但性能更好。可能可以将这一点推广,使用户能够编写无需考虑时间戳粒度的程序,并且系统在可能的情况下自动粗化(本质上就是 boxcar-ing 时间)。

注意:Differential dataflow 在其 collection.rs 中展示了如何在用户级别做这件事。缺乏系统支持意味着用户最终会指示粒度,这并不糟糕,但可能会得到改进。这也可能意味着让用户控制粒度,让他们对延迟/吞吐量权衡有更多控制,这对系统来说可能是一件好事。

依赖项

~1.2–2MB
~35K SLoC