13 个版本

0.2.1 2024 年 7 月 23 日
0.2.0 2024 年 7 月 15 日
0.1.10 2024 年 7 月 13 日
0.1.1 2024 年 6 月 29 日

并发 类别中排名第 135

Download history 108/week @ 2024-06-22 471/week @ 2024-06-29 444/week @ 2024-07-06 408/week @ 2024-07-13 107/week @ 2024-07-20 43/week @ 2024-07-27 1/week @ 2024-08-03

每月下载量 158

MIT 许可证

71KB
2K SLoC

channel-rs

version status

英语 | 简体中文

Rust 高级队列库

摘要

此库主要用于高级队列应用场景,用于简化逻辑代码。

手册

安装:在项目目录中运行 cargo add channel

无界队列

特性:无限缓存容量,生产者和消费者可以有多个,消息只能消费一次

let (tx, rx) = channel::new(None, false);
tx.send_items(vec![1, 2, 3, 4]);
tx.send(5);
let a = rx.len();           // 5
let rx2 = rx.clone();
let b = rx.recv().unwrap(); // 1
let c = rx2.recv_items(3);  // vec![2, 3, 4]
let d = rx.recv().unwrap(); // 5

有界队列

特性:仅缓存指定数量,超出部分最早数据被覆盖,生产者和消费者可以有多个,消息只能消费一次

let (tx, rx) = channel::new(Some(4), false);
tx.send_items(vec![1, 2, 3, 4]);
tx.send(5);
let rx2 = rx.clone();
let a = rx.recv_items(2);  // vec![2, 3]
let b = rx2.recv_items(2); // vec![4, 5]
let c = rx.is_empty();     // true

无界分发队列

特性:缓存的最大数量是理论上的,可以有多个生产者和消费者,任何消息都将被所有消费者消费

let (tx, rx) = channel::new(None, true);
tx.send_items(vec![1, 2, 3, 4]);
tx.send(5);
let rx2 = rx.clone();
let a = rx.recv_items(3);       // vec![1, 2, 3]
let b = rx2.recv_items(3);      // vec![1, 2, 3]
let c = rx.recv_items_weak(3);  // vec![4, 5]
let d = rx2.recv_items_weak(3); // vec![4, 5]

有界分发队列

特性:仅缓存指定数量,超过部分最早数据被覆盖,生产者和消费者可以有多个,任何消息只要未被覆盖都将被所有消费者消费

let (tx, rx) = channel::new(Some(4), true);
tx.send_items(vec![1, 2, 3, 4]);
tx.send(5);
let rx2 = rx.clone();
let a = rx.recv_items(3);       // vec![2, 3, 4]
let a = rx2.recv_items(3);      // vec![2, 3, 4]
let a = rx.recv_items_weak(3);  // vec![5]
let a = rx2.recv_items_weak(3); // vec![5]

时间序列队列

特性:效果与上述队列类似,但增加了必须在数据达到时间后才接收数据的功能。可以理解为在播放视频文件时将帧延迟推送到屏幕上

#[derive(Clone)]
struct MyTSStruct {
    time: NaiveDateTime,
    data: i32,
}

impl MyTSStruct {
    pub fn new(time: NaiveDateTime, data: i32) -> Self { Self { time, data } }
}

impl channel::GetDataTimeExt for MyTSStruct {
    fn get_data_time(&self) -> NaiveDateTime { self.time.clone() }
}

// ...
let (tx, rx) = channel::new_time_series(None, false, NaiveDateTime::now(), 1.0);
// let (tx, rx) = channel::new_time_series(Some(10), false, NaiveDateTime::now(), 1.0);
// let (tx, rx) = channel::new_time_series(None, true, NaiveDateTime::now(), 1.0);
// let (tx, rx) = channel::new_time_series(Some(10), true, NaiveDateTime::now(), 1.0);
tx.send_items(vec![
    MyTSStruct::new(NaiveDateTime::now() - chrono::Duration::milliseconds(10), 111),
    MyTSStruct::new(NaiveDateTime::now() + chrono::Duration::milliseconds(10), 222),
]);
let a = rx.len(); // 2
let rx2 = rx.clone();
let b = rx.recv().unwrap().data; // 111
let c = rx2.recv().is_none(); // true
sleep(Duration::from_millis(10));
let d = rx2.recv().unwrap().data; // 222

观察者

特性:观察者不直接接收管道数据,但可以检测当前缓存使用情况并直接从缓存中提取数据。观察者和接收者可以互换

let (tx, rx) = channel::new_time_series(None, true, NaiveDateTime::now(), 1.0);
let ox = rx.get_observer();
tx.send_items(vec![
    MyTSStruct::new(NaiveDateTime::now() - Duration::milliseconds(10), 111),
    MyTSStruct::new(NaiveDateTime::now() + Duration::milliseconds(10), 222),
]);
let a = rx.recv().unwrap().data; // 111
let tx2 = ox.get_receiver();
let b = tx2.len(); // 1

// The following code is available when the `metrics` feature is enabled
let result = ox.get_metrics_result(true);
println!("{:?}", result);
let send_count: usize = result.sender_counts.iter().map(|(_, v)| *v).sum(); // 2
let recv_count: usize = result.receiver_counts.iter().map(|(_, v)| *v).sum(); // 1

依赖

~6–17MB
~208K SLoC