hadooppythonsql_python - hadoop,mapreduce demo
Hadoop,mapreduce?介紹
59888745@qq.com
大數(shù)據(jù)工程師是在Linux系統(tǒng)下搭建Hadoop生態(tài)系統(tǒng)(cloudera是最大的輸出者類似于Linux的紅帽),
把用戶的交易或行為信息通過HDFS(分布式文件系統(tǒng))等存儲用戶數(shù)據(jù)文件,然后通過Hbase(類似于NoSQL)等存儲數(shù)據(jù),再通過Mapreduce(并行計算框架)等計算數(shù)據(jù),然后通過hiv或pig(數(shù)據(jù)分析平臺)等分析數(shù)據(jù),最后按照用戶需要重現(xiàn)出數(shù)據(jù).
Hadoop是一個由Apache基金會所開發(fā)的開源分布式系統(tǒng)基礎架構
Hadoop,最基礎的也就是HDFS和Mapreduce了,
HDFS是一個分布式存儲文件系統(tǒng)
Mapreduce是一個分布式計算的框架,兩者結合起來,就可以很容易做一些分布式處理任務了
大綱:
一、MapReduce 基本原理
二、MapReduce 入門示例 - WordCount 單詞統(tǒng)計
三、MapReduce 執(zhí)行過程分析
實例1 - 自定義對象序列化
實例2 - 自定義分區(qū)
實例3 - 計算出每組訂單中金額最大的記錄
實例4 - 合并多個小文件
實例5 - 分組輸出到多個文件
四、MapReduce 核心流程梳理
實例6 - join 操作
實例7 - 計算出用戶間的共同好友
五、下載方式
一、MapReduce基本原理
MapReduce是一種編程模型,用于大規(guī)模數(shù)據(jù)集的分布式運算。
1、MapReduce通俗解釋
圖書館要清點圖書數(shù)量,有10個書架,管理員為了加快統(tǒng)計速度,找來了10個同學,每個同學負責統(tǒng)計一個書架的圖書數(shù)量。
張同學統(tǒng)計 書架1
王同學統(tǒng)計 書架2
劉同學統(tǒng)計 書架3
……
過了一會兒,10個同學陸續(xù)到管理員這匯報自己的統(tǒng)計數(shù)字,管理員把各個數(shù)字加起來,就得到了圖書總數(shù)。
這個過程就可以理解為MapReduce的工作過程。
2、MapReduce中有兩個核心操作
(1)map
管理員分配哪個同學統(tǒng)計哪個書架,每個同學都進行相同的“統(tǒng)計”操作,這個過程就是map。
(2)reduce
每個同學的結果進行匯總,這個過程是reduce。
3、MapReduce工作過程拆解
下面通過一個景點案例(單詞統(tǒng)計)看MapReduce是如何工作的。
有一個文本文件,被分成了4份,分別放到了4臺服務器中存儲
Text1:the weather is good
Text2:today is good
Text3:good weather is good
Text4:today has good weather
現(xiàn)在要統(tǒng)計出每個單詞的出現(xiàn)次數(shù)。
處理過程
(1)拆分單詞
map節(jié)點1
輸入:“the weather is good”
輸出:(the,1),(weather,1),(is,1),(good,1)
map節(jié)點2
輸入:“today is good”
輸出:(today,1),(is,1),(good,1)
map節(jié)點3
輸入:“good weather is good”
輸出:(good,1),(weather,1),(is,1),(good,1)
map節(jié)點4
輸入:“today has good weather”
輸出:(today,1),(has,1),(good,1),(weather,1)
(2)排序
map節(jié)點1
map節(jié)點2
map節(jié)點3
map節(jié)點4
(3)合并
map節(jié)點1
map節(jié)點2
map節(jié)點3
map節(jié)點4
(4)匯總統(tǒng)計
每個map節(jié)點都完成以后,就要進入reduce階段了。
例如使用了3個reduce節(jié)點,需要對上面4個map節(jié)點的結果進行重新組合,比如按照26個字母分成3段,分配給3個reduce節(jié)點。
Reduce節(jié)點進行統(tǒng)計,計算出最終結果。
這就是最基本的MapReduce處理流程。
4、MapReduce編程思路
了解了MapReduce的工作過程,我們思考一下用代碼實現(xiàn)時需要做哪些工作?
在4個服務器中啟動4個map任務
每個map任務讀取目標文件,每讀一行就拆分一下單詞,并記下來次單詞出現(xiàn)了一次
目標文件的每一行都處理完成后,需要把單詞進行排序
在3個服務器上啟動reduce任務
每個reduce獲取一部分map的處理結果
reduce任務進行匯總統(tǒng)計,輸出最終的結果數(shù)據(jù)
但不用擔心,MapReduce是一個非常優(yōu)秀的編程模型,已經(jīng)把絕大多數(shù)的工作做完了,我們只需要關心2個部分:
map處理邏輯——對傳進來的一行數(shù)據(jù)如何處理?輸出什么信息?
reduce處理邏輯——對傳進來的map處理結果如何處理?輸出什么信息?
編寫好這兩個核心業(yè)務邏輯之后,只需要幾行簡單的代碼把map和reduce裝配成一個job,然后提交給Hadoop集群就可以了。
至于其它的復雜細節(jié),例如如何啟動map任務和reduce任務、如何讀取文件、如對map結果排序、如何把map結果數(shù)據(jù)分配給reduce、reduce如何把最終結果保存到文件等等,MapReduce框架都幫我們做好了,而且還支持很多自定義擴展配置,例如如何讀文件、如何組織map或者reduce的輸出結果等等,后面的示例中會有介紹。
二、MapReduce入門示例:WordCount單詞統(tǒng)計
WordCount是非常好的入門示例,相當于helloword,下面就開發(fā)一個WordCount的MapReduce程序,體驗實際開發(fā)方式。
example:
#刪除已有文件夾
hadoop fs -rmr /chenshaojun/input/example_1
hadoop fs -rmr /chenshaojun/output/example_1
#創(chuàng)建輸入文件夾
hadoop fs -mkdir /chenshaojun/input/example_1
#放入輸入文件
hadoop fs -put text* /chenshaojun/input/example_1
#查看文件是否放好
hadoop fs -ls /chenshaojun/input/example_1
#本地測試一下map和reduce
head -20 text1.txt | python count_mapper.py | sort | python count_reducer.py
#集群上跑任務
hadoop jar /usr/lib/hadoop-current/share/hadoop/tools/lib/hadoop-streaming-2.7.2.jar \
-file count_mapper.py \ ? #提交文件到集群
-mapper count_mapper.py \
-file count_reducer.py \
-reducer count_reducer.py \
-input /chenshaojun/input/example_1 \
-output /chenshaojun/output/example_1 ? # 必須不存在,若存在output會抱錯,不會覆蓋
count_mapper.py
import sys
# input comes from STDIN (standard input)
for line in sys.stdin:
# remove leading and trailing whitespace
line = line.strip()
# split the line into words
words = line.split()
# increase counters
for word in words:
# write the results to STDOUT (standard output);
# what we output here will be the input for the
# Reduce step, i.e. the input for reducer.py
#
# tab-delimited; the trivial word count is 1
print '%s\t%s' % (word.lower(), 1)
count_reducer.py
from operator import itemgetter
import sys
current_word = None
current_count = 0
word = None
# input comes from STDIN
for line in sys.stdin:
# remove leading and trailing whitespace
line = line.strip()
# parse the input we got from mapper.py
word, count = line.split('\t', 1)
# convert count (currently a string) to int
try:
count = int(count)
except ValueError:
# count was not a number, so silently
# ignore/discard this line
continue
# this IF-switch only works because Hadoop sorts map output
# by key (here: word) before it is passed to the reducer
if current_word == word:
current_count += count
else:
if current_word:
# write result to STDOUT
print '%s\t%s' % (current_word, current_count)
current_count = count
current_word = word
# do not forget to output the last word if needed!
if current_word == word:
print '%s\t%s' % (current_word, current_count)
總結
以上是生活随笔為你收集整理的hadooppythonsql_python - hadoop,mapreduce demo的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: multipartfile 获取音频时长
- 下一篇: 车连星账号密码忘了咋办?