5 个版本
0.1.4 | 2023年7月25日 |
---|---|
0.1.3 | 2021年4月17日 |
0.1.2 | 2021年4月7日 |
0.1.1 | 2021年3月21日 |
0.1.0 | 2021年3月4日 |
#232 在 并发
每月 632 次下载
44KB
777 行
work-queue
用于构建调度器的并发工作窃取队列。
示例
在线程池中分发一些任务
use work_queue::{Queue, LocalQueue};
struct Task(Box<dyn Fn(&mut LocalQueue<Task>) + Send>);
let threads = 4;
let queue: Queue<Task> = Queue::new(threads, 128);
// Push some tasks to the queue.
for _ in 0..500 {
queue.push(Task(Box::new(|local| {
do_work();
local.push(Task(Box::new(|_| do_work())));
local.push(Task(Box::new(|_| do_work())));
})));
}
// Spawn threads to complete the tasks.
let handles: Vec<_> = queue
.local_queues()
.map(|mut local_queue| {
std::thread::spawn(move || {
while let Some(task) = local_queue.pop() {
task.0(&mut local_queue);
}
})
})
.collect();
for handle in handles {
handle.join().unwrap();
}
与 crossbeam-deque 的比较
此 crate 的目的是与 crossbeam-deque
相似,它也提供了并发工作窃取队列。但是有一些明显的区别
- 此 crate 更高级 - 工作窃取在调用
pop
时自动执行,而不是您必须手动调用它。 - 因此,我们不像
crossbeam-deque
那样支持太多的自定义 - 但是算法本身可以优化得更好。 - 队列支持固定数量的本地队列,并且这个数量不能增长。
- 每个本地队列都有一个固定的容量,与
crossbeam-deque
不同,后者支持本地队列的增长。这使得我们的本地队列更快。
实现
此 crate 的队列实现基于 Tokio 的当前调度器。想法是每个线程持有一个固定容量的本地队列,还有一个所有线程都可以访问的无界全局队列。在一般情况下,每个工作线程只会与其本地队列交互,避免大量的同步 - 但是如果一个工作线程的工作量比另一个工作线程少得多,它将通过工作窃取均匀分配。
此外,每个本地队列存储一个 不可窃取的 LIFO 槽,以优化消息传递模式,因此如果一个任务创建了另一个任务,该创建的任务将立即被轮询,而不是在它到达本地队列前端时才被轮询得晚得多。
测试
- 使用
cargo test
正常测试 - 使用 Miri 进行测试
cargo +nightly miri test
- 使用ThreadSanitizer进行测试,命令如下:
RUSTFLAGS="-Zsanitizer=thread --cfg tsan" cargo +nightly test --tests -Zbuild-std --target={你的目标三元组}
- 使用Loom进行测试,命令如下:
RUSTFLAGS="--cfg loom" cargo test --tests --release
许可证
MIT或Apache-2.0
依赖
~0.1–26MB
~330K SLoC