Hadoop2.6.0的FileInputFormat的任务切分原理分析(即如何控制FileInputFormat的map任务数量)...
前言
首先確保已經(jīng)搭建好Hadoop集群環(huán)境,可以參考《Linux下Hadoop集群環(huán)境的搭建》一文的內(nèi)容。我在測(cè)試mapreduce任務(wù)時(shí),發(fā)現(xiàn)相比于使用Job.setNumReduceTasks(int)控制reduce任務(wù)數(shù)量而言,控制map任務(wù)數(shù)量一直是一個(gè)困擾我的問題。好在經(jīng)過很多摸索與實(shí)驗(yàn),終于梳理出來,希望對(duì)在工作中進(jìn)行Hadoop進(jìn)行性能調(diào)優(yōu)的新人們有個(gè)借鑒。本文只針對(duì)FileInputFormat的任務(wù)劃分進(jìn)行分析,其它類型的InputFormat的劃分方式又各有不同。雖然如此,都可以按照本文類似的方法進(jìn)行分析和總結(jié)。
為了簡(jiǎn)便起見,本文以Hadoop2.6.0自帶的word count例子為例,進(jìn)行展開。
wordcount
我們首先準(zhǔn)備好wordcount所需的數(shù)據(jù),一共有兩份文件,都位于hdfs的/wordcount/input目錄下:
這兩個(gè)文件的內(nèi)容分別為:
On the top of the Crumpretty Tree The Quangle Wangle sat, But his face you could not see, On account of his Beaver Hat.和But his face you could not see, On account of his Beaver Hat.有關(guān)如何操作hdfs并準(zhǔn)備好數(shù)據(jù)的細(xì)節(jié),本文不作贅述。
現(xiàn)在我們不作任何性能優(yōu)化(不增加任何配置參數(shù)),然后執(zhí)行hadoop-mapreduce-examples子項(xiàng)目(有關(guān)此項(xiàng)目介紹,可以閱讀《Hadoop2.6.0子項(xiàng)目hadoop-mapreduce-examples的簡(jiǎn)單介紹》一文)中自帶的wordcount例子:
hadoop jar share/hadoop/mapreduce/hadoop-mapreduce-examples-2.6.0.jar wordcount /wordcount/input /wordcount/output/result1當(dāng)然也可以使用樸素的方式運(yùn)行wordcount例子:
hadoop org.apache.hadoop.examples.WordCount -D mapreduce.input.fileinputformat.split.maxsize=1 /wordcount/input /wordcount/output/result1最后執(zhí)行的結(jié)果在hdfs的/wordcount/output/result1目錄下:
執(zhí)行結(jié)果可以查看/wordcount/output/result1/part-r-00000的內(nèi)容:
第一次優(yōu)化
wordcount例子,查看運(yùn)行結(jié)果不是本文的目的。在執(zhí)行wordcount例子時(shí),在任務(wù)運(yùn)行信息中可以看到創(chuàng)建的map及reduce任務(wù)的數(shù)量:
可以看到FileInputFormat的輸入文件有2個(gè),JobSubmitter任務(wù)劃分的數(shù)量是2,最后產(chǎn)生的map任務(wù)數(shù)量也是2,看到這我們可以猜想由于我們提供了兩個(gè)輸入文件,所以會(huì)有2個(gè)map任務(wù)。我們此處姑且不論這種猜測(cè)正確與否,現(xiàn)在我們打算改變map任務(wù)的數(shù)量。通過查看文檔,很多人知道使用mapreduce.job.maps參數(shù)可以快速修改map任務(wù)的數(shù)量,事實(shí)果真如此?讓我們先來實(shí)驗(yàn)一番,輸入以下命令:
hadoop jar share/hadoop/mapreduce/hadoop-mapreduce-examples-2.6.0.jar wordcount -D mapreduce.job.maps=1 /wordcount/input /wordcount/output/result2執(zhí)行以上命令后,觀察輸出的信息,與之前未添加mapreduce.job.maps參數(shù)的輸出信息幾乎沒有變化。難道Hadoop的實(shí)現(xiàn)人員開了一個(gè)玩笑,亦或者這是一個(gè)bug?我們先給這個(gè)問題在我們的大腦中設(shè)置一個(gè)檢查點(diǎn),最后再來看看究竟是怎么回事。第二次優(yōu)化
用mapreduce.job.maps調(diào)整map任務(wù)數(shù)量沒有見效,我們翻翻文檔,發(fā)現(xiàn)還有mapreduce.input.fileinputformat.split.minsize參數(shù),它可以控制map任務(wù)輸入劃分的最小字節(jié)數(shù)。這個(gè)參數(shù)和mapreduce.input.fileinputformat.split.maxsize通常配合使用,后者控制map任務(wù)輸入劃分的最大字節(jié)數(shù)。我們目前只調(diào)整mapreduce.input.fileinputformat.split.minsize的大小,劃分最小的尺寸變小是否預(yù)示著任務(wù)劃分?jǐn)?shù)量變多?來看看會(huì)發(fā)生什么?輸入以下命令: hadoop jar share/hadoop/mapreduce/hadoop-mapreduce-examples-2.6.0.jar wordcount -D mapreduce.input.fileinputformat.split.minsize=1 /wordcount/input /wordcount/output/result3執(zhí)行以上命令后,觀察輸出信息,依然未發(fā)生改變。好吧,弟弟不給力,我們用它的兄弟參數(shù)mapreduce.input.fileinputformat.split.maxsize來控制。如果我們將mapreduce.input.fileinputformat.split.maxsize改得很小,會(huì)怎么樣?輸入以下命令: hadoop jar share/hadoop/mapreduce/hadoop-mapreduce-examples-2.6.0.jar wordcount -D mapreduce.input.fileinputformat.split.maxsize=1 /wordcount/input /wordcount/output/result4這是的信息有了改變,我們似乎取得了想要的結(jié)果:呵呵,任務(wù)劃分成了177個(gè),想想也是,我們把最大的劃分字節(jié)數(shù)僅僅設(shè)置為1字節(jié)。接著往下看確實(shí)執(zhí)行了177個(gè)map任務(wù):
我們還可以通過Web UI觀察map任務(wù)所分配的Container。首先查看Slave1節(jié)點(diǎn)上分配的Container情況:
再來看看Slave2節(jié)點(diǎn)上分配的Container情況:
確實(shí)說明最多有15個(gè)Container分配給當(dāng)前作業(yè)執(zhí)行map任務(wù)。由于在YARN中yarn.nodemanager.resource.cpu-vcores參數(shù)的默認(rèn)值是8,所以Slave1和Slave2兩臺(tái)機(jī)器上的虛擬cpu總數(shù)是16,由于ResourceManager會(huì)為mapreduce任務(wù)分配一個(gè)Container給ApplicationMaster(即MrAppMaster),所以整個(gè)集群只剩余了15個(gè)Container用于ApplicationMaster向NodeManager申請(qǐng)和運(yùn)行map任務(wù)。
第三次優(yōu)化
閱讀文檔我們知道dfs.blocksize可以控制塊的大小,看看這個(gè)參數(shù)能否發(fā)揮作用。為便于測(cè)試,我們首先需要修改hdfs-site.xml中dfs.blocksize的大小為10m(最小就只能這么小,Hadoop限制了參數(shù)單位至少是10m)。
<property><name>dfs.blocksize</name><value>10m</value> </property>然后,將此配置復(fù)制到集群的所有NameNode和DataNode上。為了使此配置在不重啟的情況下生效,在NameNode節(jié)點(diǎn)上執(zhí)行以下命令: hadoop dfsadmin -refreshNodes yarn rmadmin -refreshNodes我們使用以下命令查看下系統(tǒng)內(nèi)的文件所占用的blocksize大小:
hadoop dfs -stat "%b %n %o %r %y" /wordcount/input/quangle*輸出結(jié)果如下:可以看到雖然quangle.txt和quangle2.txt的字節(jié)數(shù)分別是121字節(jié)和56字節(jié),但是在hdfs中這兩個(gè)文件的blockSize已經(jīng)是10m了。現(xiàn)在我們?cè)囋囈韵旅?#xff1a;
hadoop jar share/hadoop/mapreduce/hadoop-mapreduce-examples-2.6.0.jar wordcount /wordcount/input /wordcount/output/result5觀察輸出信息,發(fā)現(xiàn)沒有任何效果。源碼分析
經(jīng)過以上3次不同實(shí)驗(yàn),發(fā)現(xiàn)只有mapreduce.input.fileinputformat.split.maxsize參數(shù)確實(shí)影響了map任務(wù)的數(shù)量。現(xiàn)在我們通過源碼分析,來一探究竟吧。
首先我們看看WordCount例子的源碼,其中和任務(wù)劃分有關(guān)的代碼如下:
for (int i = 0; i < otherArgs.length - 1; ++i) {FileInputFormat.addInputPath(job, new Path(otherArgs[i]));}FileOutputFormat.setOutputPath(job,new Path(otherArgs[otherArgs.length - 1]));System.exit(job.waitForCompletion(true) ? 0 : 1); 我們看到使用的InputFormat是FileOutputFormat,任務(wù)執(zhí)行調(diào)用了Job的waitForCompletion方法。waitForCompletion方法中真正提交job的代碼如下: public boolean waitForCompletion(boolean verbose) throws IOException, InterruptedException,ClassNotFoundException {if (state == JobState.DEFINE) {submit();}// 省略本文不關(guān)心的代碼return isSuccessful();} 這里的submit方法的實(shí)現(xiàn)如下: public void submit() throws IOException, InterruptedException, ClassNotFoundException {// 省略本文不關(guān)心的代碼</span>final JobSubmitter submitter =?getJobSubmitter(cluster.getFileSystem(), cluster.getClient());status = ugi.doAs(new PrivilegedExceptionAction<JobStatus>() {public JobStatus run() throws IOException, InterruptedException,?ClassNotFoundException {return submitter.submitJobInternal(Job.this, cluster);}});state = JobState.RUNNING;LOG.info("The url to track the job: " + getTrackingURL());}submit方法首先創(chuàng)建了JobSubmitter實(shí)例,然后異步調(diào)用了JobSubmitter的submitJobInternal方法。JobSubmitter的submitJobInternal方法有關(guān)劃分任務(wù)的代碼如下:
// Create the splits for the jobLOG.debug("Creating splits at " + jtFs.makeQualified(submitJobDir));int maps = writeSplits(job, submitJobDir);conf.setInt(MRJobConfig.NUM_MAPS, maps);LOG.info("number of splits:" + maps); writeSplits方法的實(shí)現(xiàn)如下: private int writeSplits(org.apache.hadoop.mapreduce.JobContext job,Path jobSubmitDir) throws IOException,InterruptedException, ClassNotFoundException {JobConf jConf = (JobConf)job.getConfiguration();int maps;if (jConf.getUseNewMapper()) {maps = writeNewSplits(job, jobSubmitDir);} else {maps = writeOldSplits(jConf, jobSubmitDir);}return maps;}由于WordCount使用的是新的mapreduce API,所以最終會(huì)調(diào)用writeNewSplits方法。writeNewSplits的實(shí)現(xiàn)如下:
private <T extends InputSplit>int writeNewSplits(JobContext job, Path jobSubmitDir) throws IOException,InterruptedException, ClassNotFoundException {Configuration conf = job.getConfiguration();InputFormat<?, ?> input =ReflectionUtils.newInstance(job.getInputFormatClass(), conf);List<InputSplit> splits = input.getSplits(job);T[] array = (T[]) splits.toArray(new InputSplit[splits.size()]);// sort the splits into order based on size, so that the biggest// go firstArrays.sort(array, new SplitComparator());JobSplitWriter.createSplitFiles(jobSubmitDir, conf, jobSubmitDir.getFileSystem(conf), array);return array.length;}writeNewSplits方法中,劃分任務(wù)數(shù)量最關(guān)鍵的代碼即為InputFormat的getSplits方法(提示:大家可以直接通過此處的調(diào)用,查看不同InputFormat的劃分任務(wù)實(shí)現(xiàn))。根據(jù)前面的分析我們知道此時(shí)的InputFormat即為FileOutputFormat,其getSplits方法的實(shí)現(xiàn)如下:
public List<InputSplit> getSplits(JobContext job) throws IOException {Stopwatch sw = new Stopwatch().start();long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job));long maxSize = getMaxSplitSize(job);// generate splitsList<InputSplit> splits = new ArrayList<InputSplit>();List<FileStatus> files = listStatus(job);for (FileStatus file: files) {Path path = file.getPath();long length = file.getLen();if (length != 0) {BlockLocation[] blkLocations;if (file instanceof LocatedFileStatus) {blkLocations = ((LocatedFileStatus) file).getBlockLocations();} else {FileSystem fs = path.getFileSystem(job.getConfiguration());blkLocations = fs.getFileBlockLocations(file, 0, length);}if (isSplitable(job, path)) {long blockSize = file.getBlockSize();long splitSize = computeSplitSize(blockSize, minSize, maxSize);long bytesRemaining = length;while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);splits.add(makeSplit(path, length-bytesRemaining, splitSize,blkLocations[blkIndex].getHosts(),blkLocations[blkIndex].getCachedHosts()));bytesRemaining -= splitSize;}if (bytesRemaining != 0) {int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);splits.add(makeSplit(path, length-bytesRemaining, bytesRemaining,blkLocations[blkIndex].getHosts(),blkLocations[blkIndex].getCachedHosts()));}} else { // not splitablesplits.add(makeSplit(path, 0, length, blkLocations[0].getHosts(),blkLocations[0].getCachedHosts()));}} else { //Create empty hosts array for zero length filessplits.add(makeSplit(path, 0, length, new String[0]));}}// Save the number of input files for metrics/loadgenjob.getConfiguration().setLong(NUM_INPUT_FILES, files.size());sw.stop();if (LOG.isDebugEnabled()) {LOG.debug("Total # of splits generated by getSplits: " + splits.size()+ ", TimeTaken: " + sw.elapsedMillis());}return splits;}getFormatMinSplitSize方法固定返回1,getMinSplitSize方法實(shí)際就是mapreduce.input.fileinputformat.split.minsize參數(shù)的值(默認(rèn)為1),那么變量minSize的大小為mapreduce.input.fileinputformat.split.minsize與1之間的最大值。
getMaxSplitSize方法實(shí)際是mapreduce.input.fileinputformat.split.maxsize參數(shù)的值,那么maxSize即為mapreduce.input.fileinputformat.split.maxsize參數(shù)的值。
由于我的試驗(yàn)中有兩個(gè)輸入源文件,所以List<FileStatus> files = listStatus(job);方法返回的files列表的大小為2。
在遍歷files列表的過程中,會(huì)獲取每個(gè)文件的blockSize,最終調(diào)用computeSplitSize方法計(jì)算每個(gè)輸入文件應(yīng)當(dāng)劃分的任務(wù)數(shù)。computeSplitSize方法的實(shí)現(xiàn)如下:
protected long computeSplitSize(long blockSize, long minSize,long maxSize) {return Math.max(minSize, Math.min(maxSize, blockSize));} 因此我們知道每個(gè)輸入文件被劃分的公式如下:map任務(wù)要?jiǎng)澐值拇笮?#xff08;splitSize?)=(maxSize與blockSize之間的最小值)與minSize之間的最大值
bytesRemaining 是單個(gè)輸入源文件未劃分的字節(jié)數(shù)
根據(jù)getSplits方法,我們知道m(xù)ap任務(wù)劃分的數(shù)量=輸入源文件數(shù)目 * (bytesRemaining / splitSize個(gè)劃分任務(wù)+bytesRemaining不能被splitSize 整除的剩余大小單獨(dú)劃分一個(gè)任務(wù)?)
總結(jié)
根據(jù)源碼分析得到的計(jì)算方法和之前的優(yōu)化結(jié)果,我們最后總結(jié)一下:
對(duì)于第一次優(yōu)化,由于FileOutputFormat壓根沒有采用mapreduce.job.maps參數(shù)指定的值,所以它當(dāng)然不會(huì)有任何作用。
對(duì)于第二次優(yōu)化,minSize幾乎由mapreduce.input.fileinputformat.split.minsize控制;mapreduce.input.fileinputformat.split.maxsize默認(rèn)的大小是Long.MAX_VALUE,所以blockSize即為maxSize與blockSize之間的最小值;blockSize的默認(rèn)大小是128m,所以blockSize與值為1的mapreduce.input.fileinputformat.split.minsize之間的最大值為blockSize,即map任務(wù)要?jiǎng)澐值拇笮〉拇笮∨cblockSize相同。
對(duì)于第三次優(yōu)化,雖然我們將blockSize設(shè)置為10m(最小也只能這么小了,hdfs對(duì)于block大小的最低限制),根據(jù)以上公式maxSize與blockSize之間的最小值必然是blockSize,而blockSize與minSize之間的最大值也必然是blockSize。說明blockSize實(shí)際上已經(jīng)發(fā)揮了作用,它決定了splitSize的大小就是blockSize。由于blockSize大于bytesRemaining,所以并沒有對(duì)map任務(wù)數(shù)量產(chǎn)生影響。
針對(duì)以上分析,我們用更加容易理解的方式列出這些配置參數(shù)的關(guān)系:
鳴謝
我在試驗(yàn)的過程中,遇到很多問題。但是很多問題在網(wǎng)絡(luò)上都能找到,特此感謝在互聯(lián)網(wǎng)上分享經(jīng)驗(yàn)的同仁們。
后記:個(gè)人總結(jié)整理的《深入理解Spark:核心思想與源碼分析》一書現(xiàn)在已經(jīng)正式出版上市,目前京東、當(dāng)當(dāng)、天貓等網(wǎng)站均有銷售,歡迎感興趣的同學(xué)購(gòu)買。
京東(現(xiàn)有滿150減50活動(dòng))):http://item.jd.com/11846120.html?
當(dāng)當(dāng):http://product.dangdang.com/23838168.html?
總結(jié)
以上是生活随笔為你收集整理的Hadoop2.6.0的FileInputFormat的任务切分原理分析(即如何控制FileInputFormat的map任务数量)...的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: UIImagePikerControll
- 下一篇: 关于 继承、扩展和协议,深度好文