15个版本

使用旧的Rust 2015

0.4.9 2017年12月3日
0.4.8 2017年12月3日
0.4.7 2017年11月30日
0.3.1 2017年9月30日
0.1.1 2017年5月10日

#1502 in 异步

Download history 24/week @ 2024-04-02 3/week @ 2024-04-23 1/week @ 2024-06-04 2/week @ 2024-06-11

每月63次下载
用于amqpr

MIT/Apache

44KB
953

futures_ext

我非常希望得到您的贡献,特别是英文资料。正如您所见,我的英文能力不好。请帮助我改进文档。

MIT licensed Apache-2.0 licensed Crates.io

futures的扩展。

文档

目前,此crate使您能够

  • 创建发布/订阅通道(将来将被移除)
  • 将任何类型的流/接收器转换为"可复制的"
  • 分叉任何类型的流
  • 转换与Error关联的类型,该类型为()

如何使用

发布-订阅通道

使用方法几乎与futures::unsync::mpsc::unbounded相同。

use ex_futures::unsync::pubsub::unbounded;

fn main() {
    let (tx, rx) = unbounded::<usize>();
    let rx2 = rx.clone();
    let mut rx = rx.wait();
    let mut rx2 = rx.wait(); // Subscriber is cloneable

    tx.send(1).wait().unwrap();

    assert_eq!(rx.next().unwrap().map(|i| *i), Ok(1));
    assert_eq!(rx2.next().unwrap().map(|i| *i), Ok(1));
}

可复制的流/接收器

use ex_futures::StreamExt;
use futures::unsycn::mpsc::channel;

fn main() {
    let (tx, rx) = channel(42);;

    let cloneable_rx = rx.unsync_cloneable(); // Convert "rx" into cloneable
    let cloneable_rx2 = cloneable.clone();  // Now you can clone it

    let tx = tx.wait();
    tx.send(0);
    tx.send(1);
    tx.send(2);
    tx.send(3);

    assert_eq!(cloneable_rx.collect().wait().unwrap(), [0, 1, 2, 3]);
    assert_eq!(cloneable_rx2.collect().wait().unwrap(), [0, 1, 2, 3]);
}

分叉的流

use ex_futures::StreamExt;

fn main() {
    let int_stream = gen_stream(); // Somehow you create some stream

    let (even, odd) = int_stream.fork(|i| i % 2 == 0);

    assert_eq!(even.collect().wait().unwrap(), [0, 2]);
    assert_eq!(odd.collect().wait().unwrap(), [1, 3]);
}

依赖项

~53KB