1 个不稳定版本

0.13.0-dev.12023 年 12 月 15 日

2171Rust 模式 中排名

Download history 423/week @ 2024-03-13 431/week @ 2024-03-20 461/week @ 2024-03-27 604/week @ 2024-04-03 730/week @ 2024-04-10 566/week @ 2024-04-17 384/week @ 2024-04-24 424/week @ 2024-05-01 411/week @ 2024-05-08 482/week @ 2024-05-15 576/week @ 2024-05-22 634/week @ 2024-05-29 500/week @ 2024-06-05 543/week @ 2024-06-12 537/week @ 2024-06-19 528/week @ 2024-06-26

每月下载 2,306
3 个包中使用(通过 timely-master

MIT 许可证

18KB
357

Timely Dataflow

Timely Dataflow 是一种低延迟循环数据流计算模型,由论文 Naiad: a timely dataflow system 提出。本项目是 Rust 中 Timely Dataflow 的扩展和更模块化的实现。

本项目类似于一个分布式数据并行计算引擎,可以将相同的程序从笔记本电脑上的单个线程扩展到集群的分布式执行。主要目标是表达能力和高性能。在没有使用 Timely Dataflow 的情况下,它可能比您目前使用的任何东西都更加表达丰富且速度更快。

请务必阅读 Timely Dataflow 文档。它仍在进行中,但大部分都在改进。还有更多 长篇文本mdbook 格式提供,其中包含针对当前构建测试的示例。还有一系列博客文章(第 1 部分第 2 部分第 3 部分),以不同的方式介绍 Timely Dataflow,但请注意,那里的示例可能需要调整才能与当前代码一起构建。

示例

要使用 Timely Dataflow,请在项目的 Cargo.toml 文件的依赖项部分添加以下内容

[dependencies]
timely="*"

这将从 crates.io 添加 timely,这应该允许您开始编写像这样(也在 timely/examples/simple.rs 中可用)的 Timely Dataflow 程序。

extern crate timely;

use timely::dataflow::operators::*;

fn main() {
    timely::example(|scope| {
        (0..10).to_stream(scope)
               .inspect(|x| println!("seen: {:?}", x));
    });
}

您可以通过在 timely-dataflow 仓库的根目录中键入以下内容来运行此示例

% cargo run --example simple
Running `target/debug/examples/simple`
seen: 0
seen: 1
seen: 2
seen: 3
seen: 4
seen: 5
seen: 6
seen: 7
seen: 8
seen: 9

这是一个非常简单的示例(它在名称中),仅表明您如何编写数据流程序。

做更多事情

考虑一个更复杂的例子,它非常相似(但更加明确),例如examples/hello.rs,该例子分别创建和驱动数据流。

extern crate timely;

use timely::dataflow::{InputHandle, ProbeHandle};
use timely::dataflow::operators::{Input, Exchange, Inspect, Probe};

fn main() {
    // initializes and runs a timely dataflow.
    timely::execute_from_args(std::env::args(), |worker| {

        let index = worker.index();
        let mut input = InputHandle::new();
        let mut probe = ProbeHandle::new();

        // create a new input, exchange data, and inspect its output
        worker.dataflow(|scope| {
            scope.input_from(&mut input)
                 .exchange(|x| *x)
                 .inspect(move |x| println!("worker {}:\thello {}", index, x))
                 .probe_with(&mut probe);
        });

        // introduce data and watch!
        for round in 0..10 {
            if index == 0 {
                input.send(round);
            }
            input.advance_to(round + 1);
            while probe.less_than(input.time()) {
                worker.step();
            }
        }
    }).unwrap();
}

此示例做了很多工作,以展示timely能为您做更多的事情。

我们首先构建一个数据流图,创建一个输入流(使用input_from),我们将输出exchange到工人之间以驱动记录(使用数据本身来指示要路由到哪个工人)。我们inspect数据并打印工人索引以指示哪个工人接收了哪些数据,然后probe结果,以便每个工人可以看到何时已处理完给定轮次的所有数据。

然后我们通过反复引入数据轮次来驱动计算,其中round本身用作数据。在每一轮中,每个工人引入相同的数据,然后重复执行数据流步骤,直到probe显示所有工人已处理完该纪元的所有工作,此时计算继续。

对于两个工人,输出如下

% cargo run --example hello -- -w2
Running `target/debug/examples/hello -w2`
worker 0:   hello 0
worker 1:   hello 1
worker 0:   hello 2
worker 1:   hello 3
worker 0:   hello 4
worker 1:   hello 5
worker 0:   hello 6
worker 1:   hello 7
worker 0:   hello 8
worker 1:   hello 9

请注意,尽管工号零引入了数据(0..10),但每个元素都路由到特定的工人,正如我们所期望的那样。

执行

上面的hello.rs程序默认使用单个工作线程。要在一个进程中使用多个线程,请使用-w--workers选项后跟您希望使用的线程数。(注意:simple.rs程序始终使用一个工作线程;它使用timely::example,该程序忽略用户提供的输入)。

要使用多个进程,您需要使用-h--hostfile选项来指定一个文本文件,该文件的行是hostname:port条目,对应于您计划启动进程的位置。您需要使用-n--processes参数来指示您将启动多少个进程(主机文件的子串),并且每个进程必须使用-p--process参数来指示它们在这个数字中的索引。

换句话说,您需要一个类似以下内容的宿主文件

% cat hostfile.txt
host0:port
host1:port
host2:port
host3:port
...

然后像这样启动进程

host0% cargo run -- -w 2 -h hostfile.txt -n 4 -p 0
host1% cargo run -- -w 2 -h hostfile.txt -n 4 -p 1
host2% cargo run -- -w 2 -h hostfile.txt -n 4 -p 2
host3% cargo run -- -w 2 -h hostfile.txt -n 4 -p 3

每个进程的工人数量应该相同。

生态系统

Timely数据流旨在支持多级抽象,从最低级别的手动数据流组装到更高层次的“声明式”抽象。

目前有几个选项可以编写Timely数据流程序。理想情况下,这个集合将随着时间的推移而扩展,因为有兴趣的人编写他们自己的层(或基于他人的层)。

  • Timely数据流:Timely数据流包括几个原始运算符,包括标准运算符如mapfilterconcat。它还包括用于进入和退出循环(enterleave)等任务的更奇特运算符,以及可以通过闭包提供的实现(unarybinary)的泛型运算符。

  • 微分数据流:建立在及时数据流之上的高级语言,微分数据流包括 groupjoiniterate 等算子。它的实现是完全增量化的,细节相当酷(如果有点神秘)。

还有几个应用程序是基于及时数据流构建的,包括 流式最坏情况最优连接实现 和一个 PageRank 实现,这两个都应该提供编写及时数据流程序的 helpful 示例。

贡献

如果你对与及时数据流合作或帮助工作感兴趣,太好了!

有一些工作类别对我们有帮助,也可能对你感兴趣。有几个广泛的类别,然后是一个不断变化的、各种复杂性的问题堆。

  • 如果你想用及时数据流编写程序,这对我们来说非常有趣。理想情况下,及时数据流旨在以一种符合人体工程学的非平凡数据流计算方法。随着人们使用它并报告他们的体验,我们了解了他们发现的类别、人体工程学的痛点以及其他我们事先甚至没有想象到的事情。了解及时数据流、尝试使用它并报告回来是有帮助的!

  • 如果你喜欢编写小的示例程序或文档测试,及时数据流中有很多地方示例相对稀疏,或者实际上并没有测试演示的功能。这些通常很容易上手、完善并推进,而无需大量的前期义务。这可能也是一个很好的方法,让我们其中之一详细地解释一些内容,如果你需要的话。

  • 如果你喜欢在及时数据流中亲自动手,问题跟踪器 有各种问题,涉及堆栈的不同级别。例如

    • 及时目前 比必须的多复制数据,这是为了直接满足 Rust 的所有权纪律。其中一些复制可以通过在资源管理中更加注意(例如,使用一个 Vec<u8> 的共享区域,就像 bytes crate 所做的那样)来省略。这里不是所有的事情都那么明显,因此这里也有设计工作的机会。

    • 我们最近实施了一系列日志更改,但仍有一些 期望的功能 还没有实现。如果你对通过探索记录其行为的底层基础设施来了解及时的工作方式感兴趣,这可能是一个不错的选择!它有一个额外的优势,即日志本身就是及时流,因此你甚至可以在及时上做一些日志处理。哇...

    • 关于 将 Rust 所有权习惯融入到及时数据流中 的一个开放问题。目前,及时流是可复制的对象,当一个流被重新使用时,项将被复制。我们可以使其更明确,并要求调用 cloned() 方法以获得所有权的对象,就像迭代器需要它一样。同时,不获取所有权而使用流引用应该给你一个机会查看经过的记录(而不需要复制,就像目前所做的那样)。对于可能需要序列化数据但不能充分利用所有权的交换通道来说,这通常已经足够了。

    • 在调度及时数据流算子方面,有很多有趣的研究工作。当我们有机会调度许多算子时,我们可能会稍微思考一下,意识到其中一些算子没有任何工作要做,可以跳过。更好的是,我们可以维护一个有工作要做算子的列表,对于没有工作要做的算子,则不进行任何操作。

也有一些更大的研究主题,其解决方案不是立即显而易见的,每个主题都可能有潜力解决各种性能问题。

输出速率控制

目前,unarybinary 算子的实现允许它们的闭包发送无界数量的输出。这可能会导致不希望的资源耗尽,如果运行时需要分配大量新内存来缓冲大量发送的数据而没有机会消化它,通常会导致性能下降。通常情况下,当产生大量数据时,如果有机会,它们最终会被减少。

在当前接口中,没有什么可以做的。一种可能的变化是让 inputnotificator 对象分别从输入消息或时间戳请求一个闭包到一个输出迭代器。这给了系统以适当的速度播放迭代器的机会。由于许多算子产生基于独立键的数据并行输出,构造这样的迭代器可能并不那么困难。

缓冲区管理

及时通信层目前丢弃它通过交换通道移动的大多数缓冲区,因为它没有合理的方式来控制输出速率,也没有合理的方式来确定应该缓存多少缓冲区。如果这两个问题中的任何一个得到解决,就有意义地回收缓冲区以避免随机分配,特别是对于小批量。这些更改在 dataflow-join 三角计算工作量中大约有 10%-20% 的性能影响。

支持不可序列化类型

通信层基于一个类型 Content<T>,它可以由类型化或二进制数据支持。因此,它要求它支持的类型必须是可序列化的,因为它需要处理数据是二进制的情况的逻辑,即使这种情况没有被使用。看起来 Stream 类型应该可扩展,以便对数据使用的存储类型进行参数化,这样我们就可以表达某些类型不可序列化的事实,并且这是可以接受的。

注意:差分数据流在它的 operators/arrange.rs 中展示了如何在用户级别这样做,虽然有些草率(使用一个包装器来谎称它所传输类型的属性)。

这将允许我们安全地传递 Rc 类型,只要我们使用 Pipeline 并行化合同。

粗粒度与细粒度时间戳

进度跟踪机制在每个时间戳上涉及一些不复杂的开销。这意味着使用非常细粒度的时间戳,例如处理记录的纳秒,可能会淹没进度跟踪逻辑。相比之下,日志基础设施将纳秒降级为数据,成为日志有效负载的一部分,并用批处理事件的最小时间戳来近似批处理事件。这在进度跟踪方面不那么准确,但性能更好。可能可以将这一点泛化,以便用户可以编写程序而无需考虑时间戳的粒度,系统在可能的情况下自动细化(本质上类似于箱式滤波器时间)。

注意:差分数据流在它的 collection.rs 中演示了如何在用户级别完成这项操作。缺乏系统支持意味着用户最终需要指定粒度,这并不糟糕,但可能还有改进的空间。这也可能意味着让用户控制粒度,他们可以拥有更多对延迟/吞吐量权衡的控制,这对系统来说可能是一件好事。

依赖关系

~150–400KB