#async-channel #single-consumer #future #single-producer #communication #receiver #send

handoff

一个无缓冲、异步、单生产者/单消费者通道

3个稳定版本

1.0.2 2023年2月5日

#748异步

每月 22 次下载

MPL-2.0 许可证

31KB
402

handoff

handoff 是一个单生产者/单消费者、无缓冲、异步通道。它适用于需要在两个异步组件之间进行阻塞通信的场景,其中所有发送都会阻塞,直到接收器接收项目。

使用 channel 创建一个新通道,它返回一个 SenderReceiver。可以使用 Sender::send 将项目发送到通道,并使用 Receiver::recv 接收。 Receiver 还实现了 futures::Stream。通道的任一端都可以被丢弃,这将导致另一端解除阻塞并报告通道断开。

虽然通道以异步方式运行,但也可以通过使用大多数异步运行时提供的 block_on 或类似实用工具以完全同步方式使用。

示例

基本示例

use handoff::channel;
use futures::future::join;

let (mut sender, mut receiver) = channel();

let send_task = async move {
    for i in 0..100 {
        sender.send(i).await.expect("channel disconnected");
    }
};

let recv_task = async move {
    for i in 0..100 {
        let value = receiver.recv().await.expect("channel disconnected");
        assert_eq!(value, i);
    }
};

// All sends block until the receiver accepts the value, so we need to make
// sure the tasks happen concurrently
join(send_task, recv_task).await;

同步使用

use std::thread;
use handoff::channel;
use futures::executor::block_on;

let (mut sender, mut receiver) = channel();

let sender_thread = thread::spawn(move || {
    for i in 0..100 {
        block_on(sender.send(i)).expect("receiver disconnected");
    }
});

let receiver_thread = thread::spawn(move || {
    for i in 0..100 {
        let value = block_on(receiver.recv()).expect("sender disconnected");
        assert_eq!(value, i);
    }
});

sender_thread.join().expect("sender panicked");
receiver_thread.join().expect("receiver panicked");

断开连接

use handoff::channel;
use futures::future::join;

let (mut sender, mut receiver) = channel();

let send_task = async move {
    for i in 0..50 {
        sender.send(i).await.expect("channel disconnected");
    }
};

let recv_task = async move {
    for i in 0..50 {
        let value = receiver.recv().await.expect("channel disconnected");
        assert_eq!(value, i);
    }

    assert!(receiver.recv().await.is_none());
};

// All sends block until the receiver accepts the value, so we need to make
// sure the tasks happen concurrently
join(send_task, recv_task).await;

依赖项

~0.9–1.5MB
~31K SLoC