#buffer #threshold #async #rust

relabuf

在时间或数量阈值达到后释放阀门缓冲区中的项目

13次重大版本更新

0.16.0 2021年10月23日
0.14.0 2021年10月23日
0.11.0 2021年7月4日

42#threshold

50 每月下载量

MIT 许可证

12KB
216

crates.io Dependency status

Relabuf - 带释放阀门的智能缓冲区

  • 从外部 future 消费项目
  • 内部缓冲最多到 hard_cap
  • hard_cap 达到时不再消费,导致生产者退避和减速
  • 仅在特定条件下才能释放内容
  • 自上次成功内容释放(或自开始以来)已通过 release_after,且缓冲区不为空
  • 添加了项目 soft_cap
  • 每次消费应确认或 return 到缓冲区
  • 返回通常由于错误(例如数据库宕机)而发生 - 因此可以配置退避
  • 退避本质上覆盖了时间释放阀门
  • 通过 future 暴露释放的项目,用户可以在其上 await

安装

[dependencies]
relabuf = "~0.16.0"

示例

use anyhow::Context;
use flume::{bounded, Sender};
use relabuf::{ExponentialBackoff, RelaBuf, RelaBufConfig};
use std::time::{Duration, Instant};
use async_io::Timer;

async fn producer(tx: Sender<u32>) {
    for i in 0..16 {
        let dur = Duration::from_millis(150_u64 * (i as u64));
        println!("waiting {:?} before emitting {}", &dur, i);
        Timer::interval(dur).await;

        let t = Instant::now();
        let r = tx.send_async(i).await;
        println!("emit for {} took {:?}: {:?}", i, t.elapsed(), r);
    }
    println!("producer is finished!")
}

#[tokio::main]
async fn main() {
    let (tx, rx) = bounded(5);

    tokio::spawn(producer(tx));

    let opts = RelaBufConfig {
        soft_cap: 3,
        hard_cap: 5,
        release_after: Duration::from_secs(5),
        backoff: Some(ExponentialBackoff {
            max_elapsed_time: None,
            ..ExponentialBackoff::default()
        }),
    };

    let (buf, proxy) = RelaBuf::new(opts, move || {
        let rx = rx.clone();
        Box::pin(async move { rx.recv_async().await.context("cannot read") })
    });
    
    tokio::spawn(proxy.go());

    let mut i = 0;

    while let Ok(consumed) = buf.next().await {
        i += 1;

        if i <= 7 {
            println!(
                "consumed {:?} because {:?}, since last consumption {:?} - returning due to err",
                consumed.items, consumed.reason, consumed.elapsed
            );
            consumed.return_on_err();
        } else {
            println!(
                "consumed {:?} because {:?}, since last consumption {:?}",
                consumed.items, consumed.reason, consumed.elapsed
            );
            consumed.confirm();
        }
    }
    println!("done ;)");
}

依赖项

~6–20MB
~237K SLoC