13 个版本 (8 个破坏性版本)
0.10.2 | 2022年2月10日 |
---|---|
0.10.0 | 2022年1月2日 |
0.8.0 | 2021年12月31日 |
0.7.0 | 2021年11月14日 |
0.2.1 | 2020年12月8日 |
#551 在 异步
每月下载量 2,117
用于 3 个 Crates (2 个直接使用)
235KB
5K SLoC
par-stream: 异步并行流处理
par-stream
是一个用于 Rust 的异步并行流处理库。
示例
访问 示例 目录以探索更多示例。
许可证
MIT 许可证。请参阅 LICENSE 文件。
lib.rs
:
异步流并行处理库。
运行时配置
以下 cargo 功能选择并行工作者的后端运行时。最多指定其中一个,否则 crate 会引发编译错误。
runtime-tokio
启用 [tokio] 多线程运行时。runtime-async-std
启用 async-std 默认运行时。
如果您想提供自定义运行时,请阅读 使用自定义运行时。
扩展特质
扩展特质扩展现有的 Stream,为现有流提供额外的组合器。它们可以从 [prelude] 中导入以便于使用。
use par_stream::prelude::*;
根据流的特性,由不同的特质提供流组合器。
非并行流项目操作特质
- StreamExt 需要
Self: Stream
- TryStreamExt 需要
Self: TryStream
流元素排序的特质
- IndexStreamExt 需要
Stream<Item = Result<(usize, T), E>>
- TryIndexStreamExt 需要
Stream<Item = Result<(usize, T), E>>
并行处理的特质
- ParStreamExt 需要
Self: 'static + Send + Stream
和Self::Item: 'static + Send
- TryParStreamExt 需要
Self: 'static + Send + Stream<Item =Result<T, E>>
,T: 'static + Send
和E: 'static + Send
并行处理
这些组合器在流上并行执行任务,可以是按顺序/无序和可失败或不可失败的方式。
par_map()
执行并行阻塞任务,同时尊重输入顺序。par_then()
执行并行异步任务,同时尊重输入顺序。par_map_unordered()
执行并行阻塞任务,不尊重输入顺序。par_then_unordered()
执行并行异步任务,不尊重输入顺序。try_par_map()
、try_par_then()
、try_par_then_unordered()
是可失败版本。
将上述组合器链接在一起可以建立并行处理的数据流。
use futures::stream::{self, StreamExt as _};
use par_stream::{IndexStreamExt as _, ParStreamExt as _};
let vec: Vec<_> = stream::iter(0i64..1000)
// a series of unordered parallel tasks
.par_then(None, |val| async move { val.pow(2) })
.par_then(None, |val| async move { val * 2 })
.par_then(None, |val| async move { val + 1 })
.collect()
.await;
itertools::assert_equal(vec, (0i64..1000).map(|val| val.pow(2) * 2 + 1));
无序并行处理
该库提供了项目重新排序组合器。
reorder_enumerated()
根据索引号重新排序项目(index, value)
try_reorder_enumerated()
是可失败版本。
它们可以与来自 [futures] 库的 enumerate() 或本库的可失败版本 try_enumerate() 结合使用,以建立无序数据处理流程。
use futures::stream::{self, StreamExt as _};
use par_stream::{IndexStreamExt as _, ParStreamExt as _};
let vec: Vec<_> = stream::iter(0i64..1000)
// add index number to each item
.enumerate()
// a series of unordered parallel tasks
.par_then_unordered(None, |(index, val)| async move { (index, val.pow(2)) })
.par_then_unordered(None, |(index, val)| async move { (index, val * 2) })
.par_then_unordered(None, |(index, val)| async move { (index, val + 1) })
// reorder the items back by index number
.reorder_enumerated()
.collect()
.await;
itertools::assert_equal(vec, (0i64..1000).map(|val| val.pow(2) * 2 + 1));
单播模式
shared()
创建可以发送到多个接收器的流句柄。轮询句柄将以无锁方式轮询底层流。通过消耗句柄,接收器将获取流项的一部分。spawned()
将创建一个活动的工作进程,将流项目转发到通道。该通道可以被克隆并发送给多个接收者,以便每个接收者接收流项目的一部分。
两者 shared()
和 spawned()
都将流的所有权分割成多个接收者。它们在性能方面有所不同。其中 spawned()
方法创建一个活动工作进程并分配额外的缓冲区,而 shared()
则不这样做。在大多数情况下,它们的性能相当。
组合器可以与 select()
一起使用,构建一个散列收集数据流。
use futures::stream::{self, StreamExt as _};
use par_stream::{ParStreamExt as _, StreamExt as _};
use std::collections::HashSet;
let stream = futures::stream::iter(0..1000);
// scatter stream items to two receivers
let share1 = stream.shared(); // or stream.scatter(buf_size)
let share2 = share1.clone();
// process elements in separate parallel workers
let receiver1 = share1.map(|val| val * 2).spawned(None);
let receiver2 = share2.map(|val| val * 2).spawned(None);
// gather values back from receivers
let mut vec: Vec<_> = stream::select(receiver1, receiver2).collect().await;
// verify output values
vec.sort();
itertools::assert_equal(vec, (0..2000).step_by(2));
广播模式
broadcast()
将流项目的副本广播到接收者。在开始接收项目之前注册接收者,并保证从第一个项目开始。tee()
与broadcast()
类似,但在开始接收项目后可以注册新的接收者。接收者不一定从第一个项目开始。
broadcast()
可以与 zip() 一起使用,构建广播连接数据流。
use futures::prelude::*;
use par_stream::prelude::*;
let data = vec![2, -1, 3, 5];
let stream = futures::stream::iter(data.clone());
// broadcast the stream into three receivers
let mut builder = stream.broadcast(None, true);
let rx1 = builder.register();
let rx2 = builder.register();
let rx3 = builder.register();
builder.build(); // finish the builder to start consuming items
// spawn a parallel processor for each receiver
let stream1 = rx1.map(|v| v * 2).spawned(None);
let stream2 = rx2.map(|v| v * 3).spawned(None);
let stream3 = rx3.map(|v| v * 5).spawned(None);
// collect output values
let vec: Vec<_> = stream1
.zip(stream2)
.zip(stream3)
.map(|((v1, v2), v3)| (v1, v2, v3))
.collect()
.await;
// verify output values
assert_eq!(vec, [(4, 6, 10), (-2, -3, -5), (6, 9, 15), (10, 15, 25)]);
并行数据生成
以下组合器创建并行工作进程,每个工作进程独立生成项目。
par_unfold
从未来生成值。par_unfold_blocking
从阻塞函数生成值。try_par_unfold
和try_par_unfold_blocking
是它们的错误版本。
参数
组合器可能需要额外的参数来配置工作进程的数量和缓冲区大小。
N: Into<NumWorkers>
用于par_for_each<N, F>(n: N, f: F)
B: Into<BufSize>
用于scatter<B>(b: B)
P: Into<ParParams>
用于par_then<P, F>(p: P, f: F)
N: Into<NumWorkers>
接受以下值。
None
:默认值,设置为逻辑系统处理器的数量。8
(整数):固定的工作线程数量。2.0
(浮点数):设置为逻辑系统处理器数量的缩放值。
B: Into<BufSize>
接受以下值。
None
:默认值,设置为逻辑系统处理器的两倍。8
(整数):固定缓冲区大小。2.0
(浮点数):设置为逻辑系统处理器数量的缩放值。
P: Into<ParParms>
是工作线程大小和缓冲区大小的组合。它接受以下值。
None
:默认值,设置为工作线程大小和缓冲区大小的默认值。8
(整数):固定工作线程大小,缓冲区大小为工作线程大小的常数倍。2.0
(浮点数):将工作线程大小设置为逻辑系统处理器的缩放值,缓冲区大小为工作线程大小的常数倍。ParParamsConfig
:手动配置。
实用组合器
该软件包提供了一些实用的流组合器,可以让你更轻松地完成任务 :).
with_state
将一个流与一个状态值绑定。wait_until
允许流等待直到一个future解析。reduce
将流项目减少到单个值。batching
为每个输出项目消耗任意数量的输入项目。stateful_then
、stateful_map
、stateful_batching
是有状态的对应版本。take_until_error
在出现错误后停止流接受值。catch_error
将结果流拆分为未包装值流和一个可能解析为错误的future。
使用自定义运行时
要提供自定义运行时实现,声明一个实现 Runtime 的类型。然后,为该类型创建一个实例并将其传递给 set_global_runtime()。全局运行时最多只能设置一次,并且仅在未启用任何运行时Cargo功能时有效。否则 set_global_runtime() 返回错误。
use futures::future::BoxFuture;
use par_stream::rt::{Runtime, SleepHandle, SpawnHandle};
use std::{any::Any, time::Duration};
pub struct MyRuntime {/* omit */}
impl MyRuntime {
pub fn new() -> Self {
Self { /* omit */ }
}
}
unsafe impl Runtime for MyRuntime {
fn block_on<'a>(
&self,
fut: BoxFuture<'a, Box<dyn Send + Any + 'static>>,
) -> Box<dyn Send + Any + 'static> {
todo!()
}
fn block_on_executor<'a>(
&self,
fut: BoxFuture<'a, Box<dyn Send + Any + 'static>>,
) -> Box<dyn Send + Any + 'static> {
todo!()
}
fn spawn(
&self,
fut: BoxFuture<'static, Box<dyn Send + Any + 'static>>,
) -> Box<dyn SpawnHandle> {
todo!()
}
fn spawn_blocking(
&self,
f: Box<dyn FnOnce() -> Box<dyn Send + Any + 'static> + Send>,
) -> Box<dyn SpawnHandle> {
todo!()
}
fn sleep(&self, dur: Duration) -> Box<dyn SleepHandle> {
todo!()
}
}
par_stream::rt::set_global_runtime(MyRuntime::new()).unwrap();
依赖关系
~5–16MB
~210K SLoC