6个版本

0.2.4 2022年11月7日
0.2.3 2022年7月5日
0.2.2 2022年5月1日
0.2.1 2022年3月11日
0.1.0 2020年1月21日

#310异步

Download history 6557/week @ 2024-03-14 5312/week @ 2024-03-21 4983/week @ 2024-03-28 5820/week @ 2024-04-04 7493/week @ 2024-04-11 7190/week @ 2024-04-18 6412/week @ 2024-04-25 7932/week @ 2024-05-02 7691/week @ 2024-05-09 8981/week @ 2024-05-16 8372/week @ 2024-05-23 7085/week @ 2024-05-30 8100/week @ 2024-06-06 8676/week @ 2024-06-13 10932/week @ 2024-06-20 9479/week @ 2024-06-27

每月 38,599 次下载
20 个crate中使用 (直接使用 13)

MIT/Apache

28KB
358 代码行

Deadqueue 最新版本 构建状态

Deadqueue是一个具有背压支持的简单异步队列。

此crate提供三种实现

  • 无限制的 (deadqueue::unlimited::Queue)

    • 基于 crossbeam_queue::SegQueue
    • 具有无限容量,在推送时没有背压
    • 通过您的 unlimited 功能在 Cargo.toml 中启用
  • 可调整大小的 (deadqueue::resizable::Queue)

    • 基于 deadqueue::unlimited::Queue
    • 具有有限容量,在推送时具有背压
    • 支持调整大小
    • 通过您的 resizable 功能在 Cargo.toml 中启用
  • 有限 (deadqueue::limited::Queue)

    • 基于 crossbeam_queue::ArrayQueue
    • 具有有限容量,在推送时具有背压
    • 不支持调整大小
    • 通过您的 limited 功能在 Cargo.toml 中启用

功能

功能 描述 额外依赖 默认
unlimited 启用无限制队列实现 yes
resizable 启用可调整大小的队列实现 deadqueue/unlimited yes
limited 启用有限队列实现 yes

示例

use std::sync::Arc;
use tokio::time::{sleep, Duration};

const TASK_COUNT: usize = 1000;
const WORKER_COUNT: usize = 10;

type TaskQueue = deadqueue::limited::Queue<usize>;

#[tokio::main]
async fn main() {
    let queue = Arc::new(TaskQueue::new(TASK_COUNT));
    for i in 0..TASK_COUNT {
        queue.try_push(i).unwrap();
    }
    for worker in 0..WORKER_COUNT {
        let queue = queue.clone();
        tokio::spawn(async move {
            loop {
                let task = queue.pop().await;
                println!("worker[{}] processing task[{}] ...", worker, task);
            }
        });
    }
    while queue.len() > 0 {
        println!("Waiting for workers to finish...");
        sleep(Duration::from_millis(100)).await;
    }
    println!("All tasks done. :-)");
}

为何还需要另一个队列

死队列绝不是唯一可用的队列实现。它以略微不同的方式做事,并提供了其他实现所缺乏的功能。

  • 可调整大小的队列。 通常您必须在 有限无限 队列之间进行选择。这个包提供了一个 可调整大小 的队列,可以根据需要调整大小。这可能是这个包的一个非常大的 独特卖点

  • 内省支持。 方法 .len().capacity().available() 提供了对队列当前状态的访问。

  • 公平调度。 调用 pop 的任务将以先到先得的方式接收项目。这主要归功于使用了本质上公平的 tokio::sync::Semaphore

  • 一个结构体,而不是两个。 tokioasync_stdfutures-intrusive 的通道将队列分为两个结构体(SenderReceiver),这使得使用略微复杂。

  • 自带 Arc 由于 SenderReceiver 之间没有分离,因此也不需要内部 Arc。(所有将通道分成 SenderReceiver 的实现都需要某种形式的 Arc。)

  • 完全并发访问。 无需将 Receiver 部分包装在 Mutex 中。所有方法都支持并发访问,无需额外的同步原语。

  • 支持 try__ 方法。 可以使用 try_pushtry_pop 方法从非阻塞同步代码访问队列。

替代方案

限制 文档
tokio 没有可调整大小的队列。没有内省支持。需要 Receiver 的同步。 tokio::sync::mpsc::channeltokio::sync::mpsc::unbounded_channel
async-std 没有可调整大小或无限队列。没有内省支持。没有 try_sendtry_recv 方法。 async_std::sync::channel
futures 没有可调整大小的队列。没有内省支持。 futures::channel::mpsc::channelfutures::channel::mpsc::unbounded

许可

根据以下任一许可授权:

由您选择。

依赖项

~2.4–4MB
~67K SLoC