4 个版本
0.2.0 | 2020年11月17日 |
---|---|
0.1.2 | 2020年11月6日 |
0.1.1 | 2020年11月6日 |
0.1.0 | 2020年11月4日 |
1042 在 并发
13KB
231 行
Para
此项目仍在开发中。欢迎贡献。
目标
通过简单接口,在多核上高效并行化管道计算。
界面应类似于
let data = vec!(0, 1, 2);
let mut sum = 0;
pipeline!(4, data
=> (|x| (x * 2, x) => |(x2, x)| x2 - x,
|x| -x)
=> |x| sum += x);
在这个例子中,要执行的任务是
- 迭代
data
中的元素 - 克隆每个元素,并将其传递给
|x| (x * 2, x)
和|x| -x
闭包。 - 将
|x| (x * 2, x)
的输出应用于|(x2, x)| x2 - x
- 将
|(x2, x)| x2 - x
和|x| -x
的所有输出加到sum
变量中。
这构建了一个图,其中每个节点都是一个闭包。数据在闭包之间流动并得到处理。除了本例中的第一个和最后一个节点(迭代和求和节点)外,所有节点都可以完全并行化。`pipeline!` 宏将实例化一个调度器,该调度器将使用 4 个线程运行所有节点,并最大化计算吞吐量。
当前状态
查看 集成测试
待办事项
功能
- 改进宏。目前用户必须手动在堆上创建消费者节点。
- 调度器是否可以拥有节点?我们是否真的想要它?(它将不允许其他线程使用函数,但实际上其他线程不应运行)
- 支持为节点添加优先级
- 支持标记节点在外部硬件上运行(例如GPU)
- 支持无状态生产者?Rayon风格的可分割迭代器?
优化
- 在运行短任务时,线程会花费大量时间在任务队列的推送和弹出上进行同步。通常的解决方案是实现工作窃取。
- 具有状态的消费者被锁定在互斥锁中,因此线程可能会花费时间阻塞。一个可能的改进是修改
consume
接口以返回布尔值,并在互斥锁中执行函数时使用try_lock
。一旦我们实现了工作窃取,这可能会变得不那么有问题。此外,我们可能还想用refcell代替互斥锁。 - 任务包含一个指向节点的指针,这很浪费,因为它有64位,而我们通常只有大约3位节点的位数。我们可以用id代替引用,或者按节点收集任务。按节点收集任务可能使我们能够进一步优化具有状态的任务的执行。
- 我们希望所有核心始终工作。可能存在这样一种情况,即有很多任务,但所有任务都是针对特定具有状态的节点,因此它们不能并行化。为此,具有状态的节点应始终优先考虑。
- 通过了解哪些节点产生更多工作量以及哪些节点消耗更多工作量,智能地管理拥有许多就绪任务和内存使用之间的平衡,并根据当前工作量优先考虑它们。
进行中
- 在宏中支持扇出
- 在宏中支持多个生产者
依赖项
~370KB