深入分析Spark任务调度原理,Java后端同学入门Spark编程系列[亲测有效]

Java (146) 2023-03-24 16:06
深入分析Spark任务调度原理,Java后端同学入门Spark编程系列[亲测有效]_https://bianchenghao6.com/blog_Java_第1张

对于Spark来说,任务的调度和执行可以说是其运行的核心流程,所以本文从源码的角度对这个过程进行详细的分析。

【一】概述

【Spark任务执行流程】

  1. 用户创建SparkContext,SparkContext连接到Cluster Manager实例
  2. Cluster Manager根据配置的CPU、内存信息,分配资源,启动Executor
  3. Driver将程序划分为成各个阶段,创建Task,每个阶段包含一组相同的Task(他们分别作用于不同的分区)。
  4. Driver向Executor发送Task
  5. Executor收到Task,下载Task的运行时依赖,准备执行环境
  6. Executor执行Task,并把Task的运行状态汇报给Driver
  7. Driver根据Task的运行状态处理更新
  • Shuffle Map Task:数据Shuffle操作(“重新洗牌”)
  • Result Task:生成数据结果
  1. Driver不断调用Task,发送Task给Executor,当所有Task正确执行或超过限制次数后停止。

【二】原理分析

【总述】

通过上文的基本介绍我们知道,Spark的编程模型基础是RDD,数据被封装为RDD类型,进行后续一系列转换,那么Spark是如何将RDD进行处理的呢?

在Spark内部,对RDD又进行了整合,首先,以行动算子为划分粒度,划分为Job(作业),各个Job根据依赖关系(宽窄依赖)划分为Stage(调度阶段),每个Stage中存在多个Task,组成一个TaskSet(任务集),各个Task可以并发执行,执行逻辑相同,但处理的数据不同,处理的是不同partition(分区)下的数据。上述这些划分工作都是在Driver上进行,Task是被分发到Executor上的任务,是Spark实际执行的基本单元。

【涉及的主要类】

上述过程具体到代码实现中,主要是下面几个类,在接下来的源码分析中,也几乎都是在这几个类下进行跳转:

  1. org.apache.spark.scheduler.DAGScheduler

负责分析用户提交的应用:① 根据任务的依赖关系建立DAG;② 将DAG划分到不同Stage

  1. org.apache.spark.scheduler.TaskScheduler

负责为创建它的SparkContext调度任务:① 从DAGScheduler接收不同stage的任务;② 向集群提交这些任务; ③ 为执行慢的任务动备份任务

  1. org.apache.spark.scheduler.SchedulerBackend

负责分配当前可用资源:① 向目前等待分配Executor的Task分配Executor;② 在已分配的Executor上启动Task,完成计算的调度

【主要流程分析】

【步骤一:创建SparkContext、TaskScheduler、DAGScheduler】

