#udf #io-error #time-series #streaming #kapacitor #influxdata #unix-socket

kapacitor-udf

用于创建 Kapacitor 用户自定义函数(UDFs)的 Rust 库

1 个不稳定版本

新版本 0.1.0 2024 年 8 月 15 日

#224 in 异步

Download history 94/week @ 2024-08-10

94 每月下载量
用于 2 crates

MIT 许可证

59KB
713

Rust 的 Kapacitor UDF 库

概述

此库为 Kapacitor(InfluxDB 时间序列数据库的本地数据处理引擎)提供创建用户自定义函数(UDFs)的 Rust 实现。它提供了一种高性能、内存高效的替代方案,优于现有的 Python 和 Go 实现。

特性

  • 使用 async-std 实现异步 I/O,以有效地处理数据流
  • 支持 Unix 套接字和 stdio 通信
  • 使用 tracing crate 进行适当的错误处理和日志记录
  • 模块化设计,易于扩展和定制
  • 适用于大规模时间序列数据的内存高效处理

安装

将此添加到您的 Cargo.toml

[dependencies]
kapacitor-udf = "0.1.0"  # Replace with the actual version

快速入门

以下是如何创建 UDF 处理程序和接受器的简单示例

use kapacitor_udf::{Handler, AccepterTrait, InfoResponse, Point, Server, StdioServer, SocketServer, // ... other imports};
use async_std::os::unix::net::UnixStream;
use async_trait::async_trait;

struct MyHandler;

#[async_trait]
impl Handler for MyHandler {
    async fn info(&self) -> Result<InfoResponse, std::io::Error> {
        // Implement info method
    }

    async fn init(&mut self, init: &InitRequest) -> Result<InitResponse, std::io::Error> {
        // Implement init method
    }

    async fn point(&mut self, point: &Point) -> Result<(), std::io::Error> {
        // Process each data point
    }

    // Implement other required methods...
}

#[derive(Debug)]
struct MyAccepter;

impl AccepterTrait for MyAccepter {
    fn accept(&self, conn: UnixStream) {
        // Handle the incoming connection
        async_std::task::spawn(async move {
            // Your connection handling logic here
        });
    }
}

#[async_std::main]
async fn main() -> std::io::Result<()> {
    // For Unix socket communication
    let handler = Box::new(MyHandler);
    let accepter = MyAccepter;
    let listener = async_std::os::unix::net::UnixListener::bind("/tmp/my_udf.sock").await?;
    let server = SocketServer::new(listener, accepter);
    server.serve().await?;

    // Or for stdio communication
    // let handler = Box::new(MyHandler);
    // let mut server = StdioServer::new(handler)?;
    // server.run().await
}

用法

  1. 为您的 UDF 逻辑实现 Handler 特性。
  2. 如果使用 Unix 套接字通信,实现 AccepterTrait 以处理连接。
  3. 根据您的通信需求选择 SocketServer(用于 Unix 套接字)或 StdioServer(用于 stdio)。
  4. 使用适当的服务器实现启动您的 UDF 服务器。
  5. 配置 Kapacitor 以使用您的 Rust UDF(有关此步骤,请参阅 InfluxDB 文档)。

性能

虽然全面的基准测试仍在进行中,但初步测试显示与 Python 实现相比内存使用量显著提高。预计性能将与 Go 实现相当或更好,尤其是在计算密集型任务中。

当前状态

这是一个初始版本,已知存在正在积极修复的 bug,尤其是批量处理(请参阅已知问题部分)。核心功能已就绪,但用户应预期未来版本中可能存在一些不稳定性和潜在的 API 变更。流操作已显示出稳定性,但在发布更彻底测试的版本之前,建议谨慎用于生产。

已知问题

  • 批量处理错误:批量处理中存在一个间歇性问题,即beginBatch消息发送失败。这并非每次都会发生,但用户应意识到批量处理中可能存在的不一致性。我们正在积极调查此问题。

  • 流稳定:与批量处理问题相反,到目前为止,流操作已被观察到是稳定的。

我们正在积极解决这些问题,并欢迎社区提供任何反馈或贡献,以帮助提高库的稳定性和性能。

贡献

欢迎贡献!请随时提交拉取请求。

许可证

本项目采用MIT许可证

致谢

  • 感谢InfluxData团队创建了Kapacitor和UDF接口
  • 感谢Rust社区提供了出色的异步工具和库

依赖项

~10–22MB
~308K SLoC