#rx #async #reactivex #async-io #combination #rxrust

another-rxrust

相比 rxRust,这是一种更易于在 Rust 中使用 ReactiveX 的不同实现。

36个版本

0.0.46 2024年3月26日
0.0.45 2023年4月22日
0.0.26 2023年3月31日

#208 in 异步

Download history 2/week @ 2024-04-16 13/week @ 2024-04-23 8/week @ 2024-04-30 4/week @ 2024-05-14 43/week @ 2024-05-21 15/week @ 2024-05-28 3/week @ 2024-06-04 145/week @ 2024-07-02 73/week @ 2024-07-09

218 个月下载量

MIT 许可证

220KB
8K SLoC

another-rxrust

为什么需要新的实现?

rxRustReactiveXRust 中的语言实现,但它不适合复杂的 observable 组合。对于在如 rxjs (TypeScript) 或 rxcpp 等其他语言中使用 ReactiveX 的用户来说,rxRust 是一个非常困难的库。

因此,我创建了 another-rxrust,认为我需要一个允许 observable 以与其他平台相同的方式连接的库,并在 Rust 中享受 ReactiveX

此外,如果目的是并行化重型处理并加速它,那么 ReactiveX 可能不是最佳解决方案。然而,对于复杂组合的非阻塞 I/O 和错误处理,ReactiveX 是一个答案。

实现策略

  • 假设可以共享线程之间可能发射的值和函数。
  • 应仅发射满足 Clone + Send + Sync 的值。
  • 尽可能使用 move 发射值。
  • 函数应满足 Fn() + Send + Sync
  • 使用 std::any 进行错误类型擦除。
    • 因此需要 'static 生命周期。
    • 您可以使用 anyhow
  • 优先考虑灵活性而不是内存效率和执行速度。

实现状态

请参阅实现状态

安装

cargo add  another-rxrust

或者

[dependencies]
another-rxrust = {}

如果您想在网页上使用它,请按照以下设置。

[dependencies]
another-rxrust = {features=["web"]}

示例

from_iter, map, zip

use another_rxrust::prelude::*;
use another_rxrust::{print_next_fmt, print_error, print_complete};

fn main() {
  let ob = observables::from_iter(0..10);
  ob.zip(&[ob.map(|x| x + 10), ob.map(|x| x + 20)]).subscribe(
    print_next_fmt!("{:?}"),
    print_error!(),
    print_complete!(),
  );
}
// [console-results]
// next - [0, 10, 20]
// next - [1, 11, 21]
// next - [2, 12, 22]
// next - [3, 13, 23]
// next - [4, 14, 24]
// next - [5, 15, 25]
// next - [6, 16, 26]
// next - [7, 17, 27]
// next - [8, 18, 28]
// next - [9, 19, 29]
// complete

subject, scheduler, take, sample

use another_rxrust::prelude::*;
use std::{thread, time};
use another_rxrust::{print_next_fmt, print_error, print_complete};

fn main() {
  let sbj = subjects::Subject::new();
  observables::interval(
    time::Duration::from_millis(100),
    schedulers::new_thread_scheduler(),
  )
  .sample(sbj.observable())
  .take(3)
  .subscribe(print_next_fmt!("{}"), print_error!(), print_complete!());

  (0..3).for_each(|_| {
    thread::sleep(time::Duration::from_millis(500));
    sbj.next(());
  });
  sbj.complete();
  thread::sleep(time::Duration::from_millis(500));
}
// [console-results (Depends on execution environment)]
// next - 3
// next - 8
// next - 13
// complete

just, error, emptry, never, flat_map...

use another_rxrust::prelude::*;
use std::{thread, time};

fn main() {
  // observable creator function
  fn ob() -> Observable<'static, i32> {
    Observable::create(|s| {
      s.next(100);
      s.next(200);
      s.complete();
    })
  }

  observables::from_iter(1..10)
    .observe_on(schedulers::new_thread_scheduler())
    .flat_map(|x| match x {
      1 => observables::empty(),
      2 => observables::just(x),
      3 => ob().map(move |y| (y + x)),
      4 => observables::error(RxError::from_error("some error")),
      _ => observables::never(),
    })
    .map(|x| format!("{}", x))
    .on_error_resume_next(|e| {
      ob().map(move |x| {
        format!(
          "resume {:?} {}",
          e.downcast_ref::<&str>(),
          x
        )
      })
    })
    .subscribe(
      |x| println!("{}", x),
      |e| println!("{:?}", e.downcast_ref::<&str>()),
      || println!("complete"),
    );

  thread::sleep(time::Duration::from_millis(500));
}
// [console-results]
// next 2
// next 103
// next 203
// next resume "some error" 100
// next resume "some error" 200
// complete

依赖项

~0–2.2MB
~41K SLoC