18个版本

0.7.4 2024年7月23日
0.7.3 2023年9月28日
0.7.2 2023年7月29日
0.5.0 2023年3月31日
0.1.0 2021年11月23日

WebAssembly中排名226

Download history 1417/week @ 2024-04-22 876/week @ 2024-04-29 691/week @ 2024-05-06 496/week @ 2024-05-13 866/week @ 2024-05-20 989/week @ 2024-05-27 512/week @ 2024-06-03 495/week @ 2024-06-10 722/week @ 2024-06-17 442/week @ 2024-06-24 440/week @ 2024-07-01 400/week @ 2024-07-08 417/week @ 2024-07-15 804/week @ 2024-07-22 388/week @ 2024-07-29 601/week @ 2024-08-05

每月下载量2,217
5个crate(3直接)中使用

Apache-2.0

285KB
5.5K SLoC

Fluvio SmartModules

此crate提供了用于创建自定义SmartModules的类型和宏,SmartModules是WebAssembly模块,可用于修改Fluvio流的行为。目前支持的SmartModule类型是filter,它可以描述要保留在流中或丢弃的记录。

编写SmartModules

请参阅examples目录以获取完整示例。

所有SmartModules都需要在Cargo.toml中添加crate-type = ['cdylib']。对于使用cargo-generate的快速设置,请参阅SmartModule模板

[package]
name = "fluvio-wasm-filter"
version = "0.1.0"
authors = ["Fluvio Contributors <[email protected]>"]
edition = "2018"

[lib]
crate-type = ['cdylib']

[dependencies]
fluvio-smartmodule = "0.1.0"

过滤

对于过滤,使用#[smartmodule(filter)]在您的顶级函数上编写您的smartmodule。将其视为SmartModule的“主”函数。

use fluvio_smartmodule::{smartmodule, Record, Result};

#[smartmodule(filter)]
pub fn filter(record: &Record) -> Result<bool> {
    let string = std::str::from_utf8(record.value.as_ref())?;
    Ok(string.contains('a'))
}

此过滤器将仅保留包含字母a的记录。

映射

映射函数使用#[smartmodule(map)],也是顶级入口点。

use fluvio_smartmodule::{smartmodule, Record, RecordData, Result};

#[smartmodule(map)]
pub fn map(record: &Record) -> Result<(Option<RecordData>, RecordData)> {
    let key = record.key.clone();

    let string = std::str::from_utf8(record.value.as_ref())?;
    let int = string.parse::<i32>()?;
    let value = (int * 2).to_string();

    Ok((key, value.into()))
}

此SmartModule将读取每个输入记录作为整数(i32),然后将其乘以2。

聚合

聚合函数是一种将多个输入记录中的数据合并的方法。每次调用聚合函数时,它都会收到一个“累积”值以及流中当前记录的值,并期望将累积值与当前值合并以生成一个新的累积值。这个新的累积值将被传递给下一次调用 aggregate 的下一个记录值。结果值流是每一步的输出累积值。

use fluvio_smartmodule::{smartmodule, Result, Record, RecordData};

#[smartmodule(aggregate)]
pub fn aggregate(accumulator: RecordData, current: &Record) -> Result<RecordData> {
    let mut acc = String::from_utf8(accumulator.as_ref().to_vec())?;
    let next = std::str::from_utf8(current.value.as_ref())?;
    acc.push_str(next);
    Ok(acc.into())
}

此SmartModule将每个记录读取为字符串并将其追加到累积字符串中。

ArrayMap

ArrayMap函数用于将一个输入记录转换成零个或多个输出记录。这可以用来将逻辑上表示多个数据点的输入记录分割成独立的记录。以下是一个示例,我们将JSON数组转换成内部JSON对象的流。

use fluvio_smartmodule::{smartmodule, Result, Record, RecordData};

#[smartmodule(array_map)]
pub fn array_map(record: &Record) -> Result<Vec<(Option<RecordData>, RecordData)>> {
    // Read the input record as a JSON array
    let array = serde_json::from_slice::<Vec<serde_json::Value>>(record.value.as_ref())?;
    
    // Convert each individual value from the array into its own JSON string
    let strings = array
        .into_iter()
        .map(|value| serde_json::to_string(&value))
        .collect::<core::result::Result<Vec<String>, _>>()?;
        
    // Return a list of records to be flattened into the output stream
    let kvs = strings
        .into_iter()
        .map(|s| (None, RecordData::from(s)))
        .collect::<Vec<_>>();
    Ok(kvs)
}

许可证

本项目采用Apache许可证

贡献

除非你明确表示,否则你提交给Fluvio的任何贡献,均应按照Apache许可证发布,没有任何附加条款或条件。

依赖关系

~4MB
~79K SLoC