#tokio #queue #nsq #asynchronous

nsqueue

Rust 的 NSQ 实时消息处理系统客户端

6 个版本

使用旧的 Rust 2015

0.1.5 2017 年 7 月 8 日
0.1.4 2017 年 7 月 1 日
0.1.3 2017 年 6 月 18 日
0.1.2 2017 年 5 月 29 日

#232缓存

MIT/Apache

25KB
543

Build Status Crates.io

nsqueue

基于 TokioNSQ 实时消息处理系统的客户端实现

开发中

当前功能

  • PUB
  • SUB
  • 发现
  • 退避
  • TLS
  • Snappy
  • 认证

启动 NSQ

$ ./nsqlookupd & 
$ ./nsqd --lookupd-tcp-address=127.0.0.1:4160 &
$ ./nsqadmin --lookupd-http-address=127.0.0.1:4161 &

MPUB

extern crate futures;
extern crate tokio_core;
extern crate nsqueue;

use futures::Future;
use tokio_core::reactor::Core;

use nsqueue::config::*;
use nsqueue::producer::*;

fn main() {
    let mut core = Core::new().unwrap();
    let handle = core.handle();
    let addr = "127.0.0.1:4150".parse().unwrap();

    let mut messages: Vec<String> = Vec::new();
    messages.push("First message".into());
    messages.push("Second message".into());

    let res = Producer::connect(&addr, &handle, Config::default())
       .and_then(|conn| {
           conn.mpublish("some_topic".into(), messages)
           .and_then(move |response| {
              println!("Response: {:?}", response);
              Ok(())
           })
       });
    core.run(res).unwrap();
}

SUB

extern crate futures;
extern crate tokio_core;
extern crate nsqueue;

use futures::{Stream, Future};
use tokio_core::reactor::Core;
use nsqueue::config::*;
use nsqueue::consumer::*;

fn main() {
     let mut core = Core::new().unwrap();
     let handle = core.handle();
     let addr = "127.0.0.1:4150".parse().unwrap();

     core.run(
         Consumer::connect(&addr, &handle, Config::default())
         .and_then(|conn| {
            conn.subscribe("some_topic".into(), "some_channel".into())
            .and_then(move |response| {
                let ret = response.for_each(move |message| {
                    if message.message_id == "_heartbeat_" {
                        conn.nop();
                    } else {
                        println!("Response {:?} {:?}", message.message_id, message.message_body);
                        conn.fin(message.message_id); // Inform NSQ (Message consumed)
                    }
                    Ok(())
                });
                ret
            })
         })
     ).unwrap();
}

许可证

许可协议为以下之一

依赖关系

~8MB
~135K SLoC