MapReduce基础开发之二数据去重和排序
生活随笔
收集整理的這篇文章主要介紹了
MapReduce基础开发之二数据去重和排序
小編覺得挺不錯的,現在分享給大家,幫大家做個參考.
因Hadoop集群平臺網絡限制,只能在eclipse里先寫好代碼再提交jar到集群平臺namenode上執行,不能實時調試,所以沒有配置eclipse的hadoop開發環境,只是引入了hadoop的lib包。
eclipse的hadoop開發環境配置可參考:http://www.cnblogs.com/xia520pi/archive/2012/05/20/2510723.html
MapReduce的基礎開發也是通過該博客系列學習到,很感謝!
1、數據去重
? ?在MapReduce流程中,map的輸出<key,value>經過shuffle過程聚集成<key,value-list>后會交給reduce。?map將輸入的value復制到輸出數據的key上,并直接輸出;
? ?經過shuffle,相同key形成<key,value-list>,作為reduce的輸入;reduce將輸入中的key復制到輸出數據的key上,并直接輸出。利用MapReduce對key的匯聚機制將重復的數據去掉。
? ?1)在namenode系統tmp目錄下新建輸入文件file1.txt和file2.txt,上傳到hadoop
? ? ? ? ?命令:hadoop fs -put /tmp/file1.txt /tmp/fjs/in
? ? ? ? ?注意:file1.txt和file2.txt中要有重復的行,可以看出去重效果
? ?2)編碼DataDedup并導出mr.jar和上傳到namenode系統tmp目錄下
? ? ? ? ?注意:通過eclipse工程導出Runnable JAR file包含hadoop lib包,如通過Fatjar不要勾選one-JAR。
? ? ? ? 代碼如下:
package com.data;import java.io.IOException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.util.GenericOptionsParser;public class DataDedup {public static class Map extends Mapper<Object,Text,Text,Text>{private static Text line=new Text();//每行數據public void map(Object key,Text value,Context context) throws IOException,InterruptedException{line=value;context.write(line, new Text(""));}}public static class Reduce extends Reducer<Text,Text,Text,Text>{public void reduce(Text key,Iterable<Text> values,Context context) throws IOException,InterruptedException{context.write(key, new Text(""));}}public static void main(String[] args) throws Exception {Configuration conf = new Configuration();String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();if (otherArgs.length != 2) {System.err.println("Usage: datadedup <in> <out>");System.exit(2);}Job job = new Job(conf, "data dedup");job.setJarByClass(DataDedup.class);job.setNumReduceTasks(1);//設置reduce輸入文件一個,方便查看結果,如設置為0就是不執行reduce,map就輸出結果job.setMapperClass(Map.class);job.setCombinerClass(Reduce.class);job.setReducerClass(Reduce.class);job.setOutputKeyClass(Text.class);job.setOutputValueClass(Text.class);FileInputFormat.addInputPath(job, new Path(otherArgs[0]));FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));System.exit(job.waitForCompletion(true) ? 0 : 1);} }? ?3)執行jar并查看結果
? ? ? ? ?執行:yarn jar /tmp/mr.jar /tmp/fjs/in /tmp/fjs/out?
? ? ? ? ?注意:out目錄不能先創建,hadoop會自己創建
? ? ? ? ?查看:hadoop fs -text /tmp/fjs/out/part-r-00000.bz2
? ? ? ? 注意:這里執行的Hadoop集群平臺reduce輸出文件是bz2壓縮
? ? ? ? ? ? ? ? ? ?為方便觀察結果,設置reduce數量為1,job.setNumReduceTasks(1)
2、數據排序
? ?MapReduce過程中按照key值排序的,如果key為封裝int的IntWritable類型則按照數字大小對key排序,如果key為封裝為String的Text類型,則按照字典順序對字符串排序。
? ?在map中將讀入的數據轉化成IntWritable型,然后作為key值輸出(value任意)。reduce拿到<key,value-list>后,將輸入的key作為value輸出,并根據value-list中元素的個數決定輸出的次數。輸出的key(即代碼中的linenum)是一個全局變量,它統計當前key的位次。MapReduce過程中map和reduce即完成任務,不需要配置Combiner任務。
? ?1)在namenode系統tmp目錄下新建輸入文件file3.txt和file4.txt,上傳到hadoop
? ? ? ? 命令:hadoop fs -put /tmp/file3.txt /tmp/fjs/in1
? ? ? ? 注意:file3.txt和file4.txt中每行輸入整數,最好有重復;
? ?2)編碼DataSort并導出mr.jar和上傳到namenode系統tmp目錄下
? ? ? ? 注意:和DataDedup放在同一工程下,導出時,要選擇DataSort作為主類
? ? ? ? 代碼如下:
package com.data;import java.io.IOException; import java.util.Iterator;import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.util.GenericOptionsParser;public class DataSort {//map將輸入中的value化成IntWritable類型,作為輸出的keypublic static class Map extends Mapper<Object,Text,IntWritable,IntWritable>{private static IntWritable data=new IntWritable();public void map(Object key,Text value,Context context) throws IOException,InterruptedException{String line=value.toString();data.set(Integer.parseInt(line));context.write(data, new IntWritable(1));}}//reduce將輸入中的key復制到輸出數據的key上,//然后根據輸入的value-list中元素的個數決定key的輸出次數//用全局linenum來代表key的位次public static class Reduce extends Reducer<IntWritable,IntWritable,IntWritable,IntWritable>{private static IntWritable linenum = new IntWritable(1);public void reduce(IntWritable key,Iterable<IntWritable> values,Context context)throws IOException,InterruptedException{Iterator<IntWritable> itVal=values.iterator();while(itVal.hasNext()){//for(IntWritable val:values){ context.write(linenum, key);linenum = new IntWritable(linenum.get()+1);}}}public static void main(String[] args) throws Exception {Configuration conf = new Configuration();String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();if (otherArgs.length != 2) {System.err.println("Usage: datasort <in> <out>");System.exit(2);}Job job = new Job(conf, "data sort");job.setJarByClass(DataSort.class);job.setNumReduceTasks(1);//設置reduce輸入文件一個,方便查看結果,如設置為0就是不執行reduce,map就輸出結果job.setMapperClass(Map.class);job.setReducerClass(Reduce.class);job.setOutputKeyClass(IntWritable.class);job.setOutputValueClass(IntWritable.class);FileInputFormat.addInputPath(job, new Path(otherArgs[0]));FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));System.exit(job.waitForCompletion(true) ? 0 : 1);} }? ?3)執行jar并查看結果
? ? ? ? 執行:yarn jar /tmp/mr.jar /tmp/fjs/in1 /tmp/fjs/out1?
? ? ? ?查看:hadoop fs -text /tmp/fjs/out1/part-r-00000.bz2
3、總結:
? ?通過MapReduce開發之詞匯統計和排序以及數據去重和排序,進一步理解Map-Reduce的計算框架,其代碼處理流程通俗描述為:
? ?1)MapReduce對輸入的文件分片并按照行提取<key,value>,作為Map的輸入,其中key是每行的偏移量,value就是每一行的數據;
? ?2)MapReduce調用自定義的map函數來處理輸入的<key,value>并以<key,value>輸出加工后的數據;
? ?3)MapReduce開始shuffle處理map輸出的<key,value>,形成<key,value-list>,匯集相同key的值為list;
? ?4)最后MapReduce調用自定義的reduce函數來處理輸入的<key,value-list>,并以<key,value>輸出結果;
? ?整個步驟就是:分片-map-匯聚-reduce,在框架之上定義自己的map和reduce函數來加工數據。 《新程序員》:云原生和全面數字化實踐50位技術專家共同創作,文字、視頻、音頻交互閱讀
總結
以上是生活随笔為你收集整理的MapReduce基础开发之二数据去重和排序的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: MapReduce基础开发之一词汇统计和
- 下一篇: Java正则表达式基础案例和语法