22 个版本
0.8.2 | 2023 年 12 月 17 日 |
---|---|
0.8.1 | 2021 年 6 月 20 日 |
0.8.0 | 2021 年 1 月 6 日 |
0.7.0 | 2020 年 10 月 20 日 |
0.4.2 | 2018 年 6 月 22 日 |
#38 在 异步 中
每月 124,649 次下载
在 81 个 Crates 中使用 (直接使用 20 个)
29KB
352 代码行
该 crate 提供了多种机制来中断一个 Stream
。
流组合器
扩展 trait StreamExt
提供了一个新的 Stream
组合器:take_until_if
。 StreamExt::take_until_if
会继续从底层 Stream
产生元素,直到一个 Future
解决,在那个时刻立即产生 None
并停止产生更多的元素。
为了方便,该 crate 还包括 Tripwire
类型,它产生一个可复制的 Future
,然后可以将其传递给 take_until_if
。当创建一个新的 Tripwire
时,还会返回一个相关的 Trigger
,当它被丢弃时,它会中断 Stream
。
use stream_cancel::{StreamExt, Tripwire};
use futures::prelude::*;
use tokio_stream::wrappers::TcpListenerStream;
#[tokio::main]
async fn main() {
let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
let (trigger, tripwire) = Tripwire::new();
tokio::spawn(async move {
let mut incoming = TcpListenerStream::new(listener).take_until_if(tripwire);
while let Some(mut s) = incoming.next().await.transpose().unwrap() {
tokio::spawn(async move {
let (mut r, mut w) = s.split();
println!("copied {} bytes", tokio::io::copy(&mut r, &mut w).await.unwrap());
});
}
});
// tell the listener to stop accepting new connections
drop(trigger);
// the spawned async block will terminate cleanly, allowing main to return
}
流包装器
任何流都可以被包装在一个Valved
中,这样就可以通过相关的Trigger
远程终止。这对于实现类似TcpListener
的“无限”流的优雅关闭非常有用。一旦在给定流的Valved
句柄上调用Trigger::cancel
,流将返回None
以表示它已终止。
use stream_cancel::Valved;
use futures::prelude::*;
use tokio_stream::wrappers::TcpListenerStream;
use std::thread;
#[tokio::main]
async fn main() {
let (exit_tx, exit_rx) = tokio::sync::oneshot::channel();
let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
tokio::spawn(async move {
let (exit, mut incoming) = Valved::new(TcpListenerStream::new(listener));
exit_tx.send(exit).unwrap();
while let Some(mut s) = incoming.next().await.transpose().unwrap() {
tokio::spawn(async move {
let (mut r, mut w) = s.split();
println!("copied {} bytes", tokio::io::copy(&mut r, &mut w).await.unwrap());
});
}
});
let exit = exit_rx.await;
// the server thread will normally never exit, since more connections
// can always arrive. however, with a Valved, we can turn off the
// stream of incoming connections to initiate a graceful shutdown
drop(exit);
}
您可以通过首先创建一个Valve
,然后使用Valve::Wrap
包装多个流,在多个流之间共享同一个Trigger
。
use stream_cancel::Valve;
use futures::prelude::*;
use tokio_stream::wrappers::TcpListenerStream;
#[tokio::main]
async fn main() {
let (exit, valve) = Valve::new();
let listener1 = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
let listener2 = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
tokio::spawn(async move {
let incoming1 = valve.wrap(TcpListenerStream::new(listener1));
let incoming2 = valve.wrap(TcpListenerStream::new(listener2));
use futures_util::stream::select;
let mut incoming = select(incoming1, incoming2);
while let Some(mut s) = incoming.next().await.transpose().unwrap() {
tokio::spawn(async move {
let (mut r, mut w) = s.split();
println!("copied {} bytes", tokio::io::copy(&mut r, &mut w).await.unwrap());
});
}
});
// the runtime will not become idle until both incoming1 and incoming2 have stopped
// (due to the select). this checks that they are indeed both interrupted when the
// valve is closed.
drop(exit);
}
依赖项
~2.3–4MB
~63K SLoC