Hadoop入门(九)Mapreduce高级shuffle之Combiner
一、Combiner的出現
(1)為什么需要進行Map規約操
作
在上述過程中,我們看到至少兩個性能瓶頸:
(1)如果我們有10億個數據,Mapper會生成10億個鍵值對在網絡間進行傳輸,但如果我們只是對數據求最大值,那么很明顯的Mapper只需要輸出它所知道的最大值即可。這樣做不僅可以減輕網絡壓力,同樣也可以大幅度提高程序效率。
總結:網絡帶寬嚴重被占降低程序效率;
(2)假設使用美國專利數據集中的國家一項來闡述數據傾斜這個定義,這樣的數據遠遠不是一致性的或者說平衡分布的,由于大多數專利的國家都屬于美國,這樣不僅Mapper中的鍵值對、中間階段(shuffle)的鍵值對等,大多數的鍵值對最終會聚集于一個單一的Reducer之上,壓倒這個Reducer,從而大大降低程序的性能。
總結:單一節點承載過重降低程序性能;
(2)一種方案能夠解決這兩個問題呢?
在MapReduce編程模型中,在Mapper和Reducer之間有一個非常重要的組件,它解決了上述的性能瓶頸問題,它就是Combiner。
①與mapper和reducer不同的是,combiner沒有默認的實現,需要顯式的設置在conf中才有作用。
②并不是所有的job都適用combiner,只有操作滿足結合律的才可設置combiner。
combine操作類似于:opt(opt(1, 2, 3), opt(4, 5, 6))。如果opt為求和、求最大值的話,可以使用,但是如果是求中值的話,不適用。
?
?
二、Combiner的作用
當Map程序開始產生結果的時候,并不是直接寫到文件的,而是利用緩存做一些排序方面的預處理操作。
每個Map任務都有一個循環內存緩沖區(默認100MB),當緩存的內容達到80%時,后臺線程開始將內容寫到文件,此時Map任務可以持續輸出結果,但如果緩沖區滿了,Map任務則需要等待。
寫文件使用round-robin方式。在寫入文件之前,先將數據按照Reduce進行分區。對于每一個分區,都會在內存中根據key進行排序,如果配置了Combiner,則排序后執行Combiner(Combine之后可以減少寫入文件和傳輸的數據)。
每次結果達到緩沖區的閥值時,都會創建一個文件,在Map結束時,可能會產生大量的文件。在Map完成前,會將這些文件進行合并和排序。如果文件的數量超過3個,則合并后會再次運行Combiner(1、2個文件就沒有必要了)。
(1)MapReduce的一種優化手段
每一個map都可能會產生大量的本地輸出,Combiner的作用就是對map端的輸出先做一次合并,以減少在map和reduce節點之間的數據傳輸量,以提高網絡IO性能
(2)Combiner的過程
1)Combiner實現本地key的聚合,對map輸出的key排序value進行迭代
? ? ? ?如下所示:
? ? ?map: (K1, V1) → list(K2, V2)? combine: (K2, list(V2)) → list(K2, V2)? reduce: (K2, list(V2)) → list(K3, V3)
2)Combiner還有本地reduce功能(其本質上就是一個reduce)
? ? ? ? ?例如wordcount的例子和找出value的最大值的程序
? ? ? ? ? combiner和reduce完全一致,如下所示:
? ? ? ? ? map: (K1, V1) → list(K2, V2)? ? ? ?combine: (K2, list(V2)) → list(K3, V3)? ? ? ?reduce: (K3, list(V3)) → list(K4, V4)
使用combiner之后,先完成的map會在本地聚合,提升速度。對于hadoop自帶的wordcount的例子,value就是一個疊加的數字,所以map一結束就可以進行reduce的value疊加,而不必要等到所有的map結束再去進行reduce的value疊加。
(3)融合Combiner的MapReduce
1)使用MyReducer作為Combiner
// 設置Map規約Combiner
? ? job.setCombinerClass(MyReducer.class);
執行后看到map的輸出和combine的輸入統計是一致的,而combine的輸出與reduce的輸入統計是一樣的。
由此可以看出規約操作成功,而且執行在map的最后,reduce之前。
2)自己定義Combiner
public static class MyCombiner extends Reducer<Text, LongWritable, Text, LongWritable> {protected void reduce(Text key, java.lang.Iterable<LongWritable> values,org.apache.hadoop.mapreduce.Reducer<Text, LongWritable, Text, LongWritable>.Context context)throws java.io.IOException, InterruptedException {// 顯示次數表示規約函數被調用了多少次,表示k2有多少個分組System.out.println("Combiner輸入分組<" + key.toString() + ",N(N>=1)>");long count = 0L;for (LongWritable value : values) {count += value.get();// 顯示次數表示輸入的k2,v2的鍵值對數量System.out.println("Combiner輸入鍵值對<" + key.toString() + ",”+ value.get() + ">");}context.write(key, new LongWritable(count));// 顯示次數表示輸出的k2,v2的鍵值對數量System.out.println("Combiner輸出鍵值對<" + key.toString() + "," + count + ">");};}3)添加設置Combiner的代碼
// 設置Map規約Combiner
job.setCombinerClass(MyCombiner.class);
?
小結: 在實際的Hadoop集群操作中,我們是由多臺主機一起進行MapReduce的, 如果加入規約操作,每一臺主機會在reduce之前進行一次對本機數據的規約, 然后在通過集群進行reduce操作,這樣就會大大節省reduce的時間, 從而加快MapReduce的處理速度
?
?
總結
以上是生活随笔為你收集整理的Hadoop入门(九)Mapreduce高级shuffle之Combiner的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 路由器的分类有哪些路由器有哪些类型
- 下一篇: Hadoop入门(八)Mapreduce