13次重大版本更新
0.16.0 | 2021年10月23日 |
---|---|
0.14.0 | 2021年10月23日 |
0.11.0 | 2021年7月4日 |
42 在 #threshold
50 每月下载量
12KB
216 行
Relabuf - 带释放阀门的智能缓冲区
- 从外部
future
消费项目 - 内部缓冲最多到
hard_cap
- 当
hard_cap
达到时不再消费,导致生产者退避和减速 - 仅在特定条件下才能释放内容
- 自上次成功内容释放(或自开始以来)已通过
release_after
,且缓冲区不为空 - 添加了项目
soft_cap
- 每次消费应确认或
return
到缓冲区 - 返回通常由于错误(例如数据库宕机)而发生 - 因此可以配置退避
- 退避本质上覆盖了时间释放阀门
- 通过
future
暴露释放的项目,用户可以在其上await
安装
[dependencies]
relabuf = "~0.16.0"
示例
use anyhow::Context;
use flume::{bounded, Sender};
use relabuf::{ExponentialBackoff, RelaBuf, RelaBufConfig};
use std::time::{Duration, Instant};
use async_io::Timer;
async fn producer(tx: Sender<u32>) {
for i in 0..16 {
let dur = Duration::from_millis(150_u64 * (i as u64));
println!("waiting {:?} before emitting {}", &dur, i);
Timer::interval(dur).await;
let t = Instant::now();
let r = tx.send_async(i).await;
println!("emit for {} took {:?}: {:?}", i, t.elapsed(), r);
}
println!("producer is finished!")
}
#[tokio::main]
async fn main() {
let (tx, rx) = bounded(5);
tokio::spawn(producer(tx));
let opts = RelaBufConfig {
soft_cap: 3,
hard_cap: 5,
release_after: Duration::from_secs(5),
backoff: Some(ExponentialBackoff {
max_elapsed_time: None,
..ExponentialBackoff::default()
}),
};
let (buf, proxy) = RelaBuf::new(opts, move || {
let rx = rx.clone();
Box::pin(async move { rx.recv_async().await.context("cannot read") })
});
tokio::spawn(proxy.go());
let mut i = 0;
while let Ok(consumed) = buf.next().await {
i += 1;
if i <= 7 {
println!(
"consumed {:?} because {:?}, since last consumption {:?} - returning due to err",
consumed.items, consumed.reason, consumed.elapsed
);
consumed.return_on_err();
} else {
println!(
"consumed {:?} because {:?}, since last consumption {:?}",
consumed.items, consumed.reason, consumed.elapsed
);
consumed.confirm();
}
}
println!("done ;)");
}
依赖项
~6–20MB
~237K SLoC