1 个不稳定版本
0.1.0-alpha.11 | 2020 年 7 月 25 日 |
---|
#21 in #事件循环
每月 106 次下载
155KB
3K SLoC
一个纯 Rust MQTT 客户端,力求鲁棒、高效且易于使用。
- 事件循环就是一个异步
Stream
,可以被 tokio 轮询。 - 对事件循环的请求也是一个
Stream
。解决了有界和无界用例。 - 鲁棒性只需一个循环即可实现。
- 灵活访问事件循环的状态以控制其行为。
接受任何请求流。
构建有界、无界、可中断或其他任何满足您需求的流(用于向事件循环提供数据)。
我们的一些实际用例
- 一个流,通过检测背压来协调磁盘和内存之间的数据,并且(实际上)不会丢失数据。
- 一个流,根据数据的优先级在多个通道之间处理数据。
#[tokio::main(core_threads = 1)]
async fn main() {
let mut mqttoptions = MqttOptions::new("test-1", "localhost", 1883);
let requests = Vec::new::<Request>();
let mut eventloop = eventloop(mqttoptions, requests_rx);
let mut stream = eventloop.connect().await.unwrap();
while let Some(item) = stream.next().await {
println!("Received = {:?}", item);
}
}
鲁棒性只需一个循环即可实现。
网络是不可靠的。但鲁棒性是容易实现的。
- 只需从现有的事件循环创建一个新的流。
- 从上次停止的地方继续。
- 访问事件循环的状态以自定义下一次连接的行为。
#[tokio::main(core_threads = 1)]
async fn main() {
let mut mqttoptions = MqttOptions::new("test-1", "localhost", 1883);
let requests = Vec::new::<Request>();
let mut eventloop = eventloop(mqttoptions, requests_rx);
// loop to reconnect and resume
loop {
let mut stream = eventloop.connect().await.unwrap();
while let Some(item) = stream.next().await {
println!("Received = {:?}", item);
}
time::delay_for(Duration::from_secs(1)).await;
}
}
事件循环就是一个可以被 tokio 轮询的流。
- 将其连接到
select!
或join!
以在同一个线程上与其他流交错。
#[tokio::main(core_threads = 1)]
async fn main() {
let mut mqttoptions = MqttOptions::new("test-1", "localhost", 1883);
let requests = Vec::new::<Request>();
let mut eventloop = eventloop(mqttoptions, requests_rx);
// plug it into tokio ecosystem
let mut stream = eventloop.connect().await.unwrap();
}
强大的通知系统以控制运行时
事件循环流产生所有有趣的事件,从网络上的数据到断开连接和重新连接。按您认为合适的方式使用它。
- 重新连接后重新订阅
- 接收第 N 个 puback 后停止
#[tokio::main(core_threads = 1)]
async fn main() {
let mut mqttoptions = MqttOptions::new("test-1", "localhost", 1883);
let (requests_tx, requests_rx) = channel(10);
let mut eventloop = eventloop(mqttoptions, requests_rx);
// loop to reconnect and resume
loop {
let mut stream = eventloop.connect().await.unwrap();
while let Some(notification) = stream.next().await {
println!("Received = {:?}", item);
match notification {
Notification::Connect => requests_tx.send(subscribe).unwrap(),
}
}
time::delay_for(Duration::from_secs(1)).await;
}
}
依赖关系
~15MB
~351K SLoC