首頁 > 軟體

Flink 在有讚的實踐和應用

2021-05-27 20:32:45

簡介:今天主要分享的內容是 Flink 在有讚的實踐和應用。內容包括:Flink 的容器化改造和實踐Flink SQL 的實踐和應用未來規劃。

一、Flink 的容器化改造和實踐

1. 有讚的叢集演進歷史

2014 年 7 月,第一個 Storm 任務正式上線;2016 年,引入 Spark Streaming, 運行在 Hadoop Yarn;2018 年,引入了 Flink,作業模式為 Flink on Yarn Per Job;2020 年 6 月,實現了 100% Flink Jar 任務 K8s 化, K8s 作為 Flink Jar 預設計算資源,Flink SQL 任務 On Yarn,Flink 統一實時開發;2020 年 11 月,Storm 叢集正式下線。原先的 storm 任務全部都遷移到了 Flink;2021 年,我們打算把所有的 Flink 任務 K8s 化。

2. Flink 在內部支援的業務場景

Flink 支援的業務場景有風控,埋點的實時任務,支付,演算法實時特徵處理,BI 的實時看板,以及實時監控等等。目前的實時任務規模有 500+。

3. 有贊在 Flink on Yarn 的痛點

主要有三部分:

第一,CPU 沒有隔離。Flink On Yarn 模式,CPU 沒有隔離,某個實時任務造成某臺機器 CPU 使用過高時, 會對該機器其他實時任務造成影響;第二,大促擴縮容成本高。Yarn 和 HDFS 服務使用物理機,物理機在大促期間擴縮容不靈活,同時需要投入一定的人力和物力;第三,需要投入人力運維。公司底層應用資源統一為 K8S,單獨再對 Yarn 叢集運維,會再多一類叢集的人力運維成本。

4. Flink on k8s 相對於 Yarn 的優勢

可以歸納為 4 點:

第一,統一運維。公司統一化運維,有專門的部門運維 K8S;第二,CPU 隔離。K8S Pod 之間 CPU 隔離,實時任務不相互影響,更加穩定;第三,儲存計算分離。Flink 計算資源和狀態儲存分離,計算資源能夠和其他元件資源進行 混部,提升機器使用率;第四,彈性擴縮容。大促期間能夠彈性擴縮容,更好的節省人力和物力成本。

5. 實時叢集的部署情況

總體上分為三層。第一層是儲存層;第二層是實時計算資源層;第三層是實時計算引擎層。

儲存層主要分為兩部分:第一個就是雲盤,它主要儲存 Flink 任務本地的狀態,以及 Flink 任務的日誌;第二部分是實時計算 HDFS 叢集,它主要儲存 Flink 任務的遠端狀態。第二層是實時計算的資源層,分為兩部分:一個是 Hadoop Yarn 叢集;另一個是 Flink k8s 叢集,再往下細分,會有 Flink k8s 和離線的 HDFS 混部叢集的資源,還有 Flink k8s 單獨類型的叢集資源。最上層有一些實時 Flink Jar,spark streaming 任務,以及 Flink SQL 任務。

我們考慮混部的原因是,離線 HDFS 叢集白天機器使用率不高。把離線 HDFS 叢集計算資源給實時任務,離線使用內部其他元件的彈性計算資源,從而提升機器使用率,更好的達到降本效果。

6. Flink on k8s 的容器化流程

如下圖所示:

第一步,實時平臺的 Flink Jar 任務提交,Flink Jar 任務版本管理,Docker Flink 任務映象構建,上傳映象到 Docker 映象倉庫;第二步,任務啟動;第三步,yaml 檔案創建;第四步,和 k8s Api Server 之間進行命令互動;第五步,從 Docker 映象倉庫拉取 Flink 任務映象到 Flink k8s 叢集;最後,任務運行。這邊有幾個 tips:作業模式為 Flink Standalone Per Job 模式;每個 Flink Jar 任務一個映象,通過任務名稱 + 時間截作為映象的版本;JobManager 需要創建為 Deployment 而不是 Job 類型;Dockerfile 指定 HADOOP_USER_NAME,與線上任務保持一致。

7. 在 Flink on k8s 的一些實踐

