spark delta写操作ACID事务中基础类FileFormat/FileCommitProtocol的示例分析

小编给大家分享一下spark delta写操作ACID事务中基础类FileFormat/FileCommitProtocol的示例分析,相信大部分人都还不怎么了解,因此分享这篇文章给大家参考一下,希望大家阅读完这篇文章后大有收获,下面让我们一起去了解一下吧!

分析

直接进入主题FileFormatWriter.write,这个是spark写入文件的入口:

 def write(
      sparkSession: SparkSession,
      plan: SparkPlan,
      fileFormat: FileFormat,
      committer: FileCommitProtocol,
      outputSpec: OutputSpec,
      hadoopConf: Configuration,
      partitionColumns: Seq[Attribute],
      bucketSpec: Option[BucketSpec],
      statsTrackers: Seq[WriteJobStatsTracker],
      options: Map[String, String])
    : Set[String] = {

因为delta是基于parquet实现的, 所以我们fileformat选择分析ParquetFileFormat, 而对于FileCommitProtocol,我们分析SQLHadoopMapReduceCommitProtocol

  1. 该write方法实现比较长,我们讲重点 :

committer.setupJob(job)

这个做一些job提交前的准备工作,比如设置jobId,taskId,设置OutputCommitter,OutputCommitter是用来。。

override def setupJob(jobContext: JobContext): Unit = {
    // Setup IDs
    val jobId = SparkHadoopWriterUtils.createJobID(new Date, 0)
    val taskId = new TaskID(jobId, TaskType.MAP, 0)
    val taskAttemptId = new TaskAttemptID(taskId, 0)

    // Set up the configuration object
    jobContext.getConfiguration.set("mapreduce.job.id", jobId.toString)
    jobContext.getConfiguration.set("mapreduce.task.id", taskAttemptId.getTaskID.toString)
    jobContext.getConfiguration.set("mapreduce.task.attempt.id", taskAttemptId.toString)
    jobContext.getConfiguration.setBoolean("mapreduce.task.ismap", true)
    jobContext.getConfiguration.setInt("mapreduce.task.partition", 0)

    val taskAttemptContext = new TaskAttemptContextImpl(jobContext.getConfiguration, taskAttemptId)
    committer = setupCommitter(taskAttemptContext)
    committer.setupJob(jobContext)
  }

ParquetFileFormat对应的OutputCommitter是ParquetOutputCommitter,我们看一下方法:format.getOutputCommitter(context),ParquetOutputCommitter为:

@Override
  public OutputCommitter getOutputCommitter(TaskAttemptContext context)
      throws IOException {
    if (committer == null) {
      Path output = getOutputPath(context);
      committer = new ParquetOutputCommitter(output, context);
    }
    return committer;
  }

而最终调用了父类的构造方法:

public FileOutputCommitter(Path outputPath, 
                             TaskAttemptContext context) throws IOException {
    this(outputPath, (JobContext)context);
    if (outputPath != null) {
      workPath = getTaskAttemptPath(context, outputPath);
    }
  }

注意这里的workPath(全局变量)赋值为$outputPath/_temporary,在以下newTaskTempFile方法中会用到

接着进行setupJob操作:

public void setupJob(JobContext context) throws IOException {
    if (hasOutputPath()) {
      Path jobAttemptPath = getJobAttemptPath(context);
      FileSystem fs = jobAttemptPath.getFileSystem(
          context.getConfiguration());
      if (!fs.mkdirs(jobAttemptPath)) {
        LOG.error("Mkdirs failed to create " + jobAttemptPath);
      }
    } else {
      LOG.warn("Output Path is null in setupJob()");

而getJobAttemptPath中引用到$path/_temporary目录(其中path是文件输出目录),且建立该目录

接下来是进行任务的提交:

sparkSession.sparkContext.runJob(
        rddWithNonEmptyPartitions,
        (taskContext: TaskContext, iter: Iterator[InternalRow]) => {
          executeTask(
            description = description,
            jobIdInstant = jobIdInstant,
            sparkStageId = taskContext.stageId(),
            sparkPartitionId = taskContext.partitionId(),
            sparkAttemptNumber = taskContext.taskAttemptId().toInt & Integer.MAX_VALUE,
            committer,
            iterator = iter)
        },
        rddWithNonEmptyPartitions.partitions.indices,
        (index, res: WriteTaskResult) => {
          committer.onTaskCommit(res.commitMsg)
          ret(index) = res
        })

其中重点看看executeTask方法:

committer.setupTask(taskAttemptContext)

    val dataWriter =
      if (sparkPartitionId != 0 && !iterator.hasNext) {
        // In case of empty job, leave first partition to save meta for file format like parquet.
        new EmptyDirectoryDataWriter(description, taskAttemptContext, committer)
      } else if (description.partitionColumns.isEmpty && description.bucketIdExpression.isEmpty) {
        new SingleDirectoryDataWriter(description, taskAttemptContext, committer)
      } else {
        new DynamicPartitionDataWriter(description, taskAttemptContext, committer)
      }
  • 对于SQLHadoopMapReduceCommitProtocol:setupTask实现如下:

committer = setupCommitter(taskContext)
    committer.setupTask(taskContext)
    addedAbsPathFiles = mutable.Map[String, String]()
    partitionPaths = mutable.Set[String]()

而committer.setupTask(taskContext),对应到ParquetOutputCommitter为空实现,

  • 之后看数据写入的最终执行者dataWriter, 如果是没有分区,则是SingleDirectoryDataWriter:

class SingleDirectoryDataWriter(
    description: WriteJobDescription,
    taskAttemptContext: TaskAttemptContext,
    committer: FileCommitProtocol)
  extends FileFormatDataWriter(description, taskAttemptContext, committer) {
  private var fileCounter: Int = _
  private var recordsInFile: Long = _
  // Initialize currentWriter and statsTrackers
  newOutputWriter()

  private def newOutputWriter(): Unit = {
    recordsInFile = 0
    releaseResources()

    val ext = description.outputWriterFactory.getFileExtension(taskAttemptContext)
    val currentPath = committer.newTaskTempFile(
      taskAttemptContext,
      None,
      f"-c$fileCounter%03d" + ext)

    currentWriter = description.outputWriterFactory.newInstance(
      path = currentPath,
      dataSchema = description.dataColumns.toStructType,
      context = taskAttemptContext)

    statsTrackers.foreach(_.newFile(currentPath))
  }

  override def write(record: InternalRow): Unit = {
    if (description.maxRecordsPerFile > 0 && recordsInFile >= description.maxRecordsPerFile) {
      fileCounter += 1
      assert(fileCounter < MAX_FILE_COUNTER,
        s"File counter $fileCounter is beyond max value $MAX_FILE_COUNTER")

      newOutputWriter()
    }

    currentWriter.write(record)
    statsTrackers.foreach(_.newRow(record))
    recordsInFile += 1
  }
}

这里写文件是哪里呢?

    val currentPath = committer.newTaskTempFile(
      taskAttemptContext,
      None,
      f"-c$fileCounter%03d" + ext)

对应到HadoopMapReduceCommitProtocol到newTaskTempFile方法为:

override def newTaskTempFile(
      taskContext: TaskAttemptContext, dir: Option[String], ext: String): String = {
    val filename = getFilename(taskContext, ext)

    val stagingDir: Path = committer match {
      case _ if dynamicPartitionOverwrite =>
        assert(dir.isDefined,
          "The dataset to be written must be partitioned when dynamicPartitionOverwrite is true.")
        partitionPaths += dir.get
        this.stagingDir
      // For FileOutputCommitter it has its own staging path called "work path".
      case f: FileOutputCommitter =>
        new Path(Option(f.getWorkPath).map(_.toString).getOrElse(path))
      case _ => new Path(path)
    }

    dir.map { d =>
      new Path(new Path(stagingDir, d), filename).toString
    }.getOrElse {
      new Path(stagingDir, filename).toString
    }
  }

如果开启partitionOverwriteMode,则设置为new Path(path, ".spark-staging-" + jobId) 如果没有开启partitionOverwriteMode,且FileOutputCommitter的子类,如果workpath存在则设置为workPath,否则为path,注意我们之前FileOutputCommitter构造方法中已经设置了workPath,所以最终的输出目录为$path/_temporary

所以job向该目录写入数据。 DynamicPartitionDataWriter的分析,读者可以进行类似的分析,只不过目录则加了分区信息,只写入自己的分区目录中

  • 如果写入成功的话执行如下:

try {
      Utils.tryWithSafeFinallyAndFailureCallbacks(block = {
        // Execute the task to write rows out and commit the task.
        while (iterator.hasNext) {
          dataWriter.write(iterator.next())
        }
        dataWriter.commit()
      })(catchBlock = {
        // If there is an error, abort the task
        dataWriter.abort()
        logError(s"Job $jobId aborted.")
      }, finallyBlock = {
        dataWriter.close()
      })

dataWriter.commit()如下:

override def commit(): WriteTaskResult = {
    releaseResources()
    val summary = ExecutedWriteSummary(
      updatedPartitions = updatedPartitions.toSet,
      stats = statsTrackers.map(_.getFinalStats()))
    WriteTaskResult(committer.commitTask(taskAttemptContext), summary)
  }

首先会释放资源,也就是关闭writer 之后调用FileCommitProtocol.commitTask();

 override def commitTask(taskContext: TaskAttemptContext): TaskCommitMessage = {
    val attemptId = taskContext.getTaskAttemptID
    logTrace(s"Commit task ${attemptId}")
    SparkHadoopMapRedUtil.commitTask(
      committer, taskContext, attemptId.getJobID.getId, attemptId.getTaskID.getId)
    new TaskCommitMessage(addedAbsPathFiles.toMap -> partitionPaths.toSet)
  }

而SparkHadoopMapRedUtil.commitTask最终调用FileOutputCommitter的commitTask方法把$PATH/_temporary下文件mv到$PATH下
之后返回统计的数值,数据格式如下:

case class BasicWriteTaskStats(
    numPartitions: Int,
    numFiles: Int,
    numBytes: Long,
    numRows: Long)
  extends WriteTaskStats
  1. 之后会committer.onTaskCommit(res.commitMsg)操作,
    对于SQLHadoopMapReduceCommitProtocol的实现为: logDebug(s"onTaskCommit($taskCommit)")

  2. 下一步committer.commitJob(job, commitMsgs):

...
 committer.commitJob(jobContext)
 ...
  for ((src, dst) <- filesToMove) {
        fs.rename(new Path(src), new Path(dst))
      }
 ...
 fs.delete(stagingDir, true)

这里主要涉及清理job,以及把task所产生的文件(writer输出的临时文件)移动到path目录下,且清理临时目录,至此文件真正的写入到了path目录下

  1. 指标记录

private[datasources] def processStats(
      statsTrackers: Seq[WriteJobStatsTracker],
      statsPerTask: Seq[Seq[WriteTaskStats]])
  : Unit = {

    val numStatsTrackers = statsTrackers.length
    assert(statsPerTask.forall(_.length == numStatsTrackers),
      s"""Every WriteTask should have produced one `WriteTaskStats` object for every tracker.
         |There are $numStatsTrackers statsTrackers, but some task returned
         |${statsPerTask.find(_.length != numStatsTrackers).get.length} results instead.
       """.stripMargin)

    val statsPerTracker = if (statsPerTask.nonEmpty) {
      statsPerTask.transpose
    } else {
      statsTrackers.map(_ => Seq.empty)
    }

    statsTrackers.zip(statsPerTracker).foreach {
      case (statsTracker, stats) => statsTracker.processStats(stats)
    }
  }

主要是把刚才job的指标通过statsTrackers传给driver,而目前的statsTracker实现类为BasicWriteJobStatsTracker,也就是说最终会通过listenerbus以事件的形式传播, 如下代码:

class BasicWriteJobStatsTracker(
    serializableHadoopConf: SerializableConfiguration,
    @transient val metrics: Map[String, SQLMetric])
  extends WriteJobStatsTracker {
   ...

  override def processStats(stats: Seq[WriteTaskStats]): Unit = {
    ...

    metrics(BasicWriteJobStatsTracker.NUM_FILES_KEY).add(numFiles)
    metrics(BasicWriteJobStatsTracker.NUM_OUTPUT_BYTES_KEY).add(totalNumBytes)
    metrics(BasicWriteJobStatsTracker.NUM_OUTPUT_ROWS_KEY).add(totalNumOutput)
    metrics(BasicWriteJobStatsTracker.NUM_PARTS_KEY).add(numPartitions)

    val executionId = sparkContext.getLocalProperty(SQLExecution.EXECUTION_ID_KEY)
    SQLMetrics.postDriverMetricUpdates(sparkContext, executionId, metrics.values.toList)
  }
}

至此整个spark parquet write文件的数据流程我们就已经全部过了一遍了,部分细节没有展示。 最终的数据流如下:

实例化Job对象
      |
      v
FileCommitProtocol.setupJob -> OutputCommitter.setupJob  进行作业运行前的准备,如建立临时目录_temporary等
      |
      v
executeTask()-> FileCommitProtocol.setupTask -> OutputCommitter.setupTask 目前为空实现 
                |
                v
         FileCommitProtocol.newTaskTempFile/newTaskTempFileAbsPath 建立写任务的临时目录
                |
                v
         dataWriter.write() 
                |
                v
         dataWriter.commit() 释放资源以及返回写入文件的指标信息 -> HadoopMapReduceCommitProtocol.commitTask
                          |
                          v
                 SparkHadoopMapRedUtil.commitTask 完成mv $PATH/_temporary文件 到$PATH目录,以及做outputCommitCoordination
                           |
                           v
                 返回需要额外临时目录的信息  
      |
      v
FileCommitProtocol.onTaskCommit
      |
      v
FileCommitProtocol.commitJob -> OutputCommitter.commitJob 清理$PATH/_temporary目录且把写额外临时目录下的文件mv到最终path目录下
      |
      v
processStats,处理写入的文件指标

那对应到delta中,spark写入delta数据是怎么写入的呢?其实流程和以上的流程一模一样,唯一不同的是FileCommitProtocol类的实现,直接到TransactionalWrite.writeFiles:

def writeFiles(
      data: Dataset[_],
      writeOptions: Option[DeltaOptions],
      isOptimize: Boolean): Seq[AddFile] = {
    hasWritten = true

    ...
    val committer = getCommitter(outputPath)

    ...
      FileFormatWriter.write(
        sparkSession = spark,
        plan = physicalPlan,
        fileFormat = snapshot.fileFormat, // TODO doesn't support changing formats.
        committer = committer,
        outputSpec = outputSpec,
        hadoopConf = spark.sessionState.newHadoopConfWithOptions(metadata.configuration),
        partitionColumns = partitioningColumns,
        bucketSpec = None,
        statsTrackers = statsTrackers,
        options = Map.empty)
    }

    committer.addedStatuses
  }

而这里的commiter为DelayedCommitProtocol,如下:

    new DelayedCommitProtocol("delta", outputPath.toString, None)

我们来看一下DelayedCommitProtocol方法:

override def newTaskTempFile(
      taskContext: TaskAttemptContext, dir: Option[String], ext: String): String = {
    val filename = getFileName(taskContext, ext)
    val partitionValues = dir.map(parsePartitions).getOrElse(Map.empty[String, String])
    val relativePath = randomPrefixLength.map { prefixLength =>
      getRandomPrefix(prefixLength) // Generate a random prefix as a first choice
    }.orElse {
      dir // or else write into the partition directory if it is partitioned
    }.map { subDir =>
      new Path(subDir, filename)
    }.getOrElse(new Path(filename)) // or directly write out to the output path

    addedFiles.append((partitionValues, relativePath.toUri.toString))
    new Path(path, relativePath).toString
  }

  override def commitTask(taskContext: TaskAttemptContext): TaskCommitMessage = {
    if (addedFiles.nonEmpty) {
      val fs = new Path(path, addedFiles.head._2).getFileSystem(taskContext.getConfiguration)
      val statuses: Seq[AddFile] = addedFiles.map { f =>
        val filePath = new Path(path, new Path(new URI(f._2)))
        val stat = fs.getFileStatus(filePath)
        AddFile(f._2, f._1, stat.getLen, stat.getModificationTime, true)
      }

      new TaskCommitMessage(statuses)
    } else {
      new TaskCommitMessage(Nil)
    }
  }

  override def commitJob(jobContext: JobContext, taskCommits: Seq[TaskCommitMessage]): Unit = {
    val fileStatuses = taskCommits.flatMap(_.obj.asInstanceOf[Seq[AddFile]]).toArray
    addedStatuses ++= fileStatuses
  }
  • 其中newTaskTempFile生成的文件中多了一个UUID.randomUUID.toString,这能减少文件的冲突

  • newTaskTempFile目前直接是返回了输出目录,而不是_temporary目录

  • commitTask只是记录增加的文件

  • commitJob并没有真正的提交job,只是把AddFile保存到了内存中

后续我们会分析delta怎么处理AddFile,从而做到事务性

注意task输出的文件目录为:
${output.dir.root}/_temporary/${appAttempt}/_temporary/${taskAttempt}/${fileName}
如:/data/_temporary/0/_temporary/attempt_20190219101232_0003_m_000005_0/part-00000-936a4f3b-8f71-48ce-960a-e60d3cf4488f.c000.snappy.parquet

以上是“spark delta写操作ACID事务中基础类FileFormat/FileCommitProtocol的示例分析”这篇文章的所有内容,感谢各位的阅读!相信大家都有了一定的了解,希望分享的内容对大家有所帮助,如果还想学习更多知识,欢迎关注亿速云行业资讯频道!

原创文章,作者:kepupublish,如若转载,请注明出处:https://blog.ytso.com/223005.html

(0)
上一篇 2022年1月6日
下一篇 2022年1月6日

相关推荐

发表回复

登录后才能评论