12 个版本 (破坏性更新)

使用旧的 Rust 2015

0.12.0 2021年3月10日
0.11.0 2019年11月18日
0.10.0 2019年7月9日
0.9.0 2019年3月31日
0.0.1 2015年5月29日

#333算法

Download history 276/week @ 2024-04-01 162/week @ 2024-04-08 170/week @ 2024-04-15 216/week @ 2024-04-22 157/week @ 2024-04-29 150/week @ 2024-05-06 159/week @ 2024-05-13 150/week @ 2024-05-20 142/week @ 2024-05-27 180/week @ 2024-06-03 164/week @ 2024-06-10 115/week @ 2024-06-17 98/week @ 2024-06-24 101/week @ 2024-07-01 68/week @ 2024-07-08 92/week @ 2024-07-15

每月 368 次下载
8 crates 中使用

MIT 许可证

4MB
8K SLoC

差分数据流

在 Rust 上实现的差分数据流,基于 timely dataflow

背景

差分数据流是一种数据并行编程框架,旨在高效处理大量数据,并快速响应对输入集合中任意变化的操作。您可以在 差分数据流 mdbook差分数据流文档 中了解更多信息。

差分数据流程序以数据集合的功能转换编写,使用熟悉的操作符,如 mapfilterjoinreduce。差分数据流还包括更奇特的操作符,如 iterate,该操作符将差分数据流片段反复应用于集合。程序被编译为 timely dataflow 计算过程。

例如,这是一个用于计算有向图的出度分布的差分数据流片段(对于每个度,具有该度数的节点的数量)

let out_degr_dist =
edges.map(|(src, _dst)| src)    // extract source
     .count()                   // count occurrences of source
     .map(|(_src, deg)| deg)    // extract degree
     .count();                  // count occurrences of degree

或者,这是一个计算从起始节点集合 roots 可达的节点集的片段

let reachable =
roots.iterate(|reach|
    edges.enter(&reach.scope())
         .semijoin(reach)
         .map(|(src, dst)| dst)
         .concat(reach)
         .distinct()
)

一旦编写,差分数据流将响应对其最初为空的输入集合的任意更改,并向其每个输出集合报告相应的更改。由于差分数据流仅在集合发生更改的地方进行操作,所以它能够快速响应。

在上面的示例中,我们可以向 edges 中添加和删除内容,动态地改变图,并立即得到结果如何变化的反馈:如果度分布发生变化,我们将看到变化,如果节点现在(或不再)可达,我们也会得知。

请务必查看 差分数据流文档,它将持续改进。

示例:计算图中的度数。

让我们来看看这个出度分布的计算,以便了解微分数据流实际是如何工作的。如果您想跟随操作,请参考这个仓库中的examples/hello.rs

图是一组节点对的集合 (Node, Node),一个标准的分析是确定每个Node在第一个位置出现的次数,即其“度”。具有每个度的节点数是图统计信息的有用指标。

为了确定出度分布,我们在一个新的时间数据流作用域中创建它,以描述我们的计算以及我们如何与之交互。

// create a degree counting differential dataflow
let (mut input, probe) = worker.dataflow(|scope| {

    // create edge input, count a few ways.
    let (input, edges) = scope.new_collection();

    let out_degr_distr =
    edges.map(|(src, _dst)| src)    // extract source
         .count()                   // count occurrences of source
         .map(|(_src, deg)| deg)    // extract degree
         .count();                  // count occurrences of degree

    // show us something about the collection, notice when done.
    let probe =
    out_degr_distr
        .inspect(|x| println!("observed: {:?}", x))
        .probe();

    (input, probe)
});

我们返回的inputprobe是数据流入数据流的方式,以及我们如何注意到某些计算已完成。这些都是时间数据流惯用用法,这里不会详细介绍(请参考时间数据流仓库)。

如果我们用一些随机图数据对这个计算进行馈送,比如十个节点中的五十条随机边,我们得到如下输出

