#事件驱动 #微服务 #异步流 #响应式 #异步 #性能 #生产者-消费者

reactive-mutiny

具有高级和优化容器(通道)和流执行器的异步事件驱动响应式库

28 个稳定版本

1.1.26 2024 年 3 月 3 日
1.1.25 2024 年 1 月 8 日
1.1.24 2023 年 11 月 17 日
1.1.23 2023 年 8 月 26 日
0.1.1 2023 年 5 月 30 日

#90异步

Download history 5/week @ 2024-05-21 3/week @ 2024-05-28

1,078 每月下载量
用于 reactive-messaging

无许可证

4.5MB
10K SLoC

reactive-mutiny 库

reactive-mutiny GitHub Actions reactive-mutiny on crates.io reactive-mutiny on docs.rs

异步和零成本抽象事件驱动响应式库,适用于 Rust,具有高级和优化容器和流执行器

浏览 文档

Rust 的 reactive-mutiny 被设计成允许构建高效的、优雅的异步事件处理管道(使用 Streams,也称为“异步迭代器”),简化灵活且解耦的微服务架构(分布式或不分布式),适用于生产环境。

该库的核心由一个 Uni 和一个 Multi 组成——这就是为什么叫“Mutiny”。两者都处理事件流

  • Uni 允许每个生成的有效载荷有 单个监听器或多个消费者 – 也可定义为一个 允许单个事件处理管道
  • Multi 允许每个生成的有效载荷有 多个监听器和多个消费者,允许 多个事件处理管道 – 或者,用 Kafka 的话说,允许 多个消费者组
  • Multi 可以做 Uni 能做的,但后者做得更快——因此,证明其存在的合理性:Uni 不使用任何有效载荷引用计数,并且使用单个队列/通道(而 Multi 需要和监听器一样多的数量)。

此外,还提供了关于指标、日志和重试的零成本抽象,如果未使用,将按指定的初始化选项和功能更深入的 API 选项进行优化。

在以下摘录中品尝该库

    use reactive_mutiny::prelude::*;

    fn logic_1(events_stream: impl Stream<Item=InputEventType>) -> impl Stream<Item=OutputEventType> {
        // your logic goes here using Rust's Stream / Iterator functions
    }

    fn main() {
        // build the event processing pipeline
        let events_handle = UniZeroCopy::<InputEventType, 1024, 1>::new()
        .spawn_non_futures_non_fallible_executor("Consumer of InputEventType and issiuer of OutputEventType",
                                                 |events_stream| {
                                                     logic_2(logic_1(events_stream))
                                                         .inspect(|outgoing_event| send(outgoing_event))
                                                 },
                                                 |_executor| async { /* on-close logic */ });

    }

    // see more details in examples/uni-microservice

核心组件

  1. 一组通道,通过这些通道将事件从生产者发送到消费者——所有都是无上下文切换的(又称“无锁”)——包括基于零拷贝和 mmap 的日志实现;
  2. 适用于所有可能的组合(Future/非Future & 可失败/不可失败事件类型)的通用Stream执行器,具有在每个事件解决其Future时强制执行或取消强制执行超时的选项。API经过精心设计,以便编译器可以完全优化一切:大多数情况下,所有反应性代码都最终在执行器中,整个Multi / Uni抽象都被置零;
  3. 用于性能和操作可见性的仪表和度量收集器(如前所述,作为一个零成本抽象);
  4. 主要的MultiUni对象,以及一组将通道和分配器绑定在一起的前置类型别名。
  5. 基于常量池的分配器,用于卓越的性能和灵活性——参见AtomicZeroCopy通道基准测试;

注意:这个crate相当新(不到1年),但正在积极维护并在生产中使用:没有已知的错误(并且MIRI说我们没问题),速度惊人,API已经彻底测试和审查,并且是稳定的,但改进的文档和代码清理/重构将逐步解决以改善外观。无论如何,进化总是由社区反馈驱动的

MIRI:截至2023年6月14日,此crate的并非所有部分都可以用MIRI进行测试:“epoll_wait的准备好的事件尚未实现”;“mmap系统调用”和其他一些功能在MIRI中不可用——但可以测试的部分都能通过。

性能

此crate非常松散地受到Java的SmallRye的Mutiny库的启发,从中借用了一些名称。由于语言的本地函数式方法、强大的错误处理、异步支持和令人惊叹且灵活的Iterator/Stream API,将相同的功能带到Rust中几乎不需要做什么,因此这项工作的重点放在将事件带到最大的处理速度和操作能力上:创建了特殊的队列、主题、堆栈、通道和Stream执行器,提供了比Rust的本地和社区版本更好的性能——请检查benches文件夹以获取详细信息

reactive-mutiny的通道延迟 reactive-mutiny的通道吞吐量 标准/社区与我们的提供的从线程到线程的原始负载发送器的性能特性

reactive-mutiny的分配器和类型包装器 标准与我们的提供的类型包装器和分配器性能特性比较,用于零拷贝通道——以原始memcopy和分配器为基础

下一步是什么

文档仍将得到改进。同时,建议新用户按照以下顺序使用此crate

  1. 查看examples/
  2. 研究reactive-mutiny::prelude::advanced::*中的类型别名——在这个时候,可以放心地相信文档将提供你需要的一切;
  3. 对于高级使用示例,请检查reactive-messaging crate——特别是,反应抽象如何轻松解耦,使得升级处理器变得容易,该处理器不返回任何答案,到一个返回答案的处理器。

比较

如果你熟悉SmallRye的Mutiny,以下是一些主要区别:

  • 我们这里的UniMulti都处理事件流。在原始库中,Uni像单个“异步future”,由于我们在Rust中不需要这个,因此名称被重新用途:其他Multi是我们的Uni(在“订阅”使用时也可以作为我们的Multi),另一个Uni可以通过仅使用Rust的异步调用并处理任何Result<>来获得,用于错误处理;
  • 管道中输入的每个事件都会被执行,无论最终是否有答案;此外,没有“订阅”(通过将管道添加到Multi来实现订阅语义);
  • 执行器及其设置是在生产者/管道对形成时设置(当创建Uni / Multi对象时):管道中不需要调用.merge()或.executeAt();
  • 没有Multi/Uni管道转换及其相应的众多函数——它们根本不需要;
  • 管道中没有设置超时——这是执行器的事情,执行器会简单地取消持续时间超过配置执行器最大值的(SmallRye的Uni超时可以通过使用Tokio的“futures”超时来实现,就像对任何异步函数调用所做的那样);
  • 速度惊人:Rust的编译器使您的管道(以及这个库的大部分)在发布模式下表现出零成本抽象。使用的并滥用常量泛型在这个优化中发挥了重要作用——以牺牲我们在reactive_mutiny::prelude::advanced中的相当复杂的类型定义为代价。
  • 要完全获得原始Mutiny的行为,您将不得不使用:
    • Rust的reactive-mutiny(用于反应式异步事件处理);
    • Tokio(从Futures获取响应,并指定异步调用、异步休眠等异步调用中的超时——为这个crate节省了大量API);
    • 流(在这里我们不混合Multi & Stream & Iterator功能——这在实践中会导致对原始Java库抽象的低效滥用——在一个可以用来Stream或Iterator的地方使用Multi的新实例是一个常见的错误实践/反模式);

依赖项

~16–23MB
~141K SLoC