8 个版本

0.9.7 2023 年 1 月 4 日
0.9.6 2023 年 1 月 4 日
0.9.4 2022 年 12 月 29 日
0.9.1 2022 年 9 月 4 日

#517 in 异步

Download history 6/week @ 2024-03-23 197/week @ 2024-03-30 1/week @ 2024-04-06 211/week @ 2024-06-08 9/week @ 2024-06-15

220 个月下载量



Swarm Commander


Crates.io Apache 2 licensed

网站 | API 文档


有时您需要一个控制其他应用程序的应用程序。用例范围广泛,从使用 imagemagick 转换数千张图片,启动多个 nginx 服务器或流式传输大量 ffmpeg 视频。处理这些事情有点棘手,因为您需要控制 stderror、stdout(以非阻塞方式读取两者),处理命令的状态(是否成功启动?返回了错误?仍在运行?……),决定是否要终止命令,当然,解析所有输出。这个 crate 提供了一个干净且通用的方法,让您可以专注于自己的业务。




swarm-commander = "0.9.4"

将此放在您的 main.rs 中

use anyhow::Result;
use tokio::process::Command;
use swarm_commander::{run_hive, StdType, RunnerEvent::{RunnerStartEvent, RunnerStopEvent, RunnerStatusEvent, RunnerLogEvent}};

// This is what you parse will build from a command output line
struct Request {
    method: String,
    status: String,
    user_agent: String,
    url: String

async fn main() -> Result<()> {
    // Create the communication channel
    let (tx, rx) = flume::unbounded();

    // A command you want to run.
    let mut cmd = Command::new("/usr/bin/nginx");
    // Your parser which will receive all the outputs and parse them. Return None if you just want to skip the line
    let parser = move |line: &str, pid: u32, std_type: &StdType| -> Option<Request> {
        // This nginx output is like "GET /index.html 200 Mozilla/5.0"
        if line.starts_with("GET") || line.starts_with("POST") {
            // I'm interested only on GET and POST requests
            let parts = line.split(" ").collect::<Vec<&str>>();
            Some(Request {
            method: parts[0].to_owned(),
            status: parts[2].to_owned(),
            user_agent: parts[3].to_owned(),
            url: parts[1].to_owned(),
        } else {
            // Other kind of request or any other output that I'm ignoring
    // Establish a hive
    let (_, mut hive) = run_hive(tx.clone(), parser).await;
    // Spawn the nginx command
    hive.spawn("my-nginx", cmd).await?;
    // I will use this interval to kill the nginx in 15 seconds
    let mut interval = tokio::time::interval(tokio::time::Duration::from_millis(15000));
    // Wait for the updates
    let mut keep_running = true;
    while keep_running {
        tokio::select! {
            message = rx.recv_async() => {
                match message {
                    Ok(message) => {
                        // message is any kind of `RunnerEvent`
                        match message {
                            RunnerStartEvent(event) => {
                                println!("Process with id {} started", event.id)
                            RunnerStopEvent(event) => {
                                println!("Process with pid {} died", event.pid)
                            RunnerStatusEvent(event) => {
                                println!("New message from {}: {:?}", event.id, event.data)
                            RunnerLogEvent(event) => {
                                println!("Log of type {:?} from {}: {:?}", event.std, event.id, event.log)
                    Err(err) => {
                        println!("ERROR {:?}", err);
                        keep_running = false;
            _ = interval.tick() => {
                // List all running processes
                let proc_list = hive.processes_info().await;
                println("Before die: {:?}", proc_list);
                println!("DIE NGINX DIE HAHAHAAH");
                let proc_list = hive.processes_info().await;
                println("After die: {:?}", proc_list);
    // Kill all running processes before exit


该项目受 Apache 2 许可证 的许可。


~169K SLoC