5个版本

0.2.2 2023年1月11日
0.2.1 2023年1月6日
0.2.0 2023年1月3日
0.1.1 2022年10月6日
0.1.0 2022年9月8日

#5 in #released

BSD-3-Clause

78KB
2K SLoC

hstreamdb-rust

github crates.io docs.rs build status

Rust客户端库用于HStreamDB。

兼容性

此库为实验性且仍在开发中,请使用在 crates.io 上发布的最新版本。

客户端库版本 HStream服务器版本
v0.1.* >= v0.9.4 && <= v0.9.7
v0.2.* >= v0.9.4 && <= v0.12.0

示例用法

将数据写入流

use std::env;

use hstreamdb::client::Client;
use hstreamdb::producer::FlushSettings;
use hstreamdb::{CompressionType, Payload, Record, Stream};
use rand::distributions::Alphanumeric;
use rand::{thread_rng, Rng};

async fn produce_example() -> anyhow::Result<()> {
    let mut client = Client::new(env::var("TEST_SERVER_ADDR")?).await?;

    let stream_name = "test_stream";

    client
        .create_stream(Stream {
            stream_name: "test_stream".to_string(),
            replication_factor: 3,
            backlog_duration: 7 * 24 * 3600,
            shard_count: 12,
        })
        .await?;
    println!("{:?}", client.list_streams().await?);

    // `Appender` is cheap to clone
    let (appender, mut producer) = client
        .new_producer(
            stream_name.to_string(),
            hstreamdb_pb::CompressionType::Zstd,
            FlushSettings {
                len: 10,
                size: 4000 * 20,
            },
        )
        .await?;

    _ = tokio::spawn(async move {
        let mut appender = appender;

        for _ in 0..10 {
            for _ in 0..100 {
                let i: u32 = rand::random();
                let payload: Vec<u8> = thread_rng()
                    .sample_iter(&Alphanumeric)
                    .take(20)
                    .map(char::from)
                    .collect::<String>()
                    .into_bytes();
                appender
                    .append(Record {
                        partition_key: format!("test_partition_key_{i}"),
                        payload: Payload::RawRecord(payload),
                    })
                    .unwrap();
            }
        }
        drop(appender)
    });

    // when all `Appender`s for the corresponding `Producer` have been dropped,
    // the `Producer` will wait for all requests to be done and then stop
    producer.start().await;

    Ok(())
}

从订阅中读取数据

use std::env;

use hstreamdb::client::Client;
use hstreamdb::{SpecialOffset, Subscription};
use tokio_stream::StreamExt;

async fn consume_example() -> anyhow::Result<()> {
    let addr = env::var("TEST_SERVER_ADDR").unwrap();
    let mut client = Client::new(addr).await.unwrap();

    let stream_name = "test_stream";
    let subscription_id = "test_subscription";

    client
        .create_subscription(Subscription {
            subscription_id: subscription_id.to_string(),
            stream_name: stream_name.to_string(),
            ack_timeout_seconds: 60 * 60,
            max_unacked_records: 1000,
            offset: SpecialOffset::Earliest,
        })
        .await?;
    println!("{:?}", client.list_subscriptions().await?);

    let mut stream = client
        .streaming_fetch("test_consumer".to_string(), subscription_id.to_string())
        .await
        .unwrap();
    let mut records = Vec::new();
    while let Some((record, ack)) = stream.next().await {
        println!("{record:?}");
        records.push(record);
        ack().unwrap();
        if records.len() == 10 * 100 {
            println!("done");
            break;
        }
    }

    client
        .delete_subscription(subscription_id.to_string(), true)
        .await?;

    Ok(())
}

依赖项

~17–29MB
~540K SLoC