首頁 > 軟體

Apache Kafka 分割區重分配的實現原理解析

2022-07-13 14:04:20

本文作者為中國移動雲能力中心巨量資料團隊軟體開發工程師孫大鵬,本文結合 2.0.0 版本的 Kafka 原始碼,詳細介紹了 Kafka 分割區副本重分配的流程和邏輯,供大家參考。

一、前言

Kafka 是由 Apache 軟體基金會開發的一個開源流處理平臺,旨在提供一個統一的、高吞吐、低延遲的實時資料處理平臺。其持久化層本質上是一個“按照分散式事務紀錄檔架構的大規模釋出/訂閱訊息佇列”,這使它作為企業級基礎設施來處理流式資料非常有價值。

在 Kafka 中,用 topic 來對訊息進行分類,每個進入到 Kafka 的資訊都會被放到一個 topic 下,同時每個 topic 中的訊息又可以分為若干 partition 以此來提高訊息的處理效率。儲存訊息資料的主機伺服器被命名為 broker。通常為了保證資料的可靠性,資料是以多副本的形式儲存在不同 broker 的不同磁碟上的。對於每一個 topic 的每一個 partition,如果多個副本之間完成了資料同步,保證了資料的一致性,則此時的多個副本所在的 broker 的集合稱為 Isr。同一時間,某個 topic 的某個 partition 的多個副本中僅有一個對外提供服務,此時對外提供服務的 broker 被認定為該 partition 的 leader,使用者端的請求都集中到 leader 上。

對於 2 副本 3 分割區的 topic 其描述資訊及儲存狀態如下所示:

test的描述資訊:
Topic:test PartitionCount:3 ReplicationFactor:2 Configs:min.insync.replic
as=1
Topic: test Partition: 0 Leader: 0 Replicas: 0,1 Isr: 0,1
Topic: test Partition: 1 Leader: 2 Replicas: 2,0 Isr: 2,0
Topic: test Partition: 2 Leader: 1 Replicas: 1,2 Isr: 1,2

test的副本分佈

健康狀態的 Kafka 叢集,對於每個 topic 的每個 partition,其 Isr 都應該等於預期的副本集合(後面均已 Replicas 表示),但在實際場景中,不可避免的存在磁碟/主機故障,或者 由於某些原因需要將部分 broker 節點下線的情況,此時就需要將故障/要下線的 broker 從 Replicas 中移除。對此 Kafka 提供了 kafka-reassign-partitions 工具來進行手動的分割區副本遷移。

二、工具的使用

在 Kafka 的根路徑下,通過執行如下命令,來完成分割區副本的重分配:

./bin/kafka‐reassign‐partitions.sh ‐‐zookeeper localhost:2181/kafka ‐‐reassignment‐json‐file reassign‐topic.json ‐‐execute

其中:reassign‐topic.json 檔案指定了分割區副本的分佈情況,範例如下:

