13 个版本
0.2.1 | 2024 年 7 月 23 日 |
---|---|
0.2.0 | 2024 年 7 月 15 日 |
0.1.10 | 2024 年 7 月 13 日 |
0.1.1 | 2024 年 6 月 29 日 |
在 并发 类别中排名第 135
每月下载量 158
71KB
2K SLoC
channel-rs
英语 | 简体中文
Rust 高级队列库
摘要
此库主要用于高级队列应用场景,用于简化逻辑代码。
手册
安装:在项目目录中运行 cargo add channel
无界队列
特性:无限缓存容量,生产者和消费者可以有多个,消息只能消费一次
let (tx, rx) = channel::new(None, false);
tx.send_items(vec![1, 2, 3, 4]);
tx.send(5);
let a = rx.len(); // 5
let rx2 = rx.clone();
let b = rx.recv().unwrap(); // 1
let c = rx2.recv_items(3); // vec![2, 3, 4]
let d = rx.recv().unwrap(); // 5
有界队列
特性:仅缓存指定数量,超出部分最早数据被覆盖,生产者和消费者可以有多个,消息只能消费一次
let (tx, rx) = channel::new(Some(4), false);
tx.send_items(vec![1, 2, 3, 4]);
tx.send(5);
let rx2 = rx.clone();
let a = rx.recv_items(2); // vec![2, 3]
let b = rx2.recv_items(2); // vec![4, 5]
let c = rx.is_empty(); // true
无界分发队列
特性:缓存的最大数量是理论上的,可以有多个生产者和消费者,任何消息都将被所有消费者消费
let (tx, rx) = channel::new(None, true);
tx.send_items(vec![1, 2, 3, 4]);
tx.send(5);
let rx2 = rx.clone();
let a = rx.recv_items(3); // vec![1, 2, 3]
let b = rx2.recv_items(3); // vec![1, 2, 3]
let c = rx.recv_items_weak(3); // vec![4, 5]
let d = rx2.recv_items_weak(3); // vec![4, 5]
有界分发队列
特性:仅缓存指定数量,超过部分最早数据被覆盖,生产者和消费者可以有多个,任何消息只要未被覆盖都将被所有消费者消费
let (tx, rx) = channel::new(Some(4), true);
tx.send_items(vec![1, 2, 3, 4]);
tx.send(5);
let rx2 = rx.clone();
let a = rx.recv_items(3); // vec![2, 3, 4]
let a = rx2.recv_items(3); // vec![2, 3, 4]
let a = rx.recv_items_weak(3); // vec![5]
let a = rx2.recv_items_weak(3); // vec![5]
时间序列队列
特性:效果与上述队列类似,但增加了必须在数据达到时间后才接收数据的功能。可以理解为在播放视频文件时将帧延迟推送到屏幕上
#[derive(Clone)]
struct MyTSStruct {
time: NaiveDateTime,
data: i32,
}
impl MyTSStruct {
pub fn new(time: NaiveDateTime, data: i32) -> Self { Self { time, data } }
}
impl channel::GetDataTimeExt for MyTSStruct {
fn get_data_time(&self) -> NaiveDateTime { self.time.clone() }
}
// ...
let (tx, rx) = channel::new_time_series(None, false, NaiveDateTime::now(), 1.0);
// let (tx, rx) = channel::new_time_series(Some(10), false, NaiveDateTime::now(), 1.0);
// let (tx, rx) = channel::new_time_series(None, true, NaiveDateTime::now(), 1.0);
// let (tx, rx) = channel::new_time_series(Some(10), true, NaiveDateTime::now(), 1.0);
tx.send_items(vec![
MyTSStruct::new(NaiveDateTime::now() - chrono::Duration::milliseconds(10), 111),
MyTSStruct::new(NaiveDateTime::now() + chrono::Duration::milliseconds(10), 222),
]);
let a = rx.len(); // 2
let rx2 = rx.clone();
let b = rx.recv().unwrap().data; // 111
let c = rx2.recv().is_none(); // true
sleep(Duration::from_millis(10));
let d = rx2.recv().unwrap().data; // 222
观察者
特性:观察者不直接接收管道数据,但可以检测当前缓存使用情况并直接从缓存中提取数据。观察者和接收者可以互换
let (tx, rx) = channel::new_time_series(None, true, NaiveDateTime::now(), 1.0);
let ox = rx.get_observer();
tx.send_items(vec![
MyTSStruct::new(NaiveDateTime::now() - Duration::milliseconds(10), 111),
MyTSStruct::new(NaiveDateTime::now() + Duration::milliseconds(10), 222),
]);
let a = rx.recv().unwrap().data; // 111
let tx2 = ox.get_receiver();
let b = tx2.len(); // 1
// The following code is available when the `metrics` feature is enabled
let result = ox.get_metrics_result(true);
println!("{:?}", result);
let send_count: usize = result.sender_counts.iter().map(|(_, v)| *v).sum(); // 2
let recv_count: usize = result.receiver_counts.iter().map(|(_, v)| *v).sum(); // 1
依赖
~6–17MB
~208K SLoC