3 个稳定版本
4.1.3 | 2022年5月25日 |
---|---|
4.1.2 | 2022年3月7日 |
4.1.1 | 2022年3月2日 |
#2 in #pulsar
每月 21 次下载
305KB
7K SLoC
Pulsar
基于未来的 Rust 客户端,用于 Apache Pulsar
这是一个纯 Rust 客户端,用于 Apache Pulsar,不依赖于 C++ Pulsar 库。它提供了一个基于异步/等待的 API,兼容 Tokio 和 async-std。
特性
- 基于 URL 的(
pulsar:// 和
pulsar+ssl://
)连接,具有 DNS 查找功能 - 多主题消费者(基于正则表达式或列表)
- TLS 连接
- 可配置的执行器(Tokio 或 async-std)
- 自动重新连接,具有指数退避
- 消息批处理
- 使用 LZ4、zlib、zstd 或 Snappy(可以使用 Cargo 功能禁用)进行压缩
入门指南
Cargo.toml
futures = "0.3"
pulsar = { version = "4.1.2", package = "sn_pulsar" }
tokio = "1.0"
生产
use serde::{Serialize, Deserialize};
use pulsar::{
message::proto, producer, Error as PulsarError, Pulsar, SerializeMessage, TokioExecutor,
};
#[derive(Serialize, Deserialize)]
struct TestData {
data: String,
}
impl<'a> SerializeMessage for &'a TestData {
fn serialize_message(input: Self) -> Result<producer::Message, PulsarError> {
let payload = serde_json::to_vec(input).map_err(|e| PulsarError::Custom(e.to_string()))?;
Ok(producer::Message {
payload,
..Default::default()
})
}
}
#[tokio::main]
async fn main() -> Result<(), pulsar::Error> {
env_logger::init();
let addr = "pulsar://127.0.0.1:6650";
let pulsar: Pulsar<_> = Pulsar::builder(addr, TokioExecutor).build().await?;
let mut producer = pulsar
.producer()
.with_topic("non-persistent://public/default/test")
.with_name("my producer")
.with_options(producer::ProducerOptions {
schema: Some(proto::Schema {
type_: proto::schema::Type::String as i32,
..Default::default()
}),
..Default::default()
})
.build()
.await?;
let mut counter = 0usize;
loop {
producer
.send(TestData {
data: "data".to_string(),
})
.await?;
counter += 1;
println!("{} messages", counter);
tokio::time::sleep(std::time::Duration::from_millis(2000)).await;
}
}
消费
#[macro_use]
extern crate serde;
use futures::TryStreamExt;
use pulsar::{
message::proto::command_subscribe::SubType, message::Payload, Consumer, DeserializeMessage,
Pulsar, TokioExecutor,
};
#[derive(Serialize, Deserialize)]
struct TestData {
data: String,
}
impl DeserializeMessage for TestData {
type Output = Result<TestData, serde_json::Error>;
fn deserialize_message(payload: &Payload) -> Self::Output {
serde_json::from_slice(&payload.data)
}
}
#[tokio::main]
async fn main() -> Result<(), pulsar::Error> {
env_logger::init();
let addr = "pulsar://127.0.0.1:6650";
let pulsar: Pulsar<_> = Pulsar::builder(addr, TokioExecutor).build().await?;
let mut consumer: Consumer<TestData, _> = pulsar
.consumer()
.with_topic("test")
.with_consumer_name("test_consumer")
.with_subscription_type(SubType::Exclusive)
.with_subscription("test_subscription")
.build()
.await?;
let mut counter = 0usize;
while let Some(msg) = consumer.try_next().await? {
consumer.ack(&msg).await?;
let data = match msg.deserialize() {
Ok(data) => data,
Err(e) => {
log::error!("could not deserialize message: {:?}", e);
break;
}
};
if data.data.as_str() != "data" {
log::error!("Unexpected payload: {}", &data.data);
break;
}
counter += 1;
log::info!("got {} messages", counter);
}
Ok(())
}
许可证
此库根据 MIT 许可证和 Apache 许可证(版本 2.0)的条款许可,可能包括第三方编写的软件包,这些软件包包含其自己的版权声明和许可条款。
请参阅 LICENSE-APACHE、LICENSE-MIT 和 COPYRIGHT 以获取详细信息。
依赖关系
~11–29MB
~500K SLoC