9 个稳定版本
1.2.1 | 2024年1月13日 |
---|---|
1.2.0 | 2019年2月18日 |
1.1.1 | 2017年11月30日 |
1.0.4 | 2017年11月24日 |
1.0.1 | 2017年11月23日 |
#56 in 并发
556 每月下载量
用于 cargo-test-all
57KB
743 行
workerpool
一个用于并行执行多个作业的线程池,这些作业在状态化的工作线程上运行。它启动指定数量的工作线程,并在任何工作线程崩溃时补充线程池。
单个 Worker
在自己的线程中运行,需要根据特实现
pub trait Worker : Default {
type Input: Send;
type Output: Send;
fn execute(&mut self, Self::Input) -> Self::Output;
}
使用方法
[dependencies]
workerpool = "1.2"
要使用 crossbeam 的通道 而不是 std::sync::mpsc
,请启用 crossbeam
功能
[dependencies]
workerpool = { version = "1.2", features = ["crossbeam"] }
此 crate 提供 Pool<W> where W: Worker
。使用线程池,有四个主要的函数
Pool::<MyWorker>::new(n_threads)
为特定的Worker
创建一个新的池。pool.execute(inp)
非阻塞 执行工作线程并忽略返回值。pool.execute_to(tx, inp)
非阻塞 执行工作线程并将返回值发送到指定的 Sender。pool.join()
阻塞 等待所有任务(来自execute
和execute_to
)完成。
在workerpool::thunk
中提供了一个工作线程,它是一个无状态的ThunkWorker<T>
。它接受Thunk<T>
类型的输入,实际上是无参数函数,它们是Sized + Send
。这些thunks是通过使用Thunk::of
包装返回类型为T
的函数来创建的。
use workerpool::Pool;
use workerpool::thunk::{Thunk, ThunkWorker};
use std::sync::mpsc::channel;
fn main() {
let n_workers = 4;
let n_jobs = 8;
let pool = Pool::<ThunkWorker<i32>>::new(n_workers);
let (tx, rx) = channel();
for i in 0..n_jobs {
pool.execute_to(tx.clone(), Thunk::of(move || i * i));
}
assert_eq!(140, rx.iter().take(n_jobs as usize).sum());
}
对于有状态的线程池,您需要自己实现Worker
。
假设有一个按行分隔的进程,如cat
或tr
,您希望它在多个线程上运行,以便像线程池一样使用。您可以使用以下方式创建并使用一个工作线程,以维护进程的stdin/stdout状态:
use workerpool::{Worker, Pool};
use std::process::{Command, ChildStdin, ChildStdout, Stdio};
use std::io::prelude::*;
use std::io::{self, BufReader};
use std::sync::mpsc::channel;
struct LineDelimitedProcess {
stdin: ChildStdin,
stdout: BufReader<ChildStdout>,
}
impl Default for LineDelimitedProcess {
fn default() -> Self {
let child = Command::new("cat")
.stdin(Stdio::piped())
.stdout(Stdio::piped())
.stderr(Stdio::inherit())
.spawn()
.unwrap();
Self {
stdin: child.stdin.unwrap(),
stdout: BufReader::new(child.stdout.unwrap()),
}
}
}
impl Worker for LineDelimitedProcess {
type Input = Box<[u8]>;
type Output = io::Result<String>;
fn execute(&mut self, inp: Self::Input) -> Self::Output {
self.stdin.write_all(&*inp)?;
self.stdin.write_all(b"\n")?;
self.stdin.flush()?;
let mut s = String::new();
self.stdout.read_line(&mut s)?;
s.pop(); // exclude newline
Ok(s)
}
}
fn main() {
let n_workers = 4;
let n_jobs = 8;
let pool = Pool::<LineDelimitedProcess>::new(n_workers);
let (tx, rx) = channel();
for i in 0..n_jobs {
let inp = Box::new([97 + i]);
pool.execute_to(tx.clone(), inp);
}
// output is a permutation of "abcdefgh"
let mut output = rx.iter()
.take(n_jobs as usize)
.fold(String::new(), |mut a, b| {
a.push_str(&b.unwrap());
a
})
.into_bytes();
output.sort();
assert_eq!(output, b"abcdefgh");
}
类似库
许可证
本作品是threadpool的衍生作品。
根据您的选择,许可如下:
- Apache License, Version 2.0, (LICENSE-APACHE或https://apache.ac.cn/licenses/LICENSE-2.0)
- MIT license (LICENSE-MIT或http://opensource.org/licenses/MIT)
。
贡献
除非您明确声明,否则根据Apache-2.0许可证定义,您有意提交的任何贡献,旨在包含在作品中,应如上所述双许可,不附加任何额外条款或条件。
依赖关系
~0.4–5.5MB
~13K SLoC