2 个版本
0.4.1 | 2021 年 9 月 8 日 |
---|---|
0.4.0 | 2021 年 8 月 29 日 |
#2114 in 网络编程
38 每月下载量
140KB
3K SLoC
Ratsio
Ratsio 是一个用于 NATS 消息系统和 NATS 事件流 的 Rust 客户端库。
受到 nitox 和 rust-nats 的启发,但我的项目需要 NATS 流,所以不能使用这两个中的任何一个。如果您觉得这个项目有用,请随意贡献或建议功能。目前它只是我需要的功能。
将以下内容添加到您的 Cargo.toml 中。
[dependencies]
ratsio = "0.3.0-alpha.6"
Rust -stable,-beta 和 -nightly 都受支持。
功能
- NATS 消息队列。发布、订阅和请求。
- NATS 集群支持,自动重连
- 动态更新集群主机
- 完全异步,使用 tokio 和 futures。
- TLS 模式
- NATS 1.x 身份验证
- NATS 2.0 基于 JWT 的客户端身份验证
- NATS 流服务器
用法
订阅和发布到 NATS 主题:请参阅 examples/nats_subscribe.rs
use ratsio::{NatsClient, RatsioError};
use log::info;
use futures::StreamExt;
pub fn logger_setup() {
use log::LevelFilter;
use std::io::Write;
use env_logger::Builder;
let _ = Builder::new()
.format(|buf, record| {
writeln!(buf,
"[{}] - {}",
record.level(),
record.args()
)
})
.filter(None, LevelFilter::Trace)
.try_init();
}
#[tokio::main]
async fn main() -> Result<(), RatsioError> {
logger_setup();
//Create nats client
let nats_client = NatsClient::new("nats://127.0.0.1:4222").await?;
//subscribe to nats subject 'foo'
let (sid, mut subscription) = nats_client.subscribe("foo").await?;
tokio::spawn(async move {
//Listen for messages on the 'foo' description
//The loop terminates when the upon un_subscribe
while let Some(message) = subscription.next().await {
info!(" << 1 >> got message --- {:?}\n\t{:?}", &message,
String::from_utf8_lossy(message.payload.as_ref()));
}
info!(" << 1 >> unsubscribed. loop is terminated.")
});
//subscribe to nats subject 'foo', another subscription
let (_sid, mut subscription2) = nats_client.subscribe("foo").await?;
tokio::spawn(async move {
//Listen for messages on the 'foo' description
while let Some(message) = subscription2.next().await {
info!(" << 2 >> got message --- {:?}\n\t{:?}", &message,
String::from_utf8_lossy(message.payload.as_ref()));
}
});
//Publish some messages, restart nats server during this time.
use std::{thread, time};
thread::sleep(time::Duration::from_secs(5));
//Publish message
let _ = nats_client.publish("foo", b"Publish Message 1").await?;
thread::sleep(time::Duration::from_secs(1));
//Unsubscribe
let _ = nats_client.un_subscribe(&sid).await?;
thread::sleep(time::Duration::from_secs(3));
//Publish some messages.
thread::sleep(time::Duration::from_secs(1));
let _ = nats_client.publish("foo", b"Publish Message 2").await?;
thread::sleep(time::Duration::from_secs(600));
info!(" ---- done --- ");
Ok(())
}
订阅和发布到 NATS 流主题:请参阅 tests/stan_subscribe.rs
use log::info;
use futures::StreamExt;
use ratsio::{RatsioError, StanClient, StanOptions};
pub fn logger_setup() {
use log::LevelFilter;
use std::io::Write;
use env_logger::Builder;
let _ = Builder::new()
.format(|buf, record| {
writeln!(buf,
"[{}] - {}",
record.level(),
record.args()
)
})
.filter(None, LevelFilter::Trace)
.try_init();
}
#[tokio::main]
async fn main() -> Result<(), RatsioError> {
logger_setup();
// Create stan options
let client_id = "test1".to_string();
let opts = StanOptions::with_options("localhost:4222", "test-cluster", &client_id[..]);
//Create STAN client
let stan_client = StanClient::from_options(opts).await?;
//Subscribe to STAN subject 'foo'
let (sid, mut subscription) = stan_client.subscribe("foo", None, None).await?;
tokio::spawn(async move {
while let Some(message) = subscription.next().await {
info!(" << 1 >> got stan message --- {:?}\n\t{:?}", &message,
String::from_utf8_lossy(message.payload.as_ref()));
}
info!(" ----- the subscription loop is done ---- ")
});
//Publish some mesesages to 'foo', use 'cargo run --example stan_publish foo "hi there"'
use std::{thread, time};
thread::sleep(time::Duration::from_secs(60));
//Unsubscribe
let _ = stan_client.un_subscribe(&sid).await;
thread::sleep(time::Duration::from_secs(10));
info!(" ---- done --- ");
Ok(())
}
重要变更
版本 0.2
有关 0.2.* 版本的所有信息均可在以下位置找到 版本 0.2.*。
版本 0.3.0-alpha.1
0.2版本API的重大变更 这是最初个支持async/await的版本,目前尚未准备好投入生产,仍在开发中。请参考examples/文件夹中的示例。
版本 0.3.0-alpha.3
这是第一个能够正常工作的async/await,但仍然缺少0.2版本的一些功能。
版本 0.3.0-alpha.4
将std::sync::RwLock替换为futures:🔒:Mutex以获得+ Send + Sync的能力。
版本 0.3.0-alpha.5
将failure::*替换为thiserror crate。
版本 0.3.0-alpha.6
版本 0.4.0-alpha.1
升级到使用tokio 1.0 合并了pull requests #21
联系方式
对于错误报告、补丁、功能请求或其他消息,请发送电子邮件至 [email protected]
许可协议
本项目采用MIT许可协议。
依赖关系
~11–27MB
~361K SLoC