Echidnatron% cargo run --release --example hello -- 10 50 1 inspect
    Finished release [optimized + debuginfo] target(s) in 0.05s
    Running `target/release/examples/hello 10 50 1 inspect`
observed: ((3, 1), 0, 1)
observed: ((4, 2), 0, 1)
observed: ((5, 4), 0, 1)
observed: ((6, 2), 0, 1)
observed: ((7, 1), 0, 1)
round 0 finished after 772.464µs (loading)

这显示了通过inspect运算符的记录,揭示了集合的内容:有五个不同的度数,从三到七。记录的形式为((degree, count), time, delta),其中time字段表示这是第一轮数据,而delta字段告诉我们每条记录都是新出现的。如果相应的记录正离开集合,则该数字为负数。

让我们通过删除一条边并添加一条新的随机边来更新输入

observed: ((2, 1), 1, 1)
observed: ((3, 1), 1, -1)
observed: ((7, 1), 1, -1)
observed: ((8, 1), 1, 1)
round 1 finished after 149.701µs

我们在这里看到了一些变化!那些度数为三和七的节点已被替换为度数为二和八的节点;看起来有一个节点失去了一条边并将其给了另一个节点!

再来一些变化怎么样?

round 2 finished after 127.444µs
round 3 finished after 100.628µs
round 4 finished after 130.609µs
observed: ((5, 3), 5, 1)
observed: ((5, 4), 5, -1)
observed: ((6, 2), 5, -1)
observed: ((6, 3), 5, 1)
observed: ((7, 1), 5, 1)
observed: ((8, 1), 5, -1)
round 5 finished after 161.82µs

这里发生了一些奇怪的事情。首先,第2、3和4轮都没有打印任何东西。真的吗?结果是我们所做的随机更改并没有影响任何度数计数,我们在节点之间移动了边,保持了度数。这种情况可能发生。

第二件奇怪的事情是在第5轮中,只有两个边的变化,输出却有了六个变化!结果是我们最多可以有八个变化。度数八被转换回七,五被转换回六。但是:从五到六的改变改变了每个的计数,每个变化需要两个记录差异。八和七更简洁,因为它们的计数只有一个,这意味着只是记录的到达和离开,而不是变化。

扩展规模

微分数据流的吸引之处在于它只在发生变化的地方执行工作,所以即使有大量数据,如果变化不大,它仍然可以非常快地执行。让我们将10个节点和50条边扩展到一百万倍

Echidnatron% cargo run --release --example hello -- 10000000 50000000 1 inspect
    Finished release [optimized + debuginfo] target(s) in 0.04s
    Running `target/release/examples/hello 10000000 50000000 1 inspect`
observed: ((1, 336908), 0, 1)
observed: ((2, 843854), 0, 1)
observed: ((3, 1404462), 0, 1)
observed: ((4, 1751921), 0, 1)
observed: ((5, 1757099), 0, 1)
observed: ((6, 1459805), 0, 1)
observed: ((7, 1042894), 0, 1)
observed: ((8, 653178), 0, 1)
observed: ((9, 363983), 0, 1)
observed: ((10, 181423), 0, 1)
observed: ((11, 82478), 0, 1)
observed: ((12, 34407), 0, 1)
observed: ((13, 13216), 0, 1)
observed: ((14, 4842), 0, 1)
observed: ((15, 1561), 0, 1)
observed: ((16, 483), 0, 1)
observed: ((17, 143), 0, 1)
observed: ((18, 38), 0, 1)
observed: ((19, 8), 0, 1)
observed: ((20, 3), 0, 1)
observed: ((22, 1), 0, 1)
round 0 finished after 15.470465014s (loading)

这里有更多不同的度数。我排序了它们,因为看未排序的数据太痛苦了。您通常会看到未排序的输出,因为它们只是集合中值的更改。

让我们再次执行一个单次更改。

observed: ((5, 1757098), 1, 1)
observed: ((5, 1757099), 1, -1)
observed: ((6, 1459805), 1, -1)
observed: ((6, 1459807), 1, 1)
observed: ((7, 1042893), 1, 1)
observed: ((7, 1042894), 1, -1)
round 1 finished after 228.451µs

