1 个不稳定版本
0.1.0 | 2020年4月11日 |
---|
#13 in #bounded-channel
17KB
200 行
async-local-bounded-channel
同一生产者、同一消费者限界通道,用于单个异步任务。请参阅文档以了解说明和示例。
lib.rs
:
同一生产者、同一消费者通道,绑定到单个异步任务。
实现细节
内部,此功能使用 generic-array
包,该包利用 typenum
中的类型在编译时指定容量,从而允许在行内分配队列空间。因此,此通道还需要在编译时预先指定容量。
示例
与 futures::future::select
结合使用,可以实现类似协程的功能,其中两个异步生成器协同生成和消费值。
futures::executor::block_on(async move {
// create a new channel with a capacity of 8 items
let mut channel = channel::<_, U8>();
let (mut tx, mut rx) = channel.split();
let producer = async move {
for i in 0..100 {
tx.send(i).await.expect("consumer still alive");
}
};
let consumer = async move {
let mut expected = 0;
loop {
if let Ok(v) = rx.receive().await {
assert_eq!(v, expected);
expected += 1;
} else {
break;
}
}
};
pin_mut!(producer, consumer);
let remaining = select(producer, consumer).await.factor_first().1;
match remaining {
Either::Left(f) => f.await,
Either::Right(f) => f.await,
}
});
例如,在实现服务器时,这很有用。一个任务可以处理每个客户端,其中生产者等待传入请求并写入响应;消费者等待请求,处理它们,然后生成响应。
使用说明
通过 split()
获取传输端点后,通道不能移动。这是出于安全考虑,因为每个端点都包含对通道的引用;因此,如果通道移动,这些引用将变成悬空引用。
let mut channel = channel::<isize, U8>();
let (tx, rx) = channel.split();
std::thread::spawn(move || {
// nope!
let channel = channel;
let tx = tx;
let rx = rx;
});
此外,端点必须锚定到单个线程,因为对底层数据结构的访问不是线程安全的。不幸的是,编译器并没有强制执行这一点,并且作用域线程库可以允许不安全的使用。例如
// shouldn't compile, but unfortunately does.
let mut channel = channel::<isize, U8>();
crossbeam::thread::scope(|s| {
let (tx, rx) = channel.split();
// don't do this!
s.spawn(move |_| {
let tx = tx;
});
s.spawn(move |_| {
let rx = rx;
});
});
如果没有打开端点,通道可以安全地移动和发送。通道甚至可以在端点释放后再次使用。
type C = async_local_bounded_channel::Channel<isize, U8>;
async fn test_channel(mut channel: C) -> C {
// run the producer-consumer example above.
# {
# let (mut tx, mut rx) = channel.split();
# let producer = async move {
# for i in 0..100 {
# tx.send(i).await.expect("consumer still alive");
# }
# };
# let consumer = async move {
# let mut expected = 0;
# loop {
# if let Ok(v) = rx.receive().await {
# assert_eq!(v, expected);
# expected += 1;
# } else {
# break;
# }
# }
# };
# pin_mut!(producer, consumer);
# let remaining = select(producer, consumer).await.factor_first().1;
# match remaining {
# Either::Left(f) => f.await,
# Either::Right(f) => f.await,
# }
# }
channel
}
let channel = channel();
let t = std::thread::spawn(move || {
let channel = block_on(async move {
test_channel(channel).await
});
block_on(async move {
test_channel(channel).await
});
});
t.join().expect("test to pass");
依赖关系
~1MB
~22K SLoC