從 MySQL 到 ClickHouse 實時復制與實現

2022-12-14 18:16:07 來源:51CTO博客

ClickHouse 可以掛載為 MySQL 的一個從庫 ,先全量再增量的實時同步 MySQL 數據,這個功能可以說是今年最亮眼、最剛需的功能,基于它我們可以輕松的打造一套企業級解決方案,讓 OLTP 和 OLAP 的融合從此不再頭疼。

目前支持 MySQL 5.6/5.7/8.0 版本,兼容 Delete/Update 語句,及大部分常用的 DDL 操作。


(相關資料圖)

代碼還處于 Alpha 版本階段,畢竟是兩個異構生態的融合,仍然有不少的工作要做,同時也期待著社區用戶的反饋,以加速迭代。

代碼獲取

由于還在驗收階段,我們只好把 github 上的 pull request 代碼 pull 到本地。?

git fetch origin pull/10851/head:mysql_replica_experiment

開始編譯…

MySQL Master

我們需要一個開啟 binlog 的 MySQL 作為 master:?

docker run -d -e MYSQL_ROOT_PASSWORD=123 mysql:5.7 mysqld --datadir=/var/lib/mysql --server-id=1 --log-bin=/var/lib/mysql/mysql-bin.log --gtid-mode=ON --enforce-gtid-consistency

創建數據庫和表,并寫入數據:?

mysql> create database ckdb;mysql> use ckdb;mysql> create table t1(a int not null primary key, b int);mysql> insert into t1 values(1,1),(2,2);mysql> select * from t1;+---+------+| a | b    |+---+------+| 1 |    1 || 2 |    2 |+---+------+2 rows in set (0.00 sec)

ClickHouse Slave

目前以 database 為單位進行復制,不同的 database 可以來自不同的 MySQL master,這樣就可以實現多個 MySQL 源數據同步到一個 ClickHouse 做 OLAP 分析功能。

創建一個復制通道:?

clickhouse :) CREATE DATABASE ckdb ENGINE = MaterializeMySQL("172.17.0.2:3306", "ckdb", "root", "123");clickhouse :) use ckdb;clickhouse :) show tables;┌─name─┐│ t1   │└──────┘clickhouse :) select * from t1;┌─a─┬─b─┐│ 1 │ 1 │└───┴───┘┌─a─┬─b─┐│ 2 │ 2 │└───┴───┘2 rows in set. Elapsed: 0.017 sec.

看下 ClickHouse 的同步位點:cat ckdatas/metadata/ckdb/.metadata?

Version:1Binlog File:mysql-bin.000001Binlog Position:913Data Version:0

Delete

首先在 MySQL Master 上執行一個刪除操作:?

mysql> delete from t1 where a=1;Query OK, 1 row affected (0.01 sec)

然后在 ClickHouse Slave 側查看記錄:?

clickhouse :) select * from t1;SELECT *FROM t1┌─a─┬─b─┐│ 2 │ 2 │└───┴───┘1 rows in set. Elapsed: 0.032 sec.

此時的 metadata 里 Data Version 已經遞增到 2:?

cat ckdatas/metadata/ckdb/.metadataVersion:1Binlog File:mysql-bin.000001Binlog Position:1171Data Version:2

Update

MySQL Master:

mysql> select * from t1;+---+------+| a | b    |+---+------+| 2 |    2 |+---+------+1 row in set (0.00 sec)mysql> update t1 set b=b+1;mysql> select * from t1;+---+------+| a | b    |+---+------+| 2 |    3 |+---+------+1 row in set (0.00 sec)

ClickHouse Slave:

clickhouse :) select * from t1;SELECT *FROM t1┌─a─┬─b─┐│ 2 │ 3 │└───┴───┘1 rows in set. Elapsed: 0.023 sec.

實現機制

在探討機制之前,首先需要了解下 MySQL 的 binlog event ,主要有以下幾種類型:

1. MYSQL_QUERY_EVENT    -- DDL2. MYSQL_WRITE_ROWS_EVENT -- insert數據3. MYSQL_UPDATE_ROWS_EVENT -- update數據4. MYSQL_DELETE_ROWS_EVENT -- delete數據

當一個事務提交后,MySQL 會把執行的 SQL 處理成相應的 binlog event,并持久化到 binlog 文件。

binlog 是 MySQL 對外輸出的重要途徑,只要你實現 MySQL Replication Protocol,就可以流式的消費MySQL 生產的 binlog event,具體協議見 Replication Protocol。

由于歷史原因,協議繁瑣而詭異,這不是本文重點。

對于 ClickHouse 消費 MySQL binlog 來說,主要有以下3個難點:

DDL 兼容Delete/Update 支持Query 過濾

DDL

DDL 兼容花費了大量的代碼去實現。

首先,我們看看 MySQL 的表復制到 ClickHouse 后會變成什么樣子。

MySQL master:

mysql> show create table t1\G;*************************** 1. row ***************************       Table: t1Create Table: CREATE TABLE `t1` (  `a` int(11) NOT NULL,  `b` int(11) DEFAULT NULL,  PRIMARY KEY (`a`)) ENGINE=InnoDB DEFAULT CHARSET=latin1

ClickHouse slave:

