4 个版本

0.2.0 2020年11月17日
0.1.2 2020年11月6日
0.1.1 2020年11月6日
0.1.0 2020年11月4日

1042并发

MIT/Apache

13KB
231

Para

此项目仍在开发中。欢迎贡献。

目标

通过简单接口,在多核上高效并行化管道计算。

界面应类似于

let data = vec!(0, 1, 2);
let mut sum = 0;
pipeline!(4, data
          => (|x| (x * 2, x) => |(x2, x)| x2 - x,
              |x| -x)
          => |x| sum += x);

在这个例子中,要执行的任务是

  • 迭代 data 中的元素
  • 克隆每个元素,并将其传递给 |x| (x * 2, x)|x| -x 闭包。
  • |x| (x * 2, x) 的输出应用于 |(x2, x)| x2 - x
  • |(x2, x)| x2 - x|x| -x 的所有输出加到 sum 变量中。

这构建了一个图,其中每个节点都是一个闭包。数据在闭包之间流动并得到处理。除了本例中的第一个和最后一个节点(迭代和求和节点)外,所有节点都可以完全并行化。`pipeline!` 宏将实例化一个调度器,该调度器将使用 4 个线程运行所有节点,并最大化计算吞吐量。

当前状态

查看 集成测试

待办事项

功能

  • 改进宏。目前用户必须手动在堆上创建消费者节点。
  • 调度器是否可以拥有节点?我们是否真的想要它?(它将不允许其他线程使用函数,但实际上其他线程不应运行)
  • 支持为节点添加优先级
  • 支持标记节点在外部硬件上运行(例如GPU)
  • 支持无状态生产者?Rayon风格的可分割迭代器?

优化

  • 在运行短任务时,线程会花费大量时间在任务队列的推送和弹出上进行同步。通常的解决方案是实现工作窃取。
  • 具有状态的消费者被锁定在互斥锁中,因此线程可能会花费时间阻塞。一个可能的改进是修改consume接口以返回布尔值,并在互斥锁中执行函数时使用try_lock。一旦我们实现了工作窃取,这可能会变得不那么有问题。此外,我们可能还想用refcell代替互斥锁。
  • 任务包含一个指向节点的指针,这很浪费,因为它有64位,而我们通常只有大约3位节点的位数。我们可以用id代替引用,或者按节点收集任务。按节点收集任务可能使我们能够进一步优化具有状态的任务的执行。
  • 我们希望所有核心始终工作。可能存在这样一种情况,即有很多任务,但所有任务都是针对特定具有状态的节点,因此它们不能并行化。为此,具有状态的节点应始终优先考虑。
  • 通过了解哪些节点产生更多工作量以及哪些节点消耗更多工作量,智能地管理拥有许多就绪任务和内存使用之间的平衡,并根据当前工作量优先考虑它们。

进行中

  • 在宏中支持扇出
  • 在宏中支持多个生产者

依赖项

~370KB