18个版本
0.7.4 | 2024年7月23日 |
---|---|
0.7.3 | 2023年9月28日 |
0.7.2 | 2023年7月29日 |
0.5.0 | 2023年3月31日 |
0.1.0 | 2021年11月23日 |
在WebAssembly中排名226
每月下载量2,217
在5个crate(3直接)中使用
285KB
5.5K SLoC
Fluvio SmartModules
此crate提供了用于创建自定义SmartModules的类型和宏,SmartModules是WebAssembly模块,可用于修改Fluvio流的行为。目前支持的SmartModule类型是filter
,它可以描述要保留在流中或丢弃的记录。
编写SmartModules
请参阅
examples
目录以获取完整示例。
所有SmartModules都需要在Cargo.toml中添加crate-type = ['cdylib']
。对于使用cargo-generate
的快速设置,请参阅SmartModule模板。
[package]
name = "fluvio-wasm-filter"
version = "0.1.0"
authors = ["Fluvio Contributors <[email protected]>"]
edition = "2018"
[lib]
crate-type = ['cdylib']
[dependencies]
fluvio-smartmodule = "0.1.0"
过滤
对于过滤,使用#[smartmodule(filter)]
在您的顶级函数上编写您的smartmodule。将其视为SmartModule的“主”函数。
use fluvio_smartmodule::{smartmodule, Record, Result};
#[smartmodule(filter)]
pub fn filter(record: &Record) -> Result<bool> {
let string = std::str::from_utf8(record.value.as_ref())?;
Ok(string.contains('a'))
}
此过滤器将仅保留包含字母a
的记录。
映射
映射函数使用#[smartmodule(map)]
,也是顶级入口点。
use fluvio_smartmodule::{smartmodule, Record, RecordData, Result};
#[smartmodule(map)]
pub fn map(record: &Record) -> Result<(Option<RecordData>, RecordData)> {
let key = record.key.clone();
let string = std::str::from_utf8(record.value.as_ref())?;
let int = string.parse::<i32>()?;
let value = (int * 2).to_string();
Ok((key, value.into()))
}
此SmartModule将读取每个输入记录作为整数(i32
),然后将其乘以2。
聚合
聚合函数是一种将多个输入记录中的数据合并的方法。每次调用聚合函数时,它都会收到一个“累积”值以及流中当前记录的值,并期望将累积值与当前值合并以生成一个新的累积值。这个新的累积值将被传递给下一次调用 aggregate
的下一个记录值。结果值流是每一步的输出累积值。
use fluvio_smartmodule::{smartmodule, Result, Record, RecordData};
#[smartmodule(aggregate)]
pub fn aggregate(accumulator: RecordData, current: &Record) -> Result<RecordData> {
let mut acc = String::from_utf8(accumulator.as_ref().to_vec())?;
let next = std::str::from_utf8(current.value.as_ref())?;
acc.push_str(next);
Ok(acc.into())
}
此SmartModule将每个记录读取为字符串并将其追加到累积字符串中。
ArrayMap
ArrayMap函数用于将一个输入记录转换成零个或多个输出记录。这可以用来将逻辑上表示多个数据点的输入记录分割成独立的记录。以下是一个示例,我们将JSON数组转换成内部JSON对象的流。
use fluvio_smartmodule::{smartmodule, Result, Record, RecordData};
#[smartmodule(array_map)]
pub fn array_map(record: &Record) -> Result<Vec<(Option<RecordData>, RecordData)>> {
// Read the input record as a JSON array
let array = serde_json::from_slice::<Vec<serde_json::Value>>(record.value.as_ref())?;
// Convert each individual value from the array into its own JSON string
let strings = array
.into_iter()
.map(|value| serde_json::to_string(&value))
.collect::<core::result::Result<Vec<String>, _>>()?;
// Return a list of records to be flattened into the output stream
let kvs = strings
.into_iter()
.map(|s| (None, RecordData::from(s)))
.collect::<Vec<_>>();
Ok(kvs)
}
许可证
本项目采用Apache许可证。
贡献
除非你明确表示,否则你提交给Fluvio的任何贡献,均应按照Apache许可证发布,没有任何附加条款或条件。
依赖关系
~4MB
~79K SLoC