#maria-db #binlog #replication #cdc #capture #events #change

mysql_cdc

MySQL/MariaDB binlog变更数据捕获(CDC)的Rust连接器

9个版本

0.2.1 2023年7月13日
0.2.0 2023年6月4日
0.1.7 2023年6月3日
0.1.6 2022年1月25日
0.1.0 2021年9月29日

#327数据库接口

Download history

每月68次下载

MIT许可证

165KB
3.5K SLoC

mysql_cdc

crates.io docs.rs

MySQL/MariaDB binlog复制的Rust客户端

限制

请注意,库目前有以下限制

  • 仅支持标准认证插件 mysql_native_passwordcaching_sha2_password
  • 目前,库不支持SSL加密。
  • 不处理拆分的数据包(16MB及以上)。

Binlog事件流复制

实时复制客户端按以下方式工作。

use mysql_cdc::binlog_client::BinlogClient;
use mysql_cdc::binlog_options::BinlogOptions;
use mysql_cdc::errors::Error;
use mysql_cdc::providers::mariadb::gtid::gtid_list::GtidList;
use mysql_cdc::providers::mysql::gtid::gtid_set::GtidSet;
use mysql_cdc::replica_options::ReplicaOptions;
use mysql_cdc::ssl_mode::SslMode;

fn main() -> Result<(), Error> {
    // Start replication from MariaDB GTID
    let _options = BinlogOptions::from_mariadb_gtid(GtidList::parse("0-1-270")?);

    // Start replication from MySQL GTID
    let gtid_set =
        "d4c17f0c-4f11-11ea-93e3-325d3e1cd1c8:1-107, f442510a-2881-11ea-b1dd-27916133dbb2:1-7";
    let _options = BinlogOptions::from_mysql_gtid(GtidSet::parse(gtid_set)?);

    // Start replication from the position
    let _options = BinlogOptions::from_position(String::from("mysql-bin.000008"), 195);

    // Start replication from last master position.
    // Useful when you are only interested in new changes.
    let _options = BinlogOptions::from_end();

    // Start replication from first event of first available master binlog.
    // Note that binlog files by default have expiration time and deleted.
    let options = BinlogOptions::from_start();

    let options = ReplicaOptions {
        username: String::from("root"),
        password: String::from("Qwertyu1"),
        blocking: true,
        ssl_mode: SslMode::Disabled,
        binlog: options,
        ..Default::default()
    };

    let mut client = BinlogClient::new(options);

    for result in client.replicate()? {
        let (header, event) = result?;
        println!("{:#?}", header);
        println!("{:#?}", event);

        // You process an event here

        // After you processed the event, you need to update replication position
        client.commit(&header, &event);
    }
    Ok(())
}

一个典型的事务具有以下结构。

  1. GtidEvent 如果启用了gtid模式。
  2. 一个或多个 TableMapEvent 事件。
    • 一个或多个 WriteRowsEvent 事件。
    • 一个或多个 UpdateRowsEvent 事件。
    • 一个或多个 DeleteRowsEvent 事件。
  3. XidEvent 指示事务提交。

最佳实践是使用from_gtid方法进行GTID复制。使用此方法可以正确执行复制故障转移。请注意,在GTID模式下,from_gtid具有以下行为

  • from_gtid(@@gtid_purged)类似于from_start()
  • from_gtid(@@gtid_executed)类似于from_end()

离线读取binlog文件

在某些情况下,您可能需要从文件系统离线读取binlog文件。可以使用BinlogReader类完成此操作。

use mysql_cdc::{binlog_reader::BinlogReader, errors::Error};
use std::fs::File;

const PATH: &str = "mysql-bin.000001";

fn main() -> Result<(), Error> {
    let file = File::open(PATH)?;
    let reader = BinlogReader::new(file)?;

    for result in reader.read_events() {
        let (header, event) = result?;
        println!("{:#?}", header);
        println!("{:#?}", event);
    }
    Ok(())
}

依赖关系

~2.3–3MB
~72K SLoC