3 个不稳定版本
0.2.0 | 2021 年 3 月 31 日 |
---|---|
0.1.1 | 2017 年 8 月 31 日 |
0.1.0 | 2017 年 8 月 30 日 |
883 在 并发 中排名
每月下载量 199 次
17KB
120 行
workctl
workctl
为控制并发/并行程序提供了一套高级抽象。这些抽象特别关注“控制器/工作线程”范式,其中一到几个“控制器”线程决定需要完成的工作,并使用 WorkQueues
和 SyncFlags
与多个“工作线程”进行通信。
lib.rs
:
workctl
为控制并发/并行程序提供了一套高级抽象。这些抽象特别关注“控制器/工作线程”范式,其中一到几个“控制器”线程决定需要完成的工作,并使用 WorkQueue
和 SyncFlag
与多个“工作线程”进行通信。
workctl
的级别低于如 rayon 这样的 crates,但提供了比标准库中可用的原语更抽象的接口。
示例
以下是一个典型的示例,使用了一个 WorkQueue
、一个 SyncFlag
和一个 std::sync::mpsc
。这个示例比仅处理数字列表所需复杂,但它说明了原理。在查看此示例时,想象你可能
- 有一种机制,让一些工作线程可以添加新的工作,或者
- 控制线程(或另一个线程)期望无限期地生成工作,例如,在一个服务器中。
然后可以在任何未来的时间使用 SyncFlag
来优雅地关闭所有工作线程,例如当控制器收到 SIGTERM
时。
use std::thread;
use workctl::{WorkQueue, new_syncflag};
// Create a new work queue to schedule pieces of work; in this case, i32s.
// The type annotation is not strictly needed.
let mut queue: WorkQueue<i32> = WorkQueue::new();
// Create a channel for the worker threads to send messages back on.
use std::sync::mpsc::channel;
let (results_tx, results_rx) = channel();
// Create a SyncFlag to share whether or not the worker threads should
// keep waiting on jobs.
let (mut more_jobs_tx, more_jobs_rx) = new_syncflag(true);
// This Vec is just for the controller to keep track of the worker threads.
let mut thread_handles = Vec::new();
// Spawn 4 workers.
for _ in 0..4 {
// Create clones of the various control mechanisms for the new thread.
let mut t_queue = queue.clone();
let t_results_tx = results_tx.clone();
let t_more_jobs = more_jobs_rx.clone();
let handle = thread::spawn(move || {
// Loop until the controller says to stop.
while let Some(work_input) = t_queue.wait(&t_more_jobs) {
// Do some work. Totally contrived in this case.
let result = work_input % 1024;
// Send the results of the work to the main thread.
t_results_tx.send((work_input, result)).unwrap();
}
});
// Add the handle to the vec of handles
thread_handles.push(handle);
}
// Put some work in the queue.
let mut total_work = 0;
for _ in 0..10 {
queue.push_work(1023);
total_work += 1;
}
for _ in 0..10 {
queue.push_work(1024);
total_work += 1;
}
// Now, receive all the results.
let mut results = Vec::new();
while total_work > 0 {
// In reality, you'd do something with these results.
let r = results_rx.recv().unwrap();
total_work -= 1;
results.push(r);
}
// All the work is done, so tell the workers to stop looking for work.
more_jobs_tx.set(false);
// Join all the threads.
for thread_handle in thread_handles {
thread_handle.join().unwrap();
}
assert_eq!(results.len(), 20);