#async-stream #stream #async

async-fn-stream

无宏的轻量级async-stream实现

4个版本

0.2.2 2024年4月23日
0.2.1 2024年4月18日
0.2.0 2022年8月3日
0.1.0 2022年7月7日

#151Rust模式

Download history 2240/week @ 2024-05-03 3085/week @ 2024-05-10 2782/week @ 2024-05-17 2906/week @ 2024-05-24 3837/week @ 2024-05-31 3120/week @ 2024-06-07 2604/week @ 2024-06-14 1878/week @ 2024-06-21 1476/week @ 2024-06-28 2754/week @ 2024-07-05 1804/week @ 2024-07-12 2557/week @ 2024-07-19 3999/week @ 2024-07-26 3501/week @ 2024-08-02 3322/week @ 2024-08-09 2836/week @ 2024-08-16

14,294 每月下载量
10 个crate中使用 (直接使用 8)

MIT 许可证

18KB
305

这是一个没有宏的async-stream版本。此crate提供了Stream trait的泛型实现。Streamstd::iter::Iterator的异步版本。

提供了两个函数 - fn_streamtry_fn_stream

使用方法

基本使用

如果需要创建可能产生错误的流,请使用 try_fn_stream,否则使用 fn_stream

创建流

  1. 调用 fn_streamtry_fn_stream,并传递一个闭包。
  2. 闭包将接受一个 emitter。要从中返回值,请在 emitter 上调用 .emit(value) 并在它的结果上调用 .await

返回错误

try_fn_stream 提供了一些返回错误的便捷功能。

  1. 错误可以通过 return Err(...) 或问号 (?) 操作符从闭包中返回。这将结束流。
  2. emitter 还有一个 emit_err() 方法来返回错误而不结束流。

示例

有限数字流

use async_fn_stream::fn_stream;
use futures_util::Stream;

fn build_stream() -> impl Stream<Item = i32> {
    fn_stream(|emitter| async move {
        for i in 0..3 {
            // yield elements from stream via `emitter`
            emitter.emit(i).await;
        }
    })
}

从文本文件读取数字,带有错误处理

use anyhow::Context;
use async_fn_stream::try_fn_stream;
use futures_util::{pin_mut, Stream, StreamExt};
use tokio::{
    fs::File,
    io::{AsyncBufReadExt, BufReader},
};

fn read_numbers(file_name: String) -> impl Stream<Item = Result<i32, anyhow::Error>> {
    try_fn_stream(|emitter| async move {
        // Return errors via `?` operator.
        let file = BufReader::new(File::open(file_name).await.context("Failed to open file")?);
        pin_mut!(file);
        let mut line = String::new();
        loop {
            line.clear();
            let byte_count = file
                .read_line(&mut line)
                .await
                .context("Failed to read line")?;
            if byte_count == 0 {
                break;
            }

            for token in line.split_ascii_whitespace() {
                let Ok(number) = token.parse::<i32>() else {
                    // Return errors via the `emit_err` method.
                    emitter.emit_err(
                        anyhow::anyhow!("Failed to convert string \"{token}\" to number")
                    ).await;
                    continue;
                };
                emitter.emit(number).await;
            }
        }

        Ok(())
    })
}

为什么不使用async-stream

async-stream 很棒!它有很好的语法,但它基于宏,这带来了一些缺陷。

依赖项

~560–750KB
~14K SLoC