#bounded-channel #channel #async-channel #bounded #async-task #async #single-threaded

async-local-bounded-channel

同一生产者、同一消费者限界通道,用于单个异步任务

1 个不稳定版本

0.1.0 2020年4月11日

#13 in #bounded-channel

MIT 许可证

17KB
200

async-local-bounded-channel

Build Status Crates.io Docs.rs

同一生产者、同一消费者限界通道,用于单个异步任务。请参阅文档以了解说明和示例。


lib.rs:

同一生产者、同一消费者通道,绑定到单个异步任务。

实现细节

内部,此功能使用 generic-array 包,该包利用 typenum 中的类型在编译时指定容量,从而允许在行内分配队列空间。因此,此通道还需要在编译时预先指定容量。

示例

futures::future::select 结合使用,可以实现类似协程的功能,其中两个异步生成器协同生成和消费值。

futures::executor::block_on(async move {
    // create a new channel with a capacity of 8 items
    let mut channel = channel::<_, U8>();
    let (mut tx, mut rx) = channel.split();
    let producer = async move {
        for i in 0..100 {
            tx.send(i).await.expect("consumer still alive");
        }
    };
    let consumer = async move {
        let mut expected = 0;
        loop {
            if let Ok(v) = rx.receive().await {
                assert_eq!(v, expected);
                expected += 1;
            } else {
                break;
            }
        }
    };
    pin_mut!(producer, consumer);
    let remaining = select(producer, consumer).await.factor_first().1;
    match remaining {
        Either::Left(f) => f.await,
        Either::Right(f) => f.await,
    }
});

例如,在实现服务器时,这很有用。一个任务可以处理每个客户端,其中生产者等待传入请求并写入响应;消费者等待请求,处理它们,然后生成响应。

使用说明

通过 split() 获取传输端点后,通道不能移动。这是出于安全考虑,因为每个端点都包含对通道的引用;因此,如果通道移动,这些引用将变成悬空引用。

let mut channel = channel::<isize, U8>();
let (tx, rx) = channel.split();
std::thread::spawn(move || {
    // nope!
    let channel = channel;
    let tx = tx;
    let rx = rx;
});

此外,端点必须锚定到单个线程,因为对底层数据结构的访问不是线程安全的。不幸的是,编译器并没有强制执行这一点,并且作用域线程库可以允许不安全的使用。例如

// shouldn't compile, but unfortunately does.
let mut channel = channel::<isize, U8>();
crossbeam::thread::scope(|s| {
    let (tx, rx) = channel.split();
    // don't do this!
    s.spawn(move |_| {
        let tx = tx;
    });
    s.spawn(move |_| {
        let rx = rx;
    });
});

如果没有打开端点,通道可以安全地移动和发送。通道甚至可以在端点释放后再次使用。

type C = async_local_bounded_channel::Channel<isize, U8>;

async fn test_channel(mut channel: C) -> C {
    // run the producer-consumer example above.
    # {
    #     let (mut tx, mut rx) = channel.split();
    #     let producer = async move {
    #         for i in 0..100 {
    #             tx.send(i).await.expect("consumer still alive");
    #         }
    #     };
    #     let consumer = async move {
    #         let mut expected = 0;
    #         loop {
    #             if let Ok(v) = rx.receive().await {
    #                 assert_eq!(v, expected);
    #                 expected += 1;
    #             } else {
    #                 break;
    #             }
    #         }
    #     };
    #     pin_mut!(producer, consumer);
    #     let remaining = select(producer, consumer).await.factor_first().1;
    #     match remaining {
    #         Either::Left(f) => f.await,
    #         Either::Right(f) => f.await,
    #     }
    # }
    channel
}

let channel = channel();
let t = std::thread::spawn(move || {
    let channel = block_on(async move {
       test_channel(channel).await
    });
    block_on(async move {
        test_channel(channel).await
    });
});
t.join().expect("test to pass");

依赖关系

~1MB
~22K SLoC