spark(四):shuffle

shuflle write

spark(四):shuffle

  1. 上图有 4 个 ShuffleMapTask 要在同一个 worker node 上运行,CPU core 数为 2,可以同时运行两个 task。
  2. 在一个 core 上连续执行的 ShuffleMapTasks 可以共用一个输出文件 ShuffleFile。先执行完的 ShuffleMapTask 形成 ShuffleBlock i,后执行的 ShuffleMapTask 可以将输出数据直接追加到 ShuffleBlock i 后面,形成 ShuffleBlock i’,每个 ShuffleBlock 被称为 FileSegment。

shuflle read

  1. 在什么时候 fetch 数据?当 parent stage 的所有 ShuffleMapTasks 结束后再 fetch。
  2. 边 fetch 边处理还是一次性 fetch 完再处理?边 fetch 边处理。使用可以 aggregate 的数据结构,比如 HashMap,每 shuffle 得到(从缓冲的 FileSegment 中 deserialize 出来)一个 <Key, Value> record,直接将其放进 HashMap 里面。如果该 HashMap 已经存在相应的 Key,那么直接进行 aggregate 也就是 func(hashMap.get(Key), Value)
  3. fetch 来的数据存放到哪里?刚 fetch 来的 FileSegment 存放在 softBuffer 缓冲区,经过处理后的数据放在内存 + 磁盘上。
  4. 怎么获得要 fetch 的数据的存放位置?reducer 在 shuffle 的时候是要去 driver 里面的 MapOutputTrackerMaster 询问 ShuffleMapTask 输出的数据位置的。每个 ShuffleMapTask 完成时会将 FileSegment 的存储位置信息汇报给MapOutputTrackerMaster。

Shuffle read 中的 HashMap

ashMap 是 Spark shuffle read 过程中频繁使用的、用于 aggregate 的数据结构。Spark 设计了两种:一种是全内存的 AppendOnlyMap,另一种是内存+磁盘的 ExternalAppendOnlyMap。
spark(四):shuffle

  1. 类似 HashMap,但没有remove(key)方法。其实现原理很简单,开一个大 Object 数组,蓝色部分存储 Key,白色部分存储 Value。
  2. 如果 Array 的利用率达到 70%,那么就扩张一倍,并对所有 key 进行 rehash 后,重新排列每个 key 的位置。
    spark(四):shuffle
  3. ExternalAppendOnlyMap 持有一个 AppendOnlyMap,shuffle 来的一个个 (K, V) record 先 insert 到 AppendOnlyMap 中,insert 过程与原始的 AppendOnlyMap 一模一样。
  4. 如果 AppendOnlyMap 快被装满时检查一下内存剩余空间是否可以够扩展,够就直接在内存中扩展,不够就 sort 一下 AppendOnlyMap,将其内部所有 records 都 spill 到磁盘上。
  5. 每次 spill 完在磁盘上生成一个 spilledMap 文件,然后重新 new 出来一个 AppendOnlyMap。
  6. 最后一个 (K, V) record insert 到 AppendOnlyMap 后,表示所有 shuffle 来的 records 都被放到了 ExternalAppendOnlyMap 中,但不表示 records 已经被处理完,因为每次 insert 的时候,新来的 record 只与 AppendOnlyMap 中的 records 进行 aggregate,并不是与所有的 records 进行 aggregate(一些 records 已经被 spill 到磁盘上了)。因此当需要 aggregate 的最终结果时,需要对 AppendOnlyMap 和所有的 spilledMaps 进行全局 merge-aggregate。
  7. 全局 merge-aggregate 的流程:先将 AppendOnlyMap 中的 records 进行 sort,形成 sortedMap。
  8. 然后分别从 sortedMap 和各个 spilledMap 读出一部分数据(StreamBuffer)放到 mergeHeap 里面。StreamBuffer 里面包含的 records 需要具有相同的 hash(key)
  9. mergeHeap 顾名思义就是使用堆排序不断提取出 hash(firstRecord.Key) 相同的 StreamBuffer,并将其一个个放入 mergeBuffers 中,放入的时候与已经存在于 mergeBuffers 中的 StreamBuffer 进行 merge-combine

在Sort Based Shuffle的Shuffle Write阶段,map端的任务会按照Partition id以及key对记录进行排序。同时将全部结果写到一个数据文件中,同时生成一个索引文件,reduce端的Task可以通过该索引文件获取相关的数据。

原创文章,作者:ItWorker,如若转载,请注明出处:https://blog.ytso.com/190477.html

(0)
上一篇 2021年11月13日
下一篇 2021年11月13日

相关推荐

发表回复

登录后才能评论