2 个版本
0.2.1 | 2023年5月5日 |
---|---|
0.2.0 | 2023年5月5日 |
0.1.3 |
|
#581 在 并发
33 每月下载次数
9KB
93 行
Workerpool
一个简单的 Rust 工作池实现,使用通道来同步作业。它可以启动固定数量的工作线程,等待作业队列消费。
- 使用
use workerpool_rs::pool::WorkerPool;
use std::sync::mpsc::channel;
use std::sync::{Arc, Mutex};
let n_workers = 4;
let n_jobs = 8;
let pool = WorkerPool::new(n_workers);
let (tx, rx) = channel();
let atx = Arc::new(Mutex::new(tx));
for _ in 0..n_jobs {
let atx = atx.clone();
pool.execute(move|| {
let tx = atx.lock().unwrap();
tx.send(1).expect("channel will be there waiting for the pool");
});
}
assert_eq!(rx.iter().take(n_jobs).fold(0, |a, b| a + b), 8);
- 测试
$ cargo test
lib.rs
:
工作池
此模块包含用于处理并发任务的构造。它可以启动任意数量的工作线程,并将它们与其他通道同步。
示例
与其他通道同步
use workerpool_rs::pool::WorkerPool;
use std::sync::mpsc::channel;
use std::sync::{Arc, Mutex};
let n_workers = 4;
let n_jobs = 8;
let pool = WorkerPool::new(n_workers);
let (tx, rx) = channel();
let atx = Arc::new(Mutex::new(tx));
for _ in 0..n_jobs {
let atx = atx.clone();
pool.execute(move|| {
let tx = atx.lock().unwrap();
tx.send(1).expect("channel will be there waiting for the pool");
});
}
assert_eq!(rx.iter().take(n_jobs).fold(0, |a, b| a + b), 8);
与 Barrier 同步
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Barrier};
use workerpool_rs::pool::WorkerPool;
let n_workers = 42;
let n_jobs = 23;
let pool = WorkerPool::new(n_workers);
let an_atomic = Arc::new(AtomicUsize::new(0));
assert!(n_jobs <= n_workers, "too many jobs, will deadlock");
let barrier = Arc::new(Barrier::new(n_jobs + 1));
for _ in 0..n_jobs {
let barrier = barrier.clone();
let an_atomic = an_atomic.clone();
pool.execute(move|| {
// do the heavy work
an_atomic.fetch_add(1, Ordering::Relaxed);
// then wait for the other threads
barrier.wait();
});
}
barrier.wait();
assert_eq!(an_atomic.load(Ordering::SeqCst), /* n_jobs = */ 23);