第一個實踐是解決資源少配任務無法啟動這個問題。先來描述一下問題,Flink on k8s 非雲原生,無法做到實時任務資源按需申請。當用戶在平臺配置的資源少於實時任務真實使用的資源時(比如使用者程式碼寫死併發度,但使用者配置的併發度小於該值),會出現實時任務無法啟動的問題。針對這個問題,我們內部增加了一種 Flink Jar 任務併發度的自動檢測機制。它的主要流程如下圖所示。首先,使用者會在我們平臺去提交 Flink Jar 作業,當他提交完成之後,在後臺會把 Jar 作業以及運行參數,構建 PackagedProgram。通過 PackagedProgram 獲取到任務的預執行計劃。再通過它獲取到任務真實的併發度。如果使用者在程式碼裡配置的併發度小於平臺端配置的資源,我們會使用在平臺端的配置去申請資源,然後進行啟動;反之,我們會使用它真實的任務併發度去申請資源,啟動任務。

第二個實踐是 Flink on k8s 任務的資源分析工具。首先來說一下背景,Flink k8s 任務資源是使用者自行配置,當配置的併發度或者記憶體過大時,存在計算資源浪費的問題,從而會增加底層機器成本。怎麼樣去解決這個問題,我們做了一個平臺管理員的工具。對於管理員來說,他可以從兩種視角去看這個任務的資源是否進行了一個超配:第一個是任務記憶體的視角。我們根據任務的 GC 日誌,通過一個開源工具 GC Viewer,拿到這一個實時任務的記憶體使用指標;第二個是訊息處理能力的視角。我們在 Flink 源碼層增加了資料來源輸入 record/s 和任務訊息處理時間 Metric。根據 metric 找到訊息處理最慢的 task 或者 operator,從而判斷併發度配置是否合理。管理員根據記憶體分析指標以及併發度合理性,結合優化規則,預設定 Flink 資源。然後我們會和業務方溝通與調整。右圖是兩種分析結果,上面是 Flink on K8S pod 記憶體分析結果。下面是 Flink K8S 任務處理能力的分析結果。最終,我們根據這些指標就可以對任務進行一個資源的重新調整,降低資源浪費。目前我們打算把它做成一個自動化的分析調整工具。

接下來是 Flink on K8s 其他的相關實踐。第一,基於 Ingress Flink Web UI 和 Rest API 的使用。每個任務有一個 Ingress 域名,始終通過域名訪問 Flink Web UI 以及 Resti API 使用;第二,掛載多個 hostpath volume,解決單塊雲盤 IO 限制。單塊雲盤的寫入頻寬以及 IO 能力有瓶頸,使用多塊雲盤,降低雲盤 Checkpoint 狀態和本地寫入的壓力;第三,Flink 相關通用配置 ConfigMap 化、Flink 映象上傳成功的檢測。為 Filebeat、Flink 作業通用配置,創建 configmap,然後掛載到實時任務中,確保每個 Flink 任務映象都成功上傳到映象倉庫;第四,HDFS 磁碟 SSD 以及基於 Filebeat 日誌採集。SSD 磁碟主要是為了降低磁碟的 IO Wait 時 間,調整 dfs.block.invalidate.limit,降低 HDFS Pending delete block 數。任務日誌使用 Filebeat 採集,輸出到 kafka,後面通過自定義 LogServer 和離線公用 LogServer 檢視。

8. Flink on K8s 當前面臨的痛點

第一,JobManager HA 問題。JobManager Pod 如果掛掉,藉助於 k8s Deployment 能力,JobManager 會根據 yaml 檔案重啟,狀態可能會丟失。而如果 yaml 配置 Savepoint 恢復,則訊息可能大量重複。我們希望後續藉助於 ZK 或者 etcd 支援 Jobmanager HA;第二,修改程式碼,再次上傳時間久。一旦程式碼修改邏輯,Flink Jar 任務上傳時間加上打映象時間可能是分鐘級別,對實時性要求比較高的業務或許有影響。我們希望後續可以參考社群的實現方式,從 HDFS 上面拉取任務 Jar 運行;第三,K8S Node Down 機, JobManager 恢復慢。一旦 K8S Node down 機後, Jobmanager Pod 恢復運行需要 8分鐘左右,主要是 k8s 內部異常發現時間以及作業啟動時間,對部分業務有影響,比如CPS實時任務。如何解決,平臺端定時檢測 K8s node 狀態,一旦檢測到 down 機狀態,將 node 上面有 JobManager 所屬的任務停止掉,然後從其之前 checkpoint 恢復;第四,Flink on k8s 非雲原生。當前通過 Flink Jar 任務併發度自動檢測工具解決資源少配無法啟動問題,但是如果任務的預執行計劃無法獲取,就無法獲取到程式碼配置的併發度。我們的思考是: Flink on k8s 雲原生功能以及前面的 1、2 問題,如果社群支援的比較快速的話,後面可能會考慮將 Flink 版本與社群版本對齊。

