2 个版本

0.2.1 2023年5月5日
0.2.0 2023年5月5日
0.1.3 2020年10月11日

#581并发

33 每月下载次数

MIT 许可证

9KB
93

Workerpool

一个简单的 Rust 工作池实现,使用通道来同步作业。它可以启动固定数量的工作线程,等待作业队列消费。

  • 使用
 use workerpool_rs::pool::WorkerPool;
 use std::sync::mpsc::channel;
 use std::sync::{Arc, Mutex};

 let n_workers = 4;
 let n_jobs = 8;
 let pool = WorkerPool::new(n_workers);

 let (tx, rx) = channel();
 let atx = Arc::new(Mutex::new(tx));
 for _ in 0..n_jobs {
     let atx = atx.clone();
     pool.execute(move|| {
         let tx = atx.lock().unwrap();
         tx.send(1).expect("channel will be there waiting for the pool");
     });
 }

 assert_eq!(rx.iter().take(n_jobs).fold(0, |a, b| a + b), 8);
  • 测试
$ cargo test

lib.rs:

工作池

此模块包含用于处理并发任务的构造。它可以启动任意数量的工作线程,并将它们与其他通道同步。

示例

与其他通道同步

use workerpool_rs::pool::WorkerPool;
use std::sync::mpsc::channel;
use std::sync::{Arc, Mutex};

let n_workers = 4;
let n_jobs = 8;
let pool = WorkerPool::new(n_workers);

let (tx, rx) = channel();
let atx = Arc::new(Mutex::new(tx));
for _ in 0..n_jobs {
    let atx = atx.clone();
    pool.execute(move|| {
        let tx = atx.lock().unwrap();
        tx.send(1).expect("channel will be there waiting for the pool");
    });
}

assert_eq!(rx.iter().take(n_jobs).fold(0, |a, b| a + b), 8);

与 Barrier 同步


 use std::sync::atomic::{AtomicUsize, Ordering};
 use std::sync::{Arc, Barrier};
 use workerpool_rs::pool::WorkerPool;

 let n_workers = 42;
 let n_jobs = 23;
 let pool = WorkerPool::new(n_workers);
 let an_atomic = Arc::new(AtomicUsize::new(0));

 assert!(n_jobs <= n_workers, "too many jobs, will deadlock");

 let barrier = Arc::new(Barrier::new(n_jobs + 1));
 for _ in 0..n_jobs {
     let barrier = barrier.clone();
     let an_atomic = an_atomic.clone();

     pool.execute(move|| {
         // do the heavy work
         an_atomic.fetch_add(1, Ordering::Relaxed);

         // then wait for the other threads
         barrier.wait();
     });
 }

 barrier.wait();
 assert_eq!(an_atomic.load(Ordering::SeqCst), /* n_jobs = */ 23);

无运行时依赖