#mpmc-queue #queue #backpressure #async #tokio #tokio-util

faucet

在完成信号后可以排空的背压式 MPMC 队列

2 个版本

0.1.1 2023 年 12 月 29 日
0.1.0 2023 年 12 月 29 日

#935 in 并发

MIT/Apache

11KB
86

faucet

deadqueue::limited::Queue + tokio_util::sync::CancellationToken = faucet::Faucet

Faucet 是一个背压式 MPMC 队列,在完成信号后可以排空。

一旦完成信号,就不能再向队列中添加更多项目,只能排空队列中剩余的项目。这个特性对于确保在关闭之前处理所有已入队的项目非常有用。

您可以自由地 clone() 一个 Facuet,以便轻松地在异步任务之间共享,您不需要将 Faucet 包裹在额外的 Arc 中,因为 Faucet 内部使用了一个 Arc<deadqueue::limited::Queue<T>>

示例

您可以通过运行以下命令来克隆此存储库并运行此示例:cargo run --example sigint

use std::error::Error;
use std::time::Duration;
use tokio::time::sleep;
use tokio::{spawn, try_join};
use tokio_util::sync::CancellationToken;
use faucet::Faucet;

#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
    let app_cancellation = CancellationToken::new();
    ctrlc::set_handler({
        let cancellation = app_cancellation.clone();
        move || cancellation.cancel()
    })?;

    let faucet = Faucet::new_with_cancellation(5, app_cancellation.clone());

    let producer = spawn({
        let faucet = faucet.clone();
        async move {
            for i in 1.. {
                if faucet.push(i).await.is_break() { break; }
                sleep(Duration::from_millis(100)).await;
            }
        }
    });

    let consumer = spawn({
        let faucet = faucet.clone();
        async move {
            while let Some(i) = faucet.next().await {
                sleep(Duration::from_millis(500)).await;
                let status = if faucet.is_cancelled() { "drain" } else { "got" };
                println!("{status} #{i} ({} items waiting)", faucet.len());
            }
        }
    });

    try_join!(producer, consumer)?;
    println!("done");
    Ok(())
}

示例运行

got #1 (4 items waiting)
got #2 (5 items waiting)
^Cdrain #3 (5 items waiting)
drain #4 (4 items waiting)
drain #5 (3 items waiting)
drain #6 (2 items waiting)
drain #7 (1 items waiting)
drain #8 (0 items waiting)
done

许可证

以下许可证之一

任选其一。

贡献

除非您明确声明,否则您提交的任何有意包含在作品中的贡献,根据 Apache-2.0 许可证的定义,应按上述方式双授权,不附加任何额外条款或条件。

依赖项

~3–4.5MB
~75K SLoC