#tarantool #kafka #分布式系统 #通信 #线程 #消费者 #驱动

picokafka

基于 librdkafka 的 tarantool-module 的 Kafka 库

15 个版本

0.3.0 2024年2月19日
0.2.0 2023年9月8日
0.1.12 2024年6月21日
0.1.11 2023年12月14日
0.1.2 2022年11月25日

#102 in 并发

Download history 170/week @ 2024-05-02 153/week @ 2024-05-09 132/week @ 2024-05-16 264/week @ 2024-05-23 384/week @ 2024-05-30 332/week @ 2024-06-06 168/week @ 2024-06-13 194/week @ 2024-06-20 104/week @ 2024-06-27 102/week @ 2024-07-04 32/week @ 2024-07-11 55/week @ 2024-07-18 102/week @ 2024-07-25 72/week @ 2024-08-01 84/week @ 2024-08-08 64/week @ 2024-08-15

每月 353 次下载

BSD-2-Clause

26KB
494 代码行

PICOKAFKA

Latest Version Docs badge

PICOKAFKA 是基于 tarantool-module 构建的分布式系统 Kafka 驱动程序。此驱动程序使用 cbus 通道在 tokio 和 tarantool 线程之间进行通信。请先熟悉它。

消费者

创建新消费者

use std::rc::Rc;
use picokafka::consumer;
use tarantool::fiber::{Fiber, Mutex};
use tarantool::cbus::Endpoint;

pub fn main() {
    // create cbus endpoint in separate fiber
    let mut fiber = Fiber::new("f1", &mut |_: Box<()>| {
        let cbus_endpoint =
            Endpoint::new("my_endpoint").expect("error on start cbus endpoint");
        cbus_endpoint.cbus_loop();
        0
    });
    fiber.start(());

    // buffer for consumed messages
    let consumed = Rc::new(Mutex::new(vec![]));

    // create handler for received messages, this handler will executed in
    // tarantool TX thread, so any tarantool API's can used
    let message_handler = {
        let consumed = consumed.clone();
        move |msg, _ctrl| consumed.lock().push(msg)
    };
    
    // create consumer and set the callback for consumed messages
    let consumer =
        consumer::Builder::new("kafka:29092")
            .with_group("group_1")
            .append_topic("topic_1")
            .start("my_endpoint", message_handler);
}

您可以通过 Builder::with_opts 方法传递额外的 librdkafka 配置参数,见: https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md

use picokafka::consumer;

let consumer = consumer::Builder::new("kafka:29092")
    .with_opt("enable.auto.offset.store", "false")
    .with_session_timeout(Duration::from_secs(10))
    .start("my_endpoint", |_, _| {});

请注意,回调在 tarantool TX 线程中的特殊纤维中执行。

生产者

创建新生产者

use picokafka::producer;

let producer = producer::Builder::new(&*BROKER_ADDR)
    .with_message_timeout(Duration::from_secs(1))
    .build("my_endpoint")
    .unwrap();

您可以通过 Builder::with_opts 方法传递额外的 librdkafka 配置参数。

发送消息

static SEEN_RESULT: AtomicBool = AtomicBool::new(false);

producer.send(
    Record::new("topic_1")
        .key(String::from("key_1"))
        .payload(String::from("payload_1")),
    Duration::from_secs(1),
    |res, _| {
        assert!(res.result.is_ok());
        SEEN_RESULT.store(true, Ordering::SeqCst);
    },
);

请注意,发送回调在 tarantool TX 线程中的特殊纤维中执行。

SSL 和 SASL

要启用 ssl 和 sasl 协议,请使用 "ssl" 功能。有关熟悉它的信息,请参阅身份验证测试。

统计信息

Picokafka 支持统计信息回调,使用生产者/接收器的自定义上下文实现来获取它。请注意,所有在上下文中实现的回调都将执行在 librdkafka 线程中,而不是 TX 线程中。有关统计信息格式: https://github.com/confluentinc/librdkafka/blob/master/STATISTICS.md。有关示例,请参阅测试。

  • 生产者 - test_producer_statistic
  • 消费者 - test_consumer_statistic

测试

在测试之前需要启动 Kafka。您可以使用 tests/docker-compose.yml 文件

    docker run --rm -v $(pwd)/tests:/opt/kafka confluentinc/cp-kafka:latest /opt/kafka/setup_ssl.sh
    docker-compose -f tests/docker-compose.yml up -d

或创建自己的环境(如果您这样做,请设置 KAFKA_ADDR 和 KAFKA_REST_ADDR)。

然后使用 tarantool-test 工具

    cargo build
    tarantool-test -p ./target/debug/libtests.so

如果您在 Kafka 实例中没有配置 SASL,则可能需要跳过身份验证测试

    cargo build --features skip_ssl_test
    tarantool-test -p ./target/debug/libtests.so

基准测试

运行基准测试(使用 tarantool-runner 工具)

    cargo build
    tarantool-runner run -p ./target/debug/libbenches.so -e entrypoint

发送 1000 条消息的结果

producer_sync 10000 messages (1 samples)
[ave.] 32.461472ms
32.461472ms (>50%), 32.461472ms (>95%), 32.461472ms (>99%)

消费 1_000_000 条消息的结果

consume 1000000 messages (1 samples)
[ave.] 6.182463391s
6.182463391s (>50%), 6.182463391s (>95%), 6.182463391s (>99%)

依赖项

~20–27MB
~357K SLoC