9. Flink on K8s的一些方案推薦

第一種方案,是平臺自己去構建和管理任務的映象。優點是:平臺方對於構建映象,以及運行實時任務整體流程自我掌控,具體問題能夠及時修正。缺點是:需要對 Docker 以及 K8S 相關技術要有一定了解,門檻使用比較高,同時需要考慮非雲原生相關問題。它的適用版本為 Flink 1.6 以上。第二種方案,Flink k8s Operator。優點是:對使用者整體封裝了很多底層細節,使用門檻相對降低一些。缺點是:整體使用沒有第一種方案那麼靈活,一旦有問題,由於底層使用的是其封裝的功能,底層不好修改。它的適用版本為Flink 1.7 以上。最後一種方案是,基於社群 Flink K8s 功能。優點是:雲原生,對於資源的申請方面更加友好。同時,使用者使用會更加方便,遮蔽很多底層實現。缺點是:K8s 雲原生功能還是實驗中的功能,相關功能還在開發中,比如 k8s Per job 模式。它的適用版本為Flink 1.10 以上。

二、Flink SQL 實踐和應用

1. 有贊 Flink SQL 的發展歷程

2019 年 9 月,我們對 Flink 1.9 、1.10 SQL 方面的能力進行研究和嘗試,同時增強了一些 Flink SQL 功能。2019 年 10 月,我們進行了 SQL 功能驗證,基於埋點實時需求,驗證 Flink SQL Hbase 維表關聯功能,結果符合預期。2020 年 2 月,我們對 SQL 的功能進行了擴展,以 Flink 1.10 作為 SQL 計算引擎,進行 Flink SQL 功能擴展開發和優化,實時平臺支援全 SQL 化開發。2020 年 4 月,開始支援實時數倉、有贊教育、美業、零售等相關實時需求。2020 年 8 月,新版的實時平臺才開始正式上線,目前主推 Flink SQL 開發我們的實時任務。

2. 在 Flink SQL 方面的一些實踐

主要分為三個方面:

第一,Flink Connector 的實踐包括:Flink SQL 支援 Flink NSQ Connector、Flink SQL 支援 Flink HA Hbase Sink 和維表、Flink SQL 支援無密 Mysql Connector、Flink SQL 支援標準輸出(社群已經支援)、Flink SQL 支援 Clickhouse Sink;第二,平臺層的實踐包括:Flink SQL 支援 UDF 以及 UDF 管理、支援任務從 Checkpoint 恢復、支援冪等函數、支援 Json 相關函數等、支援 Flink 運行相關參數配置,比如狀態時間設定,聚合優化參數等等、Flink 實時任務血緣資料自動化採集、Flink 語法正確性檢測功能;第三,Flink Runtime的實踐包括:Flink 源碼增加單個Task 以及 Operator 單條記錄處理時間指標;修復 Flink SQL 可撤回流 TOP N 的BUG。

3. 業務實踐

第一個實踐是我們內部的客服機器人實時看板。流程分為三層:第一層是實時資料來源,首先是線上的 MySQL 業務表,我們會把它的 Binlog 通過 DTS 服務同步到相應的 Kafka Topic;實時任務的 ODS 層有三個 Kafka Topic;在實時 DWD 層,有兩個 Flink SQL 任務。Flink SQL A 消費兩個 topic,然後把這兩個 topic 裡面的資料去通過 Interval Join,根據一些視窗的作用關聯到對應的資料。同時,會對這個實時任務設定狀態的保留時間。Join 之後,會去進行一些 ETL 的加工處理,最終會把它的資料輸入到一個 topic C。另外一個實時任務 Flink SQL B 消費一個 topic,然後會對 topic 裡面的資料進行清洗,然後到 HBase 裡面去進行一個維表的關聯,去關聯它所需要的一些額外的資料,關聯的資料最終會輸入到 topic D。在上游,Druid 會消費這兩個 topic 的資料,去進行一些指標的查詢,最終提供給業務方使用。

