6个版本 (1个稳定版)
1.0.0 | 2023年8月4日 |
---|---|
0.7.2 | 2023年2月24日 |
0.6.5 | 2023年2月3日 |
#503 in 异步
每月下载量 26次
40KB
1K SLoC
rx_rust_mp
ReactiveX API的消息传递实现原型
这只是一个原型,只实现了我在硕士论文中需要的操作符。我创建它是因为发现官方实现已有8年未更新,而rxRust使用的是共享内存模型,这使得流数据的并行计算几乎不可能。
库本身很简单,有一个特质Observable
,它提供了创建每个操作符的实现,并要求实现结构体以实现actual_subscribe
函数。因此,每个实现Observable
的结构体都可以连接到一个流中。
在流声明结束时必须调用subscribe
,提供一个在每次传入值上执行的功能,以及一个用于调度每个任务的池。
此subscribe函数调用上面操作符的actual_subscribe,将其池和mpsc通道的Sender
部分传递给它,直到达到流声明顶部的create或from_iter函数。
每个操作符至少需要存储对上面结构的引用,这样它就可以在流在subscribe上构建时引用它。每个操作符的
actual_subscribe函数的一般工作流程是
- 创建一个mpsc通道,
- 在线程池上调度一个线程,该线程
- 从在
(1)
中创建的通道的接收端读取。 - 对每个传入值执行所需的转换
- 将结果发送到传递给
actual_subscribe
函数的通道
- 从在
- 调用上一个对象的
actual_subscribe
函数,将(1)
中创建的通道的发送端和线程池传递给它
这当然不是一个严格的配方,因为每个操作符都必须做不同的事情。
依赖项
~0.8–14MB
~125K SLoC