
本文作者:劉濤,阿里云智能技術專家。
(資料圖片僅供參考)
01Compaction Topic介紹
一般來說,消息隊列提供的數據過期機制有如下幾種,比如有基于時間的過期機制——數據保存多長時間后即進行清理,也有基于數據總量的過期機制——數據分區數據量達到一定值后進行清理。
而 Compaction Topic 是一種基于 key 的數據過期機制,即對于相同 key 的數據只保留最新值。
該特性的應用場景主要為維護狀態信息,或者在需要用到 KV 結構時,可以通過 Compaction Topic 將 key-value 信息直接保存到 MQ,從而解除對外部數據庫的依賴。比如維護消費位點,可以將消費組加分區作為 key ,將消費位點做 offset ,以消息形式發送到 MQ ,壓縮之后,消費時獲取最新 offset 信息即可。另外,像 connect 里的 source 信息比如 Binlog 解析位點或其他 source 處理的位點信息均可存到 Compaction Topic。同時 Compaction Topic 也支持 存儲 RSQLDB 與 RStreams 的 checkpoint 信息。
02需要解決的問題
Compaction 過程中,需要解決如下幾個問題:
第一,數據寫入過程中,數據如何從生產者發送到 broker 并且最終落盤,數據主備之間的 HA 如何保證?
第二,整個 compaction 的流程包括哪幾個步驟?如果數據量太大,如何優化?
第三,數據消費時如何索引消息?如果找不到消息指定的 offset 消息,如何處理?
第四,如果有機器故障,如何恢復老數據?
03方案設計與實現
第一,數據如何寫入。
首先寫入到 CommitLog,主要為復用 CommitLog 本身的 HA 能力。然后通過 reput線程將 CommitLog 消息按照 Topic 加 partition 的維度拆分到不同文件里,按分區整理消息,同時生成索引。這樣最終消息就按 Topic 加 partition的粒度做了規整。
在 compaction 過程中,為什么不在原先的 commitLog 上做規整,而是再額外按分區做規整?原因如下:
所有數據都會寫到 CommitLog ,因此單個 Topic 的數據不連續。如果要遍歷單個 topic 的所有數據,可能需要跳著讀,這樣就會導致大量冷讀,對磁盤 IO 影響比較大。CommitLog 數據有自動過期機制,會將老數據刪除,因此不能將數據直接寫到 CommitLog,而 CompactionLog 里的老數據為按 key 過期,不一定會刪除。compact 以分區為維度進行。如果多個分區同時做 compact ,效率較低。因為很多分區的 key 同時在一個結構里,會導致同一個分區能夠 compact 的數據比較少, 并且 compact 之后也需要重新寫一份么,因此,索性就在 compact 之前將消息通過 reput service 重新歸整一遍。Compact 流程如下:
第一步,確定需要做 compaction 的數據文件列表。一般大于兩個文件,需要排除當前正在寫的文件。
第二步,將上一步篩選出的文件做遍歷,得到 key 到 offset 的映射關系。
第三步,根據映射關系將需要保留的數據重新寫到新文件。
第四步,用新文件替換老文件,將老文件刪除。
第二步的構建 OffsetMap 主要目的在于可以知道哪文件需要被保留、哪文件需要被刪除,以及文件的前后關系,這樣就可以確定寫入的布局,確定布局之后,就可以按照append 的方式將需要保留的數據寫到新文件。
此處記錄的并非 key 到 value 的信息,而是 key 到 Offset 的信息。因為 value 的數據 body 可能較長,比較占空間,而 offset 是固定長度,且通過 offset 信息也可以明確消息的先后順序。另外,key 的長度也不固定,直接在 map 存儲原始 key 并不合適。因此我們將 MD5 作為新 key ,如果 MD5 相同 key 認為也相同。
做 compaction 時會遍歷所有消息,將相同 key 且 offset 小于 OffsetMap 的值刪除。最終通過原始數據與 map 結構得到壓縮之后的數據文件。
上圖為目錄結構展示。寫入時上面為數據文件,下面為索引,要 compact 的是標紅兩個文件。壓縮后的文件存儲于子目錄,需要將老文件先標記為刪除,將子目錄文件與 CQ 同時移到老的根目錄。注意,文件與 CQ 文件名一一對應,可以一起刪除。
隨著數據量越來越大,構建的 OffsetMap 也會越來越大,導致無法容納。
因此不能使用全量構建方式,不能將所有要 compact 的文件的 OffsetMap 一次性構建,需要將全量構建改為增量構建,構建邏輯也會有小的變化。
第一輪構建:如上圖,先構建上面部分的 OffsetMap ,然后遍歷文件,如果 offset 小于 OffsetMap 中對應 key 的 offset 則刪除,如果等于則保留。而下面部分的消息的offset 肯定大于 OffsetMap 內的 offset ,因此也需要保留。
第二輪構建:從上一次結束的點開始構建。如果上一輪中的某個 key 在新一輪中不存在,則保留上一輪的值;如果存在,則依然按照小于刪除、大于保留的原則進行構建。
將一輪構建變為兩輪構建后, OffsetMap 的大小顯著降低,構建的數據量也顯著降低。
原先的索引為 CommitLog Position、Message Size 和 Tag Hush,而現在我們復用了bcq 結構。由于 Compact 之后數據不連續,無法按照先前的方式直接查找數據所在物理位置。由于 queueOffset 依然為單調增排列,因此可以通過二分查找方式將索引找出。
二分查找需要 queueoffset 信息,索引結構也會發生變化,而 bcq 帶有 queueoffse 信息,因此可以復用 bcq 的結構。
Queueoffset 在 compact 前后保持不變。如果 queueoffset 不存在,則獲取第一個大于 queueoffset 的消息,然后從頭開始將所有全量數據發送給客戶端。
機器故障導致消息丟失時,需要做備機的重建。因為 CommitLog 只能恢復最新數據,而 CompactionLog 需要老數據。之前的 HA 方式下,數據文件可能在 compact 過程中被被刪除,因此也不能基于復制文件的方式做主備間同步。
因此,我們實現了基于 message 的復制。即模擬消費請求從 master 上拉取消息。拉取位點一般從 0 開始,大于等于 commitLog 最小offset 時結束。拉取結束之后,再做一次 force compaction 將 CommitLog 數據與恢復時的數據做一次 compaction ,以保證保留的數據是被壓縮之后的數據。后續流程不變。
04使用說明
生產者側使用現有生產者接口,因為要按分區做 compact ,因此需要將相同 key 路由到相同的 MessageQueue,需要自己實現相關算法。
消費者側使用現有消費者接口,消費到消息后,存入本地類 Map 結構中再進行使用。我們的場景大多為從頭開始拉數據,因此需要在開始時將消費位點重置到0。拉取完以后,將消息 key 與 value 傳入本地 kv 結構,使用時直接從該結構拿取即可。