Spark SQL Catalyst Source Code Analysis: Optimizer
/** Spark SQL Source Code Analysis Series */
The previous posts in this series covered the core execution flow of Spark SQL's Catalyst, the SqlParser, the Analyzer, and the core TreeNode library. This post explains the optimization ideas behind Spark SQL's Optimizer and how the Optimizer is expressed inside Catalyst, adding some hands-on experiments along the way, so that you come away with an intuitive picture of what the Optimizer does.
The Optimizer's main responsibility is to take the resolved Logical Plan produced by the Analyzer and optimize the syntax tree according to the optimization strategies grouped into Batches, rewriting both logical plan nodes (LogicalPlan) and expressions (Expression). It is also the prerequisite step before conversion to a physical execution plan, as shown in the figure below:
[Figure: the Optimizer's position in the Catalyst pipeline, between the analyzed logical plan and physical planning]
1. Optimizer
Optimizer is the only class in Catalyst's optimizer package. It actually works much like the Analyzer: both extend RuleExecutor[LogicalPlan], and both execute a sequence of Batch operations:
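Here is a condensed sketch of that batch-execution loop, based on RuleExecutor[TreeType] as of Spark 1.0 (simplified: logging and some details are omitted, so treat it as an illustration rather than the exact source). Each Batch applies its rules in order, over and over, until the plan stops changing (fastEquals detects the fixed point) or the strategy's iteration cap is hit:

import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.catalyst.trees.TreeNode

abstract class RuleExecutor[TreeType <: TreeNode[_]] {
  abstract class Strategy { def maxIterations: Int }
  case object Once extends Strategy { val maxIterations = 1 }
  case class FixedPoint(maxIterations: Int) extends Strategy

  protected case class Batch(name: String, strategy: Strategy, rules: Rule[TreeType]*)
  protected val batches: Seq[Batch]

  def apply(plan: TreeType): TreeType = {
    var curPlan = plan
    batches.foreach { batch =>
      var iteration = 1
      var lastPlan = curPlan
      var continue = true
      while (continue) {
        // One pass: thread the plan through every rule of this batch, in order.
        curPlan = batch.rules.foldLeft(curPlan) { case (p, rule) => rule(p) }
        iteration += 1
        if (iteration > batch.strategy.maxIterations || curPlan.fastEquals(lastPlan)) {
          continue = false  // iteration cap reached, or the plan no longer changes
        }
        lastPlan = curPlan
      }
    }
    curPlan
  }
}

This is why every Batch below is declared with FixedPoint(100): each group of rules is re-applied until the plan converges, up to at most 100 iterations.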
The batches in Optimizer cover three groups of optimization strategies: 1. Combine Limits (merge Limit operators); 2. ConstantFolding (fold constants); 3. Filter Pushdown (push filters down). The rule objects each Batch refers to are all wired up in the Optimizer object:
object Optimizer extends RuleExecutor[LogicalPlan] {
  val batches =
    Batch("Combine Limits", FixedPoint(100),
      CombineLimits) ::
    Batch("ConstantFolding", FixedPoint(100),
      NullPropagation,
      ConstantFolding,
      BooleanSimplification,
      SimplifyFilters,
      SimplifyCasts,
      SimplifyCaseConversionExpressions) ::
    Batch("Filter Pushdown", FixedPoint(100),
      CombineFilters,
      PushPredicateThroughProject,
      PushPredicateThroughJoin,
      ColumnPruning) :: Nil
}
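For context on where this object gets used: in Spark 1.0 the optimizer is invoked between analysis and physical planning, inside SQLContext's QueryExecution. Roughly (a condensed view; these are members of SQLContext and some fields are omitted):

// Inner class of SQLContext (condensed sketch, not standalone code):
abstract class QueryExecution {
  def logical: LogicalPlan
  lazy val analyzed = analyzer(logical)               // resolved by the Analyzer (previous post)
  lazy val optimizedPlan = optimizer(analyzed)        // the Optimizer batches described here
  lazy val sparkPlan = planner(optimizedPlan).next()  // first physical plan candidate
  lazy val executedPlan: SparkPlan = prepareForExecution(sparkPlan)
}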
One more point: the Optimizer optimizes not only the Logical Plan but also the Expressions inside it, so it is worth knowing the relevant Expression machinery. Two members are used heavily here: references and outputSet. references is the set of Expressions that a Logical Plan or Expression node depends on, while outputSet is the set of Attributes the Logical Plan outputs:
For example, Aggregate is a Logical Plan whose references is the de-duplicated union of the references of its GROUP BY expressions and its aggregate expressions.
case class Aggregate(
    groupingExpressions: Seq[Expression],
    aggregateExpressions: Seq[NamedExpression],
    child: LogicalPlan)
  extends UnaryNode {

  override def output = aggregateExpressions.map(_.toAttribute)
  override def references =
    (groupingExpressions ++ aggregateExpressions).flatMap(_.references).toSet
}
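A quick toy illustration (plain Scala sets, not the catalyst classes) of the set algebra this enables; ColumnPruning in section 2.3.3 prunes exactly when (child.outputSet -- references).nonEmpty:

// Hypothetical attribute names for: SELECT key FROM t GROUP BY key,
// where the child plan t produces (key, value).
val childOutputSet = Set("key#1", "value#2")  // what the child plan outputs
val references     = Set("key#1")             // what the Aggregate actually depends on
val unneeded       = childOutputSet -- references
println(unneeded)  // Set(value#2): non-empty, so inserting a pruning Project pays off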
2. Optimization Strategies in Detail
The Optimizer's strategies transform both plans and expressions; in each case the underlying mechanism is the same: traverse the tree and apply the optimization Rules. One subtlety to note: Logical Plan transforms are pre-order (plan transform visits a node before its children), whereas the Expression transforms here are mostly post-order (transformExpressionsUp rewrites children first); ConstantFolding below is an exception and uses transformExpressionsDown.
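To make the traversal-order difference concrete, here is a minimal, self-contained sketch (a toy model, not the real TreeNode API): a pre-order pass can miss rewrites that only become possible after the children have been rewritten.

sealed trait Expr {
  def children: Seq[Expr]
  def withChildren(cs: Seq[Expr]): Expr
  // Pre-order: rewrite this node first, then recurse into (the new node's) children.
  def transformDown(rule: PartialFunction[Expr, Expr]): Expr = {
    val afterRule = rule.applyOrElse(this, identity[Expr])
    afterRule.withChildren(afterRule.children.map(_.transformDown(rule)))
  }
  // Post-order: rewrite the children first, so the rule sees already-rewritten children.
  def transformUp(rule: PartialFunction[Expr, Expr]): Expr = {
    val newSelf = withChildren(children.map(_.transformUp(rule)))
    rule.applyOrElse(newSelf, identity[Expr])
  }
}
case class Lit(v: Int) extends Expr {
  def children = Nil
  def withChildren(cs: Seq[Expr]) = this
}
case class Add(l: Expr, r: Expr) extends Expr {
  def children = Seq(l, r)
  def withChildren(cs: Seq[Expr]) = Add(cs(0), cs(1))
}

object TraversalDemo extends App {
  val fold: PartialFunction[Expr, Expr] = {
    case Add(Lit(a), Lit(b)) => Lit(a + b)  // fold an addition of two literals
  }
  val tree = Add(Add(Lit(1), Lit(2)), Lit(3))
  println(tree.transformUp(fold))    // Lit(6): inner Add folds first, enabling the outer fold
  println(tree.transformDown(fold))  // Add(Lit(3), Lit(3)): the outer node was visited too early
}

Run repeatedly, the pre-order version also reaches Lit(6), which is exactly what wrapping these batches in FixedPoint(100) buys the optimizer.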
2.1 Batch: Combine Limits
If two Limit operators appear with one as the direct child of the other, they are merged into a single Limit (in the pattern below, grandChild is whatever plan sits beneath both):

/**
 * Combines two adjacent [[Limit]] operators into one, merging the
 * expressions into one single expression.
 */
object CombineLimits extends Rule[LogicalPlan] {
  def apply(plan: LogicalPlan): LogicalPlan = plan transform {
    case ll @ Limit(le, nl @ Limit(ne, grandChild)) =>
      // ll is the outer Limit with expression le; nl is its child Limit with expression ne
      Limit(If(LessThan(ne, le), ne, le), grandChild)  // keep the smaller of the two limit expressions
  }
}

Given the SQL:

val query = sql("select * from (select * from temp_shengli limit 100)a limit 10 ")

scala> query.queryExecution.analyzed
res12: org.apache.spark.sql.catalyst.plans.logical.LogicalPlan =
Limit 10
 Project [key#13,value#14]
  Limit 100
   Project [key#13,value#14]
    MetastoreRelation default, temp_shengli, None
The subquery limits to 100 rows while the outer query limits to 10; there is clearly no need for the subquery to fetch that many rows when the outer query only needs 10. So Limit 100 and Limit 10 are merged into Limit(If(LessThan(100, 10), 100, 10)), which evaluates to Limit 10.
2.2 Batch: ConstantFolding
This Batch contains the Rules NullPropagation, ConstantFolding, BooleanSimplification, SimplifyFilters, SimplifyCasts, and SimplifyCaseConversionExpressions.
2.2.1 Rule: NullPropagation
First, a word about the Literal literal: it is actually a class that can wrap a value of any of the basic types (groundwork for what follows).
object Literal {
  def apply(v: Any): Literal = v match {
    case i: Int => Literal(i, IntegerType)
    case l: Long => Literal(l, LongType)
    case d: Double => Literal(d, DoubleType)
    case f: Float => Literal(f, FloatType)
    case b: Byte => Literal(b, ByteType)
    case s: Short => Literal(s, ShortType)
    case s: String => Literal(s, StringType)
    case b: Boolean => Literal(b, BooleanType)
    case d: BigDecimal => Literal(d, DecimalType)
    case t: Timestamp => Literal(t, TimestampType)
    case a: Array[Byte] => Literal(a, BinaryType)
    case null => Literal(null, NullType)
  }
}

Note that Literal is a LeafExpression; its core method is eval, which, given a Row, evaluates the expression and returns its value:
case class Literal(value: Any, dataType: DataType) extends LeafExpression {
  override def foldable = true
  def nullable = value == null
  def references = Set.empty
  override def toString = if (value != null) value.toString else "null"

  type EvaluatedType = Any
  override def eval(input: Row): Any = value  // a literal simply returns its value, ignoring the input Row
}
Now let's see what NullPropagation actually does. NullPropagation is an optimization that replaces Expressions that can be statically evaluated with equivalent Literal values, and it also stops NULL values from propagating up the SQL syntax tree.
/**
 * Replaces [[Expression Expressions]] that can be statically evaluated with
 * equivalent [[Literal]] values. This rule is more specific with
 * Null value propagation from bottom to top of the expression tree.
 */
object NullPropagation extends Rule[LogicalPlan] {
  def apply(plan: LogicalPlan): LogicalPlan = plan transform {
    case q: LogicalPlan => q transformExpressionsUp {
      case e @ Count(Literal(null, _)) => Cast(Literal(0L), e.dataType)       // count(null) is statically 0
      case e @ Sum(Literal(c, _)) if c == 0 => Cast(Literal(0L), e.dataType)  // sum of the constant 0 is statically 0
      case e @ Average(Literal(c, _)) if c == 0 => Literal(0.0, e.dataType)   // average of the constant 0 is statically 0.0
      case e @ IsNull(c) if !c.nullable => Literal(false, BooleanType)        // a non-nullable child can never be null
      case e @ IsNotNull(c) if !c.nullable => Literal(true, BooleanType)
      case e @ GetItem(Literal(null, _), _) => Literal(null, e.dataType)
      case e @ GetItem(_, Literal(null, _)) => Literal(null, e.dataType)
      case e @ GetField(Literal(null, _), _) => Literal(null, e.dataType)
      case e @ Coalesce(children) => {
        // Drop the null literals among coalesce's children.
        val newChildren = children.filter(c => c match {
          case Literal(null, _) => false
          case _ => true
        })
        if (newChildren.length == 0) {
          Literal(null, e.dataType)
        } else if (newChildren.length == 1) {
          newChildren(0)
        } else {
          Coalesce(newChildren)
        }
      }
      case e @ If(Literal(v, _), trueValue, falseValue) => if (v == true) trueValue else falseValue
      case e @ In(Literal(v, _), list) if (list.exists(c => c match {
        case Literal(candidate, _) if candidate == v => true
        case _ => false
      })) => Literal(true, BooleanType)
      // Put exceptional cases above if any
      case e: BinaryArithmetic => e.children match {
        case Literal(null, _) :: right :: Nil => Literal(null, e.dataType)  // null on either side of an arithmetic op yields null
        case left :: Literal(null, _) :: Nil => Literal(null, e.dataType)
        case _ => e
      }
      case e: BinaryComparison => e.children match {
        case Literal(null, _) :: right :: Nil => Literal(null, e.dataType)
        case left :: Literal(null, _) :: Nil => Literal(null, e.dataType)
        case _ => e
      }
      case e: StringRegexExpression => e.children match {
        case Literal(null, _) :: right :: Nil => Literal(null, e.dataType)
        case left :: Literal(null, _) :: Nil => Literal(null, e.dataType)
        case _ => e
      }
    }
  }
}

Given the SQL:

val query = sql("select count(null) from temp_shengli where key is not null")
scala> query.queryExecution.analyzed
res6: org.apache.spark.sql.catalyst.plans.logical.LogicalPlan =
Aggregate [], [COUNT(null) AS c0#5L]   // still counting null here
 Filter IS NOT NULL key#7
  MetastoreRelation default, temp_shengli, None

Applying NullPropagation by hand:
scala> NullPropagation(query.queryExecution.analyzed)
res7: org.apache.spark.sql.catalyst.plans.logical.LogicalPlan =
Aggregate [], [CAST(0, LongType) AS c0#5L]   // after optimization: the constant 0
 Filter IS NOT NULL key#7
  MetastoreRelation default, temp_shengli, None
2.2.2 Rule: ConstantFolding
Constant folding is an Expression-level optimization: expressions that can be computed directly need not be turned into objects and evaluated during physical execution; they can be computed right in the plan:

object ConstantFolding extends Rule[LogicalPlan] {
  def apply(plan: LogicalPlan): LogicalPlan = plan transform {   // first transform the plan
    case q: LogicalPlan => q transformExpressionsDown {          // then transform each plan's expressions
      // Skip redundant folding of literals.
      case l: Literal => l
      case e if e.foldable => Literal(e.eval(null), e.dataType)  // call eval to compute the result at planning time
    }
  }
}

Given the SQL:

val query = sql("select 1+2+3+4 from temp_shengli")

scala> query.queryExecution.analyzed
res23: org.apache.spark.sql.catalyst.plans.logical.LogicalPlan =
Project [(((1 + 2) + 3) + 4) AS c0#21]   // still a constant expression here
 MetastoreRelation default, src, None

After optimization:
scala> query.queryExecution.optimizedPlan
res24: org.apache.spark.sql.catalyst.plans.logical.LogicalPlan =
Project [10 AS c0#21]   // after optimization, folded straight to 10
 MetastoreRelation default, src, None
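A REPL-style sketch of what e.foldable and e.eval(null) are doing in the rule above (assuming the catalyst expression classes shown earlier are on the classpath): since Literal.eval ignores its input Row, an expression tree built purely from literals can be evaluated at planning time, with no physical execution at all.

import org.apache.spark.sql.catalyst.expressions._

val e = Add(Literal(1), Literal(2))
e.foldable    // true here: both children are foldable literals
e.eval(null)  // 3; ConstantFolding wraps this value back into Literal(3, IntegerType)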
2.2.3 BooleanSimplification
This rule optimizes boolean expressions, a bit like short-circuit evaluation of boolean expressions in Java, and it is written rather elegantly. The idea is to check whether the result of a boolean expression can already be determined from one side alone, so the other side never has to be evaluated; that is what "simplifying boolean expressions" means here. See my annotations in the code:
/**
 * Simplifies boolean expressions where the answer can be determined without evaluating both sides.
 * Note that this rule can eliminate expressions that might otherwise have been evaluated and thus
 * is only safe when evaluations of expressions does not result in side effects.
 */
object BooleanSimplification extends Rule[LogicalPlan] {
  def apply(plan: LogicalPlan): LogicalPlan = plan transform {
    case q: LogicalPlan => q transformExpressionsUp {
      case and @ And(left, right) =>  // the expression is an AND: exp1 && exp2
        (left, right) match {         // (left expression, right expression)
          case (Literal(true, BooleanType), r) => r                // true && r  => r
          case (l, Literal(true, BooleanType)) => l                // l && true  => l
          case (Literal(false, BooleanType), _) => Literal(false)  // false && _ => false, whatever the right side is
          case (_, Literal(false, BooleanType)) => Literal(false)  // once either side is false, the whole AND is false
          case (_, _) => and
        }
      case or @ Or(left, right) =>
        (left, right) match {
          case (Literal(true, BooleanType), _) => Literal(true)    // true || _ => true, no need to look at the right side
          case (_, Literal(true, BooleanType)) => Literal(true)    // once either side is true, the whole OR is true
          case (Literal(false, BooleanType), r) => r               // false || r => r
          case (l, Literal(false, BooleanType)) => l               // l || false => l
          case (_, _) => or
        }
    }
  }
}
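Elsewhere this post applies rules by hand in the REPL; a hypothetical exercise in the same spirit (the query is invented for illustration): ConstantFolding first folds 1=1 to true, after which BooleanSimplification can strip the resulting true && wrapper.

val query = sql("select key from temp_shengli where 1=1 and key > 100")
// Expected effect: Filter (true && (key > 100))  becomes  Filter (key > 100)
BooleanSimplification(ConstantFolding(query.queryExecution.analyzed))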
2.3 Batch: Filter Pushdown
The Filter Pushdown batch contains CombineFilters, PushPredicateThroughProject, PushPredicateThroughJoin, and ColumnPruning. (P.S.: the name "Filter Pushdown" feels a little too narrow to cover everything in the batch, e.g. ColumnPruning, column pruning.)
2.3.1 CombineFilters
CombineFilters merges two adjacent Filters, much like Combine Limits above. Merging the two nodes reduces the depth of the tree and avoids the cost of filtering twice.

/**
 * Combines two adjacent [[Filter]] operators into one, merging the
 * conditions into one conjunctive predicate.
 */
object CombineFilters extends Rule[LogicalPlan] {
  def apply(plan: LogicalPlan): LogicalPlan = plan transform {
    case ff @ Filter(fc, nf @ Filter(nc, grandChild)) => Filter(And(nc, fc), grandChild)
  }
}

Given the SQL:

val query = sql("select key from (select key from temp_shengli where key >100)a where key > 80 ")
Before optimization, one Filter sits above the other, separated by a Project (PushPredicateThroughProject first pushes the outer Filter through that Project, which makes the two Filters adjacent so CombineFilters can merge them):
scala> query.queryExecution.analyzed
res25: org.apache.spark.sql.catalyst.plans.logical.LogicalPlan =
Project [key#27]
 Filter (key#27 > 80)    // filter > 80
  Project [key#27]
   Filter (key#27 > 100) // filter > 100
    MetastoreRelation default, src, None

After optimization (note that a Filter condition can in fact be an arbitrarily complex boolean expression):
scala> query.queryExecution.optimizedPlan
res26: org.apache.spark.sql.catalyst.plans.logical.LogicalPlan =
Project [key#27]
 Filter ((key#27 > 100) && (key#27 > 80))   // merged into a single conjunctive Filter
  MetastoreRelation default, src, None
2.3.2 Filter Pushdown
Filter Pushdown pushes predicates down the plan. The principle is to filter out unneeded rows as early as possible, so that every operator above the filter processes less data.
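The rule that does this across Projects is PushPredicateThroughProject; its source isn't quoted above, so the sketch below is lightly annotated from the Spark 1.0 code (consult the actual source if exactness matters). It swaps a Filter with the Project beneath it, rewriting any aliases used in the condition back to the expressions that define them:

object PushPredicateThroughProject extends Rule[LogicalPlan] {
  def apply(plan: LogicalPlan): LogicalPlan = plan transform {
    case filter @ Filter(condition, project @ Project(fields, grandChild)) =>
      // Map each attribute the Project defines via an alias back to its defining expression.
      val sourceAliases = fields.collect { case a @ Alias(c, _) =>
        (a.toAttribute: Attribute) -> c
      }.toMap
      // Re-root: Project on top, Filter (with rewritten condition) pushed below it.
      project.copy(child = filter.copy(
        replaceAlias(condition, sourceAliases),
        grandChild))
  }

  def replaceAlias(condition: Expression, sourceAliases: Map[Attribute, Expression]): Expression =
    condition transform {
      case a: AttributeReference => sourceAliases.getOrElse(a, a)
    }
}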
Given the SQL:

val query = sql("select key from (select * from temp_shengli)a where key>100")

The resulting (analyzed) logical plan is:
scala> query.queryExecution.analyzed
res29: org.apache.spark.sql.catalyst.plans.logical.LogicalPlan =
Project [key#31]
 Filter (key#31 > 100)   // select key, value first, then Filter
  Project [key#31,value#32]
   MetastoreRelation default, src, None

The optimized plan is:
scala> query.queryExecution.optimizedPlan
res30: org.apache.spark.sql.catalyst.plans.logical.LogicalPlan =
Project [key#31]
 Filter (key#31 > 100)   // filter first, then select
  MetastoreRelation default, src, None
2.3.3 ColumnPruning
Column pruning is used quite a lot: it removes columns that do not actually need to be selected. It applies in three places:
1. in aggregations;
2. on the left and right children of a join;
3. when merging adjacent Projects.

object ColumnPruning extends Rule[LogicalPlan] {
  def apply(plan: LogicalPlan): LogicalPlan = plan transform {
    // Eliminate attributes that are not needed to calculate the specified aggregates.
    case a @ Aggregate(_, _, child) if (child.outputSet -- a.references).nonEmpty =>
      // If the child outputs attributes beyond a.references, replace the Aggregate's
      // child with a Project that keeps only a.references.
      a.copy(child = Project(a.references.toSeq, child))

    // Eliminate unneeded attributes from either side of a Join,
    // i.e. prune the columns of the join's left and right subtrees.
    case Project(projectList, Join(left, right, joinType, condition)) =>
      // Collect the list of all references required either above or to evaluate the condition.
      val allReferences: Set[Attribute] =
        projectList.flatMap(_.references).toSet ++ condition.map(_.references).getOrElse(Set.empty)

      /** Applies a projection only when the child is producing unnecessary attributes */
      def prunedChild(c: LogicalPlan) =
        if ((c.outputSet -- allReferences.filter(c.outputSet.contains)).nonEmpty) {
          Project(allReferences.filter(c.outputSet.contains).toSeq, c)
        } else {
          c
        }

      Project(projectList, Join(prunedChild(left), prunedChild(right), joinType, condition))

    // Combine adjacent Projects.
    case Project(projectList1, Project(projectList2, child)) =>
      // Create a map of Aliases to their values from the child projection.
      // e.g., 'SELECT ... FROM (SELECT a + b AS c, d ...)' produces Map(c -> Alias(a + b, c)).
      val aliasMap = projectList2.collect {
        case a @ Alias(e, _) => (a.toAttribute: Expression, a)
      }.toMap

      // Substitute any attributes that are produced by the child projection, so that we safely
      // eliminate it.
      // e.g., 'SELECT c + 1 FROM (SELECT a + b AS C ...' produces 'SELECT a + b + 1 ...'
      // TODO: Fix TransformBase to avoid the cast below.
      val substitutedProjection = projectList1.map(_.transform {
        case a if aliasMap.contains(a) => aliasMap(a)
      }).asInstanceOf[Seq[NamedExpression]]

      Project(substitutedProjection, child)

    // Eliminate no-op Projects
    case Project(projectList, child) if child.output == projectList => child
  }
}

Three examples, one for each of the three cases.

1. Column pruning in an aggregation. Given the SQL:

val query = sql("SELECT 1+1 as shengli, key from (select key, value from temp_shengli)a group by key")

Before optimization:
res57: org.apache.spark.sql.catalyst.plans.logical.LogicalPlan =
Aggregate [key#51], [(1 + 1) AS shengli#49,key#51]
 Project [key#51,value#52]   // before optimization, both key and value are selected by default
  MetastoreRelation default, temp_shengli, None

After optimization:
scala> ColumnPruning1(query.queryExecution.analyzed)
MetastoreRelation default, temp_shengli, None   // debug print from my instrumented copy of the rule
res59: org.apache.spark.sql.catalyst.plans.logical.LogicalPlan =
Aggregate [key#51], [(1 + 1) AS shengli#49,key#51]
 Project [key#51]   // after pruning, value is dropped; only key is selected
  MetastoreRelation default, temp_shengli, None
2. Column pruning on the left and right children of a join. Given the SQL:

val query = sql("select a.value qween from (select * from temp_shengli) a join (select * from temp_shengli) b on a.key = b.key ")

Before optimization:
scala> query.queryExecution.analyzed
res51: org.apache.spark.sql.catalyst.plans.logical.LogicalPlan =
Project [value#42 AS qween#39]
 Join Inner, Some((key#41 = key#43))
  Project [key#41,value#42]   // an extra column, value, is selected here
   MetastoreRelation default, temp_shengli, None
  Project [key#43,value#44]   // an extra column, value, is selected here too
   MetastoreRelation default, temp_shengli, None

After optimization (ColumnPruning2 is a copy of the rule I instrumented for debugging):
scala> ColumnPruning2(query.queryExecution.analyzed)
allReferences is -> Set(key#35, key#37)         // debug prints from my instrumented rule
MetastoreRelation default, temp_shengli, None
MetastoreRelation default, temp_shengli, None
res47: org.apache.spark.sql.catalyst.plans.logical.LogicalPlan =
Project [key#35 AS qween#33]
 Join Inner, Some((key#35 = key#37))
  Project [key#35]   // after pruning, the left child only selects key
   MetastoreRelation default, temp_shengli, None
  Project [key#37]   // after pruning, the right child only selects key
   MetastoreRelation default, temp_shengli, None

3. Merging adjacent Projects (pruning aliased columns).
給定SQL:val query = sql("SELECT c + 1 FROM (SELECT 1 + 1 as c from temp_shengli ) a ") ?
優(yōu)化前:
scala> query.queryExecution.analyzed
res61: org.apache.spark.sql.catalyst.plans.logical.LogicalPlan =
Project [(c#56 + 1) AS c0#57]
 Project [(1 + 1) AS c#56]
  MetastoreRelation default, temp_shengli, None

After optimization:
scala> query.queryExecution.optimizedPlan
res62: org.apache.spark.sql.catalyst.plans.logical.LogicalPlan =
Project [(2 AS c#56 + 1) AS c0#57]   // the subquery's c is substituted into the outer select, eliminating the inner Project
 MetastoreRelation default, temp_shengli, None

(Note that 1 + 1 was already folded to 2 by the earlier ConstantFolding batch; since ColumnPruning runs in a later batch, the substituted 2 + 1 is not re-folded in this pass.)
3. Summary
This post covered the Optimizer's role in Catalyst: taking the Analyzed Logical Plan and applying Rule-based transforms to both the Logical Plan and its Expressions, merging and rewriting tree nodes to produce an optimized plan. The main strategies boil down to a few families: merging operators (limits, filters, projects), column pruning, and filter pushdown.
Catalyst is presumably still iterating rapidly; this post is based on Spark 1.0.0 only, and newly added optimization strategies may be covered in follow-up posts.
Discussion is welcome; let's improve together!
——EOF——
Original article; please attribute when reposting:
Reposted from OopsOutOfMemory (Shengli)'s blog. Author: OopsOutOfMemory
Permalink: http://blog.csdn.net/oopsoom/article/details/38121259
Note: this post is published under the Attribution-NonCommercial-NoDerivs 2.5 China Mainland (CC BY-NC-ND 2.5 CN) license. You are welcome to repost, share, and comment, but please keep the author attribution and the link to this article. For commercial use or licensing questions, please contact me.