SparkListener Lineage: OpenLineage Plugin Implementation (Part 4)

RDD<?> rdd = stage.rdd();

List<Object> nodes = new ArrayList<>();
nodes.addAll(Arrays.asList(event.stageInfo(), stage));
nodes.addAll(Rdds.flattenRDDs(rdd));

return populateRun(parentRunFacet, runEventBuilder, jobBuilder, nodes);
}
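Before moving on to populateRun(), a quick note on Rdds.flattenRDDs(rdd): it unfolds the dependency graph of the stage's RDD so that every upstream RDD ends up in nodes and can later be inspected for dataset metadata. The snippet below is only a rough sketch of what such a flattening involves, not the plugin's actual implementation; the collectRDDs helper is hypothetical and relies only on Spark's public RDD#dependencies() API.

import java.util.LinkedHashSet;
import java.util.Set;

import org.apache.spark.Dependency;
import org.apache.spark.rdd.RDD;

public class RddGraphSketch {

  // Hypothetical helper: collects an RDD and, recursively, every RDD it depends on.
  public static Set<RDD<?>> collectRDDs(RDD<?> root) {
    Set<RDD<?>> visited = new LinkedHashSet<>();
    collect(root, visited);
    return visited;
  }

  private static void collect(RDD<?> rdd, Set<RDD<?>> visited) {
    if (!visited.add(rdd)) {
      return; // already visited this node
    }
    // RDD#dependencies() returns a Scala Seq of Dependency objects; iterate it directly.
    scala.collection.Iterator<Dependency<?>> it = rdd.dependencies().iterator();
    while (it.hasNext()) {
      collect(it.next().rdd(), visited);
    }
  }
}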
The core logic sits in populateRun(): it wraps up the metadata, extracts the metadata details from each node, and invokes the previously registered visitors to parse the logical plan. Taking only the input datasets of the logical plan as an example, the logic that obtains the visitors at class-load time looks like this:
@Override
public Collection<PartialFunction<LogicalPlan, List<InputDataset>>>
    createInputDatasetQueryPlanVisitors(OpenLineageContext context) {
  List<PartialFunction<LogicalPlan, List<InputDataset>>> inputDatasets =
      visitorFactory.getInputVisitors(context);

  ImmutableList<PartialFunction<LogicalPlan, List<InputDataset>>> inputDatasetVisitors =
      ImmutableList.<PartialFunction<LogicalPlan, List<InputDataset>>>builder()
          .addAll(
              generate(
                  eventHandlerFactories,
                  factory -> factory.createInputDatasetQueryPlanVisitors(context)))
          .addAll(inputDatasets)
          .build();

  context.getInputDatasetQueryPlanVisitors().addAll(inputDatasetVisitors);
  return inputDatasetVisitors;
}
Each time the listener fires, the input-dataset collection logic in buildInputDatasets() of OpenLineageRunEventBuilder is invoked (note the map() call):
Function1<LogicalPlan, Collection<InputDataset>> inputVisitor =
    visitLogicalPlan(PlanUtils.merge(inputDatasetQueryPlanVisitors));

List<InputDataset> datasets =
    Stream.concat(
            buildDatasets(nodes, inputDatasetBuilders),
            openLineageContext
                .getQueryExecution()
                .map(qe ->
                    fromSeq(qe.optimizedPlan().map(inputVisitor)).stream()
                        .flatMap(Collection::stream)
                        .map(((Class<InputDataset>) InputDataset.class)::cast))
                .orElse(Stream.empty()))
        .collect(Collectors.toList());
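Conceptually, PlanUtils.merge(...) folds all of the registered visitors into a single partial function in which the first visitor that matches a plan node wins, and qe.optimizedPlan().map(inputVisitor) applies that merged function to every node of the optimized plan. The toy sketch below only models this idea with simplified, hypothetical types (PlanNode, PlanVisitor, mergeVisitors, mapPlan); the real integration works with Scala PartialFunctions over Catalyst LogicalPlan nodes and returns InputDataset objects rather than strings.

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public class VisitorSketch {

  // Simplified stand-in for a Catalyst LogicalPlan node.
  interface PlanNode {
    List<PlanNode> children();
  }

  // Simplified stand-in for a partial function from plan node to datasets.
  interface PlanVisitor {
    boolean isDefinedAt(PlanNode node);
    List<String> apply(PlanNode node);
  }

  // Roughly what merging the visitors means: the first visitor that matches a node handles it.
  static PlanVisitor mergeVisitors(List<PlanVisitor> visitors) {
    return new PlanVisitor() {
      @Override
      public boolean isDefinedAt(PlanNode node) {
        return visitors.stream().anyMatch(v -> v.isDefinedAt(node));
      }

      @Override
      public List<String> apply(PlanNode node) {
        return visitors.stream()
            .filter(v -> v.isDefinedAt(node))
            .findFirst()
            .map(v -> v.apply(node))
            .orElse(Collections.emptyList());
      }
    };
  }

  // Roughly what mapping a visitor over the plan means: visit every node of the tree.
  static List<String> mapPlan(PlanNode root, PlanVisitor visitor) {
    List<String> datasets = new ArrayList<>();
    if (visitor.isDefinedAt(root)) {
      datasets.addAll(visitor.apply(root));
    }
    for (PlanNode child : root.children()) {
      datasets.addAll(mapPlan(child, visitor));
    }
    return datasets;
  }
}

Seen this way, each concrete visitor only has to recognize one kind of plan node (a Hive relation, a JDBC relation, a file scan, and so on) and turn it into datasets; the merging and the tree traversal are shared infrastructure.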
2.5 Obtaining the Metadata
After all of the metadata has been gathered, it is packaged into a RunEvent:
private RunEvent populateRun(
    Optional<ParentRunFacet> parentRunFacet,
    RunEventBuilder runEventBuilder,
    JobBuilder jobBuilder,
    List<Object> nodes) {
  OpenLineage openLineage = openLineageContext.getOpenLineage();

  RunFacetsBuilder runFacetsBuilder = openLineage.newRunFacetsBuilder();
  OpenLineage.JobFacetsBuilder jobFacetsBuilder =
      openLineageContext.getOpenLineage().newJobFacetsBuilder();

  parentRunFacet.ifPresent(runFacetsBuilder::parent);
  OpenLineage.JobFacets jobFacets = buildJobFacets(nodes, jobFacetBuilders, jobFacetsBuilder);
  List<InputDataset> inputDatasets = buildInputDatasets(nodes);
  List<OutputDataset> outputDatasets = buildOutputDatasets(nodes);
  // ... ...
  return runEventBuilder.build();
}
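To make the shape of the final event more concrete, here is a minimal standalone sketch that assembles a RunEvent with the openlineage-java generated builders, i.e. the same newRunFacetsBuilder() / newJobFacetsBuilder() pattern used above. Treat it as an assumption-laden illustration: builder method names can vary between client versions, and every namespace, name and URI below is a made-up placeholder.

import java.net.URI;
import java.time.ZoneOffset;
import java.time.ZonedDateTime;
import java.util.Collections;
import java.util.UUID;

import io.openlineage.client.OpenLineage;

public class RunEventSketch {

  public static OpenLineage.RunEvent buildSampleEvent() {
    // The producer URI identifies who generated the event; this value is made up.
    OpenLineage openLineage = new OpenLineage(URI.create("https://example.com/my-producer"));

    return openLineage.newRunEventBuilder()
        .eventTime(ZonedDateTime.now(ZoneOffset.UTC))
        .run(openLineage.newRunBuilder()
            .runId(UUID.randomUUID())
            .facets(openLineage.newRunFacetsBuilder().build())
            .build())
        .job(openLineage.newJobBuilder()
            .namespace("demo_namespace")
            .name("demo_job")
            .facets(openLineage.newJobFacetsBuilder().build())
            .build())
        .inputs(Collections.singletonList(
            openLineage.newInputDatasetBuilder()
                .namespace("hive")
                .name("db.source_table")
                .build()))
        .outputs(Collections.singletonList(
            openLineage.newOutputDatasetBuilder()
                .namespace("hive")
                .name("db.target_table")
                .build()))
        .build();
  }
}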
Finally, the emit method is called to send the lineage metadata:
public void emit(OpenLineage.RunEvent event) {
  try {
    this.client.emit(event);
    log.debug(
        "Emitting lineage completed successfully: {}", OpenLineageClientUtils.toJson(event));
  } catch (OpenLineageClientException exception) {
    log.error("Could not emit lineage w/ exception", exception);
  }
}
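In the Spark integration this.client is built for you from the spark.openlineage.* configuration; used standalone, wiring a client up by hand looks roughly like the sketch below. It assumes the OpenLineageClient and HttpTransport builders of the openlineage-java client (names may differ slightly across versions) and a placeholder endpoint.

import io.openlineage.client.OpenLineage;
import io.openlineage.client.OpenLineageClient;
import io.openlineage.client.transports.HttpTransport;

public class EmitSketch {

  // Sends an already assembled RunEvent to an OpenLineage-compatible backend (e.g. Marquez) over HTTP.
  public static void send(OpenLineage.RunEvent event) {
    OpenLineageClient client =
        OpenLineageClient.builder()
            .transport(HttpTransport.builder().uri("http://localhost:5000").build())
            .build();

    client.emit(event);
  }
}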
The red error the IDE shows on log inside emit() can be ignored: the class is annotated with @Slf4j, so the log field is generated at compile time.
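For reference, @Slf4j is a Lombok annotation: during compilation it generates a logger field equivalent to the one below (the class name here is just a stand-in for the class that declares emit()), which is why the IDE cannot resolve log before annotation processing has run.

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class LineageEmitter {
  // Equivalent of what Lombok's @Slf4j generates; the field name defaults to "log".
  private static final Logger log = LoggerFactory.getLogger(LineageEmitter.class);
}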
3. Concrete Implementation of the Logical Plan Parsing (To Follow)
I am leaving this as a topic for a future post. Its main logic is the traversal of the Spark logical plan (a tree), which gets fairly involved, and different plugins traverse the plan in different ways. I will therefore first lay the groundwork by covering how Spark itself traverses a logical plan, and then gradually expand to each plugin's implementation.
4. Summary
Still to come in this series: the implementation approach of the Atlas lineage plugin, implementation notes for other lineage plugins, data lineage parsing, and Atlas multi-system metadata synchronization. Comments and questions are welcome!