3 个不稳定版本
新功能 0.2.1 | 2024 年 8 月 21 日 |
---|---|
0.2.0 | 2024 年 7 月 28 日 |
0.1.0 | 2022 年 10 月 12 日 |
#104 在 并发
27,814 每月下载量
用于 152 个 Crates (5 直接)
54KB
616 代码行
diatomic-waker
异步、快速的任务唤醒同步原语。
概述
diatomic-waker
与 atomic-waker
类似,因为它允许对包装的 Waker
进行并发更新和通知。然而,与后者不同,它不使用自旋锁[^spinlocks],并且速度显著更快,尤其是在消费者需要定期而不是一次性通知时。它可以特别用作非常快速的单消费者 事件计数,将非阻塞数据结构转换为异步数据结构(参见 MPSC 通道接收器示例)。
这个库是 Asynchronix 的一个分支,Asynchronix 是一个针对系统模拟高性能异步计算框架的持续努力。
[^spinlocks]: AtomicWaker 的实现会在竞争时让步于运行时,这在实际上是一种执行器介导的自旋锁。
使用方法
将以下内容添加到您的 Cargo.toml
[dependencies]
diatomic-waker = "0.2.1"
功能标志
默认情况下,此包启用了 alloc
功能,以提供所有权的 WakeSink
和 WakeSource
。可以通过指定 default-features = false
使其与 no-std
兼容。
示例
一个容量为 1 的多生产者、单消费者通道,用于发送 NonZeroUsize
值,具有异步接收器
use std::num::NonZeroUsize;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use diatomic_waker::{WakeSink, WakeSource};
// The sending side of the channel.
#[derive(Clone)]
struct Sender {
wake_src: WakeSource,
value: Arc<AtomicUsize>,
}
// The receiving side of the channel.
struct Receiver {
wake_sink: WakeSink,
value: Arc<AtomicUsize>,
}
// Creates an empty channel.
fn channel() -> (Sender, Receiver) {
let value = Arc::new(AtomicUsize::new(0));
let wake_sink = WakeSink::new();
let wake_src = wake_sink.source();
(
Sender {
wake_src,
value: value.clone(),
},
Receiver { wake_sink, value },
)
}
impl Sender {
// Sends a value if the channel is empty.
fn try_send(&self, value: NonZeroUsize) -> bool {
let success = self
.value
.compare_exchange(0, value.get(), Ordering::Relaxed, Ordering::Relaxed)
.is_ok();
if success {
self.wake_src.notify()
};
success
}
}
impl Receiver {
// Receives a value asynchronously.
async fn recv(&mut self) -> NonZeroUsize {
// Wait until the predicate returns `Some(value)`, i.e. when the atomic
// value becomes non-zero.
self.wake_sink
.wait_until(|| NonZeroUsize::new(self.value.swap(0, Ordering::Relaxed)))
.await
}
}
安全性
这是一个低级原语,其实现依赖于 unsafe
。测试套件广泛使用 Loom 来评估其正确性。然而,尽管 Loom 非常强大,但它只是一个工具:它不能形式化证明数据竞争的不存在。
实现细节
diatomic-waker
的一个显著特点是它使用两个唤醒存储槽(因此得名),而不是一个。这使得在唤醒注册和通知并发执行的情况下实现无锁成为可能。在并发通知的情况下,尽管一个通知者确实持有通知锁,但其他通知者永远不会阻塞:它们只是请求锁的持有者发送另一个通知,这是一个无等待操作。
与atomic-waker
相比,无唤醒者注册的虚拟通知(没有注册唤醒者)的成本要低得多。在注册/未注册的唤醒者始终相同的情况下,成功的通知(注册+通知本身)的总成本也低得多,因为最后一个唤醒者总是缓存的,以避免不必要的克隆。从数量上来说,以原子读-改-写(RMW)操作的成本为
- 无唤醒者注册的虚拟通知:1 RMW 对
atomic-waker
的 2 RMW - 注册与最后一个注册的唤醒者相同的唤醒者 + 通知:1+3=4 RMW 对
atomic-waker
的 3+4=7 RMW(假设Waker::wake_by_ref
需要 1 RMW,Waker::wake
需要 2 RMW,Waker::clone
需要 1 RMW)。 - 注册新的唤醒者 + 通知:3+3=6 RMW 对
atomic-waker
的 3+4=7 RMW(与上面相同的假设 +Waker::drop
需要 1 RMW);这通常只在第一次注册时才是必要的。 - 由于没有自旋锁,RMW操作很少,且在争用时的成本可预测。
许可证
本软件根据您的选择,许可为Apache License,版本 2.0或MIT许可证。
贡献
除非您明确声明,否则根据Apache-2.0许可证定义的,您有意提交以包含在您的工作中的任何贡献,将根据上述方式双许可,不附加任何额外条款或条件。
依赖项
~0–24MB
~332K SLoC