#thread-pool #worker-thread #thread #actor-model #pool #actors

nightly worker-pool

一个Rust crate,用于处理一组工作线程,这些线程需要将结果回传给主线程。

3个不稳定版本

0.2.1 2022年6月4日
0.2.0 2022年6月3日
0.1.0 2022年6月2日

#930 in 并发

每月21次下载

MIT/Apache

35KB
572

worker-pool

一个Rust crate,用于处理一组工作线程,这些线程需要将结果回传给主线程。

此crate实现了一个actor模型的专业化,具有以下属性:

  • 有一个管理者,多个工作者
  • 工作者可以无限期工作
  • 工作者可以向管理者发送消息;这些消息不能丢失
  • 管理者可以向工作者广播消息
  • 管理者可以随时要求工作者停止工作,在这种情况下,系统应尽快达到正常状态
  • 消息队列可以限制其长度
  • 没有死锁¹和活锁

¹: 除非在文档中指定,并且假设每个工作者都处理了Stop消息并在一段时间后停止

安装和使用

将此库添加到您的Cargo.toml

worker-pool = "0.1.0"

然后,创建一个worker_pool::WorkerPool的新实例

use std::time::Duration;
use worker_pool::{WorkerPool, DownMsg};

// Here, `u64` is the type of the messages from the workers to the manager
// `()` is the type of the messages from the manager to the workers
// `32` is the maximum length of the message queue
let pool: WorkerPool<u64, ()> = WorkerPool::new(32);

要启动新线程,请使用WorkerPool::executeWorkerPool::execute_many

fn is_prime(n: u64) -> bool {
    for i in 2..n {
        if n % i == 0 {
            return false
        }
    }
    true
}

// Start a new thread, that will communicate with the manager
pool.execute(|tx, rx| {
    tx.send(2).unwrap();
    let mut n = 3;
    loop {
        match rx.try_recv() {
            Ok(DownMsg::Stop) => break,
            _ => {}
        }

        while !is_prime(n) {
            n += 2;
        }

        tx.send(n).unwrap(); // The program may block here; this will not cause a deadlock
    }
});

然后,您可以使用WorkerPool的不同方法与线程交互,特别是recv_burstbroadcaststop

// Give the worker(s) time to boot up
std::thread::sleep(Duration::new(0, 100_000_000));

// The iterator returned by pool.recv_burst() only yields messages received before it was created and is non-blocking,
// so no livelock or deadlock can occur when using it.
for x in pool.recv_burst() {
    println!("{}", x);
    std::thread::sleep(Duration::new(0, 10_000_000));
}

// `pool.stop()` also returns an iterator, which will send a Stop message
// and gather the worker's messages until they all finish. No deadlock can occur here,
// as long as the threads' only shared, critical resource are the worker/manager channels
// provided by WorkerPool and the threads eventually respond to the `Stop` message.
for x in pool.stop() {
    println!("{}", x);
}

许可证

此项目受MIT许可证和Apache v2.0许可证的双重许可。您可以在使用此库时选择其中之一。

对此存储库的任何贡献都必须在两种许可证下提供。

无运行时依赖