#rx #frp #reactivex #extension

rxrust

响应式扩展的 Rust 实现

33 个版本 (14 个重大更新)

1.0.0-beta.82024年6月5日
1.0.0-beta.72023年12月27日
1.0.0-beta.62023年10月21日
1.0.0-beta.42023年6月10日
0.1.0 2019年7月31日

异步 类别中排名 58

Download history 136/week @ 2024-05-02 88/week @ 2024-05-09 41/week @ 2024-05-16 48/week @ 2024-05-23 175/week @ 2024-05-30 128/week @ 2024-06-06 72/week @ 2024-06-13 61/week @ 2024-06-20 27/week @ 2024-06-27 18/week @ 2024-07-04 56/week @ 2024-07-11 26/week @ 2024-07-18 176/week @ 2024-07-25 91/week @ 2024-08-01 69/week @ 2024-08-08 44/week @ 2024-08-15

每月下载量 381
6 库(直接使用 3 个)使用

MIT 许可协议

350KB
11K SLoC

rxRust: 响应式扩展的 Rust 实现

codecov

用法

将此添加到您的 Cargo.toml 中

[dependencies]
rxrust = "1.0.0-beta.0"

示例

use rxrust:: prelude::*;

let mut numbers = observable::from_iter(0..10);
// create an even stream by filter
let even = numbers.clone().filter(|v| v % 2 == 0);
// create an odd stream by filter
let odd = numbers.clone().filter(|v| v % 2 != 0);

// merge odd and even stream again
even.merge(odd).subscribe(|v| print!("{} ", v, ));
// "0 2 4 6 8 1 3 5 7 9" will be printed.

克隆流

rxrust 中,几乎所有扩展都会消耗上游。所以当您尝试两次订阅一个流时,编译器会报错。

 # use rxrust::prelude::*;
 let o = observable::from_iter(0..10);
 o.subscribe(|_| println!("consume in first"));
 o.subscribe(|_| println!("consume in second"));

在这种情况下,我们必须克隆流。

 # use rxrust::prelude::*;
 let o = observable::from_iter(0..10);
 o.clone().subscribe(|_| println!("consume in first"));
 o.clone().subscribe(|_| println!("consume in second"));

如果您想共享相同的可观察对象,可以使用 Subject

调度器

rxrust 使用 Future 的运行时作为调度器,LocalPoolThreadPoolfutures::executor 中可以直接用作调度器,并且 tokio::runtime::Runtime 也受支持,但需要启用 futures-scheduler 功能。在 Scheduler 中实现自定义 Scheduler。一些可观察操作(如 delaydebounce)需要延迟能力,当使用 timer 功能时,futures-time 支持这种能力,但您也可以通过设置新的 timer 函数为 NEW_TIMER_FN 变体并移除 timer 功能来自定义它。

use rxrust::prelude::*;

// `FuturesThreadPoolScheduler` is the alias of `futures::executor::ThreadPool`.
let threads_scheduler = FuturesThreadPoolScheduler::new().unwrap();

observable::from_iter(0..10)
  .subscribe_on(threads_scheduler.clone())
  .map(|v| v*2)
  .observe_on_threads(threads_scheduler)
  .subscribe(|v| println!("{},", v));

此外,rxrust 通过启用 wasm-scheduler 功能并使用 wasm-bindgen 包支持 WebAssembly。一个简单的示例在这里:这里

从 Future 转换

只需使用 observable::from_futureFuture 转换为可观察序列。

use rxrust::prelude::*;

let mut scheduler_pool = FuturesLocalSchedulerPool::new();
observable::from_future(std::future::ready(1), scheduler_pool.spawner())
  .subscribe(move |v| println!("subscribed with {}", v));

// Wait `task` finish.
scheduler_pool.run();

还提供了 from_future_result 函数来传播来自 `Future`` 的错误。

缺失的功能列表

有关 rxRust 还未实现的功能信息,请参阅 缺失的功能

欢迎所有贡献

我们正在寻找贡献者!请随意提出问题、建议功能或其他事项的 issue!

帮助和贡献可以是以下任何一种

  • 使用项目并在项目问题页面报告问题
  • 文档和README增强(非常重要)
  • 在CI管道中持续改进
  • 实现任何未实现的操作符,记得在开始编写代码之前创建一个pull request,这样其他人就知道你在进行工作。

您可以通过timer功能启用默认计时器,或者通过new_timer_fn在函数中设置计时器

依赖项

~0.7–16MB
~157K SLoC