#事件流 #nats #流式 #tokio #async #服务器-客户端

ratsio

比索是 NATS 消息系统和 NATS 事件流的 Rust 客户端库。

17 个版本

0.4.1 2022 年 1 月 17 日
0.4.0 2021 年 9 月 9 日
0.4.0-alpha.12021 年 4 月 14 日
0.3.0-alpha.62020 年 9 月 6 日
0.2.1 2019 年 1 月 19 日

异步 中排名第 1512

MIT 许可证

140KB
2.5K SLoC

比索

比索是 NATS 消息系统 NATS 和 NATS 事件流 NATS Event Streaming 的 Rust 客户端库。

nitoxrust-nats 启发,但我的项目需要 NATS 流,因此无法使用这两个库。如果你觉得这个项目有用,欢迎贡献或提出特性建议。目前它只包含我需要的特性。

将以下内容添加到你的 Cargo.toml 中。

[dependencies]
ratsio = "0.4.0"

Rust -stable, -beta 和 -nightly 都受支持。

特性

  • NATS 消息队列。发布、订阅和请求。
  • NATS 集群支持,自动重连
  • 动态更新集群主机。
  • 从头开始异步,使用 tokiofutures
  • TLS 模式
  • NATS 1.x 认证
  • NATS 2.0 基于 JWT 的客户端认证
  • NATS 流服务器

用法

订阅和发布到 NATS 主题:请参阅 examples/nats_subscribe.rs

use ratsio::{NatsClient, RatsioError};
use log::info;
use futures::StreamExt;

pub fn logger_setup() {
    use log::LevelFilter;
    use std::io::Write;
    use env_logger::Builder;

    let _ = Builder::new()
        .format(|buf, record| {
            writeln!(buf,
                     "[{}] - {}",
                     record.level(),
                     record.args()
            )
        })
        .filter(None, LevelFilter::Trace)
        .try_init();
}


#[tokio::main]
async fn main() -> Result<(), RatsioError> {
    logger_setup();

    //Create nats client
    let nats_client = NatsClient::new("nats://127.0.0.1:4222").await?;
    
    //subscribe to nats subject 'foo'
    let (sid, mut subscription) = nats_client.subscribe("foo").await?;
    tokio::spawn(async move {
        //Listen for messages on the 'foo' description 
        //The loop terminates when the upon un_subscribe
        while let Some(message) = subscription.next().await {
            info!(" << 1 >> got message --- {:?}\n\t{:?}", &message,
                  String::from_utf8_lossy(message.payload.as_ref()));
        }
        info!(" << 1 >> unsubscribed. loop is terminated.")
    });

    //subscribe to nats subject 'foo', another subscription 
    let (_sid, mut subscription2) = nats_client.subscribe("foo").await?;
    tokio::spawn(async move {
        //Listen for messages on the 'foo' description
        while let Some(message) = subscription2.next().await {
            info!(" << 2 >> got message --- {:?}\n\t{:?}", &message,
                  String::from_utf8_lossy(message.payload.as_ref()));
        }
    });

    //Publish some messages, restart nats server during this time.
    use std::{thread, time};
    thread::sleep(time::Duration::from_secs(5));


    //Publish message
    let _ = nats_client.publish("foo", b"Publish Message 1").await?;
    thread::sleep(time::Duration::from_secs(1));

    //Unsubscribe
    let _ = nats_client.un_subscribe(&sid).await?;
    thread::sleep(time::Duration::from_secs(3));

    //Publish some messages.
    thread::sleep(time::Duration::from_secs(1));
    let _ = nats_client.publish("foo", b"Publish Message 2").await?;
    thread::sleep(time::Duration::from_secs(600));
    info!(" ---- done --- ");
    Ok(())
}

订阅和发布到 NATS 流主题:请参阅 tests/stan_subscribe.rs

use log::info;
use futures::StreamExt;
use ratsio::{RatsioError, StanClient, StanOptions};

pub fn logger_setup() {
    use log::LevelFilter;
    use std::io::Write;
    use env_logger::Builder;

    let _ = Builder::new()
        .format(|buf, record| {
            writeln!(buf,
                     "[{}] - {}",
                     record.level(),
                     record.args()
            )
        })
        .filter(None, LevelFilter::Trace)
        .try_init();
}


#[tokio::main]
async fn main() -> Result<(), RatsioError> {
    logger_setup();
    // Create stan options
    let client_id = "test1".to_string();
    let opts = StanOptions::with_options("localhost:4222", "test-cluster", &client_id[..]);
    //Create STAN client
    let stan_client = StanClient::from_options(opts).await?;
    
    //Subscribe to STAN subject 'foo'
    let (sid, mut subscription) = stan_client.subscribe("foo", None, None).await?;
    tokio::spawn(async move {
        while let Some(message) = subscription.next().await {
            info!(" << 1 >> got stan message --- {:?}\n\t{:?}", &message,
                  String::from_utf8_lossy(message.payload.as_ref()));
        }
        info!(" ----- the subscription loop is done ---- ")
    });
    
    //Publish some mesesages to 'foo', use 'cargo run --example stan_publish foo "hi there"' 
    use std::{thread, time};
    thread::sleep(time::Duration::from_secs(60));
    
    //Unsubscribe 
    let _ = stan_client.un_subscribe(&sid).await;
    thread::sleep(time::Duration::from_secs(10));
    info!(" ---- done --- ");
    Ok(())
}    

重要变更

版本 0.2

有关 0.2.* 相关的所有信息,请参阅此处 版本 0.2.*

版本 0.3.0-alpha.1

从 0.2 版本的 API 破坏性变更。这是第一个兼容 async/await 的版本,但尚未准备用于生产环境,仍在开发中。请参阅 examples 文件夹中的示例。

版本 0.3.0-alpha.3

这是第一个真正工作的 async/await,但仍然缺少 0.2 版本的一些功能。

版本 0.3.0-alpha.4

用 futures:🔒:Mutex 替换了 std::sync::RwLock,以获得 + Send + Sync 能力。

版本 0.3.0-alpha.5

用 thiserror crate 替换了 failure::*。

版本 0.3.0-alpha.6

合并了拉取请求 #12#13

版本 0.4.0-alpha.1

升级到使用 tokio 1.0 合并了拉取请求 #21

版本 0.4.0

注释未使用的错误,因为如果更新版本这个错误,将丢弃冲突实现 合并了拉取请求 #23

联系方式

对于错误报告、补丁、功能请求或其他信息,请发送电子邮件至 [email protected]

许可证

本项目受MIT许可证许可。

依赖项

~12–25MB
~366K SLoC