10 versions
Version | Released |
---|---|
0.1.9 | June 3, 2024 |
0.1.8 | April 27, 2024 |
0.1.6 | March 9, 2024 |
0.1.5 | January 7, 2024 |
0.1.3 | December 9, 2023 |
#165 in Asynchronous
210KB
3K SLoC
rxr
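An implementation of reactive extensions for the Rust programming language, inspired by the popular RxJS library from JavaScript. Currently, rxr implements a smaller subset of operators than RxJS.
Design
rxr supports Observables and Subjects. You can define your own Observables as needed, and they can be synchronous or asynchronous. For asynchronous Observables, you can use either OS threads or Tokio tasks.
For examples of how to define your own Observables, see the documentation. To see which operators are currently implemented, check the ObservableExt trait.
Note that you do not need to use Tokio in your project in order to use the rxr library.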
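To make the idea of a synchronous, cold Observable concrete, here is a minimal std-only sketch. It is not rxr's implementation: `ColdObservable`, `collect_range`, and `producer_runs` are hypothetical names introduced only for illustration. The assumption being modelled is that the producer closure runs once per subscription and never before one.

```rust
use std::cell::Cell;
use std::rc::Rc;

// Hypothetical model of a cold, synchronous observable (not an rxr type).
struct ColdObservable<T> {
    producer: Box<dyn Fn(&mut dyn FnMut(T))>,
}

impl<T> ColdObservable<T> {
    fn new(producer: impl Fn(&mut dyn FnMut(T)) + 'static) -> Self {
        Self { producer: Box::new(producer) }
    }

    // The producer runs only here, once for every call to `subscribe`.
    fn subscribe(&self, mut on_next: impl FnMut(T)) {
        (self.producer)(&mut on_next);
    }
}

// Subscribes once and collects everything the producer emits.
fn collect_range(upto: u32) -> Vec<u32> {
    let source = ColdObservable::<u32>::new(move |emit| {
        for i in 0..=upto {
            emit(i);
        }
    });
    let mut seen = Vec::new();
    source.subscribe(|v| seen.push(v));
    seen
}

// Counts how often the producer body runs for a given number of subscriptions.
fn producer_runs(subscriptions: usize) -> usize {
    let runs = Rc::new(Cell::new(0));
    let runs_in = Rc::clone(&runs);
    let source = ColdObservable::<u32>::new(move |emit| {
        runs_in.set(runs_in.get() + 1);
        emit(1);
    });
    for _ in 0..subscriptions {
        source.subscribe(|_| {});
    }
    runs.get()
}

fn main() {
    println!("{:?}", collect_range(3));
    println!("runs without subscribers: {}", producer_runs(0));
    println!("runs with two subscribers: {}", producer_runs(2));
}
```

In this sketch `subscribe` blocks until the producer finishes, which is what "synchronous" means here. Moving the loop onto a thread or task is what would make it asynchronous.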
Examples
Asynchronous Observable with unsubscribe logic.
use std::{
    sync::{Arc, Mutex},
    time::Duration,
};

use rxr::{
    subscribe::{Subscriber, Subscription, SubscriptionHandle, UnsubscribeLogic, Unsubscribeable},
    Observable, ObservableExt, Observer, Subscribeable,
};

const UNSUBSCRIBE_SIGNAL: bool = true;

fn main() {
    // Create a custom observable that emits values in a separate thread.
    let observable = Observable::new(|mut o| {
        let done = Arc::new(Mutex::new(false));
        let done_c = Arc::clone(&done);
        let (tx, rx) = std::sync::mpsc::channel();

        // Spawn a new thread to await a signal sent from the unsubscribe logic.
        std::thread::spawn(move || {
            // Attempt to receive a signal sent from the unsubscribe logic.
            if let Ok(UNSUBSCRIBE_SIGNAL) = rx.recv() {
                // Update the `done_c` mutex with the received signal.
                *done_c.lock().unwrap() = UNSUBSCRIBE_SIGNAL;
            }
        });

        // Launch a new thread for the Observable's processing and store its handle.
        let join_handle = std::thread::spawn(move || {
            for i in 0..=10000 {
                // If an unsubscribe signal is received, exit the loop and stop emissions.
                if *done.lock().unwrap() == UNSUBSCRIBE_SIGNAL {
                    break;
                }
                // Emit the value to the subscriber.
                o.next(i);
                // Important. Put an await point after each emit or after some emits.
                // This allows the `take()` operator to function properly.
                std::thread::sleep(Duration::from_millis(1));
            }
            // Signal completion to the subscriber.
            o.complete();
        });

        // Return a new `Subscription` with custom unsubscribe logic.
        Subscription::new(
            // The provided closure defines the behavior of the subscription when it
            // is unsubscribed. In this case, it sends a signal to an asynchronous
            // observable to stop emitting values.
            UnsubscribeLogic::Logic(Box::new(move || {
                if tx.send(UNSUBSCRIBE_SIGNAL).is_err() {
                    println!("Receiver dropped.");
                }
            })),
            // Store the `JoinHandle` for awaiting completion using the `Subscription`.
            SubscriptionHandle::JoinThread(join_handle),
        )
    });

    // Create the `Subscriber` with a mandatory `next` function, and optional
    // `complete` function. No need for `error` function in this simple example.
    let mut observer = Subscriber::on_next(|v| println!("Emitted {}", v));
    observer.on_complete(|| println!("Completed"));

    // This observable uses OS threads so it will not block the current thread.
    // Observables are cold so if you comment out the statement below nothing
    // will be emitted.
    let subscription = observable
        // take utilizes our unsubscribe function to stop background emissions after
        // a specified item count.
        .take(500)
        .map(|v| format!("Mapped {}", v))
        .subscribe(observer);

    // Do something else here.
    println!("Do something while Observable is emitting.");

    // Unsubscribe from the observable to stop emissions.
    subscription.unsubscribe();

    // Allow some time for the main thread to confirm that the observable indeed
    // isn't emitting.
    std::thread::sleep(Duration::from_millis(2000));
    println!("`main` function done");
}
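The cancellation pattern in the example above (a shared flag checked before every emission, plus a join handle to wait for the worker) can be isolated in a std-only sketch. This is an illustration under assumptions, not how rxr implements `take()` or unsubscribing: `emit_until_cancelled` is a hypothetical helper, and an `AtomicBool` is swapped in for the `Mutex<bool>` plus channel used in the example.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Mutex};
use std::thread;

// Hypothetical helper: a worker thread emits 0..=max, and the consumer side
// raises a shared flag once it has seen `limit` values, much like a
// take-style operator asking the producer to stop.
fn emit_until_cancelled(max: u32, limit: usize) -> Vec<u32> {
    let cancelled = Arc::new(AtomicBool::new(false));
    let seen = Arc::new(Mutex::new(Vec::new()));

    let worker = {
        let cancelled = Arc::clone(&cancelled);
        let seen = Arc::clone(&seen);
        thread::spawn(move || {
            for i in 0..=max {
                // Check the flag before every emission and stop once it is set.
                if cancelled.load(Ordering::Acquire) {
                    break;
                }
                // "Emit" the value by recording it.
                let mut seen = seen.lock().unwrap();
                seen.push(i);
                // Consumer side: request cancellation after `limit` values.
                if seen.len() >= limit {
                    cancelled.store(true, Ordering::Release);
                }
            }
        })
    };

    // Joining plays the role of awaiting the stored `JoinHandle`.
    worker.join().unwrap();
    let collected = seen.lock().unwrap().clone();
    collected
}

fn main() {
    let values = emit_until_cancelled(10_000, 5);
    println!("collected {} values: {:?}", values.len(), values);
}
```

Because the flag in this sketch is set directly on the emitting thread, no pause between emissions is needed. In the rxr example the signal travels through a channel to a helper thread, which is why that code sleeps briefly after each emission.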
Using a Subject as an Observer. This works with any Subject variant.
use std::{fmt::Display, time::Duration};

use rxr::{
    subscribe::{Subscriber, Subscription, SubscriptionHandle, UnsubscribeLogic},
    Observable, ObservableExt, Observer, Subject, Subscribeable,
};

pub fn create_subscriber<T: Display>(subscriber_id: u32) -> Subscriber<T> {
    Subscriber::new(
        move |v: T| println!("Subscriber {}: {}", subscriber_id, v),
        move |e| eprintln!("Error {}: {}", subscriber_id, e),
        move || println!("Completed Subscriber {}", subscriber_id),
    )
}

pub fn main() {
    // Make an Observable.
    let mut observable = Observable::new(move |mut o: Subscriber<_>| {
        for i in 0..=10 {
            o.next(i);
            std::thread::sleep(Duration::from_millis(1));
        }
        o.complete();
        Subscription::new(UnsubscribeLogic::Nil, SubscriptionHandle::Nil)
    });

    // Initialize a `Subject` and obtain its emitter and receiver.
    let (emitter, mut receiver) = Subject::emitter_receiver();

    // Register `Subscriber` 1.
    receiver.subscribe(create_subscriber(1));

    // Register `Subscriber` 2.
    receiver
        // We're cloning the receiver so we can use it again.
        // Shallow clone: clones only the pointer to the `Subject`.
        .clone()
        .take(7) // For performance, prioritize placing `take()` as the first operator.
        .delay(1000)
        .map(|v| format!("mapped {}", v))
        .subscribe(create_subscriber(2));

    // Register `Subscriber` 3.
    receiver
        .filter(|v| v % 2 == 0)
        .map(|v| format!("filtered {}", v))
        .subscribe(create_subscriber(3));

    // Convert the emitter into an observer and subscribe it to the observable.
    observable.subscribe(emitter.into());
}
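The fan-out behaviour shown above, where one upstream source feeds several subscribers through a Subject, can be modelled with a small std-only sketch. `MiniSubject` and `fan_out` are hypothetical names for illustration only. The sketch does not reproduce rxr's `Subject`, its emitter and receiver split, or its operators; the filtering is done inline in the second callback.

```rust
use std::cell::RefCell;
use std::rc::Rc;

// Hypothetical model of a subject: it stores observer callbacks and forwards
// every value it receives to all of them (not an rxr type).
struct MiniSubject<T> {
    observers: Vec<Box<dyn FnMut(T)>>,
}

impl<T: Clone> MiniSubject<T> {
    fn new() -> Self {
        Self { observers: Vec::new() }
    }

    // Receiver side: register another observer.
    fn subscribe(&mut self, on_next: impl FnMut(T) + 'static) {
        self.observers.push(Box::new(on_next));
    }

    // Emitter side: multicast one value to every registered observer.
    fn next(&mut self, value: T) {
        for observer in self.observers.iter_mut() {
            observer(value.clone());
        }
    }
}

// Feeds `values` through a subject with two observers: one records everything,
// the other keeps only even values and formats them.
fn fan_out(values: &[i32]) -> (Vec<i32>, Vec<String>) {
    let all = Rc::new(RefCell::new(Vec::new()));
    let even = Rc::new(RefCell::new(Vec::new()));
    let mut subject = MiniSubject::new();

    let all_in = Rc::clone(&all);
    subject.subscribe(move |v: i32| all_in.borrow_mut().push(v));

    let even_in = Rc::clone(&even);
    subject.subscribe(move |v: i32| {
        if v % 2 == 0 {
            even_in.borrow_mut().push(format!("filtered {}", v));
        }
    });

    // The upstream source drives the subject, which acts as its observer.
    for &v in values {
        subject.next(v);
    }

    let result = (all.borrow().clone(), even.borrow().clone());
    result
}

fn main() {
    let (all, even) = fan_out(&[0, 1, 2, 3]);
    println!("{:?}", all);
    println!("{:?}", even);
}
```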
Installation
Add the following to your Cargo.toml:
[dependencies]
rxr = "0.1.9"
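License
Licensed under either the Apache License, Version 2.0 or the MIT license, at your option. Unless you explicitly state otherwise, any contribution intentionally submitted for inclusion in rxr by you, as defined in the Apache-2.0 license, shall be dual licensed as above, without any additional terms or conditions.
Dependencies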
~2.2–4MB
~63K SLoC