#事件监听器 # #工作器 #事件 #上下文 #未来 #事件驱动

message_worker

Message Worker 是一个为Rust提供的低级别库,用于使用futures和streams创建事件监听器。值得注意的是,MW支持监听器中的非同步和非发送(即非线程安全)上下文。

6个版本 (破坏性更新)

0.6.0 2023年9月6日
0.5.1 2023年9月5日
0.5.0 2021年6月5日
0.4.0 2021年5月21日
0.1.0 2021年5月20日

#1281 in 异步

每月26次下载

MPL-2.0 许可证

45KB
625

Message Worker (message_worker)

crates.io link crates.io link

Message Worker 是一个用于Rust的库,用于使用futures和streams创建事件监听器。值得注意的是,Message Worker 支持监听器中的非同步和非发送(即非线程安全)上下文。

请参阅文档以获取示例和更多信息。


lib.rs:

Message Worker 是一个用于Rust的库,用于使用futures和streams创建事件监听器。值得注意的是,Message Worker 支持监听器中的非同步和非发送(即非线程安全)上下文。

这是一个相当低级的库,可用于构建各种流处理和事件驱动系统。它甚至可以用来构建actor系统!

必须在一个tokio 运行时中使用此库。

简而言之,如果你需要一个异步接收消息/事件的worker并针对每个接收到的消息执行某些操作...这就是你的库!这里的关键函数是message_worker::[]blocking::listen(stream, || ctx, handler)

第一个参数是一个 Stream。Stream基本上是异步迭代器,可以从许多不同的事物中创建,包括mpsc/broadcast通道。

第二个参数是一个闭包,用于为工作线程创建“上下文”。本质上,这是你希望工作线程能够访问的任何状态。对于非阻塞工作线程,通常最好使用不可变数据结构,例如来自 im 的数据结构,如果你需要修改状态。对于阻塞工作线程,你可以简单地将你的状态包裹在一个 RefCell 中。

第三个参数是处理器,这里是魔法发生的地方。处理器是你声明的具有以下签名的函数的名称:fn(ctx: Arc/Rc<Context>, msg: MessageType) -> Result<Option<Context>, Err>。如果返回错误,工作线程的错误处理器将运行。如果返回 Ok(None),工作线程将按原样继续运行。如果返回 Ok(context),工作线程将继续运行,但下次运行时将使用返回值中的新上下文。

示例

打印机

use message_worker::non_blocking::listen;
use message_worker::{empty_ctx, EmptyCtx};
use std::sync::Arc;
use anyhow::Result;

let mut rt = tokio::runtime::Runtime::new().unwrap();
rt.block_on(async {
    // Create our stream
    let source = tokio_stream::iter(vec![42, 0xff6900, 1337]);

    // Create a listener that prints out each item in the stream
    async fn on_item(_ctx: EmptyCtx, event: usize) -> Result<Option<EmptyCtx>> {
        eprintln!("{}", event);
        Ok(None)
    }

    // Start listening
    listen(source, empty_ctx, on_item).await.unwrap();

    /* Prints:
       42
       0xff6900
       1337
    */
})

双向通信

use message_worker::non_blocking::listen;
use std::sync::Arc;
use anyhow::Result;
use tokio_stream::StreamExt;
use tokio_stream::wrappers::ReceiverStream;

let mut rt = tokio::runtime::Runtime::new().unwrap();
rt.block_on(async {
    #[derive(Clone)]
    #[repr(transparent)]
    struct BiCtx { output: tokio::sync::mpsc::Sender<usize> }

    // Create our stream
    let source = tokio_stream::iter(vec![42, 0xff6900, 1337]);

    // Create a listener that outputs each item in the stream multiplied by two
    async fn on_item(ctx: BiCtx, event: usize) -> Result<Option<BiCtx>> {
        ctx.output.send(event * 2).await?; // Send the output
        Ok(None)
    }

    // Connect the number stream to `on_item`
    let (tx, rx) = tokio::sync::mpsc::channel::<usize>(3);
    listen(source, move || BiCtx {
        output: tx
    }, on_item);

    let mut  rx = ReceiverStream::new(rx);
    assert_eq!(rx.next().await, Some(84));
    assert_eq!(rx.next().await, Some(0x1fed200));
    assert_eq!(rx.next().await, Some(2674));
})

乒乓(Actor)

use message_worker::non_blocking::listen;
use std::sync::Arc;
use anyhow::{Result, bail, anyhow};
use tokio_stream::wrappers::BroadcastStream;
use tokio_stream::StreamExt;

