13 个版本 (8 个破坏性版本)

0.10.2 2022年2月10日
0.10.0 2022年1月2日
0.8.0 2021年12月31日
0.7.0 2021年11月14日
0.2.1 2020年12月8日

#551异步

Download history 485/week @ 2024-03-14 515/week @ 2024-03-21 459/week @ 2024-03-28 546/week @ 2024-04-04 482/week @ 2024-04-11 734/week @ 2024-04-18 924/week @ 2024-04-25 479/week @ 2024-05-02 492/week @ 2024-05-09 540/week @ 2024-05-16 455/week @ 2024-05-23 406/week @ 2024-05-30 504/week @ 2024-06-06 552/week @ 2024-06-13 584/week @ 2024-06-20 403/week @ 2024-06-27

每月下载量 2,117
用于 3 个 Crates (2 个直接使用)

MIT 许可证

235KB
5K SLoC

par-stream: 异步并行流处理

par-stream 是一个用于 Rust 的异步并行流处理库。

[ crates.io | API 文档 ]

示例

  • 有序并行处理数据流 (代码)
  • 无序并行处理数据流 (代码)
  • 分散和收集 (代码)
  • 并行归并排序 (代码)
  • 并行洗牌 (代码)

访问 示例 目录以探索更多示例。

许可证

MIT 许可证。请参阅 LICENSE 文件。


lib.rs:

异步流并行处理库。

运行时配置

以下 cargo 功能选择并行工作者的后端运行时。最多指定其中一个,否则 crate 会引发编译错误。

  • runtime-tokio 启用 [tokio] 多线程运行时。
  • runtime-async-std 启用 async-std 默认运行时。

如果您想提供自定义运行时,请阅读 使用自定义运行时

扩展特质

扩展特质扩展现有的 Stream,为现有流提供额外的组合器。它们可以从 [prelude] 中导入以便于使用。

use par_stream::prelude::*;

根据流的特性,由不同的特质提供流组合器。

非并行流项目操作特质

流元素排序的特质

并行处理的特质

  • ParStreamExt 需要
    • Self: 'static + Send + Stream
    • Self::Item: 'static + Send
  • TryParStreamExt 需要
    • Self: 'static + Send + Stream<Item =Result<T, E>>,
    • T: 'static + Send
    • E: 'static + Send

并行处理

这些组合器在流上并行执行任务,可以是按顺序/无序和可失败或不可失败的方式。

将上述组合器链接在一起可以建立并行处理的数据流。

use futures::stream::{self, StreamExt as _};
use par_stream::{IndexStreamExt as _, ParStreamExt as _};

let vec: Vec<_> = stream::iter(0i64..1000)
    // a series of unordered parallel tasks
    .par_then(None, |val| async move { val.pow(2) })
    .par_then(None, |val| async move { val * 2 })
    .par_then(None, |val| async move { val + 1 })
    .collect()
    .await;

itertools::assert_equal(vec, (0i64..1000).map(|val| val.pow(2) * 2 + 1));

无序并行处理

该库提供了项目重新排序组合器。

它们可以与来自 [futures] 库的 enumerate() 或本库的可失败版本 try_enumerate() 结合使用,以建立无序数据处理流程。

use futures::stream::{self, StreamExt as _};
use par_stream::{IndexStreamExt as _, ParStreamExt as _};

let vec: Vec<_> = stream::iter(0i64..1000)
    // add index number to each item
    .enumerate()
    // a series of unordered parallel tasks
    .par_then_unordered(None, |(index, val)| async move { (index, val.pow(2)) })
    .par_then_unordered(None, |(index, val)| async move { (index, val * 2) })
    .par_then_unordered(None, |(index, val)| async move { (index, val + 1) })
    // reorder the items back by index number
    .reorder_enumerated()
    .collect()
    .await;

itertools::assert_equal(vec, (0i64..1000).map(|val| val.pow(2) * 2 + 1));

单播模式

  • shared() 创建可以发送到多个接收器的流句柄。轮询句柄将以无锁方式轮询底层流。通过消耗句柄,接收器将获取流项的一部分。
  • spawned() 将创建一个活动的工作进程,将流项目转发到通道。该通道可以被克隆并发送给多个接收者,以便每个接收者接收流项目的一部分。

两者 shared()spawned() 都将流的所有权分割成多个接收者。它们在性能方面有所不同。其中 spawned() 方法创建一个活动工作进程并分配额外的缓冲区,而 shared() 则不这样做。在大多数情况下,它们的性能相当。

组合器可以与 select() 一起使用,构建一个散列收集数据流。

use futures::stream::{self, StreamExt as _};
use par_stream::{ParStreamExt as _, StreamExt as _};
use std::collections::HashSet;

let stream = futures::stream::iter(0..1000);

// scatter stream items to two receivers
let share1 = stream.shared(); // or stream.scatter(buf_size)
let share2 = share1.clone();

