首頁 > 軟體

kafka並行寫大訊息異常TimeoutException排查記錄

2022-02-28 10:00:44

前言

先簡單介紹下我們的使用場景,線上5臺Broker節點的kafka承接了所有binlog訂閱的資料,用於Flink元件接收資料做資料中臺的原始資料。昨兒開發反饋,線上的binlog大量報錯,都是kafka的異常,而且都是同一條topic拋的錯,特徵也很明顯,傳送的訊息體非常大,主觀判斷肯定是寫入大訊息導致的超時了,異常詳情如下:

thread:  kafka-producer-network-thread | producer-1
throwable:  org.apache.kafka.common.errors.TimeoutException: Expiring 1 record(s) for BIN-LOG-DATA-center-dmz2-TABLE-kk-data-center-ods_contract_finance_info-0 due to 56352 ms has passed since last append

定位異常點

應用拋一個不常見的異常,一般操作是先去百度or谷歌搜尋一番的,就上面這個timeout超時的異常,搜尋引擎的結果都是producer連不上Borker導致的問題,根本不是我們這個場景的,所以其次我們就需要從原始碼中尋找答案了。博主使用的開發工具是IDEA,藉助IDEA很容易定位到異常丟擲點。首先定位TimeoutException異常類,然後按住ctrl鍵,點選這個類,會出現如下圖所有拋TimeoutException異常的點,然後根據異常message內容,尋找相匹配的點選進去就是拋異常的地方了,如圖,紅色箭頭所指即程式碼位置:

分析拋異常的邏輯

程式中的異常,一定是符合某些條件才會丟擲的,想要解決異常,只要讓執行時的環境不滿足拋異常的條件即可,下面就是拋異常的程式碼:

boolean maybeExpire(int requestTimeoutMs, long retryBackoffMs, long now, long lingerMs, boolean isFull) {
        if (!this.inRetry() && isFull && requestTimeoutMs < (now - this.lastAppendTime))
            expiryErrorMessage = (now - this.lastAppendTime) + " ms has passed since last append";
        else if (!this.inRetry() && requestTimeoutMs < (createdTimeMs(now) - lingerMs))
            expiryErrorMessage = (createdTimeMs(now) - lingerMs) + " ms has passed since batch creation plus linger time";
        else if (this.inRetry() && requestTimeoutMs < (waitedTimeMs(now) - retryBackoffMs))
            expiryErrorMessage = (waitedTimeMs(now) - retryBackoffMs) + " ms has passed since last attempt plus backoff time";

        boolean expired = expiryErrorMessage != null;
        if (expired)
            abortRecordAppends();
        return expired;
    }

可以看到,我們的異常是在第一個邏輯判斷時候就滿足了所以拋異常了。在此處有可能會丟擲三個不同的timeout異常,用中文語意翻譯條件分別是:

  • 沒設定重試,並且傳送批次(batch.size)滿了,並且設定請求超時時間(request.timeout.ms)小於【當前時間減去最後追加批次的時間】
  • 沒設定重試,並且設定請求超時時間(request.timeout.ms)小於【建立批次時間減去設定的等待傳送的時間(linger.ms)】
  • 設定重試,並且設定請求超時時間(request.timeout.ms)小於【當前時間-最後重試時間-重試需要等待的時間(retry.backoff.ms)】

上面括號中的引數就是kafka producer中設定的相關的引數,這些引數都沒有重新設定過,batch.size預設是10kb大小,而引發報錯的訊息都是36kb的大小,預設的request.timeout.ms超時設定是30s,所以在這個判斷可能過期了的方法中,引發我們異常的主要原因是batch.size和request.timeout.ms的引數設定問題了。

真實原因-解決方案

從上面程式碼看表面原因是引數設定不夠了,實際上呢,博主使用kafka-test啟動了五個Borker叢集做復現驗證測試,測試寫入相同的36kb的message,在所有設定也保持預設的情況下,也根本毫無壓力。後面查詢相關的錯誤紀錄檔,發現所有的TimeoutException集中在幾乎同一時刻,經查明,是因為業務批次匯入了資料到mysql中,造成binlog訊息突然增加,高並行的往kafka寫大訊息導致Borker處理不過來,造成的TimeoutException超時,所以真正解決問題也可以從兩個方面入手:

  • 伺服器端:增加Borker,並設定多個TopicPartition,平攤寫入壓力,這個是根本的解決問題
  • 使用者端:加大request.timeout.ms、batch.size引數,或者開啟訊息重試,這種方案治標不治本,但是也能大概率的減少因為此類場景導致的TimeoutException

結語

異常不可怕,所有異常都是人為拋的,都是有既定的觸發條件的,只要定位到觸發異常的條件對症下藥即可解決問題。不過博主五年來的經驗發現,紀錄檔列印真的是門藝術,在這個方面,Spring框架和Dubbo以及Apollo設定中心框架就是紀錄檔列印的典範,不管發生什麼異常,紀錄檔裡都會輸出詳細的上下文環境,異常的原因,建議的解決方法,如果涉及到相關的設定,也會列印該怎麼設定最好。反觀kafka client的這條TimeoutException就顯的資訊量有點過少了,如果能把相關的設定資訊和排查的方向寫明會更好。最後安利一波kafka test,輕鬆搭建多Borker的kafka叢集,一個註解就ok了。詳情參考我的這篇博文深入研究spring boot整合kafka之spring-kafka底層原理

以上就是kafka並行寫大訊息異常TimeoutException排查記錄的詳細內容,更多關於kafka並行異常TimeoutException排查的資料請關注it145.com其它相關文章!


IT145.com E-mail:sddin#qq.com