#同步原语 #唤醒器 #异步任务 #异步 #原子 #无分配 #无标准库

无标准库 diatomic-waker

异步、无锁的任务唤醒同步原语

3 个不稳定版本

新功能 0.2.1 2024 年 8 月 21 日
0.2.0 2024 年 7 月 28 日
0.1.0 2022 年 10 月 12 日

#104并发

Download history 4340/week @ 2024-05-03 3885/week @ 2024-05-10 3983/week @ 2024-05-17 2827/week @ 2024-05-24 4695/week @ 2024-05-31 4151/week @ 2024-06-07 4264/week @ 2024-06-14 2572/week @ 2024-06-21 3041/week @ 2024-06-28 5306/week @ 2024-07-05 6055/week @ 2024-07-12 5881/week @ 2024-07-19 4757/week @ 2024-07-26 5076/week @ 2024-08-02 8886/week @ 2024-08-09 8118/week @ 2024-08-16

27,814 每月下载量
用于 152 个 Crates (5 直接)

MIT/Apache

54KB
616 代码行

diatomic-waker

异步、快速的任务唤醒同步原语。

Cargo Documentation License

概述

diatomic-wakeratomic-waker 类似,因为它允许对包装的 Waker 进行并发更新和通知。然而,与后者不同,它不使用自旋锁[^spinlocks],并且速度显著更快,尤其是在消费者需要定期而不是一次性通知时。它可以特别用作非常快速的单消费者 事件计数,将非阻塞数据结构转换为异步数据结构(参见 MPSC 通道接收器示例)。

这个库是 Asynchronix 的一个分支,Asynchronix 是一个针对系统模拟高性能异步计算框架的持续努力。

[^spinlocks]: AtomicWaker 的实现会在竞争时让步于运行时,这在实际上是一种执行器介导的自旋锁。

使用方法

将以下内容添加到您的 Cargo.toml

[dependencies]
diatomic-waker = "0.2.1"

功能标志

默认情况下,此包启用了 alloc 功能,以提供所有权的 WakeSinkWakeSource。可以通过指定 default-features = false 使其与 no-std 兼容。

示例

一个容量为 1 的多生产者、单消费者通道,用于发送 NonZeroUsize 值,具有异步接收器

use std::num::NonZeroUsize;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;

use diatomic_waker::{WakeSink, WakeSource};

// The sending side of the channel.
#[derive(Clone)]
struct Sender {
    wake_src: WakeSource,
    value: Arc<AtomicUsize>,
}

// The receiving side of the channel.
struct Receiver {
    wake_sink: WakeSink,
    value: Arc<AtomicUsize>,
}

// Creates an empty channel.
fn channel() -> (Sender, Receiver) {
    let value = Arc::new(AtomicUsize::new(0));
    let wake_sink = WakeSink::new();
    let wake_src = wake_sink.source();
    (
        Sender {
            wake_src,
            value: value.clone(),
        },
        Receiver { wake_sink, value },
    )
}

impl Sender {
    // Sends a value if the channel is empty.
    fn try_send(&self, value: NonZeroUsize) -> bool {
        let success = self
            .value
            .compare_exchange(0, value.get(), Ordering::Relaxed, Ordering::Relaxed)
            .is_ok();
        if success {
            self.wake_src.notify()
        };
        success
    }
}

impl Receiver {
    // Receives a value asynchronously.
    async fn recv(&mut self) -> NonZeroUsize {
        // Wait until the predicate returns `Some(value)`, i.e. when the atomic
        // value becomes non-zero.
        self.wake_sink
            .wait_until(|| NonZeroUsize::new(self.value.swap(0, Ordering::Relaxed)))
            .await
    }
}

安全性

这是一个低级原语,其实现依赖于 unsafe。测试套件广泛使用 Loom 来评估其正确性。然而,尽管 Loom 非常强大,但它只是一个工具:它不能形式化证明数据竞争的不存在。

实现细节

diatomic-waker的一个显著特点是它使用两个唤醒存储槽(因此得名),而不是一个。这使得在唤醒注册和通知并发执行的情况下实现无锁成为可能。在并发通知的情况下,尽管一个通知者确实持有通知锁,但其他通知者永远不会阻塞:它们只是请求锁的持有者发送另一个通知,这是一个无等待操作。

atomic-waker相比,无唤醒者注册的虚拟通知(没有注册唤醒者)的成本要低得多。在注册/未注册的唤醒者始终相同的情况下,成功的通知(注册+通知本身)的总成本也低得多,因为最后一个唤醒者总是缓存的,以避免不必要的克隆。从数量上来说,以原子读-改-写(RMW)操作的成本为

  • 无唤醒者注册的虚拟通知:1 RMW 对 atomic-waker 的 2 RMW
  • 注册与最后一个注册的唤醒者相同的唤醒者 + 通知:1+3=4 RMW 对 atomic-waker 的 3+4=7 RMW(假设 Waker::wake_by_ref 需要 1 RMW,Waker::wake 需要 2 RMW,Waker::clone 需要 1 RMW)。
  • 注册新的唤醒者 + 通知:3+3=6 RMW 对 atomic-waker 的 3+4=7 RMW(与上面相同的假设 + Waker::drop 需要 1 RMW);这通常只在第一次注册时才是必要的。
  • 由于没有自旋锁,RMW操作很少,且在争用时的成本可预测。

许可证

本软件根据您的选择,许可为Apache License,版本 2.0MIT许可证

贡献

除非您明确声明,否则根据Apache-2.0许可证定义的,您有意提交以包含在您的工作中的任何贡献,将根据上述方式双许可,不附加任何额外条款或条件。

依赖项

~0–24MB
~332K SLoC