6个版本
0.2.4 | 2022年11月7日 |
---|---|
0.2.3 | 2022年7月5日 |
0.2.2 | 2022年5月1日 |
0.2.1 | 2022年3月11日 |
0.1.0 | 2020年1月21日 |
#310 在 异步
每月 38,599 次下载
在 20 个crate中使用 (直接使用 13)
28KB
358 代码行
Deadqueue

Deadqueue是一个具有背压支持的简单异步队列。
此crate提供三种实现
-
无限制的 (
deadqueue::unlimited::Queue
)- 基于
crossbeam_queue::SegQueue
- 具有无限容量,在推送时没有背压
- 通过您的
unlimited
功能在Cargo.toml
中启用
- 基于
-
可调整大小的 (
deadqueue::resizable::Queue
)- 基于
deadqueue::unlimited::Queue
- 具有有限容量,在推送时具有背压
- 支持调整大小
- 通过您的
resizable
功能在Cargo.toml
中启用
- 基于
-
有限 (
deadqueue::limited::Queue
)- 基于
crossbeam_queue::ArrayQueue
- 具有有限容量,在推送时具有背压
- 不支持调整大小
- 通过您的
limited
功能在Cargo.toml
中启用
- 基于
功能
功能 | 描述 | 额外依赖 | 默认 |
---|---|---|---|
unlimited |
启用无限制队列实现 | – | yes |
resizable |
启用可调整大小的队列实现 | deadqueue/unlimited |
yes |
limited |
启用有限队列实现 | – | yes |
示例
use std::sync::Arc;
use tokio::time::{sleep, Duration};
const TASK_COUNT: usize = 1000;
const WORKER_COUNT: usize = 10;
type TaskQueue = deadqueue::limited::Queue<usize>;
#[tokio::main]
async fn main() {
let queue = Arc::new(TaskQueue::new(TASK_COUNT));
for i in 0..TASK_COUNT {
queue.try_push(i).unwrap();
}
for worker in 0..WORKER_COUNT {
let queue = queue.clone();
tokio::spawn(async move {
loop {
let task = queue.pop().await;
println!("worker[{}] processing task[{}] ...", worker, task);
}
});
}
while queue.len() > 0 {
println!("Waiting for workers to finish...");
sleep(Duration::from_millis(100)).await;
}
println!("All tasks done. :-)");
}
为何还需要另一个队列
死队列绝不是唯一可用的队列实现。它以略微不同的方式做事,并提供了其他实现所缺乏的功能。
-
可调整大小的队列。 通常您必须在
有限
和无限
队列之间进行选择。这个包提供了一个可调整大小
的队列,可以根据需要调整大小。这可能是这个包的一个非常大的 独特卖点。 -
内省支持。 方法
.len()
、.capacity()
和.available()
提供了对队列当前状态的访问。 -
公平调度。 调用
pop
的任务将以先到先得的方式接收项目。这主要归功于使用了本质上公平的tokio::sync::Semaphore
。 -
一个结构体,而不是两个。
tokio
、async_std
和futures-intrusive
的通道将队列分为两个结构体(Sender
和Receiver
),这使得使用略微复杂。 -
自带
Arc
。 由于Sender
和Receiver
之间没有分离,因此也不需要内部Arc
。(所有将通道分成Sender
和Receiver
的实现都需要某种形式的Arc
。) -
完全并发访问。 无需将
Receiver
部分包装在Mutex
中。所有方法都支持并发访问,无需额外的同步原语。 -
支持
try__
方法。 可以使用try_push
和try_pop
方法从非阻塞同步代码访问队列。
替代方案
包 | 限制 | 文档 |
---|---|---|
tokio |
没有可调整大小的队列。没有内省支持。需要 Receiver 的同步。 |
tokio::sync::mpsc::channel 、tokio::sync::mpsc::unbounded_channel |
async-std |
没有可调整大小或无限队列。没有内省支持。没有 try_send 或 try_recv 方法。 |
async_std::sync::channel |
futures |
没有可调整大小的队列。没有内省支持。 | futures::channel::mpsc::channel 、futures::channel::mpsc::unbounded |
许可
根据以下任一许可授权:
- Apache许可证版本2.0(LICENSE-APACHE 或 https://apache.ac.cn/licenses/LICENSE-2.0)
- MIT许可证(LICENSE-MIT 或 http://opensource.org/licenses/MIT)
由您选择。
依赖项
~2.4–4MB
~67K SLoC