#client-connection #actix #actor #nsq #async #queue

nightly nsq-client

Rust语言编写的NSQ实时消息处理系统客户端

12个版本

使用旧的Rust 2015

0.1.12 2019年2月21日
0.1.11 2019年2月19日

#736 in 异步

每月下载 31

MIT 许可证

78KB
1K SLoC

用Rust编写的NSQ客户端 构建状态 构建状态

赞助

基于 Actix 的NSQ实时消息处理系统客户端实现。

Nsq-client默认支持多个连接的多个Reader,Reader通过轮询算法分配到单个连接。

用法

要使用nsq-client,将以下内容添加到您的Cargo.toml中

[dependencies]
actix = "0.7"
nsq-client = "0.1.12"

创建您的第一个消费者

为了使用nsq-client,您首先需要创建一个Reader actor,该actor实现您想从连接中接收的消息类型的Handler,然后将其订阅到连接以接收您选择的消息类型。

可用消息有

简单消费者(SUB)

extern crate actix;
extern crate nsq_client;

use std::sync::Arc;

use actix::prelude::*;

use nsq_client::{Connection, Msg, Fin, Subscribe, Config};

struct MyReader {
    pub conn: Arc<Addr<Connection>>,
}

impl Actor for MyReader {
    type Context = Context<Self>;
    fn started(&mut self, ctx: &mut Self::Context) {
        self.subscribe::<Msg>(ctx, self.conn.clone());
    }
}

impl Handler<Msg> for MyReader {
    fn handle(&mut self, msg: Msg, _: &mut Self::Context) {
        println!("MyReader received {:?}", msg);
        if let Ok(body) = String::from_utf8(msg.body) {
              println!("utf8 msg: {}", body);
        }
        self.conn.do_send(Fin(msg.id));
    }
}

fn main() {
    let sys = System::new("consumer");
    let config = Config::default().client_id("consumer");
    let c = Supervisor::start(|_| Connection::new(
        "test", // <- topic
        "test", // <- channel
        "0.0.0.0:4150", // <- nsqd tcp address
        Some(config), // <- config (Optional)
        None, // secret for Auth (Optional)
        Some(2) // <- RDY (Optional default: 1)
    ));
    let conn = Arc::new(c);
    let _ = MyReader{ conn: conn.clone() }.start(); // <- Same thread reader
    let _ = Arbiter::start(|_| MyReader{ conn: conn }); // <- start another reader in different thread
    sys.run();
}

启动nsqd

$ nsqd -verbose

启动reader

$ RUST_LOG=nsq_client=debug cargo run

启动producer

$ cargo run

asciicast

示例

待办事项

  • 发现
  • TLS
  • Snappy
  • 先到先服务读者路由算法。

许可证

许可于

依赖项

~10MB
~183K SLoC