1 个不稳定版本
0.1.0 | 2022年3月15日 |
---|
#467 在 并发
14KB
199 行
Supermon
一个用于安排观察者并通过中间件将结果传递给消费者的工具
Supermon 允许您编写独立的计算单元(工作者),并处理它们之间的通信连接。它使用通道在这些不同的单元之间传递消息。
安装
将以下依赖项添加到您的 Cargo.toml
[dependencies]
supermon = { git = "https://github.com/z80dev/supermon" }
async-trait = "0.1.52"
当您实现 Watcher
、Middleware
和 Consumer
特性时,您需要 async-trait
。
工作原理
目前支持三种不同类型的工作者
- 观察者:监视特定条件,如果找到任何内容,则通过通道发送消息
- 中间件:在观察者和听众之间处理消息,以执行必要的处理,如去重
- 消费者:接收来自观察者的消息,并执行任何必要的操作。
这是一个非常灵活的基础,您可以在此基础上构建任何满足您需求的定制产品。
对于这些角色中的每一个,都有一个相应的特性。您可以在任何结构体上实现这些特性,以便 Supermon 可以安排其执行。
观察者
// watcher.rs
use async_trait::async_trait;
use tokio::sync::mpsc::Sender;
#[async_trait]
pub trait Watcher {
type Payload;
async fn watch(&self, sx: Sender<Self::Payload>);
}
Watcher
特性只有一个必须实现的功能,即 watch
。此异步函数如果在找到任何内容时,应通过通道 sx
发送消息。
例如(伪代码,此处 check_bal
函数是虚构的,仅检查某个地方的余额)
pub struct SuperWatcher {
pub addr_to_watch: String,
}
impl Watcher for MulticallZapperWatcher {
type Payload = String;
async fn watch(&self, sx: Sender<Self::Payload>) {
loop {
if check_bal(self.addr_to_watch) != 0 {
sx.send(self.addr_to_watch);
}
}
}
}
中间件
use async_trait::async_trait;
use tokio::sync::mpsc::{Receiver, Sender};
#[async_trait]
pub trait Middleware {
type Payload;
async fn process(&self, sx: Sender<Self::Payload>, rx: Receiver<Self::Payload>);
}
Middleware
特性只有一个必须实现的功能,即 process
。此异步函数应监听来自 rx
的消息,执行任何必要的处理或过滤,并将消息传递到 sx
。
这可以用于实现消息的去重。
消费者
use async_trait::async_trait;
use tokio::sync::mpsc::Receiver;
#[async_trait]
pub trait Consumer {
type Payload;
async fn consume(&self, mut rx: Receiver<Self::Payload>);
}
Consumer
特性只有一个必须实现的功能,即 consume
。此异步函数应监听 rx
上的消息并执行任何必要的操作。
例如,记录接收到的任何消息的消费者
pub struct ListenerLogger{}
#[async_trait]
impl Consumer for ListenerLogger {
type Payload = String;
async fn consume(&self, mut rx: Receiver<Self::Payload>) {
println!("Starting listener");
while let Some(addr) = rx.recv().await {
println!("Received address {} in message", addr);
}
}
}
整合一切
Executor
结构体处理启动您的观察者、中间件和消费者的执行。您可以通过以下函数与之交互
new
:创建一个新的执行器对象,无参数add_watcher
:期望一个实现了Watcher
的结构的Box
实例add_middleware
:期望一个实现了Middleware
的结构的Box
实例add_consumer
:期望一个实现了Consumer
的结构的Box
实例start
:启动所有添加的监视器、中间件和执行器的执行
// main.rs
use supermon::{Executor}
// ... add struct definitions from examples above
#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
let mut executor = Executor::new();
executor.add_watcher(
Box::new(SuperWatcher{ addr_to_watch: "0x0000....." })
);
executor.set_listener(
Box::new(ListenerLogger{})
);
executor.start().await;
Ok(())
}
依赖项
~2.7–9.5MB
~75K SLoC