{   
"version": 1,   
"partitions": [       
{         
"topic": "test",         
"partition": 2,         
"replicas": [            
2,             
1         
],         
"log_dirs": [             
"any",             
"any"         
]        
} 
}

檔案中指明瞭將 topic=test,partition=2 的分割區的兩副本分別移動到 brokerId=2 和 brokerId=1 的節點的任意磁碟路徑上。

下面將結合 2.0.0 版本的 Kafka 原始碼簡單的介紹下 Kafka 分割區副本重分配的流程和邏輯。

三、後設資料管理及協調器

在開始之前先簡單介紹下在 Kafka 分割區副本重分配中涉及到的兩個概念:ZooKeeper 和 Kafka Controller。

3.1 ZooKeeper

Kafka 的後設資料,是儲存在 ZooKeeper  中的。Apache ZooKeeper  是一個提供高可靠性的分散式協調服務架構。它使用的資料模型類似於檔案系統的樹形結構,根目錄也是以“/”開始。該結構上的每個節點被稱為 znode,用來儲存一些後設資料協調資訊。同時 ZooKeeper 賦予使用者端監控 znode 變更的能力,即所謂的 Watch 通知功能。一旦 znode 節點被建立、刪除,子節點數量發生變化,或是 znode 所存的資料本身變更, ZooKeeper 會通過節點變更監聽器 (ChangeHandler) 的方式顯式通知使用者端以便使用者端 觸發對應的處理操作。

3.2 Kafka Controller

Kafka Controller 是 Apache Kafka 的核心元件,它的主要作用是在 Apache ZooKeeper 的幫助下管理和協調整個 Kafka 叢集。叢集中任意一臺 Broker 都能充當控制器的角色,但是,在執行過程中,只能有一個 Broker 成為控制器,行使其管理和協調的職責。

四、分割區重分配流程分析

Kafka 的分割區重分配就是在 client、broker 和 controller 的協同執行下完成的。即:

1. 使用者端發起分割區重分配任務,在 ZooKeeper  中建立/admin/reassign_partitions 節點,然 後向涉及的 broker 傳送 alterReplicaLogDirs 請求 

2. controller 監測到 ZooKeeper  中/admin/reassign_partitions 的變化,觸發 Kafka 分割區元 資料的變更維護操作 

3. broker 接收到使用者端傳送的 alterReplicaLogDirs 請求,根據具體任務內容在伺服器端實際完成分割區副本移動

流程總結如下圖所示:

下面將針對這三部分分別展開介紹:

4.1 kafka-reassign-partitions 使用者端

分割區重分配任務是由使用者端發起的,其入口主類為 ReassignPartitionsCommand.scala 中,呼叫 executeAssignment 方法。使用者端的 executeAssignment 方法主要完成了如下操作:

1.解析 json 檔案並進行相關校驗
•讀取 json 檔案內容,校驗“partitions”的“version”,僅為 1 時,繼續執行副本重分 配
•校驗分割區副本數和副本資料路徑數是否一致
•校驗 partition/replica 是否為空/重複
2.檢查待重分配的分割區在叢集中是否存在(根據 zk 中的/brokers/topics/${topic})
3.檢查確認所有目標 broker 均線上(zk 中/brokers/ids 的子 znode 列表)
4.檢查是否已存在分割區副本重分配任務,如果已存在相關任務,則退出
5.將分割區重分配任務記錄到 zk 中,即在 zk 中建立/admin/reassign_partitions,以便 controller 可以發現並協調 broker 進行相關操作
6.根據解析的 json 內容,逐個 topic 向相關的 broker 傳送 alterReplicaLogDirs 請求

使用者端的處理邏輯可總結為如下流程圖:

4.2 controller 維護分割區的後設資料資訊

在 controller 啟動時會建立 partitionReassignmentHandler,kafkaController 主執行緒回撥 onControllerFailover 時,檢測到/admin/reassign_partitions 發生變化時,觸發分割區副本重分配操作,在 maybeTriggerPartitionReassignment 中通過呼叫 onPartitionReassignment 真正執行分割區副本重分配。在 onPartitionReassignment 中定 義了三個概念:

•RAR:指定的分割區副本放置策略
•OAR:原始的分割區副本放置策略
•AR:當前的分割區副本放置策略

onPartitionReassignment 的執行過程可以總結為如下步驟:

檢查指定的分割區副本是否處在 isr 中,如果不在則執行以下前 3 步,否則直接執行第 4 步

1.在 zk 中將 AR 更新為 RAR+OAR (/broker/topics/${topicName})
2.向所有副本(RAR+OAR)中傳送 LeaderAndIsr 請求
3.將 RAR-OAR 的副本狀態置為 NewReplica,等待 NewReplica 中的資料與 leader 中的資料 完成同步
4.等待直到所有 RAR 中的副本完成與 leader 的同步
5.將所有 RAR 的副本置為 OnlineReplica 狀態
6.將 RAR 作為 AR
7.如果當前的 leader 不在 RAR 中,傳送 LeaderAndIsr Request 從 RAR 中選出一個新的 leader;如果當前 leader 在 RAR 中,檢查 leader 狀態,如果 leader 健康則更新 LeaderEpoch,否則重新選擇 leader
8.將 OAR-RAR 的副本置為 Offline 狀態
9.將 OAR-RAR 的副本置為 NonExistentReplica 狀態(真實刪除對應的分割區副本)
10.將 zk 中的 AR 置為 RAR(/brokers/topics/${topicName}資料格式:{"version":1,"partitions":{"0":[${brokerId}]}})
11.更新 zk 中/admin/reassign_partitions 的值,將完成遷移的分割區刪除
12.同步所有 broker,更新後設資料資訊

邏輯流程圖如下:

4.3 broker 端資料跨路徑遷移

底層資料跨路徑遷移,是由 broker 端完成的,broker 接收到使用者端發來的 ALTER_REPLICA_LOG_DIRS 請求後,呼叫 alterReplicaLogDirs 方法,相關流程如下:

1.確保目的路徑/待移動分割區線上
2.如果當前分割區副本的 log 路徑不存在給定的目的路徑並且 futureLogs(用於跨路徑資料遷移的中間過程)也不包含目的路徑,則在記憶體中記錄當前分割區副本和目的 logDir,即標記那些需要進行遷移的分割區副本路徑
3.對於需要移動的分割區副本,目的 broker 的路徑中建立 future Log
4.停止當前 Log 的清理工作,等待 future Log 同步完再清理
5.建立 ReplicaAlterLogDirsThread,逐個 topic 逐個 partition 獲取 fetchOffset、 logStartOffset 、fetchSize 等資料構造 Fetch 請求
6.通過 ReplicaManager.fetchMessages 從分割區副本 leader 獲取資料,完成資料同步

更詳細的處理流程如下圖所示:

五、總結

Kafka 分割區重分配,通過 kafka-reassign-partitions 啟動任務,將任務記錄在後設資料管理器 ZooKeeper  中,Kafka controller 通過對 ZooKeeper  的監測,發現相關任務通過和 broker 的互動按序處理相關的遷移任務,同時 controller 實時維護 ZooKeeper  中的後設資料資訊並進行相關變化的記錄,保證在重分配過程中,不影響 topic 分割區的正常使用,在任務完成後,再由 controller 負責 ZooKeeper  中重分配任務標記的清理,以便使用者端驗證重分配任務的結果。

到此這篇關於Apache Kafka 分割區重分配的實現原理解析的文章就介紹到這了,更多相關Apache Kafka 分割區重分配內容請搜尋it145.com以前的文章或繼續瀏覽下面的相關文章希望大家以後多多支援it145.com!


IT145.com E-mail:sddin#qq.com