HashShuffleManager
HashShuffleManager was the default shuffle manager in early versions of Spark (before Spark 1.2). Its main drawback is that it creates a very large number of intermediate files while a job runs, so the shuffle stage incurs heavy disk I/O. The rest of this section walks through how HashShuffleManager actually executes.
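For context, in Spark 1.x the shuffle implementation can be selected explicitly through the spark.shuffle.manager setting. A minimal sketch of forcing the hash-based manager (assuming a Spark 1.x build; the option and HashShuffleManager itself were removed in later releases):

import org.apache.spark.{SparkConf, SparkContext}

// "hash" selects HashShuffleManager; "sort" is the default SortShuffleManager from Spark 1.2 on.
val conf = new SparkConf()
  .setAppName("hash-shuffle-demo")
  .set("spark.shuffle.manager", "hash")
val sc = new SparkContext(conf)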
The unoptimized HashShuffleManager
In this version of the shuffle, every map task writes numReducers output files, one per reducer. If an executor runs M map tasks and the downstream stage has R reducers, that executor produces M * R files; a node running E executors therefore ends up with E * M * R shuffle files.
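A quick worked example of how fast this grows (the numbers below are made up purely for illustration):

// Hypothetical sizes, only to make the file-count formulas concrete.
val numMapTasksPerExecutor = 100   // M
val numReducers = 200              // R
val executorsPerNode = 4           // E

val filesPerExecutor = numMapTasksPerExecutor * numReducers   // 100 * 200 = 20,000
val filesPerNode = executorsPerNode * filesPerExecutor        // 4 * 20,000 = 80,000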
The optimized HashShuffleManager
To address this file explosion, Spark added an optimized mode to HashShuffleManager (file consolidation). The executor keeps a pool of output file groups, each group containing one file per reducer; when a task starts it takes a group from the pool, appends its output to those files, and returns the group when it finishes so that later tasks reuse the same files. The number of files an executor produces during the shuffle stage thus drops to R (the reducer count) for each task slot running concurrently, instead of R for every map task.
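A minimal sketch of this consolidation idea, using hypothetical names (WriterGroup, FileGroupPool) that are not Spark's actual classes, only meant to show how file groups get reused across tasks:

import java.io.File
import java.util.concurrent.ConcurrentLinkedQueue

// Hypothetical illustration of file-group reuse; not Spark's real implementation.
final case class WriterGroup(files: Array[File])  // one output file per reducer

class FileGroupPool(numReducers: Int, shuffleDir: File) {
  private val idleGroups = new ConcurrentLinkedQueue[WriterGroup]()

  // A starting task reuses an idle group if one exists; otherwise a fresh group is created,
  // so the number of groups never exceeds the number of concurrently running tasks.
  def acquire(): WriterGroup = Option(idleGroups.poll()).getOrElse {
    WriterGroup(Array.tabulate(numReducers) { r =>
      new File(shuffleDir, s"merged_shuffle_${java.util.UUID.randomUUID()}_$r")
    })
  }

  // When the task finishes it returns the group, and the next task appends to the same files.
  def release(group: WriterGroup): Unit = idleGroups.add(group)
}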
Code walkthrough:
- Execution logic of runTask(context: TaskContext) in ShuffleMapTask
var writer: ShuffleWriter[Any, Any] = null
try {
  // Get the shuffle manager (here HashShuffleManager); when HashShuffleManager is constructed
  // it also creates a FileShuffleBlockResolver.
  val manager = SparkEnv.get.shuffleManager
  writer = manager.getWriter[Any, Any](dep.shuffleHandle, partitionId, context)
  writer.write(rdd.iterator(partition, context).asInstanceOf[Iterator[_ <: Product2[Any, Any]]])
  writer.stop(success = true).get
} catch {
  ....
}
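For reference, the getWriter call above simply wraps the shuffle handle in a HashShuffleWriter, roughly like this (paraphrased from the Spark 1.x sources; the exact code may differ slightly):

override def getWriter[K, V](handle: ShuffleHandle, mapId: Int, context: TaskContext)
    : ShuffleWriter[K, V] = {
  // The resolver created when the manager was constructed is shared by every writer.
  new HashShuffleWriter(
    shuffleBlockResolver, handle.asInstanceOf[BaseShuffleHandle[K, V, _]], mapId, context)
}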
- When HashShuffleManager is constructed, it also creates a FileShuffleBlockResolver
private[spark] class HashShuffleManager(conf: SparkConf) extends ShuffleManager with Logging {

  if (!conf.getBoolean("spark.shuffle.spill", true)) {
    logWarning(
      "spark.shuffle.spill was set to false, but this configuration is ignored as of Spark 1.6+." +
        " Shuffle will continue to spill to disk when necessary.")
  }

  private val fileShuffleBlockResolver = new FileShuffleBlockResolver(conf)
}
- Obtaining the ShuffleWriter
private[spark] class HashShuffleWriter[K, V](
    shuffleBlockResolver: FileShuffleBlockResolver,
    handle: BaseShuffleHandle[K, V, _],
    mapId: Int,
    context: TaskContext)
  extends ShuffleWriter[K, V] with Logging {

  // Initialize the ShuffleWriterGroup: one DiskBlockObjectWriter per reducer for this map task
  private val shuffle = shuffleBlockResolver.forMapTask(dep.shuffleId, mapId, numOutputSplits,
    ser, writeMetrics)
}
shuffleBlockResolver.forMapTask is called to create the per-reducer output files:
def forMapTask(shuffleId: Int, mapId: Int, numReducers: Int, serializer: Serializer,
    writeMetrics: ShuffleWriteMetrics): ShuffleWriterGroup = {
  new ShuffleWriterGroup {
    shuffleStates.putIfAbsent(shuffleId, new ShuffleState(numReducers))
    private val shuffleState = shuffleStates(shuffleId)

    val openStartTime = System.nanoTime
    val serializerInstance = serializer.newInstance()
    val writers: Array[DiskBlockObjectWriter] = {
      Array.tabulate[DiskBlockObjectWriter](numReducers) { bucketId =>
        val blockId = ShuffleBlockId(shuffleId, mapId, bucketId)
        val blockFile = blockManager.diskBlockManager.getFile(blockId)
        val tmp = Utils.tempFileWith(blockFile)
        blockManager.getDiskWriter(blockId, tmp, serializerInstance, bufferSize, writeMetrics)
      }
    }
    // Creating the file to write to and creating a disk writer both involve interacting with
    // the disk, so should be included in the shuffle write time.
    writeMetrics.incShuffleWriteTime(System.nanoTime - openStartTime)

    override def releaseWriters(success: Boolean) {
      shuffleState.completedMapTasks.add(mapId)
    }
  }
}
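For orientation, each per-reducer writer above targets a ShuffleBlockId; as far as I recall, its on-disk name simply encodes the shuffle, map and reduce ids, for example:

import org.apache.spark.storage.ShuffleBlockId

// For shuffleId = 0, mapId = 3 and bucketId (reduceId) = 7 the block is named "shuffle_0_3_7";
// DiskBlockManager then hashes that name to one of the executor's local directories.
val blockId = ShuffleBlockId(shuffleId = 0, mapId = 3, reduceId = 7)
println(blockId.name)  // shuffle_0_3_7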
- Calling ShuffleWriter.write to write records into the per-reducer files
override def write(records: Iterator[Product2[K, V]]): Unit = {
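  // If the dependency defines an Aggregator with map-side combine, pre-aggregate records locally first.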
val iter = if (dep.aggregator.isDefined) {
if (dep.mapSideCombine) {
dep.aggregator.get.combineValuesByKey(records, context)
} else {
records
}
} else {
require(!dep.mapSideCombine, "Map-side combine without Aggregator specified!")
records
}
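  // Hash each record's key through the partitioner and append it to that reducer's bucket file.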
for (elem <- iter) {
val bucketId = dep.partitioner.getPartition(elem._1)
shuffle.writers(bucketId).write(elem._1, elem._2)
}
}
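To see this write path end to end, here is a small illustrative job (assuming the Spark 1.x setup sketched earlier, with the hash manager selected). reduceByKey registers an Aggregator with mapSideCombine = true, so the branch above combines values locally before each record is hashed into its reducer's file.

// Word count: the map side pre-aggregates (a,1),(a,1) into (a,2) before writing shuffle files.
val words = sc.parallelize(Seq("a", "b", "a", "c", "b", "a"), 2)
val counts = words.map(w => (w, 1)).reduceByKey((x, y) => x + y, 3)  // 3 reducers => 3 buckets per map task
counts.collect().foreach(println)  // e.g. (a,3), (b,2), (c,1)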
References
https://0x0fff.com/spark-architecture-shuffle/