Notes on Generating HFiles with Hadoop and Bulk-Loading Them into HBase
Please credit the original source when reposting: http://blackwing.iteye.com/blog/1991380
 
 
HBase ships with an ImportTsv class that generates HFiles directly from TSV input (per the official docs, plain text with fields separated by \t), plus a companion class, org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles, that moves the generated HFiles into the table's directory on HDFS.
 
PS: I have seen someone online claim that writing directly into HBase is less efficient than first generating HFiles and then moving them in with LoadIncrementalHFiles. I have not verified this; my approach is also to generate first, then move.
 
The official documentation is here:
http://hbase.apache.org/book/ops_mgt.html#importtsv
 
But ImportTsv does not fit my case. For example, my input file looks like this:

topsid   uid      roler_num   typ   time
10       111111   255         0     1386553377000
 
The ImportTsv import command would be:

bin/hbase org.apache.hadoop.hbase.mapreduce.ImportTsv \
  -Dimporttsv.columns=HBASE_ROW_KEY,kq:topsid,kq:uid,kq:roler_num,kq:type \
  -Dimporttsv.bulk.output=hdfs://storefile-outputdir <hdfs-data-inputdir>
 
 
The table layout it produces is:

row       : 10
cf        : kq
qualifier : topsid
value     : 10
.....
 
Whereas the layout I need is:

row       : 10-111111-255
cf        : kq
qualifier : 0
value     : 1
 
 
ImportTsv can only map each input column to the row key or to a fixed qualifier; it cannot compose a row key out of several fields or use a field's value as the qualifier, so it is easier to write my own MapReduce job to process the data.
Mapper:

import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.Date;

import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

/*
 * adminOnOff.log file format:
 * topsid   uid   roler_num   typ   time
 */
public class HFileImportMapper2 extends
        Mapper<LongWritable, Text, ImmutableBytesWritable, KeyValue> {
    protected SimpleDateFormat sdf = new SimpleDateFormat("yyyyMMdd");
    protected final String CF_KQ = "kq"; // attendance column family
    protected final int ONE = 1;

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String line = value.toString();
        System.out.println("line : " + line);
        String[] datas = line.split("\\s+");
        // row key format: yyyyMMdd-sid-uid-role_num-timestamp-typ
        String row = sdf.format(new Date(Long.parseLong(datas[4])))
                + "-" + datas[0] + "-" + datas[1] + "-" + datas[2]
                + "-" + datas[4] + "-" + datas[3];
        ImmutableBytesWritable rowkey = new ImmutableBytesWritable(
                Bytes.toBytes(row));
        KeyValue kv = new KeyValue(Bytes.toBytes(row), this.CF_KQ.getBytes(),
                datas[3].getBytes(), Bytes.toBytes(this.ONE));
        context.write(rowkey, kv);
    }
}
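As a quick sanity check (a worked example, not from the original post): for the sample line 10 111111 255 0 1386553377000, the timestamp 1386553377000 falls on 2013-12-09 in both UTC and China time, so sdf.format(...) yields 20131209 and the mapper emits row key 20131209-10-111111-255-1386553377000-0 with column kq:0 and value Bytes.toBytes(1), a 4-byte integer.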
 
 
The job driver:

import java.io.IOException;
import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class GenHFile2 {
    public static void main(String[] args) {
        Configuration conf = new Configuration();
        conf.addResource("myConf.xml");
        String input = conf.get("input");
        String output = conf.get("output");
        String tableName = conf.get("source_table");
        System.out.println("table : " + tableName);
        HTable table;
        try {
            // Before running, delete any existing intermediate output directory
            try {
                FileSystem fs = FileSystem.get(URI.create(output), conf);
                fs.delete(new Path(output), true);
                fs.close();
            } catch (IOException e1) {
                e1.printStackTrace();
            }

            table = new HTable(conf, tableName.getBytes());
            Job job = new Job(conf);
            job.setJobName("Generate HFile");

            job.setJarByClass(HFileImportMapper2.class);
            job.setInputFormatClass(TextInputFormat.class);
            job.setMapperClass(HFileImportMapper2.class);
            FileInputFormat.setInputPaths(job, input);

            //job.setReducerClass(KeyValueSortReducer.class);
            //job.setMapOutputKeyClass(ImmutableBytesWritable.class);
            //job.setMapOutputValueClass(KeyValue.class);
            job.getConfiguration().set("mapred.mapoutput.key.class",
                    "org.apache.hadoop.hbase.io.ImmutableBytesWritable");
            job.getConfiguration().set("mapred.mapoutput.value.class",
                    "org.apache.hadoop.hbase.KeyValue");

            //job.setOutputFormatClass(HFileOutputFormat.class);
            FileOutputFormat.setOutputPath(job, new Path(output));
            //job.setPartitionerClass(SimpleTotalOrderPartitioner.class);
            HFileOutputFormat.configureIncrementalLoad(job, table);
            try {
                job.waitForCompletion(true);
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (ClassNotFoundException e) {
                e.printStackTrace();
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}
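A side note, not from the original post: on the Hadoop 1.x-era API that the mapred.mapoutput.* property names suggest, those two string settings are what the commented-out Job setters write, so the following would be an equivalent and more typical way to express the same thing inside the driver above:

// Equivalent to setting mapred.mapoutput.key.class / mapred.mapoutput.value.class directly
// (assuming the Hadoop 1.x-era property names used above).
job.setMapOutputKeyClass(ImmutableBytesWritable.class);
job.setMapOutputValueClass(KeyValue.class);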
 
 
The generated HFiles land under /output on HDFS, already organized into a subdirectory per column family name:

hdfs://namenode/output/kq/601c5029fb264dc8869a635043c24560
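After the job finishes, the HFiles still have to be moved into the table. The original post does not show that step, so here is a minimal sketch using the LoadIncrementalHFiles Java API; the table name and output path are placeholders, and the same thing can be done on the command line with bin/hbase org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles <hfile-output-dir> <table-name>:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;

public class BulkLoadHFiles {
    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        // Placeholder table name and HFile directory; substitute your own values.
        HTable table = new HTable(conf, "source_table");
        LoadIncrementalHFiles loader = new LoadIncrementalHFiles(conf);
        // Moves each HFile into the directory of the region that owns its key range.
        loader.doBulkLoad(new Path("hdfs://namenode/output"), table);
        table.close();
    }
}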
 
Note the call in the driver:

HFileOutputFormat.configureIncrementalLoad(job, table);
 
From its source code, this method automatically configures the following for the job. In particular, it sets one reduce task per existing region and partitions on the region start keys, so each reducer produces HFiles bounded to a single region and the final load is a pure file move:

public static void configureIncrementalLoad(Job job, HTable table)
throws IOException {
  Configuration conf = job.getConfiguration();

  job.setOutputKeyClass(ImmutableBytesWritable.class);
  job.setOutputValueClass(KeyValue.class);
  job.setOutputFormatClass(HFileOutputFormat.class);

  // Based on the configured map output class, set the correct reducer to properly
  // sort the incoming values.
  // TODO it would be nice to pick one or the other of these formats.
  if (KeyValue.class.equals(job.getMapOutputValueClass())) {
    job.setReducerClass(KeyValueSortReducer.class);
  } else if (Put.class.equals(job.getMapOutputValueClass())) {
    job.setReducerClass(PutSortReducer.class);
  } else if (Text.class.equals(job.getMapOutputValueClass())) {
    job.setReducerClass(TextSortReducer.class);
  } else {
    LOG.warn("Unknown map output value type:" + job.getMapOutputValueClass());
  }

  conf.setStrings("io.serializations", conf.get("io.serializations"),
      MutationSerialization.class.getName(), ResultSerialization.class.getName(),
      KeyValueSerialization.class.getName());

  // Use table's region boundaries for TOP split points.
  LOG.info("Looking up current regions for table " + Bytes.toString(table.getTableName()));
  List<ImmutableBytesWritable> startKeys = getRegionStartKeys(table);
  LOG.info("Configuring " + startKeys.size() + " reduce partitions " +
      "to match current region count");
  job.setNumReduceTasks(startKeys.size());

  configurePartitioner(job, startKeys);
  // Set compression algorithms based on column families
  configureCompression(table, conf);
  configureBloomType(table, conf);
  configureBlockSize(table, conf);

  TableMapReduceUtil.addDependencyJars(job);
  TableMapReduceUtil.initCredentials(job);
  LOG.info("Incremental table " + Bytes.toString(table.getTableName()) + " output configured.");
}
 
 
HFileOutputFormat only supports writing a single column family; if the table has multiple column families, you have to run a separate job for each.
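One possible way to structure that, sketched here rather than taken from the original post: run the generating job once per column family, passing the target family through the configuration and writing each family to its own output directory, then bulk-load each directory separately. The target.cf property and the per-family filtering in the mapper are hypothetical names for illustration; HFileImportMapper2 as shown above only ever emits the kq family.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class GenHFilePerFamily {
    // One job per column family: each job writes HFiles for a single family
    // under its own output directory, and each directory is then bulk-loaded.
    public static void run(String input, String output, String tableName,
                           String[] families) throws Exception {
        for (String cf : families) {
            Configuration conf = HBaseConfiguration.create();
            conf.set("target.cf", cf); // hypothetical property a per-family mapper would read
            Job job = new Job(conf, "Generate HFile for " + cf);
            job.setJarByClass(HFileImportMapper2.class);
            job.setMapperClass(HFileImportMapper2.class); // assumes the mapper emits only target.cf
            job.setInputFormatClass(TextInputFormat.class);
            FileInputFormat.setInputPaths(job, new Path(input));
            FileOutputFormat.setOutputPath(job, new Path(output + "/" + cf));
            HTable table = new HTable(conf, tableName);
            HFileOutputFormat.configureIncrementalLoad(job, table);
            job.waitForCompletion(true);
            table.close();
        }
    }
}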