#thread-pool #pool #job #join #fork #jobs

jobsteal

用 Rust 编写的基于工作窃取的 Fork-Join 线程池

11 个版本

使用旧的 Rust 2015

0.5.1 2016 年 7 月 30 日
0.5.0 2016 年 6 月 20 日
0.4.3 2016 年 5 月 16 日
0.4.2 2016 年 3 月 6 日
0.1.0 2015 年 11 月 15 日

#728 in 并发

Download history 34/week @ 2024-03-11 25/week @ 2024-03-18 15/week @ 2024-03-25 58/week @ 2024-04-01 13/week @ 2024-04-08 29/week @ 2024-04-15 31/week @ 2024-04-22 29/week @ 2024-04-29 30/week @ 2024-05-06 26/week @ 2024-05-13 21/week @ 2024-05-20 14/week @ 2024-05-27 23/week @ 2024-06-03 19/week @ 2024-06-10 38/week @ 2024-06-17 37/week @ 2024-06-24

119 每月下载量
用于 3 crates

MIT/Apache 许可

83KB
1.5K SLoC

Jobsteal jobsteal

用 Rust 编写的基于工作窃取的 Fork-Join 线程池。它提供了低级 API,可以直接向线程池提交任务,还有一个名为 Spliterator 的高级并行迭代 API,与 Rust 的迭代器非常相似。

查看文档

示例

这是一个基本的示例,展示了使用工作池的简单方法。

use jobsteal::make_pool;
fn main() {
    // Build a pool with 4 threads, including this one.
    let mut pool = make_pool(4).unwrap();

    // spawn 100 jobs
    for i in 0..100 {
        // You can only submit jobs with a static lifetime this way.
        pool.submit(move || println!("Job {}", i));
    }
}

以下是一个更有用的示例,我们将向量拆分为块,并为每个部分提交一个作业。这使用了作用域功能。

use jobsteal::make_pool;

fn main(){
    // Build a pool
    let mut pool = make_pool(4).unwrap();

    let mut v = vec![0; 256];

    // Create a scoped spawner.
    pool.scope(|scope| {
        for chunk in v.chunks_mut(32) {

            // Jobs spawned by the scope are only allowed to access
            // data which strictly outlives the call to "scope".
            scope.submit(move || {
                for i in chunk { *i += 1 }
            });
        }

        for i in v {
            assert_eq!(i, 1);
        }
    });
    // all jobs within the scope are forced to complete before the scope function returns.

}

传递给 "scope" 闭包的创建者可以用来创建更多的作用域 - 如您所愿的嵌套。每个作业函数也可以通过使用 Spawner::recurse 来接收创建者,以递归地创建作业,这样就可以非常容易地递归地分割任务。递归工作分割通常会导致工作线程之间更好的工作分配。

然而,像上面示例中那样按顺序提交作业要容易得多。Jobsteal 的 Spliterator 让我们感觉像是在做那样的事情,实际上是在线程之间优化地分割工作!下面是如何做

use jobsteal::{make_pool, BorrowSpliteratorMut, Spliterator};

fn main() {
    let mut pool = make_pool(4).unwrap();

    let mut v = vec![0; 256];

    // iterate over the vector in parallel, incrementing each item by one.
    // the `for_each` function takes a spawner so it can dispatch jobs onto
    // the thread pool.
    v.split_iter_mut().for_each(&pool.spawner(), |i| *i += 1);

    for i in v {
        assert_eq!(i, 1);
    }
}

撤销安全

一个工作线程中的 panic 被设计为传播到主线程。但是,代码尚未经过安全审查,因此请尽量避免在作业中 panic。作业函数可能应该有一个 UnwindSafe 约束。这将需要 nightly,而且 UnwindSafe 也非常繁琐。

依赖项

~330–560KB