#worker-thread #worker #promise #thread #uwu #background-thread

帕瓦瓦维兹姆

这是一个非常严肃的用于结构化并发的工具,我保证:3

4 个版本

0.1.3 2023年7月31日
0.1.2 2023年7月31日
0.1.1 2022年12月21日
0.1.0 2022年10月5日

#775并发

每月 34 次下载

0BSD 许可证

74KB
1.5K SLoC

帕瓦瓦维兹姆

这是一个非常严肃的用于结构化并发的工具,我保证:3


lib.rs:

一个用于结构化并发和异构线程并行处理的简单库。

(如果您在寻找使用迭代器接口进行同构并行处理,请查看 rayon;如果您在寻找并发运行大量 I/O 任务而不是并发运行少量计算密集型任务,那么一个 async 运行时可能更适合您)

概述

本库提供三种主要的方式进行结构化并发

  • [background][background()],这是一个在后台线程上运行闭包的简单方法。
  • WorkerPromise,它们允许构建任意管道计算图,并从拥有线程接收工作包进行处理。
  • reader::Reader,一个从可取消流中读取并处理或转发结果的背景线程。

Worker 和 Promise

Worker

Worker 是围绕操作系统级别的线程的包装,它强制执行 结构化并发:即并发操作应该像其他控制流结构一样进行组织。当 Worker 被丢弃时,底层线程将被通知退出并加入。如果线程崩溃,崩溃将被转发到丢弃 Worker 的线程。

这些“拥有线程”确保在完成并发操作后不会留下任何过时的线程。转发 Worker 的崩溃确保启动计算(通过生成 Worker)的代码将被正确地销毁,就像它直接执行计算一样,而不是生成一个 Worker 来执行。

Worker使用基于消息的接口,类似于actor。它们不使用用户编写的处理循环,而是接收用户定义类型的一些消息。这鼓励将使用Worker的代码视为数据处理管道:启动Worker的代码需要向其提交输入数据,然后这些数据可以经过转换并传递到其他地方。

承诺

Promise提供了一种机制,用于将计算结果传回启动它的代码或处理管道的下一部分。一旦计算完成,其结果可以通过Promise::fulfill提交,并且持有相应PromiseHandle的线程可以检索它。

使用

单个使用Promise来返回结果的Worker

use pawawwewism::{Worker, Promise, promise};

let mut worker = Worker::builder().spawn(|(input, promise): (i32, Promise<i32>)| {
    println!("Doing heavy task...");
    let output = input + 1;
    promise.fulfill(output);
}).unwrap();

let (promise, handle) = promise();
worker.send((1, promise));

// <do other work concurrently>

let output = handle.block().expect("worker has dropped the promise; this should be impossible");
assert_eq!(output, 2);

可以将多个Worker线程链接起来以管道化计算

use std::collections::VecDeque;
use pawawwewism::{Worker, Promise, PromiseHandle, promise};

// This worker is identical to the one in the first example
let mut worker1 = Worker::builder().spawn(|(input, promise): (i32, Promise<i32>)| {
    println!("Doing heavy task 1...");
    let output = input + 1;
    promise.fulfill(output);
}).unwrap();

// The second worker is passed a `PromiseHandle` instead of a direct value
let mut next = 1;
let mut worker2 = Worker::builder().spawn(move |handle: PromiseHandle<i32>| {
    let input = handle.block().unwrap();
    assert_eq!(input, next);
    next += 1;
}).unwrap();

for input in [0,1,2,3] {
    let (promise1, handle1) = promise();
    worker1.send((input, promise1));
    // On the second iteration and later, this `send` will give `worker1` work to do, while
    // `worker2` still processes the previous element, achieving pipelining.

    worker2.send(handle1);
}

依赖关系

~435KB