// process elements in separate parallel workers
let receiver1 = share1.map(|val| val * 2).spawned(None);
let receiver2 = share2.map(|val| val * 2).spawned(None);

// gather values back from receivers
let mut vec: Vec<_> = stream::select(receiver1, receiver2).collect().await;

// verify output values
vec.sort();
itertools::assert_equal(vec, (0..2000).step_by(2));

广播模式

  • broadcast() 将流项目的副本广播到接收者。在开始接收项目之前注册接收者,并保证从第一个项目开始。
  • tee()broadcast() 类似,但在开始接收项目后可以注册新的接收者。接收者不一定从第一个项目开始。

broadcast() 可以与 zip() 一起使用,构建广播连接数据流。

use futures::prelude::*;
use par_stream::prelude::*;

let data = vec![2, -1, 3, 5];
let stream = futures::stream::iter(data.clone());

// broadcast the stream into three receivers
let mut builder = stream.broadcast(None, true);
let rx1 = builder.register();
let rx2 = builder.register();
let rx3 = builder.register();
builder.build(); // finish the builder to start consuming items

// spawn a parallel processor for each receiver
let stream1 = rx1.map(|v| v * 2).spawned(None);
let stream2 = rx2.map(|v| v * 3).spawned(None);
let stream3 = rx3.map(|v| v * 5).spawned(None);

// collect output values
let vec: Vec<_> = stream1
    .zip(stream2)
    .zip(stream3)
    .map(|((v1, v2), v3)| (v1, v2, v3))
    .collect()
    .await;

// verify output values
assert_eq!(vec, [(4, 6, 10), (-2, -3, -5), (6, 9, 15), (10, 15, 25)]);

并行数据生成

以下组合器创建并行工作进程,每个工作进程独立生成项目。

参数

组合器可能需要额外的参数来配置工作进程的数量和缓冲区大小。

  • N: Into<NumWorkers> 用于 par_for_each<N, F>(n: N, f: F)
  • B: Into<BufSize> 用于 scatter<B>(b: B)
  • P: Into<ParParams> 用于 par_then<P, F>(p: P, f: F)

N: Into<NumWorkers> 接受以下值。

  • None:默认值,设置为逻辑系统处理器的数量。
  • 8(整数):固定的工作线程数量。
  • 2.0(浮点数):设置为逻辑系统处理器数量的缩放值。

B: Into<BufSize> 接受以下值。

  • None:默认值,设置为逻辑系统处理器的两倍。
  • 8(整数):固定缓冲区大小。
  • 2.0(浮点数):设置为逻辑系统处理器数量的缩放值。

P: Into<ParParms> 是工作线程大小和缓冲区大小的组合。它接受以下值。

  • None:默认值,设置为工作线程大小和缓冲区大小的默认值。
  • 8(整数):固定工作线程大小,缓冲区大小为工作线程大小的常数倍。
  • 2.0(浮点数):将工作线程大小设置为逻辑系统处理器的缩放值,缓冲区大小为工作线程大小的常数倍。
  • ParParamsConfig:手动配置。

实用组合器

该软件包提供了一些实用的流组合器,可以让你更轻松地完成任务 :).

使用自定义运行时

要提供自定义运行时实现,声明一个实现 Runtime 的类型。然后,为该类型创建一个实例并将其传递给 set_global_runtime()。全局运行时最多只能设置一次,并且仅在未启用任何运行时Cargo功能时有效。否则 set_global_runtime() 返回错误。

use futures::future::BoxFuture;
use par_stream::rt::{Runtime, SleepHandle, SpawnHandle};
use std::{any::Any, time::Duration};

pub struct MyRuntime {/* omit */}

impl MyRuntime {
    pub fn new() -> Self {
        Self { /* omit */ }
    }
}

unsafe impl Runtime for MyRuntime {
    fn block_on<'a>(
        &self,
        fut: BoxFuture<'a, Box<dyn Send + Any + 'static>>,
    ) -> Box<dyn Send + Any + 'static> {
        todo!()
    }

    fn block_on_executor<'a>(
        &self,
        fut: BoxFuture<'a, Box<dyn Send + Any + 'static>>,
    ) -> Box<dyn Send + Any + 'static> {
        todo!()
    }

    fn spawn(
        &self,
        fut: BoxFuture<'static, Box<dyn Send + Any + 'static>>,
    ) -> Box<dyn SpawnHandle> {
        todo!()
    }

    fn spawn_blocking(
        &self,
        f: Box<dyn FnOnce() -> Box<dyn Send + Any + 'static> + Send>,
    ) -> Box<dyn SpawnHandle> {
        todo!()
    }

    fn sleep(&self, dur: Duration) -> Box<dyn SleepHandle> {
        todo!()
    }
}

par_stream::rt::set_global_runtime(MyRuntime::new()).unwrap();

依赖关系

~5–16MB
~210K SLoC