12次发布
0.3.5 | 2023年4月5日 |
---|---|
0.3.4 | 2023年2月18日 |
0.3.3 | 2022年3月13日 |
0.3.2 | 2021年5月23日 |
0.0.0 | 2019年6月7日 |
#14 in Rust模式
3,344,468 每月下载量
在 3,063 个crate中(543个直接使用)
19KB
217 代码行
Rust的异步流
元素异步流。
提供两个宏,stream!
和 try_stream!
,允许调用者定义异步元素流。这些使用 async
& await
语法实现。这个crate不需要不稳定功能。
stream!
宏返回一个实现了 Stream
特质的匿名类型。关联类型 Item
是从流中产生的值的类型。try_stream!
也返回一个实现了 Stream
特质的匿名类型,但关联类型 Item
是 Result<T, Error>
。try_stream!
宏支持在实现中使用 ?
语法。
使用方法
一个基本的产生数字的流。使用 yield
关键字产生值。流块必须返回 ()
。
use async_stream::stream;
use futures_util::pin_mut;
use futures_util::stream::StreamExt;
#[tokio::main]
async fn main() {
let s = stream! {
for i in 0..3 {
yield i;
}
};
pin_mut!(s); // needed for iteration
while let Some(value) = s.next().await {
println!("got {}", value);
}
}
可以通过使用 impl Stream<Item = T>
返回流
use async_stream::stream;
use futures_core::stream::Stream;
use futures_util::pin_mut;
use futures_util::stream::StreamExt;
fn zero_to_three() -> impl Stream<Item = u32> {
stream! {
for i in 0..3 {
yield i;
}
}
}
#[tokio::main]
async fn main() {
let s = zero_to_three();
pin_mut!(s); // needed for iteration
while let Some(value) = s.next().await {
println!("got {}", value);
}
}
流可以用其他流实现 - async-stream
提供了 for await
语法以协助实现
use async_stream::stream;
use futures_core::stream::Stream;
use futures_util::pin_mut;
use futures_util::stream::StreamExt;
fn zero_to_three() -> impl Stream<Item = u32> {
stream! {
for i in 0..3 {
yield i;
}
}
}
fn double<S: Stream<Item = u32>>(input: S)
-> impl Stream<Item = u32>
{
stream! {
for await value in input {
yield value * 2;
}
}
}
#[tokio::main]
async fn main() {
let s = double(zero_to_three());
pin_mut!(s); // needed for iteration
while let Some(value) = s.next().await {
println!("got {}", value);
}
}
可以使用 Rust 的 try 语法(?
)与 try_stream!
宏一起使用。返回流中的 Item
是 Result
类型,其中 Ok
是产生的值,而 Err
是由 ?
返回的错误类型。
use tokio::net::{TcpListener, TcpStream};
use async_stream::try_stream;
use futures_core::stream::Stream;
use std::io;
use std::net::SocketAddr;
fn bind_and_accept(addr: SocketAddr)
-> impl Stream<Item = io::Result<TcpStream>>
{
try_stream! {
let mut listener = TcpListener::bind(addr).await?;
loop {
let (stream, addr) = listener.accept().await?;
println!("received on {:?}", addr);
yield stream;
}
}
}
实现
宏 stream!
和 try_stream!
使用 proc 宏实现。宏在语法树中搜索 yield $expr
的实例,并将它们转换为 sender.send($expr).await
。
流使用轻量级的发送者将值从流实现发送到调用者。进入流时,在栈上存储一个 Option<T>
。将单元格的指针存储在线程局部变量中,并在异步块上调用 poll
。当 poll
返回时,sender.send(value)
存储单元格的值并返回给调用者。
支持的 Rust 版本
当前最低支持的 Rust 版本是 1.56。
许可证
本项目受 MIT 许可证的许可,详情请见 MIT 许可证。
贡献
除非您明确声明,否则您提交给 async-stream
的任何有意贡献都应按照 MIT 许可,不附加任何额外的条款或条件。
依赖项
~0.3–0.8MB
~19K SLoC