首頁 > 軟體

Apache Hudi資料佈局黑科技降低一半查詢時間

2022-03-30 19:00:02

1. 背景

Apache Hudi將流處理帶到巨量資料,相比傳統批次處理效率高一個數量級,提供了更新鮮的資料。在資料湖/倉庫中,需要在攝取速度和查詢效能之間進行權衡,資料攝取通常更喜歡小檔案以改善並行性並使資料儘快可用於查詢,但很多小檔案會導致查詢效能下降。在攝取過程中通常會根據時間在同一位置放置資料,但如果把查詢頻繁的資料放在一起時,查詢引擎的效能會更好,大多數系統都傾向於支援獨立的優化來提高效能,以解決未優化的資料佈局的限制。本部落格介紹了一種稱為Clustering[RFC-19]的服務,該服務可重新組織資料以提高查詢效能,也不會影響攝取速度。

2. Clustering架構

Hudi通過其寫入使用者端API提供了不同的操作,如insert/upsert/bulk_insert來將資料寫入Hudi表。為了能夠在檔案大小和攝取速度之間進行權衡,Hudi提供了一個hoodie.parquet.small.file.limit設定來設定最小檔案大小。使用者可以將該設定設定為0以強制新資料寫入新的檔案組,或設定為更高的值以確保新資料被"填充"到現有小的檔案組中,直到達到指定大小為止,但其會增加攝取延遲。

為能夠支援快速攝取的同時不影響查詢效能,我們引入了Clustering服務來重寫資料以優化Hudi資料湖檔案的佈局。

Clustering服務可以非同步或同步執行,Clustering會新增了一種新的REPLACE操作型別,該操作型別將在Hudi後設資料時間軸中標記Clustering操作。

總體而言Clustering分為兩個部分:

•排程Clustering:使用可插拔的Clustering策略建立Clustering計劃。•執行Clustering:使用執行策略處理計劃以建立新檔案並替換舊檔案。

2.1 排程Clustering

排程Clustering會有如下步驟

•識別符合Clustering條件的檔案:根據所選的Clustering策略,排程邏輯將識別符合Clustering條件的檔案。•根據特定條件對符合Clustering條件的檔案進行分組。每個組的資料大小應為targetFileSize的倍數。分組是計劃中定義的"策略"的一部分。此外還有一個選項可以限制組大小,以改善並行性並避免混排大量資料。•最後將Clustering計劃以avro後設資料格式儲存到時間線。

2.2 執行Clustering

•讀取Clustering計劃,並獲得clusteringGroups,其標記了需要進行Clustering的檔案組。•對於每個組使用strategyParams範例化適當的策略類(例如:sortColumns),然後應用該策略重寫資料。•建立一個REPLACE提交,並更新HoodieReplaceCommitMetadata中的後設資料。

Clustering服務基於Hudi的MVCC設計,允許繼續插入新資料,而Clustering操作在後臺執行以重新格式化資料佈局,從而確保並行讀寫者之間的快照隔離。

注意:現在對錶進行Clustering時還不支援更新,將來會支援並行更新。

2.3 Clustering設定

使用Spark可以輕鬆設定內聯Clustering,參考如下範例

import org.apache.hudi.QuickstartUtils._</code><code>import scala.collection.JavaConversions._</code><code>import org.apache.spark.sql.SaveMode._</code><code>import org.apache.hudi.DataSourceReadOptions._</code><code>import org.apache.hudi.DataSourceWriteOptions._</code><code>import org.apache.hudi.config.HoodieWriteConfig._</code><code>val df =  //generate data frame</code><code>df.write.format("org.apache.hudi").</code><code>        options(getQuickstartWriteConfigs).</code><code>        option(PRECOMBINE_FIELD_OPT_KEY, "ts").</code><code>        option(RECORDKEY_FIELD_OPT_KEY, "uuid").</code><code>        option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").</code><code>        option(TABLE_NAME, "tableName").</code><code>        option("hoodie.parquet.small.file.limit", "0").</code><code>        option("hoodie.clustering.inline", "true").</code><code>        option("hoodie.clustering.inline.max.commits", "4").</code><code>        option("hoodie.clustering.plan.strategy.target.file.max.bytes", "1073741824").</code><code>        option("hoodie.clustering.plan.strategy.small.file.limit", "629145600").</code><code>        option("hoodie.clustering.plan.strategy.sort.columns", "column1,column2"). //optional, if sorting is needed as part of rewriting data</code><code>        mode(Append).</code><code>        save("dfs://location");

對於設定更高階的非同步Clustering管道,參考此處範例。

3. 表查詢效能

我們使用生產環境表的一個分割區建立了一個資料集,該表具有約2000萬條記錄,約200GB,資料集具有多個session_id的行。使用者始終使用對談謂詞查詢資料,單個對談的資料會分佈在多個資料檔案中,因為資料攝取會根據到達時間對資料進行分組。下面實驗表明通過對對談進行Clustering可以改善資料區域性性並將查詢執行時間減少50%以上。

查詢SQL如下

spark.sql("select  *  from table where session_id=123")

3.1 進行Clustering之前

查詢花費了2.2分鐘。請注意查詢計劃的"掃描parquet"部分中的輸出行數包括表中的所有2000W行。

3.2 進行Clustering之後

查詢計劃與上面類似,但由於改進了資料區域性性和謂詞下推,Spark可以修剪很多行。進行Clustering後,相同的查詢在掃描parquet檔案時僅輸出11萬行(2000萬行中的),這將查詢時間從2.2分鐘減少到不到一分鐘。

下表總結了使用Spark3執行的實驗對查詢效能的改進

Table StateQuery runtimeNum Records ProcessedNum files on diskSize of each file
Unclustered130,673 ms~20M13642~150 MB
Clustered55,963 ms~110K294~600 MB

Clustering後查詢執行時間減少了60%,在其他樣本資料集上也觀察到了類似的結果,請參閱範例查詢計劃和RFC-19效能評估上的更多詳細資訊。

我們希望大型表能夠大幅度提高速度,與上面的範例不同,查詢執行時間幾乎完全由實際I/O而不是查詢計劃決定。

4. 總結

使用Clustering,我們可以通過以下方式提高查詢效能:

利用空間填充曲線之類的概念來適應資料湖佈局並減少查詢讀取的資料量。

將小檔案合併成較大的檔案以減少查詢引擎需要掃描的檔案總數。

Clustering使得巨量資料進行流處理,攝取可以寫入小檔案以滿足流處理的延遲要求,可以在後臺使用Clustering將這些小檔案重寫成較大的檔案並減少檔案數。

除此之外,Clustering框架還提供了根據特定要求非同步重寫資料的靈活性,我們預見到許多其他用例將採用帶有自定義可插拔策略的Clustering框架來按需管理資料湖資料,如可以通過Clustering解決如下一些用例:

重寫資料並加密資料。

從表中修剪未使用的列並減少儲存空間。

以上就是Apache Hudi資料佈局黑科技降低一半查詢時間的詳細內容,更多關於Apache Hudi資料佈局查詢的資料請關注it145.com其它相關文章!


IT145.com E-mail:sddin#qq.com