#worker-thread #thread-pool #thread #worker #pool #parallelism #threading

workerpool

一个用于在固定数量的有状态工作线程上运行多个作业的线程池

9 个稳定版本

1.2.1 2024年1月13日
1.2.0 2019年2月18日
1.1.1 2017年11月30日
1.0.4 2017年11月24日
1.0.1 2017年11月23日

#56 in 并发

Download history 56/week @ 2024-04-20 67/week @ 2024-04-27 71/week @ 2024-05-04 121/week @ 2024-05-11 213/week @ 2024-05-18 89/week @ 2024-05-25 80/week @ 2024-06-01 27/week @ 2024-06-08 93/week @ 2024-06-15 72/week @ 2024-06-22 40/week @ 2024-06-29 31/week @ 2024-07-06 75/week @ 2024-07-13 79/week @ 2024-07-20 178/week @ 2024-07-27 223/week @ 2024-08-03

556 每月下载量
用于 cargo-test-all

MIT/Apache

57KB
743

workerpool

CI crates.io docs.rs

一个用于并行执行多个作业的线程池,这些作业在状态化的工作线程上运行。它启动指定数量的工作线程,并在任何工作线程崩溃时补充线程池。

单个 Worker 在自己的线程中运行,需要根据特实现

pub trait Worker : Default {
    type Input: Send;
    type Output: Send;

    fn execute(&mut self, Self::Input) -> Self::Output;
}

使用方法

[dependencies]
workerpool = "1.2"

要使用 crossbeam 的通道 而不是 std::sync::mpsc,请启用 crossbeam 功能

[dependencies]
workerpool = { version = "1.2", features = ["crossbeam"] }

此 crate 提供 Pool<W> where W: Worker。使用线程池,有四个主要的函数

  • Pool::<MyWorker>::new(n_threads) 为特定的 Worker 创建一个新的池。
  • pool.execute(inp) 非阻塞 执行工作线程并忽略返回值。
  • pool.execute_to(tx, inp) 非阻塞 执行工作线程并将返回值发送到指定的 Sender
  • pool.join() 阻塞 等待所有任务(来自 executeexecute_to)完成。

workerpool::thunk中提供了一个工作线程,它是一个无状态的ThunkWorker<T>。它接受Thunk<T>类型的输入,实际上是无参数函数,它们是Sized + Send。这些thunks是通过使用Thunk::of包装返回类型为T的函数来创建的。

use workerpool::Pool;
use workerpool::thunk::{Thunk, ThunkWorker};
use std::sync::mpsc::channel;

fn main() {
    let n_workers = 4;
    let n_jobs = 8;
    let pool = Pool::<ThunkWorker<i32>>::new(n_workers);
    
    let (tx, rx) = channel();
    for i in 0..n_jobs {
        pool.execute_to(tx.clone(), Thunk::of(move || i * i));
    }
    
    assert_eq!(140, rx.iter().take(n_jobs as usize).sum());
}

对于有状态的线程池,您需要自己实现Worker

假设有一个按行分隔的进程,如cattr,您希望它在多个线程上运行,以便像线程池一样使用。您可以使用以下方式创建并使用一个工作线程,以维护进程的stdin/stdout状态:

use workerpool::{Worker, Pool};
use std::process::{Command, ChildStdin, ChildStdout, Stdio};
use std::io::prelude::*;
use std::io::{self, BufReader};
use std::sync::mpsc::channel;

struct LineDelimitedProcess {
    stdin: ChildStdin,
    stdout: BufReader<ChildStdout>,
}
impl Default for LineDelimitedProcess {
    fn default() -> Self {
        let child = Command::new("cat")
            .stdin(Stdio::piped())
            .stdout(Stdio::piped())
            .stderr(Stdio::inherit())
            .spawn()
            .unwrap();
        Self {
            stdin: child.stdin.unwrap(),
            stdout: BufReader::new(child.stdout.unwrap()),
        }
    }
}
impl Worker for LineDelimitedProcess {
    type Input = Box<[u8]>;
    type Output = io::Result<String>;

    fn execute(&mut self, inp: Self::Input) -> Self::Output {
        self.stdin.write_all(&*inp)?;
        self.stdin.write_all(b"\n")?;
        self.stdin.flush()?;
        let mut s = String::new();
        self.stdout.read_line(&mut s)?;
        s.pop(); // exclude newline
        Ok(s)
    }
}

fn main() {
    let n_workers = 4;
    let n_jobs = 8;
    let pool = Pool::<LineDelimitedProcess>::new(n_workers);
    
    let (tx, rx) = channel();
    for i in 0..n_jobs {
        let inp = Box::new([97 + i]);
        pool.execute_to(tx.clone(), inp);
    }
    
    // output is a permutation of "abcdefgh"
    let mut output = rx.iter()
        .take(n_jobs as usize)
        .fold(String::new(), |mut a, b| {
            a.push_str(&b.unwrap());
            a
        })
        .into_bytes();
    output.sort();
    assert_eq!(output, b"abcdefgh");
}

类似库

许可证

本作品是threadpool的衍生作品。

根据您的选择,许可如下:

贡献

除非您明确声明,否则根据Apache-2.0许可证定义,您有意提交的任何贡献,旨在包含在作品中,应如上所述双许可,不附加任何额外条款或条件。

依赖关系

~0.4–5.5MB
~13K SLoC