内存有限的情况下 Spark 如何处理 T 级别的数据?
                                                            生活随笔
收集整理的這篇文章主要介紹了
                                内存有限的情况下 Spark 如何处理 T 级别的数据?
小編覺得挺不錯的,現在分享給大家,幫大家做個參考.                        
                                
                            
                            
                            簡單起見,下述答案僅就無shuffle的單stage Spark作業做了概要解釋。對于多stage任務而言,在內存的使用上還有很多其他重要問題沒有覆蓋。部分內容請參考評論中?
@邵賽賽
?給出的補充。Spark確實擅長內存計算,內存容量不足時也可以回退,但題主給出的條件(8GB內存跑1TB數據)也確實是過于苛刻了……
 
首先需要解開的一個誤區是,對于Spark這類內存計算系統,并不是說要處理多大規模的數據就需要多大規模的內存。Spark相對Hadoop MR有大幅性能提升的一個前提就是大量大數據作業同一時刻需要加載進內存的數據只是整體數據的一個子集,且大部分情況下可以完全放入內存,正如Shark(Spark上的Hive兼容的data warehouse)論文1.1節所述:
 
 
在Spark內部,單個executor進程內RDD的分片數據是用Iterator流式訪問的,Iterator的hasNext方法和next方法是由RDD lineage上各個transformation攜帶的閉包函數復合而成的。該復合Iterator每訪問一個元素,就對該元素應用相應的復合函數,得到的結果再流式地落地(對于shuffle stage是落地到本地文件系統留待后續stage訪問,對于result stage是落地到HDFS或送回driver端等等,視選用的action而定)。如果用戶沒有要求Spark cache該RDD的結果,那么這個過程占用的內存是很小的,一個元素處理完畢后就落地或扔掉了(概念上如此,實現上有buffer),并不會長久地占用內存。只有在用戶要求Spark cache該RDD,且storage level要求在內存中cache時,Iterator計算出的結果才會被保留,通過cache manager放入內存池。
 
簡單起見,暫不考慮帶shuffle的多stage情況和流水線優化。這里拿最經典的log處理的例子來具體說明一下(取出所有以ERROR開頭的日志行,按空格分隔并取第2列):
val lines = spark.textFile("hdfs://<input>") val errors = lines.filter(_.startsWith("ERROR")) val messages = errors.map(_.split(" ")(1)) messages.saveAsTextFile("hdfs://<output>") 按傳統單機immutable FP的觀點來看,上述代碼運行起來好像是:
 
把HDFS上的日志文件全部拉入內存形成一個巨大的字符串數組, Filter一遍再生成一個略小的新的字符串數組, 再map一遍又生成另一個字符串數組。  
真這么玩兒的話Spark早就不用混了……
 
