#async #mpmc #thread

watchable

一个可观察的RwLock-like类型,兼容多线程和异步代码

6个版本 (稳定)

1.1.2 2023年12月29日
1.1.1 2023年1月5日
1.0.0 2022年8月23日
0.1.0 2022年3月9日
0.0.0-reserve.02022年3月8日

#87并发

Download history 1457/week @ 2024-04-20 1564/week @ 2024-04-27 1094/week @ 2024-05-04 1455/week @ 2024-05-11 2339/week @ 2024-05-18 1142/week @ 2024-05-25 2298/week @ 2024-06-01 1071/week @ 2024-06-08 1682/week @ 2024-06-15 1588/week @ 2024-06-22 1970/week @ 2024-06-29 2256/week @ 2024-07-06 1662/week @ 2024-07-13 2354/week @ 2024-07-20 2492/week @ 2024-07-27 3098/week @ 2024-08-03

每月下载量 9,882
用于 23 个crates (2 个直接使用)

MIT/Apache

37KB
647

Watchable

watchable 实现了一个可观察的RwLock-like类型,该类型兼容多线程和异步代码。灵感来自 tokio::sync::watch

watchable forbids unsafe code crate version Live Build Status HTML Coverage Report for main branch Documentation for main branch

watchable 是一个RwLock-like类型,允许使用多生产者、多消费者方法来观察值的变化,其中每个消费者都只能保证接收到最新的写入值。

use watchable::{Watchable, Watcher};

fn main() {
    // Create a Watchable<u32> which holds a u32 and notifies watchers when the
    // contained value changes.
    let watchable = Watchable::default();
    // Create a watcher that will efficiently be able to monitor and read the
    // contained value as it is updated.
    let watcher = watchable.watch();
    // Spawn a background worker that will print out the values the watcher reads.
    let watching_thread = std::thread::spawn(|| watching_thread(watcher));

    // Store a sequence of values. Each time a new value is written, any waiting
    // watchers will be notified there is a new value available.
    for i in 1_u32..=1000 {
        watchable.replace(i);
    }

    // Once we're done sending values, dropping the Watchable will ensure
    // watchers are notified of the disconnection. Watchers are guaranteed to be
    // able to read the final value.
    drop(watchable);

    // Wait for the thread to exit.
    watching_thread.join().unwrap();
}

fn watching_thread(watcher: Watcher<u32>) {
    // A Watcher can be used as an iterator which always reads the most
    // recent value, or parks the current thread until a new value is available.
    for value in watcher {
        // The value we read will not necessarily be sequential, even though the
        // main thread is storing a complete sequence.
        println!("Read value: {value}");
    }
}

当运行此示例时,输出将类似于

...
Read value: 876
Read value: 897
Read value: 923
Read value: 944
Read value: 957
Read value: 977
Read value: 995
Read value: 1000

如你所见,接收线程不会接收到每个值。每个观察者都保证在发生更改时收到通知,并且保证能够检索到最新值。

异步支持

可以使用多种方式在异步代码中使用 Watcher

  • Watcher::into_stream():将观察者包装在一个实现了 futures::Stream 的类型中。
  • Watcher::wait_async().await:暂停当前任务的执行,直到有新的值可供读取。在 wait_async 返回后,可以使用 Watcher::read() 来检索当前值。

以下是与上面相同的示例,但这次使用 Watcher::into_stream 与 Tokio 一起

use futures_util::StreamExt;
use watchable::{Watchable, Watcher};

#[tokio::main]
async fn main() {
    // Create a Watchable<u32> which holds a u32 and notifies watchers when the
    // contained value changes.
    let watchable = Watchable::default();
    // Create a watcher that will efficiently be able to monitor and read the
    // contained value as it is updated.
    let watcher = watchable.watch();
    // Spawn a background worker that will print out the values the watcher reads.
    let watching_task = tokio::task::spawn(watching_task(watcher));

    // Store a sequence of values. Each time a new value is written, any waiting
    // watchers will be notified there is a new value available.
    for i in 1_u32..=1000 {
        watchable.replace(i);
    }

    // Once we're done sending values, dropping the Watchable will ensure
    // watchers are notified of the disconnection. Watchers are guaranteed to be
    // able to read the final value.
    drop(watchable);

    // Wait for the spawned task to exit.
    watching_task.await.unwrap();
}

async fn watching_task(watcher: Watcher<u32>) {
    // A Watcher can be converted into a Stream, which allows for asynchronous
    // iteration.
    let mut stream = watcher.into_stream();
    while let Some(value) = stream.next().await {
        // The value we received will not necessarily be sequential, even though
        // the main thread is publishing a complete sequence.
        println!("Read value: {value}");
    }
}

watchable 与所有异步运行时兼容。

开源许可证

该项目,如同 Khonsu Labs 的所有项目一样,是开源的。此存储库在 MIT 许可证Apache License 2.0 下可用。

有关更多贡献信息,请参阅 CONTRIBUTING.md

依赖项

~1.5–7MB
~47K SLoC