尽管初始计算大约需要十五秒,但我们得到的更改大约在230微秒内;这比重新运行计算快了大约一百万倍。这真是太好了。实际上,它小到打印到屏幕上的时间有点昂贵,所以让我们停止这样做。

现在我们可以看着变化滚动过去,看看时间。

Echidnatron% cargo run --release --example hello -- 10000000 50000000 1 no_inspect
    Finished release [optimized + debuginfo] target(s) in 0.04s
    Running `target/release/examples/hello 10000000 50000000 1 no_inspect`
round 0 finished after 15.586969662s (loading)
round 1 finished after 1.070239ms
round 2 finished after 2.303187ms
round 3 finished after 208.45µs
round 4 finished after 163.224µs
round 5 finished after 118.792µs
...

很好。每次更新可能有几百微秒,这意味着可能每秒有上万次更新。对于我的笔记本电脑来说,这并不是一个糟糕的数字,但这还不是正确的答案。

按比例“延伸”吗?

差分数据流旨在除了延迟外还提供吞吐量。我们可以增加它同时处理的更新轮数,这可以增加其实际吞吐量。这不会改变计算输出,除非我们一次性看到更大的输出更改批次。

注意,上面的时间对于每次单个更新是几百微秒。如果我们一次性处理十个更新轮次,我们得到的时间看起来像这样

Echidnatron% cargo run --release --example hello -- 10000000 50000000 10 no_inspect
    Finished release [optimized + debuginfo] target(s) in 0.04s
    Running `target/release/examples/hello 10000000 50000000 10 no_inspect`
round 0 finished after 15.556475008s (loading)
round 10 finished after 421.219µs
round 20 finished after 1.56369ms
round 30 finished after 338.54µs
round 40 finished after 351.843µs
round 50 finished after 339.608µs
...

这很有吸引力,因为十轮更新并不比单次更新贵多少,并且我们完成前十个轮次的时间比逐一执行前十个更新要少得多。之后的每一轮都是额外的时间。

当我们增加批量处理时,性能会提高。这里我们一次性处理一百轮更新

Echidnatron% cargo run --release --example hello -- 10000000 50000000 100 no_inspect
    Finished release [optimized + debuginfo] target(s) in 0.04s
    Running `target/release/examples/hello 10000000 50000000 100 no_inspect`
round 0 finished after 15.528724145s (loading)
round 100 finished after 2.567577ms
round 200 finished after 1.861168ms
round 300 finished after 1.753794ms
round 400 finished after 1.528285ms
round 500 finished after 1.416605ms
...

我们仍在改进,并且随着批量大小的增加而继续改进。当我们一次处理10万个更新时,每个批次大约需要半秒钟。这交互性较低,但吞吐量更高。

Echidnatron% cargo run --release --example hello -- 10000000 50000000 100000 no_inspect
    Finished release [optimized + debuginfo] target(s) in 0.04s
    Running `target/release/examples/hello 10000000 50000000 100000 no_inspect`
round 0 finished after 15.65053789s (loading)
round 100000 finished after 505.210924ms
round 200000 finished after 524.069497ms
round 300000 finished after 470.77752ms
round 400000 finished after 621.325393ms
round 500000 finished after 472.791742ms
...

平均下来大约是五个微秒;比单个更新的百微秒快得多!现在我想起来,每次更新实际上是有两个变化,不是吗。好样的,差分数据流!

扩展

差分数据流建立在timely数据流之上,这是一个分布式数据并行运行时。timely数据流可以扩展到多个独立的工人,增加系统的容量(以牺牲一些降低延迟的协调为代价)。

如果我们投入两个工人,我们的1000万个节点,5000万个边计算从15秒降低到仅仅超过8秒。

Echidnatron% cargo run --release --example hello -- 10000000 50000000 1 no_inspect -w2
    Finished release [optimized + debuginfo] target(s) in 0.04s
    Running `target/release/examples/hello 10000000 50000000 1 no_inspect -w2`
