#stream #interrupt #producer #future #signal #ctrl-c #values

可中断的

在接收到中断信号时停止未来生产者或流产生值

9 个版本

0.2.3 2024 年 5 月 25 日
0.2.2 2024 年 5 月 8 日
0.2.1 2024 年 1 月 3 日
0.1.0 2024 年 1 月 2 日
0.0.1 2023 年 8 月 1 日

#131异步

Download history 5/week @ 2024-04-20 14/week @ 2024-04-27 161/week @ 2024-05-04 28/week @ 2024-05-11 65/week @ 2024-05-18 274/week @ 2024-05-25 121/week @ 2024-06-01 50/week @ 2024-06-08 51/week @ 2024-06-15 67/week @ 2024-06-22 24/week @ 2024-06-29 14/week @ 2024-07-06 79/week @ 2024-07-13 44/week @ 2024-07-20 130/week @ 2024-07-27 52/week @ 2024-08-03

每月 307 次下载
用于 20 个 crate(直接使用 3 个)

MIT/Apache

110KB
2K SLoC

🗂️ 可中断的

Crates.io docs.rs CI Coverage Status

在接收到中断时停止未来生产者或流产生值。

对于返回 Result<T, ()>ControlFlow<T, ()> 的未来,调用 fut.interruptible_*(tx) 会在收到中断信号时返回 Err(())Break(T)

这意味着未来将完成,但返回值会指示生产者停止产生未来。

对于流,当接收到中断信号时,当前的未来会运行到完成,但不会轮询下一个项目。

用法

将以下内容添加到 Cargo.toml

interruptible = "0.2.3"

# Enables `InterruptibleStreamExt`
interruptible = { version = "0.2.3", features = ["stream"] }

# Enables:
#
# * `InterruptibleFutureExt::{interruptible_control_ctrl_c, interruptible_result_ctrl_c}`
# * `InterruptibleStreamExt::interruptible_ctrl_c` if the `"stream"` feature is also enabled.
interruptible = { version = "0.2.3", features = ["ctrl_c"] }

示例

未来<输出 = ControlFlow<B, C>>

use std::ops::ControlFlow;

use futures::FutureExt;
use tokio::{
    join,
    sync::{mpsc, oneshot},
};

use interruptible::{InterruptSignal, InterruptibleFutureExt};

#[tokio::main(flavor = "current_thread")]
async fn main() {
    let (interrupt_tx, mut interrupt_rx) = mpsc::channel::<InterruptSignal>(16);
    let (ready_tx, ready_rx) = oneshot::channel::<()>();

    let interruptible_control = async {
        let () = ready_rx.await.expect("Expected to be notified to start.");
        ControlFlow::Continue(())
    }
    .boxed()
    .interruptible_control(&mut interrupt_rx);

    let interrupter = async move {
        interrupt_tx
            .send(InterruptSignal)
            .await
            .expect("Expected to send `InterruptSignal`.");
        ready_tx
            .send(())
            .expect("Expected to notify sleep to start.");
    };

    let (control_flow, ()) = join!(interruptible_control, interrupter);

    assert_eq!(ControlFlow::Break(InterruptSignal), control_flow);
}

InterruptibleStreamExtfeatures = ["stream"]

在接收到中断信号时停止流产生值。

有关流中断的不同处理方式的示例,请参阅 interrupt_strategy 模块。

#[cfg(not(feature = "stream"))]
fn main() {}

#[cfg(feature = "stream")]
#[tokio::main(flavor = "current_thread")]
async fn main() {

use futures::{stream, StreamExt};
use tokio::sync::mpsc;

use interruptible::{
    InterruptibleStreamExt, InterruptSignal, Interruptibility, PollOutcome,
};

    let (interrupt_tx, mut interrupt_rx) = mpsc::channel::<InterruptSignal>(16);

    let mut interruptible_stream =
        stream::unfold(0u32, move |n| async move { Some((n, n + 1)) })
            .interruptible(interrupt_rx.into());

    interrupt_tx
        .send(InterruptSignal)
        .await
        .expect("Expected to send `InterruptSignal`.");

    assert_eq!(
        Some(PollOutcome::Interrupted(None)),
        interruptible_stream.next().await
    );
    assert_eq!(None, interruptible_stream.next().await);
}

许可证

根据您的选择,许可以下任一项

贡献

除非您明确声明,否则您根据Apache-2.0许可证定义提交的任何有意贡献,包括在本作品中的内容,应如上所述双授权,不附加任何额外条款或条件。

依赖项

~3–11MB
~102K SLoC