首頁 > 軟體

Apache Doris Join 優化原理詳解

2022-10-28 14:08:45

背景 & 目標

  • 掌握 Apache Doris Join 優化手段及其實現原理
  • 為程式碼閱讀提供理論基礎

Doris 資料劃分

不同的 Join 方式非常依賴於對 Doris 中資料劃分方式的透徹理解。因此先在這裡列舉出必要的基礎知識。

首先,在 Doris 中資料都以表(Table)的形式進行邏輯上的描述。

在 Doris 的儲存引擎中,使用者資料被水平劃分為若干個資料分片(Tablet,也稱作資料分桶 Bucket)。每個 Tablet 包含若干資料行。各個 Tablet 之間的資料沒有交集,並且在物理上是獨立儲存的。

一個 Tablet 只屬於一個資料分割區(Partition)。而一個 Partition 包含若干個 Tablet。因為 Tablet 在物理上是獨立儲存的,所以可以視為 Partition 在物理上也是獨立的。Tablet 是資料移動、複製等操作的最小物理儲存單元。

若干個 Partition 組成一個 Table。Partition 可以視為是邏輯上最小的管理單元。資料的匯入與刪除,僅能針對一個 Partition 進行。

Doris 支援兩層的資料劃分。第一層是 Partition,支援 Range 和 List 的劃分方式。第二層是 Bucket(Tablet),僅支援 Hash 的劃分方式。也可以僅使用一層分割區。使用一層分割區時,只支援 Bucket 劃分。

下圖說明 Table、Partition、Bucket(Tablet) 的關係:

  • Table 按照 Range 的方式按照 date 欄位進行分割區,得到了 N 個 Partition
  • 每個 Partition 通過相同的 Hash 方式將其中的資料劃分為 M 個 Bucket(Tablet)
  • 從邏輯上來說,Bucket 1 可以包含 N 個 Partition 中劃分得到的資料,比如下圖中的 Tablet 11、Tablet 21、Tablet N1

特別注意:

Doris 中的 Partition 和 Bucket 定義可能和某些其它資料庫系統的定義有一些差異,下面配以一個具體的建表語句為例來說明:

CREATE TABLE IF NOT EXISTS example_db.expamle_range_tbl
(
    `user_id` LARGEINT NOT NULL COMMENT "使用者id",
    `date` DATE NOT NULL COMMENT "資料灌入日期時間",
    `timestamp` DATETIME NOT NULL COMMENT "資料灌入的時間戳",
    `city` VARCHAR(20) COMMENT "使用者所在城市",
    `age` SMALLINT COMMENT "使用者年齡",
    `sex` TINYINT COMMENT "使用者性別",
    `last_visit_date` DATETIME REPLACE DEFAULT "1970-01-01 00:00:00" COMMENT "使用者最後一次存取時間",
    `cost` BIGINT SUM DEFAULT "0" COMMENT "使用者總消費",
    `max_dwell_time` INT MAX DEFAULT "0" COMMENT "使用者最大停留時間",
    `min_dwell_time` INT MIN DEFAULT "99999" COMMENT "使用者最小停留時間"
)
ENGINE=OLAP
AGGREGATE KEY(`user_id`, `date`, `timestamp`, `city`, `age`, `sex`)
PARTITION BY RANGE(`date`)
(
    PARTITION `p201701` VALUES LESS THAN ("2017-02-01"),
    PARTITION `p201702` VALUES LESS THAN ("2017-03-01"),
    PARTITION `p201703` VALUES LESS THAN ("2017-04-01")
)
DISTRIBUTED BY HASH(`user_id`) BUCKETS 16
PROPERTIES
(
    "replication_num" = "3"
);

綠色高亮:Partition,此例中使用一個 date 欄位進行分割區

藍色高亮:Bucket,此例中使用 user_id 欄位為作為分佈列

Partition

  • Partition 列可以指定一列或多列,分割區列必須為 KEY 列
  • 分割區數量理論上沒有上限
  • 當不使用 Partition 建表時,系統會自動生成一個和表名同名的,全值範圍的 Partition。該 Partition 對使用者不可見,並且不可刪改

建立分割區時不可新增範圍重疊的分割區

有兩種分割區方式:

分割區方式一般用法
Range通常按時間分割區,以方便地管理新舊資料
List支援的型別更豐富,分割區值為列舉值。只有當資料為目標分割區列舉值其中之一時,才可以命中分割區

