1 个不稳定版本
新版本 0.1.0 | 2024 年 8 月 15 日 |
---|
#224 in 异步
94 每月下载量
用于 2 crates
59KB
713 行
Rust 的 Kapacitor UDF 库
概述
此库为 Kapacitor(InfluxDB 时间序列数据库的本地数据处理引擎)提供创建用户自定义函数(UDFs)的 Rust 实现。它提供了一种高性能、内存高效的替代方案,优于现有的 Python 和 Go 实现。
特性
- 使用 async-std 实现异步 I/O,以有效地处理数据流
- 支持 Unix 套接字和 stdio 通信
- 使用
tracing
crate 进行适当的错误处理和日志记录 - 模块化设计,易于扩展和定制
- 适用于大规模时间序列数据的内存高效处理
安装
将此添加到您的 Cargo.toml
[dependencies]
kapacitor-udf = "0.1.0" # Replace with the actual version
快速入门
以下是如何创建 UDF 处理程序和接受器的简单示例
use kapacitor_udf::{Handler, AccepterTrait, InfoResponse, Point, Server, StdioServer, SocketServer, // ... other imports};
use async_std::os::unix::net::UnixStream;
use async_trait::async_trait;
struct MyHandler;
#[async_trait]
impl Handler for MyHandler {
async fn info(&self) -> Result<InfoResponse, std::io::Error> {
// Implement info method
}
async fn init(&mut self, init: &InitRequest) -> Result<InitResponse, std::io::Error> {
// Implement init method
}
async fn point(&mut self, point: &Point) -> Result<(), std::io::Error> {
// Process each data point
}
// Implement other required methods...
}
#[derive(Debug)]
struct MyAccepter;
impl AccepterTrait for MyAccepter {
fn accept(&self, conn: UnixStream) {
// Handle the incoming connection
async_std::task::spawn(async move {
// Your connection handling logic here
});
}
}
#[async_std::main]
async fn main() -> std::io::Result<()> {
// For Unix socket communication
let handler = Box::new(MyHandler);
let accepter = MyAccepter;
let listener = async_std::os::unix::net::UnixListener::bind("/tmp/my_udf.sock").await?;
let server = SocketServer::new(listener, accepter);
server.serve().await?;
// Or for stdio communication
// let handler = Box::new(MyHandler);
// let mut server = StdioServer::new(handler)?;
// server.run().await
}
用法
- 为您的 UDF 逻辑实现
Handler
特性。 - 如果使用 Unix 套接字通信,实现
AccepterTrait
以处理连接。 - 根据您的通信需求选择
SocketServer
(用于 Unix 套接字)或StdioServer
(用于 stdio)。 - 使用适当的服务器实现启动您的 UDF 服务器。
- 配置 Kapacitor 以使用您的 Rust UDF(有关此步骤,请参阅 InfluxDB 文档)。
性能
虽然全面的基准测试仍在进行中,但初步测试显示与 Python 实现相比内存使用量显著提高。预计性能将与 Go 实现相当或更好,尤其是在计算密集型任务中。
当前状态
这是一个初始版本,已知存在正在积极修复的 bug,尤其是批量处理(请参阅已知问题部分)。核心功能已就绪,但用户应预期未来版本中可能存在一些不稳定性和潜在的 API 变更。流操作已显示出稳定性,但在发布更彻底测试的版本之前,建议谨慎用于生产。
已知问题
-
批量处理错误:批量处理中存在一个间歇性问题,即
beginBatch
消息发送失败。这并非每次都会发生,但用户应意识到批量处理中可能存在的不一致性。我们正在积极调查此问题。 -
流稳定:与批量处理问题相反,到目前为止,流操作已被观察到是稳定的。
我们正在积极解决这些问题,并欢迎社区提供任何反馈或贡献,以帮助提高库的稳定性和性能。
贡献
欢迎贡献!请随时提交拉取请求。
许可证
本项目采用MIT许可证。
致谢
- 感谢InfluxData团队创建了Kapacitor和UDF接口
- 感谢Rust社区提供了出色的异步工具和库
依赖项
~10–22MB
~308K SLoC