3 个版本
0.1.4 | 2022年2月18日 |
---|---|
0.1.3 | 2022年2月14日 |
0.1.2 | 2022年2月11日 |
#416 在 并发
30KB
520 行
ThreadLake
Thread lake 是一个高级的同构线程池管理器
非常适合将任务分解为相似的任务,并在线程之间进行并行执行
这个crate不仅仅会创建几个线程,它还能做到
为什么?
我发现大多数多线程任务都是相似的,每次我都在写相同的样板代码。使用 arcs 共享资源,利用可用的并行性以获取最佳线程数,所以我决定创建一个管理器类型来帮我完成这些。
Thread lake 不是什么
这绝对不是一个典型的线程池 crate。Thread lake 适用于特定类型的任务,这些任务可以分解为相同的小任务,并在线程之间共享。
它不如线程池通用,但可能(?)更快。
特性
- 根据可用的并发性启动
n
个线程 - 通过 Arc 自动将数据从主线程移动到启动的线程
- 从启动的线程向线程湖管理器发送消息
- 设置线程使用的播放/暂停/停止标志
- 在迭代器中收集线程的返回值
- 在线程连接后从湖中移除数据
- 将向量拆分为可变切片
使用方法
首先,我们创建一个湖,它启动 5 个线程,这些线程在屏幕上打印一条消息
let lake = Builder::new(5)
.spawn(|_: ThreadUtilities<_>| {
println!("Hello thread");
});
lake.join();
就是这样! Builder::new
创建了一个包含 5 个线程的新湖,我们传递一个闭包,该闭包被克隆并发送到每个线程以执行。最后 ThreadLake::join
将所有线程连接起来。大多数时候,用户会想最大限度地利用可用的并行性。这可以通过闭包来完成
let lake = Builder::new(|ac: Option<usize>| ac.unwrap())
.spawn(|_: ThreadUtilities<_>| {
println!("Hello thread");
});
lake.join();
或者,可以使用类型 FullParallelism
来代替闭包。使用 ThreadUtilities
对象,它公开了一些线程的有用属性和函数,我们得到线程的索引并打印它
let lake = Builder::new(|ac: Option<usize>| ac.unwrap())
.spawn(|x: ThreadUtilities<_>| {
println!("Hello thread number {}", x.index());
});
lake.join();
接下来,我们创建一个湖,它计算向量 v
的元素之和
let lake: ThreadLake<_, i64> = Builder::with_data(|ac: Option<usize>| ac.unwrap(), v)
.spawn(|x: ThreadUtilities<_>| {
x.split_slice(x.data()).iter().sum()
});
println!("sum: {}", lake.join_iter().map(|x| x.unwrap()).sum::<i64>());
首先,我们将向量 v
移动到湖中,我们使用 Builder::with_data
来声明它,这样我们就可以向湖中发送数据。通过 Builder::with_data
发送的数据会被移动到湖中,并且每个线程都会克隆一个 Arc
。然后我们使用 ThreadUtilities::split_slice
,它根据线程索引将切片分割成子切片。最后,我们调用 ThreadLake::join_iter
,它将每个线程的结果收集到最终的和中。我们的最终示例使用了 Disjointer
,它安全地将向量分割成互斥的可变切片。由于切片是互斥的,且没有两个线程会被分配到相同的切片,因此没有竞态条件。
let lake = Builder::with_data(|ac: Option<usize>| ac.unwrap(), Disjointer::new(v))
.spawn(|x: ThreadUtilities<_>| {
for element in x.data().piece(&x) {
*element = *element * 2 + 1;
}
});
let v = lake.join().unwrap().take();
println!("Results: {:?}", &v[..100]);
首先,我们将向量 v
移动到一个由包装器 Disjointer
创建的湖中,该包装器负责以线程安全的方式分割向量。我们调用 Disjointer::piece
,它与 ThreadUtilities::split_slice
类似,将向量分割成可变切片。注意:它通过引用 ThreadUtilities
对象来获取线程索引,并防止用户在 ThreadLake
对象之外分割数组。最后,我们通过调用 join 等待线程完成,尝试获取底层数据(如果存在其他数据引用,则会失败),然后从 Disjointer
中取出原始向量。