7个版本 (4个重大更新)

0.12.0 2021年3月10日
0.11.1 2019年11月18日
0.10.0 2019年7月9日
0.9.0 2019年3月31日
0.7.0 2018年9月17日

#209 in 调试

Download history 321/week @ 2024-03-24 373/week @ 2024-03-31 254/week @ 2024-04-07 346/week @ 2024-04-14 351/week @ 2024-04-21 282/week @ 2024-04-28 288/week @ 2024-05-05 293/week @ 2024-05-12 331/week @ 2024-05-19 347/week @ 2024-05-26 346/week @ 2024-06-02 312/week @ 2024-06-09 301/week @ 2024-06-16 215/week @ 2024-06-23 131/week @ 2024-06-30 168/week @ 2024-07-07

846 每月下载量
13 个工具包中使用了 (2 直接使用)

MIT 许可证

10KB
135

Timely Dataflow

Timely dataflow 是一个低延迟循环数据流计算模型,在论文 Naiad: a timely dataflow system 中引入。本项目是对 timely dataflow 在 Rust 中的扩展和更模块化的实现。

该项目类似于一个分布式数据并行计算引擎,可以将同一程序从您笔记本电脑上的单个线程扩展到跨计算机集群的分布式执行。主要目标是表达能力和高性能。如果您尚未使用 timely dataflow,它可能比您当前使用的任何工具都更强大、更快。

请务必阅读 timely dataflow 的文档。它仍在进行中,但大多数都在改进。在 mdbook 格式中还有更多 长篇文章,其中包含针对当前构建进行测试的示例。还有一系列博客文章(第1部分第2部分第3部分)以不同的方式介绍 timely dataflow,但请注意,那里的示例可能需要调整才能与当前代码兼容。

示例

要使用 timely dataflow,请将以下内容添加到您项目 Cargo.toml 文件的依赖关系部分

[dependencies]
timely="*"

