MapReduce基础开发之五分布式下载ftp文件到本地再迁移到hdfs
生活随笔
收集整理的這篇文章主要介紹了
MapReduce基础开发之五分布式下载ftp文件到本地再迁移到hdfs
小編覺得挺不錯的,現(xiàn)在分享給大家,幫大家做個參考.
為利用Hadoop集群平臺的分布存儲和計算能力,基于MapReduce將ftp文件分布式下載并上傳到HDFS中。
1、文件移動流程:ftp服務(wù)器->datanode本地目錄->HDFS目錄;
2、實現(xiàn)主要基于兩個設(shè)計思想:
? ?1)將FTP服務(wù)器文件列表作為MapReduce處理的數(shù)據(jù)對象,按照文件名分布到不同Reduce節(jié)點下載和上傳到HDFS中;
? ?2)在每個datanode節(jié)點都建立一個本地文件保存目錄,最好是統(tǒng)一路徑名,這樣每個Reduce節(jié)點都把FTP服務(wù)器文件下載到該目錄下;
3、代碼主要過程:
? ?1)驅(qū)動類中先讀取FTP服務(wù)器上要下載的文件列表,并移入到hdfs中,作為Map函數(shù)的輸入;
? ?2)Map函數(shù)處理文件列表,獲取文件名字,作為Reduce函數(shù)輸入;
? ?3)Reduce函數(shù)根據(jù)輸入的文件名去下載ftp服務(wù)器上對應(yīng)的文件,并下載到datanode節(jié)點的統(tǒng)一本地目錄,再將本地目錄文件上傳到HDFS中;
4、主要技術(shù)點:
? ?1)FTPClient實現(xiàn)ftp文件下載;
? ?2)hadoop的IOUtils類實現(xiàn)文件從本地上傳到HDFS;
5、準(zhǔn)備工作
? ?1)ftp服務(wù)器端口、用戶名和密碼、下載文件目錄;
? ? ? linux下ftp命令:進(jìn)入$ftp ip/常用命令:ls/cd/put/get/mput/mget
? ?2)每個節(jié)點統(tǒng)一建立本地目錄/tmp/fjs/localftp,保存ftp服務(wù)器上下載的文件;
? ?3)Namenode上建立HDFS保存文件的目錄/tmp/fjs/ftp;
? ?4)Namenode上建立HDFS保存文件列表的目錄/tmp/fjs/in,即Map函數(shù)的輸入數(shù)據(jù);
6、具體代碼:
7、執(zhí)行結(jié)果
? ?1)執(zhí)行命令:yarn jar /mnt/mr.jar /tmp/fjs/in /tmp/fjs/ftp
? ?2)hadoop fs -ls /tmp/fjs/in 可以看到文件列表文件
? ?3)hadoop fs -ls /tmp/fjs/ftp 可以看到下載的文件
? ?4)每個節(jié)點ls -l /tmp/fjs/localftp,如果文件都遷入hdfs,應(yīng)該為空
1、文件移動流程:ftp服務(wù)器->datanode本地目錄->HDFS目錄;
2、實現(xiàn)主要基于兩個設(shè)計思想:
? ?1)將FTP服務(wù)器文件列表作為MapReduce處理的數(shù)據(jù)對象,按照文件名分布到不同Reduce節(jié)點下載和上傳到HDFS中;
? ?2)在每個datanode節(jié)點都建立一個本地文件保存目錄,最好是統(tǒng)一路徑名,這樣每個Reduce節(jié)點都把FTP服務(wù)器文件下載到該目錄下;
3、代碼主要過程:
? ?1)驅(qū)動類中先讀取FTP服務(wù)器上要下載的文件列表,并移入到hdfs中,作為Map函數(shù)的輸入;
? ?2)Map函數(shù)處理文件列表,獲取文件名字,作為Reduce函數(shù)輸入;
? ?3)Reduce函數(shù)根據(jù)輸入的文件名去下載ftp服務(wù)器上對應(yīng)的文件,并下載到datanode節(jié)點的統(tǒng)一本地目錄,再將本地目錄文件上傳到HDFS中;
4、主要技術(shù)點:
? ?1)FTPClient實現(xiàn)ftp文件下載;
? ?2)hadoop的IOUtils類實現(xiàn)文件從本地上傳到HDFS;
5、準(zhǔn)備工作
? ?1)ftp服務(wù)器端口、用戶名和密碼、下載文件目錄;
? ? ? linux下ftp命令:進(jìn)入$ftp ip/常用命令:ls/cd/put/get/mput/mget
? ?2)每個節(jié)點統(tǒng)一建立本地目錄/tmp/fjs/localftp,保存ftp服務(wù)器上下載的文件;
? ?3)Namenode上建立HDFS保存文件的目錄/tmp/fjs/ftp;
? ?4)Namenode上建立HDFS保存文件列表的目錄/tmp/fjs/in,即Map函數(shù)的輸入數(shù)據(jù);
6、具體代碼:
? ?1)主類FtpMR:驅(qū)動類加MapReduce類;
package ct.gd;import java.io.IOException;import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.util.GenericOptionsParser;public class FtpMR {public static class FtpMap extends Mapper<Object,Text,Text,Text>{private Text _key = new Text();private Text _value = new Text(); public void map(Object key,Text value,Context context) throws IOException,InterruptedException{String line = value.toString();//tag是隨機(jī)值,目的是將文件均勻分到各節(jié)點下載,隨機(jī)范圍根據(jù)集群節(jié)點數(shù),這里是0-100內(nèi)//假設(shè)下載文件有1000個,100隨機(jī)范圍,集群有100個節(jié)點,那每個節(jié)點均勻可能獲得10個文件下載,//map輸出的<key,value>,輸入reduce時,key值相同的會形成value list,因此設(shè)計該隨機(jī)key值String tag = ComMethod.getRandomNbr();_key.set(tag);_value.set(line);context.write(_key,_value);}}public static class FtpReduce extends Reducer<Text,Text,Text,Text>{public void reduce(Text key,Iterable<Text> values,Context context)throws IOException,InterruptedException{String ftpSrv=context.getConfiguration().get("ftpSrv");//獲取ftp服務(wù)器連接信息String outputPath=context.getConfiguration().get("outputPath");//獲取hdfs存放文件的目錄FtpUtil fu=new FtpUtil(); for(Text value:values){String filename=value.toString();//輸入的value是ftp服務(wù)器上的文件名String localFile=fu.DownFileToLocal(ftpSrv,filename);//下載文件到本地目錄,并返回文件保存的路徑if (localFile!=null) fu.WriteFileToHDFS(localFile,ComMethod.changeToDir(outputPath),filename);//本地文件上傳到hdfs中}}}public static void main(String[] args) throws Exception { Configuration conf = new Configuration();String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();if (otherArgs.length != 2) {System.err.println("Usage: FtpMR <in> <out>");System.exit(2);}String inputPath=otherArgs[0];//FTP服務(wù)器保存文件列表的文件目錄String outputPath=otherArgs[1];//下載的ftp文件保存在hdfs中的目錄FtpUtil fu=new FtpUtil();//ftp服務(wù)器字符串格式:IP|port|username|password|file directoryString strFtpSrv="IP|port|name|password|directory"; //獲取ftp服務(wù)器上文件列表,保存到hdfs的inputPath目錄下if(!fu.getFtpFileList(strFtpSrv,inputPath)){System.err.println("下載ftp服務(wù)器文件列表失敗");System.exit(2);}//將ftp服務(wù)器的參數(shù)作為參數(shù)傳遞到Reduce中conf.set("ftpSrv", strFtpSrv);//將hdfs上保存下載文件的目錄傳遞到Reduce中conf.set("outputPath", outputPath);Job job = new Job(conf, "FtpToHdfs");job.setJarByClass(FtpMR.class);//job.setNumReduceTasks(1);//設(shè)置reduce輸入文件一個,方便查看結(jié)果job.setMapperClass(FtpMap.class);job.setReducerClass(FtpReduce.class);job.setOutputKeyClass(Text.class);job.setOutputValueClass(Text.class);FileInputFormat.addInputPath(job, new Path(inputPath));FileOutputFormat.setOutputPath(job, new Path(outputPath));System.exit(job.waitForCompletion(true) ? 0 : 1);} }? ?2)接口類FtpUtil:主要處理ftp文件下載和寫入hdfs中;
package ct.gd;import java.io.BufferedInputStream; import java.io.File; import java.io.FileInputStream; import java.io.FileOutputStream; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream;import org.apache.commons.net.ftp.FTP; import org.apache.commons.net.ftp.FTPClient; import org.apache.commons.net.ftp.FTPFile; import org.apache.commons.net.ftp.FTPReply; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IOUtils;public class FtpUtil {/*下載文件列表處理函數(shù),開始*/public boolean getFtpFileList(String strFtpSrv,String inputPath){//從ftp服務(wù)器上讀取文件列表String[] FtpSrvConn=strFtpSrv.split("\\|");//截取ftp服務(wù)器連接信息 FTPClient ftp = new FTPClient();try { ftp.connect(FtpSrvConn[0], Integer.parseInt(FtpSrvConn[1])); //url和port ftp.login(FtpSrvConn[2], FtpSrvConn[3]); //name和passwordint reply = ftp.getReplyCode(); if (!FTPReply.isPositiveCompletion(reply)) { ftp.disconnect(); return false; } String remotePath=FtpSrvConn[4];//Ftp服務(wù)器上文件目錄ftp.changeWorkingDirectory(remotePath);FTPFile[] fs = ftp.listFiles(remotePath);StringBuffer buffer = new StringBuffer();for(FTPFile ff:fs){String fileName = ff.getName();buffer.append(fileName+"\n");}if(writeBufferToHDFSFile(buffer, inputPath)){ftp.logout();return true;}ftp.logout();} catch (IOException e) {System.out.println(e.getMessage());} finally { if (ftp.isConnected()) { try { ftp.disconnect(); } catch (IOException ioe) { System.out.println(ioe.getMessage()); } } } return false; }private boolean writeBufferToHDFSFile(StringBuffer buffer, String inputPath){//將文件列表寫到hdfs中Configuration conf = new Configuration();FileSystem fs = null;String fileName="fileLists.txt";try {fs = FileSystem.get(conf);inputPath = ComMethod.changeToDir(inputPath) + fileName;Path fsInputPath=new Path(inputPath);FSDataOutputStream outputStream = fs.create(fsInputPath);outputStream.write(buffer.toString().getBytes("UTF-8"));outputStream.flush();outputStream.sync();outputStream.close();return true;} catch (IOException e) {System.out.println(e.getMessage());}return false;}/*下載文件列表處理函數(shù),結(jié)束*//*下載文件處理函數(shù),開始*/public String DownFileToLocal(String ftpSrv,String filename){//在節(jié)點上創(chuàng)建本地保存下載文件的目錄String localPath="/tmp/fjs/localftp";File localDir = new File(localPath);//如果不存在就創(chuàng)建if(!localDir.exists()){localDir.mkdirs();}FTPClient ftp = new FTPClient();String[] FtpSrvConn=ftpSrv.split("\\|");//截取ftp服務(wù)器連接信息try { ftp.connect(FtpSrvConn[0], Integer.parseInt(FtpSrvConn[1])); //url和port ftp.login(FtpSrvConn[2], FtpSrvConn[3]); //name和passwordint reply = ftp.getReplyCode(); if (!FTPReply.isPositiveCompletion(reply)) { ftp.disconnect(); return null;} String remotePath=FtpSrvConn[4];//Ftp服務(wù)器上文件目錄ftp.changeWorkingDirectory(remotePath);String localFilePath = ComMethod.changeToDir(localPath) + filename;File localFile = new File(localFilePath);OutputStream is = new FileOutputStream(localFile);ftp.retrieveFile(filename, is);//下載is.close();ftp.logout();return localFilePath;} catch (IOException e) { System.err.println(e.getMessage());} finally { if (ftp.isConnected()) { try { ftp.disconnect(); } catch (IOException ioe) { } } } return null; }/*下載文件處理函數(shù),結(jié)束*//*上傳文件到hdfs處理函數(shù),開始*/public void WriteFileToHDFS(String localFile,String outputPath,String filename){Configuration conf = new Configuration();FileSystem fs = null;try {fs=FileSystem.get(conf);InputStream in = new BufferedInputStream(new FileInputStream(localFile));String ouputFile = outputPath + filename;//hdfs存放文件路勁和名字OutputStream out = fs.create(new Path(ouputFile));IOUtils.copyBytes(in, out, 1024*1024,true);//遷移out.flush();if(out!=null) out.close();if(in!=null) in.close();//刪除本地文件File _outputFileName = new File(localFile);if(_outputFileName.exists()) _outputFileName.delete();} catch (IOException e) {e.printStackTrace();} }/*上傳文件到hdfs處理函數(shù),結(jié)束*/public static void main(String[] args) throws Exception { } }? ?3)通用函數(shù)類ComMethod:主要是一些通用字符處理函數(shù);
package ct.gd;import java.util.Random;public class ComMethod {public static String changeToDir(String dirPath){//目錄最后是否有/if(dirPath.charAt(dirPath.length()-1)!='/'){dirPath = dirPath + "/";}return dirPath;}public static String getRandomNbr(){//獲取隨機(jī)數(shù)Random rand = new Random();String nbr = String.valueOf(rand.nextInt(100));return nbr;}}7、執(zhí)行結(jié)果
? ?1)執(zhí)行命令:yarn jar /mnt/mr.jar /tmp/fjs/in /tmp/fjs/ftp
? ?2)hadoop fs -ls /tmp/fjs/in 可以看到文件列表文件
? ?3)hadoop fs -ls /tmp/fjs/ftp 可以看到下載的文件
? ?4)每個節(jié)點ls -l /tmp/fjs/localftp,如果文件都遷入hdfs,應(yīng)該為空
總結(jié)
以上是生活随笔為你收集整理的MapReduce基础开发之五分布式下载ftp文件到本地再迁移到hdfs的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: MapReduce基础开发之四参数传递
- 下一篇: MapReduce基础开发之六Map多输