#thread-pool #thread #lake #high-level #manager #iterator

thread_lake

一个非常高级的线程池管理器

3 个版本

0.1.4 2022年2月18日
0.1.3 2022年2月14日
0.1.2 2022年2月11日

#416并发

自定义许可

30KB
520

ThreadLake

Thread lake 是一个高级的同构线程池管理器

非常适合将任务分解为相似的任务,并在线程之间进行并行执行

这个crate不仅仅会创建几个线程,它还能做到

为什么?

我发现大多数多线程任务都是相似的,每次我都在写相同的样板代码。使用 arcs 共享资源,利用可用的并行性以获取最佳线程数,所以我决定创建一个管理器类型来帮我完成这些。

Thread lake 不是什么

这绝对不是一个典型的线程池 crate。Thread lake 适用于特定类型的任务,这些任务可以分解为相同的小任务,并在线程之间共享。

它不如线程池通用,但可能(?)更快。

特性

  • 根据可用的并发性启动 n 个线程
  • 通过 Arc 自动将数据从主线程移动到启动的线程
  • 从启动的线程向线程湖管理器发送消息
  • 设置线程使用的播放/暂停/停止标志
  • 在迭代器中收集线程的返回值
  • 在线程连接后从湖中移除数据
  • 将向量拆分为可变切片

使用方法

首先,我们创建一个湖,它启动 5 个线程,这些线程在屏幕上打印一条消息

let lake = Builder::new(5)
    .spawn(|_: ThreadUtilities<_>| {
        println!("Hello thread");
    });

lake.join();

就是这样! Builder::new 创建了一个包含 5 个线程的新湖,我们传递一个闭包,该闭包被克隆并发送到每个线程以执行。最后 ThreadLake::join 将所有线程连接起来。大多数时候,用户会想最大限度地利用可用的并行性。这可以通过闭包来完成

let lake = Builder::new(|ac: Option<usize>| ac.unwrap())
    .spawn(|_: ThreadUtilities<_>| {
        println!("Hello thread");
    });

lake.join();

或者,可以使用类型 FullParallelism 来代替闭包。使用 ThreadUtilities 对象,它公开了一些线程的有用属性和函数,我们得到线程的索引并打印它

let lake = Builder::new(|ac: Option<usize>| ac.unwrap())
    .spawn(|x: ThreadUtilities<_>| {
        println!("Hello thread number {}", x.index());
    });

lake.join();

接下来,我们创建一个湖,它计算向量 v 的元素之和

let lake: ThreadLake<_, i64> = Builder::with_data(|ac: Option<usize>| ac.unwrap(), v)
    .spawn(|x: ThreadUtilities<_>| {
        x.split_slice(x.data()).iter().sum()
    });

println!("sum: {}", lake.join_iter().map(|x| x.unwrap()).sum::<i64>());

首先,我们将向量 v 移动到湖中,我们使用 Builder::with_data 来声明它,这样我们就可以向湖中发送数据。通过 Builder::with_data 发送的数据会被移动到湖中,并且每个线程都会克隆一个 Arc。然后我们使用 ThreadUtilities::split_slice,它根据线程索引将切片分割成子切片。最后,我们调用 ThreadLake::join_iter,它将每个线程的结果收集到最终的和中。我们的最终示例使用了 Disjointer,它安全地将向量分割成互斥的可变切片。由于切片是互斥的,且没有两个线程会被分配到相同的切片,因此没有竞态条件。

let lake = Builder::with_data(|ac: Option<usize>| ac.unwrap(), Disjointer::new(v))
    .spawn(|x: ThreadUtilities<_>| {
        for element in x.data().piece(&x) {
            *element = *element * 2 + 1;
        }
    });

let v = lake.join().unwrap().take();

println!("Results: {:?}", &v[..100]);

首先,我们将向量 v 移动到一个由包装器 Disjointer 创建的湖中,该包装器负责以线程安全的方式分割向量。我们调用 Disjointer::piece,它与 ThreadUtilities::split_slice 类似,将向量分割成可变切片。注意:它通过引用 ThreadUtilities 对象来获取线程索引,并防止用户在 ThreadLake 对象之外分割数组。最后,我们通过调用 join 等待线程完成,尝试获取底层数据(如果存在其他数据引用,则会失败),然后从 Disjointer 中取出原始向量。

无运行时依赖