26 个版本
新版本 0.2.12 | 2024 年 8 月 23 日 |
---|---|
0.2.11 | 2024 年 4 月 27 日 |
0.2.10 | 2024 年 1 月 12 日 |
0.2.9 | 2023 年 10 月 21 日 |
0.1.0-alpha.1 | 2019 年 7 月 31 日 |
927 在 异步 中排名
每月 10,682 次下载
在 9 个 crate 中使用 (8 个直接使用)
38KB
356 行
futures-async-stream
为 Rust 和 futures crate 实现的异步流。
此 crate 通过使用 async_await
和不稳定 coroutines
为流提供有用的功能。
用法
将以下内容添加到您的 Cargo.toml
[dependencies]
futures-async-stream = "0.2"
futures = "0.3"
编译器支持:需要 rustc nightly-2024-04-25+
#[for_await]
使用 for 循环处理流。
这是对 futures-await 的 #[async]
for 循环的重新实现,用于 futures 0.3,并且是对 async/await 下一个步骤中列出想法的实验性实现。
#![feature(proc_macro_hygiene, stmt_expr_attributes)]
use futures::stream::Stream;
use futures_async_stream::for_await;
async fn collect(stream: impl Stream<Item = i32>) -> Vec<i32> {
let mut vec = vec![];
#[for_await]
for value in stream {
vec.push(value);
}
vec
}
value
具有传入流的 Item
类型。请注意,异步 for 循环只能在 async
函数、闭包、块、#[stream]
函数和 stream_block!
宏内部使用。
#[stream]
通过协程创建流。
这是对 futures-await 的 #[stream]
for 循环的重新实现,用于 futures 0.3,并且是对 async/await 下一个步骤中列出想法的实验性实现。
#![feature(coroutines)]
use futures::stream::Stream;
use futures_async_stream::stream;
// Returns a stream of i32
#[stream(item = i32)]
async fn foo(stream: impl Stream<Item = String>) {
// `for_await` is built into `stream`. If you use `for_await` only in `stream`, there is no need to import `for_await`.
#[for_await]
for x in stream {
yield x.parse().unwrap();
}
}
要从 #[stream]
函数或块中提前退出,请使用 return
。
在异步函数中使用 #[stream]
必须指定一个通过 item = some::Path
指定的项目类型,并且流输出的值必须通过 yield
表达式产生。
#[stream]
也可以用在异步块上。
#![feature(coroutines, proc_macro_hygiene, stmt_expr_attributes)]
use futures::stream::Stream;
use futures_async_stream::stream;
fn foo() -> impl Stream<Item = i32> {
#[stream]
async move {
for i in 0..10 {
yield i;
}
}
}
注意,在异步块上使用 #[stream]
不需要 item
参数,但它可能需要额外的类型注解。
在特质中使用异步流函数
您可以通过传递 boxed
或 boxed_local
作为参数,在特质中使用异步流函数。
#![feature(coroutines)]
use futures_async_stream::stream;
trait Foo {
#[stream(boxed, item = u32)]
async fn method(&mut self);
}
struct Bar(u32);
impl Foo for Bar {
#[stream(boxed, item = u32)]
async fn method(&mut self) {
while self.0 < u32::MAX {
self.0 += 1;
yield self.0;
}
}
}
接收 boxed
参数的异步流函数被转换为返回 Pin<Box<dyn Stream<Item = item> + 'lifetime>>
的函数。如果您传递了 boxed_local
而不是 boxed
,异步流函数返回一个非线程安全的流 (Pin<Box<dyn Stream<Item = item> + 'lifetime>>
)。
#![feature(coroutines)]
use std::pin::Pin;
use futures::stream::Stream;
use futures_async_stream::stream;
// The trait itself can be defined without unstable features.
trait Foo {
fn method(&mut self) -> Pin<Box<dyn Stream<Item = u32> + Send + '_>>;
}
struct Bar(u32);
impl Foo for Bar {
#[stream(boxed, item = u32)]
async fn method(&mut self) {
while self.0 < u32::MAX {
self.0 += 1;
yield self.0;
}
}
}
#[try_stream]
?
操作符可以与 #[try_stream]
一起使用。返回流的 Item
是 Result
,其中 Ok
是产生的值,而 Err
是由 ?
操作符返回的错误类型或 return Err(...)
返回的错误类型。
#![feature(coroutines)]
use futures::stream::Stream;
use futures_async_stream::try_stream;
#[try_stream(ok = i32, error = Box<dyn std::error::Error>)]
async fn foo(stream: impl Stream<Item = String>) {
#[for_await]
for x in stream {
yield x.parse()?;
}
}
#[try_stream]
可以在可以使用 #[stream]
的任何地方使用。
要从 #[try_stream]
函数或块中提前退出,请使用 return Ok(())
。
如何在不使用此 API 的情况下编写等效代码?
#[for_await]
您可以通过组合while let
循环,.await
,pin!
宏和StreamExt::next()
方法来编写此内容
use std::pin::pin;
use futures::stream::{Stream, StreamExt};
async fn collect(stream: impl Stream<Item = i32>) -> Vec<i32> {
let mut vec = vec![];
let mut stream = pin!(stream);
while let Some(value) = stream.next().await {
vec.push(value);
}
vec
}
#[stream]
您也可以通过手动实现组合器来编写此内容
use std::{
pin::Pin,
task::{ready, Context, Poll},
};
use futures::stream::Stream;
use pin_project::pin_project;
fn foo<S>(stream: S) -> impl Stream<Item = i32>
where
S: Stream<Item = String>,
{
Foo { stream }
}
#[pin_project]
struct Foo<S> {
#[pin]
stream: S,
}
impl<S> Stream for Foo<S>
where
S: Stream<Item = String>,
{
type Item = i32;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
if let Some(x) = ready!(self.project().stream.poll_next(cx)) {
Poll::Ready(Some(x.parse().unwrap()))
} else {
Poll::Ready(None)
}
}
}
许可证
您可以根据自己的选择,在以下任一许可下使用:[Apache License, Version 2.0](https://github.com/taiki-e/futures-async-stream/blob/2fd8c4e89ec9444fec61843612f7d1c79a718e19/LICENSE-APACHE) 或 [MIT 许可](https://github.com/taiki-e/futures-async-stream/blob/2fd8c4e89ec9444fec61843612f7d1c79a718e19/LICENSE-MIT)。
除非您明确说明,否则根据 Apache-2.0 许可证定义,您有意提交的任何贡献都将根据上述条款进行双重许可,不附加任何额外条款或条件。
依赖项
~0.3–0.8MB
~18K SLoC