首頁 > 軟體

MapReduce執行流程分析

2020-06-16 17:13:06

研究MapReduce已經有一段時間了。起初是從分析WordCount程式開始,後來開始閱讀Hadoop原始碼,自認為已經看清MapReduce的執行流程。現在把自己的理解貼出來,與大家分享,歡迎糾錯。

還是以最經典的WordCount程式作為基礎,來分析map階段、reduce階段和最複雜的shuffle階段。

    文字1:hello world                                      文字2:map reduce

              hello hadoop                                              java interface

              abc qaz                                                      java hdfs

     java jvm                                                    spark storm

這樣的2個小文字檔案(不足64M),肯定會產生2個map任務,reduce任務預設是1個。當然,map任務和reduce任務的個數都可以在程式中或者組態檔中人為設定。為了說明partition的過程,我們把reduce任務的個數設為2。

1、map階段

                    map1                                                            map2

輸入:<xxxx, hello world>                                          <xxxx, map reduce>

        <xxxx, hello hadoop>                                        <xxxx, java interface>

    <xxxx, abc qaz>                                              <xxxx, java hdfs>

    <xxxx, java jvm>                                              <xxxx, spark storm>

切分:<hello, 1>                                                          <map, 1>

        <word, 1>                                                          <reduce, 1>

   <hello, 1>                                                          <java, 1>                 

   <hadoop, 1>                                                      <interface, 1>

   <abc, 1>                                                            <java, 1>

   <qaz, 1>                                                            <hdfs, 1>

   <java, 1>                                                            <spark, 1>

   <jvm, 1>                                                            <storm, 1>

 

2、shuffle階段

    切分完畢後,每一組<key, value>都會不斷地被collect到一個記憶體緩衝區中,對應程式碼中的資料結構MapOutputBuffer。

partition過程:每一組<key, value>在被收集的時候,就已經確定了分割區(partition),即在這個時候就已經確定了要交給哪個reduce任務處理。分割區會給<key, value>加上一個索引標識。假設分割區後(分割區演算法可以設定,預設是hash值模運算),資料如下:reduce1的標識是0,reduce2的標識是1                                                                                           

                                                  <hello, 1>                0                                          <map, 1>                        0

                                                  <word, 1>                1                                          <reduce, 1>                      1

                                             <hello, 1>                0                                          <java, 1>                          0 

                                             <hadoop, 1>            1                                          <interface, 1>                  1

                                             <abc, 1>                  0                                          <java, 1>                        0

                                             <qaz, 1>                  1                                          <hdfs, 1>                        1

                                             <java, 1>                0                                          <spark, 1>                        0

                                             <jvm, 1>                  1                                          <storm, 1>                      1

 

spill過程:緩衝區預設是100M,每當裡面的資料達到80M(比例80%,這個比例也可以人為設定),就會另起一個執行緒SpillThread往磁碟溢寫,每次溢寫都會產生一個資料檔案和對應的索引檔案。

sort過程:在溢寫的過程中一直在排序,比較演算法可以客製化,預設排序演算法是快速排序(可以人為設定),排序的過程就是一些位置的索引在不斷的變化。

              排序之後的資料:         

                                                  <abc, 1>                0                                          <hdfs, 1>                        1

                                                  <hello, 1>                0                                          <interface, 1>                  1

                                                  <hello, 1>                0                                          <java, 1>                          0 

                                                  <hadoop, 1>            1                                        <java, 1>                        0

                                             <java, 1>                0                                          <map, 1>                        0

                                             <jvm, 1>                  1                                        <reduce, 1>                      1

                                             <qaz, 1>                  1                                          <spark, 1>                        0

                                             <word, 1>                1                                          <storm, 1>                      1  

 

combine過程:這個過程預設是沒有的,需要明確指定combiner。combiner其實就是一個reducer,可以讓資料交給reduce任務之前,進行一些計算、合併。它的意義在於,使資料進一步減少,減輕了                      reduce任務通過網路獲取資料的壓力和reduce處理資料的壓力。combiner也可以自己客製化,每個溢寫檔案都會combine。

                      combiner會通過一個比較器對key進行比較,相同的key(比較結果為0,比較演算法可以客製化),會被放到一個集合的疊代器中,然後疊代進行一次reduce運算,產生一個輸出。

                      combine之後的資料:

                                                  <abc, 1>                0                                        <hdfs, 1>                        1

                                                  <hello, 1+1>            0                                        <interface, 1>                  1 

                                                  <hadoop, 1>            1                                        <java, 1+1>                    0

                                             <java, 1>                0                                          <map, 1>                        0

                                             <jvm, 1>                  1                                        <reduce, 1>                      1

                                             <qaz, 1>                  1                                          <spark, 1>                        0

                                             <word, 1>                1                                          <storm, 1>                      1

merge過程:一個map所有的溢寫檔案都會進行合併,產生一個最終的溢寫檔案和一個索引檔案。合併是針對於不同的溢寫檔案中相同分割區的資料。在這個合併的過程中,也會進行combine操作(如果設定了的話),此處的combine過程同上,不再細說。

copy資料過程:每個reduce任務會遠端copy屬於自己的多個map輸出資料檔案,通過http傳輸,在本地會合併。另外,這個過程也會進行combine,此次不過多說明。

                              結果如下:                       

                                                                    reduce0                        reduce1

                                                                  <abc, 1>                    <hadoop, 1>

                       <hello, 2>                    <jvm, 1>

                                                                  <java, 1>                    <qaz, 1>

                      <java, 2>                    <word, 1>


                                                                  <map, 1>                    <hdfs, 1>

                                                                  <spark, 1>                  <interface, 1>

                                                                                                      <reduce, 1>

                                                                                                      <storm, 1>

sort過程:對上述結果進行排序,結果如下:

                                                            reduce0                        reduce1

                                                                  <abc, 1>                    <hadoop, 1>

                        <hello, 2>                    <hdfs, 1>

                                                                  <java, 1>                    <interface, 1>

                        <java, 2>                    <jvm, 1>

                                                                  <map, 1>                    <qaz, 1>

                                                                  <spark, 1>                  <reduce, 1>

                                                                                                    <storm, 1>

                                                                                                    <word, 1>

 

3、reduce階段

    通過一個GroupComparator對key進行比較,相同的key(比較結果為0,比較演算法可以客製化),會被放到一個集合的疊代器中,然後疊代進行一次reduce運算,產生一個輸出。類似combine過程。


                            最終的輸出:                    reduce0                        reduce1

                                                                  <abc, 1>                    <hadoop, 1>

                        <hello, 2>                    <hdfs, 1>

                                                                  <java, 3>                    <interface, 1>

                         <map, 1>                    <jvm, 1>

                                                                  <spark, 1>                  <qaz, 1>

                                                                                                    <reduce, 1>

                                                                                                    <storm, 1>

                                                                                                    <word, 1>

從上述過程的分析可以看出,合併和排序是核心!!!

PS:其實每個階段沒有這麼分明,只不過是為了分析和理解的需要,才進行這樣詳細的劃分,而且劃分的還不一定正確,請大家及時糾錯。另外,上述流程中涉及到好多的細節,沒有一一說明。

本文永久更新連結地址http://www.linuxidc.com/Linux/2017-06/144455.htm


IT145.com E-mail:sddin#qq.com