1 个不稳定发布
0.13.0-dev.1 | 2023年12月15日 |
---|
#169 在 数据结构
2,339 每月下载量
在 cargo-tally 中使用
4MB
10K SLoC
差分数据流
在 Rust 上对 差分数据流 和 timely 数据流 的实现。
背景
差分数据流是一种数据并行编程框架,旨在高效处理大量数据,并快速响应对输入集合的任意更改。您可以在 差分数据流 mdbook 和 差分数据流文档 中了解更多信息。
差分数据流程序以数据集合的函数变换形式编写,使用熟悉的操作符如 map
、filter
、join
和 reduce
。差分数据流还包括更复杂的操作符,如 iterate
,该操作符反复将差分数据流片段应用于集合。程序被编译为 timely 数据流 计算过程。
例如,以下是一个计算有向图出度分布的差分数据流片段(对于每个度,具有那么多出度节点的数量)
let out_degr_dist =
edges.map(|(src, _dst)| src) // extract source
.count() // count occurrences of source
.map(|(_src, deg)| deg) // extract degree
.count(); // count occurrences of degree
或者,以下是一个计算从起始节点集合 roots
可到达的节点集的片段
let reachable =
roots.iterate(|reach|
edges.enter(&reach.scope())
.semijoin(reach)
.map(|(src, dst)| dst)
.concat(reach)
.distinct()
)
一旦编写,差分数据流就会对其初始为空的输入集合的任意更改做出响应,向每个输出集合报告相应的更改。差分数据流可以快速响应,因为它只在集合发生变化的地方执行操作,而不会在其他地方工作。
在上面的示例中,我们可以向和从 edges
中添加和移除,动态地改变图,并且能够立即看到结果的变化:如果度分布发生了变化,我们会看到变化,如果节点现在(或不再)可达,我们也会被告知。我们还可以向和从 roots
中添加和移除,这会从根本上改变可达性查询本身。
请务必查看 微分数据流文档,它正在不断改进。
示例:在一个图中计算度数。
让我们看看这个计算输出度分布的示例,以了解微分数据流是如何实际工作的。如果您想跟随示例,请参考这个存储库中的 examples/hello.rs。
图是一系列对 (Node, Node)
的集合,一个标准分析是确定每个 Node
在第一个位置出现的次数,即其“度”。具有每个度的节点数量是帮助统计图的有用数据。
为了确定输出度分布,我们在一个新的及时数据流范围内创建计算,并描述我们的计算以及我们如何与之交互。
// create a degree counting differential dataflow
let (mut input, probe) = worker.dataflow(|scope| {
// create edge input, count a few ways.
let (input, edges) = scope.new_collection();
let out_degr_distr =
edges.map(|(src, _dst)| src) // extract source
.count() // count occurrences of source
.map(|(_src, deg)| deg) // extract degree
.count(); // count occurrences of degree
// show us something about the collection, notice when done.
let probe =
out_degr_distr
.inspect(|x| println!("observed: {:?}", x))
.probe();
(input, probe)
});
我们返回的 input
和 probe
是我们将数据输入数据流的方式,以及我们如何注意到某些计算已完成。这些都是及时数据流的惯例,我们不会在这里详细介绍(请参考 及时数据流存储库)。
如果我们用一些随机的图数据来喂这个计算,比如说十个节点中五十个随机的边,我们会得到如下输出
Echidnatron% cargo run --release --example hello -- 10 50 1 inspect
Finished release [optimized + debuginfo] target(s) in 0.05s
Running `target/release/examples/hello 10 50 1 inspect`
observed: ((3, 1), 0, 1)
observed: ((4, 2), 0, 1)
observed: ((5, 4), 0, 1)
observed: ((6, 2), 0, 1)
observed: ((7, 1), 0, 1)
round 0 finished after 772.464µs (loading)
这显示了通过 inspect
操作符传递的记录,揭示了集合的内容:有五个不同的度,从三到七。具有每个度的节点数量是帮助统计图的有用数据。
让我们通过移除一条边并添加一条新的随机边来更新输入
observed: ((2, 1), 1, 1)
observed: ((3, 1), 1, -1)
observed: ((7, 1), 1, -1)
observed: ((8, 1), 1, 1)
round 1 finished after 149.701µs
我们在这里看到一些变化!那些度数为三和七的节点被度数为二和八的节点所取代;看起来有一个节点失去了一条边并将其给了另一个节点!
再做一些变化怎么样?
round 2 finished after 127.444µs
round 3 finished after 100.628µs
round 4 finished after 130.609µs
observed: ((5, 3), 5, 1)
observed: ((5, 4), 5, -1)
observed: ((6, 2), 5, -1)
observed: ((6, 3), 5, 1)
observed: ((7, 1), 5, 1)
observed: ((8, 1), 5, -1)
round 5 finished after 161.82µs
这里发生了一些奇怪的事情。首先,第2、3和4轮没有打印任何东西。真的吗?结果是我们所做的随机变化并没有影响任何度数计数,我们在节点之间移动了边,保留了度数。这是可能的。
第二个奇怪的事情是在第5轮中,只有两个边的变化,但是输出有六个变化!结果是我们最多可以有八个变化。度数八被转换回七,度数五被转换回六。但是:从五到六的变化会改变每个的计数,并且每个变化需要两个记录差异。八和七更加简洁,因为它们的计数只有一,这意味着只是记录的到达和离开,而不是变化。
扩展
微分数据流吸引人的地方在于它只在发生变化的区域工作,所以即使有大量数据,如果变化不多,它仍然可以非常快。让我们将我们的10个节点和50条边扩展到一百万倍
Echidnatron% cargo run --release --example hello -- 10000000 50000000 1 inspect
Finished release [optimized + debuginfo] target(s) in 0.04s
Running `target/release/examples/hello 10000000 50000000 1 inspect`
observed: ((1, 336908), 0, 1)
observed: ((2, 843854), 0, 1)
observed: ((3, 1404462), 0, 1)
observed: ((4, 1751921), 0, 1)
observed: ((5, 1757099), 0, 1)
observed: ((6, 1459805), 0, 1)
observed: ((7, 1042894), 0, 1)
observed: ((8, 653178), 0, 1)
observed: ((9, 363983), 0, 1)
observed: ((10, 181423), 0, 1)
observed: ((11, 82478), 0, 1)
observed: ((12, 34407), 0, 1)
observed: ((13, 13216), 0, 1)
observed: ((14, 4842), 0, 1)
observed: ((15, 1561), 0, 1)
observed: ((16, 483), 0, 1)
observed: ((17, 143), 0, 1)
observed: ((18, 38), 0, 1)
observed: ((19, 8), 0, 1)
observed: ((20, 3), 0, 1)
observed: ((22, 1), 0, 1)
round 0 finished after 15.470465014s (loading)
这里有很多不同的级别。我排序了它们,因为看未排序的数据太痛苦了。通常你会看到未排序的输出,因为它们只是集合中值的变化。
让我们再次进行一次单一的变化。
observed: ((5, 1757098), 1, 1)
observed: ((5, 1757099), 1, -1)
observed: ((6, 1459805), 1, -1)
observed: ((6, 1459807), 1, 1)
observed: ((7, 1042893), 1, 1)
observed: ((7, 1042894), 1, -1)
round 1 finished after 228.451µs
尽管初始计算大约需要十五秒钟,但我们获取变化的时间大约为230微秒;这比重新运行计算快了大约十万倍。这很不错。实际上,它足够小,以至于将东西打印到屏幕上的时间有点昂贵,所以我们停止这样做。
现在我们可以看着变化滚动过去,看看时间。
Echidnatron% cargo run --release --example hello -- 10000000 50000000 1 no_inspect
Finished release [optimized + debuginfo] target(s) in 0.04s
Running `target/release/examples/hello 10000000 50000000 1 no_inspect`
round 0 finished after 15.586969662s (loading)
round 1 finished after 1.070239ms
round 2 finished after 2.303187ms
round 3 finished after 208.45µs
round 4 finished after 163.224µs
round 5 finished after 118.792µs
...
不错。每次更新大约有几百微秒,这意味着可能每秒有几千次更新。对于我的笔记本电脑来说,这不是一个糟糕的数字,但这还不是正确的答案。
扩展...
微分数据流旨在除了延迟外,还具有吞吐量。我们可以增加它同时处理的更新轮数,这可以增加其实际吞吐量。这不会改变计算输出,除了我们一次看到更大的输出变化批次。
注意,上面的时间对于每次单一更新大约是几百微秒。如果我们一次处理十轮更新,我们得到的时间看起来像这样
Echidnatron% cargo run --release --example hello -- 10000000 50000000 10 no_inspect
Finished release [optimized + debuginfo] target(s) in 0.04s
Running `target/release/examples/hello 10000000 50000000 10 no_inspect`
round 0 finished after 15.556475008s (loading)
round 10 finished after 421.219µs
round 20 finished after 1.56369ms
round 30 finished after 338.54µs
round 40 finished after 351.843µs
round 50 finished after 339.608µs
...
这很有吸引力,因为十轮并不比单一更新贵多少,而且我们完成前十轮的时间比单独执行前十次更新的时间要少得多。之后的每一轮都是额外的时间。
我们一次处理一百轮更新
Echidnatron% cargo run --release --example hello -- 10000000 50000000 100 no_inspect
Finished release [optimized + debuginfo] target(s) in 0.04s
Running `target/release/examples/hello 10000000 50000000 100 no_inspect`
round 0 finished after 15.528724145s (loading)
round 100 finished after 2.567577ms
round 200 finished after 1.861168ms
round 300 finished after 1.753794ms
round 400 finished after 1.528285ms
round 500 finished after 1.416605ms
...
我们仍然在提高性能,并且随着批量大小的增加而继续提高。一次处理10万个更新时,每个批次大约需要半秒钟。这不太“交互式”,但吞吐量更高。
Echidnatron% cargo run --release --example hello -- 10000000 50000000 100000 no_inspect
Finished release [optimized + debuginfo] target(s) in 0.04s
Running `target/release/examples/hello 10000000 50000000 100000 no_inspect`
round 0 finished after 15.65053789s (loading)
round 100000 finished after 505.210924ms
round 200000 finished after 524.069497ms
round 300000 finished after 470.77752ms
round 400000 finished after 621.325393ms
round 500000 finished after 472.791742ms
...
平均下来大约是五微秒;比单个更新的百微秒快得多!而且我现在想想,每次更新实际上是有两个变化,不是吗?微分数据流做得很好!
扩展到更多
微分数据流建立在timely dataflow之上,这是一个分布式数据并行运行时。微分数据流可以扩展到多个独立的工作者,增加系统的容量(以牺牲一些降低延迟的协调为代价)。
如果我们投入两个工作者,我们1千万个节点,5千万条边计算的时间从十五秒缩短到不到八秒。
Echidnatron% cargo run --release --example hello -- 10000000 50000000 1 no_inspect -w2
Finished release [optimized + debuginfo] target(s) in 0.04s
Running `target/release/examples/hello 10000000 50000000 1 no_inspect -w2`
round 0 finished after 8.065386177s (loading)
round 1 finished after 275.373µs
round 2 finished after 759.632µs
round 3 finished after 171.671µs
round 4 finished after 745.078µs
round 5 finished after 213.146µs
...
这是一个普通的减少。你可能注意到后续轮次的时间实际上增加了。结果是,当没有太多工作要做时,多个工作者只是相互干扰。
幸运的是,随着我们同时处理越来越多的更新轮次,多个工作者的好处也在增加。这里有每次处理十轮的数据
Echidnatron% cargo run --release --example hello -- 10000000 50000000 10 no_inspect -w2
Finished release [optimized + debuginfo] target(s) in 0.04s
Running `target/release/examples/hello 10000000 50000000 10 no_inspect -w2`
round 0 finished after 8.083000954s (loading)
round 10 finished after 1.901946ms
round 20 finished after 3.092976ms
round 30 finished after 889.63µs
round 40 finished after 409.001µs
round 50 finished after 320.248µs
...
一次处理一百轮
Echidnatron% cargo run --release --example hello -- 10000000 50000000 100 no_inspect -w2
Finished release [optimized + debuginfo] target(s) in 0.04s
Running `target/release/examples/hello 10000000 50000000 100 no_inspect -w2`
round 0 finished after 8.121800831s (loading)
round 100 finished after 2.52821ms
round 200 finished after 3.119036ms
round 300 finished after 1.63147ms
round 400 finished after 1.008668ms
round 500 finished after 941.426µs
...
一次处理一百万轮
Echidnatron% cargo run --release --example hello -- 10000000 50000000 100000 no_inspect -w2
Finished release [optimized + debuginfo] target(s) in 0.04s
Running `target/release/examples/hello 10000000 50000000 100000 no_inspect -w2`
round 0 finished after 8.200755198s (loading)
round 100000 finished after 275.262419ms
round 200000 finished after 279.291957ms
round 300000 finished after 259.137138ms
round 400000 finished after 340.624124ms
round 500000 finished after 259.870938ms
...
这些最后的数字是一个工作者时的半秒左右,并且随着第二个工作者的加入而有了相当大的改进。
更快地前进
微分数据流中包含几个性能优化,旨在尽可能使底层操作符接近你所期望编写的操作符。此外,通过建立在timely dataflow之上,你可以在你最能了解的地方插入自己的实现。
例如,我们在这个案例中知道底层集合会经历一系列的变化,这意味着它们的戳是完全有序的。在这种情况下,我们可以使用一个更简单的实现,count_total
。这大大减少了更新时间,对于每个批次大小都如此。
Echidnatron% cargo run --release --example hello -- 10000000 50000000 10 no_inspect -w2
Finished release [optimized + debuginfo] target(s) in 0.04s
Running `target/release/examples/hello 10000000 50000000 10 no_inspect -w2`
round 0 finished after 5.985084002s (loading)
round 10 finished after 1.802729ms
round 20 finished after 2.202838ms
round 30 finished after 192.902µs
round 40 finished after 198.342µs
round 50 finished after 187.725µs
...
Echidnatron% cargo run --release --example hello -- 10000000 50000000 100 no_inspect -w2
Finished release [optimized + debuginfo] target(s) in 0.04s
Running `target/release/examples/hello 10000000 50000000 100 no_inspect -w2`
round 0 finished after 5.588270073s (loading)
round 100 finished after 3.114716ms
round 200 finished after 2.657691ms
round 300 finished after 890.972µs
round 400 finished after 448.537µs
round 500 finished after 384.565µs
...
Echidnatron% cargo run --release --example hello -- 10000000 50000000 100000 no_inspect -w2
Finished release [optimized + debuginfo] target(s) in 0.04s
Running `target/release/examples/hello 10000000 50000000 100000 no_inspect -w2`
round 0 finished after 6.486550581s (loading)
round 100000 finished after 89.096615ms
round 200000 finished after 79.469464ms
round 300000 finished after 72.568018ms
round 400000 finished after 93.456272ms
round 500000 finished after 73.954886ms
...
这些时间已经从我们开始的地方下降了很多;我们现在每秒吸收超过一百万轮更新,即使分布式在多个工作者之间也能产生正确的(不仅仅是连续的)答案。
第二个例子:k-核心计算
图的核心是其边的一个最大子集,使得所有与其相连的顶点的度至少为k。找到k-核心的一种方法是通过重复删除所有度小于k的顶点上的所有边。这些消失的边可能会降低其他顶点的度数,因此我们需要迭代地丢弃度小于k的顶点上的边,直到我们停止。也许我们会丢弃所有边,也许我们会留下一些。
以下是一个直接实现,我们反复确定活动节点集(那些至少有k
条边指向或从它们出发的节点),并将edges
集限制在那些在active
中同时存在src
和dst
的边。
let k = 5;
// iteratively thin edges.
edges.iterate(|inner| {
// determine the active vertices /-- this is a lie --\
let active = inner.flat_map(|(src,dst)| [src,dst].into_iter())
.map(|node| (node, ()))
.group(|_node, s, t| if s[0].1 > k { t.push(((), 1)); })
.map(|(node,_)| node);
// keep edges between active vertices
edges.enter(&inner.scope())
.semijoin(active)
.map(|(src,dst)| (dst,src))
.semijoin(active)
.map(|(dst,src)| (src,dst))
});
为了完全清楚,使用into_iter()
的语法不起作用,因为Rust,而是需要一个更糟糕的语法来获取两个元素的非堆分配迭代器。但是,它起作用,并且
Running `target/release/examples/degrees 10000000 50000000 1 5 kcore1`
Loading finished after 72204416910
那是一个东西。谁也不知道72秒是否足够好?(ed:比之前这个readme版本中的数字要差)。
然而,接下来发生的事情才是惊人的
worker 0, round 1 finished after Duration { secs: 0, nanos: 567171 }
worker 0, round 2 finished after Duration { secs: 0, nanos: 449687 }
worker 0, round 3 finished after Duration { secs: 0, nanos: 467143 }
worker 0, round 4 finished after Duration { secs: 0, nanos: 480019 }
worker 0, round 5 finished after Duration { secs: 0, nanos: 404831 }
我们花费大约半个毫秒来更新k-核心计算。每条边的添加和删除都可能使其他边从k-核心中消失或更令人困惑的是返回k-核心,而微分数据流会正确地为你更新所有这些。而且它是以亚毫秒的时间尺度完成的。
如果我们把批处理提高到一千,我们可以显著提高吞吐量
Running `target/release/examples/degrees 10000000 50000000 1000 5 kcore1`
Loading finished after Duration { secs: 73, nanos: 507094824 }
worker 0, round 1000 finished after Duration { secs: 0, nanos: 55649900 }
worker 0, round 2000 finished after Duration { secs: 0, nanos: 51793416 }
worker 0, round 3000 finished after Duration { secs: 0, nanos: 57733231 }
worker 0, round 4000 finished after Duration { secs: 0, nanos: 50438934 }
worker 0, round 5000 finished after Duration { secs: 0, nanos: 55020469 }
每个批次在超过50毫秒内完成一千轮更新,平均每次更新大约50微秒,相当于每秒大约20,000个不同的更新。
我认为这都很棒,不仅因为它能工作,而且它甚至看起来工作得很好。
路线图
问题跟踪器有几个与当前性能缺陷或缺少功能相关的问题。如果你有兴趣贡献,那将非常棒!如果你有其他问题,请不要犹豫,与我们联系。
致谢
除了对这个存储库的贡献外,微分数据流还基于硅谷已关闭的微软研究实验室的工作,并在苏黎世联邦理工学院的系统组继续进行。每个机构(包括其他)的许多合作者(等等)都贡献了思想和实现。
依赖关系
~2.5–3.5MB
~58K SLoC