博客
关于我
强烈建议你试试无所不能的 ChatGPT,快点击我
spark job提交2
阅读量:4215 次
发布时间:2019-05-26

本文共 3606 字,大约阅读时间需要 12 分钟。

// Excerpt from Spark's DAGScheduler: how a submitted job is turned into stages.
// The original article annotated this code with `#` lines, which are not Scala
// comments; they are rewritten below as `//` comments (translated to English).

/**
 * Handles a submitted job.
 *
 * Builds the final [[ResultStage]] for `finalRDD` (which, via
 * `getOrCreateParentStages`, also gets or creates its parent
 * ShuffleMapStages), registers a new ActiveJob, posts a
 * SparkListenerJobStart event on the listener bus, and submits the final stage.
 *
 * If creating the final stage throws an `Exception`, the failure is logged,
 * reported through `listener.jobFailed`, and the method returns without
 * registering the job.
 *
 * @param jobId      id of the submitted job
 * @param finalRDD   the last RDD of the job's lineage; the job computes its partitions
 * @param func       function applied to each computed partition
 * @param partitions indices of the partitions of `finalRDD` to compute
 * @param callSite   call site recorded for the job and its final stage
 * @param listener   notified via `jobFailed` if the final stage cannot be created
 * @param properties job properties, passed to the ActiveJob and the job-start event
 */
private[scheduler] def handleJobSubmitted(
    jobId: Int,
    finalRDD: RDD[_],
    func: (TaskContext, Iterator[_]) => _,
    partitions: Array[Int],
    callSite: CallSite,
    listener: JobListener,
    properties: Properties): Unit = {
  // The final stage is the last stage of the job (a ResultStage). Bind it
  // directly to the try expression instead of a null-initialised var; the
  // catch branch exits the method, so `return` has type Nothing here.
  val finalStage: ResultStage =
    try {
      createResultStage(finalRDD, func, partitions, jobId, callSite)
    } catch {
      case e: Exception =>
        logWarning(s"Creating new stage failed due to exception - job: $jobId", e)
        listener.jobFailed(e)
        return
    }

  val job = new ActiveJob(jobId, finalStage, callSite, listener, properties)
  clearCacheLocs()

  val jobSubmissionTime = clock.getTimeMillis()
  jobIdToActiveJob(jobId) = job
  activeJobs += job
  finalStage.setActiveJob(job)

  // Latest StageInfo of every stage currently recorded for this job.
  val stageIds = jobIdToStageIds(jobId).toArray
  val stageInfos = stageIds.flatMap(id => stageIdToStage.get(id).map(_.latestInfo))

  // NOTE(review): the article states listener-bus events are handled by a
  // background listener thread; that is not visible in this excerpt.
  listenerBus.post(
    SparkListenerJobStart(job.jobId, jobSubmissionTime, stageInfos, properties))

  // Submit the final stage.
  submitStage(finalStage)
}

// Next: how the final stage is created.

/**
 * Creates the [[ResultStage]] for `rdd` and registers it.
 *
 * Parent stages are obtained from `getOrCreateParentStages`, the new stage
 * gets the next id from `nextStageId`, and it is recorded in `stageIdToStage`
 * and via `updateJobIdStageIdMaps`.
 *
 * @param rdd        the final RDD of the job
 * @param func       function applied to each computed partition
 * @param partitions partitions of `rdd` to compute
 * @param jobId      id of the job that owns the stage
 * @param callSite   call site recorded on the stage
 * @return the newly created ResultStage
 */
private def createResultStage(
    rdd: RDD[_],
    func: (TaskContext, Iterator[_]) => _,
    partitions: Array[Int],
    jobId: Int,
    callSite: CallSite): ResultStage = {
  // NOTE(review): defined elsewhere; presumably validates barrier-execution
  // constraints on the RDD chain — confirm against its definition.
  checkBarrierStageWithRDDChainPattern(rdd, partitions.toSet.size)
  // Get (or create) the parent stages.
  val parents = getOrCreateParentStages(rdd, jobId)
  val id = nextStageId.getAndIncrement()
  // Create the ResultStage and register it.
  val stage = new ResultStage(id, rdd, func, partitions, parents, jobId, callSite)
  stageIdToStage(id) = stage
  updateJobIdStageIdMaps(jobId, stage)
  stage
}

/**
 * Returns one parent stage per shuffle dependency of `rdd`: a new stage is
 * derived from each ShuffleDependency (existing stages are reused).
 *
 * @param rdd        RDD whose shuffle dependencies delimit its parent stages
 * @param firstJobId job id used when a new stage has to be created
 * @return the parent ShuffleMapStages, as a List
 */
