mapreduce的规约(Combiner)
聽了超哥的一席課后逐漸明白了Combiner,記錄一下自己的理解!(thanks 超哥)
首先貼上兩段代碼:
code1:
package combine;import java.io.IOException; import java.net.URI;import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Counter; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; /*** @ClassName: WordCount2 * @Description: TODO(這里用一句話描述這個類的作用) * @author zhangweixiang* @date 2014年3月6日 下午1:34:50*/public class WordCount2 {static final String INPUT_PATH="hdfs://192.168.0.9:9000/hello.txt";static final String OUT_PATH="hdfs://192.168.0.9:9000/word";public static void main(String[] args) throws Exception {Configuration conf=new Configuration();Job job=new Job(conf, WordCount2.class.getSimpleName());//1.1指定讀取的文件位置FileInputFormat.addInputPaths(job, INPUT_PATH);//指定如何對輸入文件進行格式化,把輸入文件每一行解析成鍵值對job.setInputFormatClass(TextInputFormat.class);//1.2指定自定義的map類job.setMapperClass(MyMapper.class);//map輸出的<k,v>類型。如果<k3,v3>的類型與<k2,v2>的類型一致,則可以省略job.setMapOutputKeyClass(Text.class);job.setMapOutputValueClass(LongWritable.class);//1.3分區(qū) // job.setPartitionerClass(HashPartitioner.class);//有一個reudce任務運行 // job.setNumReduceTasks(1);//1.4 排序、分組//1.5規(guī)約 // job.setCombinerClass(MyReduce.class);//2.2指定自定義的reduce類job.setReducerClass(MyReduce.class);//指定reduce的輸出類型job.setOutputKeyClass(Text.class);job.setOutputValueClass(LongWritable.class);//刪除已存在的文件FileSystem fileSystem = FileSystem.get(new URI(OUT_PATH), new Configuration());Path path = new Path(OUT_PATH);if(fileSystem.exists(path)){fileSystem.delete(path, true);}//2.3指定寫出到哪里FileOutputFormat.setOutputPath(job,new Path(OUT_PATH) );//指定輸出文件的格式化類job.setOutputFormatClass(TextOutputFormat.class);//把Job提交給JobTracker執(zhí)行job.waitForCompletion(true);}/*** @ClassName: MyMapper * @Description: map任務處理* @param KEYIN 即k1 表示行的偏移量* @param VALUEIN 即v1 表示行的文本內(nèi)容* @param KEYOUT 即k2 表示行中出現(xiàn)的單詞* @param VALUEOUT 即v2 表示行中出現(xiàn)的單詞數(shù),固定值為1* @author zhangweixiang* @date 2014年3月4日 下午4:16:00*/static class MyMapper extends Mapper<LongWritable, Text, Text, LongWritable>{@Overrideprotected void map(LongWritable key, Text value,Context context)throws IOException, InterruptedException {String string = value.toString();//自定義計數(shù)器(查詢hello的出現(xiàn),以及出現(xiàn)次數(shù))Counter counter = context.getCounter("查找hello", "hello");if(string.contains("hello")){counter.increment(1l);//出現(xiàn)一次+1}//分割字符串String[] split = string.split(" ");for(String word:split){context.write(new Text(word), new LongWritable(1));//map任務輸出}}}/*** @ClassName: MyReduce * @Description: reduce任務處理* @param KEYIN 即k2 表示行中出現(xiàn)的單詞* @param VALUEIN 即v2 表示行中出現(xiàn)的單詞個數(shù)* @param KEYOUT 即k3 表示文本中出現(xiàn)的不同單詞* @param VALUEOUT 即v3 表示文本中出現(xiàn)不同單詞的總次數(shù)* @author zhangweixiang* @date 2014年3月4日 下午4:23:20*/static class MyReduce extends Reducer<Text, LongWritable, Text, LongWritable>{protected void reduce(Text key, Iterable<LongWritable> values,Context context)throws IOException, InterruptedException {long sum=0;for (LongWritable longWritable : values) {sum+=longWritable.get();}context.write(key, new LongWritable(sum));//reduce輸出}} }code2
package combine;import java.io.IOException; import java.net.URI;import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Counter; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; /*** @ClassName: WordCount2 * @Description: TODO(這里用一句話描述這個類的作用) * @author zhangweixiang* @date 2014年3月6日 下午1:34:50*/public class WordCount2 {static final String INPUT_PATH="hdfs://192.168.0.9:9000/hello.txt";static final String OUT_PATH="hdfs://192.168.0.9:9000/word";public static void main(String[] args) throws Exception {Configuration conf=new Configuration();Job job=new Job(conf, WordCount2.class.getSimpleName());//1.1指定讀取的文件位置FileInputFormat.addInputPaths(job, INPUT_PATH);//指定如何對輸入文件進行格式化,把輸入文件每一行解析成鍵值對job.setInputFormatClass(TextInputFormat.class);//1.2指定自定義的map類job.setMapperClass(MyMapper.class);//map輸出的<k,v>類型。如果<k3,v3>的類型與<k2,v2>的類型一致,則可以省略job.setMapOutputKeyClass(Text.class);job.setMapOutputValueClass(LongWritable.class);//1.3分區(qū) // job.setPartitionerClass(HashPartitioner.class);//有一個reudce任務運行 // job.setNumReduceTasks(1);//1.4 排序、分組//1.5規(guī)約job.setCombinerClass(MyReduce.class);//2.2指定自定義的reduce類job.setReducerClass(MyReduce.class);//指定reduce的輸出類型job.setOutputKeyClass(Text.class);job.setOutputValueClass(LongWritable.class);//刪除已存在的文件FileSystem fileSystem = FileSystem.get(new URI(OUT_PATH), new Configuration());Path path = new Path(OUT_PATH);if(fileSystem.exists(path)){fileSystem.delete(path, true);}//2.3指定寫出到哪里FileOutputFormat.setOutputPath(job,new Path(OUT_PATH) );//指定輸出文件的格式化類job.setOutputFormatClass(TextOutputFormat.class);//把Job提交給JobTracker執(zhí)行job.waitForCompletion(true);}/*** @ClassName: MyMapper * @Description: map任務處理* @param KEYIN 即k1 表示行的偏移量* @param VALUEIN 即v1 表示行的文本內(nèi)容* @param KEYOUT 即k2 表示行中出現(xiàn)的單詞* @param VALUEOUT 即v2 表示行中出現(xiàn)的單詞數(shù),固定值為1* @author zhangweixiang* @date 2014年3月4日 下午4:16:00*/static class MyMapper extends Mapper<LongWritable, Text, Text, LongWritable>{@Overrideprotected void map(LongWritable key, Text value,Context context)throws IOException, InterruptedException {String string = value.toString();//自定義計數(shù)器(查詢hello的出現(xiàn),以及出現(xiàn)次數(shù))Counter counter = context.getCounter("查找hello", "hello");if(string.contains("hello")){counter.increment(1l);//出現(xiàn)一次+1}//分割字符串String[] split = string.split(" ");for(String word:split){context.write(new Text(word), new LongWritable(1));//map任務輸出}}}/*** @ClassName: MyReduce * @Description: reduce任務處理* @param KEYIN 即k2 表示行中出現(xiàn)的單詞* @param VALUEIN 即v2 表示行中出現(xiàn)的單詞個數(shù)* @param KEYOUT 即k3 表示文本中出現(xiàn)的不同單詞* @param VALUEOUT 即v3 表示文本中出現(xiàn)不同單詞的總次數(shù)* @author zhangweixiang* @date 2014年3月4日 下午4:23:20*/static class MyReduce extends Reducer<Text, LongWritable, Text, LongWritable>{protected void reduce(Text key, Iterable<LongWritable> values,Context context)throws IOException, InterruptedException {long sum=0;for (LongWritable longWritable : values) {sum+=longWritable.get();}context.write(key, new LongWritable(sum));//reduce輸出}} }
code1和code2的唯一區(qū)別就是
//1.5規(guī)約
job.setCombinerClass(MyReduce.class);
code1中加了注釋,code2啟用了規(guī)約!
接下來看看控制臺的輸出(我win8下的eclipse)
code1:
/06 13:52:17 INFO mapred.JobClient: Counters: 20
14/03/06 13:52:17 INFO mapred.JobClient: ? File Output Format Counters?
14/03/06 13:52:17 INFO mapred.JobClient: ? ? Bytes Written=28
14/03/06 13:52:17 INFO mapred.JobClient: ? FileSystemCounters
14/03/06 13:52:17 INFO mapred.JobClient: ? ? FILE_BYTES_READ=424
14/03/06 13:52:17 INFO mapred.JobClient: ? ? HDFS_BYTES_READ=84
14/03/06 13:52:17 INFO mapred.JobClient: ? ? FILE_BYTES_WRITTEN=128676
14/03/06 13:52:17 INFO mapred.JobClient: ? ? HDFS_BYTES_WRITTEN=28
14/03/06 13:52:17 INFO mapred.JobClient: ? File Input Format Counters?
14/03/06 13:52:17 INFO mapred.JobClient: ? ? Bytes Read=42
14/03/06 13:52:17 INFO mapred.JobClient: ? 查找hello
14/03/06 13:52:17 INFO mapred.JobClient: ? ? hello=2
14/03/06 13:52:17 INFO mapred.JobClient: ? Map-Reduce Framework
14/03/06 13:52:17 INFO mapred.JobClient: ? ? Map output materialized bytes=126
14/03/06 13:52:17 INFO mapred.JobClient: ? ? Map input records=4
14/03/06 13:52:17 INFO mapred.JobClient: ? ? Reduce shuffle bytes=0
14/03/06 13:52:17 INFO mapred.JobClient: ? ? Spilled Records=16
14/03/06 13:52:17 INFO mapred.JobClient: ? ? Map output bytes=104
14/03/06 13:52:17 INFO mapred.JobClient: ? ? Total committed heap usage (bytes)=266469376
14/03/06 13:52:17 INFO mapred.JobClient: ? ? SPLIT_RAW_BYTES=98
14/03/06 13:52:17 INFO mapred.JobClient: ? ? Combine input records=0
14/03/06 13:52:17 INFO mapred.JobClient: ? ? Reduce input records=8
14/03/06 13:52:17 INFO mapred.JobClient: ? ? Reduce input groups=4
14/03/06 13:52:17 INFO mapred.JobClient: ? ? Combine output records=0
14/03/06 13:52:17 INFO mapred.JobClient: ? ? Reduce output records=4
14/03/06 13:52:17 INFO mapred.JobClient: ? ? Map output records=8
code2:
14/03/06 13:55:11 INFO mapred.JobClient: Counters: 20
14/03/06 13:55:11 INFO mapred.JobClient: ? File Output Format Counters?
14/03/06 13:55:11 INFO mapred.JobClient: ? ? Bytes Written=28
14/03/06 13:55:11 INFO mapred.JobClient: ? FileSystemCounters
14/03/06 13:55:11 INFO mapred.JobClient: ? ? FILE_BYTES_READ=364
14/03/06 13:55:11 INFO mapred.JobClient: ? ? HDFS_BYTES_READ=84
14/03/06 13:55:11 INFO mapred.JobClient: ? ? FILE_BYTES_WRITTEN=129072
14/03/06 13:55:11 INFO mapred.JobClient: ? ? HDFS_BYTES_WRITTEN=28
14/03/06 13:55:11 INFO mapred.JobClient: ? File Input Format Counters?
14/03/06 13:55:11 INFO mapred.JobClient: ? ? Bytes Read=42
14/03/06 13:55:11 INFO mapred.JobClient: ? 查找hello
14/03/06 13:55:11 INFO mapred.JobClient: ? ? hello=2
14/03/06 13:55:11 INFO mapred.JobClient: ? Map-Reduce Framework
14/03/06 13:55:11 INFO mapred.JobClient: ? ? Map output materialized bytes=66
14/03/06 13:55:11 INFO mapred.JobClient: ? ? Map input records=4
14/03/06 13:55:11 INFO mapred.JobClient: ? ? Reduce shuffle bytes=0
14/03/06 13:55:11 INFO mapred.JobClient: ? ? Spilled Records=8
14/03/06 13:55:11 INFO mapred.JobClient: ? ? Map output bytes=104
14/03/06 13:55:11 INFO mapred.JobClient: ? ? Total committed heap usage (bytes)=266469376
14/03/06 13:55:11 INFO mapred.JobClient: ? ? SPLIT_RAW_BYTES=98
14/03/06 13:55:11 INFO mapred.JobClient: ? ? Combine input records=8
14/03/06 13:55:11 INFO mapred.JobClient: ? ? Reduce input records=4
14/03/06 13:55:11 INFO mapred.JobClient: ? ? Reduce input groups=4
14/03/06 13:55:11 INFO mapred.JobClient: ? ? Combine output records=4
14/03/06 13:55:11 INFO mapred.JobClient: ? ? Reduce output records=4
14/03/06 13:55:11 INFO mapred.JobClient: ? ? Map output records=8
通過code1和code2的輸出比較(紅色部分),相信大家已經(jīng)看出了區(qū)別!
code1中沒有使用規(guī)約,所以:
Combine input records=0
Reduce input records=8
?Reduce input groups=4
Combine output records=0
code2中使用了規(guī)約,所以:
?Combine input records=8
?Reduce input records=4
Reduce input groups=4
Combine output records=4
很明顯使用了規(guī)約后 Reduce input records=4減少map到reduce過程中數(shù)據(jù)的傳輸!
為什么要使用規(guī)約呢?(總結超哥的!)
/**
?* 問:為什么使用Combiner?
?* 答:Combiner發(fā)生在Map端,對數(shù)據(jù)進行規(guī)約處理,數(shù)據(jù)量變小了,傳送到reduce端的數(shù)據(jù)量變小了,傳輸時間變短,作業(yè)的整體時間變短。
?*?
?* 問:為什么Combiner不作為MR運行的標配,而是可選步驟哪?
?* 答:因為不是所有的算法都適合使用Combiner處理,例如求平均數(shù)。
?*
?* 問:Combiner本身已經(jīng)執(zhí)行了reduce操作,為什么在Reducer階段還要執(zhí)行reduce操作哪?
?* 答:combiner操作發(fā)生在map端的,處理一個任務所接收的文件中的數(shù)據(jù),不能跨map任務執(zhí)行;只有reduce可以接收多個map任務處理的數(shù)據(jù)。
?*
?*/
總結
以上是生活随笔為你收集整理的mapreduce的规约(Combiner)的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 厂商推出 12 代酷睿被动散热迷你主机,
- 下一篇: 经典游戏中的人设变迁,多年后你还认得那些