Spark Core源码精读计划3 | SparkContext辅助属性及后初始化
本文适用于知识共享-署名-相同方式共享(CC-BY-SA)3.0协议
目录
前言
SparkContext中的辅助属性
creationSite
allowMultipleContexts
startTime & stopped
addedFiles/addedJars & _files/_jars
persistentRdds
executorEnvs & _executorMemory & _sparkUser
checkpointDir
localProperties
_eventLogDir & _eventLogCodec
_applicationId & _applicationAttemptId
_shutdownHookRef
nextShuffleId & nextRddId
SparkContext后初始化
setupAndStartListenerBus()方法
postEnvironmentUpdate()方法
postApplicationStart()方法
其他事项
总结
前言
在文章2中,我们了解了SparkContext的主体部分,即组件初始化。
除了它之外,SparkContext中还有一些与其内部机制紧密相关的属性,下文为了简单,就将它们称为“辅助属性”。
另外,在组件初始化完成后,还有一些善后工作,即后初始化(Post-init)。本文就来研究这两块内容。
SparkContext中的辅助属性
仿照文章2中的方式,仍然先将我们要关注的这些属性整理出来。
代码3.1 - SparkContext中的辅助属性
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 private val creationSite: CallSite = Utils .getCallSite() private val allowMultipleContexts: Boolean = config.getBoolean("spark.driver.allowMultipleContexts" , false ) val startTime = System .currentTimeMillis() private [spark] val stopped: AtomicBoolean = new AtomicBoolean (false ) private [spark] val addedFiles = new ConcurrentHashMap [String , Long ]().asScala private [spark] val addedJars = new ConcurrentHashMap [String , Long ]().asScala private [spark] val persistentRdds = { val map: ConcurrentMap [Int , RDD [_]] = new MapMaker ().weakValues().makeMap[Int , RDD [_]]() map.asScala } private [spark] val executorEnvs = HashMap [String , String ]() val sparkUser = Utils .getCurrentUserName() private [spark] var checkpointDir: Option [String ] = None protected [spark] val localProperties = new InheritableThreadLocal [Properties ] { override protected def childValue (parent: Properties ): Properties = { SerializationUtils .clone(parent) } override protected def initialValue (): Properties = new Properties () } private val nextShuffleId = new AtomicInteger (0 ) private val nextRddId = new AtomicInteger (0 ) private var _eventLogDir: Option [URI ] = None private var _eventLogCodec: Option [String ] = None private var _executorMemory: Int = _ private var _applicationId: String = _ private var _applicationAttemptId: Option [String ] = None private var _jars: Seq [String ] = _ private var _files: Seq [String ] = _ private var _shutdownHookRef: AnyRef = _
以下划线开头的字段如同代码2.2中一样,也有对应的Getter方法。为了节省篇幅,就不列出来了。下面按照它们初始化的顺序和相关性来介绍,必要时仍然会附上一些源码。
creationSite
creationSite指示SparkContext是在哪里创建的。
CallSite是个简单的数据结构,只有shortForm与longForm两个属性,用来描述代码的位置。Utils.getCallSite()方法遍历当前线程的线程栈,并找到最后一个(即最靠近栈顶的)Spark方法调用,与最先一个(即最靠近栈底的)用户方法调用,将它们的短形式和长形式包装在CallSite中返回。
有兴趣的看官可以自行去看这个方法的源代码,不难。
以代码0.1的WordCount为例,运行时打上断点,观察creationSite的内容如下图。
allowMultipleContexts
allowMultipleContexts指示是否允许一个JVM(即一个Application)内存在多个活动的SparkContext实例。它由spark.driver.allowMultipleContexts参数控制,默认为false,即只允许存在一个活动的SparkContext实例,如果有多个就会抛出异常。设为true的话,在有多个活动的SparkContext时只会输出警告。关于它在下一篇文章中还会涉及到,这里就不多说了。
startTime & stopped
startTime指示SparkContext启动时的时间戳。stopped则指示SparkContext是否停止,它采用AtomicBoolean类型。
addedFiles/addedJars & _files/_jars
Spark支持在提交应用时,附带用户自定义的其他文件与JAR包。
addedFiles和addedJars是两个ConcurrentHashMap,用来维护自定义文件及JAR包的URL路径,及它们被加入ConcurrentHashMap当时的时间戳。_files与_jars则接受Spark配置中定义的文件或JAR包路径。
由于它们的逻辑基本相同, 下面以JAR包为例来看一下代码。
代码3.2 - 构造方法中自定义JAR包的初始化
1 2 3 4 _jars = Utils .getUserJars(_conf) if (jars != null ) { jars.foreach(addJar) }
首先用Utils.getUserJars()方法从SparkConf的spark.jars配置项中取出路径组成的序列,然后分别调用addJar()方法。
代码3.3 - o.a.s.SparkContext.addJar()方法
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 def addJar (path: String ) { def addJarFile (file: File ): String = { try { if (!file.exists()) { throw new FileNotFoundException (s"Jar ${file.getAbsolutePath} not found" ) } if (file.isDirectory) { throw new IllegalArgumentException ( s"Directory ${file.getAbsoluteFile} is not allowed for addJar" ) } env.rpcEnv.fileServer.addJar(file) } catch { case NonFatal (e) => logError(s"Failed to add $path to Spark environment" , e) null } } if (path == null ) { logWarning("null specified as parameter to addJar" ) } else { val key = if (path.contains("\\" )) { addJarFile(new File (path)) } else { val uri = new URI (path) Utils .validateURL(uri) uri.getScheme match { case null => addJarFile(new File (uri.getRawPath)) case "file" => addJarFile(new File (uri.getPath)) case "local" => "file:" + uri.getPath case _ => path } } if (key != null ) { val timestamp = System .currentTimeMillis if (addedJars.putIfAbsent(key, timestamp).isEmpty) { logInfo(s"Added JAR $path at $key with timestamp $timestamp " ) postEnvironmentUpdate() } } } }
addJar()方法检查JAR包路径的合法性和类型,然后调用RpcEnv中的RpcEnvFileServer.addJar()方法,将JAR包加进RPC环境中。在该方法的最后还调用了postEnvironmentUpdate(),用来更新执行环境,这属于后初始化逻辑的一部分,下一节会讲到。
persistentRdds
Spark支持RDD的持久化,可以持久化到内存或磁盘。persistentRdds维护的是持久化RDD的ID与其弱引用的映射关系。通过RDD内自带的cache()/persist()/unpersist()方法可以持久化与反持久化一个RDD,它们最终调用的是SparkContext.persistRDD()/unpersistRDD()内部方法。
代码3.4 - o.a.s.SparkContext.persistRDD()与unpersistRDD()方法
1 2 3 4 5 6 7 8 9 private [spark] def persistRDD (rdd: RDD [_]) { persistentRdds(rdd.id) = rdd } private [spark] def unpersistRDD (rddId: Int , blocking: Boolean = true ) { env.blockManager.master.removeRdd(rddId, blocking) persistentRdds.remove(rddId) listenerBus.post(SparkListenerUnpersistRDD (rddId)) }
executorEnvs & _executorMemory & _sparkUser
executorEnvs是一个HashMap,用来存储需要传递给Executor的环境变量。_executorMemory与_sparkUser就是其中之二,分别代表Executor内存大小和当前启动SparkContext的用户名。
代码3.5 - 构造方法中Executor环境变量的初始化
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 _executorMemory = _conf.getOption("spark.executor.memory" ) .orElse(Option(System.getenv("SPARK_EXECUTOR_MEMORY" ))) .orElse(Option(System.getenv("SPARK_MEM" )) .map(warnSparkMem)) .map(Utils.memoryStringToMb) .getOrElse(1024 ) for { (envKey, propKey) <- Seq(("SPARK_TESTING" , "spark.testing" )) value <- Option(System.getenv(envKey)).orElse(Option(System.getProperty(propKey)))} { executorEnvs(envKey) = value } Option(System.getenv("SPARK_PREPEND_CLASSES" )).foreach { v => executorEnvs("SPARK_PREPEND_CLASSES" ) = v } executorEnvs("SPARK_EXECUTOR_MEMORY" ) = executorMemory + "m" executorEnvs ++= _conf.getExecutorEnv executorEnvs("SPARK_USER" ) = sparkUser
可见,Executor内存可以通过spark.executor.memory配置项、SPARK_EXECUTOR_MEMORY环境变量、SPARK_MEM环境变量指定,优先级依次降低,且默认大小是1GB。
用户名是通过Utils.getCurrentUserName()方法获得的。
checkpointDir
checkpointDir指定集群状态下,RDD检查点在HDFS上保存的目录。检查点的存在是为了当计算过程出错时,能够快速恢复,而不必从头重新计算。SparkContext提供了setCheckpointDir()方法用来设定检查点目录,如下。
代码3.6 - o.a.s.SparkContext.setCheckpointDir()方法
1 2 3 4 5 6 7 8 9 10 11 12 13 14 def setCheckpointDir (directory: String ) { if (!isLocal && Utils .nonLocalPaths(directory).isEmpty) { logWarning("Spark is not running in local mode, therefore the checkpoint directory " + s"must not be on the local filesystem. Directory '$directory ' " + "appears to be on the local filesystem." ) } checkpointDir = Option (directory).map { dir => val path = new Path (dir, UUID .randomUUID().toString) val fs = path.getFileSystem(hadoopConfiguration) fs.mkdirs(path) fs.getFileStatus(path).getPath.toString } }
localProperties
localProperties用于维护一个Properties数据类型的线程本地变量。它是InheritableThreadLocal类型,继承自ThreadLocal,在后者的基础上允许本地变量从父线程到子线程的继承,也就是该Properties会沿着线程栈传递下去。
_eventLogDir & _eventLogCodec
这两个属性与EventLoggingListener相关。
EventLoggingListener打开时,事件日志会写入_eventLogDir指定的目录,可以用spark.eventLog.dir参数设置。
_eventLogCodec指定事件日志的压缩算法,当通过spark.eventLog.compress参数启用压缩后,就根据spark.io.compression.codec参数配置压缩算法,目前支持lz4、lzf、snappy、zstd四种。
_applicationId & _applicationAttemptId
这两个ID都是TaskScheduler初始化完毕并启动之后才分配的。TaskScheduler启动之后,应用代码的逻辑才真正被执行,并且可能会进行多次尝试。在SparkUI、BlockManager和EventLoggingListener初始化时,也会用到它们。
代码3.7 - 构造方法中_applicationId与_applicationAttemptId的初始化
1 2 3 _applicationId = _taskScheduler.applicationId() _applicationAttemptId = taskScheduler.applicationAttemptId() _shutdownHookRef
它用来定义SparkContext的关闭钩子,主要是在JVM退出时,显式地执行SparkContext.stop()方法,以防止用户忘记而留下烂摊子。这实际上是后初始化逻辑,在下面的代码3.8中会出现。
nextShuffleId & nextRddId
这两个ID都是AtomicInteger类型。Shuffle和RDD都需要唯一ID来进行标识,并且它们是递增的。在代码3.4中已经出现过了RDD ID。
SparkContext后初始化
在文章2的ContextCleaner初始化之后,还有一小部分后初始化逻辑,其代码如下所示。
代码3.8 - SparkContext后初始化逻辑
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 setupAndStartListenerBus() postEnvironmentUpdate() postApplicationStart() _taskScheduler.postStartHook() _env.metricsSystem.registerSource(_dagScheduler.metricsSource) _env.metricsSystem.registerSource(new BlockManagerSource (_env.blockManager)) _executorAllocationManager.foreach { e => _env.metricsSystem.registerSource(e.executorAllocationManagerSource) } logDebug("Adding shutdown hook" ) _shutdownHookRef = ShutdownHookManager .addShutdownHook( ShutdownHookManager .SPARK_CONTEXT_SHUTDOWN_PRIORITY ) { () => logInfo("Invoking stop() from shutdown hook" ) stop() } SparkContext .setActiveContext(this , allowMultipleContexts)
它的主要逻辑在开头的三个方法中,下面来逐一看它们的代码。
setupAndStartListenerBus()方法
代码3.9 - o.a.s.SparkContext.setupAndStartListenerBus()方法
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 private def setupAndStartListenerBus (): Unit = { try { conf.get(EXTRA_LISTENERS ).foreach { classNames => val listeners = Utils .loadExtensions(classOf[SparkListenerInterface ], classNames, conf) listeners.foreach { listener => listenerBus.addToSharedQueue(listener) logInfo(s"Registered listener ${listener.getClass().getName()} " ) } } } catch { case e: Exception => try { stop() } finally { throw new SparkException (s"Exception when registering SparkListener" , e) } } listenerBus.start(this , _env.metricsSystem) _listenerBusStarted = true }
这个方法用于注册自定义的监听器,并最终启动LiveListenerBus。自定义监听器都实现了SparkListener特征,通过spark.extraListeners配置参数来指定。然后调用Utils.loadExtensions()方法,通过反射来构建自定义监听器的实例,并将它们注册到LiveListenerBus。
postEnvironmentUpdate()方法
代码3.10 - o.a.s.SparkContext.postEnvironmentUpdate()方法
1 2 3 4 5 6 7 8 9 10 11 private def postEnvironmentUpdate () { if (taskScheduler != null ) { val schedulingMode = getSchedulingMode.toString val addedJarPaths = addedJars.keys.toSeq val addedFilePaths = addedFiles.keys.toSeq val environmentDetails = SparkEnv .environmentDetails(conf, schedulingMode, addedJarPaths, addedFilePaths) val environmentUpdate = SparkListenerEnvironmentUpdate (environmentDetails) listenerBus.post(environmentUpdate) } }
该方法在添加自定义文件和JAR包时也都有调用,因为添加的资源会对程序的执行环境造成影响。它会取得当前的自定义文件和JAR包列表,以及Spark配置、调度方式,然后通过SparkEnv.environmentDetails()方法再取得JVM参数、Java系统属性等,一同封装成SparkListenerEnvironmentUpdate事件,并投递给事件总线。
postApplicationStart()方法
代码3.11 - o.a.s.SparkContext.postApplicationStart()方法
1 2 3 4 private def postApplicationStart() { listenerBus.post(SparkListenerApplicationStart(appName, Some(applicationId), startTime, sparkUser, applicationAttemptId, schedulerBackend.getDriverLogUrls)) }
这个方法比较简单,就是向事件总线投递SparkListenerApplicationStart事件,表示Application已经启动。
其他事项
在这三个方法之后的其他事项如下。
调用TaskScheduler.postStartHook()方法,等待SchedulerBackend初始化完毕。
在度量系统中注册DAGScheduler、BlockManager、ExecutionAllocationManager的度量源,以收集它们的监控数据。
添加关闭钩子,这个在之前已经提过了,不再赘述。
调用伴生对象中的setActiveContext()方法,将当前SparkContext设为活动的。
总结
本文通过梳理SparkContext中的多个辅助属性,进一步了解了一些细节特性,如外部文件和JAR包的初始化、RDD持久化和检查点等。
在SparkContext构造方法的最后,还会执行一些扫尾的工作,如启动事件总线、更新执行环境等。
SparkContext除了初始化之外,还对外提供了不少通用的功能,如生成RDD,产生广播变量与累加器,启动Job等等。
另外,SparkContext类也有伴生对象,里面维护了一些常用的逻辑。
下一篇文章作为SparkContext概况的收尾,就来研究这些剩下的东西。