#命令行 #编排器 #ipc #本地 #开发 #进程 #编排

ipc-orchestrator

用于本地开发中带有 IPC 通信的命令行进程编排

4 个版本

0.3.3 2020年1月18日
0.3.2 2020年1月18日
0.3.1 2020年1月18日
0.3.0 2020年1月17日

#1256 in 异步

每月 23 次下载

MIT 许可证

30KB
479

执行和编排命令行工具。

基于 io 流和可选的 ipc-channel,编排器旨在启动和编排程序。

此仓库不适用于运行生产关键任务,而是用于本地开发或非关键编排。

在 Linux 和 Mac OS X 上工作,由于依赖于 ipc_channels,因此不支持 Windows。

基本示例

use tokio::process::{Command};
use ipc_orchestrator::orchestrator;

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    let mut orchestrator = orchestrator().ipc(false);
    orchestrator.start("start", &mut Command::new("echo"));
    orchestrator.connect().await
}

IPC 路由示例

cargo run --example=orchestrate
#[tokio::main]
async fn main() -> anyhow::Result<()> {
    let mut orchestrator = orchestrator().ipc(true);

    // Start pipeline: generate random f64 [0;1) -> sum -> write to stdout every 10_000 times
    let mut cmd = Command::new("cargo");
    orchestrator.start("generate", cmd.arg("run").arg("--example=generate"))
        .expect("failed to start generate");
    let mut cmd = Command::new("cargo");
    orchestrator.start("sum", cmd.arg("run").arg("--example=sum"))
        .expect("failed to start sum");
    let mut cmd = Command::new("cargo");
    orchestrator.start("mul", cmd.arg("run").arg("--example=write"))
        .expect("failed to start mul");

    // Connect log handlers and IPC handlers
    let mut orchestra = match orchestrator.connect().await {
        Err(_) => std::process::exit(1),
        Ok(o) => o,
    };

    // Route IPC messages
    orchestra.pipe_bridges("generate", "sum")?;
    orchestra.pipe_bridges("sum", "write")?;

    // Killing it hard since some spawned futures might still run
    match orchestra.run().await {
        Err(_) => std::process::exit(1),
        _ => Ok(()),
    }
}

自定义记录器

use tokio::process::{Command, ChildStdout};
use ipc_orchestrator::Orchestrator;
use std::sync::atomic::{AtomicBool, Ordering};
static CALLED: AtomicBool = AtomicBool::new(false);
use tokio::io::{AsyncBufReadExt, BufReader};

// custom logs processor
async fn mock_log_handler(reader: ChildStdout, name: String) -> anyhow::Result<()> {
   let mut reader = BufReader::new(reader).lines();
   assert_eq!(reader.next_line().await?.unwrap(), "testbed");
   CALLED.store(true, Ordering::Relaxed);
   Ok(())
}

#[tokio::main]
async fn main() {
    let mut orchestrator = Orchestrator::from_handlers(mock_log_handler).ipc(false);
    let mut cmd = Command::new("echo");
    cmd.arg("testbed");
    orchestrator.start("start", &mut cmd);
    let orchestra = orchestrator.connect().await.unwrap();
    // it supposes never existing processes
    // hence it will give error on when any process exit or stdout was closed
    orchestra.run().await.unwrap_err();
    assert!(CALLED.load(Ordering::Relaxed));
}

依赖项

~10–20MB
~292K SLoC