0.1.0 |
|
---|
#6 in #背压
11KB
86 行
faucet-drain
deadqueue::limited::Queue
+tokio_util::sync::CancellationToken
=faucet_drain::Faucet
Faucet
是一个背压式 MPMC 队列,在完成信号后可以进行排空。
一旦发出完成信号,队列中就不能再添加更多项,只能排空队列中剩余的项。这个属性有助于确保在关闭之前处理已入队的所有项。
您可以自由地 clone()
一个 Facuet
以便轻松地在您的生产者和消费者之间共享。您不需要将 Faucet
包装在额外的 Arc
中,因为 Faucet
内部使用了一个 Arc<deadqueue::limited::Queue<T>>
示例
您可以通过运行以下命令来克隆此仓库并运行此示例:cargo run --example sigint
。
use std::error::Error;
use std::time::Duration;
use tokio::time::sleep;
use tokio::{spawn, try_join};
use tokio_util::sync::CancellationToken;
use faucet_drain::Faucet;
#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
let app_cancellation = CancellationToken::new();
ctrlc::set_handler({
let cancellation = app_cancellation.clone();
move || cancellation.cancel()
})?;
let faucet = Faucet::new_with_cancellation(5, app_cancellation.clone());
let producer = spawn({
let faucet = faucet.clone();
async move {
for i in 1.. {
if faucet.push(i).await.is_break() { break; }
sleep(Duration::from_millis(100)).await;
}
}
});
let consumer = spawn({
let faucet = faucet.clone();
async move {
while let Some(i) = faucet.next().await {
sleep(Duration::from_millis(500)).await;
let status = if faucet.is_cancelled() { "drain" } else { "got" };
println!("{status} #{i} ({} items waiting)", faucet.len());
}
}
});
try_join!(producer, consumer)?;
println!("done");
Ok(())
}
示例运行
got #1 (4 items waiting)
got #2 (5 items waiting)
^Cdrain #3 (5 items waiting)
drain #4 (4 items waiting)
drain #5 (3 items waiting)
drain #6 (2 items waiting)
drain #7 (1 items waiting)
drain #8 (0 items waiting)
done
许可证
在以下许可证中选择一项
- Apache License 2.0 (LICENSE-APACHE 或 https://apache.ac.cn/licenses/LICENSE-2.0)
- MIT 许可证 (LICENSE-MIT 或 https://open-source.org.cn/licenses/MIT)
由您选择。
贡献
除非您明确声明,否则根据 Apache-2.0 许可证定义的,您有意提交以包含在作品中的任何贡献,都应按照上述方式双授权,不附加任何额外的条款或条件。
依赖关系
~2.9–4.5MB
~74K SLoC