3个不稳定版本

0.2.1 2023年5月6日
0.2.0 2023年5月5日
0.1.0 2023年2月6日

#486 in 过程宏

31 每月下载量

MIT/Apache

375KB
2K SLoC

static-graph

Crates.io License Build Status

此crate提供了通过分析DSL中的节点依赖来生成静态图的能力。它允许图中只有一个输入和一个输出,独立节点可以最大程度地并行运行。

例如,在以下图中(数字代表节点的执行时间),串行运行将需要6秒,但最大并行运行仅需2秒。

graph TD;
    A/0-->B/1;
    A/0-->C/2;
    A/0-->D/1;
    A/0-->E/1;
    B/1-->F/0;
    C/2-->F/0;
    D/1-->G/1;
    E/1-->G/1;
    F/0-->H/0;
    G/1-->H/0;

用法

将其添加到您的 Cargo.toml

[build-dependencies]
static-graph = "0.2"

示例

example.graph 文件中编写图描述

node E -> (X, Y) {
    #[default = "crate::Custom::new"]
    custom: crate::Custom,
}

node X -> O {
    x: list<string>,
}

node Y -> O {
    y: map<i32, string>,
}

node O {
    #[editable = "true"]
    o: string,
}

graph G(E)

然后,在 build.rs

fn main() {
    static_graph::configure()
        .file_name("example.rs")
        .compile("example.graph")
        .unwrap();
}

最后,在 main.rs 中为图中的节点编写自己的逻辑。生成的代码默认位于 OUT_DIR 目录,图名为 G,节点名为 EXYO。您应该为每个节点实现 Runnable trait,然后可以通过调用 G::new().run() 自动在最大并行度下运行图。

use std::{
    sync::Arc,
    time::{Duration, Instant},
};

use gen_graph::{Runnable, E, G, O, X, Y};

#[allow(warnings, clippy::all)]
pub mod gen_graph {
    static_graph::include_graph!("example.rs");
}

#[derive(Default)]
pub struct Custom;

impl Custom {
    pub fn new() -> Self {
        Self
    }
}

#[tokio::main]
async fn main() {
    let start = Instant::now();
    let resp = G::new()
        .run::<Request, EResponse, XResponse, YResponse, OResponse, ()>(Request {
            msg: "**Hello, world!**".to_string(),
            user_age: 18,
        })
        .await;
    let duration = start.elapsed();

    println!("Time elapsed is {duration:?}, resp is {resp:?}");
}

#[derive(Clone)]
pub struct Request {
    msg: String,
    user_age: u8,
}

#[derive(Clone)]
pub struct EResponse(Duration);

#[async_trait::async_trait]
impl Runnable<Request, ()> for E {
    type Resp = EResponse;
    type Error = ();

    async fn run(&self, _req: Request, _prev_resp: ()) -> Result<Self::Resp, Self::Error> {
        tokio::time::sleep(Duration::from_secs(1)).await;
        Ok(EResponse(Duration::from_secs(1)))
    }
}

#[derive(Clone)]
pub struct XResponse(bool);

#[async_trait::async_trait]
impl Runnable<Request, EResponse> for X {
    type Resp = XResponse;
    type Error = ();

    async fn run(&self, req: Request, prev_resp: EResponse) -> Result<Self::Resp, Self::Error> {
        tokio::time::sleep(prev_resp.0).await;
        Ok(XResponse(!req.msg.contains('*')))
    }
}

#[derive(Clone)]
pub struct YResponse(bool);

#[async_trait::async_trait]
impl Runnable<Request, EResponse> for Y {
    type Resp = YResponse;
    type Error = ();

    async fn run(&self, req: Request, prev_resp: EResponse) -> Result<Self::Resp, Self::Error> {
        tokio::time::sleep(prev_resp.0).await;
        Ok(YResponse(req.user_age >= 18))
    }
}

#[derive(Clone, Debug)]
pub struct OResponse(String);

#[async_trait::async_trait]
impl Runnable<Request, (XResponse, YResponse)> for O {
    type Resp = OResponse;
    type Error = ();

    async fn run(
        &self,
        req: Request,
        prev_resp: (XResponse, YResponse),
    ) -> Result<Self::Resp, Self::Error> {
        self.o.store(Arc::new(req.msg.clone()));
        println!("O: {:#?}", self.o.load());
        if prev_resp.0 .0 && prev_resp.1 .0 {
            Ok(OResponse(req.msg))
        } else {
            Ok(OResponse("Ban".to_string()))
        }
    }
}

许可证

Volo dual-licensed under the MIT license and the Apache License (Version 2.0).

See LICENSE-MIT and LICENSE-APACHE for details.

社区

依赖项

~5–12MB
~107K SLoC