如前所述,Spark在運行時動態構造了一個復合Iterator。就上述示例來說,構造出來的Iterator的邏輯概念上大致長這樣:
new Iterator[String] {private var head: String = _private var headDefined: Boolean = falsedef hasNext: Boolean = headDefined || {do {try head = readOneLineFromHDFS(...) // (1) read from HDFScatch {case _: EOFException => return false}} while (!head.startsWith("ERROR")) // (2) filter closuretrue}def next: String = if (hasNext) {headDefined = falsehead.split(" ")(1) // (3) map closure} else {throw new NoSuchElementException("...")} } 上面這段代碼是我按照Spark中FilteredRDD、MappedRDD的定義和Scala Iterator的filter、map方法的框架寫的偽碼,并且省略了從cache或checkpoint中讀取現成結果的邏輯。1、2、3三處便是RDD lineage DAG中相應邏輯嵌入復合出的Iterator的大致方式。每種RDD變換嵌入復合Iterator的具體方式是由不同的RDD以及Scala Iterator的相關方法定義的。可以看到,用這個Iterator訪問整個數據集,空間復雜度是O(1)。可見,Spark RDD的immutable語義并不會造成大數據內存計算任務的龐大內存開銷。
 
然后來看加cache的情況。我們假設errors這個RDD比較有用,除了拿出空格分隔的第二列以外,可能在同一個application中我們還會再頻繁用它干別的事情,于是選擇將它cache住:
val lines = spark.textFile("hdfs://<input>") val errors = lines.filter(_.startsWith("ERROR")).cache() // <-- !!! val messages = errors.map(_.split(" ")(1)) messages.saveAsTextFile("hdfs://<output>") 加了cache之后有什么變化呢?實際上相當于在上述復合Iterator偽碼的(2)處,將filter出來的文本行逐一追加到了內存中的一個ArrayBuffer[String]里存起來形成一個block,然后通過cache manager扔進受block manager管理的內存池。注意這里僅僅cache了filter出來的結果,HDFS讀出的原始數據沒有被cache,對errors做map操作后得到的messages RDD也沒有被cache。這樣一來,后續任務復用errors這個RDD時,直接從內存中取就好,就不用重新計算了。
                        
                        
                        首先需要解開的一個誤區是,對于Spark這類內存計算系統,并不是說要處理多大規模的數據就需要多大規模的內存。Spark相對Hadoop MR有大幅性能提升的一個前提就是大量大數據作業同一時刻需要加載進內存的數據只是整體數據的一個子集,且大部分情況下可以完全放入內存,正如Shark(Spark上的Hive兼容的data warehouse)論文1.1節所述:
In fact, one study [1] analyzed the access patterns in the Hive warehouses at Facebook and discovered that for the vast majority (96%) of jobs, the entire inputs could fit into a fraction of the cluster’s total memory.
[1] G. Ananthanarayanan, A. Ghodsi, S. Shenker, and I. Stoica. Disk-locality in datacenter computing considered irrelevant. In HotOS ’11, 2011.
至于數據子集仍然無法放入集群物理內存的情況,Spark仍然可以妥善處理,下文還會詳述。在Spark內部,單個executor進程內RDD的分片數據是用Iterator流式訪問的,Iterator的hasNext方法和next方法是由RDD lineage上各個transformation攜帶的閉包函數復合而成的。該復合Iterator每訪問一個元素,就對該元素應用相應的復合函數,得到的結果再流式地落地(對于shuffle stage是落地到本地文件系統留待后續stage訪問,對于result stage是落地到HDFS或送回driver端等等,視選用的action而定)。如果用戶沒有要求Spark cache該RDD的結果,那么這個過程占用的內存是很小的,一個元素處理完畢后就落地或扔掉了(概念上如此,實現上有buffer),并不會長久地占用內存。只有在用戶要求Spark cache該RDD,且storage level要求在內存中cache時,Iterator計算出的結果才會被保留,通過cache manager放入內存池。
簡單起見,暫不考慮帶shuffle的多stage情況和流水線優化。這里拿最經典的log處理的例子來具體說明一下(取出所有以ERROR開頭的日志行,按空格分隔并取第2列):
val lines = spark.textFile("hdfs://<input>") val errors = lines.filter(_.startsWith("ERROR")) val messages = errors.map(_.split(" ")(1)) messages.saveAsTextFile("hdfs://<output>") 按傳統單機immutable FP的觀點來看,上述代碼運行起來好像是:
如前所述,Spark在運行時動態構造了一個復合Iterator。就上述示例來說,構造出來的Iterator的邏輯概念上大致長這樣:
new Iterator[String] {private var head: String = _private var headDefined: Boolean = falsedef hasNext: Boolean = headDefined || {do {try head = readOneLineFromHDFS(...) // (1) read from HDFScatch {case _: EOFException => return false}} while (!head.startsWith("ERROR")) // (2) filter closuretrue}def next: String = if (hasNext) {headDefined = falsehead.split(" ")(1) // (3) map closure} else {throw new NoSuchElementException("...")} } 上面這段代碼是我按照Spark中FilteredRDD、MappedRDD的定義和Scala Iterator的filter、map方法的框架寫的偽碼,并且省略了從cache或checkpoint中讀取現成結果的邏輯。1、2、3三處便是RDD lineage DAG中相應邏輯嵌入復合出的Iterator的大致方式。每種RDD變換嵌入復合Iterator的具體方式是由不同的RDD以及Scala Iterator的相關方法定義的。可以看到,用這個Iterator訪問整個數據集,空間復雜度是O(1)。可見,Spark RDD的immutable語義并不會造成大數據內存計算任務的龐大內存開銷。
然后來看加cache的情況。我們假設errors這個RDD比較有用,除了拿出空格分隔的第二列以外,可能在同一個application中我們還會再頻繁用它干別的事情,于是選擇將它cache住:
val lines = spark.textFile("hdfs://<input>") val errors = lines.filter(_.startsWith("ERROR")).cache() // <-- !!! val messages = errors.map(_.split(" ")(1)) messages.saveAsTextFile("hdfs://<output>") 加了cache之后有什么變化呢?實際上相當于在上述復合Iterator偽碼的(2)處,將filter出來的文本行逐一追加到了內存中的一個ArrayBuffer[String]里存起來形成一個block,然后通過cache manager扔進受block manager管理的內存池。注意這里僅僅cache了filter出來的結果,HDFS讀出的原始數據沒有被cache,對errors做map操作后得到的messages RDD也沒有被cache。這樣一來,后續任務復用errors這個RDD時,直接從內存中取就好,就不用重新計算了。
總結
以上是生活随笔為你收集整理的内存有限的情况下 Spark 如何处理 T 级别的数据?的全部內容,希望文章能夠幫你解決所遇到的問題。
 
                            
                        - 上一篇: 程序员必备的代码审查(Code Revi
- 下一篇: MapReduce的自制Writable
