4 个版本
0.1.3 | 2023年7月31日 |
---|---|
0.1.2 | 2023年7月31日 |
0.1.1 | 2022年12月21日 |
0.1.0 | 2022年10月5日 |
#775 在 并发
每月 34 次下载
74KB
1.5K SLoC
帕瓦瓦维兹姆
这是一个非常严肃的用于结构化并发的工具,我保证:3
lib.rs
:
一个用于结构化并发和异构线程并行处理的简单库。
(如果您在寻找使用迭代器接口进行同构并行处理,请查看 rayon
;如果您在寻找并发运行大量 I/O 任务而不是并发运行少量计算密集型任务,那么一个 async
运行时可能更适合您)
概述
本库提供三种主要的方式进行结构化并发
- [
background
][background()],这是一个在后台线程上运行闭包的简单方法。 Worker
和Promise
,它们允许构建任意管道计算图,并从拥有线程接收工作包进行处理。reader::Reader
,一个从可取消流中读取并处理或转发结果的背景线程。
Worker 和 Promise
Worker
Worker
是围绕操作系统级别的线程的包装,它强制执行 结构化并发:即并发操作应该像其他控制流结构一样进行组织。当 Worker
被丢弃时,底层线程将被通知退出并加入。如果线程崩溃,崩溃将被转发到丢弃 Worker
的线程。
这些“拥有线程”确保在完成并发操作后不会留下任何过时的线程。转发 Worker
的崩溃确保启动计算(通过生成 Worker
)的代码将被正确地销毁,就像它直接执行计算一样,而不是生成一个 Worker
来执行。
Worker
使用基于消息的接口,类似于actor。它们不使用用户编写的处理循环,而是接收用户定义类型的一些消息。这鼓励将使用Worker
的代码视为数据处理管道:启动Worker
的代码需要向其提交输入数据,然后这些数据可以经过转换并传递到其他地方。
承诺
Promise
提供了一种机制,用于将计算结果传回启动它的代码或处理管道的下一部分。一旦计算完成,其结果可以通过Promise::fulfill
提交,并且持有相应PromiseHandle
的线程可以检索它。
使用
use pawawwewism::{Worker, Promise, promise};
let mut worker = Worker::builder().spawn(|(input, promise): (i32, Promise<i32>)| {
println!("Doing heavy task...");
let output = input + 1;
promise.fulfill(output);
}).unwrap();
let (promise, handle) = promise();
worker.send((1, promise));
// <do other work concurrently>
let output = handle.block().expect("worker has dropped the promise; this should be impossible");
assert_eq!(output, 2);
可以将多个Worker
线程链接起来以管道化计算
use std::collections::VecDeque;
use pawawwewism::{Worker, Promise, PromiseHandle, promise};
// This worker is identical to the one in the first example
let mut worker1 = Worker::builder().spawn(|(input, promise): (i32, Promise<i32>)| {
println!("Doing heavy task 1...");
let output = input + 1;
promise.fulfill(output);
}).unwrap();
// The second worker is passed a `PromiseHandle` instead of a direct value
let mut next = 1;
let mut worker2 = Worker::builder().spawn(move |handle: PromiseHandle<i32>| {
let input = handle.block().unwrap();
assert_eq!(input, next);
next += 1;
}).unwrap();
for input in [0,1,2,3] {
let (promise1, handle1) = promise();
worker1.send((input, promise1));
// On the second iteration and later, this `send` will give `worker1` work to do, while
// `worker2` still processes the previous element, achieving pipelining.
worker2.send(handle1);
}
依赖关系
~435KB