7 个版本
0.1.6 | 2024年6月27日 |
---|---|
0.1.5 | 2024年6月6日 |
0.1.4 | 2024年5月29日 |
0.1.3 | 2024年1月5日 |
0.1.1 | 2023年7月23日 |
#284 在 并发
每月下载量 416
用于 libreda-sta
105KB
2K SLoC
ParGraph - 并行图处理库
Pargraph 是一个简单而谦逊的库,用于并行处理 petgraph
数据结构。
测试
使用 cargo test
运行基本测试。
正确测试并发数据结构是困难的。此 crate 使用 loom
crate 对内部同步原语进行测试。必须单独运行这些测试
RUSTFLAGS="--cfg loom" cargo test
lib.rs
:
ParaGraph: 并行图处理。
此 crate 实现了用于并发遍历图的数据结构和算法。可以使用 '操作符' 处理图。操作符只能看到图的一小部分,即 '活动' 节点和它的直接邻居。标记操作符可以编辑活动节点的相关数据,并且可以生成一组新节点,稍后应进行处理。节点处理的顺序主要由 '工作列表' 定义。
操作符
有以下类型的操作符
ReadonlyOperator
- 只能通过不可变引用访问图元素。如果需要修改,则需要使用内部可变性。LabellingOperator
- 可以修改局部节点数据。执行器必须提供对局部节点数据的可变引用。
执行器
有以下类型的执行器
executors::single_thread::SingleThreadExecutor
- 在当前线程中执行操作符。executors::multi_thread::MultiThreadExecutor
- 在许多线程上执行操作符。对操作符和图数据结构施加更严格的 trait 约束。
示例:使用原子操作计算影响锥
以下示例访问了 src
节点的输出锥。输出锥包括从 src
开始,然后跟随出边可以到达的所有节点。此外,对于锥中的每个节点,操作符还跟踪锥中存在的输入节点。
类似的算法可以用于标记最短路径搜索的增量更新感兴趣的区域。
此算法实现为一个 ReadonlyOperator
,该操作符在节点数据的不可变引用上操作。通过原子操作仍然实现了节点数据的安全可变性。这避免了将节点数据包装到 DataCell
中进行锁定。
use pargraph::prelude::*;
use petgraph::data::DataMap;
use petgraph::graph::DiGraph;
use petgraph::visit::*;
use std::sync::atomic::AtomicU32;
struct NodeData {
/// Count the number of input edges to the node
/// which are part of the cone.
num_dependencies: AtomicU32,
}
impl NodeData {
fn new() -> Self {
Self {
num_dependencies: AtomicU32::new(0),
}
}
}
// Create a graph like:
// x---
// | \
// src y
// |
// a
// / \
// b c
// \ /
// d
let mut g = DiGraph::new();
// Helper function for creating new nodes with default node data.
// Initialize the distance to the maximum value.
let mut new_node = || g.add_node(NodeData::new());
// Create some new nodes.
let [x, y, src, a, b, c, d] = [(); 7].map(|_| new_node());
// Add some edges (without any weights).
g.add_edge(x, src, ());
g.add_edge(x, y, ());
g.add_edge(src, a, ());
g.add_edge(a, b, ());
g.add_edge(a, c, ());
g.add_edge(c, d, ());
g.add_edge(b, d, ());
let operator = ConeOfInfluenceOp {};
let executor = MultiThreadExecutor::new();
// Create a worklist and add the source node.
let wl = FifoWorklist::new_with_local_queues(vec![src].into());
executor.run_readonly(wl, &operator, &g);
let get_num_dependencies = |n: petgraph::graph::NodeIndex| -> u32 {
g.node_weight(n)
.unwrap()
.num_dependencies
.load(std::sync::atomic::Ordering::Relaxed)
};
// Check the distances.
assert_eq!(get_num_dependencies(x), 0, "x is not in the cone of influence of src");
assert_eq!(get_num_dependencies(y), 0, "y is not in the cone of influence of src");
assert_eq!(get_num_dependencies(src), 0);
assert_eq!(get_num_dependencies(a), 1);
assert_eq!(get_num_dependencies(b), 1);
assert_eq!(get_num_dependencies(c), 1);
assert_eq!(get_num_dependencies(d), 2);
// This is our operator.
struct ConeOfInfluenceOp {}
// We can implement this operator as a `ReadonlyOperator` because it does not require
// a mutable reference to the node data. Safe mutability is achieved using atomics.
// Note that we implement the operator for the reference type. Operators are required to implement `Clone`.
// A reference implements `Clone` automatically. Alternatively we could also derive `Clone` for `ConeOfInfluenceOp`
// and pass ownership of the operator to the executor. The executor might create clones of the operators for the worker
// threads.
impl<G> ReadonlyOperator<G> for &ConeOfInfluenceOp
where
G: GraphBase + IntoEdgesDirected,
G: DataMap<NodeWeight = NodeData>,
{
type WorkItem = G::NodeId;
fn op(
&self,
active_node: Self::WorkItem,
local_view: LocalGraphView<&G>,
mut worklist: impl WorklistPush<Self::WorkItem>,
) {
let output_nodes =
local_view.neighbors_directed(active_node, petgraph::Direction::Outgoing);
for n in output_nodes {
// Access the node weight.
let n_data = local_view
.node_weight(n)
.expect("all nodes should have a weight");
// Atomically increment the number of dependencies of the node `n`.
// `fetch_add` returns the previous value. If the previous value is `0` then
// we know that this is the first time we look at node `n` (unless there is a cycle leading to the source node).
let previous_num_dependencies = n_data
.num_dependencies
.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
if previous_num_dependencies == 0 {
// This is the first time n is touched.
worklist.push(n);
}
}
}
}
依赖项
~3–27MB
~378K SLoC