1个稳定版本

4.1.1 2022年8月12日

#1966解析器实现

MIT/Apache

315KB
7K SLoC

Pulsar

基于未来的Rust客户端,用于Apache Pulsar

文档

这是一个纯Rust客户端,用于Apache Pulsar,不依赖于C++ Pulsar库。它提供了一个基于async/await的API,兼容Tokioasync-std

功能

  • 基于URL的(pulsar://pulsar+ssl://)连接,带有DNS查找
  • 基于正则表达式或列表的多主题消费者
  • TLS连接
  • 可配置的执行器(Tokio或async-std)
  • 自动重连,指数退避
  • 消息批处理
  • 使用LZ4、zlib、zstd或Snappy(可以通过Cargo功能禁用)进行压缩

入门指南

Cargo.toml

futures = "0.3"
pulsar = "4.0"
tokio = "1.0"

生产

use serde::{Serialize, Deserialize};
use pulsar::{
    message::proto, producer, Error as PulsarError, Pulsar, SerializeMessage, TokioExecutor,
};

#[derive(Serialize, Deserialize)]
struct TestData {
    data: String,
}

impl<'a> SerializeMessage for &'a TestData {
    fn serialize_message(input: Self) -> Result<producer::Message, PulsarError> {
        let payload = serde_json::to_vec(input).map_err(|e| PulsarError::Custom(e.to_string()))?;
        Ok(producer::Message {
            payload,
            ..Default::default()
        })
    }
}

#[tokio::main]
async fn main() -> Result<(), pulsar::Error> {
    env_logger::init();

    let addr = "pulsar://127.0.0.1:6650";
    let pulsar: Pulsar<_> = Pulsar::builder(addr, TokioExecutor).build().await?;
    let mut producer = pulsar
        .producer()
        .with_topic("non-persistent://public/default/test")
        .with_name("my producer")
        .with_options(producer::ProducerOptions {
            schema: Some(proto::Schema {
                type_: proto::schema::Type::String as i32,
                ..Default::default()
            }),
            ..Default::default()
        })
        .build()
        .await?;

    let mut counter = 0usize;
    loop {
        producer
            .send(TestData {
                data: "data".to_string(),
            })
            .await?;

        counter += 1;
        println!("{} messages", counter);
        tokio::time::sleep(std::time::Duration::from_millis(2000)).await;
    }
}

消费

#[macro_use]
extern crate serde;
use futures::TryStreamExt;
use pulsar::{
    message::proto::command_subscribe::SubType, message::Payload, Consumer, DeserializeMessage,
    Pulsar, TokioExecutor,
};

#[derive(Serialize, Deserialize)]
struct TestData {
    data: String,
}

impl DeserializeMessage for TestData {
    type Output = Result<TestData, serde_json::Error>;

    fn deserialize_message(payload: &Payload) -> Self::Output {
        serde_json::from_slice(&payload.data)
    }
}

#[tokio::main]
async fn main() -> Result<(), pulsar::Error> {
    env_logger::init();

    let addr = "pulsar://127.0.0.1:6650";
    let pulsar: Pulsar<_> = Pulsar::builder(addr, TokioExecutor).build().await?;

    let mut consumer: Consumer<TestData, _> = pulsar
        .consumer()
        .with_topic("test")
        .with_consumer_name("test_consumer")
        .with_subscription_type(SubType::Exclusive)
        .with_subscription("test_subscription")
        .build()
        .await?;

    let mut counter = 0usize;
    while let Some(msg) = consumer.try_next().await? {
        consumer.ack(&msg).await?;
        let data = match msg.deserialize() {
            Ok(data) => data,
            Err(e) => {
                log::error!("could not deserialize message: {:?}", e);
                break;
            }
        };

        if data.data.as_str() != "data" {
            log::error!("Unexpected payload: {}", &data.data);
            break;
        }
        counter += 1;
        log::info!("got {} messages", counter);
    }

    Ok(())
}

项目维护者

贡献

本项目欢迎您的PR和问题。例如,重构、添加功能、纠正英语等。

感谢所有已经做出贡献的人!

许可

本库根据MIT许可和Apache许可(版本2.0)的条款进行许可,可能包含第三方编写的软件包,这些软件包携带自己的版权声明和许可条款。

有关详细信息,请参阅LICENSE-APACHELICENSE-MITCOPYRIGHT

依赖

~10–28MB
~479K SLoC