round 0 finished after 8.065386177s (loading)
round 1 finished after 275.373µs
round 2 finished after 759.632µs
round 3 finished after 171.671µs
round 4 finished after 745.078µs
round 5 finished after 213.146µs
...

这是一个一般性的减少。你可能注意到随后的轮次时间反而增加了。结果是,在没有太多工作时,多个工人只是互相干扰。

幸运的是,随着我们同时处理越来越多的更新轮次,多个工人的好处也在增加。这里是一次处理十个轮次的数据

Echidnatron% cargo run --release --example hello -- 10000000 50000000 10 no_inspect -w2
    Finished release [optimized + debuginfo] target(s) in 0.04s
    Running `target/release/examples/hello 10000000 50000000 10 no_inspect -w2`
round 0 finished after 8.083000954s (loading)
round 10 finished after 1.901946ms
round 20 finished after 3.092976ms
round 30 finished after 889.63µs
round 40 finished after 409.001µs
round 50 finished after 320.248µs
...

一次处理一百轮

Echidnatron% cargo run --release --example hello -- 10000000 50000000 100 no_inspect -w2
    Finished release [optimized + debuginfo] target(s) in 0.04s
    Running `target/release/examples/hello 10000000 50000000 100 no_inspect -w2`
round 0 finished after 8.121800831s (loading)
round 100 finished after 2.52821ms
round 200 finished after 3.119036ms
round 300 finished after 1.63147ms
round 400 finished after 1.008668ms
round 500 finished after 941.426µs
...

一次处理一百万轮

Echidnatron% cargo run --release --example hello -- 10000000 50000000 100000 no_inspect -w2
    Finished release [optimized + debuginfo] target(s) in 0.04s
    Running `target/release/examples/hello 10000000 50000000 100000 no_inspect -w2`
round 0 finished after 8.200755198s (loading)
round 100000 finished after 275.262419ms
round 200000 finished after 279.291957ms
round 300000 finished after 259.137138ms
round 400000 finished after 340.624124ms
round 500000 finished after 259.870938ms
...

这些最后的数据,一个工人时大约是半秒,有第二个工人时有所改善。

更快地前进

差分数据流中有几个性能优化,旨在尽可能让底层运算符接近你期望编写的样子。此外,通过建立在timely数据流之上,你可以在你最擅长的地方添加自己的实现。

例如,我们在这个案例中也知道底层集合经历了一系列变化,这意味着它们的戳是完全有序的。在这种情况下,我们可以使用一个更简单的实现,count_total。这显著减少了每个批次的更新时间。

Echidnatron% cargo run --release --example hello -- 10000000 50000000 10 no_inspect -w2
    Finished release [optimized + debuginfo] target(s) in 0.04s
    Running `target/release/examples/hello 10000000 50000000 10 no_inspect -w2`
round 0 finished after 5.985084002s (loading)
round 10 finished after 1.802729ms
round 20 finished after 2.202838ms
round 30 finished after 192.902µs
round 40 finished after 198.342µs
round 50 finished after 187.725µs
...

Echidnatron% cargo run --release --example hello -- 10000000 50000000 100 no_inspect -w2
    Finished release [optimized + debuginfo] target(s) in 0.04s
    Running `target/release/examples/hello 10000000 50000000 100 no_inspect -w2`
round 0 finished after 5.588270073s (loading)
round 100 finished after 3.114716ms
round 200 finished after 2.657691ms
round 300 finished after 890.972µs
round 400 finished after 448.537µs
round 500 finished after 384.565µs
...

Echidnatron% cargo run --release --example hello -- 10000000 50000000 100000 no_inspect -w2
    Finished release [optimized + debuginfo] target(s) in 0.04s
    Running `target/release/examples/hello 10000000 50000000 100000 no_inspect -w2`
round 0 finished after 6.486550581s (loading)
round 100000 finished after 89.096615ms
round 200000 finished after 79.469464ms
round 300000 finished after 72.568018ms
round 400000 finished after 93.456272ms
round 500000 finished after 73.954886ms
...