Bucket

  • 如果使用了 Partition,則 DISTRIBUTED 語句描述的是資料在各個分割區內的劃分規則。如果不使用 Partition,則描述的是對整個表的資料劃分規則
  • 分桶列的選擇,是在 查詢吞吐 和 查詢並行 之間的一種權衡:
  • 如果選擇多個分桶列,則資料分佈更均勻。如果一個查詢條件不包含所有分桶列的等值條件(意味著無法做桶裁剪以減少資料查詢範圍),那麼該查詢會觸發所有分桶同時掃描,這樣查詢的吞吐會增加,單個查詢的延遲隨之降低。這個方式適合大吞吐低並行的查詢場景
  • 如果僅選擇一個或少數分桶列,則對應的點查詢可以僅觸發一個分桶掃描(意味著可以做桶裁剪以減少資料查詢範圍)。此時,當多個點查詢並行時,這些查詢有較大的概率分別觸發不同的分桶掃描,各個查詢之間的 IO 影響較小,尤其當不同桶分佈在不同磁碟上時),所以這種方式適合高並行的點查詢場景
  • 分桶的數量理論上沒有上限

Join 方式

總覽

作為分散式的 MPP 資料庫, 在 Join 的過程中是需要進行資料的 Shuffle。資料需要進行拆分排程,才能保證最終的 Join 結果是正確的。舉個簡單的例子,假設關係 S 和 R 進行Join,N 表示參與 Join 計算的節點的數量;T 則表示關係的 Tuple 數目。

目前 Doris 支援的 Join 方式有以上 4 種,這 4 種方式靈活度和適用性是從高到低的,對資料分佈的要求越來越嚴,但 Join 計算的效能則通過降低網路開銷而越來越好。

Join 方式的選擇是 FE 生成分散式計劃階段會考慮的事項之一。在 FE 進行分散式計劃時,優先選擇的順序為(總是會優先選擇預期效能最好的):Colocate Join -> Bucket Shuffle Join -> Broadcast Join -> Shuffle Join。

Colocate 以及 Bucket Shuffle 是可遇不可求的。當無法使用它們時,Doris會自動嘗試進行 Broadcast Join,如果預估小表過大則會自動切換至 Shuffle Join。

但是使用者可以通過顯式 Hint 來強制使用期望的 Join 型別,比如:

select * from test join [shuffle] baseall on test.k1 = baseall.k1;

Broadcast / Shuffle Join

原理比較簡單,這裡不展開。

Bucket Shuffle Join

當 Join 條件命中了左表的資料分佈列時,Broadcast 以及 Shuffle Join 會有非必要的網路傳輸開銷。而 Bucket Shuffle Join 旨在解決這類問題,通過對左表實現本地性計算優化,來減少左表資料在節點間的傳輸耗時,從而加速查詢。

以上的例子中,Join 的等值表示式命中了表 A(左表)的資料分佈列。Bucket Shuffle Join 會根據表 A 的資料分佈資訊,將表 B(右表)的資料傳送到對應表 A 的資料計算節點。

定性分析上:

  • 降低了網路與記憶體開銷(相比 Broadcast 以及 Shuffle Join 都不會更差),使一類 Join 查詢有更好的效能。尤其是當 FE 能夠執行左表的分割區裁剪與桶裁剪時
  • 與 Colocate Join 不同,它對於表的資料分佈方式沒有侵入性,對於使用者來說是透明的。對於表的資料分佈沒有強制性的要求(體現在建表語句中不需要顯式地設定 colocate_with 屬性),不容易導致資料傾斜的問題
  • 可以為 Join Reorder 提供更多可能的優化空間

Plan Rule

  • Bucket Shuffle Join 只生效於 Join 條件為等值的場景,原因與 Colocate Join 類似,它們都依賴 Hash 來計算確定的資料分佈
  • 在等值 Join 條件之中包含兩張表的分桶列,當左表的分桶列為等值的 Join 條件時,它有很大概率會被規劃為 Bucket Shuffle Join
  • 由於不同的資料型別的 Hash 值計算結果不同,所以 Bucket Shuffle Join 要求左表的分桶列的型別與右表等值 Join 列的型別需要保持一致,否則無法進行對應的規劃
  • Bucket Shuffle Join 只作用於 Doris 原生的 OLAP 表,對於 ODBC,MySQL,ES 等外表,當其作為左表時是無法規劃生效的
  • 對於分割區表,由於每一個分割區的資料分佈規則可能不同,所以 Bucket Shuffle Join 只能保證左表為單分割區時生效。所以在 SQL 執行之中,需要儘量使用 where 條件使分割區裁剪的策略能夠生效
  • 假如左表為 Colocate 的表,那麼它每個分割區的資料分佈規則是確定的,Bucket Shuffle Join 能在Colocate 表上表現更好

