SparkListener Lineage: OpenLineage Plugin Implementation Approach (Part 2)


2. OpenLineage Plugin Implementation
For background, see my column "Data Map".
2.1 Initialization Parameters
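The OpenLineage Spark listener is implemented in the class io..spark.agent.ener. Each time the listener is initialized, it reads its parameters from the Spark conf. These parameters mainly describe where the lineage should be sent once it has been parsed, for example a Kafka address or the URL of an HTTP endpoint.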
@Override
public void onApplicationStart(SparkListenerApplicationStart applicationStart) {
  initializeContextFactoryIfNotInitialized();
}

private void initializeContextFactoryIfNotInitialized() {
  if (contextFactory != null || isDisabled) {
    return;
  }
  SparkEnv sparkEnv = SparkEnv$.MODULE$.get();
  if (sparkEnv != null) {
    try {
      ArgumentParser args = ArgumentParser.parse(sparkEnv.conf());
      contextFactory = new ContextFactory(new EventEmitter(args));
      ··· ······ ···

// how argument parser works
public static ArgumentParser parse(SparkConf conf) {
  ArgumentParserBuilder builder = ArgumentParser.builder();
  adjustDeprecatedConfigs(conf);
  conf.setIfMissing(SPARK_CONF_DISABLED_FACETS, DEFAULT_DISABLED_FACETS);
  conf.setIfMissing(SPARK_CONF_TRANSPORT_TYPE, "http");
  if (conf.get(SPARK_CONF_TRANSPORT_TYPE).equals("http")) {
    findSparkConfigKey(conf, SPARK_CONF_HTTP_URL)
        .ifPresent(url -> UrlParser.parseUrl(url).forEach(conf::set));
  }
  findSparkConfigKey(conf, SPARK_CONF_APP_NAME)
      .filter(str -> !str.isEmpty())
      .ifPresent(builder::appName);
  findSparkConfigKey(conf, SPARK_CONF_NAMESPACE).ifPresent(builder::namespace);
  findSparkConfigKey(conf, SPARK_CONF_JOB_NAME).ifPresent(builder::jobName);
  findSparkConfigKey(conf, SPARK_CONF_PARENT_RUN_ID).ifPresent(builder::parentRunId);
  builder.openLineageYaml(extractOpenlineageConfFromSparkConf(conf));
  return builder.build();
}
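Note that the final step of parse, which builds the OpenLineageYaml, mainly converts the conf into a serializable class that is used when sending; see the code below.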

/** Configuration for {@link OpenLineageClient}. */
public class OpenLineageYaml {
  @Getter
  @JsonProperty("transport")
  private TransportConfig transportConfig;

  @Getter
  @JsonProperty("facets")
  private FacetsConfig facetsConfig;
}
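To make the conf-to-config step above a little more concrete, here is a minimal sketch of the idea: pick the plugin's keys out of a flat key/value conf and apply a default transport type, in the spirit of the setIfMissing call in parse. It uses a plain java.util.Map in place of SparkConf so that it runs on its own. The class name ConfSketch, the "spark.openlineage." prefix, the key names, and the localhost URL are assumptions made for illustration, not the plugin's actual code.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical stand-in for the conf extraction step; not the plugin's real class.
final class ConfSketch {
    // Assumed key prefix for the plugin's settings.
    static final String PREFIX = "spark.openlineage.";

    // Keeps only the prefixed keys, strips the prefix, and defaults the transport type to "http".
    static Map<String, String> extract(Map<String, String> sparkConf) {
        Map<String, String> out = new TreeMap<>();
        for (Map.Entry<String, String> e : sparkConf.entrySet()) {
            if (e.getKey().startsWith(PREFIX)) {
                out.put(e.getKey().substring(PREFIX.length()), e.getValue());
            }
        }
        out.putIfAbsent("transport.type", "http");
        return out;
    }

    public static void main(String[] args) {
        Map<String, String> conf = new HashMap<>();
        conf.put("spark.app.name", "demo");
        conf.put("spark.openlineage.transport.url", "http://localhost:5000");
        System.out.println(extract(conf));
    }
}
```

The resulting flat map is the kind of input that could then be bound to a serializable class such as OpenLineageYaml, with one field per top-level section (transport, facets).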
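2.2 Class Loading
During initialization an EventEmitter is created from the parsed arguments (see new EventEmitter(args) above). It parses the transport configuration and builds the concrete transport class, which sends the lineage to the metadata map or to a message broker such as Kafka.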
  this.client =
      OpenLineageClient.builder()
          .transport(
              new TransportFactory(argument.getOpenLineageYaml().getTransportConfig()).build())
          .disableFacets(disabledFacets)
          .build();
}
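The factory step above can be pictured with a small self-contained sketch: a factory inspects the configured transport type and returns a matching sender. All names here (TransportSketch, HttpTransportSketch, KafkaTransportSketch, TransportFactorySketch) are hypothetical, and the "transports" only record what they would send instead of doing real I/O. This is not the OpenLineage client API.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical transport abstraction: something that can emit a serialized event.
interface TransportSketch {
    void emit(String eventJson);
}

// Records the HTTP request it would make instead of performing it.
final class HttpTransportSketch implements TransportSketch {
    final String url;
    final List<String> sent = new ArrayList<>();

    HttpTransportSketch(String url) {
        this.url = url;
    }

    @Override
    public void emit(String eventJson) {
        sent.add("POST " + url + " " + eventJson);
    }
}

// Records the Kafka message it would produce instead of producing it.
final class KafkaTransportSketch implements TransportSketch {
    final String topic;
    final List<String> sent = new ArrayList<>();

    KafkaTransportSketch(String topic) {
        this.topic = topic;
    }

    @Override
    public void emit(String eventJson) {
        sent.add("PRODUCE " + topic + " " + eventJson);
    }
}

// Chooses the concrete transport from the configured type.
final class TransportFactorySketch {
    static TransportSketch build(String type, String target) {
        switch (type) {
            case "http":
                return new HttpTransportSketch(target);
            case "kafka":
                return new KafkaTransportSketch(target);
            default:
                throw new IllegalArgumentException("Unsupported transport type: " + type);
        }
    }
}
```

Keeping the choice of transport inside one factory means the rest of the listener only depends on the emit interface, so a different destination is a configuration change and not a code change.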
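ContextFactory receives the EventEmitter as its argument and detects the Spark version from the Spark environment (lineage parsing is currently supported for Spark 2 and 3). Internally it creates the VisitorFactory for that version, and the VisitorFactory loads the visitors that match the Spark version.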
static VisitorFactory getInstance() {
  String version = package$.MODULE$.SPARK_VERSION();
  try {
    return (VisitorFactory) Class.forName(getVisitorFactoryForVersion(version)).newInstance();
  } catch (Exception e) {
    throw new RuntimeException(
        String.format("Can't instantiate visitor factory for version: %s", version), e);
  }
}
··· ······ ···
static String getVisitorFactoryForVersion(String version) {
  if (version.startsWith("2.")) {
    return SPARK2_FACTORY_NAME;
  } else if (version.startsWith("3.2")) {
    return SPARK32_FACTORY_NAME;
  } else {
    return SPARK3_FACTORY_NAME;
  }
}
··· ······ ···
private static final String SPARK2_FACTORY_NAME =
    "io.openlineage.spark.agent.lifecycle.Spark2VisitorFactoryImpl";
private static final String SPARK3_FACTORY_NAME =
    "io.openlineage.spark.agent.lifecycle.Spark3VisitorFactoryImpl";
private static final String SPARK32_FACTORY_NAME =
    "io.openlineage.spark.agent.lifecycle.Spark32VisitorFactoryImpl";
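The version dispatch above is easy to try in isolation. The sketch below copies the same prefix checks but points them at three tiny stand-in classes, so it runs without Spark or the plugin on the classpath. The names ending in Sketch are hypothetical. It also uses getDeclaredConstructor().newInstance(), since Class.newInstance() has been deprecated since Java 9; that substitution is my own and is not what the snippet above does.

```java
// Hypothetical stand-ins for the per-version visitor factories.
interface VisitorFactorySketch {
    String name();
}

final class Spark2FactorySketch implements VisitorFactorySketch {
    @Override
    public String name() {
        return "spark2";
    }
}

final class Spark3FactorySketch implements VisitorFactorySketch {
    @Override
    public String name() {
        return "spark3";
    }
}

final class Spark32FactorySketch implements VisitorFactorySketch {
    @Override
    public String name() {
        return "spark32";
    }
}

final class VersionDispatchSketch {
    // Same prefix logic as getVisitorFactoryForVersion, returning stand-in class names.
    static String factoryClassFor(String version) {
        if (version.startsWith("2.")) {
            return Spark2FactorySketch.class.getName();
        } else if (version.startsWith("3.2")) {
            return Spark32FactorySketch.class.getName();
        } else {
            return Spark3FactorySketch.class.getName();
        }
    }

    // Loads the chosen class by name and instantiates it reflectively.
    static VisitorFactorySketch load(String version) {
        try {
            return (VisitorFactorySketch)
                Class.forName(factoryClassFor(version)).getDeclaredConstructor().newInstance();
        } catch (ReflectiveOperationException e) {
            throw new RuntimeException(
                String.format("Can't instantiate visitor factory for version: %s", version), e);
        }
    }

    public static void main(String[] args) {
        for (String v : new String[] {"2.4.8", "3.1.2", "3.2.1", "3.3.0"}) {
            System.out.println(v + " -> " + load(v).name());
        }
    }
}
```

One thing the sketch makes visible: with these prefix checks, any version that starts with neither "2." nor "3.2" (for example 3.3.0) falls through to the generic Spark 3 factory.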
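It is also responsible for creating the concrete context objects (each triggered listener event updates one of them). Their member variable is mainly used to process and analyze the Spark logical plan. During initialization, the metadata builders are registered, together with the concrete input/output logical-plan visitors (the version-specific visitors loaded earlier are obtained from the VisitorFactory).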
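As a rough illustration of what "registering visitors and applying them to a logical plan" means, the sketch below registers visitors in a list and walks a toy plan tree, letting each visitor contribute dataset names for the nodes it understands. PlanNode, PlanVisitor, and VisitorRegistrySketch are hypothetical types invented for this example. Spark's real LogicalPlan and the plugin's visitor classes are considerably richer.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Toy plan node: a kind ("Scan", "Join", "Insert"), an optional table name, and children.
final class PlanNode {
    final String kind;
    final String table;
    final List<PlanNode> children;

    PlanNode(String kind, String table, PlanNode... children) {
        this.kind = kind;
        this.table = table;
        this.children = Arrays.asList(children);
    }
}

// A visitor handles only the nodes it is defined at, similar to a partial function.
interface PlanVisitor {
    boolean isDefinedAt(PlanNode node);

    List<String> apply(PlanNode node);
}

final class VisitorRegistrySketch {
    private final List<PlanVisitor> visitors = new ArrayList<>();

    void register(PlanVisitor visitor) {
        visitors.add(visitor);
    }

    // Pre-order walk: every registered visitor that matches a node contributes datasets.
    List<String> collect(PlanNode root) {
        List<String> out = new ArrayList<>();
        walk(root, out);
        return out;
    }

    private void walk(PlanNode node, List<String> out) {
        for (PlanVisitor v : visitors) {
            if (v.isDefinedAt(node)) {
                out.addAll(v.apply(node));
            }
        }
        for (PlanNode child : node.children) {
            walk(child, out);
        }
    }

    // Convenience visitor that reports the table of every node of the given kind.
    static PlanVisitor forKind(final String kind) {
        return new PlanVisitor() {
            @Override
            public boolean isDefinedAt(PlanNode node) {
                return node.kind.equals(kind);
            }

            @Override
            public List<String> apply(PlanNode node) {
                return Collections.singletonList(node.table);
            }
        };
    }
}
```

In this toy setup, one registry holding a "Scan" visitor plays the role of the input visitors and another holding an "Insert" visitor plays the role of the output visitors. Supporting a new Spark version then amounts to registering a different set of visitors, which is the job the version-specific factory does.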