这些时间已经从起点下降了很多;我们现在每秒吸收超过一百万轮更新,即使在多个工人之间分布式处理也能产生正确的(不仅仅是一致的)答案。

第二个例子:k-核心计算

图中的k-核心是它的边中最大的子集,使得所有具有任何关联边的顶点的度至少为k。找到k-核心的一种方法是通过重复删除所有关联于度小于k的顶点的边。这些消失的边可能会降低其他顶点的度,因此我们需要迭代地删除具有度小于k的顶点的边,直到我们停止。也许我们会丢弃所有的边,也许我们会留下一些。

这是一个直接实现,我们反复确定活动节点集(至少有 k 条指向或来自它们的边),并将集合 edges 限制为同时存在于 active 中的 srcdst

let k = 5;

// iteratively thin edges.
edges.iterate(|inner| {

    // determine the active vertices        /-- this is a lie --\
    let active = inner.flat_map(|(src,dst)| [src,dst].into_iter())
                      .map(|node| (node, ()))
                      .group(|_node, s, t| if s[0].1 > k { t.push(((), 1)); })
                      .map(|(node,_)| node);

    // keep edges between active vertices
    edges.enter(&inner.scope())
         .semijoin(active)
         .map(|(src,dst)| (dst,src))
         .semijoin(active)
         .map(|(dst,src)| (src,dst))
});

为了完全清楚,使用 into_iter() 的语法不起作用,因为 Rust 需要,而且需要更糟糕的语法来获取两个元素的非堆分配迭代器。但是,它确实有效,

Running `target/release/examples/degrees 10000000 50000000 1 5 kcore1`
Loading finished after 72204416910

好吧,这是一个东西。谁知道72秒是否好呢?(ed:比之前这个readme中的数字要差)。

真正惊人的是接下来发生的事情

worker 0, round 1 finished after Duration { secs: 0, nanos: 567171 }
worker 0, round 2 finished after Duration { secs: 0, nanos: 449687 }
worker 0, round 3 finished after Duration { secs: 0, nanos: 467143 }
worker 0, round 4 finished after Duration { secs: 0, nanos: 480019 }
worker 0, round 5 finished after Duration { secs: 0, nanos: 404831 }

我们用大约半毫秒的时间来 更新 k-core 计算。每次边添加和删除都可能导致其他边从 k-core 中掉出,或者更令人困惑的是 返回 到 k-core,而微分数据流会正确地为你更新所有这些。而且它是以亚毫秒的时间尺度完成的。

如果我们将批处理增加一千倍,吞吐量会有很大提升

Running `target/release/examples/degrees 10000000 50000000 1000 5 kcore1`
Loading finished after Duration { secs: 73, nanos: 507094824 }
worker 0, round 1000 finished after Duration { secs: 0, nanos: 55649900 }
worker 0, round 2000 finished after Duration { secs: 0, nanos: 51793416 }
worker 0, round 3000 finished after Duration { secs: 0, nanos: 57733231 }
worker 0, round 4000 finished after Duration { secs: 0, nanos: 50438934 }
worker 0, round 5000 finished after Duration { secs: 0, nanos: 55020469 }

每个批次在50多毫秒内进行1000次更新,平均每次更新约50微秒,相当于每秒大约20,000个不同的更新。

我认为这都很棒,不仅因为它能够工作,而且还似乎工作得很好。

路线图

问题跟踪器有几个与当前性能缺陷或缺失功能相关的问题。如果您有兴趣贡献,那就太好了!如果您有其他问题,请不要犹豫,与我们联系。

致谢

除了对这个仓库的贡献外,微分数据流还基于硅谷已关闭的微软研究院的工作,并在苏黎世联邦理工学院(ETH Zürich)的系统组继续进行。每个机构(包括其他机构)的许多合作者(包括其他机构)都贡献了思想和实现。

依赖关系

~4MB
~74K SLoC