#consumer #middleware #channel #watcher #communication-channel #messages #worker

supermon

一套通过通道协调观察者、中间件和消费者的实用工具

1 个不稳定版本

0.1.0 2022年3月15日

#467并发

MIT 许可证

14KB
199

Supermon

一个用于安排观察者并通过中间件将结果传递给消费者的工具

Supermon 允许您编写独立的计算单元(工作者),并处理它们之间的通信连接。它使用通道在这些不同的单元之间传递消息。

安装

将以下依赖项添加到您的 Cargo.toml

[dependencies]
supermon = { git = "https://github.com/z80dev/supermon" }
async-trait = "0.1.52"

当您实现 WatcherMiddlewareConsumer 特性时,您需要 async-trait

工作原理

目前支持三种不同类型的工作者

  • 观察者:监视特定条件,如果找到任何内容,则通过通道发送消息
  • 中间件:在观察者和听众之间处理消息,以执行必要的处理,如去重
  • 消费者:接收来自观察者的消息,并执行任何必要的操作。

这是一个非常灵活的基础,您可以在此基础上构建任何满足您需求的定制产品。

对于这些角色中的每一个,都有一个相应的特性。您可以在任何结构体上实现这些特性,以便 Supermon 可以安排其执行。

观察者

// watcher.rs
use async_trait::async_trait;
use tokio::sync::mpsc::Sender;

#[async_trait]
pub trait Watcher {
    type Payload;
    async fn watch(&self, sx: Sender<Self::Payload>);
}

Watcher 特性只有一个必须实现的功能,即 watch。此异步函数如果在找到任何内容时,应通过通道 sx 发送消息。

例如(伪代码,此处 check_bal 函数是虚构的,仅检查某个地方的余额)

pub struct SuperWatcher {
    pub addr_to_watch: String,
}

impl Watcher for MulticallZapperWatcher {
    type Payload = String;
    async fn watch(&self, sx: Sender<Self::Payload>) {
        loop {
            if check_bal(self.addr_to_watch) != 0 {
                sx.send(self.addr_to_watch);
            }
        }
    }
}

中间件

use async_trait::async_trait;
use tokio::sync::mpsc::{Receiver, Sender};

#[async_trait]
pub trait Middleware {
    type Payload;
    async fn process(&self, sx: Sender<Self::Payload>, rx: Receiver<Self::Payload>);
}

Middleware 特性只有一个必须实现的功能,即 process。此异步函数应监听来自 rx 的消息,执行任何必要的处理或过滤,并将消息传递到 sx

这可以用于实现消息的去重。

消费者

use async_trait::async_trait;
use tokio::sync::mpsc::Receiver;

#[async_trait]
pub trait Consumer {
    type Payload;
    async fn consume(&self, mut rx: Receiver<Self::Payload>);
}

Consumer 特性只有一个必须实现的功能,即 consume。此异步函数应监听 rx 上的消息并执行任何必要的操作。

例如,记录接收到的任何消息的消费者

pub struct ListenerLogger{}

#[async_trait]
impl Consumer for ListenerLogger {
    type Payload = String;
    async fn consume(&self, mut rx: Receiver<Self::Payload>) {
        println!("Starting listener");
        while let Some(addr) = rx.recv().await {
            println!("Received address {} in message", addr);
        }
    }
}

整合一切

Executor 结构体处理启动您的观察者、中间件和消费者的执行。您可以通过以下函数与之交互

  • new:创建一个新的执行器对象,无参数
  • add_watcher:期望一个实现了 Watcher 的结构的 Box 实例
  • add_middleware:期望一个实现了 Middleware 的结构的 Box 实例
  • add_consumer:期望一个实现了 Consumer 的结构的 Box 实例
  • start:启动所有添加的监视器、中间件和执行器的执行
// main.rs
use supermon::{Executor}

// ... add struct definitions from examples above


#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
    let mut executor = Executor::new();
    executor.add_watcher(
        Box::new(SuperWatcher{ addr_to_watch: "0x0000....." })
    );
    executor.set_listener(
        Box::new(ListenerLogger{})
    );
    executor.start().await;
    Ok(())
}

依赖项

~2.7–9.5MB
~75K SLoC