首頁 > 軟體

Zabbix對Kafka topic積壓資料監控的解決方案

2022-07-01 22:03:11

Kafka

Apache Kafka是一個分散式釋出-訂閱訊息系統和一個強大的佇列,可以處理大量的資料,並使您能夠將訊息從一個端點傳遞到另一個端點。

Kafka適合離線和線上訊息消費。

Kafka訊息保留在磁碟上,並在群集內複製以防止資料丟失。Kafka構建在ZooKeeper同步服務之上。它與Apache Storm和Spark非常好地整合,用於實時流式資料分析。

需求

雖然我們在生產環境中可以使用Kafka對業務進行解耦,但這並不意味著業務系統就高枕無憂了。消費者的消費速度是否能夠匹配生產速度、過多的訊息積壓這些都可能影響業務系統的正常執行。

關於業務系統執行狀態,雖然我們可以通過業務監控來確定,但是業務監控一般是要對資料進行聚合分析並達到一定的閾值才能觸發告警。因此業務監控告警通知時,業務實際已經有問題一段時間了。為應對這種情況,我們一般需要和系統監控進行互補。系統監控會週期性的對硬體、網路、伺服器、應用等不同維度進行監控告警,一旦某個元件的狀態有問題,那麼系統監控會先預警,然後業務系統才可能進一步預警。經過不同監控系統的告警升級,才更能準確的反映業務系統的執行狀態。

話說回來,對於上線後的Kafka叢集,我們除了要對服務的可用性進行監控外,還要對Topic的消費情況進一步監控。

解決方案

1.監控分析

Lag作為監控指標,它直接反映了一個消費者的執行情況。一個正常工作的消費者,它的Lag值應該很小,甚至是接近於0的,這表示該消費者能夠及時地消費生產者生產出來的訊息,滯後程度很小。

因此我們將Topic作為我們的監控項,當相關的Topic Lag達到某一閾值時進行多渠道告警。

另經過Kafka執行機制的我們知道:

  • 每個Topic內部需要按照Partition進行再次分割區
  • 同一個topic的partition只能由同一個消費者組(group)內的一個consumer來消費,分割區數決定了同組消費者個數的上限

通過以上“Topic-Partition-消費者組(group)”之間的關係,為了便於我們通過告警資訊更快的定位故障點:

  • 監控項命名規則:消費者組(Group)/Topic/Partition,三者組成唯一的監控項;
  • 監控項Lag值:獲取業務系統中某個消費者組的特定Topic所有分割區的Lag值進行告警;

2.監控思路

(1) 消費者組管理

通過Kafka自帶的kafka-consumer-groups.sh指令碼,我們可以輕鬆獲取檢視指定消費組 消費的所有Topic、及所在分割區、最新消費offset、Log最新資料offset、Lag還未消費數量、消費者ID等等資訊

# 檢視消費者組的topic 消費狀態
bash kafka-consumer-groups.sh --bootstrap-server 192.168.3.55:9090 --describe --group test2_consumer_group
TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
test 0 1000000 1000000 0 consumer-1-8688633a-2f88-4c41-89ca-fd0cd6d19ec7 /127.0.0.1 consumer-1
test 1 1000000 1000000 0 consumer-1-8688633a-2f88-4c41-89ca-fd0cd6d19ec7 /127.0.0.1 consumer-1
test 2 1000000 1000000 0 consumer-1-8688633a-2f88-4c41-89ca-fd0cd6d19ec7 /127.0.0.1 consumer-1
test 3 1000000 1000000 0 consumer-1-8688633a-2f88-4c41-89ca-fd0cd6d19ec7 /127.0.0.1 consumer-1

(2)分割區自動發現

對於Kafka topic的監控我們使用Zabbix監控平臺,考慮到後續業務系統的持續性接入,我們通過Zabbix自動發現實現對特定消費者組(Group)和Topic下所有分割區自動發現:

