7 个版本

0.1.1 2023 年 8 月 21 日
0.1.0 2023 年 7 月 1 日
0.0.6-alpha2023 年 6 月 1 日
0.0.5-alpha2023 年 5 月 9 日
0.0.2-alpha2023 年 4 月 30 日

#220并发


2 crates 中使用

MIT/Apache

58KB
1K SLoC

Maintenance

command-executor

Rust 中的线程池实现

命令执行器是 Rust 中的一种多生产者/多消费者线程池实现,它提供了一个具有背压功能的并发命令执行接口。此实现的的主要目标是:

  • 控制内存消耗 - 通过使用有界阻塞队列实现
  • 指示工作完成 - 通过“所有工作完成后关闭”实现
  • 保持可预测的执行模型 - 命令按 FIFO 顺序提交执行,具有背压,直到所有工作完成。请注意,实际执行顺序取决于系统调度程序,但是,您可以假设执行线程将在 'n' 命令出队后出队 n + 1 命令。

用例

此并行方法适用于处理非常大的数据集,这些数据集无法装入内存,或者内存消耗必须在指定的范围内。当数据集可以装入内存或内存消耗不是问题的情况下,通常使用 Rayon 或其他使用非阻塞队列的数据并行库会得到更好的结果。例如,将一个大型且非常密集的 Protobuf 文件(约 67 GB 的排序数据)转换为约 1 TB 的 pg_dump 文件,以便加载到 PostgreSQL 数据库中。使用此库可以创建一个管道,并行化 Protobuf 解码和 pg_dump 编码,并最终写入合并的结果,同时保持初始顺序并控制内存大小。

问题

欢迎并感谢问题提交。请提交至 https://github.com/navigatorsguild/command-executor/issues

基准测试

基准测试benchmark-rs 生成

link

示例

提交命令以执行并等待完成

use std::thread;
use std::time::Duration;
use command_executor::command::Command;
use command_executor::shutdown_mode::ShutdownMode;
use command_executor::thread_pool_builder::ThreadPoolBuilder;

struct ExampleCommand {
   payload: i32,
}

impl ExampleCommand {
   pub fn new(payload: i32) -> ExampleCommand {
       ExampleCommand {
           payload,
       }
   }
}

impl Command for ExampleCommand {
   fn execute(&self) -> Result<(), anyhow::Error> {
       println!("processing {} in {}", self.payload, thread::current().name().unwrap_or("unnamed"));
       thread::sleep(Duration::from_millis(10));
       Ok(())
   }
}

pub fn main() -> Result<(), anyhow::Error> {
   let mut thread_pool_builder = ThreadPoolBuilder::new();
   let mut tp = thread_pool_builder
       .with_name("example".to_string())
       .with_tasks(4)
       .with_queue_size(16)
       .with_shutdown_mode(ShutdownMode::CompletePending)
       .build()
       .unwrap();

   for i in 0..16 {
       tp.submit(Box::new(ExampleCommand::new(i)));
   }

   tp.shutdown();
   tp.join()
}

在线程池的所有线程中安装线程局部值

use std::thread;
use std::time::Duration;
use std::cell::RefCell;
use command_executor::command::Command;
use command_executor::shutdown_mode::ShutdownMode;
use command_executor::thread_pool_builder::ThreadPoolBuilder;

#[derive(Clone)]
struct Config {
   sleep_time: u64,
}

impl Default for Config {
   fn default() -> Self {
       Config {
           sleep_time: 1,
       }
   }
}

thread_local! {
   static THREAD_LOCAL_CONFIG: RefCell<Option<Config>> = RefCell::new(None);
}

struct ThreadLocalExampleCommand {
   payload: i32,
}

impl ThreadLocalExampleCommand {
   pub fn new(payload: i32) -> ThreadLocalExampleCommand {
       ThreadLocalExampleCommand { payload }
   }
}

impl Command for ThreadLocalExampleCommand {
   fn execute(&self) -> Result<(), anyhow::Error> {
       THREAD_LOCAL_CONFIG.with(
           |config| {
               let sleep_time = config.borrow().as_ref().unwrap().sleep_time;
               thread::sleep(Duration::from_millis(sleep_time));
               println!(
                   "processing {} in {}",
                   self.payload,
                   thread::current().name().unwrap_or("unnamed")
               );
           }
       );
       Ok(())
   }
}

fn main() -> Result<(), anyhow::Error> {
   let mut thread_pool_builder = ThreadPoolBuilder::new();
   let mut tp = thread_pool_builder
       .with_name("example".to_string())
       .with_tasks(4)
       .with_queue_size(16)
       .with_shutdown_mode(ShutdownMode::CompletePending)
       .build()
       .unwrap();

   tp.set_thread_local(&THREAD_LOCAL_CONFIG, Some(Config::default()));

   for i in 0..16 {
       tp.submit(Box::new(ThreadLocalExampleCommand::new(i)));
   }

   tp.shutdown();
   tp.join()
}

许可:MIT OR Apache-2.0

依赖项

~255KB