Spark-Sql源码解析之八 Codegen详解大数据

Codegen,动态字节码技术,那么什么是动态字节码技术呢?先看来一段代码,假设SparkPlanSort
case class Sort( 
    sortOrder: Seq[SortOrder], 
    global: Boolean, 
    child: SparkPlan) 
  extends UnaryNode { 
  override def requiredChildDistribution: Seq[Distribution] = 
    if (global) OrderedDistribution(sortOrder) :: Nil else UnspecifiedDistribution :: Nil 
 
  protected override def doExecute(): RDD[Row] = attachTree(this, "sort") { 
    child.execute().mapPartitions( { iterator => 
      val ordering = newOrdering(sortOrder, child.output) 
      iterator.map(_.copy()).toArray.sorted(ordering).iterator 
    }, preservesPartitioning = true) 
  } 
 
  override def output: Seq[Attribute] = child.output 
 
  override def outputOrdering: Seq[SortOrder] = sortOrder 
} 
 
abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializable { 
protected def newOrdering(order: Seq[SortOrder], inputSchema: Seq[Attribute]): Ordering[Row] = { 
  if (codegenEnabled) {//开启动态字节码技术 
    GenerateOrdering.generate(order, inputSchema) 
  } else {//否则关闭 
    new RowOrdering(order, inputSchema) 
  } 
} 
}

可见针对Sort的SparkPlan,针对是否开启动态字节码技术的情况下会发生两种情况:当关闭的时候,其Compare函数如下:

class RowOrdering(ordering: Seq[SortOrder]) extends Ordering[Row] { 
  def this(ordering: Seq[SortOrder], inputSchema: Seq[Attribute]) = 
    this(ordering.map(BindReferences.bindReference(_, inputSchema))) 
 
  def compare(a: Row, b: Row): Int = { 
    var i = 0 
    while (i < ordering.size) { 
      val order = ordering(i) 
      val left = order.child.eval(a)//虚函数调用,然后装箱 
      val right = order.child.eval(b)//虚函数调用,然后装箱 
 
      if (left == null && right == null) { 
        // Both null, continue looking. 
      } else if (left == null) { 
        return if (order.direction == Ascending) -1 else 1 
      } else if (right == null) { 
        return if (order.direction == Ascending) 1 else -1 
      } else { 
        val comparison = order.dataType match { 
          case n: AtomicType if order.direction == Ascending => 
            n.ordering.asInstanceOf[Ordering[Any]].compare(left, right)//调用具体对象的compare函数 
          case n: AtomicType if order.direction == Descending => 
            n.ordering.asInstanceOf[Ordering[Any]].reverse.compare(left, right)//调用具体对象的compare函数 
          case other => sys.error(s"Type $other does not support ordered operations") 
        } 
        if (comparison != 0) return comparison 
      } 
      i += 1 
    } 
    return 0 
  } 
}

其涉及到虚函数调用及装箱,虚函数的调用相对普通函数而言比较耗时。

当开启动态字节码技术的时候,其Compare函数如下:

object GenerateOrdering extends CodeGenerator[Seq[SortOrder], Ordering[Row]] with Logging { 
  import scala.reflect.runtime.{universe => ru} 
  import scala.reflect.runtime.universe._ 
 
 protected def canonicalize(in: Seq[SortOrder]): Seq[SortOrder] = 
    in.map(ExpressionCanonicalizer.execute(_).asInstanceOf[SortOrder]) 
 
  protected def bind(in: Seq[SortOrder], inputSchema: Seq[Attribute]): Seq[SortOrder] = 
    in.map(BindReferences.bindReference(_, inputSchema)) 
 
