28 个稳定版本
1.1.26 | 2024 年 3 月 3 日 |
---|---|
1.1.25 | 2024 年 1 月 8 日 |
1.1.24 | 2023 年 11 月 17 日 |
1.1.23 | 2023 年 8 月 26 日 |
0.1.1 | 2023 年 5 月 30 日 |
#90 在 异步 中
1,078 每月下载量
用于 reactive-messaging
4.5MB
10K SLoC
reactive-mutiny 库
异步和零成本抽象事件驱动响应式库,适用于 Rust,具有高级和优化容器和流执行器
浏览 文档。
Rust 的 reactive-mutiny
被设计成允许构建高效的、优雅的异步事件处理管道(使用 Streams,也称为“异步迭代器”),简化灵活且解耦的微服务架构(分布式或不分布式),适用于生产环境。
该库的核心由一个 Uni
和一个 Multi
组成——这就是为什么叫“Mutiny”。两者都处理事件流
Uni
允许每个生成的有效载荷有 单个监听器或多个消费者 – 也可定义为一个 允许单个事件处理管道;Multi
允许每个生成的有效载荷有 多个监听器和多个消费者,允许 多个事件处理管道 – 或者,用 Kafka 的话说,允许 多个消费者组Multi
可以做Uni
能做的,但后者做得更快——因此,证明其存在的合理性:Uni
不使用任何有效载荷引用计数,并且使用单个队列/通道(而Multi
需要和监听器一样多的数量)。
此外,还提供了关于指标、日志和重试的零成本抽象,如果未使用,将按指定的初始化选项和功能更深入的 API 选项进行优化。
在以下摘录中品尝该库
use reactive_mutiny::prelude::*;
fn logic_1(events_stream: impl Stream<Item=InputEventType>) -> impl Stream<Item=OutputEventType> {
// your logic goes here using Rust's Stream / Iterator functions
}
fn main() {
// build the event processing pipeline
let events_handle = UniZeroCopy::<InputEventType, 1024, 1>::new()
.spawn_non_futures_non_fallible_executor("Consumer of InputEventType and issiuer of OutputEventType",
|events_stream| {
logic_2(logic_1(events_stream))
.inspect(|outgoing_event| send(outgoing_event))
},
|_executor| async { /* on-close logic */ });
}
// see more details in examples/uni-microservice
核心组件
- 一组通道,通过这些通道将事件从生产者发送到消费者——所有都是无上下文切换的(又称“无锁”)——包括基于零拷贝和 mmap 的日志实现;
- 适用于所有可能的组合(Future/非Future & 可失败/不可失败事件类型)的通用
Stream
执行器,具有在每个事件解决其Future
时强制执行或取消强制执行超时的选项。API经过精心设计,以便编译器可以完全优化一切:大多数情况下,所有反应性代码都最终在执行器中,整个Multi / Uni抽象都被置零; - 用于性能和操作可见性的仪表和度量收集器(如前所述,作为一个零成本抽象);
- 主要的
Multi
和Uni
对象,以及一组将通道和分配器绑定在一起的前置类型别名。 - 基于常量池的分配器,用于卓越的性能和灵活性——参见
AtomicZeroCopy
通道基准测试;
注意:这个crate相当新(不到1年),但正在积极维护并在生产中使用:没有已知的错误(并且MIRI说我们没问题),速度惊人,API已经彻底测试和审查,并且是稳定的,但改进的文档和代码清理/重构将逐步解决以改善外观。无论如何,进化总是由社区反馈驱动的
MIRI:截至2023年6月14日,此crate的并非所有部分都可以用MIRI进行测试:“epoll_wait的准备好的事件尚未实现”;“mmap系统调用”和其他一些功能在MIRI中不可用——但可以测试的部分都能通过。
性能
此crate非常松散地受到Java的SmallRye的Mutiny库的启发,从中借用了一些名称。由于语言的本地函数式方法、强大的错误处理、异步支持和令人惊叹且灵活的Iterator/Stream API,将相同的功能带到Rust中几乎不需要做什么,因此这项工作的重点放在将事件带到最大的处理速度和操作能力上:创建了特殊的队列、主题、堆栈、通道和Stream执行器,提供了比Rust的本地和社区版本更好的性能——请检查benches
文件夹以获取详细信息
标准/社区与我们的提供的从线程到线程的原始负载发送器的性能特性
标准与我们的提供的类型包装器和分配器性能特性比较,用于零拷贝通道——以原始
memcopy
和分配器为基础
下一步是什么
文档仍将得到改进。同时,建议新用户按照以下顺序使用此crate
- 查看
examples/
; - 研究
reactive-mutiny::prelude::advanced::*
中的类型别名——在这个时候,可以放心地相信文档将提供你需要的一切; - 对于高级使用示例,请检查
reactive-messaging
crate——特别是,反应抽象如何轻松解耦,使得升级处理器变得容易,该处理器不返回任何答案,到一个返回答案的处理器。
比较
如果你熟悉SmallRye的Mutiny,以下是一些主要区别:
- 我们这里的
Uni
和Multi
都处理事件流。在原始库中,Uni像单个“异步future”,由于我们在Rust中不需要这个,因此名称被重新用途:其他Multi是我们的Uni
(在“订阅”使用时也可以作为我们的Multi
),另一个Uni可以通过仅使用Rust的异步调用并处理任何Result<>
来获得,用于错误处理; - 管道中输入的每个事件都会被执行,无论最终是否有答案;此外,没有“订阅”(通过将管道添加到
Multi
来实现订阅语义); - 执行器及其设置是在生产者/管道对形成时设置(当创建
Uni
/Multi
对象时):管道中不需要调用.merge()或.executeAt(); - 没有Multi/Uni管道转换及其相应的众多函数——它们根本不需要;
- 管道中没有设置超时——这是执行器的事情,执行器会简单地取消持续时间超过配置执行器最大值的(SmallRye的Uni超时可以通过使用Tokio的“futures”超时来实现,就像对任何异步函数调用所做的那样);
- 速度惊人:Rust的编译器使您的管道(以及这个库的大部分)在发布模式下表现出零成本抽象。使用的并滥用常量泛型在这个优化中发挥了重要作用——以牺牲我们在
reactive_mutiny::prelude::advanced
中的相当复杂的类型定义为代价。 - 要完全获得原始Mutiny的行为,您将不得不使用:
- Rust的
reactive-mutiny
(用于反应式异步事件处理); Tokio
(从Futures获取响应,并指定异步调用、异步休眠等异步调用中的超时——为这个crate节省了大量API);- 流(在这里我们不混合Multi & Stream & Iterator功能——这在实践中会导致对原始Java库抽象的低效滥用——在一个可以用来Stream或Iterator的地方使用Multi的新实例是一个常见的错误实践/反模式);
- Rust的
依赖项
~16–23MB
~141K SLoC