8 个版本
0.9.7 | 2023 年 1 月 4 日 |
---|---|
0.9.6 | 2023 年 1 月 4 日 |
0.9.4 | 2022 年 12 月 29 日 |
0.9.1 | 2022 年 9 月 4 日 |
#517 in 异步
220 个月下载量
28KB
297 行
Swarm Commander
成功指挥您的蜂群!您可以创建大量命令,并以简单的方式监控它们的进度。当然,也是异步的
概述
有时您需要一个控制其他应用程序的应用程序。用例范围广泛,从使用 imagemagick
转换数千张图片,启动多个 nginx 服务器或流式传输大量 ffmpeg
视频。处理这些事情有点棘手,因为您需要控制 stderror、stdout(以非阻塞方式读取两者),处理命令的状态(是否成功启动?返回了错误?仍在运行?……),决定是否要终止命令,当然,解析所有输出。这个 crate 提供了一个干净且通用的方法,让您可以专注于自己的业务。
想法是这样的。您决定要运行哪些命令(ls、find...)并实现输出解析器。然后,通过提供这两样东西,您只需消费您解析格式的新的消息。
示例
基本示例
[dependencies]
swarm-commander = "0.9.4"
将此放在您的 main.rs 中
use anyhow::Result;
use tokio::process::Command;
use swarm_commander::{run_hive, StdType, RunnerEvent::{RunnerStartEvent, RunnerStopEvent, RunnerStatusEvent, RunnerLogEvent}};
// This is what you parse will build from a command output line
#[derive(Debug)]
struct Request {
method: String,
status: String,
user_agent: String,
url: String
}
#[tokio::main]
async fn main() -> Result<()> {
// Create the communication channel
let (tx, rx) = flume::unbounded();
// A command you want to run.
let mut cmd = Command::new("/usr/bin/nginx");
cmd.arg("-c").arg("/opt/nginx/nginx.conf");
// Your parser which will receive all the outputs and parse them. Return None if you just want to skip the line
let parser = move |line: &str, pid: u32, std_type: &StdType| -> Option<Request> {
// This nginx output is like "GET /index.html 200 Mozilla/5.0"
if line.starts_with("GET") || line.starts_with("POST") {
// I'm interested only on GET and POST requests
let parts = line.split(" ").collect::<Vec<&str>>();
Some(Request {
method: parts[0].to_owned(),
status: parts[2].to_owned(),
user_agent: parts[3].to_owned(),
url: parts[1].to_owned(),
})
} else {
// Other kind of request or any other output that I'm ignoring
None
}
};
// Establish a hive
let (_, mut hive) = run_hive(tx.clone(), parser).await;
// Spawn the nginx command
hive.spawn("my-nginx", cmd).await?;
// I will use this interval to kill the nginx in 15 seconds
let mut interval = tokio::time::interval(tokio::time::Duration::from_millis(15000));
interval.tick().await;
// Wait for the updates
let mut keep_running = true;
while keep_running {
tokio::select! {
message = rx.recv_async() => {
match message {
Ok(message) => {
// message is any kind of `RunnerEvent`
match message {
RunnerStartEvent(event) => {
println!("Process with id {} started", event.id)
},
RunnerStopEvent(event) => {
println!("Process with pid {} died", event.pid)
},
RunnerStatusEvent(event) => {
println!("New message from {}: {:?}", event.id, event.data)
},
RunnerLogEvent(event) => {
println!("Log of type {:?} from {}: {:?}", event.std, event.id, event.log)
}
}
},
Err(err) => {
println!("ERROR {:?}", err);
keep_running = false;
}
}
},
_ = interval.tick() => {
// List all running processes
let proc_list = hive.processes_info().await;
println("Before die: {:?}", proc_list);
println!("DIE NGINX DIE HAHAHAAH");
hive.halt("my-nginx").await?;
let proc_list = hive.processes_info().await;
println("After die: {:?}", proc_list);
}
}
}
// Kill all running processes before exit
hive.disband().await?;
Ok(())
}
许可证
该项目受 Apache 2 许可证 的许可。
依赖关系
~5–17MB
~169K SLoC