private def getOrCreateParentStages(rdd: RDD[_], firstJobId: Int): List[Stage] = {
  getShuffleDependencies(rdd).map { shuffleDep =>
    getOrCreateShuffleMapStage(shuffleDep, firstJobId)
  }.toList
}

/**
 * Returns the ShuffleMapStage registered for `shuffleDep`, creating it if needed.
 *
 * When no stage exists yet, stages are first created for every dependency
 * returned by `getMissingAncestorShuffleDependencies` that has no entry in
 * `shuffleIdToMapStage`, and then for `shuffleDep` itself.
 *
 * @param shuffleDep the shuffle dependency the stage produces output for
 * @param firstJobId job id used when new stages are created
 * @return the existing or newly created ShuffleMapStage
 */
private def getOrCreateShuffleMapStage(
    shuffleDep: ShuffleDependency[_, _, _],
    firstJobId: Int): ShuffleMapStage = {
  shuffleIdToMapStage.get(shuffleDep.shuffleId) match {
    // A stage for this shuffle already exists: reuse it.
    case Some(stage) =>
      stage
    // Otherwise create stages for the ancestor shuffle dependencies that
    // have none yet, then for this shuffle dependency.
    case None =>
      getMissingAncestorShuffleDependencies(shuffleDep.rdd).foreach { dep =>
        if (!shuffleIdToMapStage.contains(dep.shuffleId)) {
          createShuffleMapStage(dep, firstJobId)
        }
      }
      // Finally, create a stage for the given shuffle dependency.
      createShuffleMapStage(shuffleDep, firstJobId)
  }
}

/**
 * Creates and registers a ShuffleMapStage for `shuffleDep`.
 *
 * The stage has one task per partition of `shuffleDep.rdd`, gets its parents
 * from `getOrCreateParentStages`, and is recorded in `stageIdToStage`,
 * `shuffleIdToMapStage` and via `updateJobIdStageIdMaps`. If the map output
 * tracker does not yet know the shuffle, the shuffle is registered with it.
 *
 * @param shuffleDep the shuffle dependency whose map side this stage computes
 * @param jobId      id of the job creating the stage
 * @return the newly created ShuffleMapStage
 */
def createShuffleMapStage(shuffleDep: ShuffleDependency[_, _, _], jobId: Int): ShuffleMapStage = {
  val rdd = shuffleDep.rdd
  checkBarrierStageWithRDDChainPattern(rdd, rdd.getNumPartitions)
  val numTasks = rdd.partitions.length
  val parents = getOrCreateParentStages(rdd, jobId)
  val id = nextStageId.getAndIncrement()
  // Create the new ShuffleMapStage.
  val stage = new ShuffleMapStage(
    id, rdd, numTasks, parents, jobId, rdd.creationSite, shuffleDep, mapOutputTracker)

  stageIdToStage(id) = stage
  shuffleIdToMapStage(shuffleDep.shuffleId) = stage
  updateJobIdStageIdMaps(jobId, stage)

  if (!mapOutputTracker.containsShuffle(shuffleDep.shuffleId)) {
    // Kind of ugly: need to register RDDs with the cache and map output tracker here
    // since we can't do it in the RDD constructor because # of partitions is unknown
    logInfo(s"Registering RDD ${rdd.id} (${rdd.getCreationSite})")
    mapOutputTracker.registerShuffle(shuffleDep.shuffleId, rdd.partitions.length)
  }
  stage
}

 

转载地址:http://ksnmi.baihongyu.com/

你可能感兴趣的文章
FQDN
查看>>
时序数据库
查看>>
jmxtrans+influxdb+granafa监控hbase
查看>>
使用jmxtrans监控Spark JVM信息到grafana显示
查看>>
HBase - ROOT 和 META 表结构 (region定位原理)
查看>>
HBase API 和 基本操作
查看>>
Hbase的存储模型
查看>>
InfluxDB influxdbc.conf配置文件详解
查看>>
通过BulkLoad的方式快速导入海量数据
查看>>
Mysql根据内容查找在哪个表(Go版本)
查看>>
玩转Anaconda
查看>>
kali linux中文版安装
查看>>
安卓逆向之环境搭建
查看>>
修改包名实现app分身
查看>>
NDK静态注册之调用C层并返回字符串
查看>>
AndroidStudio踩坑记
查看>>
go-colly官方文档翻译(持续翻译中)
查看>>
adb禁用手机更新
查看>>
partition 函数使用练习
查看>>
set容器的并、交、差
查看>>