# 自動發現
vim consumer-groups.conf
#按消費者組(Group)|Topic格式,寫入自動發現組態檔
test-group|test
# 執行指令碼自動發現指定消費者和topic的分割區
bash consumer-groups.sh discovery
{
    "data": [
        { "{#GROUP}":"test-group", "{#TOPICP}":"test", "{#PARTITION}":"0" },
        { "{#GROUP}":"test-group", "{#TOPICP}":"test", "{#PARTITION}":"1" },
        { "{#GROUP}":"test-group", "{#TOPICP}":"test", "{#PARTITION}":"3" },
        { "{#GROUP}":"test-group", "{#TOPICP}":"test", "{#PARTITION}":"2" }
    ]
}

自動發現中的GROUP、TOPIC、PARTITION 這三個資訊可以用於進一步過濾不同的分割區的Lag值和監控系統中的監控項名稱:

  • test-group/test/分割區0
  • test-group/test/分割區1
  • test-group/test/分割區2
  • test-group/test/分割區3
  • 等其他 test-group/test相關的所有分割區

(3)獲取監控項“test-group/test/分割區X”的Lag

# 獲取分割區0 lag
bash consumer-groups.sh lag 0
# 獲取分割區1 lag
bash consumer-groups.sh lag 1
# 獲取分割區2 lag
bash consumer-groups.sh lag 2
# 獲取分割區3 lag
bash consumer-groups.sh lag 3

(4)最終指令碼

vim consumer-groups.sh
#!/bin/bash
#comment: 根據消費者組監控topic lag,進行監控告警
#組態檔說明
#消費者組|Topic
#test-group|test
#獲取topic 資訊
cal_topic() {
    if [ $# -ne 2 ]; then
        echo "parameter num error, 讀取topic資訊失敗"
        exit 1
    else
        /usr/local/kafka/bin/kafka-consumer-groups.sh --bootstrap-server 192.168.3.55:9092 --describe --group $1 |grep -w $2
    fi
}
#topic+分割區自動發現
topic_discovery() {
    printf "{n"
    printf "t"data": [n"
    for line in `cat /data/scripts/consumer-groups.conf`
    do
        group=`echo ${line} | awk -F'|' '{print $1}'`
        topic=`echo ${line} | awk -F'|' '{print $2}'`
        cal_topic $group $topic > /tmp/consumer-group-tmp
        count=`cat /tmp/consumer-group-tmp|wc -l`
        n=0
        while read line
        do
             n=`expr  $n + 1`
             #判斷最後一行
             if [ $n -eq $count ]; then
                 topicp=`echo $line | awk '{print $1}'`
                 partition=`echo $line  | awk '{print $2}'`
                 printf "tt{ "{#GROUP}":"${group}", "{#TOPICP}":"${topicp}", "{#PARTITION}":"${partition}" }n"
             else
                 topicp=`echo $line | awk '{print $1}'`
                 partition=`echo $line  | awk '{print $2}'`
                 printf "tt{ "{#GROUP}":"${group}", "{#TOPICP}":"${topicp}", "{#PARTITION}":"${partition}" },n"
             fi
        done < /tmp/consumer-group-tmp
    done
    printf "t]n"
    printf "}n"
}


if [ $1 == "discovery" ]; then
    topic_discovery
elif [ $1 == "lag" ];then
    cat /tmp/consumer-group-tmp |awk -v p=$2 '{if($2==p){print $5}}'
else
    echo "Usage: /data/scripts/consumer-group.sh discovery | lag"
fi

3.Zabbix 自動發現設定

1.自動發現設定

2.監控項原型 通過消費者組、Topic、Partition 組成監控項名稱,告警資訊中的名稱能夠幫助我們快定位故障點。

3.觸發器 我們lag的初始閾值設定為50,可根據時間情況進行調整。

4.告警資訊

告警主機:Kafka_192.168.3.55
主機IP:192.168.3.55
主機組:Kafka
告警時間:2022.03.21 00:23:10
告警等級:Average
告警資訊:test-group/test/分割區1:資料積壓62
告警專案:topic_lag[test,1]
問題詳情:
test-group/test/1: 62

到此這篇關於Zabbix對Kafka topic積壓資料監控的文章就介紹到這了,更多相關Zabbix Kafka 監控內容請搜尋it145.com以前的文章或繼續瀏覽下面的相關文章希望大家以後多多支援it145.com!


IT145.com E-mail:sddin#qq.com