7 个版本

0.1.6 2024年6月27日
0.1.5 2024年6月6日
0.1.4 2024年5月29日
0.1.3 2024年1月5日
0.1.1 2023年7月23日

#284并发

Download history 1/week @ 2024-05-17 79/week @ 2024-05-24 143/week @ 2024-05-31 48/week @ 2024-06-07 8/week @ 2024-06-14 84/week @ 2024-06-21 33/week @ 2024-06-28 2/week @ 2024-07-05

每月下载量 416
用于 libreda-sta

GPL-3.0-or-later

105KB
2K SLoC

ParGraph - 并行图处理库

Pargraph 是一个简单而谦逊的库,用于并行处理 petgraph 数据结构。

测试

使用 cargo test 运行基本测试。

正确测试并发数据结构是困难的。此 crate 使用 loom crate 对内部同步原语进行测试。必须单独运行这些测试

RUSTFLAGS="--cfg loom" cargo test

lib.rs:

ParaGraph: 并行图处理。

此 crate 实现了用于并发遍历图的数据结构和算法。可以使用 '操作符' 处理图。操作符只能看到图的一小部分,即 '活动' 节点和它的直接邻居。标记操作符可以编辑活动节点的相关数据,并且可以生成一组新节点,稍后应进行处理。节点处理的顺序主要由 '工作列表' 定义。

操作符

有以下类型的操作符

  • ReadonlyOperator - 只能通过不可变引用访问图元素。如果需要修改,则需要使用内部可变性。
  • LabellingOperator - 可以修改局部节点数据。执行器必须提供对局部节点数据的可变引用。

执行器

有以下类型的执行器

示例:使用原子操作计算影响锥

以下示例访问了 src 节点的输出锥。输出锥包括从 src 开始,然后跟随出边可以到达的所有节点。此外,对于锥中的每个节点,操作符还跟踪锥中存在的输入节点。

类似的算法可以用于标记最短路径搜索的增量更新感兴趣的区域。

此算法实现为一个 ReadonlyOperator,该操作符在节点数据的不可变引用上操作。通过原子操作仍然实现了节点数据的安全可变性。这避免了将节点数据包装到 DataCell 中进行锁定。

use pargraph::prelude::*;
use petgraph::data::DataMap;
use petgraph::graph::DiGraph;
use petgraph::visit::*;
use std::sync::atomic::AtomicU32;

struct NodeData {
    /// Count the number of input edges to the node
    /// which are part of the cone.
    num_dependencies: AtomicU32,
}

impl NodeData {
    fn new() -> Self {
        Self {
            num_dependencies: AtomicU32::new(0),
        }
    }
}

// Create a graph like:
//     x---
//     |   \
//    src   y
//     |
//     a
//   /  \
//  b   c
//  \  /
//   d
let mut g = DiGraph::new();

// Helper function for creating new nodes with default node data.
// Initialize the distance to the maximum value.
let mut new_node = || g.add_node(NodeData::new());

// Create some new nodes.
let [x, y, src, a, b, c, d] = [(); 7].map(|_| new_node());

// Add some edges (without any weights).
g.add_edge(x, src, ());
g.add_edge(x, y, ());
g.add_edge(src, a, ());
g.add_edge(a, b, ());
g.add_edge(a, c, ());
g.add_edge(c, d, ());
g.add_edge(b, d, ());

let operator = ConeOfInfluenceOp {};

let executor = MultiThreadExecutor::new();

// Create a worklist and add the source node.
let wl = FifoWorklist::new_with_local_queues(vec![src].into());
executor.run_readonly(wl, &operator, &g);

let get_num_dependencies = |n: petgraph::graph::NodeIndex| -> u32 {
    g.node_weight(n)
        .unwrap()
        .num_dependencies
        .load(std::sync::atomic::Ordering::Relaxed)
};

// Check the distances.
assert_eq!(get_num_dependencies(x), 0, "x is not in the cone of influence of src");
assert_eq!(get_num_dependencies(y), 0, "y is not in the cone of influence of src");
assert_eq!(get_num_dependencies(src), 0);
assert_eq!(get_num_dependencies(a), 1);
assert_eq!(get_num_dependencies(b), 1);
assert_eq!(get_num_dependencies(c), 1);
assert_eq!(get_num_dependencies(d), 2);

// This is our operator.
struct ConeOfInfluenceOp {}

// We can implement this operator as a `ReadonlyOperator` because it does not require
// a mutable reference to the node data. Safe mutability is achieved using atomics.
// Note that we implement the operator for the reference type. Operators are required to implement `Clone`.
// A reference implements `Clone` automatically. Alternatively we could also derive `Clone` for `ConeOfInfluenceOp`
// and pass ownership of the operator to the executor. The executor might create clones of the operators for the worker
// threads.
impl<G> ReadonlyOperator<G> for &ConeOfInfluenceOp
where
    G: GraphBase + IntoEdgesDirected,
    G: DataMap<NodeWeight = NodeData>,
{
    type WorkItem = G::NodeId;

    fn op(
        &self,
        active_node: Self::WorkItem,
        local_view: LocalGraphView<&G>,
        mut worklist: impl WorklistPush<Self::WorkItem>,
    ) {
        let output_nodes =
            local_view.neighbors_directed(active_node, petgraph::Direction::Outgoing);

        for n in output_nodes {
            // Access the node weight.
            let n_data = local_view
                .node_weight(n)
                .expect("all nodes should have a weight");

            // Atomically increment the number of dependencies of the node `n`.
            // `fetch_add` returns the previous value. If the previous value is `0` then
            // we know that this is the first time we look at node `n` (unless there is a cycle leading to the source node).
            let previous_num_dependencies = n_data
                .num_dependencies
                .fetch_add(1, std::sync::atomic::Ordering::Relaxed);

            if previous_num_dependencies == 0 {
                // This is the first time n is touched.
                worklist.push(n);
            }
        }
    }
}

依赖项

~3–27MB
~378K SLoC