达观数据分析平台架构和Hive实践
http://www.infoq.com/cn/articles/hadoop-ten-years-part03
編者按:Hadoop于2006年1月28日誕生,至今已有10年,它改變了企業對數據的存儲、處理和分析的過程,加速了大數據的發展,形成了自己的極其火爆的技術生態圈,并受到非常廣泛的應用。在2016年Hadoop十歲生日之際,InfoQ策劃了一個Hadoop熱點系列文章,為大家梳理Hadoop這十年的變化,技術圈的生態狀況,回顧以前,激勵以后。
近十年來,隨著Hadoop生態系統的不斷完善,Hadoop早已成為大數據事實上的行業標準之一。面對當今互聯網產生的巨大的TB甚至PB級原始數據,利用基于Hadoop的數據倉庫解決方案Hive早已是Hadoop的熱點應用之一。達觀數據團隊長期致力于研究和積累Hadoop系統的技術和經驗,并構建起了分布式存儲、分析、挖掘以及應用的整套大數據處理平臺。
本文將從Hive原理、數據分析平臺架構、數據分析實戰、Hive優化等四個方面來分享一些關于系統架構和Hive的心得和實戰經驗,希望大家有所收獲。
1 Hive原理
Hadoop是一個流行的開源框架,用來存儲和處理商用硬件上的大規模數據集。對于HDFS上的海量日志而言,編寫Mapreduce程序代碼對于類似數據倉庫的需求來說總是顯得相對于難以維護和重用,Hive作為一種基于Hadoop的數據倉庫解決方案應運而生,并得到了廣泛應用。
Hive是基于Hadoop的數據倉庫平臺,由Facebook貢獻,其支持類似SQL的結構化查詢功能。Facebook設計開發Hive的初衷就是讓那些熟悉sql編程方式的人也可以更好的利用hadoop,hive可以讓數據分析人員只關注于具體業務模型,而不需要深入了解Map/Reduce的編程細節,但是這并不意味著使用hive不需要了解和學習Map/Reduce編程模型和hadoop。對于Hive分析人員來說,深入了解Hadoop和Hive的原理和Mapreduce模型,對于優化查詢總有益處。
1.1 Hive組件與模型
Hive的組件總體上可以分為以下幾個部分:用戶接口(UI)、驅動、編譯器、元數據(Hive系統參數數據)和執行引擎。Hive中包含4中數據模型:Tabel、ExternalTable、Partition、Bucket。
圖:hive數據模型
a) Table:每一個Table在Hive中都有一個相應的目錄來存儲數據;
b) Partition:表中的一個Partition對應于表下的一個目錄,所有的Partition數據都存儲在對應的目錄中;
c) Buckets:對指定列計算的hash,根據hash值切分數據,目的是為了便于并行,每一個Buckets對應一個文件;
d) External Table指向已存在HDFS中的數據,可創建Partition。
讀時驗證機制
與傳統數據庫對表數據進行寫時嚴重不同,Hive對數據的驗證方式為讀時模式,即只有在讀表數據的時候,hive才檢查解析具體的字段、shema等,從而保證了大數據量的快速加載。
如果表schema與表文件內容不匹配,Hive會盡其所能的去讀數據。如果schema中表有10個字段,而文件記錄卻只有3個字段,那么其中7個字段將為null;如果某些字段類型定位為數值類型,但是記錄中卻為非數值字符串,這些字段也將會被轉換為null。Hive會努力catch讀數據時遇到的錯誤,并努力返回。既然Hive表數據存儲在HDFS中且Hive采用的是讀時驗證方式,定義完表的schema會自動生成表數據的HDFS目錄,且我們可以以任何可能的方式來加載表數據或者利用HDFS API將數據寫入文件,同理,當我們若需要將hive數據寫入其他庫(如oracle),也可以直接通過api讀取數據再寫入目標庫。
再次注意,加載或者寫入的數據內容要和表定義的schema一致,否則將會造成字段或者表為空。
1.2 HQL翻譯成MapReduce Job
Hive編譯器將HQL代碼轉換成一組操作符(operator),操作符是Hive的最小操作單元,每個操作符代表了一種HDFS操作或者MapReduce作業。Hive中的操作符包括:TableScanOperator、ReduceSinkOperator、JoinOperator、SelectOperator、FileSinkOperator、FilterOperator、GroupByOperator、MapJoinOperator等。
Hive語句
INSERT OVERWRITE TABLE read_log_tmp SELECT a.userid,a.bookid,b.author,b.categoryid FROM user_read_log a JOIN book_info b ON a.bookid = b.bookid;其執行計劃為:
圖:join的任務執行流程
1.3 與一般SQL的區別
Hive 視圖與一般數據庫視圖
Hive視圖只支持邏輯視圖,不支持物化視圖,即每次對視圖的查詢hive都將執行查詢任務,因此視圖不會帶來性能上的提升。作為Hive查詢優化的一部分,對視圖的查詢條件語句和視圖的定義查詢條件語句將會盡可能的合并成一個條件查詢。
Hive索引與一般數據庫索引
Hive1.2.1版本目前支持的索引類型有CompactIndexHandler和Bitmap。
CompactIndexHandler 壓縮索引通過將列中相同的值得字段進行壓縮從而減小存儲和加快訪問時間。需要注意的是Hive創建壓縮索引時會將索引數據也存儲在Hive表中。對于表tb_index (id int, name string) 而言,建立索引后的索引表中默認的三列一次為索引列(id)、hdfs文件地址(_bucketname)、偏移量(offset)。
Bitmap 位圖索引作為一種常見的索引,如果索引列只有固定的幾個值,那么就可以采用位圖索引來加速查詢。利用位圖索引可以方便的進行AND/OR/XOR等各類計算,Hive0.8版本開始引入位圖索引,位圖索引在大數據處理方面的應用廣泛,比如可以利用bitmap來計算用戶留存率(索引做與運算,效率遠好于join的方式)。如果Bitmap索引很稀疏,那么就需要對索引壓縮以節省存儲空間和加快IO。Hive的Bitmap Handler采用的是EWAH(https://github.com/lemire/javaewah)壓縮方式。
2 數據分析平臺
2.1 架構與模塊
達觀數據分析平臺包括數據收集加載模塊、數據分析計算模塊、任務調度系統以及可視化系統。
圖:數據分析平臺基本框架
數據收集模塊
數據模塊負責收集移動端app、網頁端以及服務器端大量的日志數據。移動端可自行開發數據上報功能或者使用sdk來上報數據。網頁端利用植入的js將用戶的行為進行上報,服務器端通過http server來收集上報的數據。服務器端的日志信息可以通過DX模塊(一個跨庫的數據交換系統)來將待處理數據推入hive數據分析平臺。除此之外,數據來源還包括大量的user 、item基本數據等等。數據收集完成將所有需要處理分析的原始數據推入hadoop平臺。從物理形式來看,即將待分析數據寫入HDFS。
數據ETL模塊
一般而言,上報的數據都是非結構化或者半結構化的。ETL(抽取、轉換、加載)模塊負責將所有的非結構或者半結構的數據轉換成結構化的數據并加載到hive庫表中。例如對于用戶訪問日志(可能是web server日志),我們需要從每行日志中抽取出用戶的標識(cookie、imei或者userid),ip來源、url等。從形式上來看,ETL將HDFS的原始數據結構化,以表的形式提供分析。
數據分析與計算
根據業務需求和功能,利用HQL實現各種統計分析。一個Hive任務的來源表可能是多個,結果數據也有可能會寫入多張表。
圖:Hive任務執行輸入輸出
任務調度系統
從上圖可以看出,Hive任務之間存在依賴關系,不至于Hive任務之間存在依賴,Hive任務與DX任務之間、DX任務之間都可能存在某種依賴關系,達觀數據分析平臺支持的任務類型還包括MR任務、shell任務等,達觀數據分析平臺自行開發司南調度系統來完成平臺中所有任務的調度。關于司南調度系統可見后續討論。
數據分析平臺模塊
圖:數據分析平臺基本模塊
接下來將陸續介紹,數據分析平臺中的兩個重要模塊:DX數據交換系統以及任務調度系統。
2.2 DX數據交換
DX系統可以在關系型數據庫、Hive、FTP等系統之間實現數據的交換。DX定義了Writer和Reader接口來抽象對數據的讀寫操作,對于各種存儲類型的數據,需定制他們的實現方法。
關系型數據庫利用JDBC實現其讀寫功能;對于Hive而言,直接利用HDFS API實現對HDFS文件的讀寫,由于Hive的讀時驗證機制,需要在讀寫Hive表文件時,定義其字段個數、名稱等信息,保證與表定義一致;FTP文件目前的處理方法是先將數據從FTP服務器拉下來,然后將讀取文件內容,寫入Hive數據庫。
以上過程是其他數據源到Hive的數據傳輸過程,Hive數據同樣可以通過DX系統寫入其他數據源。
2.3 任務調度
達觀數據分析平臺開發的司南調度系統將任務分為資源依賴型和實踐依賴型。時間依賴型任務類似于crontab定時任務一樣,到時觸發其執行。資源依賴型任務需要其依賴的資源都滿足時才會觸發其執行。可調度的任務類型包括DX任務、Hive任務、MR任務、shell任務等。
司南系統中最為關鍵的是dispatcher模塊,該模塊通過zookeeper來調度任務在agent(執行任務的代理服務器,需要設置多個)上的運行,關于zookeeper如何協調分布式應用的一致性在此不再累述。
2.4 架構演化
達觀數據分析平臺在使用過程中,不斷提高其易用性和穩定性。在大量的研究和開發過程中,平臺從無到有,走出第一步到功能完善、發揮巨大的業務價值。
從分散的數據交換到集中的數據交換系統
在使用統一的數據交換系統DX后,各業務系統的數據可以更好的進行匯聚和打通,進行統一的分析和處理。
從分散的作業調度到集中的任務調度系統
每天幾千規模的任務數使得任務的調度極其困難,特別是當任務之間存在依賴關系時,顯然簡單的通過crontab已經無法滿足業務的需求。司南調度系統保證所有任務有序正確的運行。
從批量式處理到集成流式處理
隨著實時統計分析的需求越來越多,hive查詢基于MR任務來實現的缺點日益明顯(任務啟動開銷大)。為了提供實時的數據分析請求,平臺開始引入storm流式計算模型。Storm以數據流為驅動。觸發計算,每來一條數據就產生一次計算結果,時效性非常高,在業界也得到了豐富的應用。
從關系型數據庫到Hbase
初期,數據分析的結果數據都是通過DX導入關系型數據庫,以便數據可視化平臺調用或者其他系統使用,大量的數據造成關系數據庫的日益龐大,帶來嚴重的性能問題。HBase是一個開源、列式分布式的數據庫,基于HDFS文件系統,可以方面的和Hive進行集成。經過集成HBase,為可視化平臺和線上系統提供服務,降低DX任務量,降低訪問延遲。
3 Hive分析實踐
3.1 Schema設計
沒有通用的schema,只有合適的schema。在設計Hive的schema的時候,需要考慮到存儲、業務上的高頻查詢造成的開銷等等,設計適合自己的數據模型。
設置分區表
對于Hive來說,利用分區來設計表總是必要的,分區提供了一種隔離數據和優化查詢的便利的方式。設置分區時,需要考慮被設置成分區的字段,按照時間分區一般而言就是一個好的方案,其好處在于其是按照不同時間粒度來確定合適大小的數據積累量,隨著時間的推移,分區數量的增長是均勻的,分區的大小也是均勻的。
避免小文件
雖然分區有利于隔離數據和查詢,設置過多過細的分區也會帶來瓶頸,主要是因為過多的分區意味著文件的數目就越多,過多增長的小文件會給namecode帶來巨大的性能壓力。同時小文件過多會影響JOB的執行,hadoop會將一個job轉換成多個task,即使對于每個小文件也需要一個task去單獨處理,帶來性能開銷。因此,hive表設計的分區不應該過多過細,每個目錄下的文件足夠大,應該是文件系統中塊大小的若干倍。
選擇文件格式
Hive提供的默認文件存儲格式有textfile、sequencefile、rcfile等。用戶也可以通過實現接口來自定義輸入輸的文件格式。
在實際應用中,textfile由于無壓縮,磁盤及解析的開銷都很大,一般很少使用。Sequencefile以鍵值對的形式存儲的二進制的格式,其支持針對記錄級別和塊級別的壓縮。rcfile是一種行列結合的存儲方式(text file和sequencefile都是行表[row table]),其保證同一條記錄在同一個hdfs塊中,塊以列式存儲。rcfile的聚合運算不一定總是存在,但是rcfile的高壓縮率確實減少文件大小,因此實際應用中,rcfile總是成為不二的選擇,達觀數據平臺在選擇文件存儲格式時也大量選擇了rcfile方案。
3.2 統計分析
本節將從排序和窗口函數兩個方面的介紹Hive的統計分析功能。
排名
熱門排名在實際的業務場景中經常遇見。例如最受歡迎的書籍、銷量TOP100的商品等等。再實際情況下,我們不僅需要考慮各量化指標,還需要考慮置信度問題。
最簡單的排名:ORDER BY value LIMIT n
上述查詢僅僅考慮了量化指標,排名不夠平滑,波動較大。
各種排名方法眾多,達觀數據分析平臺在進行item 排名多采用基于用戶投票的排名算法。如基于威爾遜區間的排名算法,該算法可以較好的解決小樣本的不準確問題。
圖:威爾遜區間
窗口分析函數
Hive提供了豐富了數學統計函數,同時也提供了用戶自定義函數的接口,用戶可以自定義UDF、UDAF、UDTF Hive 0.11版本開始提供窗口和分析函數(Windowing and Analytics Functions),包括LEAD、LAG、FIRST_VALUE、LAST_VALUE、RANK、ROW_NUMBER、PERCENT_RANK、CUBE、ROLLUP等。窗口函數與聚合函數一樣,都是對表子集的操作,從結果上看,區別在于窗口函數的結果不會聚合,原有的每行記錄依然會存在。窗口函數的典型分析應用包括:按分區聚合(排序,top n問題)、行間計算(時間序列分析)、關聯計算(購物籃分析)。
我們以一個簡單的行間計算的例子說明窗口函數的應用(關于其他函數的具體說明,請參考hive文檔)。用戶閱讀行為的統計分析需要從點擊書籍行為中歸納統計出來。用戶瀏覽日志結構如下表所示,每條記錄為用戶的單次點擊行為。
通過對連續的用戶點擊日志分析,通過Hive提供的窗口分析函數可以計算出用戶各章節的閱讀時間。
SELECT userid, bookid, chapterid, end_time – start_time as read_time FROM (SELECT userid, bookid, chapterid, log_time as start_time, lead(log_time,1,null) over(partition by userid, bookid order by log_time) as end_time FROM user_read_log where pt=’2015-12-01’ ) t;通過上述查詢既可以找出2015-12-01日所有用戶對每一章節的閱讀時間。只能通過開發mr代碼或者實現udaf來實現上述功能。
窗口分析函數關鍵在于定義的窗口數據集及其對窗口的操作,通過over(窗口定義語句)來定義窗口。日常分析和實際應用中,經常會有窗口分析應用的場景,例如基于分區的排序、集合、統計等復雜操作。例如我們需要統計每個用戶閱讀時間最多的3本書:
圖:行間計算示意圖及代碼
窗口函數使得Hive的具備了完整的數據分析功能,在實際的應用環境中,達觀數據分析團隊大量使用hive窗口分析函數來實現較為復雜的邏輯,提高開發和迭代效率。
3.3 用戶畫像
用戶畫像即基于真實數據的用戶模型。簡單來說,用戶畫像提取了用戶的屬性信息、行為信息,從而歸納統計出其人口學特征、偏好特征等。建立用戶模型的首要任務就是提取特征,既包括用戶基本特征,也包括行為特征和統計特征。
用戶模型本質上就是刻畫用戶興趣的模型,而用戶的興趣模型是多維度、多尺度的。刻畫用戶模型還需要從時間上進行度量,甚至是進行多尺度的組合,根據用戶行為統計時間的長短,可以將用戶的偏好分為短期偏好和長期偏好。偏好的權重即為用戶的偏好程度的度量。
對用戶偏好的描述,還需要考慮置信度的問題,例如對于一個閱讀行為極其稀疏的用戶來說,刻畫其閱讀類別偏好是毫無意義的。
圖:用戶畫像刻畫
3.4 反作弊分析
眾所周知,存在排名就可能存在作弊。搜索廣告、索互聯網刷單、刷榜現象層出不窮。一般來說,作弊的目的都是為了提高自己的排名,或者是降低對手的排名。利用Hive對數據進行分析可以過濾掉較明顯的作弊數據,達到數據清洗的目的。
例如對于一個刷榜作弊行為,需要作弊著不斷刷日志行為來提高其排名,我們可以指定若干規則來過濾作弊數據。如同IP同物品同行為數目異常、同用戶ID行為頻次異常、同物品ID行為頻次異常等等。如下圖,如果相比于所有item的平均增長趨勢,如果某item的增長趨勢相對平均水平過大,那么其作弊的概率就比較高。
圖:作弊數據趨勢與平均趨勢數據對比
作弊分析還需要結合業務需求和特點,采用合適的機器學習算法來進行更進一步的判斷和過濾,達到反作弊的目標。
4 Hive優化
達觀的數據倉庫基于Hive搭建,每日需要處理大量的計算流程,Hive的穩定性和性能至關重要。眾多的任務需要我們合理的調節分配集群資源,合理的配置各參數,合理的優化查詢。Hive優化包含各個方面,如job個數優化、job的map/reducer個數優化、并行執行優化等等,本節將主要討論HQL中的無時不在的JOIN的優化經驗。
4.1 Join語句
對于上述的join語句,其中book_info表數量為千規模,
INSERT OVERWRITE TABLE read_log_tmp SELECT a.userid,a.bookid,b.author FROM user_read_log a JOIN book_info b ON a.bookid = b.bookid;該語句的執行計劃為:
圖:map join的任務執行流程
對于小數據量,hive會自動采取map join的方式來優化join,從mapreduce的編程模型來看,實現join的方式主要有map端join、reduce端join。Map端join利用hadoop 分布式緩存技術通過將小表變換成hashtable文件分發到各個task,map大表時可以直接判斷hashtable來完成join,注意小表的hashtable是放在內存中的,在內存中作匹配,因此map join是一種非常快的join方式,也是一種常見的優化方式。如果小表夠小,那么就可以以map join的方式來完成join完成。Hive通過設置hive.auto.convert.join=true(默認值)來自動完成map join的優化,而無需顯示指示map join。缺省情況下map join的優化是打開的。
Reduce端join需要reducer來完成join過程,對于上述join代碼,reduce 端join的mr流程如下,
圖:reduce端join的mapreduce過程
相比于map join, reduce 端join無法再map過程中過濾任何記錄,只能將join的兩張表的所有數據按照join key進行shuffle/sort,并按照join key的hash值將<key,value>對分發到特定的reducer。Reducer對于所有的鍵值對執行join操作,例如0號(bookid的hash值為0)reducer收到的鍵值對如下,其中T1、T2表示記錄的來源表,起到標識作用:
圖:reduce端join的reducer join
Reducer端join無法避免的reduce截斷以及傳輸的大量數據都會給集群網絡帶來壓力,從上圖可以看出所有hash(bookid) % reducer_number等于0的key-value對都會通過shuffle被分發到0號reducer,如果分到0號reducer的記錄數目遠大于其他reducer的記錄數目,顯然0號的reducer的數據處理量將會遠大于其他reducer,因此處理時間也會遠大于其他reducer,甚至會帶來內存等其他問題,這就是數據傾斜問題。對于join造成的數據傾斜問題我們可以通過設置參數set Hive.optimize.skewjoin=true,讓hive自己嘗試解決join過程中產生的傾斜問題。
4.2 Group by語句
我們對user_read_log表按userid goup by語句來繼續探討數據傾斜問題,首先我們explain group by語句:
explain select userid,count(*) from user_read_log group by userid
圖:goup by的執行計劃
Group by的執行計劃按照userid的hash值分發數據,同時在map端也做了本地reduce,group by的shuffle過程是按照hash(userid)來分發的,實際應用中日志中很多用戶都是未注冊用戶或者未登錄,userid字段為空的記錄數遠大于userid不為空的記錄數,當所有的空userid記錄都分發到特定某一個reducer后,也會帶來嚴重的數據傾斜問題。造成數據傾斜的主要原因在于分發到某個或某幾個reducer的數據量遠大于其他reducer的數據量。
對于group by造成的數據傾斜問題,我們可以通過設置參數
set hive.map.aggr=true (開啟map端combiner);
set hive.groupby.skewindata=true;
這個參數的作用是做reduce操作的時候,拿到的key并不是所有相同值給同一個Reduce,而是隨機分發,然后reduce做聚合,做完之后再做一輪MR,拿前面聚合過的數據再算結果。雖然多了一輪MR任務,但是可以有效的減少數據傾斜問題可能帶來的危險。
Hive解決數據傾斜
正確的設置Hive參數可以在某種程度上避免的數據傾斜問題,合適的查詢語句也可以避免數據傾斜問題。要盡早的過濾數據和裁剪數據,減少后續處理的數據量,使得join key的數據分布較為均勻,將空字段隨機賦予值,這樣既可以均勻分發傾斜的數據:
select userid,name from user_info a join (select case when userid is null then cast(rand(47)*100000 as int)else userid from user_read_log ) b on a.userid = b.userid如果用戶在定義schema的時候就已經預料到表數據可能會存在嚴重的數據傾斜問題,Hive自0.10.0引入了skew table的概念,如建表語句
CREATE TABLE user_read_log (userid int,bookid, …) SKEWED BY (userid) ON (null) [STORED AS DIRECTORIES];需要注意的是,skew table只是將傾斜特別嚴重的列的分開存儲為不同的文件,每個制定的傾斜值制定為一個文件或者目錄,因此在查詢的時候可以通過過濾傾斜值來避免數據傾斜問題:
select userid,name from user_info a join ( select userid from user_read_log where pt=’2015’ and userid is not null ) b on a.userid = b.userid可以看出,如果不加過濾條件,傾斜問題還是會存在,通過對skew table加過濾條件的好處是避免了mapper的表掃描過濾操作。
4.3 Join的物理優化
Hive內部實現了MapJoinResolver(處理MapJoin)、SkewJoinResolver(處理傾斜join)、CommonJoinResolver(處理普通Join)等類來實現join的查詢物理優化(/org/apache/hadoop/hive/ql/optimizer/physical)。
CommonJoinResolver類負責將普通Join轉換成MapJoin,Hive通過這個類來實現mapjoin的自動優化。對于表A和表B的join查詢,會產生3個分支:
1) 以表A作為大表進行Mapjoin;
2) 以表A作為大表進行Mapjoin;
3) Map-reduce join
由于不知道輸入數據規模,因此編譯時并不會決定走那個分支,而是在運行時判斷走那個分支。需要注意的是要像完成上述自動轉換,需要將hive.auto.convert.join.noconditionaltask設置為true(默認值),同時可以手工控制轉載進內存的小表的大小(hive.auto.convert.join.noconditionaltask.size)。
MapJoinResolver 類負責迭代各個mr任務,檢查每個任務是否存在map join操作,如果有,會將local map work轉換成local map join work。
SkewJoinResolver類負責迭代有join操作的reducer任務,一旦單個reducer產生了傾斜,那么就會將傾斜值得數據寫入hdfs,然后用一個新的map join的任務來處理傾斜值的計算。雖然多了一輪mr任務,但是由于采用的map join,效率也是很高的。良好的mr模式和執行流程總是至關重要的。
5 總結
本文詳細介紹了達觀大數據分析平臺的基本架構和原理,基于hadoop/hive的大數據分析平臺使海量數據的存儲、分析、挖掘逐步成為現實,并帶來意想不到的益處。作為數據分析平臺主力軍的Hive仍然處在不斷的發展之中,將HQL理解成Mapreduce程序、理解Hadoop的核心能力是更好的使用和優化Hive的根本。達觀數據團隊也將緊跟技術發展潮流,結合自身的業務需求,采取合理的框架架構,提升數據平臺的處理能力。
6 參考資料
- Hive wiki:https://cwiki.apache.org/confluence/display/Hive/Home
- Hive Design Docs:https://cwiki.apache.org/confluence/display/Hive/DesignDocs
- Hadoop: The Definitive Guide (3rd Edition)
- Programming Hive
- Analytical Queries with Hive:http://www.slideshare.net/Hadoop_Summit/analytical-queries-with-hive
轉載于:https://www.cnblogs.com/davidwang456/articles/9746333.html
總結
以上是生活随笔為你收集整理的达观数据分析平台架构和Hive实践的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 兼顾稳定和性能,58大数据平台的技术演进
- 下一篇: 大数据sql引擎