36个版本
0.0.46 | 2024年3月26日 |
---|---|
0.0.45 | 2023年4月22日 |
0.0.26 | 2023年3月31日 |
#208 in 异步
218 个月下载量
220KB
8K SLoC
another-rxrust
为什么需要新的实现?
rxRust
是 ReactiveX
在 Rust
中的语言实现,但它不适合复杂的 observable
组合。对于在如 rxjs (TypeScript) 或
rxcpp
等其他语言中使用 ReactiveX
的用户来说,rxRust
是一个非常困难的库。
因此,我创建了 another-rxrust
,认为我需要一个允许 observable
以与其他平台相同的方式连接的库,并在 Rust
中享受 ReactiveX
。
此外,如果目的是并行化重型处理并加速它,那么 ReactiveX
可能不是最佳解决方案。然而,对于复杂组合的非阻塞 I/O 和错误处理,ReactiveX
是一个答案。
实现策略
- 假设可以共享线程之间可能发射的值和函数。
- 应仅发射满足
Clone + Send + Sync
的值。 - 尽可能使用
move
发射值。 - 函数应满足
Fn() + Send + Sync
。 - 使用
std::any
进行错误类型擦除。- 因此需要
'static
生命周期。 - 您可以使用
anyhow
。
- 因此需要
- 优先考虑灵活性而不是内存效率和执行速度。
实现状态
请参阅实现状态。
安装
cargo add another-rxrust
或者
[dependencies]
another-rxrust = {}
如果您想在网页上使用它,请按照以下设置。
[dependencies]
another-rxrust = {features=["web"]}
示例
from_iter, map, zip
use another_rxrust::prelude::*;
use another_rxrust::{print_next_fmt, print_error, print_complete};
fn main() {
let ob = observables::from_iter(0..10);
ob.zip(&[ob.map(|x| x + 10), ob.map(|x| x + 20)]).subscribe(
print_next_fmt!("{:?}"),
print_error!(),
print_complete!(),
);
}
// [console-results]
// next - [0, 10, 20]
// next - [1, 11, 21]
// next - [2, 12, 22]
// next - [3, 13, 23]
// next - [4, 14, 24]
// next - [5, 15, 25]
// next - [6, 16, 26]
// next - [7, 17, 27]
// next - [8, 18, 28]
// next - [9, 19, 29]
// complete
subject, scheduler, take, sample
use another_rxrust::prelude::*;
use std::{thread, time};
use another_rxrust::{print_next_fmt, print_error, print_complete};
fn main() {
let sbj = subjects::Subject::new();
observables::interval(
time::Duration::from_millis(100),
schedulers::new_thread_scheduler(),
)
.sample(sbj.observable())
.take(3)
.subscribe(print_next_fmt!("{}"), print_error!(), print_complete!());
(0..3).for_each(|_| {
thread::sleep(time::Duration::from_millis(500));
sbj.next(());
});
sbj.complete();
thread::sleep(time::Duration::from_millis(500));
}
// [console-results (Depends on execution environment)]
// next - 3
// next - 8
// next - 13
// complete
just, error, emptry, never, flat_map...
use another_rxrust::prelude::*;
use std::{thread, time};
fn main() {
// observable creator function
fn ob() -> Observable<'static, i32> {
Observable::create(|s| {
s.next(100);
s.next(200);
s.complete();
})
}
observables::from_iter(1..10)
.observe_on(schedulers::new_thread_scheduler())
.flat_map(|x| match x {
1 => observables::empty(),
2 => observables::just(x),
3 => ob().map(move |y| (y + x)),
4 => observables::error(RxError::from_error("some error")),
_ => observables::never(),
})
.map(|x| format!("{}", x))
.on_error_resume_next(|e| {
ob().map(move |x| {
format!(
"resume {:?} {}",
e.downcast_ref::<&str>(),
x
)
})
})
.subscribe(
|x| println!("{}", x),
|e| println!("{:?}", e.downcast_ref::<&str>()),
|| println!("complete"),
);
thread::sleep(time::Duration::from_millis(500));
}
// [console-results]
// next 2
// next 103
// next 203
// next resume "some error" 100
// next resume "some error" 200
// complete
依赖项
~0–2.2MB
~41K SLoC