Colocate Join

可以理解為在資料分佈滿足一定條件的前提下,減少一切不必要的網路傳輸開銷,實現完全的計算在地化來加速查詢。同時因為沒有網路傳輸開銷,BE 節點可以擁有更高的並行度,從而進一步提升 Join 效能。

要理解這個演演算法,需要先了解兩個術語:

  • Colocation Group(CG):一個 CG 中會包含一張及以上的 Table。在同一個 Group 內的 Table 有著相同的 Colocation Group Schema,並且有著相同的資料分片分佈
  • Colocation Group Schema(CGS):用於描述一個 CG 中的 Table,和 Colocation 相關的通用 Schema 資訊。包括分桶列型別,分桶數以及副本數等

和 Buckets Sequence 這一概念:

一個表的資料,最終會根據分桶列值 Hash、對桶數取模後落在某一個分桶內。假設一個 Table 的分桶數為 8,則共有 [0, 1, 2, 3, 4, 5, 6, 7] 8 個分桶(Bucket),我們稱這樣一個序列為一個 BucketsSequence。每個 Bucket 內會有一個或多個資料分片(Tablet)。當表為單分割區表時,一個 Bucket 內僅有一個 Tablet。如果是多分割區表,則會有多個(因為多個 Partition 中的不同 Tablet 會被劃分到相同的 Bucket)。

Colocation Join 功能,是將一組擁有相同 CGS 的 Table 組成一個 CG。並保證這些 Table 對應的資料分片會落在同一個 BE 節點上。使得當 CG 內的表進行分桶列上的 Join 操作時,可以通過直接進行本地資料 Join,減少資料在節點間的傳輸耗時。

 

因此關鍵問題就轉變為了「如何保證這些 Table 對應的資料分片會落在同一個 BE 節點上?」

通過同一 CG 內的 Table 必須保證以下屬性相同實現:

  • 分桶列和分桶數

分桶列,即在建表語句中 DISTRIBUTED BY HASH(col1, col2, ...) 中指定的列。分桶列決定了一張表的資料通過哪些列的值進行 Hash 劃分到不同的 Tablet 中。同一 CG 內的 Table 必須保證分桶列的型別和數量完全一致,並且桶數一致,才能保證多張表的資料分片能夠一一對應的進行分佈控制。

  • 副本數

同一個 CG 內所有表的所有分割區(Partition)的副本數必須一致。如果不一致,可能出現某一個 Tablet 的某一個副本,在同一個 BE 上沒有其他的表分片的副本對應。不過,同一個 CG 內的表,分割區的個數、範圍以及分割區列的型別不要求一致。

 

在固定了分桶列和分桶數後,同一個 CG 內的表會擁有相同的 BucketsSequence。而副本數決定了每個分桶內的 Tablet 的多個副本,存放在哪些 BE 上。假設 BucketsSequence 為 [0, 1, 2, 3, 4, 5, 6, 7],BE 節點有 [A, B, C, D] 4個。則一個可能的資料分佈如下:

+---+ +---+ +---+ +---+ +---+ +---+ +---+ +---+
| 0 | | 1 | | 2 | | 3 | | 4 | | 5 | | 6 | | 7 |
+---+ +---+ +---+ +---+ +---+ +---+ +---+ +---+
| A | | B | | C | | D | | A | | B | | C | | D |
|   | |   | |   | |   | |   | |   | |   | |   |
| B | | C | | D | | A | | B | | C | | D | | A |
|   | |   | |   | |   | |   | |   | |   | |   |
| C | | D | | A | | B | | C | | D | | A | | B |
+---+ +---+ +---+ +---+ +---+ +---+ +---+ +---+

CG 內所有表的資料都會按照上面的規則進行統一分佈,這樣就保證了,分桶列值相同的資料都在同一個 BE 節點上,可以進行本地資料 Join。其核心思想是「兩次對映」,保證相同的 Distributed Key 的資料會被對映到相同的 Bucket Sequence,再保證 Bucket Sequence 對應的 Bucket 對映到相同的 BE 節點:

