15 个版本
0.3.0 | 2024年2月19日 |
---|---|
0.2.0 | 2023年9月8日 |
0.1.12 | 2024年6月21日 |
0.1.11 | 2023年12月14日 |
0.1.2 | 2022年11月25日 |
#102 in 并发
每月 353 次下载
26KB
494 代码行
PICOKAFKA
PICOKAFKA 是基于 tarantool-module 构建的分布式系统 Kafka 驱动程序。此驱动程序使用 cbus 通道在 tokio 和 tarantool 线程之间进行通信。请先熟悉它。
消费者
创建新消费者
use std::rc::Rc;
use picokafka::consumer;
use tarantool::fiber::{Fiber, Mutex};
use tarantool::cbus::Endpoint;
pub fn main() {
// create cbus endpoint in separate fiber
let mut fiber = Fiber::new("f1", &mut |_: Box<()>| {
let cbus_endpoint =
Endpoint::new("my_endpoint").expect("error on start cbus endpoint");
cbus_endpoint.cbus_loop();
0
});
fiber.start(());
// buffer for consumed messages
let consumed = Rc::new(Mutex::new(vec![]));
// create handler for received messages, this handler will executed in
// tarantool TX thread, so any tarantool API's can used
let message_handler = {
let consumed = consumed.clone();
move |msg, _ctrl| consumed.lock().push(msg)
};
// create consumer and set the callback for consumed messages
let consumer =
consumer::Builder::new("kafka:29092")
.with_group("group_1")
.append_topic("topic_1")
.start("my_endpoint", message_handler);
}
您可以通过 Builder::with_opts
方法传递额外的 librdkafka 配置参数,见: https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md
use picokafka::consumer;
let consumer = consumer::Builder::new("kafka:29092")
.with_opt("enable.auto.offset.store", "false")
.with_session_timeout(Duration::from_secs(10))
.start("my_endpoint", |_, _| {});
请注意,回调在 tarantool TX 线程中的特殊纤维中执行。
生产者
创建新生产者
use picokafka::producer;
let producer = producer::Builder::new(&*BROKER_ADDR)
.with_message_timeout(Duration::from_secs(1))
.build("my_endpoint")
.unwrap();
您可以通过 Builder::with_opts
方法传递额外的 librdkafka 配置参数。
发送消息
static SEEN_RESULT: AtomicBool = AtomicBool::new(false);
producer.send(
Record::new("topic_1")
.key(String::from("key_1"))
.payload(String::from("payload_1")),
Duration::from_secs(1),
|res, _| {
assert!(res.result.is_ok());
SEEN_RESULT.store(true, Ordering::SeqCst);
},
);
请注意,发送回调在 tarantool TX 线程中的特殊纤维中执行。
SSL 和 SASL
要启用 ssl 和 sasl 协议,请使用 "ssl" 功能。有关熟悉它的信息,请参阅身份验证测试。
统计信息
Picokafka 支持统计信息回调,使用生产者/接收器的自定义上下文实现来获取它。请注意,所有在上下文中实现的回调都将执行在 librdkafka 线程中,而不是 TX 线程中。有关统计信息格式: https://github.com/confluentinc/librdkafka/blob/master/STATISTICS.md。有关示例,请参阅测试。
- 生产者 -
test_producer_statistic
- 消费者 -
test_consumer_statistic
测试
在测试之前需要启动 Kafka。您可以使用 tests/docker-compose.yml 文件
docker run --rm -v $(pwd)/tests:/opt/kafka confluentinc/cp-kafka:latest /opt/kafka/setup_ssl.sh
docker-compose -f tests/docker-compose.yml up -d
或创建自己的环境(如果您这样做,请设置 KAFKA_ADDR 和 KAFKA_REST_ADDR)。
然后使用 tarantool-test 工具
cargo build
tarantool-test -p ./target/debug/libtests.so
如果您在 Kafka 实例中没有配置 SASL,则可能需要跳过身份验证测试
cargo build --features skip_ssl_test
tarantool-test -p ./target/debug/libtests.so
基准测试
运行基准测试(使用 tarantool-runner 工具)
cargo build
tarantool-runner run -p ./target/debug/libbenches.so -e entrypoint
发送 1000 条消息的结果
producer_sync 10000 messages (1 samples)
[ave.] 32.461472ms
32.461472ms (>50%), 32.461472ms (>95%), 32.461472ms (>99%)
消费 1_000_000 条消息的结果
consume 1000000 messages (1 samples)
[ave.] 6.182463391s
6.182463391s (>50%), 6.182463391s (>95%), 6.182463391s (>99%)
依赖项
~20–27MB
~357K SLoC