Shuffle Process Analysis
ShuffleMapTask
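ShuffleMapTask is the entry point of the shuffle process. Its runTask method implements the main shuffle logic, delegating the concrete work to a ShuffleManager and a ShuffleWriter. In the Spark version covered here, both have several implementations. The ShuffleManager is selected with the spark.shuffle.manager parameter, and the chosen manager in turn supplies the ShuffleWriter.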
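To make the division of labor concrete, here is a minimal sketch of the pluggable design, assuming heavily simplified interfaces. SketchShuffleWriter, SketchShuffleManager, CountingManager, SortingManager and MapTaskSketch are hypothetical names, not Spark's real classes or signatures. The point is that the task code only talks to the manager abstraction, so implementations can be swapped by configuration.

```scala
// Hypothetical, simplified stand-in for a ShuffleWriter (NOT Spark's real signature).
trait SketchShuffleWriter[K, V] {
  def write(records: Iterator[(K, V)]): Unit
  // On success, returns a small "map status" summary string.
  def stop(success: Boolean): Option[String]
}

// Hypothetical stand-in for a ShuffleManager: the task only needs it to hand out a writer.
trait SketchShuffleManager {
  def getWriter[K, V](partitionId: Int): SketchShuffleWriter[K, V]
}

// Toy implementation 1: only counts the records it receives.
class CountingManager extends SketchShuffleManager {
  def getWriter[K, V](partitionId: Int): SketchShuffleWriter[K, V] =
    new SketchShuffleWriter[K, V] {
      private var n = 0
      def write(records: Iterator[(K, V)]): Unit = records.foreach(_ => n += 1)
      def stop(success: Boolean): Option[String] =
        if (success) Some(s"count:$partitionId:$n") else None
    }
}

// Toy implementation 2: buffers records and reports their keys in sorted order.
class SortingManager extends SketchShuffleManager {
  def getWriter[K, V](partitionId: Int): SketchShuffleWriter[K, V] =
    new SketchShuffleWriter[K, V] {
      private val buf = scala.collection.mutable.ArrayBuffer.empty[(K, V)]
      def write(records: Iterator[(K, V)]): Unit = buf ++= records
      def stop(success: Boolean): Option[String] =
        if (success) Some(s"sorted:$partitionId:" + buf.map(_._1.toString).sorted.mkString(","))
        else None
    }
}

// The task logic depends only on the manager abstraction, not on a concrete implementation.
object MapTaskSketch {
  def run[K, V](manager: SketchShuffleManager, partitionId: Int,
                records: Iterator[(K, V)]): Option[String] = {
    val writer = manager.getWriter[K, V](partitionId)
    writer.write(records)
    writer.stop(success = true)
  }
}
```

For example, `MapTaskSketch.run(new CountingManager, 3, Iterator("a" -> 1, "b" -> 2))` yields `Some("count:3:2")`, and the same call with `new SortingManager` runs unchanged.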
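Tasks in Spark fall into two categories:
- ResultTask
  The tasks in a job's final stage are ResultTasks; they execute the Spark action.
- ShuffleMapTask
  When an RDD reaches a shuffle dependency, its data is split into multiple buckets (one per downstream partition); this is the point at which a ShuffleMapTask runs.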
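The bucketing step a ShuffleMapTask performs can be sketched as follows, assuming hash partitioning in the style of Spark's HashPartitioner (non-negative `key.hashCode` modulo the number of partitions, with null keys going to bucket 0). BucketSketch, bucketFor and toBuckets are illustrative names, and the real shuffle writes buckets to files rather than returning in-memory vectors.

```scala
object BucketSketch {
  // Non-negative modulo so negative hash codes still land in [0, numPartitions).
  def bucketFor(key: Any, numPartitions: Int): Int =
    if (key == null) 0
    else {
      val raw = key.hashCode % numPartitions
      if (raw < 0) raw + numPartitions else raw
    }

  // Groups map-side records into one bucket per downstream (reduce) partition,
  // keeping the original record order inside each bucket.
  def toBuckets[K, V](records: Seq[(K, V)], numPartitions: Int): Vector[Vector[(K, V)]] = {
    val grouped = records.groupBy { case (k, _) => bucketFor(k, numPartitions) }
    Vector.tabulate(numPartitions)(p => grouped.getOrElse(p, Seq.empty).toVector)
  }
}
```

With 4 partitions, the Int keys 1 and 5 both land in bucket 1 and key 2 in bucket 2, while a negative key such as -3 is folded back into range (bucket 1) rather than producing a negative index.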
ShuffleMapTask executes as follows:
```scala
override def runTask(context: TaskContext): MapStatus = {
  // Deserialize the RDD using the broadcast variable.
  val deserializeStartTime = System.currentTimeMillis()
  val ser = SparkEnv.get.closureSerializer.newInstance()
  val (rdd, dep) = ser.deserialize[(RDD[_], ShuffleDependency[_, _, _])](
    ByteBuffer.wrap(taskBinary.value), Thread.currentThread.getContextClassLoader)
  _executorDeserializeTime = System.currentTimeMillis() - deserializeStartTime

  metrics = Some(context.taskMetrics)
  var writer: ShuffleWriter[Any, Any] = null
  try {
    // Get the ShuffleManager. There are currently two implementations:
    // hash [HashShuffleManager] and sort [SortShuffleManager].
    // The choice is controlled by spark.shuffle.manager; the default is sort.
    val manager = SparkEnv.get.shuffleManager
    // Get a ShuffleWriter. There are several implementations; the ShuffleWriter
    // is responsible for writing out the shuffle files.
    writer = manager.getWriter[Any, Any](dep.shuffleHandle, partitionId, context)
    writer.write(rdd.iterator(partition, context).asInstanceOf[Iterator[_ <: Product2[Any, Any]]])
    writer.stop(success = true).get
  } catch {
    case e: Exception =>
      try {
        if (writer != null) {
          writer.stop(success = false)
        }
      } catch {
        case e: Exception =>
          log.debug("Could not stop writer", e)
      }
      throw e
  }
}
```
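The try/catch in runTask follows a common cleanup pattern: write, then call `stop(success = true)` and unwrap the status; on any exception, call `stop(success = false)` so the writer can clean up, ignore errors raised by that cleanup, and rethrow the original exception. The sketch below reproduces only that control flow with hypothetical, simplified types (RecordWriter, WriterLifecycle and CountingWriter are illustrative names, not Spark API).

```scala
// Hypothetical simplified writer: consume records, then stop and optionally return a status.
trait RecordWriter[T] {
  def write(records: Iterator[T]): Unit
  def stop(success: Boolean): Option[Int]
}

object WriterLifecycle {
  def runWithWriter[T](newWriter: () => RecordWriter[T], records: Iterator[T]): Int = {
    var writer: RecordWriter[T] = null
    try {
      writer = newWriter()
      writer.write(records)
      // Mirrors `writer.stop(success = true).get`: a successful stop must yield a status.
      writer.stop(success = true).get
    } catch {
      case e: Exception =>
        try {
          if (writer != null) writer.stop(success = false)
        } catch {
          // Cleanup failures are swallowed so the original error is the one reported.
          case _: Exception => ()
        }
        throw e
    }
  }
}

// Toy writer: counts records, fails on records matching `failOn`,
// and remembers whether it was stopped with success = false.
class CountingWriter[T](failOn: T => Boolean) extends RecordWriter[T] {
  var count = 0
  var aborted = false
  def write(records: Iterator[T]): Unit = records.foreach { r =>
    if (failOn(r)) throw new IllegalStateException(s"bad record: $r")
    count += 1
  }
  def stop(success: Boolean): Option[Int] = {
    if (!success) aborted = true
    if (success) Some(count) else None
  }
}
```

Swallowing the secondary exception matters: if `stop(success = false)` itself failed and that error propagated, it would hide the exception that actually broke the task.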
ShuffleManager Initialization
The ShuffleManager is initialized in SparkEnv. Because SparkEnv is a global object, every thread in the same process obtains the same ShuffleManager instance. The initialization logic is as follows:

```scala
// Let the user specify short names for shuffle managers
val shortShuffleMgrNames = Map(
  "hash" -> "org.apache.spark.shuffle.hash.HashShuffleManager",
  "sort" -> "org.apache.spark.shuffle.sort.SortShuffleManager",
  "tungsten-sort" -> "org.apache.spark.shuffle.sort.SortShuffleManager")
val shuffleMgrName = conf.get("spark.shuffle.manager", "sort")
val shuffleMgrClass = shortShuffleMgrNames.getOrElse(shuffleMgrName.toLowerCase, shuffleMgrName)
val shuffleManager = instantiateClass[ShuffleManager](shuffleMgrClass)
```
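The lookup above can be exercised in isolation. In this sketch a plain `Map[String, String]` stands in for SparkConf, and ShuffleManagerResolution, resolve and instantiate are hypothetical names. `instantiate` only handles a public no-argument constructor, which is a simplification and does not reproduce Spark's own instantiateClass.

```scala
object ShuffleManagerResolution {
  // Short names copied from the snippet above; any other value is treated as a class name.
  val shortShuffleMgrNames: Map[String, String] = Map(
    "hash" -> "org.apache.spark.shuffle.hash.HashShuffleManager",
    "sort" -> "org.apache.spark.shuffle.sort.SortShuffleManager",
    "tungsten-sort" -> "org.apache.spark.shuffle.sort.SortShuffleManager")

  // `conf` is a plain Map standing in for SparkConf; "sort" is the default, as in the snippet.
  def resolve(conf: Map[String, String]): String = {
    val name = conf.getOrElse("spark.shuffle.manager", "sort")
    shortShuffleMgrNames.getOrElse(name.toLowerCase, name)
  }

  // Simplified reflective instantiation through a public no-arg constructor.
  def instantiate[T](className: String): T =
    Class.forName(className).getConstructor().newInstance().asInstanceOf[T]
}
```

Two consequences of this lookup are worth noting: short names are case-insensitive ("HASH" resolves like "hash"), and "tungsten-sort" maps to the same SortShuffleManager class as "sort", while a fully qualified class name passes through unchanged, which is how a custom ShuffleManager can be plugged in.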
[Figure: Shuffle class diagram]
Original article by 奋斗. If you repost it, please credit the source: https://blog.ytso.com/9552.html