TaskScheduler、DAGScheduler都是在创建SparkContext时创建(

TaskScheduler的创建方式:调用createTaskScheduler方法:

org.apache.spark.SparkContext#createTaskScheduler

DAGScheduler的创建方式:调用构造函数:

 _dagScheduler = new DAGScheduler(this)

【步骤二:提交Job】

提交一个Job遵循着如下的调用步骤:

  • org.apache.spark.SparkContext#runJob
  • org.apache.spark.scheduler.DAGScheduler#runJob
  • org.apache.spark.scheduler.DAGScheduler#submitJob

在这里有一点需要说明:

runJob调用submitJob会发生阻塞,直到完成或者返回失败

submitJob过程如下:

def submitJob[T, U](
 rdd: RDD[T],
 func: (TaskContext, Iterator[T]) => U,
 partitions: Seq[Int],
 callSite: CallSite,
 resultHandler: (Int, U) => Unit,
 properties: Properties): JobWaiter[U] = {
 
 // 忽略一些参数校验等
 ......
 val waiter = new JobWaiter(this, jobId, partitions.size, resultHandler)
 eventProcessLoop.post(JobSubmitted(
 jobId, rdd, func2, partitions.toArray, callSite, waiter,
 SerializationUtils.clone(properties)))
 waiter
 }

创建JobWaiter对象,通过内部消息处理,把这个JobWaiter对象发给DAGSchedulerEventProcessLoop的onReceive方法

org.apache.spark.scheduler.DAGSchedulerEventProcessLoop#onReceive

onReceive方法中调用了doOnReceive方法如下:

 private def doOnReceive(event: DAGSchedulerEvent): Unit = event match {
 case JobSubmitted(jobId, rdd, func, partitions, callSite, listener, properties) =>
 dagScheduler.handleJobSubmitted(jobId, rdd, func, partitions, callSite, listener, properties)
 ......
 }

如果JobSubmitted模式可以匹配成功,那么就会调用org.apache.spark.scheduler.DAGScheduler#handleJobSubmitted;

调用了handleJobSubmitted之后,就是划分stage了,我们将它放在下个部分分析。

【步骤三:划分Stage】

划分Stage的主要逻辑在org.apache.spark.scheduler.DAGScheduler#handleJobSubmitted

 private[scheduler] def handleJobSubmitted(jobId: Int,
 finalRDD: RDD[_],
 func: (TaskContext, Iterator[_]) => _,
 partitions: Array[Int],
 callSite: CallSite,
 listener: JobListener,
 properties: Properties) {
 var finalStage: ResultStage = null
 try {
 // 获取最后一个Stage
 finalStage = newResultStage(finalRDD, func, partitions, jobId, callSite)
 } catch {
 ......
 }
 // 后面部分代码省略,是下一部分研究的~~
 ......
 // 提交调度,第四部分内容,暂留伏笔
 submitStage(finalStage)
 }

该方法首先根据最后一个RDD生成ResultStage,其中newResultStage()中调用org.apache.spark.scheduler.DAGScheduler#getParentStagesAndId,进而调用org.apache.spark.scheduler.DAGScheduler#getParentStages获取ParentStage

getParentStages()方法就是stage划分阶段重要的逻辑所在了,划分依据就是是否存在shuffle操作。

代码执行的主要逻辑就是每遇到一个ShuffleDependency,生成一个ParentStage

private def getParentStages(rdd: RDD[_], firstJobId: Int): List[Stage] = {
 // 要返回的 ParentStage
 val parents = new HashSet[Stage]
 // 已被访问过的RDD
 val visited = new HashSet[RDD[_]]
 // 需要被处理的RDD,非ShuffleDependency的RDD
 val waitingForVisit = new Stack[RDD[_]]
 
 waitingForVisit.push(rdd)
 while (waitingForVisit.nonEmpty) {
 visit(waitingForVisit.pop())
 }
 parents.toList
 }

其中visit()方法就是遍历处理的方法,先标记访问过的RDD,然后判断当前RDD所依赖的RDD的操作类型,如果是ShuffleDependency,就调用getShuffleMapStage(),划分ShuffleMap调度阶段(向前遍历划分),如果非ShuffleDependency,入waitingForVisit栈中。

 def visit(r: RDD[_]) {
 if (!visited(r)) {
 visited += r
 for (dep <- r.dependencies) {
 dep match {
 case shufDep: ShuffleDependency[_, _, _] =>
 parents += getShuffleMapStage(shufDep, firstJobId)
 case _ =>
 waitingForVisit.push(dep.rdd)
 }
 }
 }
 }

划分调度阶段的方法:org.apache.spark.scheduler.DAGScheduler#getShuffleMapStage

主要逻辑是首先寻找该分支上所有宽依赖RDD,生成ShuffleMapStage

private def getShuffleMapStage(
 shuffleDep: ShuffleDependency[_, _, _],
 firstJobId: Int): ShuffleMapStage = {
 shuffleToMapStage.get(shuffleDep.shuffleId) match {
 case Some(stage) => stage
 case None =>
 // 寻找该分支上其他的宽依赖
 getAncestorShuffleDependencies(shuffleDep.rdd).foreach { dep =>
 if (!shuffleToMapStage.contains(dep.shuffleId)) {
 shuffleToMapStage(dep.shuffleId) = newOrUsedShuffleStage(dep, firstJobId)
 }
 }
 // 生成 ShuffleStage
 val stage = newOrUsedShuffleStage(shuffleDep, firstJobId)
 shuffleToMapStage(shuffleDep.shuffleId) = stage
 stage
 }
 }

【步骤四:提交Stage】

是否还记得在步骤二中提到的org.apache.spark.scheduler.DAGScheduler#handleJobSubmitted方法中,留下的伏笔submitStage(finalStage)。这里为了阅读方便,删掉了一些原有代码,特此标明~~

概括来说,调用getMissingParentStages()获取父stage,如果已经不存在父stage了,就调用 submitMissingTasks(stage, jobId.get),否则继续递归调用,直到不存在父stage为止。

 private def submitStage(stage: Stage) {
 val jobId = activeJobForStage(stage)
 // 获取finalStage的父stage
 val missing = getMissingParentStages(stage).sortBy(_.id)
 // 不存在父stage
 if (missing.isEmpty) {
 submitMissingTasks(stage, jobId.get)
 }
 else {
 for (parent <- missing) {
 // 递归调用 submitStage
 submitStage(parent)
 }
 waitingStages += stage
 }
 }

这里有个小坑,需要说明:

在提交Stage前,要判断所依赖的父调度阶段(父Stage)是否运行成功,成功才提交该Stage,否则重新提交父Stage。

判断逻辑在ShuffleMapTask完成时进行,是通过下面的方式完成的:

在Executor.run()任务执行完成发送消息,通知DAGScheduler等调度器的更新状态,handleTaskCompletion()对事件进行处理。

private def doOnReceive(event: DAGSchedulerEvent): Unit = event match {
 ......
 
 case completion: CompletionEvent =>
 dagScheduler.handleTaskCompletion(completion)
 
 ......
 }

这时DAGScheduler会检查调度阶段的所有任务是否已经完成,如果存在执行失败的Stage,则重新提交。具体判断逻辑在下面代码的第21~32行。

private[scheduler] def handleTaskCompletion(event: CompletionEvent) {
 val task = event.task
 val taskId = event.taskInfo.id
 val stageId = task.stageId
 val taskType = Utils.getFormattedClassName(task)
 ......
 val stage = stageIdToStage(task.stageId)
 event.reason match {
 case Success =>
 stage.pendingPartitions -= task.partitionId
 task match {
 case rt: ResultTask[_, _] =>
 
 val resultStage = stage.asInstanceOf[ResultStage]
 resultStage.activeJob match {
 case Some(job) =>
 case None =>
 
 }
 case smt: ShuffleMapTask =>
 val shuffleStage = stage.asInstanceOf[ShuffleMapStage]
 updateAccumulators(event)
 val status = event.result.asInstanceOf[MapStatus]
 val execId = status.location.executorId
 logDebug("ShuffleMapTask finished on " + execId)
 if (failedEpoch.contains(execId) && smt.epoch <= failedEpoch(execId)) {
 logInfo(s"Ignoring possibly bogus $smt completion from executor $execId")
 } else {
 shuffleStage.addOutputLoc(smt.partitionId, status)
 }
 }
 case Resubmitted =>
 logInfo("Resubmitted " + task + ", so marking it as still running")
 stage.pendingPartitions += task.partitionId
 case FetchFailed(bmAddress, shuffleId, mapId, reduceId, failureMessage) =>
 val failedStage = stageIdToStage(task.stageId)
 val mapStage = shuffleToMapStage(shuffleId)
 
 submitWaitingStages()
 }

【步骤五:提交Task】

根据调度阶段分区拆分对应个数的Task,组成任务集交给TaskScheduler

主要逻辑:

对于ShuffleMapStage,生成ShuffleMapTask

对于ResultStage,生成ResultTask

每个TaskSet包含了对应Stage中的所有Task,划分依据是数据Partition。

private def submitMissingTasks(stage: Stage, jobId: Int) {

......

val tasks: Seq[Task[_]] = try {

stage match {

case stage: ShuffleMapStage =>

partitionsToCompute.map { id =>

val locs = taskIdToLocations(id)

val part = stage.rdd.partitions(id)

new ShuffleMapTask(stage.id, stage.latestInfo.attemptId,

taskBinary, part, locs, stage.latestInfo.taskMetrics, properties)

}

case stage: ResultStage =>

val job = stage.activeJob.get

partitionsToCompute.map { id =>

val p: Int = stage.partitions(id)

val part = stage.rdd.partitions(p)

val locs = taskIdToLocations(id)

new ResultTask(stage.id, stage.latestInfo.attemptId,

taskBinary, part, locs, id, properties, stage.latestInfo.taskMetrics)

}

}

} catch {

case NonFatal(e) =>

abortStage(stage, s"Task creation failed: $e\n${Utils.exceptionString(e)}", Some(e))

runningStages -= stage

return

}

// 提交

if (tasks.size > 0) {

stage.pendingPartitions ++= tasks.map(_.partitionId)

taskScheduler.submitTasks(new TaskSet(

tasks.toArray, stage.id, stage.latestInfo.attemptId, jobId, properties))

stage.latestInfo.submissionTime = Some(clock.getTimeMillis())

} else {

markStageAsFinished(stage, None)

}

}

提交任务步骤如下:

  1. 创建任务集管理器
  2. 将该任务集管理器加入到系统资源调度池,系统统一调配,支持FIFO和FAIR
  3. 执行调度器后台进程SparkDeploySchedulerBackend的reviveOffers()方法,分配资源
  4. 向DriverEndPoint终端点发消息,调用org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend.DriverEndpoint#makeOffers
  • 获取集群中可用的Executor
  • 发到TaskSchedulerImpl分配资源
  • 提交到launchTasks
 override def submitTasks(taskSet: TaskSet) {
 val tasks = taskSet.tasks
 
 this.synchronized {
 // 创建任务集管理器
 val manager = createTaskSetManager(taskSet, maxTaskFailures)
 val stage = taskSet.stageId
 val stageTaskSets =
 taskSetsByStageIdAndAttempt.getOrElseUpdate(stage, new HashMap[Int, TaskSetManager])
 stageTaskSets(taskSet.stageAttemptId) = manager
 val conflictingTaskSet = stageTaskSets.exists { case (_, ts) =>
 ts.taskSet != taskSet && !ts.isZombie
 }
 
 
 schedulableBuilder.addTaskSetManager(manager, manager.taskSet.properties)
 ......
 backend.reviveOffers()
 }
 private def makeOffers() {
 
 val activeExecutors = executorDataMap.filterKeys(executorIsAlive)
 val workOffers = activeExecutors.map { case (id, executorData) =>
 new WorkerOffer(id, executorData.executorHost, executorData.freeCores)
 }.toSeq
 launchTasks(scheduler.resourceOffers(workOffers))
 }

【步骤六:执行Task】

org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend接收到LaunchTask的消息,

会调用org.apache.spark.executor.Executor#launchTask

会初始化一个TaskRunner,然后放到线程池中执行。

def launchTask(
 context: ExecutorBackend,
 taskId: Long,
 attemptNumber: Int,
 taskName: String,
 serializedTask: ByteBuffer): Unit = {
 val tr = new TaskRunner(context, taskId = taskId, attemptNumber = attemptNumber, taskName,
 serializedTask)
 runningTasks.put(taskId, tr)
 threadPool.execute(tr)
 }

org.apache.spark.executor.Executor.TaskRunner#run省略了一些代码,包括反序列化Task以及Task所依赖的jar文件,

 override def run(): Unit = {
 ......
 var taskStart = System.currentTimeMillis()
 val value = try {
 val res = task.run(
 taskAttemptId = taskId,
 attemptNumber = attemptNumber,
 metricsSystem = env.metricsSystem)
 threwException = false
 res
 } finally {
 ......
 }
 val taskFinish = System.currentTimeMillis()
 
 }

然后会调用org.apache.spark.scheduler.Task#runTask方法,由于Task是一个抽象类,有两个实现类

org.apache.spark.scheduler.ShuffleMapTask

org.apache.spark.scheduler.ResultTask

对于ResultTask,计算结果会直接返回

override def runTask(context: TaskContext): U = {
 // Deserialize the RDD and the func using the broadcast variables.
 val deserializeStartTime = System.currentTimeMillis()
 val ser = SparkEnv.get.closureSerializer.newInstance()
 val (rdd, func) = ser.deserialize[(RDD[T], (TaskContext, Iterator[T]) => U)](
 ByteBuffer.wrap(taskBinary.value), Thread.currentThread.getContextClassLoader)
 _executorDeserializeTime = System.currentTimeMillis() - deserializeStartTime
 func(context, rdd.iterator(partition, context))
 }

对于ShuffleMapTask,计算结果写入BlockManager中,返回一个MapStatus对象,这个MapStatus对象存储的是结果存入BlockManager的相关信息,这样做是为了方便下一阶段任务获得输入数据。

override def runTask(context: TaskContext): MapStatus = {
 // Deserialize the RDD using the broadcast variable.
 val deserializeStartTime = System.currentTimeMillis()
 val ser = SparkEnv.get.closureSerializer.newInstance()
 val (rdd, dep) = ser.deserialize[(RDD[_], ShuffleDependency[_, _, _])](
 ByteBuffer.wrap(taskBinary.value), Thread.currentThread.getContextClassLoader)
 _executorDeserializeTime = System.currentTimeMillis() - deserializeStartTime
 var writer: ShuffleWriter[Any, Any] = null
 try {
 val manager = SparkEnv.get.shuffleManager
 writer = manager.getWriter[Any, Any](dep.shuffleHandle, partitionId, context)
 writer.write(rdd.iterator(partition, context).asInstanceOf[Iterator[_ <: Product2[Any, Any]]])
 writer.stop(success = true).get
 } catch {
 ......
 }
 }

【三】总结

关于Spark任务调度部分的源码,大体流程就是如此了,对于Spark集群之间只如何通信的,以及Spark对于数据是如何存储的,后续会继续分析。

喜欢的就给小编点一下关注吧,想继续了解学习的朋友,可以私信我“学习”,领取最新免费视频学习资料。

发表回复