  protected def create(ordering: Seq[SortOrder]): Ordering[Row] = { 
    val a = newTermName("a") 
    val b = newTermName("b") 
    val comparisons = ordering.zipWithIndex.map { case (order, i) => 
      val evalA = expressionEvaluator(order.child) 
      val evalB = expressionEvaluator(order.child) 
 
      val compare = order.child.dataType match { 
        case BinaryType => 
          q""" 
          val x = ${if (order.direction == Ascending) evalA.primitiveTerm else evalB.primitiveTerm}//直接指定类型,不涉及虚函数调用 
          val y = ${if (order.direction != Ascending) evalB.primitiveTerm else evalA.primitiveTerm}//直接指定类型,不涉及虚函数调用 
          var i = 0 
          while (i < x.length && i < y.length) { 
            val res = x(i).compareTo(y(i)) 
            if (res != 0) return res 
            i = i+1 
          } 
          return x.length - y.length 
          """ 
        case _: NumericType => 
          q""" 
          val comp = ${evalA.primitiveTerm} - ${evalB.primitiveTerm}//直接指定类型 
          if(comp != 0) { 
            return ${if (order.direction == Ascending) q"comp.toInt" else q"-comp.toInt"} 
          } 
          """ 
        case StringType => 
          if (order.direction == Ascending) { 
            q"""return ${evalA.primitiveTerm}.compare(${evalB.primitiveTerm})"""//直接指定类型,不涉及虚函数调用 
          } else { 
            q"""return ${evalB.primitiveTerm}.compare(${evalA.primitiveTerm})""" 
          } 
      } 
 
      q""" 
        i = $a 
        ..${evalA.code} 
        i = $b 
        ..${evalB.code} 
        if (${evalA.nullTerm} && ${evalB.nullTerm}) { 
          // Nothing 
        } else if (${evalA.nullTerm}) { 
          return ${if (order.direction == Ascending) q"-1" else q"1"} 
        } else if (${evalB.nullTerm}) { 
          return ${if (order.direction == Ascending) q"1" else q"-1"} 
        } else { 
          $compare 
        } 
      """ 
    } 
 
    val q"class $orderingName extends $orderingType { ..$body }" = reify { 
      class SpecificOrdering extends Ordering[Row] { 
        val o = ordering 
      } 
    }.tree.children.head 
 
    val code = q""" 
      class $orderingName extends $orderingType { 
        ..$body 
        def compare(a: $rowType, b: $rowType): Int = { 
          var i: $rowType = null // Holds current row being evaluated. 
          ..$comparisons 
          return 0 
        } 
      } 
      new $orderingName() 
      """ 
    logDebug(s"Generated Ordering: $code") 
    toolBox.eval(code).asInstanceOf[Ordering[Row]] 
  } 
}

可见动态字节码技术中不涉及虚函数的调用,其本质就是scala的反射机制。关于虚调用为什么耗时的原因如下:

以具体的SQL语句 select a+b fromtable 为例进行说明,下面是它的解析过程:  
    1.调用虚函数Add.eval(),需确认Add两边数据类型  
    2.调用虚函数a.eval(),需要确认a的数据类型  
    3.确认a的数据类型是int,装箱  
    4.调用虚函数b.eval(),需确认b的数据类型  
    5.确认b的数据类型是int,装箱  
    6.调用int类型的add  
    7.返回装箱后的计算结果  
    从上面的步骤可以看出,一条SQL语句的解析需要进行多次虚函数的调用。我们知道,虚函数的调用会极大的降低效率。那么,虚函数的调用为什么会影响效率呢?  
    有人答案是:虚函数调用会进行一次间接寻址过程。事实上这一步间接寻址真的会显著降低运行效率?显然不是。  
    流水线的打断才是真正降低效率的原因。  
    我们知道,虚函数的调用时是运行时多态,意思就是在编译期你是无法知道虚函数的具体调用。设想一下,如果说不是虚函数,那么在编译时期,其相对地址是确定的,编译器可以直接生成jmp/invoke指令; 如果是虚函数,多出来的一次查找vtable所带来的开销,倒是次要的,关键在于,这个函数地址是动态的,譬如 取到的地址在eax里,则在call eax之后的那些已经被预取进入流水线的所有指令都将失效。流水线越长,一次分支预测失败的代价也就越大,如下所示:  
   pf->test  
    001E146D mov eax,dword ptr[pf]  
    011E1470 mov edx,dword,ptr[eax]  
    011E1472 mov esi,esp  
    011E1474 mov ecx,dword ptr[pf]  
    011E1477 mov eax,dword ptr[edx]  
    011E1479 eax <-----------------------分支预测失败  
    011E147B cmp esi esp  
    011E147D @ILT+355(__RTC_CheckEsp)(11E1168h) 

原创文章,作者:ItWorker,如若转载,请注明出处:https://blog.ytso.com/tech/bigdata/9306.html

(0)
上一篇 2021年7月19日 09:24
下一篇 2021年7月19日 09:24

相关推荐

发表回复

登录后才能评论