SparkListener血缘——Openlineage插件实现思路

文章目录二、 的插件实现 三、的具体实现思路(后续)四、总结
一、 1.1 源码剖析
Spark 主要用于监控Spark应用程序的各项活动,可以通过注册到实现事件的监听 。只要在启动Spark 时参数加上"spark.",spark就会通过Utils.解析并植入我们实现的 。
private def setupAndStartListenerBus(): Unit = {// Use reflection to instantiate listeners specified via `spark.extraListeners`try {val listenerClassNames: Seq[String] =conf.get("spark.extraListeners", "").split(',').map(_.trim).filter(_ != "")for (className <- listenerClassNames) {// Use reflection to find the right constructorval constructors = {val listenerClass = Utils.classForName(className)listenerClass.getConstructors.asInstanceOf[Array[Constructor[_ <: SparkListenerInterface]]]}val constructorTakingSparkConf = constructors.find { c =>c.getParameterTypes.sameElements(Array(classOf[SparkConf]))}lazy val zeroArgumentConstructor = constructors.find { c =>c.getParameterTypes.isEmpty}val listener: SparkListenerInterface = {if (constructorTakingSparkConf.isDefined) {constructorTakingSparkConf.get.newInstance(conf)} else if (zeroArgumentConstructor.isDefined) {zeroArgumentConstructor.get.newInstance()} else {throw new SparkException(s"$className did not have a zero-argument constructor or a" +" single-argument constructor that accepts SparkConf. Note: if the class is" +" defined inside of another Scala class, then its constructors may accept an" +" implicit parameter that references the enclosing class; in this case, you must" +" define the listener as a top-level class in order to prevent this extra" +" parameter from breaking Spark's ability to find a valid constructor.")}}listenerBus.addListener(listener)logInfo(s"Registered listener $className")}} catch {case e: Exception =>try {stop()} finally {throw new SparkException(s"Exception when registering SparkListener", e)}}listenerBus.start()
是所有的总线,start方法会启动线程,在有事件进入时,会把事件传递给所有已经注册的
/** Post the application start event */private def postApplicationStart() {// Note: this code assumes that the task scheduler has been initialized and has contacted// the cluster manager to get an application ID (in case the cluster manager provides one).listenerBus.post(SparkListenerApplicationStart(appName, Some(applicationId),startTime, sparkUser, applicationAttemptId, schedulerBackend.getDriverLogUrls))}
上述代码事例描述了App启动时,传递event(事件)的过程 。post方法会将event先塞入队列,然后会循环拿取event,并且根据类型判断应该触发所有已注册监听器的哪个方法 。
protected override def doPostEvent(listener: StreamingListener,event: StreamingListenerEvent): Unit = {event match {case receiverStarted: StreamingListenerReceiverStarted =>listener.onReceiverStarted(receiverStarted)case receiverError: StreamingListenerReceiverError =>listener.onReceiverError(receiverError)case receiverStopped: StreamingListenerReceiverStopped =>listener.onReceiverStopped(receiverStopped)case batchSubmitted: StreamingListenerBatchSubmitted =>listener.onBatchSubmitted(batchSubmitted)case batchStarted: StreamingListenerBatchStarted =>listener.onBatchStarted(batchStarted)case batchCompleted: StreamingListenerBatchCompleted =>listener.onBatchCompleted(batchCompleted)case outputOperationStarted: StreamingListenerOutputOperationStarted =>listener.onOutputOperationStarted(outputOperationStarted)case outputOperationCompleted: StreamingListenerOutputOperationCompleted =>listener.onOutputOperationCompleted(outputOperationCompleted)case _ =>}}
1.2提供的方法
@DeveloperApiabstract class SparkListener extends SparkListenerInterface {override def onStageCompleted(stageCompleted: SparkListenerStageCompleted): Unit = { }override def onStageSubmitted(stageSubmitted: SparkListenerStageSubmitted): Unit = { }override def onTaskStart(taskStart: SparkListenerTaskStart): Unit = { }override def onTaskGettingResult(taskGettingResult: SparkListenerTaskGettingResult): Unit = { }override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = { }override def onJobStart(jobStart: SparkListenerJobStart): Unit = { }override def onJobEnd(jobEnd: SparkListenerJobEnd): Unit = { }override def onEnvironmentUpdate(environmentUpdate: SparkListenerEnvironmentUpdate): Unit = { }override def onBlockManagerAdded(blockManagerAdded: SparkListenerBlockManagerAdded): Unit = { }override def onBlockManagerRemoved(blockManagerRemoved: SparkListenerBlockManagerRemoved): Unit = { }override def onUnpersistRDD(unpersistRDD: SparkListenerUnpersistRDD): Unit = { }override def onApplicationStart(applicationStart: SparkListenerApplicationStart): Unit = { }override def onApplicationEnd(applicationEnd: SparkListenerApplicationEnd): Unit = { }override def onExecutorMetricsUpdate(executorMetricsUpdate: SparkListenerExecutorMetricsUpdate): Unit = { }override def onExecutorAdded(executorAdded: SparkListenerExecutorAdded): Unit = { }override def onExecutorRemoved(executorRemoved: SparkListenerExecutorRemoved): Unit = { }override def onBlockUpdated(blockUpdated: SparkListenerBlockUpdated): Unit = { }override def onOtherEvent(event: SparkListenerEvent): Unit = { }}