通過查詢計劃可以檢查一個查詢是否使用了 Colocate Join,同時計劃中的 Exchange Node 也被去掉了,會將 ScanNode 直接設定為 Hash Join Node 的孩子節點。

DESC SELECT * FROM tbl1 INNER JOIN tbl2 ON (tbl1.k2 = tbl2.k2);
-- 在 Hash Join 節點會顯示:
-- colocate: true/false

Colocate Join 十分適合幾張表按照相同欄位分桶,並高頻根據固定的欄位 Join 的場景。這樣可以將資料預先儲存到相同的分桶中,實現本地計算。

Runtime Filter 優化

Doris 在進行 Hash Join 計算時會在右表構建一個 Hash Table,左表流式地通過右表的 Hash Table 從而得出 Join 結果。而 Runtime Filter 就是充分利用了右表的 Hash Table 構建階段去做一些額外的事情。

在右表生成 Hash Table 的時,同時生成一個基於 Hash Table 資料的一個過濾條件,然後下推到左表的資料掃描節點。通過這樣的方式,Doris 可以在執行時進行資料過濾。

假如左表是一張大表,右表是一張小表,那麼利用下推到左表的過濾條件就可以把絕大多數 Join 層要過濾的資料在資料讀取時就提前過濾(如果能夠下推到引擎層,還能夠利用 Doris 針對 Key 列過濾的延遲物化),從而大幅度地提升 Join 查詢的效能。

Runtime Filter 在查詢規劃時生成,在 HashJoinNode 中構建,在 ScanNode 中應用。比如 T1(行數 10w) 和 T2(行數 2k) 的 Join 操作:

|          >      HashJoinNode     <
|         |                         |
|         | 100000                  | 2000
|         |                         |
|   OlapScanNode              OlapScanNode
|         ^                         ^   
|         | 100000                  | 2000
|        T1                        T2
|

顯而易見對 T2 掃描資料要遠遠快於 T1,如果我們主動等待一段時間再掃描 T1,等 T2 將掃描的資料記錄交給 HashJoinNode 後,HashJoinNode 根據 T2 的資料計算出一個過濾條件,比如 T2 資料的最大和最小值,或者構建一個 Bloom Filter,接著將這個過濾條件發給等待掃描 T1 的 ScanNode,後者應用這個過濾條件,將過濾後的資料交給 HashJoinNode,從而減少 probe hash table 的次數和網路開銷,這個過濾條件就是 Runtime Filter,效果如下:

|          >      HashJoinNode     <
|         |                         |
|         | 6000                    | 2000
|         |                         |
|   OlapScanNode              OlapScanNode
|         ^                         ^   
|         | 100000                  | 2000
|        T1                        T2
|

如果能將過濾條件(Runtime Filter)下推到儲存引擎,則某些情況可以利用索引(比如 Join 列為 Key 列,可以利用延遲物化能力)來直接減少掃描的資料量,從而大大減少掃描耗時,效果如下:

|          >      HashJoinNode     <
|         |                         |
|         | 6000                    | 2000
|         |                         |
|   OlapScanNode              OlapScanNode
|         ^                         ^   
|         | 6000                    | 2000
|        T1                        T2
|

可見,和謂詞下推、分割區裁剪不同,Runtime Filter 是在執行時動態生成的過濾條件,即在查詢執行時解析 Join 條件確定過濾表示式,並將表示式下推給正在讀取左表的 ScanNode,從而減少掃描的資料量,進而減少 probe hash table 的次數,避免不必要的 IO 和網路傳輸。因為其執行時生效的特性,官方認為它是 Adaptive Query Execution 的一種應用。

根據上面的例子,可以推匯出場景滿足以下的條件時,使用 Runtime Filter 的效果會比較好:

  • 左表大右表小(當右表上還有額外的過濾條件會更理想),因為構建 Runtime Filter 是需要承擔計算成本的,包括一些記憶體的開銷,而開銷直接取決於右表的實際資料量
  • 左右表 Join 出來的結果很少,說明通過 Runtime Filter 可以過濾掉左表的絕大部分資料

