7 个版本
0.1.1 | 2023 年 8 月 21 日 |
---|---|
0.1.0 | 2023 年 7 月 1 日 |
0.0.6-alpha | 2023 年 6 月 1 日 |
0.0.5-alpha | 2023 年 5 月 9 日 |
0.0.2-alpha | 2023 年 4 月 30 日 |
#220 在 并发
在 2 crates 中使用
58KB
1K SLoC
command-executor
Rust 中的线程池实现
命令执行器是 Rust 中的一种多生产者/多消费者线程池实现,它提供了一个具有背压功能的并发命令执行接口。此实现的的主要目标是:
- 控制内存消耗 - 通过使用有界阻塞队列实现
- 指示工作完成 - 通过“所有工作完成后关闭”实现
- 保持可预测的执行模型 - 命令按 FIFO 顺序提交执行,具有背压,直到所有工作完成。请注意,实际执行顺序取决于系统调度程序,但是,您可以假设执行线程将在 'n' 命令出队后出队
n + 1
命令。
用例
此并行方法适用于处理非常大的数据集,这些数据集无法装入内存,或者内存消耗必须在指定的范围内。当数据集可以装入内存或内存消耗不是问题的情况下,通常使用 Rayon 或其他使用非阻塞队列的数据并行库会得到更好的结果。例如,将一个大型且非常密集的 Protobuf 文件(约 67 GB 的排序数据)转换为约 1 TB 的 pg_dump 文件,以便加载到 PostgreSQL 数据库中。使用此库可以创建一个管道,并行化 Protobuf 解码和 pg_dump 编码,并最终写入合并的结果,同时保持初始顺序并控制内存大小。
问题
欢迎并感谢问题提交。请提交至 https://github.com/navigatorsguild/command-executor/issues
基准测试
基准测试 由 benchmark-rs 生成
示例
提交命令以执行并等待完成
use std::thread;
use std::time::Duration;
use command_executor::command::Command;
use command_executor::shutdown_mode::ShutdownMode;
use command_executor::thread_pool_builder::ThreadPoolBuilder;
struct ExampleCommand {
payload: i32,
}
impl ExampleCommand {
pub fn new(payload: i32) -> ExampleCommand {
ExampleCommand {
payload,
}
}
}
impl Command for ExampleCommand {
fn execute(&self) -> Result<(), anyhow::Error> {
println!("processing {} in {}", self.payload, thread::current().name().unwrap_or("unnamed"));
thread::sleep(Duration::from_millis(10));
Ok(())
}
}
pub fn main() -> Result<(), anyhow::Error> {
let mut thread_pool_builder = ThreadPoolBuilder::new();
let mut tp = thread_pool_builder
.with_name("example".to_string())
.with_tasks(4)
.with_queue_size(16)
.with_shutdown_mode(ShutdownMode::CompletePending)
.build()
.unwrap();
for i in 0..16 {
tp.submit(Box::new(ExampleCommand::new(i)));
}
tp.shutdown();
tp.join()
}
在线程池的所有线程中安装线程局部值
use std::thread;
use std::time::Duration;
use std::cell::RefCell;
use command_executor::command::Command;
use command_executor::shutdown_mode::ShutdownMode;
use command_executor::thread_pool_builder::ThreadPoolBuilder;
#[derive(Clone)]
struct Config {
sleep_time: u64,
}
impl Default for Config {
fn default() -> Self {
Config {
sleep_time: 1,
}
}
}
thread_local! {
static THREAD_LOCAL_CONFIG: RefCell<Option<Config>> = RefCell::new(None);
}
struct ThreadLocalExampleCommand {
payload: i32,
}
impl ThreadLocalExampleCommand {
pub fn new(payload: i32) -> ThreadLocalExampleCommand {
ThreadLocalExampleCommand { payload }
}
}
impl Command for ThreadLocalExampleCommand {
fn execute(&self) -> Result<(), anyhow::Error> {
THREAD_LOCAL_CONFIG.with(
|config| {
let sleep_time = config.borrow().as_ref().unwrap().sleep_time;
thread::sleep(Duration::from_millis(sleep_time));
println!(
"processing {} in {}",
self.payload,
thread::current().name().unwrap_or("unnamed")
);
}
);
Ok(())
}
}
fn main() -> Result<(), anyhow::Error> {
let mut thread_pool_builder = ThreadPoolBuilder::new();
let mut tp = thread_pool_builder
.with_name("example".to_string())
.with_tasks(4)
.with_queue_size(16)
.with_shutdown_mode(ShutdownMode::CompletePending)
.build()
.unwrap();
tp.set_thread_local(&THREAD_LOCAL_CONFIG, Some(Config::default()));
for i in 0..16 {
tp.submit(Box::new(ThreadLocalExampleCommand::new(i)));
}
tp.shutdown();
tp.join()
}
许可:MIT OR Apache-2.0
依赖项
~255KB