第二個實踐是實時使用者行為中間層。使用者在我們平臺上面會去搜索、瀏覽、加入購物車等等,都會產生相應的事件。原先的方案是基於離線來做的。我們會把資料落庫到 Hive 表,然後演算法那邊的同學會結合使用者特徵、機器學習的模型、離線的資料去生成一些使用者評分預估,再把它輸入到 HBase。在這樣的背景下面,會有如下訴求:當前的使用者評分主要是基於離線任務,而演算法同學希望結合實時的使用者特徵,更加及時、準確的提高推薦精準度。這其實就需要構建一個實時的使用者行為中間層,把使用者產生的事件輸入到 Kafka 裡面,通過 Flink SQL 作業對這些資料進行處理,然後把相應的結果輸出到 HBase 裡面。演算法的同學再結合演算法模型,實時的更新模型裡面的一些參數,最終實時的進行使用者的評分預估,也會落庫到 HBase,然後到線上使用。使用者行為中間層的構建流程分為三個步驟:第一層,我們的資料來源在 Kafka 裡面;第二層是 ODS 層,在 Flink SQL 作業裡面會有一些流表的定義,一些 ETL 邏輯的處理。然後去定義相關的 sink 表、維表等等。這裡面也會有一些聚合的操作,然後輸入到 Kafka;在 DWS 層,同樣有使用者的 Flink SQL 作業,會涉及到使用者自己的 UDF Jar,多流 Join,UDF 的使用。然後去讀取 ODS 層的一些資料,落庫到 HBase 裡面,最終給演算法團隊使用。這裡有幾個實踐經驗:第一,Kafka Topic、Flink 任務名稱,Flink SQL Table 名稱,按照數倉命名規範。第二,指標聚合類計算,Flink SQL 任務要設定空閒狀態保留時間,防止任務狀態無限增大。第三,如果存在資料傾斜或者讀狀態壓力較大等情況,需要配置 Flink SQL 優化參數。

4. 在 HAHBase Connector 的實踐

社群 HBase Connector 資料關聯或者寫入是單 HBase 叢集使用,當 HBase 叢集不可用時,實時任務資料的寫入或者關聯會受到影響,從而可能會影響到業務使用。至於怎麼樣去解決這個問題。首先,在 HBase 方面有兩個叢集,主叢集和備叢集。它們之間通過 WAL 進行主從的複製。Flink SQL 作業先寫入主叢集,當主叢集不可用的時候,自動降級到備叢集,不會影響到線上業務的使用。

5. 無密 Mysql Connector 和指標擴展實踐

左圖是 Flink 無密 Mysql Sink 語法,解決的問題包括三點:

第一,Mysql 資料庫使用者名和密碼不以明文方式向外進行暴露和儲存;第二,支援 Mysql 使用者名和密碼週期性更新;第三,內部自動根據使用者名鑑定表許可權使用。這樣做最主要的目的還是保證實時任務資料庫使用更安全。

然後是左下圖,我們在 Flink 源碼層面增加 Task 和 Operator 單條訊息處理時間 Metric。目的是幫助業務方,根據訊息處理時間的監控指標,排查和優化 Flink 實時任務。

6. Flink 任務血緣元資料自動化採集的實踐

Flink 任務血緣元資料採集的流程如下圖所示,平臺啟動實時任務後,根據當前任務是 Flink Jar 任務,還是 Flink SQL 任務,分別走兩條不同的路徑,來獲取任務的血緣資料,再把血緣資料上報元資料系統。這樣做的價值有兩點:

第一,幫助業務方瞭解實時任務加工鏈路。業務方能夠更清晰的認知實時任務之間的關係和影響,當操作任務時,能夠及時通知下游其他業務方;第二,更好的構建實時數倉。結合實時任務血緣圖,提煉實時資料公共層,提升複用性,更好的構建實時數倉。

三、未來規劃

最後是未來的規劃,包括四點:

第一,推廣 Flink 實時任務 SQL 化。推廣 Flink SQL 開發實時任務,提升 Flink SQL 任務比例。第二,Flink 任務計算資源自動優化配置。從記憶體、任務處理能力、輸入速率等,對任務資源進行分析,對資源配置不合理任務自動化配置,從而降低機器成本。第三,Flink SQL 任務 k8s 化以及 K8s 雲原生。Flink 底層計算資源統一為 k8s,降低運維成本,Flink k8s 雲原生,更合理使用 K8s 資源。第四,Flink 與資料湖以及 CDC 功能技術的調研。新技術的調研儲備,為未來其他實時需求奠定技術基礎。

本文為阿里雲原創內容,未經允許不得轉載。


IT145.com E-mail:sddin#qq.com