17 个版本
0.4.1 | 2022 年 1 月 17 日 |
---|---|
0.4.0 | 2021 年 9 月 9 日 |
0.4.0-alpha.1 | 2021 年 4 月 14 日 |
0.3.0-alpha.6 | 2020 年 9 月 6 日 |
0.2.1 | 2019 年 1 月 19 日 |
在 异步 中排名第 1512
140KB
2.5K SLoC
比索
比索是 NATS 消息系统 NATS 和 NATS 事件流 NATS Event Streaming 的 Rust 客户端库。
受 nitox 和 rust-nats 启发,但我的项目需要 NATS 流,因此无法使用这两个库。如果你觉得这个项目有用,欢迎贡献或提出特性建议。目前它只包含我需要的特性。
将以下内容添加到你的 Cargo.toml 中。
[dependencies]
ratsio = "0.4.0"
Rust -stable, -beta 和 -nightly 都受支持。
特性
- NATS 消息队列。发布、订阅和请求。
- NATS 集群支持,自动重连
- 动态更新集群主机。
- 从头开始异步,使用 tokio 和 futures。
- TLS 模式
- NATS 1.x 认证
- NATS 2.0 基于 JWT 的客户端认证
- NATS 流服务器
用法
订阅和发布到 NATS 主题:请参阅 examples/nats_subscribe.rs
use ratsio::{NatsClient, RatsioError};
use log::info;
use futures::StreamExt;
pub fn logger_setup() {
use log::LevelFilter;
use std::io::Write;
use env_logger::Builder;
let _ = Builder::new()
.format(|buf, record| {
writeln!(buf,
"[{}] - {}",
record.level(),
record.args()
)
})
.filter(None, LevelFilter::Trace)
.try_init();
}
#[tokio::main]
async fn main() -> Result<(), RatsioError> {
logger_setup();
//Create nats client
let nats_client = NatsClient::new("nats://127.0.0.1:4222").await?;
//subscribe to nats subject 'foo'
let (sid, mut subscription) = nats_client.subscribe("foo").await?;
tokio::spawn(async move {
//Listen for messages on the 'foo' description
//The loop terminates when the upon un_subscribe
while let Some(message) = subscription.next().await {
info!(" << 1 >> got message --- {:?}\n\t{:?}", &message,
String::from_utf8_lossy(message.payload.as_ref()));
}
info!(" << 1 >> unsubscribed. loop is terminated.")
});
//subscribe to nats subject 'foo', another subscription
let (_sid, mut subscription2) = nats_client.subscribe("foo").await?;
tokio::spawn(async move {
//Listen for messages on the 'foo' description
while let Some(message) = subscription2.next().await {
info!(" << 2 >> got message --- {:?}\n\t{:?}", &message,
String::from_utf8_lossy(message.payload.as_ref()));
}
});
//Publish some messages, restart nats server during this time.
use std::{thread, time};
thread::sleep(time::Duration::from_secs(5));
//Publish message
let _ = nats_client.publish("foo", b"Publish Message 1").await?;
thread::sleep(time::Duration::from_secs(1));
//Unsubscribe
let _ = nats_client.un_subscribe(&sid).await?;
thread::sleep(time::Duration::from_secs(3));
//Publish some messages.
thread::sleep(time::Duration::from_secs(1));
let _ = nats_client.publish("foo", b"Publish Message 2").await?;
thread::sleep(time::Duration::from_secs(600));
info!(" ---- done --- ");
Ok(())
}
订阅和发布到 NATS 流主题:请参阅 tests/stan_subscribe.rs
use log::info;
use futures::StreamExt;
use ratsio::{RatsioError, StanClient, StanOptions};
pub fn logger_setup() {
use log::LevelFilter;
use std::io::Write;
use env_logger::Builder;
let _ = Builder::new()
.format(|buf, record| {
writeln!(buf,
"[{}] - {}",
record.level(),
record.args()
)
})
.filter(None, LevelFilter::Trace)
.try_init();
}
#[tokio::main]
async fn main() -> Result<(), RatsioError> {
logger_setup();
// Create stan options
let client_id = "test1".to_string();
let opts = StanOptions::with_options("localhost:4222", "test-cluster", &client_id[..]);
//Create STAN client
let stan_client = StanClient::from_options(opts).await?;
//Subscribe to STAN subject 'foo'
let (sid, mut subscription) = stan_client.subscribe("foo", None, None).await?;
tokio::spawn(async move {
while let Some(message) = subscription.next().await {
info!(" << 1 >> got stan message --- {:?}\n\t{:?}", &message,
String::from_utf8_lossy(message.payload.as_ref()));
}
info!(" ----- the subscription loop is done ---- ")
});
//Publish some mesesages to 'foo', use 'cargo run --example stan_publish foo "hi there"'
use std::{thread, time};
thread::sleep(time::Duration::from_secs(60));
//Unsubscribe
let _ = stan_client.un_subscribe(&sid).await;
thread::sleep(time::Duration::from_secs(10));
info!(" ---- done --- ");
Ok(())
}
重要变更
版本 0.2
有关 0.2.* 相关的所有信息,请参阅此处 版本 0.2.*。
版本 0.3.0-alpha.1
从 0.2 版本的 API 破坏性变更。这是第一个兼容 async/await 的版本,但尚未准备用于生产环境,仍在开发中。请参阅 examples 文件夹中的示例。
版本 0.3.0-alpha.3
这是第一个真正工作的 async/await,但仍然缺少 0.2 版本的一些功能。
版本 0.3.0-alpha.4
用 futures:🔒:Mutex 替换了 std::sync::RwLock,以获得 + Send + Sync 能力。
版本 0.3.0-alpha.5
用 thiserror crate 替换了 failure::*。
版本 0.3.0-alpha.6
版本 0.4.0-alpha.1
升级到使用 tokio 1.0 合并了拉取请求 #21
版本 0.4.0
注释未使用的错误,因为如果更新版本这个错误,将丢弃冲突实现 合并了拉取请求 #23
联系方式
对于错误报告、补丁、功能请求或其他信息,请发送电子邮件至 [email protected]
许可证
本项目受MIT许可证许可。
依赖项
~12–25MB
~366K SLoC