9 个版本
0.2.3 | 2024 年 5 月 25 日 |
---|---|
0.2.2 | 2024 年 5 月 8 日 |
0.2.1 | 2024 年 1 月 3 日 |
0.1.0 | 2024 年 1 月 2 日 |
0.0.1 | 2023 年 8 月 1 日 |
#131 在 异步
每月 307 次下载
用于 20 个 crate(直接使用 3 个)
110KB
2K SLoC
🗂️ 可中断的
在接收到中断时停止未来生产者或流产生值。
对于返回 Result<T, ()>
或 ControlFlow<T, ()>
的未来,调用 fut.interruptible_*(tx)
会在收到中断信号时返回 Err(())
或 Break(T)
。
这意味着未来将完成,但返回值会指示生产者停止产生未来。
对于流,当接收到中断信号时,当前的未来会运行到完成,但不会轮询下一个项目。
用法
将以下内容添加到 Cargo.toml
interruptible = "0.2.3"
# Enables `InterruptibleStreamExt`
interruptible = { version = "0.2.3", features = ["stream"] }
# Enables:
#
# * `InterruptibleFutureExt::{interruptible_control_ctrl_c, interruptible_result_ctrl_c}`
# * `InterruptibleStreamExt::interruptible_ctrl_c` if the `"stream"` feature is also enabled.
interruptible = { version = "0.2.3", features = ["ctrl_c"] }
示例
未来<输出 = ControlFlow<B, C>>
use std::ops::ControlFlow;
use futures::FutureExt;
use tokio::{
join,
sync::{mpsc, oneshot},
};
use interruptible::{InterruptSignal, InterruptibleFutureExt};
#[tokio::main(flavor = "current_thread")]
async fn main() {
let (interrupt_tx, mut interrupt_rx) = mpsc::channel::<InterruptSignal>(16);
let (ready_tx, ready_rx) = oneshot::channel::<()>();
let interruptible_control = async {
let () = ready_rx.await.expect("Expected to be notified to start.");
ControlFlow::Continue(())
}
.boxed()
.interruptible_control(&mut interrupt_rx);
let interrupter = async move {
interrupt_tx
.send(InterruptSignal)
.await
.expect("Expected to send `InterruptSignal`.");
ready_tx
.send(())
.expect("Expected to notify sleep to start.");
};
let (control_flow, ()) = join!(interruptible_control, interrupter);
assert_eq!(ControlFlow::Break(InterruptSignal), control_flow);
}
InterruptibleStreamExt
与 features = ["stream"]
在接收到中断信号时停止流产生值。
有关流中断的不同处理方式的示例,请参阅 interrupt_strategy
模块。
#[cfg(not(feature = "stream"))]
fn main() {}
#[cfg(feature = "stream")]
#[tokio::main(flavor = "current_thread")]
async fn main() {
use futures::{stream, StreamExt};
use tokio::sync::mpsc;
use interruptible::{
InterruptibleStreamExt, InterruptSignal, Interruptibility, PollOutcome,
};
let (interrupt_tx, mut interrupt_rx) = mpsc::channel::<InterruptSignal>(16);
let mut interruptible_stream =
stream::unfold(0u32, move |n| async move { Some((n, n + 1)) })
.interruptible(interrupt_rx.into());
interrupt_tx
.send(InterruptSignal)
.await
.expect("Expected to send `InterruptSignal`.");
assert_eq!(
Some(PollOutcome::Interrupted(None)),
interruptible_stream.next().await
);
assert_eq!(None, interruptible_stream.next().await);
}
许可证
根据您的选择,许可以下任一项
- Apache 许可证 2.0 版本,(LICENSE-APACHE 或 https://apache.ac.cn/licenses/LICENSE-2.0)
- MIT 许可证 (LICENSE-MIT 或 https://opensource.org/licenses/MIT)
。
贡献
除非您明确声明,否则您根据Apache-2.0许可证定义提交的任何有意贡献,包括在本作品中的内容,应如上所述双授权,不附加任何额外条款或条件。
依赖项
~3–11MB
~102K SLoC