#[tokio::main]
async fn main() {
    #[derive(Clone)]
    #[repr(transparent)]
    struct ActorCtx { output: tokio::sync::broadcast::Sender<Message> }

    // Create our messages
    #[derive(Debug, Copy, Clone, Eq, PartialEq)]
    enum Message { Ping, Pong }


    // Create the ping actor
    async fn ping_actor(ctx: ActorCtx, event: Message) -> Result<Option<ActorCtx>> {
        match event {
            Message::Ping => bail!("I'm meant to be the pinger!"),
            Message::Pong => ctx.output.send(Message::Ping).map_err(|err| anyhow!(err))?
        };
        Ok(None)
    }

    // Create the pong actor
    async fn pong_actor(ctx: ActorCtx, event: Message) -> Result<Option<ActorCtx>> {
        match event {
            Message::Ping => ctx.output.send(Message::Pong).map_err(|err| anyhow!(err))?,
            Message::Pong => bail!("I'm meant to be the ponger!")
        };
        Ok(None)
    }

    // Create our initial stream
    let initial_ping = tokio_stream::iter(vec![Message::Ping]);

    // Connect everything together
    let (tx_ping, rx_ping) = tokio::sync::broadcast::channel::<Message>(2);
    let (tx_pong, rx_pong) = tokio::sync::broadcast::channel::<Message>(2);
    let mut watch_pongs = BroadcastStream::new(tx_ping.clone().subscribe())
        .filter(|msg| msg.is_ok())
        .map(|msg| msg.unwrap());
    let mut watch_pings = BroadcastStream::new(tx_pong.clone().subscribe())
        .filter(|msg| msg.is_ok())
        .map(|msg| msg.unwrap());

    // Start the ping actor
    listen(
        BroadcastStream::new(rx_ping)
            .filter(|msg| msg.is_ok())
            .map(|msg| msg.unwrap()),
        move || ActorCtx { output: tx_pong },
        ping_actor
    );

    // Start the pong actor
    listen(
        initial_ping.chain(BroadcastStream::new(rx_pong)
            .filter(|msg| msg.is_ok())
            .map(|msg| msg.unwrap())),
        move || ActorCtx { output: tx_ping },
        pong_actor
    );

    assert_eq!(watch_pings.next().await, Some(Message::Ping));
    assert_eq!(watch_pongs.next().await, Some(Message::Pong));
    assert_eq!(watch_pings.next().await, Some(Message::Ping));
    assert_eq!(watch_pongs.next().await, Some(Message::Pong));
}

疯狂示例(在事件监听器中通过 Deno 调用 V8 的 C++ 以运行 JS)

use message_worker::blocking::listen;
use deno_core::{JsRuntime, RuntimeOptions};
use std::rc::Rc;
use std::cell::RefCell;
use anyhow::Result;
use tokio_stream::StreamExt;
use tokio_stream::wrappers::ReceiverStream;

let mut rt = tokio::runtime::Runtime::new().unwrap();
rt.block_on(async {
    struct Context {
        test_res: tokio::sync::mpsc::Sender<()>,
        runtime: JsRuntime
    }

    let (mut tx, rx) = tokio::sync::mpsc::channel::<()>(1);
    let stream = ReceiverStream::new(rx);

    let (test_res_tx, mut test_res) = {
        let (tx, rx) = tokio::sync::mpsc::channel::<()>(1);
        (tx, ReceiverStream::new(rx))
    };

    async fn mock_handle(ctx: Rc<RefCell<Context>>, _event: ()) -> Result<Option<Rc<RefCell<Context>>>> {
        let mut ctx = (*ctx).borrow_mut();
        let runtime = &mut ctx.runtime;

        runtime.execute_script_static(
            "<test>",
            r#"Deno.core.print(`Got a message!\n`);"#
        )?;
        runtime.run_event_loop(false).await?;

        ctx.test_res.send(()).await?;
        Ok(None)
    }

    listen(stream, move || {
        let runtime: JsRuntime = {
            let tokio_rt = tokio::runtime::Handle::current();
            tokio_rt.block_on(async {
                let local = tokio::task::LocalSet::new();
                local.run_until(async {
                    let mut runtime = JsRuntime::new(RuntimeOptions {
                        module_loader: Some(Rc::new(deno_core::FsModuleLoader)),
                        ..RuntimeOptions::default()
                    });

                    runtime.execute_script_static(
                        "<test>",
                        r#"Deno.core.print(`Starting up the JS runtime via C++ FFI and Deno 🤯\n`);"#
                    ).unwrap();
                    runtime.run_event_loop(false).await.unwrap();

                    runtime
                }).await
            })
        };

        Rc::new(RefCell::new(Context {
            test_res: test_res_tx,
            runtime
        }))
    }, mock_handle);
    tx.send(()).await.unwrap();

    /* Prints:
       Starting up the JS runtime via C++ FFI and Deno 🤯
       Got a message!
    */
    assert_eq!(test_res.next().await, Some(()));
})

依赖项

~3–9.5MB
~69K SLoC