22 个版本

0.8.2 2023 年 12 月 17 日
0.8.1 2021 年 6 月 20 日
0.8.0 2021 年 1 月 6 日
0.7.0 2020 年 10 月 20 日
0.4.2 2018 年 6 月 22 日

#38异步

Download history 47014/week @ 2024-04-27 41613/week @ 2024-05-04 44424/week @ 2024-05-11 39220/week @ 2024-05-18 41259/week @ 2024-05-25 44854/week @ 2024-06-01 39734/week @ 2024-06-08 38455/week @ 2024-06-15 39583/week @ 2024-06-22 34052/week @ 2024-06-29 31057/week @ 2024-07-06 27988/week @ 2024-07-13 33415/week @ 2024-07-20 30606/week @ 2024-07-27 31468/week @ 2024-08-03 24018/week @ 2024-08-10

每月 124,649 次下载
81 Crates 中使用 (直接使用 20 个)

MIT/Apache

29KB
352 代码行

Crates.io Documentation Coverage Status

该 crate 提供了多种机制来中断一个 Stream

流组合器

扩展 trait StreamExt 提供了一个新的 Stream 组合器:take_until_ifStreamExt::take_until_if 会继续从底层 Stream 产生元素,直到一个 Future 解决,在那个时刻立即产生 None 并停止产生更多的元素。

为了方便,该 crate 还包括 Tripwire 类型,它产生一个可复制的 Future,然后可以将其传递给 take_until_if。当创建一个新的 Tripwire 时,还会返回一个相关的 Trigger,当它被丢弃时,它会中断 Stream

use stream_cancel::{StreamExt, Tripwire};
use futures::prelude::*;
use tokio_stream::wrappers::TcpListenerStream;

#[tokio::main]
async fn main() {
    let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
    let (trigger, tripwire) = Tripwire::new();

    tokio::spawn(async move {
        let mut incoming = TcpListenerStream::new(listener).take_until_if(tripwire);
        while let Some(mut s) = incoming.next().await.transpose().unwrap() {
            tokio::spawn(async move {
                let (mut r, mut w) = s.split();
                println!("copied {} bytes", tokio::io::copy(&mut r, &mut w).await.unwrap());
            });
        }
    });

    // tell the listener to stop accepting new connections
    drop(trigger);
    // the spawned async block will terminate cleanly, allowing main to return
}

流包装器

任何流都可以被包装在一个Valved中,这样就可以通过相关的Trigger远程终止。这对于实现类似TcpListener的“无限”流的优雅关闭非常有用。一旦在给定流的Valved句柄上调用Trigger::cancel,流将返回None以表示它已终止。

use stream_cancel::Valved;
use futures::prelude::*;
use tokio_stream::wrappers::TcpListenerStream;
use std::thread;

#[tokio::main]
async fn main() {
    let (exit_tx, exit_rx) = tokio::sync::oneshot::channel();
    let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();

    tokio::spawn(async move {
        let (exit, mut incoming) = Valved::new(TcpListenerStream::new(listener));
        exit_tx.send(exit).unwrap();
        while let Some(mut s) = incoming.next().await.transpose().unwrap() {
            tokio::spawn(async move {
                let (mut r, mut w) = s.split();
                println!("copied {} bytes", tokio::io::copy(&mut r, &mut w).await.unwrap());
            });
        }
    });

    let exit = exit_rx.await;

    // the server thread will normally never exit, since more connections
    // can always arrive. however, with a Valved, we can turn off the
    // stream of incoming connections to initiate a graceful shutdown
    drop(exit);
}

您可以通过首先创建一个Valve,然后使用Valve::Wrap包装多个流,在多个流之间共享同一个Trigger

use stream_cancel::Valve;
use futures::prelude::*;
use tokio_stream::wrappers::TcpListenerStream;

#[tokio::main]
async fn main() {
    let (exit, valve) = Valve::new();
    let listener1 = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
    let listener2 = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();

    tokio::spawn(async move {
        let incoming1 = valve.wrap(TcpListenerStream::new(listener1));
        let incoming2 = valve.wrap(TcpListenerStream::new(listener2));

        use futures_util::stream::select;
        let mut incoming = select(incoming1, incoming2);
        while let Some(mut s) = incoming.next().await.transpose().unwrap() {
            tokio::spawn(async move {
                let (mut r, mut w) = s.split();
                println!("copied {} bytes", tokio::io::copy(&mut r, &mut w).await.unwrap());
            });
        }
    });

    // the runtime will not become idle until both incoming1 and incoming2 have stopped
    // (due to the select). this checks that they are indeed both interrupted when the
    // valve is closed.
    drop(exit);
}

依赖项

~2.3–4MB
~63K SLoC