#r-socket #flux #mono #wasm-rs #stream #value

wasmrs-rx

WasmRS RSocket 协议的基本主机和客户端实现

19 个版本 (破坏性)

0.17.0 2023年10月9日
0.15.0 2023年8月17日
0.14.0 2023年7月26日
0.8.0 2023年3月22日

#422 in WebAssembly

Download history 161/week @ 2024-03-24 190/week @ 2024-03-31 91/week @ 2024-04-07 95/week @ 2024-04-14 851/week @ 2024-04-21 118/week @ 2024-04-28 85/week @ 2024-05-05 85/week @ 2024-05-12 76/week @ 2024-05-19 111/week @ 2024-05-26 78/week @ 2024-06-02 47/week @ 2024-06-09 85/week @ 2024-06-16 90/week @ 2024-06-23 19/week @ 2024-06-30 43/week @ 2024-07-07

每月下载量 238 次
用于 35 个 crate (直接使用 12 个)

Apache-2.0

52KB
1.5K SLoC

wasmrs-rx

WasmRS-RX 是专为在 wasmrs(WebAssembly RSocket 实现)中使用而设计的 Rust 中 rx-like 功能的简单实现。

注意

RX 和响应式流围绕可观察的概念。本项目选择保留 Flux/Mono 术语,以与其他 RSocket 实现保持一致。

用法

Mono 是单个值,而 Flux 是任意数量的值。它们分别类似于 Futures 和 Streams。在此实现中,每个值要么是成功,要么是失败,这使得 wasmrs-rx 的 MonoFlux 感觉像异步的 ResultResult 流。

Mono 可以使用单个成功或失败值实例化:

let mono = Mono::<_, Error>::new_success(100);

let result = mono.await?;

println!("{}", result);

它也可以从 future 创建

let mono = Mono::<_, Error>::from_future(async move { Ok(101) });

let result = mono.await?;

println!("{}", result);

或者,可以创建一个 Mono 并稍后完成它

let mut mono = Mono::<u32, Error>::new();

mono.success(100);

let result = mono.await?;

println!("{}", result);

Flux

Flux 是一个打包在一起的流/通道。您可以向其推送、完成它并等待它

let mut flux = FluxChannel::<_, Error>::new();

flux.send(100)?;
flux.send(101)?;
flux.send(102)?;
flux.complete();

while let Some(payload) = flux.next().await {
  println!("{}", payload?);
}

您可以将接收部分取出来,并将发送/接收分割成其他通道的方式

let flux = FluxChannel::<_, Error>::new();
let mut rx = flux.take_rx()?;

let task = tokio::spawn(async move {
  sleep(Duration::from_millis(500)).await;
  flux.send(100).unwrap();
  flux.send(101).unwrap();
  flux.send(102).unwrap();
  flux.complete()
});

while let Some(payload) = rx.next().await {
  println!("{}", payload?);
}
task.await?;

由于 Flux 包含了 Result 的概念,.send() 推送 Ok 值,而 .error() 可用于推送错误值。

let mut flux = FluxChannel::<_, Error>::new();

flux.send(100)?;
flux.send(101)?;
flux.send(102)?;
flux.error(anyhow::anyhow!("error"))?;
flux.complete();

while let Some(payload) = flux.next().await {
  println!("{:?}", payload);
}

更多信息

有关 wasmRS 的更多信息,请参阅核心 wasmrs crate。

WasmRS 大量使用由 apex 规范和生成器生成的代码来自动化所有样板代码。请参阅入门指南了解用法。

贡献

请参阅CONTRIBUTING.md

许可证

请参阅根目录下的LICENSE.txt

依赖项

~2–9.5MB
~81K SLoC