这将从 timely 工具包 中引入 crates.io,这应该允许您开始编写类似以下这样的 timely dataflow 程序(也见 timely/examples/simple.rs

extern crate timely;

use timely::dataflow::operators::*;

fn main() {
    timely::example(|scope| {
        (0..10).to_stream(scope)
               .inspect(|x| println!("seen: {:?}", x));
    });
}

您可以从《timely-dataflow》仓库的根目录运行此示例,只需输入以下命令:

% cargo run --example simple
Running `target/debug/examples/simple`
seen: 0
seen: 1
seen: 2
seen: 3
seen: 4
seen: 5
seen: 6
seen: 7
seen: 8
seen: 9

这是一个非常简单的示例(名称中就有体现),它仅仅建议您如何编写数据流程序。

做更多的事情

对于更复杂的示例,可以考虑非常相似但更明确的examples/hello.rs,它分别创建和驱动数据流。

extern crate timely;

use timely::dataflow::{InputHandle, ProbeHandle};
use timely::dataflow::operators::{Input, Exchange, Inspect, Probe};

fn main() {
    // initializes and runs a timely dataflow.
    timely::execute_from_args(std::env::args(), |worker| {

        let index = worker.index();
        let mut input = InputHandle::new();
        let mut probe = ProbeHandle::new();

        // create a new input, exchange data, and inspect its output
        worker.dataflow(|scope| {
            scope.input_from(&mut input)
                 .exchange(|x| *x)
                 .inspect(move |x| println!("worker {}:\thello {}", index, x))
                 .probe_with(&mut probe);
        });

        // introduce data and watch!
        for round in 0..10 {
            if index == 0 {
                input.send(round);
            }
            input.advance_to(round + 1);
            while probe.less_than(input.time()) {
                worker.step();
            }
        }
    }).unwrap();
}

此示例做了更多的事情,以展示timely可以为您做更多的事情。

我们首先构建一个数据流图,创建一个输入流(使用input_from),然后将其输出exchange到驱动记录之间,使用数据本身来指示哪个工作进程路由到。我们inspect数据并打印工作进程索引以指示哪个工作进程接收了哪些数据,然后probe结果,以便每个工作进程都可以看到给定一轮数据已经全部处理完毕。

然后通过反复引入数据轮次来驱动计算,其中round本身用作数据。在每一轮中,每个工作进程引入相同的数据,然后反复执行数据流步骤,直到probe显示所有工作进程已处理完该时代的所有工作,此时计算继续进行。

对于两个工作进程,输出如下:

% cargo run --example hello -- -w2
Running `target/debug/examples/hello -w2`
worker 0:   hello 0
worker 1:   hello 1
worker 0:   hello 2
worker 1:   hello 3
worker 0:   hello 4
worker 1:   hello 5
worker 0:   hello 6
worker 1:   hello 7
worker 0:   hello 8
worker 1:   hello 9

请注意,尽管工作进程零引入了数据(0..10),但每个元素都按照预期路由到特定的工作进程。

执行

默认情况下,上述hello.rs程序将使用单个工作线程。要在一个进程中使用多个线程,请使用-w--workers选项后跟您希望使用的线程数。(注意:simple.rs程序始终使用一个工作线程;它使用timely::example,该程序忽略用户提供的输入)。

要使用多个进程,您需要使用-h--hostfile选项来指定一个文本文件,该文件的行是hostname:port条目,对应于您计划启动进程的位置。您需要使用-n--processes参数来指示您将启动多少个进程(主机文件的名称前缀),并且每个进程都必须使用-p--process参数来指示它们在这个数字中的索引。

换句话说,您希望主机文件看起来像这样:

% cat hostfile.txt
host0:port
host1:port
host2:port
host3:port
...

然后按照以下方式启动进程:

host0% cargo run -- -w 2 -h hostfile.txt -n 4 -p 0
host1% cargo run -- -w 2 -h hostfile.txt -n 4 -p 1
host2% cargo run -- -w 2 -h hostfile.txt -n 4 -p 2
host3% cargo run -- -w 2 -h hostfile.txt -n 4 -p 3

每个进程中的工作进程数应相同。

生态系统

Timely dataflow旨在支持多个抽象级别,从最低级别的手动数据流组装到高级别的“声明性”抽象。

目前有几个选项可以编写timely dataflow程序。理想情况下,随着时间的推移,这个集合将扩大,因为感兴趣的人将编写他们自己的层(或建立在他人的基础上)。

  • Timely数据流:Timely数据流包括多个原始运算符,包括像mapfilterconcat这样的标准运算符。它还包括用于循环进入和退出的更奇特的运算符(如enterleave),以及可以通过闭包提供的泛型运算符(如unarybinary)。

  • 微分数据流:基于Timely数据流构建的高级语言,微分数据流包括groupjoiniterate等运算符。它的实现是完全增量化的,细节相当酷(如果有点神秘)。

还有一些基于Timely数据流的应用程序,包括流式最坏情况最优连接实现PageRank实现,这些都应该为编写Timely数据流程序提供有价值的示例。

贡献

如果您对使用或帮助Timely数据流感兴趣,太好了!

有一些工作类别对我们有帮助,也可能对您感兴趣。有几个广泛的类别,然后是一个不断变化的各种复杂性的问题堆。

  • 如果您想使用Timely数据流编写程序,这对我们来说非常有趣。理想情况下,Timely数据流旨在是一种对非平凡类数据流计算的人体工程学方法。随着人们的使用并反馈他们的经验,我们了解他们发现的错误类别、人体工程学痛点以及其他我们事先甚至都没有想象到的事情。了解Timely数据流、尝试使用它并反馈是有帮助的!

  • 如果您喜欢编写小型的示例程序或文档测试,Timely数据流中有很多地方示例相对较少,或者实际上并没有测试演示的功能。这些通常很容易上手,充实,并推动,而无需大量的前期承诺。这也许也是我们中的一些人向您详细解释某事的绝佳方式,如果您需要的话。

  • 如果您喜欢在Timely数据流中“大显身手”,问题跟踪器有各种问题,涉及堆栈的不同层次。例如

    • Timely目前比必须的复制更多数据,出于直接满足Rust的所有权纪律的目的。其中一些复制可以通过在资源管理中更加小心地处理(例如,使用与bytes crate类似的方式共享一个Vec<u8>的区域)来省略。这里并不是所有事情都那么明显,因此这里也有进行一些设计工作的机会。

    • 我们最近实施了一系列日志更改,但仍有一些期望拥有的特性尚未实现。如果您对通过探索记录它所做事情的基础设施来探索Timely的工作方式感兴趣,这可能是一个很好的匹配!它还有额外的优势,即日志本身就是Timely流,您甚至可以在Timely上做一些日志处理。哇...

    • 在将Rust的所有权习惯用法集成到timely数据流中存在一个开放问题。这里。目前,timely流是可克隆对象,当流被重复使用时,项目将被克隆。我们可以使其更加明确,并要求调用一个.cloned()方法来获取所有权的对象,就像迭代器需要它一样。同时,在不获取所有权的情况下使用流引用应让您有机会查看经过的记录(而不需要克隆,就像目前所做的那样)。这通常对需要序列化数据且不能充分利用所有权的交换通道来说已经足够了。

    • 在调度timely数据流算子方面有许多有趣的工作,当我们有机会调度许多算子时,我们可能会稍作思考,意识到其中一些没有任何工作要做,可以跳过。更好的是,我们可以维护一个有工作要做算子的列表,对于没有工作要做的不做任何事情。

还有一些更大的工作主题,其解决方案不是一目了然的,并且每个都可能有整理各种性能问题的潜力。

输出速率控制

目前,unarybinary算子的实现允许它们的闭包发送无界的输出量。这可能导致不受欢迎的资源耗尽,并且通常会导致性能不佳,如果运行时需要分配大量新内存来缓冲发送的大量数据而没有机会消化它。通常情况下,当产生大量数据时,它们最终会在有机会时减少。

使用当前的接口,没有太多可以做的。一个可能的变化是让inputnotificator对象分别请求从输入消息或时间戳到输出迭代器的闭包。这给了系统机会以他们认为合适的方式来播放迭代器。由于许多算子产生基于独立键的数据并行输出,构建这样的迭代器可能不会造成太大的负担。

缓冲区管理

timely通信层目前在交换通道中丢弃它移动的大多数缓冲区,因为它没有合理的方法来控制输出速率,也没有合理的方法来确定应该缓存多少缓冲区。如果这两个问题中的任何一个得到解决,那么回收缓冲区以避免随机分配,特别是对于小批量来说是有意义的。这些更改对dataflow-join三角形计算工作负载有大约10%-20%的性能影响。

对非序列化类型的支持

通信层基于一个类型Content<T>,它可以是类型化或二进制数据。因此,它要求它支持的类型是可序列化的,因为它需要逻辑来处理数据是二进制的情况,即使这种情况下没有使用。看起来Stream类型应该是可扩展的,以便在数据使用的存储类型上是参数化的,这样我们就可以表达某些类型不可序列化的这一事实,并且这是可以的。

注意:差分数据流在operators/arrange.rs中演示了如何在用户级别这样做,尽管可能有点草率(使用一个撒谎的包装器来描述它所携带的类型属性)。

这将允许我们安全地传递Rc类型,只要我们使用Pipeline并行化合同。

粗粒度与细粒度时间戳

进度跟踪机械涉及每个时间戳的非平凡开销。这意味着使用非常细粒度的时间戳,例如处理记录的纳秒级时间戳,可能会淹没进度跟踪逻辑。相比之下,日志基础设施将纳秒降级为数据,成为日志有效负载的一部分,并用批次中最小的时间戳来近似事件批次。这在进度跟踪方面不太准确,但性能更好。可能可以将此推广,使用户能够编写无需考虑时间戳粒度的程序,系统在可能的情况下自动进行粗化(本质上是对时间进行箱形编码)。

注意:差分数据流在 collection.rs 中展示了在用户层面如何实现这一点。缺乏系统支持意味着用户最终需要指示粒度,这虽然不是问题,但可能可以改进。也可能是因为让用户控制粒度,他们可以更好地控制延迟/吞吐量之间的权衡,这可能对系统来说是个好事。

无运行时依赖