13 个不稳定版本 (5 个破坏性更新)

0.7.1 2024年6月2日
0.7.0 2024年2月8日
0.6.0 2023年10月16日
0.5.1 2023年2月18日
0.1.0 2020年5月29日

#390 in 异步

Download history 152190/week @ 2024-05-03 158967/week @ 2024-05-10 164343/week @ 2024-05-17 162905/week @ 2024-05-24 179485/week @ 2024-05-31 167315/week @ 2024-06-07 173453/week @ 2024-06-14 190058/week @ 2024-06-21 185385/week @ 2024-06-28 185610/week @ 2024-07-05 198348/week @ 2024-07-12 201927/week @ 2024-07-19 204693/week @ 2024-07-26 216356/week @ 2024-08-02 284805/week @ 2024-08-09 295268/week @ 2024-08-16

每月下载量 1,038,322
1,065 个 crate 中使用 (直接使用 41 个)

MIT/Apache

73KB
724

async-broadcast

Build License Cargo Documentation

一个异步多生产者多消费者广播通道,其中每个消费者都会接收到通道上发送的每条消息的副本。由于明显的原因,该通道只能用于广播实现了 Clone 的类型。

通道有 SenderReceiver 两端。两端都是可复制的,可以在多个线程之间共享。

当所有 Sender 或所有 Receiver 被丢弃时,通道变为关闭。当通道关闭时,无法发送更多消息,但可以继续接收剩余的消息。

还可以通过调用 Sender::close()Receiver::close() 手动关闭通道。

示例

use async_broadcast::{broadcast, TryRecvError};
use futures_lite::{future::block_on, stream::StreamExt};

block_on(async move {
    let (s1, mut r1) = broadcast(2);
    let s2 = s1.clone();
    let mut r2 = r1.clone();

    // Send 2 messages from two different senders.
    s1.broadcast(7).await.unwrap();
    s2.broadcast(8).await.unwrap();

    // Channel is now at capacity so sending more messages will result in an error.
    assert!(s2.try_broadcast(9).unwrap_err().is_full());
    assert!(s1.try_broadcast(10).unwrap_err().is_full());

    // We can use `recv` method of the `Stream` implementation to receive messages.
    assert_eq!(r1.next().await.unwrap(), 7);
    assert_eq!(r1.recv().await.unwrap(), 8);
    assert_eq!(r2.next().await.unwrap(), 7);
    assert_eq!(r2.recv().await.unwrap(), 8);

    // All receiver got all messages so channel is now empty.
    assert_eq!(r1.try_recv(), Err(TryRecvError::Empty));
    assert_eq!(r2.try_recv(), Err(TryRecvError::Empty));

    // Drop both senders, which closes the channel.
    drop(s1);
    drop(s2);

    assert_eq!(r1.try_recv(), Err(TryRecvError::Closed));
    assert_eq!(r2.try_recv(), Err(TryRecvError::Closed));
})

async-channel 的区别

这个 crate 与 async-channel 类似,因为它们都提供 MPMC 通道,但主要区别在于在 async-channel 中,每个发送到通道的消息只由一个接收器接收。《async-broadcast》另一方面,通过为每个接收器克隆它来将每条消息传递给每个接收器(换句话说,广播)。

与其他广播 crate 的区别

  • broadcaster:主要区别在于 broadcaster 没有发送器和接收器分离,并且两端都使用同一 BroadcastChannel 实例的克隆。消息被发送到所有通道克隆。虽然这可以适用于许多情况,但缺乏发送器和接收器分离意味着你通常需要自己手动在发送端清空通道。

  • postage:这个包提供类似于 async_broadcast广播 API。然而,它在编写本文时重复了 futures API,这并不理想。

    • (截至本文编写时) 它与 futures API 重复,这并不理想。
    • 不支持溢出模式,也没有非活动接收者的概念,因此慢速或非活动接收者阻塞整个通道是一个无法解决的问题。
    • 提供各种通道,这通常很好,但如果您只需要广播通道,async_broadcast 可能是更好的选择。
  • tokio::sync:Tokio 的 sync 模块提供了一个 广播通道 API。这里的区别在于

    • 尽管这个实现确实提供了 溢出模式,但它默认是这种行为,而不是可选的。
    • 没有非活动接收者的等效功能。
    • 虽然可以仅使用 sync 模块构建 tokio,但它附带了一些您可能不需要的其他 API。

安全性

此包使用 #![deny(unsafe_code)] 来确保所有内容都以 100% 安全的 Rust 实现。

贡献

想加入我们吗?查看我们的 “贡献”指南 并查看一些这些问题

许可证

许可协议为 Apache License, Version 2.0MIT 许可证,您可任选其一。
除非您明确表示,否则根据 Apache-2.0 许可证定义,您提交给本包的任何有意贡献,都将按上述方式双重许可,而无需任何附加条款或条件。

依赖关系

~395KB