#reactive #iterable #stream #observable #frp

callbag

Rust语言实现反应式/迭代式编程的callbag规范

13个重大版本发布

0.14.0 2022年1月14日
0.13.0 2021年12月28日
0.12.0 2021年12月25日
0.6.0 2021年11月23日

异步 中排名 729

每月下载量 36

MIT/Apache

110KB
1.5K SLoC

callbag-rs

Rust语言实现 callbag规范,用于反应式/迭代式编程。

提供基本的 callbag 工厂和操作符,以开始使用。

亮点

  • 支持反应式流编程
  • 支持迭代式编程(也是!)
  • 相同的操作符适用于以上两种
  • 可扩展

想象一个介于 Observable(Async)Iterable 之间的混合体,这就是callbag的全部。所有这些都可以通过一些简单的回调,按照 callbag规范 来实现。

CI Crates.io Documentation MIT OR Apache-2.0 licensed

示例

反应式编程示例

从一个每秒滴答一次的时钟中选取前5个奇数,然后开始观察它们

use async_nursery::Nursery;
use crossbeam_queue::SegQueue;
use std::{sync::Arc, time::Duration};

use callbag::{filter, for_each, interval, map, pipe, take};

let (nursery, nursery_out) = Nursery::new(async_executors::AsyncStd);

let actual = Arc::new(SegQueue::new());

pipe!(
    interval(Duration::from_millis(1_000), nursery.clone()),
    map(|x| x + 1),
    filter(|x| x % 2 == 1),
    take(5),
    for_each({
        let actual = Arc::clone(&actual);
        move |x| {
            println!("{}", x);
            actual.push(x);
        }
    }),
);

drop(nursery);
async_std::task::block_on(nursery_out);

assert_eq!(
    &{
        let mut v = vec![];
        for _i in 0..actual.len() {
            v.push(actual.pop().unwrap());
        }
        v
    }[..],
    [1, 3, 5, 7, 9]
);

迭代式编程示例

从一组数字中选择5个,将它们除以4,然后逐个拉取

use crossbeam_queue::SegQueue;
use std::sync::Arc;

use callbag::{for_each, from_iter, map, pipe, take};

#[derive(Clone)]
struct Range {
    i: usize,
    to: usize,
}

impl Range {
    fn new(from: usize, to: usize) -> Self {
        Range { i: from, to }
    }
}

impl Iterator for Range {
    type Item = usize;

    fn next(&mut self) -> Option<Self::Item> {
        let i = self.i;
        if i <= self.to {
            self.i += 1;
            Some(i)
        } else {
            None
        }
    }
}

let actual = Arc::new(SegQueue::new());

pipe!(
    from_iter(Range::new(40, 99)),
    take(5),
    map(|x| x as f64 / 4.0),
    for_each({
        let actual = Arc::clone(&actual);
        move |x| {
            println!("{}", x);
            actual.push(x);
        }
    }),
);

assert_eq!(
    &{
        let mut v = vec![];
        for _i in 0..actual.len() {
            v.push(actual.pop().unwrap());
        }
        v
    }[..],
    [10.0, 10.25, 10.5, 10.75, 11.0]
);

Ok::<(), Box<dyn std::error::Error>>(())

API

以下列表显示了包含的内容。

源工厂

接收工厂

转换操作符

过滤操作符

组合操作符

实用工具

术语

  • :传递数据的callbag
  • 接收器:接收数据的callbag
  • 拉取接收器:主动从源请求数据的接收器
  • 可拉取源:仅在需要时(接收到请求时)传递数据的源
  • 监听接收器:从源被动接收数据的接收器
  • 可监听源:向接收器发送数据而不等待请求的源
  • 操作符:基于另一个callbag并应用某些操作的callbag

许可

根据以下任一许可进行许可

由您选择。

贡献

除非您明确声明,否则任何根据Apache-2.0许可证定义的、您有意提交以包含在本工作中的贡献,将如上双许可,无需任何附加条款或条件。

致谢

感谢André Staltz(@staltz)创建了callbag规范

此库是https://github.com/staltz/callbag-basics的移植。一些灵感来自https://github.com/f5io/callbag.rs

感谢Rust用户论坛上的出色人士的帮助,尤其是

依赖关系

~185–570KB
~10K SLoC