2个版本
0.1.1 | 2022年10月23日 |
---|---|
0.1.0 | 2022年10月22日 |
#1128 in 网络编程
18KB
181 行
云数据库的无服务器ETL运行时
MEGA代表让ETL再次伟大!
本项目是一个基于WasmEdge WebAssembly运行时的云原生ETL(提取、转换、加载)应用程序框架,供开发者筛选、映射和转换进入云数据库的数据管道。我们目前的目标后端数据库是TiDB Cloud。
ETL工具对于现代数据分析流程至关重要。然而,云数据库的ETL有其自身的挑战。由于公共云本质上是一个多租户环境,所有用户定义的ETL函数都在数据库外部隔离,在单独的VM或安全容器中。这是一个复杂且重量级的设置,不适合需要处理间歇性数据流的简单函数。
有了MEGA框架,开发者将能够创建安全、轻量、快速且跨平台的ETL函数,这些函数位于云数据库基础设施附近或嵌入其中。MEGA ETL函数可以作为无服务器函数部署,并从各种来源接收数据,包括事件队列、webhook回调和数据流管道。结果将写入指定的云数据库以供后续分析。
先决条件
WasmEdge WebAssembly运行时是一个CNCF下的开源项目。它提供了一个比Linux容器更安全、更轻量级的替代方案来运行编译的(即高性能)ETL函数。它们可以部署到接近数据源或与云数据库服务器在同一防火墙内的边缘云。特别地,你需要
- 安装Rust。该框架目前是用Rust语言编写的。JavaScript版本正在开发中。
- 安装WasmEdge。你需要它来运行ETL函数。
- 注册 TiDB Cloud。ETL 转换后的数据写入此数据库以供后续分析。
在 Linux 上,您可以使用以下命令安装 Rust 和 WasmEdge。
# Install Rust
curl --proto '=https' --tlsv1.2 -sSf https://sh.rustup.rs | sh
source $HOME/.cargo/env
# Install WebAssembly target for Rust
rustup target add wasm32-wasi
# Install WasmEdge
curl -sSf https://raw.githubusercontent.com/WasmEdge/WasmEdge/master/utils/install.sh | bash -s -- -e all
source $HOME/.wasmedge/env
创建 ETL 函数
首先,将 MEGAcrate 添加到您的 Rust 项目中。
[dependencies]
mega_etl = "0.1"
接下来,在您的 Rust 代码中,您需要实现以下内容。
- 定义一个结构体来模拟数据库表。表中每一列都由结构体中的数据字段表示。
- 实现一个必需的
transform()
函数,为上述结构体提供Transformer
特性。该函数接收一个Vec<u8>
字节数组作为输入参数,并返回一个用于数据库的 SQL 字符串。 - 设置连接字符串变量以连接到 TiDB,以及配置用于检索输入
Vec<u8>
的入站连接器(例如,来自 Kafka 队列、HTTP 服务或 Redis 中的临时数据库表)。
首先,让我们定义数据库表的数据结构。这是一个电子商务网站的订单记录表。
#[derive(Serialize, Deserialize, Debug)]
struct Order {
order_id: i32,
product_id: i32,
quantity: i32,
amount: f32,
shipping: f32,
tax: f32,
shipping_address: String,
}
接下来,定义将入站数据转换为数据库 SQL 语句的 ETL transform()
函数。入站数据是简单地从任何数据源接收到的字节数组(例如,来自 Webhook 的 POST 请求或 Kafka 中的消息)。在这个例子中,入站数据是代表 order
的 JSON 字符串。
#[async_trait]
impl Transformer for Order {
async fn transform(inbound_data: Vec<u8>) -> TransformerResult<String> {
let s = std::str::from_utf8(&inbound_data)
.map_err(|e| TransformerError::Custom(e.to_string()))?;
let order: Order = serde_json::from_str(String::from(s).as_str())
.map_err(|e| TransformerError::Custom(e.to_string()))?;
dbg!(&order);
let sql_string = format!(
r"INSERT INTO orders VALUES ({:?}, {:?}, {:?}, {:?}, {:?}, {:?}, {:?});",
order.order_id,
order.product_id,
order.quantity,
order.amount,
order.shipping,
order.tax,
order.shipping_address,
);
Ok(sql_string)
}
}
最后,在主应用程序中,我们将配置一个入站数据源(位于 http://127.0.0.1:8080
的 Webhook)和一个出站数据库(TiDB Cloud 实例)。
#[tokio::main(flavor = "current_thread")]
async fn main() -> anyhow::Result<()> {
env_logger::init();
// can use builder later
let uri = match std::env::var("DATABASE_URL") {
Ok(uri) => uri,
Err(_) => "mysql://userID:[email protected]:4000/test".into(),
};
let mut pipe = Pipe::new(uri, "127.0.0.1:8080".to_string()).await;
// This is async because this calls the async transform() function in Order
pipe.start::<Order>().await?;
Ok(())
}
可选地,您可以定义一个 init()
函数。它在 ETL 启动时第一次执行。在这里,我们使用 init()
在数据库中创建一个空的 orders
表。
impl Transformer for Order {
async fn init() -> TransformerResult<String> {
let sql_string = "DROP TABLE IF EXISTS orders; CREATE TABLE orders (order_id INT, product_id INT, quantity INT, amount FLOAT, shipping FLOAT, tax FLOAT, shipping_address VARCHAR(20));";
Ok(sql_string)
}
}
构建
使用 Rust 的 cargo
工具构建 ETL 应用程序。
cargo build --target wasm32-wasi --release
可选地,您可以将它 AOT 编译以提高性能(对于计算密集型的 ETL 函数可能快 100 倍)。
wasmedgec target/wasm32-wasi/release/order_demo.wasm order_demo.wasm
运行
使用 WasmEdge,您有多种部署选项。您可以在支持 WasmEdge 的任何无服务器基础设施上运行编译后的 ETL 函数程序,包括几乎所有的 Kubernetes 变体、Dapr、Docker、Podman 以及托管函数调度器,如 essa-rs 和 flows.network。
但在这个例子中,我们将使用老式的 wasmedge
CLI 工具来运行 ETL 函数作为服务。
wasmedge order_demo.wasm
它将在端口 8080 上启动一个 HTTP 服务器并等待入站数据。在另一个终端中,使用 curl
发送一些入站数据。
curl http://127.0.0.1:8080/ -X POST -d @order.json
在 order.json
中的 JSON 数据作为入站数据发送到 ETL 的 transform()
函数。该函数解析它并生成 SQL 字符串,该字符串将在连接的 TiDB Cloud 实例上自动执行。您现在可以从数据库浏览器连接到 TiDB Cloud,并看到数据库中的 order
记录。
资源
加入我们!
依赖
~17–33MB
~491K SLoC