Doris 支援 3 種 Runtime Filter:

  • 一種是 IN,很好理解,將一個 hashset 下推到資料掃描節點。
  • 第二種就是 BloomFilter,就是利用雜湊表的資料構造一個 BloomFilter,然後把這個 BloomFilter 下推到查詢資料的掃描節點。
  • 最後一種就是 MinMax,就是個 Range 範圍,通過右表資料確定 Range 範圍之後,下推給資料掃描節點。

工作原理和優劣總結如下:

Runtime Filter 型別工作原理適用場景優點缺點
IN子查詢的方式,實現上是將一個 Hashset 下推到 Scan 節點Broadcast Join開銷小,過濾效果明顯且快速右表超過一定資料量時會失效,目前 Doris 設定的閾值是 1024
Min/Max通過右表構建一個 Range 範圍,然後將它下推到 Scan 節點通用開銷小僅對數值型別有效果;對數值以外型別無法使用
BloomFilter通過右表構建一個 BloomFilter,然後將它下推到 Scan 節點通用通用性較好,適用於各種型別、效果也較好設定比較複雜且計算成本較高;當過濾率較低或者左表資料較少時,可能導致效能降低

一些使用的注意事項(比較細節了,後面考慮結合程式碼再深入理解):

開啟 Runtime Filter 後,左表的 ScanNode 會為每一個分配給自己的 Runtime Filter 等待一段時間再掃描資料,即如果 ScanNode 被分配了 3 個 Runtime Filter,那麼它最多會等待 3000ms。

因為 Runtime Filter 的構建和合並均需要時間,ScanNode 會嘗試將等待時間內到達的 Runtime Filter 下推到儲存引擎,如果超過等待時間後,ScanNode 會使用已經到達的 Runtime Filter 直接開始掃描資料。

如果 Runtime Filter 在 ScanNode 開始掃描之後到達,則 ScanNode 不會將該 Runtime Filter 下推到儲存引擎,而是對已經從儲存引擎掃描上來的資料,在 ScanNode 上基於該 Runtime Filter 使用表示式過濾,之前已經掃描的資料則不會應用該 Runtime Filter,這樣得到的中間資料規模會大於最優解,但可以避免嚴重的劣化。

如果叢集比較繁忙,並且叢集上有許多資源密集型或長耗時的查詢,可以考慮增加等待時間,以避免複雜查詢錯過優化機會。如果叢集負載較輕,並且叢集上有許多隻需要幾秒的小查詢,可以考慮減少等待時間,以避免每個查詢增加 1s 的延遲。

Join Reorder 優化

有了前面兩表 Join 的 Runtime Filter 鋪墊,再來看 Join Reorder 的優化,邏輯關係上就能夠理順了。

Doris 目前的 Join Reorder 演演算法是基於 RBO 的,邏輯描述如下:

  • 儘量讓大表跟小表做 Join,它生成的中間結果是儘可能小的
  • 把有條件的 Join 表往前放,也就是說盡量讓有條件的 Join 表進行過濾
  • Hash Join 的優先順序高於 Nest Loop Join,因為 Hash join 本身是比 Nest Loop Join 快很多的

可以發現前兩條,都是在朝著讓「右表」更小的方向去優化,而最後一條則是從演演算法的效能上來考慮。

Join 調優建議

  • Join 列最好是相同的簡單型別;同型別避免 Cast 操作,簡單型別則有不錯的 Join 計算效能
  • Join 列最好是 Key 列,原因是 Key 列能夠充分利用 Doris 延遲物化的特性,減少 IO 提升效能
  • 大表之間的 Join 最好能夠利用上 Colocate,相當於已經做好了預 Shuffle,實際查詢的時候可以直接 Join 計算不再有 Shuffle 操作,徹底避免了大表的 Shuffle 網路開銷
  • 利用 Runtime Filter,Join 過濾性高時效果顯著。根據 3 種 Runtime Filter 特點選擇最適合的
  • 涉及多表 Join,需要判斷 Join 的合理性。儘量保證“左大右小”的原則,HashJoin 優於 NLJ。必要時可以通過 SQL Rewrite,通過 Hint 來調整 Join 順序

REF

https://www.jb51.net/article/266004.htm

https://www.jb51.net/article/266000.htm

以上就是Apache Doris Join 優化原理詳解的詳細內容,更多關於Apache Doris Join 優化的資料請關注it145.com其它相關文章!


IT145.com E-mail:sddin#qq.com