#worker-thread #scheduling #parallelism #schedule

workctl

提供了一组高级控制机制,用于并发工作调度,这些机制建立在标准库并发原语之上

3 个不稳定版本

0.2.0 2021 年 3 月 31 日
0.1.1 2017 年 8 月 31 日
0.1.0 2017 年 8 月 30 日

883并发 中排名

Download history 1/week @ 2024-04-07 80/week @ 2024-04-21 53/week @ 2024-04-28 21/week @ 2024-05-05 13/week @ 2024-05-19 74/week @ 2024-05-26 40/week @ 2024-06-02 50/week @ 2024-06-09 146/week @ 2024-06-16 39/week @ 2024-06-23 77/week @ 2024-07-07 32/week @ 2024-07-14 89/week @ 2024-07-21

每月下载量 199

MIT 许可证

17KB
120

workctl

Information on crates.io Documentation on docs.rs License: MIT

workctl 为控制并发/并行程序提供了一套高级抽象。这些抽象特别关注“控制器/工作线程”范式,其中一到几个“控制器”线程决定需要完成的工作,并使用 WorkQueuesSyncFlags 与多个“工作线程”进行通信。


lib.rs:

workctl 为控制并发/并行程序提供了一套高级抽象。这些抽象特别关注“控制器/工作线程”范式,其中一到几个“控制器”线程决定需要完成的工作,并使用 WorkQueueSyncFlag 与多个“工作线程”进行通信。

workctl 的级别低于如 rayon 这样的 crates,但提供了比标准库中可用的原语更抽象的接口。

示例

以下是一个典型的示例,使用了一个 WorkQueue、一个 SyncFlag 和一个 std::sync::mpsc。这个示例比仅处理数字列表所需复杂,但它说明了原理。在查看此示例时,想象你可能

  • 有一种机制,让一些工作线程可以添加新的工作,或者
  • 控制线程(或另一个线程)期望无限期地生成工作,例如,在一个服务器中。

然后可以在任何未来的时间使用 SyncFlag 来优雅地关闭所有工作线程,例如当控制器收到 SIGTERM 时。

use std::thread;
use workctl::{WorkQueue, new_syncflag};

// Create a new work queue to schedule pieces of work; in this case, i32s.
// The type annotation is not strictly needed.
let mut queue: WorkQueue<i32> = WorkQueue::new();

// Create a channel for the worker threads to send messages back on.
use std::sync::mpsc::channel;
let (results_tx, results_rx) = channel();

// Create a SyncFlag to share whether or not the worker threads should
// keep waiting on jobs.
let (mut more_jobs_tx, more_jobs_rx) = new_syncflag(true);

// This Vec is just for the controller to keep track of the worker threads.
let mut thread_handles = Vec::new();

// Spawn 4 workers.
for _ in 0..4 {
    // Create clones of the various control mechanisms for the new thread.
    let mut t_queue = queue.clone();
    let t_results_tx = results_tx.clone();
    let t_more_jobs = more_jobs_rx.clone();

    let handle = thread::spawn(move || {
        // Loop until the controller says to stop.
        while let Some(work_input) = t_queue.wait(&t_more_jobs) {
            // Do some work. Totally contrived in this case.
            let result = work_input % 1024;
            // Send the results of the work to the main thread.
            t_results_tx.send((work_input, result)).unwrap();
        }
    });

    // Add the handle to the vec of handles
    thread_handles.push(handle);
}

// Put some work in the queue.
let mut total_work = 0;
for _ in 0..10 {
    queue.push_work(1023);
    total_work += 1;
}

for _ in 0..10 {
    queue.push_work(1024);
    total_work += 1;
}


// Now, receive all the results.
let mut results = Vec::new();
while total_work > 0 {
    // In reality, you'd do something with these results.
    let r = results_rx.recv().unwrap();
    total_work -= 1;
    results.push(r);
}



// All the work is done, so tell the workers to stop looking for work.
more_jobs_tx.set(false);

// Join all the threads.
for thread_handle in thread_handles {
    thread_handle.join().unwrap();
}

assert_eq!(results.len(), 20);

无运行时依赖