#thread-pool #queue #scheduler #lock-free #tasks #building

work-queue

用于构建调度器的并发工作窃取队列

5 个版本

0.1.4 2023年7月25日
0.1.3 2021年4月17日
0.1.2 2021年4月7日
0.1.1 2021年3月21日
0.1.0 2021年3月4日

#232并发

Download history 40/week @ 2024-03-13 8/week @ 2024-03-20 19/week @ 2024-03-27 37/week @ 2024-04-03 35/week @ 2024-04-10 22/week @ 2024-04-17 35/week @ 2024-04-24 9/week @ 2024-05-01 11/week @ 2024-05-08 237/week @ 2024-05-15 62/week @ 2024-05-22 215/week @ 2024-05-29 212/week @ 2024-06-05 116/week @ 2024-06-12 113/week @ 2024-06-19 145/week @ 2024-06-26

每月 632 次下载

MIT/Apache

44KB
777

work-queue

用于构建调度器的并发工作窃取队列。

示例

在线程池中分发一些任务

use work_queue::{Queue, LocalQueue};

struct Task(Box<dyn Fn(&mut LocalQueue<Task>) + Send>);

let threads = 4;

let queue: Queue<Task> = Queue::new(threads, 128);

// Push some tasks to the queue.
for _ in 0..500 {
    queue.push(Task(Box::new(|local| {
        do_work();

        local.push(Task(Box::new(|_| do_work())));
        local.push(Task(Box::new(|_| do_work())));
    })));
}

// Spawn threads to complete the tasks.
let handles: Vec<_> = queue
    .local_queues()
    .map(|mut local_queue| {
        std::thread::spawn(move || {
            while let Some(task) = local_queue.pop() {
                task.0(&mut local_queue);
            }
        })
    })
    .collect();

for handle in handles {
    handle.join().unwrap();
}

与 crossbeam-deque 的比较

此 crate 的目的是与 crossbeam-deque 相似,它也提供了并发工作窃取队列。但是有一些明显的区别

  • 此 crate 更高级 - 工作窃取在调用 pop 时自动执行,而不是您必须手动调用它。
  • 因此,我们不像 crossbeam-deque 那样支持太多的自定义 - 但是算法本身可以优化得更好。
  • 队列支持固定数量的本地队列,并且这个数量不能增长。
  • 每个本地队列都有一个固定的容量,与 crossbeam-deque 不同,后者支持本地队列的增长。这使得我们的本地队列更快。

实现

此 crate 的队列实现基于 Tokio 的当前调度器。想法是每个线程持有一个固定容量的本地队列,还有一个所有线程都可以访问的无界全局队列。在一般情况下,每个工作线程只会与其本地队列交互,避免大量的同步 - 但是如果一个工作线程的工作量比另一个工作线程少得多,它将通过工作窃取均匀分配。

此外,每个本地队列存储一个 不可窃取的 LIFO 槽,以优化消息传递模式,因此如果一个任务创建了另一个任务,该创建的任务将立即被轮询,而不是在它到达本地队列前端时才被轮询得晚得多。

测试

  • 使用 cargo test 正常测试
  • 使用 Miri 进行测试 cargo +nightly miri test
  • 使用ThreadSanitizer进行测试,命令如下:RUSTFLAGS="-Zsanitizer=thread --cfg tsan" cargo +nightly test --tests -Zbuild-std --target={你的目标三元组}
  • 使用Loom进行测试,命令如下:RUSTFLAGS="--cfg loom" cargo test --tests --release

许可证

MIT或Apache-2.0

依赖

~0.1–26MB
~330K SLoC