4 个版本
0.1.3 | 2019 年 5 月 22 日 |
---|---|
0.1.2 | 2019 年 5 月 22 日 |
0.1.1 | 2018 年 10 月 31 日 |
0.1.0 | 2018 年 10 月 30 日 |
#7 在 #pulsar 中
390KB
9K SLoC
Pulsar
Apache Pulsar 的基于未来的 Rust 绑定
当前状态:最简单功能的基本实现。应处于添加额外功能的好位置,但至今仅实现了最基本的操作。
入门
Cargo.toml
futures = "0.1.23"
pulsar = "0.1.1"
tokio = "0.1.11"
# If you want connection pooling
r2d2 = "0.8.2"
r2d2_pulsar = "0.1.1"
# If you want to use serde
serde = "1.0.80"
serde_derive = "1.0"
serde_json = "1.0.32"
main.rs
extern crate pulsar;
// if you want connection pooling
extern crate r2d2_pulsar;
extern crate r2d2;
// if you want serde
extern crate serde;
extern crate serde_json;
#[macro_use] extern crate serde_derive;
生产
use tokio::runtime::Runtime;
use futures::future;
use pulsar::Producer;
pub struct SomeData {
...
}
fn serialize(data: &SomeData) -> Vec<u8> {
...
}
fn main() {
let pulsar_addr = "...";
let producer_name = "some_producer_name";
let runtime = Runtime::new().unwrap();
let producer = Producer::new(pulsar_addr, producer_name, runtime.executor())
.wait()
.unwrap();
let data = SomeData { ... };
let serialized = serialize(&data);
let send_1 = producer.send_raw("some_topic", serialized);
let send_2 = producer.send_raw("some_topic", serialized);
let send_3 = producer.send_raw("some_topic", serialized);
future::join_all(vec![send_1, send_2, send_3]).wait().unwrap();
runtime.shutdown_now().wait().unwrap();
}
消费
use tokio::runtime::Runtime;
use futures::{Stream, future};
use pulsar::{Ack, Consumer, Error, SubType};
pub struct SomeData {
...
}
fn deserialize(data: &[u8]) -> Result<SomeData, Error> {
...
}
fn main() {
let pulsar_addr = "...";
let runtime = Runtime::new().unwrap();
let consumer = ConsumerBuilder::new(pulsar_addr, None, None, runtime.executor())
.with_topic("some_topic")
.with_subscription_type(SubType::Exclusive)
.with_subscription("some_subscription_name")
.with_deserializer(|payload| deserialize(&payload.data))
.build()
.wait()
.unwrap();
let consumption_result = consumer
.for_each(|(msg, ack): (Result<SomeData, Error>, Ack)| match msg {
Ok(data) => {
// process data
ack.ack();
Ok(())
},
Err(e) => {
println!("Got an error: {}", e);
Ok(())
// return Err(_) to instead shutdown consumer
}
})
.wait();
// handle error, reconnect, etc
runtime.shutdown_now().wait().unwrap();
}
连接池
use r2d2_pulsar::ProducerConnectionManager;
let addr = "127.0.0.1:6650";
let runtime = Runtime::new().unwrap();
let pool = r2d2::Pool::new(ProducerConnectionManager::new(
addr,
"r2d2_test_producer",
runtime.executor()
)).unwrap();
let mut a = pool.get().unwrap();
let mut b = pool.get().unwrap();
let data1: Vec<u8> = ...;
let data2: Vec<u8> = ...;
let send_1 = a.send_raw("some_topic", data1);
let send_2 = b.send_raw("some_topic", data2);
send_1.join(send_2).wait().unwrap();
runtime.shutdown_now().wait().unwrap();
Serde
#[derive(Debug, Serialize, Deserialize)]
struct SomeData {
...
}
fn process_data(data: Result<SomeData, Error>) -> Result<(), Error> {
...
}
fn main() {
let pulsar_addr = "...";
let runtime = Runtime::new().unwrap();
let producer = Producer::new(pulsar_addr, None, None, None, runtime.executor())
.wait()
.unwrap();
let consumer = ConsumerBuilder::new(pulsar_addr, runtime.executor())
.with_topic("some_topic")
.with_subscription_type(SubType::Exclusive)
.with_subscription("some_subscription_name")
.build()
.wait()
.unwrap();
let send_1 = producer.send_json("some_topic", &SomeData { ... });
let send_2 = producer.send_json("some_topic", &SomeData { ... });
let send_3 = producer.send_json("some_topic", &SomeData { ... });
future::join_all(vec![send_1, send_2, send_3]).wait().unwrap();
let consumption_result = consumer
.for_each(|(msg, ack)| process_data(msg, ack))
.wait();
runtime.shutdown_now().wait().unwrap();
}
许可证
此库根据 MIT 许可证和 Apache 许可证(版本 2.0)的条款许可,可能包含第三方编写的包,这些包具有自己的版权声明和许可条款。
请参阅 LICENSE-APACHE、LICENSE-MIT 和 COPYRIGHT 以获取详细信息。
依赖项
~17–27MB
~406K SLoC