#tokio #nsq #client #built #package #complete #aims

tokio-nsq

基于Tokio构建的Rust NSQ客户端。Tokio NSQ旨在成为一个功能完整的NSQ客户端实现。

34次发布

0.14.0 2023年6月28日
0.13.0 2022年2月2日
0.12.0 2021年3月31日
0.11.0 2020年8月26日
0.5.1 2020年7月30日

#950 in 异步


3 个crate中使用(通过caisin

BSD-3-Clause

82KB
2K SLoC

Tokio NSQ

GitHub Actions crates.io

基于 NSQ 的Rust客户端,建立在 Tokio 之上。Tokio NSQ旨在成为一个功能完整的NSQ客户端实现。

Tokio NSQ作为cargo包提供,API文档可在docs.rs上找到。

版本控制

本项目遵循严格的语义版本控制。在 1.0.0 之前,破坏性变更只会增加小版本号。

基本消费者示例

let topic   = NSQTopic::new("names").unwrap();
let channel = NSQChannel::new("first").unwrap();

let mut addresses = HashSet::new();
addresses.insert("http://127.0.0.1:4161".to_string());

let mut consumer = NSQConsumerConfig::new(topic, channel)
    .set_max_in_flight(15)
    .set_sources(
        NSQConsumerConfigSources::Lookup(
            NSQConsumerLookupConfig::new().set_addresses(addresses)
        )
    )
    .build();

let mut message = consumer.consume_filtered().await.unwrap();

let message_body_str = std::str::from_utf8(&message.body).unwrap();
println!("message body = {}", message_body_str);

message.finish();

基本生产者示例

let topic = NSQTopic::new("names").unwrap();

let mut producer = NSQProducerConfig::new("127.0.0.1:4150").build();

// Wait until a connection is initialized
assert_matches!(producer.consume().await.unwrap(), NSQEvent::Healthy());
// Publish a single message
producer.publish(&topic, b"alice1".to_vec()).unwrap();
// Wait until the message is acknowledged by NSQ
assert_matches!(producer.consume().await.unwrap(), NSQEvent::Ok());

功能

  • 订阅
  • 发布
  • 基于NSQLookupd的发现。
  • 消息重新入队回退
  • NSQD TLS协商
    • 未经验证的服务器证书
    • 自定义证书颁发机构
    • 客户端证书
  • Deflate NSQD压缩
  • Snappy NSQD压缩
  • 抽样
  • 身份验证

依赖

~17–29MB
~534K SLoC