ATTACH TABLE t1(    `a` Int32,    `b` Nullable(Int32),    `_sign` Int8,    `_version` UInt64)ENGINE = ReplacingMergeTree(_version)PARTITION BY intDiv(a, 4294967)ORDER BY tuple(a)SETTINGS index_granularity = 8192

可以看到:

默認增加了 2 個隱藏字段:_sign(-1刪除, 1寫入) 和 _version(數據版本)引擎轉換成了 ReplacingMergeTree,以 _version 作為 column version原主鍵字段 a 作為排序和分區鍵

這只是一個表的復制,其他還有非常多的DDL處理,比如增加列、索引等,感興趣可以觀摩 Parsers/MySQL 下代碼。

Update和Delete

當我們在 MySQL master 執行:

mysql> delete from t1 where a=1;mysql> update t1 set b=b+1;

ClickHouse t1數據(把 _sign 和 _version 一并查詢):

clickhouse :) select a,b,_sign, _version from t1;SELECT    a,    b,    _sign,    _versionFROM t1┌─a─┬─b─┬─_sign─┬─_version─┐│ 1 │ 1 │     1 │        1 ││ 2 │ 2 │     1 │        1 │└───┴───┴───────┴──────────┘┌─a─┬─b─┬─_sign─┬─_version─┐│ 1 │ 1 │    -1 │        2 │└───┴───┴───────┴──────────┘┌─a─┬─b─┬─_sign─┬─_version─┐│ 2 │ 3 │     1 │        3 │└───┴───┴───────┴──────────┘

根據返回結果,可以看到是由 3 個 part 組成。

part1 由??mysql> insert into t1 values(1,1),(2,2)??生成:

┌─a─┬─b─┬─_sign─┬─_version─┐│ 1 │ 1 │     1 │        1 ││ 2 │ 2 │     1 │        1 │└───┴───┴───────┴──────────┘

part2 由??mysql> delete from t1 where a=1??生成:?

┌─a─┬─b─┬─_sign─┬─_version─┐│ 1 │ 1 │    -1 │        2 │└───┴───┴───────┴──────────┘說明:_sign = -1表明處于刪除狀態

part3 由??update t1 set b=b+1??生成:

┌─a─┬─b─┬─_sign─┬─_version─┐│ 2 │ 3 │     1 │        3 │└───┴───┴───────┴──────────┘

使用 final 查詢:

clickhouse :) select a,b,_sign,_version from t1 final;SELECT    a,    b,    _sign,    _versionFROM t1FINAL┌─a─┬─b─┬─_sign─┬─_version─┐│ 1 │ 1 │    -1 │        2 │└───┴───┴───────┴──────────┘┌─a─┬─b─┬─_sign─┬─_version─┐│ 2 │ 3 │     1 │        3 │└───┴───┴───────┴──────────┘2 rows in set. Elapsed: 0.016 sec.

可以看到 ReplacingMergeTree 已經根據 _version 和 OrderBy 對記錄進行去重。

Query

MySQL master:

mysql> select * from t1;+---+------+| a | b    |+---+------+| 2 |    3 |+---+------+1 row in set (0.00 sec)

ClickHouse slave:

clickhouse :) select * from t1;SELECT *FROM t1┌─a─┬─b─┐│ 2 │ 3 │└───┴───┘clickhouse :) select *,_sign,_version from t1;SELECT    *,    _sign,    _versionFROM t1┌─a─┬─b─┬─_sign─┬─_version─┐│ 1 │ 1 │    -1 │        2 ││ 2 │ 3 │     1 │        3 │└───┴───┴───────┴──────────┘說明:這里還有一條刪除記錄,_sign為-1

MaterializeMySQL 被定義成一種存儲引擎,所以在讀取的時候,會根據 _sign 狀態進行判斷,如果是-1則是已經刪除,進行過濾。

總結

ClickHouse 實時復制同步 MySQL 數據是 upstream 2020 的一個 roadmap,在整體構架上比較有挑戰一直無人接單,挑戰主要來自兩方面:

對 MySQL 復制通道與協議非常熟悉對 ClickHouse 整體機制非常熟悉

這樣,在兩個本來有點遙遠的山頭中間架起了一座高速,這條 10851號高速由 zhang1024(ClickHouse側) 和BohuTANG(MySQL復制) 兩個修路工聯合承建,目前正在接受 upstream 的驗收。

關于同步 MySQL 的數據,目前大家的方案基本都是在中間安置一個 binlog 消費工具,這個工具對 event 進行解析,然后再轉換成 ClickHouse 的 SQL 語句,寫到 ClickHouse server,鏈路較長,性能損耗較大。

10851號高速是在 ClickHouse 內部實現一套 binlog 消費方案,然后根據 event 解析成ClickHouse 內部的 block 結構,再直接寫回到底層存儲引擎,幾乎是最高效的一種實現方式。

基于 database 級的復制,實現了多源復制的功能,如果復制通道壞掉,我們只需在 ClickHouse 側刪除掉 database 然后再重建一次即可,非常方便。

對于單表的數據一致性,未來會實現一個 MySQL CRC 函數,用于校驗 MySQL 與 ClickHouse 的數據一致性。

原文轉自https://bohutang.me/2020/07/26/clickhouse-and-friends-mysql-replication/

標簽: 復制通道 數據一致性 進行判斷

上一篇:天天快資訊:移動研發痛點分析及解決方案
下一篇:每日報道:Spring Integration的網絡通量支持