8 个版本

0.9.7 2023 年 1 月 4 日
0.9.6 2023 年 1 月 4 日
0.9.4 2022 年 12 月 29 日
0.9.1 2022 年 9 月 4 日

#517 in 异步

Download history 6/week @ 2024-03-23 197/week @ 2024-03-30 1/week @ 2024-04-06 211/week @ 2024-06-08 9/week @ 2024-06-15

220 个月下载量

自定义许可

28KB
297

Swarm Commander

成功指挥您的蜂群!您可以创建大量命令,并以简单的方式监控它们的进度。当然,也是异步的

Crates.io Apache 2 licensed

网站 | API 文档

概述

有时您需要一个控制其他应用程序的应用程序。用例范围广泛,从使用 imagemagick 转换数千张图片,启动多个 nginx 服务器或流式传输大量 ffmpeg 视频。处理这些事情有点棘手,因为您需要控制 stderror、stdout(以非阻塞方式读取两者),处理命令的状态(是否成功启动?返回了错误?仍在运行?……),决定是否要终止命令,当然,解析所有输出。这个 crate 提供了一个干净且通用的方法,让您可以专注于自己的业务。

想法是这样的。您决定要运行哪些命令(ls、find...)并实现输出解析器。然后,通过提供这两样东西,您只需消费您解析格式的新的消息。

示例

基本示例

[dependencies]
swarm-commander = "0.9.4"

将此放在您的 main.rs 中

use anyhow::Result;
use tokio::process::Command;
use swarm_commander::{run_hive, StdType, RunnerEvent::{RunnerStartEvent, RunnerStopEvent, RunnerStatusEvent, RunnerLogEvent}};


// This is what you parse will build from a command output line
#[derive(Debug)]
struct Request {
    method: String,
    status: String,
    user_agent: String,
    url: String
 }

#[tokio::main]
async fn main() -> Result<()> {
    // Create the communication channel
    let (tx, rx) = flume::unbounded();

    // A command you want to run.
    let mut cmd = Command::new("/usr/bin/nginx");
    cmd.arg("-c").arg("/opt/nginx/nginx.conf");
        
    // Your parser which will receive all the outputs and parse them. Return None if you just want to skip the line
    let parser = move |line: &str, pid: u32, std_type: &StdType| -> Option<Request> {
        // This nginx output is like "GET /index.html 200 Mozilla/5.0"
        if line.starts_with("GET") || line.starts_with("POST") {
            // I'm interested only on GET and POST requests
            let parts = line.split(" ").collect::<Vec<&str>>();
            Some(Request {
            method: parts[0].to_owned(),
            status: parts[2].to_owned(),
            user_agent: parts[3].to_owned(),
            url: parts[1].to_owned(),
            })
        } else {
            // Other kind of request or any other output that I'm ignoring
            None
        }
    };
  
    // Establish a hive
    let (_, mut hive) = run_hive(tx.clone(), parser).await;
    // Spawn the nginx command
    hive.spawn("my-nginx", cmd).await?;
  
    // I will use this interval to kill the nginx in 15 seconds
    let mut interval = tokio::time::interval(tokio::time::Duration::from_millis(15000));
    interval.tick().await;
    
    // Wait for the updates
    let mut keep_running = true;
    while keep_running {
        tokio::select! {
            message = rx.recv_async() => {
                match message {
                    Ok(message) => {
                        // message is any kind of `RunnerEvent`
                        match message {
                            RunnerStartEvent(event) => {
                                println!("Process with id {} started", event.id)
                            }, 
                            RunnerStopEvent(event) => {
                                println!("Process with pid {} died", event.pid)
                            },
                            RunnerStatusEvent(event) => {
                                println!("New message from {}: {:?}", event.id, event.data)
                            },
                            RunnerLogEvent(event) => {
                                println!("Log of type {:?} from {}: {:?}", event.std, event.id, event.log)
                            }
                        }
                    },
                    Err(err) => {
                        println!("ERROR {:?}", err);
                        keep_running = false;
                    }
                }
                
            },
            _ = interval.tick() => {
                // List all running processes
                let proc_list = hive.processes_info().await;
                println("Before die: {:?}", proc_list);
                println!("DIE NGINX DIE HAHAHAAH");
                hive.halt("my-nginx").await?;
                let proc_list = hive.processes_info().await;
                println("After die: {:?}", proc_list);
            }
        }
    }
    // Kill all running processes before exit
    hive.disband().await?;
    Ok(())
}

许可证

该项目受 Apache 2 许可证 的许可。

依赖关系

~5–17MB
~169K SLoC