33 个版本 (14 个重大更新)
1.0.0-beta.8 | 2024年6月5日 |
---|---|
1.0.0-beta.7 | 2023年12月27日 |
1.0.0-beta.6 | 2023年10月21日 |
1.0.0-beta.4 | 2023年6月10日 |
0.1.0 | 2019年7月31日 |
在 异步 类别中排名 58
每月下载量 381
被 6 个 库(直接使用 3 个)使用
350KB
11K SLoC
rxRust: 响应式扩展的 Rust 实现
用法
将此添加到您的 Cargo.toml 中
[dependencies]
rxrust = "1.0.0-beta.0"
示例
use rxrust:: prelude::*;
let mut numbers = observable::from_iter(0..10);
// create an even stream by filter
let even = numbers.clone().filter(|v| v % 2 == 0);
// create an odd stream by filter
let odd = numbers.clone().filter(|v| v % 2 != 0);
// merge odd and even stream again
even.merge(odd).subscribe(|v| print!("{} ", v, ));
// "0 2 4 6 8 1 3 5 7 9" will be printed.
克隆流
在 rxrust
中,几乎所有扩展都会消耗上游。所以当您尝试两次订阅一个流时,编译器会报错。
# use rxrust::prelude::*;
let o = observable::from_iter(0..10);
o.subscribe(|_| println!("consume in first"));
o.subscribe(|_| println!("consume in second"));
在这种情况下,我们必须克隆流。
# use rxrust::prelude::*;
let o = observable::from_iter(0..10);
o.clone().subscribe(|_| println!("consume in first"));
o.clone().subscribe(|_| println!("consume in second"));
如果您想共享相同的可观察对象,可以使用 Subject
。
调度器
rxrust
使用 Future
的运行时作为调度器,LocalPool
和 ThreadPool
在 futures::executor
中可以直接用作调度器,并且 tokio::runtime::Runtime
也受支持,但需要启用 futures-scheduler
功能。在 Scheduler
中实现自定义 Scheduler
。一些可观察操作(如 delay
和 debounce
)需要延迟能力,当使用 timer
功能时,futures-time 支持这种能力,但您也可以通过设置新的 timer
函数为 NEW_TIMER_FN
变体并移除 timer
功能来自定义它。
use rxrust::prelude::*;
// `FuturesThreadPoolScheduler` is the alias of `futures::executor::ThreadPool`.
let threads_scheduler = FuturesThreadPoolScheduler::new().unwrap();
observable::from_iter(0..10)
.subscribe_on(threads_scheduler.clone())
.map(|v| v*2)
.observe_on_threads(threads_scheduler)
.subscribe(|v| println!("{},", v));
此外,rxrust
通过启用 wasm-scheduler
功能并使用 wasm-bindgen
包支持 WebAssembly。一个简单的示例在这里:这里。
从 Future 转换
只需使用 observable::from_future
将 Future
转换为可观察序列。
use rxrust::prelude::*;
let mut scheduler_pool = FuturesLocalSchedulerPool::new();
observable::from_future(std::future::ready(1), scheduler_pool.spawner())
.subscribe(move |v| println!("subscribed with {}", v));
// Wait `task` finish.
scheduler_pool.run();
还提供了 from_future_result
函数来传播来自 `Future`` 的错误。
缺失的功能列表
有关 rxRust 还未实现的功能信息,请参阅 缺失的功能。
欢迎所有贡献
我们正在寻找贡献者!请随意提出问题、建议功能或其他事项的 issue!
帮助和贡献可以是以下任何一种
- 使用项目并在项目问题页面报告问题
- 文档和README增强(非常重要)
- 在CI管道中持续改进
- 实现任何未实现的操作符,记得在开始编写代码之前创建一个pull request,这样其他人就知道你在进行工作。
您可以通过timer
功能启用默认计时器,或者通过new_timer_fn
在函数中设置计时器
依赖项
~0.7–16MB
~157K SLoC