<em>Mac</em>Book项目 2009年学校开始实施<em>Mac</em>Book项目,所有师生配备一本<em>Mac</em>Book,并同步更新了校园无线网络。学校每周进行电脑技术更新,每月发送技术支持资料,极大改变了教学及学习方式。因此2011
2021-06-01 09:32:01
在之前的一篇部落格中,我們介紹了Clustering(聚簇)的表服務來重新組織資料來提供更好的查詢效能,而不用降低攝取速度,並且我們已經知道如何部署同步Clustering,本篇部落格中,我們將討論近期社群做的一些改進以及如何通過HoodieClusteringJob
和DeltaStreamer
工具來部署非同步Clustering。
通常講,Clustering
根據可設定的策略建立一個計劃,根據特定規則對符合條件的檔案進行分組,然後執行該計劃。Hudi支援並行寫入,並在多個表服務之間提供快照隔離,從而允許寫入程式在後臺執行Clustering
時繼續攝取。有關Clustering
的體系結構的更詳細概述請檢視上一篇博文。
如前所述Clustering
計劃和執行取決於可插拔的設定策略。這些策略大致可分為三類:計劃策略、執行策略和更新策略。
該策略在建立Clustering計劃時發揮作用。它有助於決定應該對哪些檔案組進行Clustering。讓我們看一下Hudi提供的不同計劃策略。請注意,使用此設定可以輕鬆地插拔這些策略。
Clustering
組,最大大小為每個組允許的最大檔案大小。可以使用此設定指定最大大小。此策略對於將中等大小的檔案合併成大檔案非常有用,以減少跨冷分割區分佈的大量檔案。N
天分割區建立一個計劃,將這些分割區中的小檔案片進行Clustering
,這是預設策略,當工作負載是可預測的並且資料是按時間劃分時,它可能很有用。Clustering
,那麼無論這些分割區是新分割區還是舊分割區,此策略都很有用,要使用此策略,還需要在下面設定兩個設定(包括開始和結束分割區):hoodie.clustering.plan.strategy.cluster.begin.partition hoodie.clustering.plan.strategy.cluster.end.partition
注意:所有策略都是分割區感知的,後兩種策略仍然受到第一種策略的大小限制的約束。
在計劃階段構建Clustering
組後,Hudi主要根據排序列和大小為每個組應用執行策略,可以使用此設定指定策略。
SparkSortAndSizeExecutionStrategy
是預設策略。使用此設定進行Clustering
時,使用者可以指定資料排序列。除此之外我們還可以為Clustering
產生的Parquet檔案設定最大檔案大小。該策略使用bulk_insert
將資料寫入新檔案,在這種情況下,Hudi隱式使用一個分割區器,該分割區器根據指定列進行排序。通過這種策略改變資料佈局,不僅提高了查詢效能,而且自動平衡了重寫開銷。
現在該策略可以作為單個Spark作業或多個作業執行,具體取決於在計劃階段建立的Clustering
組的數量。預設情況下Hudi將提交多個Spark作業併合並結果。如果要強制Hudi使用單Spark作業,請將執行策略類設定設定為SingleSparkJobExecutionStrategy
。
目前只能為未接收任何並行更新的表/分割區排程Clustering
。預設情況下更新策略的設定設定為SparkRejectUpdateStrategy
。如果某個檔案組在Clustering
期間有更新,則它將拒絕更新並引發異常。然而在某些用例中,更新是非常稀疏的,並且不涉及大多數檔案組。簡單拒絕更新的預設策略似乎不公平。在這種用例中使用者可以將設定設定為SparkAllowUpdateStregy
。
我們討論了關鍵策略設定,下面列出了與Clustering
相關的所有其他設定。在此列表中一些非常有用的設定包括:
設定項 | 解釋 | 預設值 |
---|---|---|
hoodie.clustering.async.enabled | 啟用在表上的非同步執行Clustering服務。 | false |
hoodie.clustering.async.max.commits | 通過指定應觸發多少次提交來控制非同步Clustering的頻率。 | 4 |
hoodie.clustering.preserve.commit.metadata | 重寫資料時保留現有的_hoodie_commit_time。這意味著使用者可以在Clustering資料上執行增量查詢,而不會產生任何副作用。 | false |
之前我們已經瞭解了使用者如何設定同步Clustering。此外使用者可以利用HoodiecClusteringJob設定兩步非同步Clustering。
隨著Hudi版本0.9.0的釋出,我們可以在同一步驟中排程和執行Clustering
。我們只需要指定-mode
或-m
選項。有如下三種模式:
schedule
(排程):制定一個Clustering計劃。這提供了一個可以在執行模式下傳遞的instant
。
execute
(執行):在給定的instant
執行Clustering計劃,這意味著這裡需要instant
。
scheduleAndExecute
(排程並執行):首先制定Clustering計劃並立即執行該計劃。
請注意要在原始寫入程式仍在執行時執行作業請啟用多寫入:
hoodie.write.concurrency.mode=optimistic_concurrency_control hoodie.write.lock.provider=org.apache.hudi.client.transaction.lock.ZookeeperBasedLockProvider
使用spark submit
命令提交HoodieClusteringJob
範例如下:
spark-submit --class org.apache.hudi.utilities.HoodieClusteringJob /path/to/hudi-utilities-bundle/target/hudi-utilities-bundle_2.12-0.9.0-SNAPSHOT.jar --props /path/to/config/clusteringjob.properties --mode scheduleAndExecute --base-path /path/to/hudi_table/basePath --table-name hudi_table_schedule_clustering --spark-memory 1g
clusteringjob.properties
組態檔範例如下
hoodie.clustering.async.enabled=true hoodie.clustering.async.max.commits=4 hoodie.clustering.plan.strategy.target.file.max.bytes=1073741824 hoodie.clustering.plan.strategy.small.file.limit=629145600 hoodie.clustering.execution.strategy.class=org.apache.hudi.client.clustering.run.strategy.SparkSortAndSizeExecutionStrategy hoodie.clustering.plan.strategy.sort.columns=column1,column2
接著看下如何使用HudiDeltaStreamer
。現在我們可以使用DeltaStreamer
觸發非同步Clustering。只需將hoodie.clustering.async.enabled為true
,並在屬性檔案中指定其他Clustering設定,在啟動Deltastreamer
時可以將其位置設為-props
(與HoodieClusteringJob
設定類似)。
使用spark submit
命令提交HoodieDeltaStreamer
範例如下:
spark-submit --class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer /path/to/hudi-utilities-bundle/target/hudi-utilities-bundle_2.12-0.9.0-SNAPSHOT.jar --props /path/to/config/clustering_kafka.properties --schemaprovider-class org.apache.hudi.utilities.schema.SchemaRegistryProvider --source-class org.apache.hudi.utilities.sources.AvroKafkaSource --source-ordering-field impresssiontime --table-type COPY_ON_WRITE --target-base-path /path/to/hudi_table/basePath --target-table impressions_cow_cluster --op INSERT --hoodie-conf hoodie.clustering.async.enabled=true --continuous
我們還可以使用Spark結構化流啟用非同步Clustering,如下所示。
val commonOpts = Map( "hoodie.insert.shuffle.parallelism" -> "4", "hoodie.upsert.shuffle.parallelism" -> "4", DataSourceWriteOptions.RECORDKEY_FIELD.key -> "_row_key", DataSourceWriteOptions.PARTITIONPATH_FIELD.key -> "partition", DataSourceWriteOptions.PRECOMBINE_FIELD.key -> "timestamp", HoodieWriteConfig.TBL_NAME.key -> "hoodie_test" ) def getAsyncClusteringOpts(isAsyncClustering: String, clusteringNumCommit: String, executionStrategy: String):Map[String, String] = { commonOpts + (DataSourceWriteOptions.ASYNC_CLUSTERING_ENABLE.key -> isAsyncClustering, HoodieClusteringConfig.ASYNC_CLUSTERING_MAX_COMMITS.key -> clusteringNumCommit, HoodieClusteringConfig.EXECUTION_STRATEGY_CLASS_NAME.key -> executionStrategy ) } def initStreamingWriteFuture(hudiOptions: Map[String, String]): Future[Unit] = { val streamingInput = // define the source of streaming Future { println("streaming starting") streamingInput .writeStream .format("org.apache.hudi") .options(hudiOptions) .option("checkpointLocation", basePath + "/checkpoint") .mode(Append) .start() .awaitTermination(10000) println("streaming ends") } } def structuredStreamingWithClustering(): Unit = { val df = //generate data frame val hudiOptions = getClusteringOpts("true", "1", "org.apache.hudi.client.clustering.run.strategy.SparkSortAndSizeExecutionStrategy") val f1 = initStreamingWriteFuture(hudiOptions) Await.result(f1, Duration.Inf) }
在這篇文章中,我們討論了不同的Clustering策略以及如何設定非同步Clustering。未來的工作包括:
Clustering支援更新。
支援Clustering的CLI工具。
另外Flink支援Clustering已經有相應Pull Request,有興趣的小夥伴可以關注該PR。
以上就是Apache Hudi非同步Clustering部署操作的掌握的詳細內容,更多關於Apache Hudi非同步Clustering部署的資料請關注it145.com其它相關文章!
相關文章
<em>Mac</em>Book项目 2009年学校开始实施<em>Mac</em>Book项目,所有师生配备一本<em>Mac</em>Book,并同步更新了校园无线网络。学校每周进行电脑技术更新,每月发送技术支持资料,极大改变了教学及学习方式。因此2011
2021-06-01 09:32:01
综合看Anker超能充系列的性价比很高,并且与不仅和iPhone12/苹果<em>Mac</em>Book很配,而且适合多设备充电需求的日常使用或差旅场景,不管是安卓还是Switch同样也能用得上它,希望这次分享能给准备购入充电器的小伙伴们有所
2021-06-01 09:31:42
除了L4WUDU与吴亦凡已经多次共事,成为了明面上的厂牌成员,吴亦凡还曾带领20XXCLUB全队参加2020年的一场音乐节,这也是20XXCLUB首次全员合照,王嗣尧Turbo、陈彦希Regi、<em>Mac</em> Ova Seas、林渝植等人全部出场。然而让
2021-06-01 09:31:34
目前应用IPFS的机构:1 谷歌<em>浏览器</em>支持IPFS分布式协议 2 万维网 (历史档案博物馆)数据库 3 火狐<em>浏览器</em>支持 IPFS分布式协议 4 EOS 等数字货币数据存储 5 美国国会图书馆,历史资料永久保存在 IPFS 6 加
2021-06-01 09:31:24
开拓者的车机是兼容苹果和<em>安卓</em>,虽然我不怎么用,但确实兼顾了我家人的很多需求:副驾的门板还配有解锁开关,有的时候老婆开车,下车的时候偶尔会忘记解锁,我在副驾驶可以自己开门:第二排设计很好,不仅配置了一个很大的
2021-06-01 09:30:48
不仅是<em>安卓</em>手机,苹果手机的降价力度也是前所未有了,iPhone12也“跳水价”了,发布价是6799元,如今已经跌至5308元,降价幅度超过1400元,最新定价确认了。iPhone12是苹果首款5G手机,同时也是全球首款5nm芯片的智能机,它
2021-06-01 09:30:45