#future #async #stream-processing

nightly deluge

一个高度并发的流库,驱动底层 futures 以并发或并行方式处理流操作,尽可能快地完成

13 个版本

0.2.1 2023 年 11 月 15 日
0.2.0 2023 年 1 月 22 日
0.1.10 2022 年 11 月 1 日
0.1.9 2022 年 10 月 27 日
0.1.2 2022 年 9 月 29 日

#423异步

每月 22 次下载

MPL-2.0 许可证

86KB
2K SLoC

Deluge 是(不是)一个 Stream

Crate Info API Docs Rustc Version 1.64.0+

DelugeStream 之上构建,提供默认并行或并发的流操作。它允许用户拥有一个有序的 futures 流,这些 futures 是并发评估的,所有复杂性都隐藏在 Deluge 本身内部。我们通过比流高一个级别的工作来实现这一点。我们不是返回可能在未来的某个时刻出现的值,而是立即返回一个未评估的 futures 迭代器,然后由收集器进行评估。

下面的动画显示了在高度并发的六元素集合上映射的一个示例。📘 表示底层元素变得可用的耗时,而 📗 表示应用映射操作的时间。

Example of processing using Deluge and Streams

此库仍然是实验性的,使用时请自行承担风险

设计决策

这是一个有观点的库,将易用性和外部简单性放在首位。应用于单个元素的操作,如 maps 和 filters 不会 分配内存。它们只是将每个元素包装在另一个 future 中,但不会控制这些处理后的元素的评估方式。评估策略由收集器控制。目前提供了两种基本的收集器:一个是并发的,另一个是并行的。在性能和易用性之间做出选择时,我们可能会偏向于易用性。

并发收集器接受一个可选的并发限制。如果指定了该限制,则最多同时评估与该限制相等数量的 futures。

let result = [1, 2, 3, 4]
    .into_deluge()
    .map(|x| async move { x * 2 })
    .collect::<Vec<usize>>(None)
    .await;

assert_eq!(vec![2, 4, 6, 8], result);

并行收集器会启动一定数量的工作线程。如果没有指定工作线程的数量,则默认为逻辑 cpu 的数量。如果没有指定并发限制,则每个工作线程默认为 total_futures_to_evaluate / number_of_workers。请注意,您需要启用 tokioasync-std 功能以支持并行收集器。

let result = (0..150)
    .into_deluge()
    .map(|idx| async move {
        tokio::time::sleep(Duration::from_millis(50)).await;
        idx
    })
    .collect_par::<Vec<usize>>(10, None)
    .await;

assert_eq!(result.len(), 150);

请参阅 测试 了解更多使用该库的示例。

转换为 Stream

《collect》和《collect_par》都实现了《Stream》。如果您想使用任何现有的《Stream》扩展或《Deluge》中缺少的函数,可以先使用《collect》,然后将它用作其他任何流。在《Deluge》中收集之前的所有内容都将获得并行运行将来的 futures 的好处。

问题

我想向 DelugeExt 添加另一个操作。我应该这样做吗?

当然应该。请确保在转换单个元素的运算中不要在堆上分配内存。您会发现任何有用的运算都是公平的游戏,欢迎贡献。

我发现了一个性能提升,我应该提交一个 PR 吗?

绝对应该!只要用户暴露的 API 不变得更复杂,分配的数量不会增加,中间内存使用不会增加。如果您觉得绝对有必要打破上述任何规则,请先打开一个问题,我们将讨论。

依赖项

约 3-17MB
约 172K SLoC