#stream #future #macro #async #output-stream

futures-to-stream

将同类型的输出类型的异构Futures转换为流

1 个不稳定版本

0.2.0 2022年3月19日

#1824 in 异步

MIT 许可证

8KB
89

Futures to stream

宏,用于从异构Futures创建流

用法

async fn test1() -> u8 { 1 }
async fn test2() -> u8 { 2 }
async fn test3() -> u8 { 3 }
async fn test4() -> u8 { 4 }

fn ordered_stream() -> impl Stream<Item = u8> {
  futures_to_ordered_stream!(test1(), test2(), test3(), test4())
}

fn unordered_stream() -> impl Stream<Item = u8> {
  futures_to_unordered_stream!(test1(), test2(), test3(), test4())
}

目标

允许从具有相同相关Item的异构Future创建流。

问题

从一组Future创建Stream的方式是创建一个FuturesOrderedFuturesUnordered。然而,由于这些存储着它们参数化的类型的Future,你不能给它们一个异构的Futures集合。

这是一个可以编译的示例

async fn test1() -> () { () }

fn to_stream() -> impl Stream<Item = ()> {
  let mut futs = FuturesOrdered::new();
  futs.push(test1());
  futs.push(test1());
  futs
}

这是一个无法编译的示例

async fn test1() -> () { () }

async fn test2() -> () { () }

fn to_stream() -> impl Stream<Item = ()> {
  let mut futs = FuturesOrdered::new();
  futs.push(test1());
  futs.push(test2()); // Error: expected opaque type, found a different opaque type
  futs
}

很好,非常有帮助的rustc。我们以不同的名字创建了完全相同的功能,但是Future在某处有所不同。

好吧,有一种方法可以很容易地合并两个不同的futures - 使用future::Either

async fn test1() -> () { () }

async fn test2() -> () { () }

fn to_stream() -> impl Stream<Item = ()> {
  let mut futs = FuturesOrdered::new();
  futs.push(Either::Left(test1()));
  futs.push(Either::Right(test2()));
  futs
}

那很好,现在让我们尝试使用四个Future

async fn test1() -> () { () }
async fn test2() -> () { () }
async fn test3() -> () { () }
async fn test4() -> () { () }

fn to_stream() -> impl Stream<Item = ()> {
  let mut futs = FuturesOrdered::new();
  futs.push(Either::Left(test1()));
  futs.push(Either::Right(Either::Left(test2())));
  futs.push(Either::Right(Either::Right(Either::Left(test3()))));
  futs.push(Either::Right(Either::Right(Either::Right(test4()))));
  futs
}

对于四个,它已经相当难以控制。幸运的是,这个包导出了宏来自动生成所有这些。

回到 Usage

依赖项

~2.9–4.5MB
~78K SLoC