3个不稳定版本
0.2.1 | 2022年6月4日 |
---|---|
0.2.0 | 2022年6月3日 |
0.1.0 | 2022年6月2日 |
#930 in 并发
每月21次下载
35KB
572 行
worker-pool
一个Rust crate,用于处理一组工作线程,这些线程需要将结果回传给主线程。
此crate实现了一个actor模型的专业化,具有以下属性:
- 有一个管理者,多个工作者
- 工作者可以无限期工作
- 工作者可以向管理者发送消息;这些消息不能丢失
- 管理者可以向工作者广播消息
- 管理者可以随时要求工作者停止工作,在这种情况下,系统应尽快达到正常状态
- 消息队列可以限制其长度
- 没有死锁¹和活锁
¹: 除非在文档中指定,并且假设每个工作者都处理了Stop
消息并在一段时间后停止
安装和使用
将此库添加到您的Cargo.toml
worker-pool = "0.1.0"
然后,创建一个worker_pool::WorkerPool
的新实例
use std::time::Duration;
use worker_pool::{WorkerPool, DownMsg};
// Here, `u64` is the type of the messages from the workers to the manager
// `()` is the type of the messages from the manager to the workers
// `32` is the maximum length of the message queue
let pool: WorkerPool<u64, ()> = WorkerPool::new(32);
要启动新线程,请使用WorkerPool::execute
或WorkerPool::execute_many
fn is_prime(n: u64) -> bool {
for i in 2..n {
if n % i == 0 {
return false
}
}
true
}
// Start a new thread, that will communicate with the manager
pool.execute(|tx, rx| {
tx.send(2).unwrap();
let mut n = 3;
loop {
match rx.try_recv() {
Ok(DownMsg::Stop) => break,
_ => {}
}
while !is_prime(n) {
n += 2;
}
tx.send(n).unwrap(); // The program may block here; this will not cause a deadlock
}
});
然后,您可以使用WorkerPool
的不同方法与线程交互,特别是recv_burst
、broadcast
和stop
// Give the worker(s) time to boot up
std::thread::sleep(Duration::new(0, 100_000_000));
// The iterator returned by pool.recv_burst() only yields messages received before it was created and is non-blocking,
// so no livelock or deadlock can occur when using it.
for x in pool.recv_burst() {
println!("{}", x);
std::thread::sleep(Duration::new(0, 10_000_000));
}
// `pool.stop()` also returns an iterator, which will send a Stop message
// and gather the worker's messages until they all finish. No deadlock can occur here,
// as long as the threads' only shared, critical resource are the worker/manager channels
// provided by WorkerPool and the threads eventually respond to the `Stop` message.
for x in pool.stop() {
println!("{}", x);
}
许可证
此项目受MIT许可证和Apache v2.0许可证的双重许可。您可以在使用此库时选择其中之一。
对此存储库的任何贡献都必须在两种许可证下提供。