1 个不稳定版本

0.1.0 2019 年 9 月 8 日

#11#kinesis

MIT 许可证

15KB
237

Kineasy


lib.rs:

Kineasy

Kineasy 是一个帮助您使用 AWS Kinesis 服务的库。它非常注重性能。使用此库,您可以在不关心编排的情况下消费具有多个分片的流,您将获得来自多个分片的记录流。

您还可以启用自动检查点,以便在需要时安全地重启服务,此检查点功能将检查点写入磁盘,但您也可以实现自己的写入功能。

示例


use kineasy::{Kineasy, Region, shard::ShardIterator, Record};
use futures_util::stream::StreamExt;
use futures::future;
use tokio;

fn main () {
#

    let run = tokio::runtime::Runtime::new().unwrap();
    

    run.block_on(async {

        let kns = Kineasy::new(Region::Custom {
            name: "custom-region".to_owned(),
            endpoint: "https://127.0.0.1:4568".to_owned()
        }, "kineasy_test_stream".to_owned(), ShardIterator::Latest);

        let stream = kns.stream().await;


        stream
            .take(1)
            .map(|r: Record| {
               let r: TestExample = serde_json::from_str(&String::from_utf8(r.data.to_vec())
                   .expect("Cannot parse this."))
                   .expect("Cannot parse json");
               r
           }).for_each(|parsed| {
               assert_eq!(TestExample {
                   example: "example".to_owned()
               }, parsed);

               future::ready(())
           }).await;
    });

}

依赖项

~21MB
~388K SLoC