6个版本 (稳定)
1.1.2 | 2023年12月29日 |
---|---|
1.1.1 | 2023年1月5日 |
1.0.0 | 2022年8月23日 |
0.1.0 | 2022年3月9日 |
0.0.0-reserve.0 | 2022年3月8日 |
#87 在 并发 中
每月下载量 9,882
用于 23 个crates (2 个直接使用)
37KB
647 行
Watchable
watchable
实现了一个可观察的RwLock-like类型,该类型兼容多线程和异步代码。灵感来自 tokio::sync::watch。
watchable
是一个RwLock-like类型,允许使用多生产者、多消费者方法来观察值的变化,其中每个消费者都只能保证接收到最新的写入值。
use watchable::{Watchable, Watcher};
fn main() {
// Create a Watchable<u32> which holds a u32 and notifies watchers when the
// contained value changes.
let watchable = Watchable::default();
// Create a watcher that will efficiently be able to monitor and read the
// contained value as it is updated.
let watcher = watchable.watch();
// Spawn a background worker that will print out the values the watcher reads.
let watching_thread = std::thread::spawn(|| watching_thread(watcher));
// Store a sequence of values. Each time a new value is written, any waiting
// watchers will be notified there is a new value available.
for i in 1_u32..=1000 {
watchable.replace(i);
}
// Once we're done sending values, dropping the Watchable will ensure
// watchers are notified of the disconnection. Watchers are guaranteed to be
// able to read the final value.
drop(watchable);
// Wait for the thread to exit.
watching_thread.join().unwrap();
}
fn watching_thread(watcher: Watcher<u32>) {
// A Watcher can be used as an iterator which always reads the most
// recent value, or parks the current thread until a new value is available.
for value in watcher {
// The value we read will not necessarily be sequential, even though the
// main thread is storing a complete sequence.
println!("Read value: {value}");
}
}
当运行此示例时,输出将类似于
...
Read value: 876
Read value: 897
Read value: 923
Read value: 944
Read value: 957
Read value: 977
Read value: 995
Read value: 1000
如你所见,接收线程不会接收到每个值。每个观察者都保证在发生更改时收到通知,并且保证能够检索到最新值。
异步支持
可以使用多种方式在异步代码中使用 Watcher
Watcher::into_stream()
:将观察者包装在一个实现了futures::Stream
的类型中。Watcher::wait_async().await
:暂停当前任务的执行,直到有新的值可供读取。在wait_async
返回后,可以使用Watcher::read()
来检索当前值。
以下是与上面相同的示例,但这次使用 Watcher::into_stream
与 Tokio 一起
use futures_util::StreamExt;
use watchable::{Watchable, Watcher};
#[tokio::main]
async fn main() {
// Create a Watchable<u32> which holds a u32 and notifies watchers when the
// contained value changes.
let watchable = Watchable::default();
// Create a watcher that will efficiently be able to monitor and read the
// contained value as it is updated.
let watcher = watchable.watch();
// Spawn a background worker that will print out the values the watcher reads.
let watching_task = tokio::task::spawn(watching_task(watcher));
// Store a sequence of values. Each time a new value is written, any waiting
// watchers will be notified there is a new value available.
for i in 1_u32..=1000 {
watchable.replace(i);
}
// Once we're done sending values, dropping the Watchable will ensure
// watchers are notified of the disconnection. Watchers are guaranteed to be
// able to read the final value.
drop(watchable);
// Wait for the spawned task to exit.
watching_task.await.unwrap();
}
async fn watching_task(watcher: Watcher<u32>) {
// A Watcher can be converted into a Stream, which allows for asynchronous
// iteration.
let mut stream = watcher.into_stream();
while let Some(value) = stream.next().await {
// The value we received will not necessarily be sequential, even though
// the main thread is publishing a complete sequence.
println!("Read value: {value}");
}
}
watchable
与所有异步运行时兼容。
开源许可证
该项目,如同 Khonsu Labs 的所有项目一样,是开源的。此存储库在 MIT 许可证 或 Apache License 2.0 下可用。
有关更多贡献信息,请参阅 CONTRIBUTING.md。
依赖项
~1.5–7MB
~47K SLoC