10个版本
0.1.0-alpha.10 | 2020年4月15日 |
---|---|
0.1.0-alpha.9 | 2020年3月29日 |
0.1.0-alpha.5 | 2020年2月16日 |
0.1.0-alpha.3 | 2020年1月6日 |
0.1.0-alpha.2 | 2019年12月26日 |
#14 in #connected
每月60次下载
155KB
3K SLoC
一个追求健壮、高效且易于使用的纯Rust MQTT客户端。
- 事件循环只是一个可以被tokio轮询的异步
Stream
。 - 对事件循环的请求也是一个
Stream
。解决了有界和无界两种用例。 - 健壮性只需一个循环即可实现。
- 灵活访问事件循环的状态以控制其行为。
接受任何请求流。
构建有界、无界、可中断或其他任何满足您需求的流(以喂养事件循环)。
一些我们的实际应用场景
- 一个流,通过检测背压在磁盘和内存之间编排数据,永远不会(实际上)丢失数据。
- 一个流,根据数据的优先级在多个通道之间切换数据。
#[tokio::main(core_threads = 1)]
async fn main() {
let mut mqttoptions = MqttOptions::new("test-1", "localhost", 1883);
let requests = Vec::new::<Request>();
let mut eventloop = eventloop(mqttoptions, requests_rx);
let mut stream = eventloop.connect().await.unwrap();
while let Some(item) = stream.next().await {
println!("Received = {:?}", item);
}
}
健壮性只需一个循环即可实现。
网络是不可靠的。但健壮性是容易实现的。
- 只需从现有的事件循环创建一个新的流即可。
- 从上次离开的地方继续。
- 访问事件循环的状态以自定义下一个连接的行为。
#[tokio::main(core_threads = 1)]
async fn main() {
let mut mqttoptions = MqttOptions::new("test-1", "localhost", 1883);
let requests = Vec::new::<Request>();
let mut eventloop = eventloop(mqttoptions, requests_rx);
// loop to reconnect and resume
loop {
let mut stream = eventloop.connect().await.unwrap();
while let Some(item) = stream.next().await {
println!("Received = {:?}", item);
}
time::delay_for(Duration::from_secs(1)).await;
}
}
事件循环只是一个可以被tokio轮询的流。
- 将其插入到
select!
join!
中,与其他线程上的流交织。
#[tokio::main(core_threads = 1)]
async fn main() {
let mut mqttoptions = MqttOptions::new("test-1", "localhost", 1883);
let requests = Vec::new::<Request>();
let mut eventloop = eventloop(mqttoptions, requests_rx);
// plug it into tokio ecosystem
let mut stream = eventloop.connect().await.unwrap();
}
强大的通知系统来控制运行时。
事件循环流产生所有有趣的事件,从网络上的数据到断开和重新连接。
- 重新连接后重新订阅。
- 在收到第N个puback后停止。
#[tokio::main(core_threads = 1)]
async fn main() {
let mut mqttoptions = MqttOptions::new("test-1", "localhost", 1883);
let (requests_tx, requests_rx) = channel(10);
let mut eventloop = eventloop(mqttoptions, requests_rx);
// loop to reconnect and resume
loop {
let mut stream = eventloop.connect().await.unwrap();
while let Some(notification) = stream.next().await {
println!("Received = {:?}", item);
match notification {
Notification::Connect => requests_tx.send(subscribe).unwrap(),
}
}
time::delay_for(Duration::from_secs(1)).await;
}
}
依赖项
~15MB
~351K SLoC