#rx #message-passing #frp #data-stream #reactivex #mp

rx_rust_mp

使用消息传递方法实现的ReactiveX API的原型实现

6个版本 (1个稳定版)

1.0.0 2023年8月4日
0.7.2 2023年2月24日
0.6.5 2023年2月3日

#503 in 异步

每月下载量 26次

MIT许可

40KB
1K SLoC

rx_rust_mp

ReactiveX API的消息传递实现原型

这只是一个原型,只实现了我在硕士论文中需要的操作符。我创建它是因为发现官方实现已有8年未更新,而rxRust使用的是共享内存模型,这使得流数据的并行计算几乎不可能。

库本身很简单,有一个特质Observable,它提供了创建每个操作符的实现,并要求实现结构体以实现actual_subscribe函数。因此,每个实现Observable的结构体都可以连接到一个流中。
在流声明结束时必须调用subscribe,提供一个在每次传入值上执行的功能,以及一个用于调度每个任务的池。
subscribe函数调用上面操作符的actual_subscribe,将其池和mpsc通道的Sender部分传递给它,直到达到流声明顶部的createfrom_iter函数。

每个操作符至少需要存储对上面结构的引用,这样它就可以在流在subscribe上构建时引用它。每个操作符的actual_subscribe函数的一般工作流程是

  1. 创建一个mpsc通道,
  2. 在线程池上调度一个线程,该线程
    1. 从在(1)中创建的通道的接收端读取。
    2. 对每个传入值执行所需的转换
    3. 将结果发送到传递给actual_subscribe函数的通道
  3. 调用上一个对象的actual_subscribe函数,将(1)中创建的通道的发送端和线程池传递给它

这当然不是一个严格的配方,因为每个操作符都必须做不同的事情。

依赖项

~0.8–14MB
~125K SLoC