80 个版本

0.23.1 2024 年 8 月 6 日
0.23.0 2024 年 7 月 23 日
0.22.3 2024 年 6 月 7 日
0.21.7 2024 年 3 月 4 日
0.2.3 2020 年 11 月 11 日

#71数据库接口

Download history 338/week @ 2024-04-27 286/week @ 2024-05-04 154/week @ 2024-05-11 204/week @ 2024-05-18 175/week @ 2024-05-25 225/week @ 2024-06-01 156/week @ 2024-06-08 126/week @ 2024-06-15 77/week @ 2024-06-22 119/week @ 2024-06-29 108/week @ 2024-07-06 58/week @ 2024-07-13 220/week @ 2024-07-20 124/week @ 2024-07-27 246/week @ 2024-08-03 118/week @ 2024-08-10

709 每月下载
用于 4 crates

Apache-2.0

1MB
22K SLoC

CI Status CD Status fluvio Crates.io version Fluvio Rust documentation Fluvio dependency status Fluvio Discord

什么是 Fluvio?

Fluvio 是一个用 Rust 编写的可编程数据流平台。使用 Fluvio,您可以创建可扩展的高性能实时应用程序。

官方网站 上了解更多关于 Fluvio 的信息。

入门

让我们用 Fluvio 编写一个非常简单的解决方案,在下面的演示中,我们将使用 Fluvio CLI 创建一个主题,然后在此主题上生成一些记录。最后,这些记录将从主题中消费并打印到 stdout。

  1. 如果您尚未安装,请安装 Fluvio CLI

  2. 使用 CLI 创建新主题

fluvio topic create "echo-test"
  1. 创建一个新的 cargo 项目并安装 fluviofuturesasync-std
cargo add fluvio
cargo add futures
cargo add async-std --features attributes
  1. 将以下片段复制并粘贴到您的 src/main.rs
use std::time::Duration;

use fluvio::{Offset, RecordKey};
use futures::StreamExt;

const TOPIC: &str = "echo-test";
const MAX_RECORDS: u8 = 10;

#[async_std::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let producer = fluvio::producer(TOPIC).await?;
    let consumer = fluvio::consumer(TOPIC, 0).await?;
    let mut consumed_records: u8 = 0;

    for i in 0..10 {
        producer.send(RecordKey::NULL, format!("Hello from Fluvio {}!", i)).await?;
        println!("[PRODUCER] sent record {}", i);
        async_std::task::sleep(Duration::from_secs(1)).await;
    }

    // Fluvio batches records by default, call flush() when done producing
    // to ensure all records are sent
    producer.flush().await?;

    let mut stream = consumer.stream(Offset::beginning()).await?;

    while let Some(Ok(record)) = stream.next().await {
        let value_str = record.get_value().as_utf8_lossy_string();

        println!("[CONSUMER] Got record: {}", value_str);
        consumed_records += 1;

        if consumed_records >= MAX_RECORDS {
            break;
        }
    }

    Ok(())
}
  1. 运行 cargo run 并期望以下输出
[PRODUCER] sent record 0
[PRODUCER] sent record 1
[PRODUCER] sent record 2
[PRODUCER] sent record 3
[PRODUCER] sent record 4
[PRODUCER] sent record 5
[PRODUCER] sent record 6
[PRODUCER] sent record 7
[PRODUCER] sent record 8
[PRODUCER] sent record 9
[CONSUMER] Got record: Hello, Fluvio 0!
[CONSUMER] Got record: Hello, Fluvio 1!
[CONSUMER] Got record: Hello, Fluvio 2!
[CONSUMER] Got record: Hello, Fluvio 3!
[CONSUMER] Got record: Hello, Fluvio 4!
[CONSUMER] Got record: Hello, Fluvio 5!
[CONSUMER] Got record: Hello, Fluvio 6!
[CONSUMER] Got record: Hello, Fluvio 7!
[CONSUMER] Got record: Hello, Fluvio 8!
[CONSUMER] Got record: Hello, Fluvio 9!
  1. 清理
fluvio topic delete echo-test
topic "echo-test" deleted

了解更多

  • 阅读教程 以充分利用 Fluvio 和 InfinyOn Cloud 以扩展您的流式解决方案。

  • 您可以使用 Fluvio 通过 连接器 从不同的来源发送或接收记录。

  • 如果您想在运行时过滤或转换记录,请阅读有关 智能模块 的更多信息。

依赖项

~13–30MB
~494K SLoC