12次发布

0.3.5 2023年4月5日
0.3.4 2023年2月18日
0.3.3 2022年3月13日
0.3.2 2021年5月23日
0.0.0 2019年6月7日

#14 in Rust模式

Download history 703645/week @ 2024-03-14 699385/week @ 2024-03-21 694378/week @ 2024-03-28 701353/week @ 2024-04-04 704983/week @ 2024-04-11 710510/week @ 2024-04-18 700867/week @ 2024-04-25 709846/week @ 2024-05-02 710269/week @ 2024-05-09 762107/week @ 2024-05-16 748353/week @ 2024-05-23 844329/week @ 2024-05-30 847025/week @ 2024-06-06 835250/week @ 2024-06-13 848001/week @ 2024-06-20 637110/week @ 2024-06-27

3,344,468 每月下载量
3,063 个crate中(543个直接使用)

MIT 许可证

19KB
217 代码行

Rust的异步流

元素异步流。

提供两个宏,stream!try_stream!,允许调用者定义异步元素流。这些使用 async & await 语法实现。这个crate不需要不稳定功能。

stream! 宏返回一个实现了 Stream 特质的匿名类型。关联类型 Item 是从流中产生的值的类型。try_stream! 也返回一个实现了 Stream 特质的匿名类型,但关联类型 ItemResult<T, Error>try_stream! 宏支持在实现中使用 ? 语法。

使用方法

一个基本的产生数字的流。使用 yield 关键字产生值。流块必须返回 ()

use async_stream::stream;

use futures_util::pin_mut;
use futures_util::stream::StreamExt;

#[tokio::main]
async fn main() {
    let s = stream! {
        for i in 0..3 {
            yield i;
        }
    };

    pin_mut!(s); // needed for iteration

    while let Some(value) = s.next().await {
        println!("got {}", value);
    }
}

可以通过使用 impl Stream<Item = T> 返回流

use async_stream::stream;

use futures_core::stream::Stream;
use futures_util::pin_mut;
use futures_util::stream::StreamExt;

fn zero_to_three() -> impl Stream<Item = u32> {
    stream! {
        for i in 0..3 {
            yield i;
        }
    }
}

#[tokio::main]
async fn main() {
    let s = zero_to_three();
    pin_mut!(s); // needed for iteration

    while let Some(value) = s.next().await {
        println!("got {}", value);
    }
}

流可以用其他流实现 - async-stream 提供了 for await 语法以协助实现

use async_stream::stream;

use futures_core::stream::Stream;
use futures_util::pin_mut;
use futures_util::stream::StreamExt;

fn zero_to_three() -> impl Stream<Item = u32> {
    stream! {
        for i in 0..3 {
            yield i;
        }
    }
}

fn double<S: Stream<Item = u32>>(input: S)
    -> impl Stream<Item = u32>
{
    stream! {
        for await value in input {
            yield value * 2;
        }
    }
}

#[tokio::main]
async fn main() {
    let s = double(zero_to_three());
    pin_mut!(s); // needed for iteration

    while let Some(value) = s.next().await {
        println!("got {}", value);
    }
}

可以使用 Rust 的 try 语法(?)与 try_stream! 宏一起使用。返回流中的 ItemResult 类型,其中 Ok 是产生的值,而 Err 是由 ? 返回的错误类型。

use tokio::net::{TcpListener, TcpStream};

use async_stream::try_stream;
use futures_core::stream::Stream;

use std::io;
use std::net::SocketAddr;

fn bind_and_accept(addr: SocketAddr)
    -> impl Stream<Item = io::Result<TcpStream>>
{
    try_stream! {
        let mut listener = TcpListener::bind(addr).await?;

        loop {
            let (stream, addr) = listener.accept().await?;
            println!("received on {:?}", addr);
            yield stream;
        }
    }
}

实现

stream!try_stream! 使用 proc 宏实现。宏在语法树中搜索 yield $expr 的实例,并将它们转换为 sender.send($expr).await

流使用轻量级的发送者将值从流实现发送到调用者。进入流时,在栈上存储一个 Option<T>。将单元格的指针存储在线程局部变量中,并在异步块上调用 poll。当 poll 返回时,sender.send(value) 存储单元格的值并返回给调用者。

支持的 Rust 版本

当前最低支持的 Rust 版本是 1.56。

许可证

本项目受 MIT 许可证的许可,详情请见 MIT 许可证

贡献

除非您明确声明,否则您提交给 async-stream 的任何有意贡献都应按照 MIT 许可,不附加任何额外的条款或条件。

依赖项

~0.3–0.8MB
~19K SLoC