10 versions

0.1.9 June 3, 2024
0.1.8 April 27, 2024
0.1.6 March 9, 2024
0.1.5 January 7, 2024
0.1.3 December 9, 2023

#165 in Asynchronous

MIT/Apache

210KB
3K SLoC

rxr

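An implementation of reactive extensions for the Rust programming language, inspired by the popular RxJS library for JavaScript. Currently, rxr implements a smaller subset of operators than RxJS.

Design

rxr supports Observables and Subjects. You can define your own Observables as needed, and they can be synchronous or asynchronous. For asynchronous Observables, you can use OS threads or Tokio tasks.
For examples of how to define your own Observables, see the documentation. To see which operators are currently implemented, check the ObservableExt trait.

Note that you do not have to use Tokio in your project to use the rxr library.

Examples

An asynchronous Observable with unsubscribe logic.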

use std::{
    sync::{Arc, Mutex},
    time::Duration,
};

use rxr::{
    subscribe::{Subscriber, Subscription, SubscriptionHandle, UnsubscribeLogic, Unsubscribeable},
    Observable, ObservableExt, Observer, Subscribeable,
};

const UNSUBSCRIBE_SIGNAL: bool = true;

fn main() {
    // Create a custom observable that emits values in a separate thread.
    let observable = Observable::new(|mut o| {
        let done = Arc::new(Mutex::new(false));
        let done_c = Arc::clone(&done);
        let (tx, rx) = std::sync::mpsc::channel();

        // Spawn a new thread to await a signal sent from the unsubscribe logic.
        std::thread::spawn(move || {
            // Attempt to receive a signal sent from the unsubscribe logic.
            if let Ok(UNSUBSCRIBE_SIGNAL) = rx.recv() {
                // Update the `done_c` mutex with the received signal.
                *done_c.lock().unwrap() = UNSUBSCRIBE_SIGNAL;
            }
        });

        // Launch a new thread for the Observable's processing and store its handle.
        let join_handle = std::thread::spawn(move || {
            for i in 0..=10000 {
                // If an unsubscribe signal is received, exit the loop and stop emissions.
                if *done.lock().unwrap() == UNSUBSCRIBE_SIGNAL {
                    break;
                }
                // Emit the value to the subscriber.
                o.next(i);
                // Important: pause (here, a short sleep) after each emit or after
                // every few emits. This allows the `take()` operator to function properly.
                std::thread::sleep(Duration::from_millis(1));
            }
            // Signal completion to the subscriber.
            o.complete();
        });

        // Return a new `Subscription` with custom unsubscribe logic.
        Subscription::new(
            // The provided closure defines the behavior of the subscription when it
            // is unsubscribed. In this case, it sends a signal to an asynchronous
            // observable to stop emitting values.
            UnsubscribeLogic::Logic(Box::new(move || {
                if tx.send(UNSUBSCRIBE_SIGNAL).is_err() {
                    println!("Receiver dropped.");
                }
            })),
            // Store the `JoinHandle` for awaiting completion using the `Subscription`.
            SubscriptionHandle::JoinThread(join_handle),
        )
    });

    // Create the `Subscriber` with a mandatory `next` function, and optional
    // `complete` function. No need for `error` function in this simple example.
    let mut observer = Subscriber::on_next(|v| println!("Emitted {}", v));
    observer.on_complete(|| println!("Completed"));

    // This observable uses OS threads so it will not block the current thread.
    // Observables are cold so if you comment out the statement below nothing
    // will be emitted.
    let subscription = observable
        // take utilizes our unsubscribe function to stop background emissions after
        // a specified item count.
        .take(500)
        .map(|v| format!("Mapped {}", v))
        .subscribe(observer);

    // Do something else here.
    println!("Do something while Observable is emitting.");

    // Unsubscribe from the observable to stop emissions.
    subscription.unsubscribe();

    // Allow some time for the main thread to confirm that the observable indeed
    // isn't emitting.
    std::thread::sleep(Duration::from_millis(2000));
    println!("`main` function done")
}
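
The stop-signal pattern used by the example above can be looked at in isolation. The following is a minimal sketch that uses only the standard library and none of rxr's types: an `AtomicBool` plays the role of the `done` flag, and a channel stands in for the subscriber. The helper names `spawn_emitter` and `take_then_stop` are hypothetical and exist only for this illustration.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc;
use std::sync::Arc;
use std::thread;
use std::time::Duration;

/// Hypothetical helper (not part of rxr): emits `0..=max` from a worker
/// thread into a channel until `stop` is set. Returns how many values it sent.
fn spawn_emitter(
    max: u32,
    stop: Arc<AtomicBool>,
    tx: mpsc::Sender<u32>,
) -> thread::JoinHandle<u32> {
    thread::spawn(move || {
        let mut sent = 0;
        for i in 0..=max {
            // Check the stop flag before every emission.
            if stop.load(Ordering::Acquire) {
                break;
            }
            // Stop as well if the receiving side has gone away.
            if tx.send(i).is_err() {
                break;
            }
            sent += 1;
            // Short pause so the consumer gets a chance to raise the flag.
            thread::sleep(Duration::from_millis(1));
        }
        sent
    })
}

/// Hypothetical helper: receives at most `count` values, then raises the
/// stop flag and waits for the worker thread to finish.
fn take_then_stop(max: u32, count: usize) -> (Vec<u32>, u32) {
    let stop = Arc::new(AtomicBool::new(false));
    let (tx, rx) = mpsc::channel();
    let handle = spawn_emitter(max, Arc::clone(&stop), tx);

    // Comparable in spirit to `take(count)`: stop reading after `count` items.
    let received: Vec<u32> = rx.iter().take(count).collect();

    // Comparable in spirit to the unsubscribe logic: signal the worker to stop.
    stop.store(true, Ordering::Release);
    let sent = handle.join().expect("emitter thread panicked");
    (received, sent)
}
```

Calling `take_then_stop(10_000, 5)` returns the first five values together with the number of values the worker sent before it noticed the flag. That number can be slightly larger than five, which is why the pause after each emission matters.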

Using a Subject as an observer. This works with any Subject variant.

use std::{fmt::Display, time::Duration};

use rxr::{
    subscribe::{Subscriber, Subscription, SubscriptionHandle, UnsubscribeLogic},
    Observable, ObservableExt, Observer, Subject, Subscribeable,
};

pub fn create_subscriber<T: Display>(subscriber_id: u32) -> Subscriber<T> {
    Subscriber::new(
        move |v: T| println!("Subscriber {}: {}", subscriber_id, v),
        move |e| eprintln!("Error {}: {}", subscriber_id, e),
        move || println!("Completed Subscriber {}", subscriber_id),
    )
}

pub fn main() {
    // Make an Observable.
    let mut observable = Observable::new(move |mut o: Subscriber<_>| {
        for i in 0..=10 {
            o.next(i);
            std::thread::sleep(Duration::from_millis(1));
        }
        o.complete();
        Subscription::new(UnsubscribeLogic::Nil, SubscriptionHandle::Nil)
    });

    // Initialize a `Subject` and obtain its emitter and receiver.
    let (emitter, mut receiver) = Subject::emitter_receiver();

    // Register `Subscriber` 1.
    receiver.subscribe(create_subscriber(1));

    // Register `Subscriber` 2.
    receiver
        // We're cloning the receiver so we can use it again.
        // Shallow clone: clones only the pointer to the `Subject`.
        .clone()
        .take(7) // For performance, prioritize placing `take()` as the first operator.
        .delay(1000)
        .map(|v| format!("mapped {}", v))
        .subscribe(create_subscriber(2));

    // Register `Subscriber` 3.
    receiver
        .filter(|v| v % 2 == 0)
        .map(|v| format!("filtered {}", v))
        .subscribe(create_subscriber(3));

    // Convert the emitter into an observer and subscribe it to the observable.
    observable.subscribe(emitter.into());
}
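
To make the fan-out idea behind the example above concrete, here is a conceptual sketch written with the standard library only. `MiniSubject` and `fan_out` are hypothetical names for this illustration; this is not how rxr's `Subject` is implemented, and it ignores errors, completion and threading. It only shows one source value being forwarded to every registered observer.

```rust
use std::cell::RefCell;
use std::rc::Rc;

/// Hypothetical, simplified stand-in for a subject (not rxr's `Subject`):
/// it keeps a list of observer callbacks and forwards each value to all of them.
struct MiniSubject<T> {
    observers: Vec<Box<dyn FnMut(&T)>>,
}

impl<T> MiniSubject<T> {
    fn new() -> Self {
        Self { observers: Vec::new() }
    }

    /// Register an observer callback.
    fn subscribe(&mut self, f: impl FnMut(&T) + 'static) {
        self.observers.push(Box::new(f));
    }

    /// Forward one value to every registered observer, in subscription order.
    fn next(&mut self, value: T) {
        for observer in self.observers.iter_mut() {
            observer(&value);
        }
    }
}

/// Feed `source` through a `MiniSubject` with two observers: one records
/// every value, the other records only even values, formatted as text.
fn fan_out(source: impl IntoIterator<Item = u32>) -> (Vec<u32>, Vec<String>) {
    let all = Rc::new(RefCell::new(Vec::new()));
    let evens = Rc::new(RefCell::new(Vec::new()));
    let mut subject = MiniSubject::new();

    let all_c = Rc::clone(&all);
    subject.subscribe(move |v: &u32| all_c.borrow_mut().push(*v));

    let evens_c = Rc::clone(&evens);
    subject.subscribe(move |v: &u32| {
        if *v % 2 == 0 {
            evens_c.borrow_mut().push(format!("filtered {}", v));
        }
    });

    // The subject acts as the observer of the source: every value it
    // receives is multicast to both observers.
    for v in source {
        subject.next(v);
    }

    let all_out = all.borrow().clone();
    let evens_out = evens.borrow().clone();
    (all_out, evens_out)
}
```

For instance, `fan_out(0..=4)` records all five values for the first observer and only the even ones for the second, which mirrors how Subscriber 1 and Subscriber 3 see the same source differently in the example above.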

More examples can be found in the examples directory and in the documentation.

Installation

Add a line to your Cargo.toml:

[dependencies]
rxr = "0.1.9"

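License

Licensed under either of Apache License, Version 2.0 or MIT license, at your option.
Unless you explicitly state otherwise, any contribution intentionally submitted for inclusion in rxr by you, as defined in the Apache-2.0 license, shall be dual licensed as above, without any additional terms or conditions.

Dependencies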

~2.2–4MB
~63K SLoC