2 个版本
0.1.1 | 2024年6月14日 |
---|---|
0.1.0 | 2024年1月19日 |
#581 in 数据库接口
每月 90 次下载
150KB
3K SLoC
英语 | 中文
mysql-binlog-connector-rust
概述
- 一个简单但强大的库,用于在 Rust 中使用异步 I/O 输出 mysql binlog (binlog_format=ROW) 并解析基于行的复制事件。
支持的 mysql 版本
- mysql 5.6 (在 mysql:5.6.51 中测试)
- mysql 5.7 (在 mysql:5.7.40 中测试)
- mysql 8.0 (在 mysql:8.0.31 中测试)
支持的事件类型
-
FORMAT_DESCRIPTION_EVENT
-
ROTATE_EVENT
-
PREVIOUS_GTIDS_LOG_EVENT
-
GTID_LOG_EVENT
-
QUERY_EVENT
-
XID_EVENT
-
XA_PREPARE_LOG_EVENT
-
TRANSACTION_PAYLOAD_EVENT
-
ROWS_QUERY_LOG_EVENT
-
TABLE_MAP_EVENT
-
WRITE_ROWS_EVENT_V1
-
WRITE_ROWS_EVENT
-
UPDATE_ROWS_EVENT_V1
-
UPDATE_ROWS_EVENT
-
DELETE_ROWS_EVENT_V1
-
DELETE_ROWS_EVENT
-
更多详细信息,请参阅:mysql 文档
mysql 列和 rust 类型的映射
mysql 列类型 | binlog 列类型(原始) | binlog 列类型(从 binlog 列元数据解析) | rust 类型 |
---|---|---|---|
BIT | MYSQL_TYPE_BIT = 16 | ColumnType::Bit | ColumnValue::Bit(u64) |
TINYINT [UNSIGNED] | MYSQL_TYPE_TINY = 1 | ColumnType::Tiny | ColumnValue::Tiny(i8) |
SMALLINT [UNSIGNED] | MYSQL_TYPE_SHORT = 2 | ColumnType::Short | ColumnValue::Short(i16) |
MEDIUMINT [UNSIGNED] | MYSQL_TYPE_INT24 = 9 | ColumnType::Int24 | ColumnValue::Long(i32) |
INT [UNSIGNED] | MYSQL_TYPE_LONG = 3 | ColumnType::Long | ColumnValue::Long(i32) |
BIGINT [UNSIGNED] | MYSQL_TYPE_LONGLONG = 8 | ColumnType::LongLong | ColumnValue::LongLong(i64) |
FLOAT | MYSQL_TYPE_FLOAT = 4 | ColumnType::Float | ColumnValue::Float(f32) |
DOUBLE | MYSQL_TYPE_DOUBLE = 5 | ColumnType::Double | ColumnValue::Double(f64) |
DECIMAL | MYSQL_TYPE_NEWDECIMAL = 246 | ColumnType::NewDecimal | ColumnValue::Decimal(String) |
DATE | MYSQL_TYPE_DATE = 10 | ColumnType::Date | ColumnValue::Date(String) |
TIME | MYSQL_TYPE_TIME2 = 19 | ColumnType::Time2 | ColumnValue::Time(String) |
TIMESTAMP | MYSQL_TYPE_TIMESTAMP2 = 17 | ColumnType::TimeStamp2 | ColumnValue::Timestamp(i64) |
DATETIME | MYSQL_TYPE_DATETIME2 = 18 | ColumnType::DateTime2 | ColumnValue::DateTime(String) |
YEAR | MYSQL_TYPE_YEAR = 13 | ColumnType::Year | ColumnValue::Year(u16) |
CHAR | MYSQL_TYPE_STRING = 254 | ColumnType::String | ColumnValue::String(Vec |
VARCHAR | MYSQL_TYPE_VARCHAR = 15 | ColumnType::VarChar | ColumnValue::String(Vec |
BINARY | MYSQL_TYPE_STRING = 254 | ColumnType::String | ColumnValue::String(Vec |
VARBINARY | MYSQL_TYPE_VARCHAR = 15 | ColumnType::VarChar | ColumnValue::String(Vec |
ENUM | MYSQL_TYPE_STRING = 254 | ColumnType::Enum | ColumnValue::Enum(u32) |
SET | MYSQL_TYPE_STRING = 254 | 列类型::设置 | 列值::设置(u64) |
极短文本 TEXT 中等文本 LONGTEXT 极短二进制 BLOB 二进制 MEDIUMBLOB 长二进制 LONGBLOB | MYSQL_TYPE_BLOB = 252 | 列类型::二进制 | 列值::二进制(Vec |
几何形状 | MYSQL_TYPE_GEOMETRY = 255 | 列类型::几何形状 | 列值::二进制(Vec |
JSON | MYSQL_TYPE_JSON = 245 | 列类型::JSON | 列值::JSON(Vec |
- 对于 CHAR / VARCHAR 列,由于 binlog 不包含字符集信息,我们只获取原始字节并将它们存储在 ColumnValue::String(Vec
) 对象中,您可能需要根据列元数据将它们转换为字符串以进行进一步使用。 - 对于无符号数字列,由于 binlog 不包含无符号标志,我们仅将其解析为有符号数字,您可能需要根据列元数据将它们转换为无符号值以进行进一步使用。
- 对于 JSON 列,我们获取原始字节并将它们存储在 ColumnValue::Json(Vec
) 对象中,我们还提供了一个默认的反序列化器 "JsonBinary",用于将它们解析为字符串,稍后在本文档中提供示例。
快速入门
运行测试
- 启动 mysql,启用 binlog,不启用 binlog-transaction-compression
docker run -d --name mysql57 \
--platform linux/x86_64 \
-it --restart=always \
-p 3307:3306 \
-e MYSQL_ROOT_PASSWORD="123456" \
mysql:5.7.40 \
--lower_case_table_names=1 \
--character-set-server=utf8 \
--collation-server=utf8_general_ci \
--datadir=/var/lib/mysql \
--user=mysql \
--server_id=1 \
--log_bin=/var/lib/mysql/mysql-bin.log \
--max_binlog_size=100M \
--gtid_mode=ON \
--enforce_gtid_consistency=ON \
--binlog_format=ROW
- 启动 mysql,启用 binlog,启用 binlog-transaction-compression
docker run -d --name mysql80 \
--platform linux/x86_64 \
-it --restart=always \
-p 3308:3306 -e MYSQL_ROOT_PASSWORD="123456" \
mysql:8.0.31 \
--lower_case_table_names=1 \
--character-set-server=utf8 \
--collation-server=utf8_general_ci \
--datadir=/var/lib/mysql \
--user=mysql \
--server_id=1 \
--log_bin=/var/lib/mysql/mysql-bin.log \
--max_binlog_size=100M \
--gtid_mode=ON \
--enforce_gtid_consistency=ON \
--binlog_format=ROW \
--binlog-transaction-compression \
--binlog_rows_query_log_events=ON \
--default_authentication_plugin=mysql_native_password \
--default_time_zone="+08:00"
- 在 tests/.env 中更新配置
db_url=mysql://root:[email protected]:3307
server_id=200
default_db="db_test"
default_tb="tb_test"
binlog_parse_millis=100
- 运行测试
cargo test --package mysql-binlog-connector-rust --test integration_test
- 每个测试将
- 执行 sql 语句以创建表并生成 binlog
- 转储和解析 binlog
- 等待 binlog_parse_millis 以解析所有 binlog
- 您可能需要为大型事务增加 binlog_parse_millis
示例
fn main() {
let env_path = env::current_dir().unwrap().join("example/src/.env");
dotenv::from_path(env_path).unwrap();
let db_url = env::var("db_url").unwrap();
let server_id: u64 = env::var("server_id").unwrap().parse().unwrap();
let binlog_filename = env::var("binlog_filename").unwrap();
let binlog_position: u32 = env::var("binlog_position").unwrap().parse().unwrap();
block_on(start_client(
db_url,
server_id,
binlog_filename,
binlog_position,
));
}
async fn start_client(url: String, server_id: u64, binlog_filename: String, binlog_position: u32) {
let mut client = BinlogClient {
url,
binlog_filename,
binlog_position,
server_id,
};
let mut stream = client.connect().await.unwrap();
loop {
let (header, data) = stream.read().await.unwrap();
println!("header: {:?}", header);
println!("data: {:?}", data);
println!("");
}
}
示例 1:禁用 binlog-transaction-compression 解析 binlog
- 执行 sql 语句
flush logs;
SET autocommit=0;
CREATE DATABASE test_db;
USE test_db;
CREATE TABLE test_tb(id INT, value INT);
INSERT INTO test_tb VALUES(1,1),(2,2),(3,3),(4,4);
UPDATE test_tb SET value=3 WHERE id in(1,2);
DELETE FROM test_tb WHERE id in (1,2);
TRUNCATE TABLE test_tb;
DROP TABLE test_tb;
commit;
- 显示 binlog 事件
mysql> show binary logs;
+------------------+-----------+
| Log_name | File_size |
+------------------+-----------+
| mysql-bin.000050 | 1255 |
mysql> show binlog events in 'mysql-bin.000050';
+------------------+------+----------------+-----------+-------------+------------------------------------------------------------------------+
| Log_name | Pos | Event_type | Server_id | End_log_pos | Info |
+------------------+------+----------------+-----------+-------------+------------------------------------------------------------------------+
| mysql-bin.000050 | 4 | Format_desc | 1 | 123 | Server ver: 5.7.40-log, Binlog ver: 4 |
| mysql-bin.000050 | 123 | Previous_gtids | 1 | 194 | 50dc6874-13d3-11ee-a17a-0242ac110002:1-176027 |
| mysql-bin.000050 | 194 | Gtid | 1 | 259 | SET @@SESSION.GTID_NEXT= '50dc6874-13d3-11ee-a17a-0242ac110002:176028' |
| mysql-bin.000050 | 259 | Query | 1 | 378 | use `test_db`; CREATE TABLE test_tb(id INT, value INT) |
| mysql-bin.000050 | 378 | Gtid | 1 | 443 | SET @@SESSION.GTID_NEXT= '50dc6874-13d3-11ee-a17a-0242ac110002:176029' |
| mysql-bin.000050 | 443 | Query | 1 | 518 | BEGIN |
| mysql-bin.000050 | 518 | Table_map | 1 | 572 | table_id: 12832 (test_db.test_tb) |
| mysql-bin.000050 | 572 | Write_rows | 1 | 643 | table_id: 12832 flags: STMT_END_F |
| mysql-bin.000050 | 643 | Table_map | 1 | 697 | table_id: 12832 (test_db.test_tb) |
| mysql-bin.000050 | 697 | Update_rows | 1 | 769 | table_id: 12832 flags: STMT_END_F |
| mysql-bin.000050 | 769 | Table_map | 1 | 823 | table_id: 12832 (test_db.test_tb) |
| mysql-bin.000050 | 823 | Delete_rows | 1 | 876 | table_id: 12832 flags: STMT_END_F |
| mysql-bin.000050 | 876 | Xid | 1 | 907 | COMMIT /* xid=13739 */ |
| mysql-bin.000050 | 907 | Gtid | 1 | 972 | SET @@SESSION.GTID_NEXT= '50dc6874-13d3-11ee-a17a-0242ac110002:176030' |
| mysql-bin.000050 | 972 | Query | 1 | 1064 | use `test_db`; TRUNCATE TABLE test_tb |
| mysql-bin.000050 | 1064 | Gtid | 1 | 1129 | SET @@SESSION.GTID_NEXT= '50dc6874-13d3-11ee-a17a-0242ac110002:176031' |
| mysql-bin.000050 | 1129 | Query | 1 | 1255 | use `test_db`; DROP TABLE `test_tb` /* generated by server */ |
+------------------+------+----------------+-----------+-------------+------------------------------------------------------------------------+
- 已解析 binlog
header: EventHeader { timestamp: 0, event_type: 4, server_id: 1, event_length: 47, next_event_position: 0, event_flags: 32 }
data: Rotate(RotateEvent { binlog_filename: "mysql-bin.000050", binlog_position: 194 })
header: EventHeader { timestamp: 1704443761, event_type: 15, server_id: 1, event_length: 119, next_event_position: 0, event_flags: 0 }
data: FormatDescription(FormatDescriptionEvent { binlog_version: 4, server_version: "5.7.40-log\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0", create_timestamp: 0, header_length: 19, checksum_type: CRC32 })
header: EventHeader { timestamp: 1704443769, event_type: 33, server_id: 1, event_length: 65, next_event_position: 259, event_flags: 0 }
data: Gtid(GtidEvent { flags: 1, gtid: "50dc6874-13d3-11ee-a17a-0242ac110002:176028" })
header: EventHeader { timestamp: 1704443769, event_type: 2, server_id: 1, event_length: 119, next_event_position: 378, event_flags: 0 }
data: Query(QueryEvent { thread_id: 493, exec_time: 0, error_code: 0, schema: "test_db", query: "CREATE TABLE test_tb(id INT, value INT)" })
header: EventHeader { timestamp: 1704443769, event_type: 33, server_id: 1, event_length: 65, next_event_position: 443, event_flags: 0 }
data: Gtid(GtidEvent { flags: 0, gtid: "50dc6874-13d3-11ee-a17a-0242ac110002:176029" })
header: EventHeader { timestamp: 1704443769, event_type: 2, server_id: 1, event_length: 75, next_event_position: 518, event_flags: 8 }
data: Query(QueryEvent { thread_id: 493, exec_time: 0, error_code: 0, schema: "test_db", query: "BEGIN" })
header: EventHeader { timestamp: 1704443769, event_type: 19, server_id: 1, event_length: 54, next_event_position: 572, event_flags: 0 }
data: TableMap(TableMapEvent { table_id: 12832, database_name: "test_db", table_name: "test_tb", column_types: [3, 3], column_metas: [0, 0], null_bits: [true, true] })
header: EventHeader { timestamp: 1704443769, event_type: 30, server_id: 1, event_length: 71, next_event_position: 643, event_flags: 0 }
data: WriteRows(WriteRowsEvent { table_id: 12832, included_columns: [true, true], rows: [RowEvent { column_values: [Long(1), Long(1)] }, RowEvent { column_values: [Long(2), Long(2)] }, RowEvent { column_values: [Long(3), Long(3)] }, RowEvent { column_values: [Long(4), Long(4)] }] })
header: EventHeader { timestamp: 1704443769, event_type: 19, server_id: 1, event_length: 54, next_event_position: 697, event_flags: 0 }
data: TableMap(TableMapEvent { table_id: 12832, database_name: "test_db", table_name: "test_tb", column_types: [3, 3], column_metas: [0, 0], null_bits: [true, true] })
header: EventHeader { timestamp: 1704443769, event_type: 31, server_id: 1, event_length: 72, next_event_position: 769, event_flags: 0 }
data: UpdateRows(UpdateRowsEvent { table_id: 12832, included_columns_before: [true, true], included_columns_after: [true, true], rows: [(RowEvent { column_values: [Long(1), Long(1)] }, RowEvent { column_values: [Long(1), Long(3)] }), (RowEvent { column_values: [Long(2), Long(2)] }, RowEvent { column_values: [Long(2), Long(3)] })] })
header: EventHeader { timestamp: 1704443769, event_type: 19, server_id: 1, event_length: 54, next_event_position: 823, event_flags: 0 }
data: TableMap(TableMapEvent { table_id: 12832, database_name: "test_db", table_name: "test_tb", column_types: [3, 3], column_metas: [0, 0], null_bits: [true, true] })
header: EventHeader { timestamp: 1704443769, event_type: 32, server_id: 1, event_length: 53, next_event_position: 876, event_flags: 0 }
data: DeleteRows(DeleteRowsEvent { table_id: 12832, included_columns: [true, true], rows: [RowEvent { column_values: [Long(1), Long(3)] }, RowEvent { column_values: [Long(2), Long(3)] }] })
header: EventHeader { timestamp: 1704443769, event_type: 16, server_id: 1, event_length: 31, next_event_position: 907, event_flags: 0 }
data: Xid(XidEvent { xid: 13739 })
header: EventHeader { timestamp: 1704443769, event_type: 33, server_id: 1, event_length: 65, next_event_position: 972, event_flags: 0 }
data: Gtid(GtidEvent { flags: 1, gtid: "50dc6874-13d3-11ee-a17a-0242ac110002:176030" })
header: EventHeader { timestamp: 1704443769, event_type: 2, server_id: 1, event_length: 92, next_event_position: 1064, event_flags: 0 }
data: Query(QueryEvent { thread_id: 493, exec_time: 0, error_code: 0, schema: "test_db", query: "TRUNCATE TABLE test_tb" })
header: EventHeader { timestamp: 1704443769, event_type: 33, server_id: 1, event_length: 65, next_event_position: 1129, event_flags: 0 }
data: Gtid(GtidEvent { flags: 1, gtid: "50dc6874-13d3-11ee-a17a-0242ac110002:176031" })
header: EventHeader { timestamp: 1704443769, event_type: 2, server_id: 1, event_length: 126, next_event_position: 1255, event_flags: 4 }
data: Query(QueryEvent { thread_id: 493, exec_time: 0, error_code: 0, schema: "test_db", query: "DROP TABLE `test_tb` /* generated by server */" })
示例 2:启用 binlog-transaction-compression 解析 binlog
- 执行 sql 语句
flush logs;
SET autocommit=0;
CREATE DATABASE test_db;
USE test_db;
CREATE TABLE test_tb(id INT, value INT);
INSERT INTO test_tb VALUES(1,1),(2,2),(3,3),(4,4);
UPDATE test_tb SET value=3 WHERE id in(1,2);
DELETE FROM test_tb WHERE id in (1,2);
TRUNCATE TABLE test_tb;
DROP TABLE test_tb;
commit;
- 显示 binlog 事件
mysql> show binary logs;
+------------------+-----------+-----------+
| Log_name | File_size | Encrypted |
+------------------+-----------+-----------+
| mysql-bin.000033 | 1429 | No |
mysql> show binlog events in 'mysql-bin.000033';
+------------------+------+---------------------+-----------+-------------+----------------------------------------------------------------------------+
| Log_name | Pos | Event_type | Server_id | End_log_pos | Info |
+------------------+------+---------------------+-----------+-------------+----------------------------------------------------------------------------+
| mysql-bin.000033 | 4 | Format_desc | 1 | 126 | Server ver: 8.0.31, Binlog ver: 4 |
| mysql-bin.000033 | 126 | Previous_gtids | 1 | 197 | 36682cf3-a048-11ed-b4b3-0242ac110004:1-125753 |
| mysql-bin.000033 | 197 | Gtid | 1 | 274 | SET @@SESSION.GTID_NEXT= '36682cf3-a048-11ed-b4b3-0242ac110004:125754' |
| mysql-bin.000033 | 274 | Query | 1 | 391 | CREATE DATABASE test_db /* xid=5 */ |
| mysql-bin.000033 | 391 | Gtid | 1 | 468 | SET @@SESSION.GTID_NEXT= '36682cf3-a048-11ed-b4b3-0242ac110004:125755' |
| mysql-bin.000033 | 468 | Query | 1 | 601 | use `test_db`; CREATE TABLE test_tb(id INT, value INT) /* xid=10 */ |
| mysql-bin.000033 | 601 | Gtid | 1 | 680 | SET @@SESSION.GTID_NEXT= '36682cf3-a048-11ed-b4b3-0242ac110004:125756' |
| mysql-bin.000033 | 680 | Transaction_payload | 1 | 1033 | compression='ZSTD', decompressed_size=633 bytes |
| mysql-bin.000033 | 1033 | Query | 1 | 1033 | BEGIN |
| mysql-bin.000033 | 1033 | Rows_query | 1 | 1033 | # INSERT INTO test_tb VALUES(1,1),(2,2),(3,3),(4,4) |
| mysql-bin.000033 | 1033 | Table_map | 1 | 1033 | table_id: 90 (test_db.test_tb) |
| mysql-bin.000033 | 1033 | Write_rows | 1 | 1033 | table_id: 90 flags: STMT_END_F |
| mysql-bin.000033 | 1033 | Rows_query | 1 | 1033 | # UPDATE test_tb SET value=3 WHERE id in(1,2) |
| mysql-bin.000033 | 1033 | Table_map | 1 | 1033 | table_id: 90 (test_db.test_tb) |
| mysql-bin.000033 | 1033 | Update_rows | 1 | 1033 | table_id: 90 flags: STMT_END_F |
| mysql-bin.000033 | 1033 | Rows_query | 1 | 1033 | # DELETE FROM test_tb WHERE id in (1,2) |
| mysql-bin.000033 | 1033 | Table_map | 1 | 1033 | table_id: 90 (test_db.test_tb) |
| mysql-bin.000033 | 1033 | Delete_rows | 1 | 1033 | table_id: 90 flags: STMT_END_F |
| mysql-bin.000033 | 1033 | Xid | 1 | 1033 | COMMIT /* xid=11 */ |
| mysql-bin.000033 | 1033 | Gtid | 1 | 1110 | SET @@SESSION.GTID_NEXT= '36682cf3-a048-11ed-b4b3-0242ac110004:125757' |
| mysql-bin.000033 | 1110 | Query | 1 | 1214 | use `test_db`; TRUNCATE TABLE test_tb /* xid=14 */ |
| mysql-bin.000033 | 1214 | Gtid | 1 | 1291 | SET @@SESSION.GTID_NEXT= '36682cf3-a048-11ed-b4b3-0242ac110004:125758' |
| mysql-bin.000033 | 1291 | Query | 1 | 1429 | use `test_db`; DROP TABLE `test_tb` /* generated by server */ /* xid=15 */ |
+------------------+------+---------------------+-----------+-------------+----------------------------------------------------------------------------+
- 已解析 binlog
header: EventHeader { timestamp: 1704445709, event_type: 15, server_id: 1, event_length: 122, next_event_position: 126, event_flags: 0 }
data: FormatDescription(FormatDescriptionEvent { binlog_version: 4, server_version: "8.0.31\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0", create_timestamp: 0, header_length: 19, checksum_type: CRC32 })
header: EventHeader { timestamp: 1704445709, event_type: 35, server_id: 1, event_length: 71, next_event_position: 197, event_flags: 128 }
data: PreviousGtids(PreviousGtidsEvent { gtid_set: "36682cf3-a048-11ed-b4b3-0242ac110004:1-125753" })
header: EventHeader { timestamp: 1704445716, event_type: 33, server_id: 1, event_length: 77, next_event_position: 274, event_flags: 0 }
data: Gtid(GtidEvent { flags: 1, gtid: "36682cf3-a048-11ed-b4b3-0242ac110004:125754" })
header: EventHeader { timestamp: 1704445716, event_type: 2, server_id: 1, event_length: 117, next_event_position: 391, event_flags: 8 }
data: Query(QueryEvent { thread_id: 8, exec_time: 0, error_code: 0, schema: "test_db", query: "CREATE DATABASE test_db" })
header: EventHeader { timestamp: 1704445716, event_type: 33, server_id: 1, event_length: 77, next_event_position: 468, event_flags: 0 }
data: Gtid(GtidEvent { flags: 1, gtid: "36682cf3-a048-11ed-b4b3-0242ac110004:125755" })
header: EventHeader { timestamp: 1704445716, event_type: 2, server_id: 1, event_length: 133, next_event_position: 601, event_flags: 0 }
data: Query(QueryEvent { thread_id: 8, exec_time: 0, error_code: 0, schema: "test_db", query: "CREATE TABLE test_tb(id INT, value INT)" })
header: EventHeader { timestamp: 1704445717, event_type: 33, server_id: 1, event_length: 79, next_event_position: 680, event_flags: 0 }
data: Gtid(GtidEvent { flags: 0, gtid: "36682cf3-a048-11ed-b4b3-0242ac110004:125756" })
header: EventHeader { timestamp: 1704445717, event_type: 40, server_id: 1, event_length: 353, next_event_position: 1033, event_flags: 0 }
data: TransactionPayload(TransactionPayloadEvent { uncompressed_size: 633, uncompressed_events: [(EventHeader { timestamp: 1704445716, event_type: 2, server_id: 1, event_length: 74, next_event_position: 0, event_flags: 8 }, Query(QueryEvent { thread_id: 8, exec_time: 1, error_code: 0, schema: "test_db", query: "BEGIN" })), (EventHeader { timestamp: 1704445716, event_type: 29, server_id: 1, event_length: 69, next_event_position: 0, event_flags: 128 }, RowsQuery(RowsQueryEvent { query: "INSERT INTO test_tb VALUES(1,1),(2,2),(3,3),(4,4)" })), (EventHeader { timestamp: 1704445716, event_type: 19, server_id: 1, event_length: 53, next_event_position: 0, event_flags: 0 }, TableMap(TableMapEvent { table_id: 90, database_name: "test_db", table_name: "test_tb", column_types: [3, 3], column_metas: [0, 0], null_bits: [true, true] })), (EventHeader { timestamp: 1704445716, event_type: 30, server_id: 1, event_length: 67, next_event_position: 0, event_flags: 0 }, WriteRows(WriteRowsEvent { table_id: 90, included_columns: [true, true], rows: [RowEvent { column_values: [Long(1), Long(1)] }, RowEvent { column_values: [Long(2), Long(2)] }, RowEvent { column_values: [Long(3), Long(3)] }, RowEvent { column_values: [Long(4), Long(4)] }] })), (EventHeader { timestamp: 1704445717, event_type: 29, server_id: 1, event_length: 63, next_event_position: 0, event_flags: 128 }, RowsQuery(RowsQueryEvent { query: "UPDATE test_tb SET value=3 WHERE id in(1,2)" })), (EventHeader { timestamp: 1704445717, event_type: 19, server_id: 1, event_length: 53, next_event_position: 0, event_flags: 0 }, TableMap(TableMapEvent { table_id: 90, database_name: "test_db", table_name: "test_tb", column_types: [3, 3], column_metas: [0, 0], null_bits: [true, true] })), (EventHeader { timestamp: 1704445717, event_type: 31, server_id: 1, event_length: 68, next_event_position: 0, event_flags: 0 }, UpdateRows(UpdateRowsEvent { table_id: 90, included_columns_before: [true, true], included_columns_after: [true, true], rows: [(RowEvent { column_values: [Long(1), Long(1)] }, RowEvent { column_values: [Long(1), Long(3)] }), (RowEvent { column_values: [Long(2), Long(2)] }, RowEvent { column_values: [Long(2), Long(3)] })] })), (EventHeader { timestamp: 1704445717, event_type: 29, server_id: 1, event_length: 57, next_event_position: 0, event_flags: 128 }, RowsQuery(RowsQueryEvent { query: "DELETE FROM test_tb WHERE id in (1,2)" })), (EventHeader { timestamp: 1704445717, event_type: 19, server_id: 1, event_length: 53, next_event_position: 0, event_flags: 0 }, TableMap(TableMapEvent { table_id: 90, database_name: "test_db", table_name: "test_tb", column_types: [3, 3], column_metas: [0, 0], null_bits: [true, true] })), (EventHeader { timestamp: 1704445717, event_type: 32, server_id: 1, event_length: 49, next_event_position: 0, event_flags: 0 }, DeleteRows(DeleteRowsEvent { table_id: 90, included_columns: [true, true], rows: [RowEvent { column_values: [Long(1), Long(3)] }, RowEvent { column_values: [Long(2), Long(3)] }] })), (EventHeader { timestamp: 1704445717, event_type: 16, server_id: 1, event_length: 27, next_event_position: 0, event_flags: 0 }, Xid(XidEvent { xid: 11 }))] })
header: EventHeader { timestamp: 1704445717, event_type: 33, server_id: 1, event_length: 77, next_event_position: 1110, event_flags: 0 }
data: Gtid(GtidEvent { flags: 1, gtid: "36682cf3-a048-11ed-b4b3-0242ac110004:125757" })
header: EventHeader { timestamp: 1704445717, event_type: 2, server_id: 1, event_length: 104, next_event_position: 1214, event_flags: 0 }
data: Query(QueryEvent { thread_id: 8, exec_time: 0, error_code: 0, schema: "test_db", query: "TRUNCATE TABLE test_tb" })
header: EventHeader { timestamp: 1704445717, event_type: 33, server_id: 1, event_length: 77, next_event_position: 1291, event_flags: 0 }
data: Gtid(GtidEvent { flags: 1, gtid: "36682cf3-a048-11ed-b4b3-0242ac110004:125758" })
header: EventHeader { timestamp: 1704445717, event_type: 2, server_id: 1, event_length: 138, next_event_position: 1429, event_flags: 4 }
data: Query(QueryEvent { thread_id: 8, exec_time: 0, error_code: 0, schema: "test_db", query: "DROP TABLE `test_tb` /* generated by server */" })
示例 3:解析 JSON 列到字符串
fn parse_json_columns(data: EventData) {
let parse_row = |row: RowEvent| {
for column_value in row.column_values {
if let ColumnValue::Json(bytes) = column_value {
println!(
"json column: {}",
JsonBinary::parse_as_string(&bytes).unwrap()
)
}
}
};
match data {
EventData::WriteRows(event) => {
for row in event.rows {
parse_row(row)
}
}
EventData::DeleteRows(event) => {
for row in event.rows {
parse_row(row)
}
}
EventData::UpdateRows(event) => {
for (before, after) in event.rows {
parse_row(before);
parse_row(after);
}
}
_ => {}
}
}
- 执行 sql 语句
CREATE TABLE test_db_1.json_test(id INT AUTO_INCREMENT, json_col JSON, PRIMARY KEY(id));
SET autocommit=0;
INSERT INTO test_db_1.json_test VALUES (NULL, '{"k.1":1,"k.0":0,"k.-1":-1,"k.true":true,"k.false":false,"k.null":null,"k.string":"string","k.true_false":[true,false],"k.32767":32767,"k.32768":32768,"k.-32768":-32768,"k.-32769":-32769,"k.2147483647":2147483647,"k.2147483648":2147483648,"k.-2147483648":-2147483648,"k.-2147483649":-2147483649,"k.18446744073709551615":18446744073709551615,"k.18446744073709551616":18446744073709551616,"k.3.14":3.14,"k.{}":{},"k.[]":[]}');
INSERT INTO test_db_1.json_test VALUES (NULL, '{"中文":"😀"}');
commit;
- 解析的 JSON 列值
json column: {"k.0":0,"k.1":1,"k.-1":-1,"k.[]":[],"k.{}":{},"k.3.14":3.14,"k.null":null,"k.true":true,"k.32767":32767,"k.32768":32768,"k.false":false,"k.-32768":-32768,"k.-32769":-32769,"k.string":"string","k.2147483647":2147483647,"k.2147483648":2147483648,"k.true_false":[true,false],"k.-2147483648":-2147483648,"k.-2147483649":-2147483649,"k.18446744073709551615":18446744073709551615,"k.18446744073709551616":18446744073709552000}
json column: {"中文":"😀"}
示例 4:解析 binlog 文件
async fn parse_file() {
let file_path = "path-to-binlog-file";
let mut file = File::open(file_path).unwrap();
let mut parser = BinlogParser {
checksum_length: 4,
table_map_event_by_table_id: HashMap::new(),
};
assert!(parser.check_magic(&mut file).is_ok());
while let Ok((header, data)) = parser.next(&mut file) {
println!("header: {:?}", header);
println!("data: {:?}", data);
println!("");
}
}
依赖项
~13–26MB
~463K SLoC