9个版本
0.2.1 | 2023年7月13日 |
---|---|
0.2.0 | 2023年6月4日 |
0.1.7 |
|
0.1.6 | 2022年1月25日 |
0.1.0 | 2021年9月29日 |
#327 在 数据库接口
每月68次下载
165KB
3.5K SLoC
mysql_cdc
MySQL/MariaDB binlog复制的Rust客户端
限制
请注意,库目前有以下限制
- 仅支持标准认证插件
mysql_native_password
和caching_sha2_password
。 - 目前,库不支持SSL加密。
- 不处理拆分的数据包(16MB及以上)。
Binlog事件流复制
实时复制客户端按以下方式工作。
use mysql_cdc::binlog_client::BinlogClient;
use mysql_cdc::binlog_options::BinlogOptions;
use mysql_cdc::errors::Error;
use mysql_cdc::providers::mariadb::gtid::gtid_list::GtidList;
use mysql_cdc::providers::mysql::gtid::gtid_set::GtidSet;
use mysql_cdc::replica_options::ReplicaOptions;
use mysql_cdc::ssl_mode::SslMode;
fn main() -> Result<(), Error> {
// Start replication from MariaDB GTID
let _options = BinlogOptions::from_mariadb_gtid(GtidList::parse("0-1-270")?);
// Start replication from MySQL GTID
let gtid_set =
"d4c17f0c-4f11-11ea-93e3-325d3e1cd1c8:1-107, f442510a-2881-11ea-b1dd-27916133dbb2:1-7";
let _options = BinlogOptions::from_mysql_gtid(GtidSet::parse(gtid_set)?);
// Start replication from the position
let _options = BinlogOptions::from_position(String::from("mysql-bin.000008"), 195);
// Start replication from last master position.
// Useful when you are only interested in new changes.
let _options = BinlogOptions::from_end();
// Start replication from first event of first available master binlog.
// Note that binlog files by default have expiration time and deleted.
let options = BinlogOptions::from_start();
let options = ReplicaOptions {
username: String::from("root"),
password: String::from("Qwertyu1"),
blocking: true,
ssl_mode: SslMode::Disabled,
binlog: options,
..Default::default()
};
let mut client = BinlogClient::new(options);
for result in client.replicate()? {
let (header, event) = result?;
println!("{:#?}", header);
println!("{:#?}", event);
// You process an event here
// After you processed the event, you need to update replication position
client.commit(&header, &event);
}
Ok(())
}
一个典型的事务具有以下结构。
GtidEvent
如果启用了gtid模式。- 一个或多个
TableMapEvent
事件。- 一个或多个
WriteRowsEvent
事件。 - 一个或多个
UpdateRowsEvent
事件。 - 一个或多个
DeleteRowsEvent
事件。
- 一个或多个
XidEvent
指示事务提交。
最佳实践是使用from_gtid
方法进行GTID复制。使用此方法可以正确执行复制故障转移。请注意,在GTID模式下,from_gtid
具有以下行为
from_gtid(@@gtid_purged)
类似于from_start()
from_gtid(@@gtid_executed)
类似于from_end()
离线读取binlog文件
在某些情况下,您可能需要从文件系统离线读取binlog文件。可以使用BinlogReader
类完成此操作。
use mysql_cdc::{binlog_reader::BinlogReader, errors::Error};
use std::fs::File;
const PATH: &str = "mysql-bin.000001";
fn main() -> Result<(), Error> {
let file = File::open(PATH)?;
let reader = BinlogReader::new(file)?;
for result in reader.read_events() {
let (header, event) = result?;
println!("{:#?}", header);
println!("{:#?}", event);
}
Ok(())
}
依赖关系
~2.3–3MB
~72K SLoC