2021-05-12 14:32:11
MapReduce執行流程分析
研究MapReduce已經有一段時間了。起初是從分析WordCount程式開始,後來開始閱讀Hadoop原始碼,自認為已經看清MapReduce的執行流程。現在把自己的理解貼出來,與大家分享,歡迎糾錯。
還是以最經典的WordCount程式作為基礎,來分析map階段、reduce階段和最複雜的shuffle階段。
文字1:hello world 文字2:map reduce
hello hadoop java interface
abc qaz java hdfs
java jvm spark storm
這樣的2個小文字檔案(不足64M),肯定會產生2個map任務,reduce任務預設是1個。當然,map任務和reduce任務的個數都可以在程式中或者組態檔中人為設定。為了說明partition的過程,我們把reduce任務的個數設為2。
1、map階段
map1 map2
輸入:<xxxx, hello world> <xxxx, map reduce>
<xxxx, hello hadoop> <xxxx, java interface>
<xxxx, abc qaz> <xxxx, java hdfs>
<xxxx, java jvm> <xxxx, spark storm>
切分:<hello, 1> <map, 1>
<word, 1> <reduce, 1>
<hello, 1> <java, 1>
<hadoop, 1> <interface, 1>
<abc, 1> <java, 1>
<qaz, 1> <hdfs, 1>
<java, 1> <spark, 1>
<jvm, 1> <storm, 1>
2、shuffle階段
切分完畢後,每一組<key, value>都會不斷地被collect到一個記憶體緩衝區中,對應程式碼中的資料結構MapOutputBuffer。
partition過程:每一組<key, value>在被收集的時候,就已經確定了分割區(partition),即在這個時候就已經確定了要交給哪個reduce任務處理。分割區會給<key, value>加上一個索引標識。假設分割區後(分割區演算法可以設定,預設是hash值模運算),資料如下:reduce1的標識是0,reduce2的標識是1
<hello, 1> 0 <map, 1> 0
<word, 1> 1 <reduce, 1> 1
<hello, 1> 0 <java, 1> 0
<hadoop, 1> 1 <interface, 1> 1
<abc, 1> 0 <java, 1> 0
<qaz, 1> 1 <hdfs, 1> 1
<java, 1> 0 <spark, 1> 0
<jvm, 1> 1 <storm, 1> 1
spill過程:緩衝區預設是100M,每當裡面的資料達到80M(比例80%,這個比例也可以人為設定),就會另起一個執行緒SpillThread往磁碟溢寫,每次溢寫都會產生一個資料檔案和對應的索引檔案。
sort過程:在溢寫的過程中一直在排序,比較演算法可以客製化,預設排序演算法是快速排序(可以人為設定),排序的過程就是一些位置的索引在不斷的變化。
排序之後的資料:
<abc, 1> 0 <hdfs, 1> 1
<hello, 1> 0 <interface, 1> 1
<hello, 1> 0 <java, 1> 0
<hadoop, 1> 1 <java, 1> 0
<java, 1> 0 <map, 1> 0
<jvm, 1> 1 <reduce, 1> 1
<qaz, 1> 1 <spark, 1> 0
<word, 1> 1 <storm, 1> 1
combine過程:這個過程預設是沒有的,需要明確指定combiner。combiner其實就是一個reducer,可以讓資料交給reduce任務之前,進行一些計算、合併。它的意義在於,使資料進一步減少,減輕了 reduce任務通過網路獲取資料的壓力和reduce處理資料的壓力。combiner也可以自己客製化,每個溢寫檔案都會combine。
combiner會通過一個比較器對key進行比較,相同的key(比較結果為0,比較演算法可以客製化),會被放到一個集合的疊代器中,然後疊代進行一次reduce運算,產生一個輸出。
combine之後的資料:
<abc, 1> 0 <hdfs, 1> 1
<hello, 1+1> 0 <interface, 1> 1
<hadoop, 1> 1 <java, 1+1> 0
<java, 1> 0 <map, 1> 0
<jvm, 1> 1 <reduce, 1> 1
<qaz, 1> 1 <spark, 1> 0
<word, 1> 1 <storm, 1> 1
merge過程:一個map所有的溢寫檔案都會進行合併,產生一個最終的溢寫檔案和一個索引檔案。合併是針對於不同的溢寫檔案中相同分割區的資料。在這個合併的過程中,也會進行combine操作(如果設定了的話),此處的combine過程同上,不再細說。
copy資料過程:每個reduce任務會遠端copy屬於自己的多個map輸出資料檔案,通過http傳輸,在本地會合併。另外,這個過程也會進行combine,此次不過多說明。
結果如下:
reduce0 reduce1
<abc, 1> <hadoop, 1>
<hello, 2> <jvm, 1>
<java, 1> <qaz, 1>
<java, 2> <word, 1>
<map, 1> <hdfs, 1>
<spark, 1> <interface, 1>
<reduce, 1>
<storm, 1>
sort過程:對上述結果進行排序,結果如下:
reduce0 reduce1
<abc, 1> <hadoop, 1>
<hello, 2> <hdfs, 1>
<java, 1> <interface, 1>
<java, 2> <jvm, 1>
<map, 1> <qaz, 1>
<spark, 1> <reduce, 1>
<storm, 1>
<word, 1>
3、reduce階段
通過一個GroupComparator對key進行比較,相同的key(比較結果為0,比較演算法可以客製化),會被放到一個集合的疊代器中,然後疊代進行一次reduce運算,產生一個輸出。類似combine過程。
最終的輸出: reduce0 reduce1
<abc, 1> <hadoop, 1>
<hello, 2> <hdfs, 1>
<java, 3> <interface, 1>
<map, 1> <jvm, 1>
<spark, 1> <qaz, 1>
<reduce, 1>
<storm, 1>
<word, 1>
從上述過程的分析可以看出,合併和排序是核心!!!
PS:其實每個階段沒有這麼分明,只不過是為了分析和理解的需要,才進行這樣詳細的劃分,而且劃分的還不一定正確,請大家及時糾錯。另外,上述流程中涉及到好多的細節,沒有一一說明。
本文永久更新連結地址:http://www.linuxidc.com/Linux/2017-06/144455.htm
相關文章