#event-streaming #nats #streaming #tokio #async #events #server-client

ratsio_fork_040

ratsio 的分支,以避免重复的错误转换。Ratsio 是一个用于 NATS 消息系统和 NATS 事件流的 Rust 客户端库。

2 个版本

0.4.1 2021 年 9 月 8 日
0.4.0 2021 年 8 月 29 日

#2114 in 网络编程

38 每月下载量

MIT 许可证

140KB
3K SLoC

Ratsio

Ratsio 是一个用于 NATS 消息系统和 NATS 事件流 的 Rust 客户端库。

受到 nitoxrust-nats 的启发,但我的项目需要 NATS 流,所以不能使用这两个中的任何一个。如果您觉得这个项目有用,请随意贡献或建议功能。目前它只是我需要的功能。

将以下内容添加到您的 Cargo.toml 中。

[dependencies]
ratsio = "0.3.0-alpha.6"

Rust -stable,-beta 和 -nightly 都受支持。

功能

  • NATS 消息队列。发布、订阅和请求。
  • NATS 集群支持,自动重连
  • 动态更新集群主机
  • 完全异步,使用 tokiofutures
  • TLS 模式
  • NATS 1.x 身份验证
  • NATS 2.0 基于 JWT 的客户端身份验证
  • NATS 流服务器

用法

订阅和发布到 NATS 主题:请参阅 examples/nats_subscribe.rs

use ratsio::{NatsClient, RatsioError};
use log::info;
use futures::StreamExt;

pub fn logger_setup() {
    use log::LevelFilter;
    use std::io::Write;
    use env_logger::Builder;

    let _ = Builder::new()
        .format(|buf, record| {
            writeln!(buf,
                     "[{}] - {}",
                     record.level(),
                     record.args()
            )
        })
        .filter(None, LevelFilter::Trace)
        .try_init();
}


#[tokio::main]
async fn main() -> Result<(), RatsioError> {
    logger_setup();

    //Create nats client
    let nats_client = NatsClient::new("nats://127.0.0.1:4222").await?;
    
    //subscribe to nats subject 'foo'
    let (sid, mut subscription) = nats_client.subscribe("foo").await?;
    tokio::spawn(async move {
        //Listen for messages on the 'foo' description 
        //The loop terminates when the upon un_subscribe
        while let Some(message) = subscription.next().await {
            info!(" << 1 >> got message --- {:?}\n\t{:?}", &message,
                  String::from_utf8_lossy(message.payload.as_ref()));
        }
        info!(" << 1 >> unsubscribed. loop is terminated.")
    });

    //subscribe to nats subject 'foo', another subscription 
    let (_sid, mut subscription2) = nats_client.subscribe("foo").await?;
    tokio::spawn(async move {
        //Listen for messages on the 'foo' description
        while let Some(message) = subscription2.next().await {
            info!(" << 2 >> got message --- {:?}\n\t{:?}", &message,
                  String::from_utf8_lossy(message.payload.as_ref()));
        }
    });

    //Publish some messages, restart nats server during this time.
    use std::{thread, time};
    thread::sleep(time::Duration::from_secs(5));


    //Publish message
    let _ = nats_client.publish("foo", b"Publish Message 1").await?;
    thread::sleep(time::Duration::from_secs(1));

    //Unsubscribe
    let _ = nats_client.un_subscribe(&sid).await?;
    thread::sleep(time::Duration::from_secs(3));

    //Publish some messages.
    thread::sleep(time::Duration::from_secs(1));
    let _ = nats_client.publish("foo", b"Publish Message 2").await?;
    thread::sleep(time::Duration::from_secs(600));
    info!(" ---- done --- ");
    Ok(())
}

订阅和发布到 NATS 流主题:请参阅 tests/stan_subscribe.rs

use log::info;
use futures::StreamExt;
use ratsio::{RatsioError, StanClient, StanOptions};

pub fn logger_setup() {
    use log::LevelFilter;
    use std::io::Write;
    use env_logger::Builder;

    let _ = Builder::new()
        .format(|buf, record| {
            writeln!(buf,
                     "[{}] - {}",
                     record.level(),
                     record.args()
            )
        })
        .filter(None, LevelFilter::Trace)
        .try_init();
}


#[tokio::main]
async fn main() -> Result<(), RatsioError> {
    logger_setup();
    // Create stan options
    let client_id = "test1".to_string();
    let opts = StanOptions::with_options("localhost:4222", "test-cluster", &client_id[..]);
    //Create STAN client
    let stan_client = StanClient::from_options(opts).await?;
    
    //Subscribe to STAN subject 'foo'
    let (sid, mut subscription) = stan_client.subscribe("foo", None, None).await?;
    tokio::spawn(async move {
        while let Some(message) = subscription.next().await {
            info!(" << 1 >> got stan message --- {:?}\n\t{:?}", &message,
                  String::from_utf8_lossy(message.payload.as_ref()));
        }
        info!(" ----- the subscription loop is done ---- ")
    });
    
    //Publish some mesesages to 'foo', use 'cargo run --example stan_publish foo "hi there"' 
    use std::{thread, time};
    thread::sleep(time::Duration::from_secs(60));
    
    //Unsubscribe 
    let _ = stan_client.un_subscribe(&sid).await;
    thread::sleep(time::Duration::from_secs(10));
    info!(" ---- done --- ");
    Ok(())
}    

重要变更

版本 0.2

有关 0.2.* 版本的所有信息均可在以下位置找到 版本 0.2.*

版本 0.3.0-alpha.1

0.2版本API的重大变更 这是最初个支持async/await的版本,目前尚未准备好投入生产,仍在开发中。请参考examples/文件夹中的示例。

版本 0.3.0-alpha.3

这是第一个能够正常工作的async/await,但仍然缺少0.2版本的一些功能。

版本 0.3.0-alpha.4

将std::sync::RwLock替换为futures:🔒:Mutex以获得+ Send + Sync的能力。

版本 0.3.0-alpha.5

将failure::*替换为thiserror crate。

版本 0.3.0-alpha.6

合并了pull requests #12#13

版本 0.4.0-alpha.1

升级到使用tokio 1.0 合并了pull requests #21

联系方式

对于错误报告、补丁、功能请求或其他消息,请发送电子邮件至 [email protected]

许可协议

本项目采用MIT许可协议。

依赖关系

~11–27MB
~361K SLoC