#async-stream #stream #future #async #macro

nightly no-std futures-async-stream

Rust 和 futures crate 的异步流

26 个版本

新版本 0.2.12 2024 年 8 月 23 日
0.2.11 2024 年 4 月 27 日
0.2.10 2024 年 1 月 12 日
0.2.9 2023 年 10 月 21 日
0.1.0-alpha.12019 年 7 月 31 日

927异步 中排名

Download history 4868/week @ 2024-05-03 6445/week @ 2024-05-10 5510/week @ 2024-05-17 3607/week @ 2024-05-24 3536/week @ 2024-05-31 2973/week @ 2024-06-07 2265/week @ 2024-06-14 3166/week @ 2024-06-21 3595/week @ 2024-06-28 3200/week @ 2024-07-05 3134/week @ 2024-07-12 2661/week @ 2024-07-19 2560/week @ 2024-07-26 2694/week @ 2024-08-02 2345/week @ 2024-08-09 2708/week @ 2024-08-16

每月 10,682 次下载
9 个 crate 中使用 (8 个直接使用)

Apache-2.0 或 MIT

38KB
356

futures-async-stream

crates.io docs.rs license github actions

为 Rust 和 futures crate 实现的异步流。

此 crate 通过使用 async_await 和不稳定 coroutines 为流提供有用的功能。

用法

将以下内容添加到您的 Cargo.toml

[dependencies]
futures-async-stream = "0.2"
futures = "0.3"

编译器支持:需要 rustc nightly-2024-04-25+

#[for_await]

使用 for 循环处理流。

这是对 futures-await#[async] for 循环的重新实现,用于 futures 0.3,并且是对 async/await 下一个步骤中列出想法的实验性实现

#![feature(proc_macro_hygiene, stmt_expr_attributes)]

use futures::stream::Stream;
use futures_async_stream::for_await;

async fn collect(stream: impl Stream<Item = i32>) -> Vec<i32> {
    let mut vec = vec![];
    #[for_await]
    for value in stream {
        vec.push(value);
    }
    vec
}

value 具有传入流的 Item 类型。请注意,异步 for 循环只能在 async 函数、闭包、块、#[stream] 函数和 stream_block! 宏内部使用。

#[stream]

通过协程创建流。

这是对 futures-await#[stream] for 循环的重新实现,用于 futures 0.3,并且是对 async/await 下一个步骤中列出想法的实验性实现

#![feature(coroutines)]

use futures::stream::Stream;
use futures_async_stream::stream;

// Returns a stream of i32
#[stream(item = i32)]
async fn foo(stream: impl Stream<Item = String>) {
    // `for_await` is built into `stream`. If you use `for_await` only in `stream`, there is no need to import `for_await`.
    #[for_await]
    for x in stream {
        yield x.parse().unwrap();
    }
}

要从 #[stream] 函数或块中提前退出,请使用 return

在异步函数中使用 #[stream] 必须指定一个通过 item = some::Path 指定的项目类型,并且流输出的值必须通过 yield 表达式产生。

#[stream] 也可以用在异步块上。

#![feature(coroutines, proc_macro_hygiene, stmt_expr_attributes)]

use futures::stream::Stream;
use futures_async_stream::stream;

fn foo() -> impl Stream<Item = i32> {
    #[stream]
    async move {
        for i in 0..10 {
            yield i;
        }
    }
}

注意,在异步块上使用 #[stream] 不需要 item 参数,但它可能需要额外的类型注解。

在特质中使用异步流函数

您可以通过传递 boxedboxed_local 作为参数,在特质中使用异步流函数。

#![feature(coroutines)]

use futures_async_stream::stream;

trait Foo {
    #[stream(boxed, item = u32)]
    async fn method(&mut self);
}

struct Bar(u32);

impl Foo for Bar {
    #[stream(boxed, item = u32)]
    async fn method(&mut self) {
        while self.0 < u32::MAX {
            self.0 += 1;
            yield self.0;
        }
    }
}

接收 boxed 参数的异步流函数被转换为返回 Pin<Box<dyn Stream<Item = item> + 'lifetime>> 的函数。如果您传递了 boxed_local 而不是 boxed,异步流函数返回一个非线程安全的流 (Pin<Box<dyn Stream<Item = item> + 'lifetime>>)。

#![feature(coroutines)]

use std::pin::Pin;

use futures::stream::Stream;
use futures_async_stream::stream;

// The trait itself can be defined without unstable features.
trait Foo {
    fn method(&mut self) -> Pin<Box<dyn Stream<Item = u32> + Send + '_>>;
}

struct Bar(u32);

impl Foo for Bar {
    #[stream(boxed, item = u32)]
    async fn method(&mut self) {
        while self.0 < u32::MAX {
            self.0 += 1;
            yield self.0;
        }
    }
}

#[try_stream]

? 操作符可以与 #[try_stream] 一起使用。返回流的 ItemResult,其中 Ok 是产生的值,而 Err 是由 ? 操作符返回的错误类型或 return Err(...) 返回的错误类型。

#![feature(coroutines)]

use futures::stream::Stream;
use futures_async_stream::try_stream;

#[try_stream(ok = i32, error = Box<dyn std::error::Error>)]
async fn foo(stream: impl Stream<Item = String>) {
    #[for_await]
    for x in stream {
        yield x.parse()?;
    }
}

#[try_stream] 可以在可以使用 #[stream] 的任何地方使用。

要从 #[try_stream] 函数或块中提前退出,请使用 return Ok(())

如何在不使用此 API 的情况下编写等效代码?

#[for_await]

您可以通过组合while let循环,.awaitpin!宏和StreamExt::next()方法来编写此内容

use std::pin::pin;

use futures::stream::{Stream, StreamExt};

async fn collect(stream: impl Stream<Item = i32>) -> Vec<i32> {
    let mut vec = vec![];
    let mut stream = pin!(stream);
    while let Some(value) = stream.next().await {
        vec.push(value);
    }
    vec
}

#[stream]

您也可以通过手动实现组合器来编写此内容

use std::{
    pin::Pin,
    task::{ready, Context, Poll},
};

use futures::stream::Stream;
use pin_project::pin_project;

fn foo<S>(stream: S) -> impl Stream<Item = i32>
where
    S: Stream<Item = String>,
{
    Foo { stream }
}

#[pin_project]
struct Foo<S> {
    #[pin]
    stream: S,
}

impl<S> Stream for Foo<S>
where
    S: Stream<Item = String>,
{
    type Item = i32;

    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        if let Some(x) = ready!(self.project().stream.poll_next(cx)) {
            Poll::Ready(Some(x.parse().unwrap()))
        } else {
            Poll::Ready(None)
        }
    }
}

许可证

您可以根据自己的选择,在以下任一许可下使用:[Apache License, Version 2.0](https://github.com/taiki-e/futures-async-stream/blob/2fd8c4e89ec9444fec61843612f7d1c79a718e19/LICENSE-APACHE) 或 [MIT 许可](https://github.com/taiki-e/futures-async-stream/blob/2fd8c4e89ec9444fec61843612f7d1c79a718e19/LICENSE-MIT)。

除非您明确说明,否则根据 Apache-2.0 许可证定义,您有意提交的任何贡献都将根据上述条款进行双重许可,不附加任何额外条款或条件。

依赖项

~0.3–0.8MB
~18K SLoC