SparkListener血缘——Openlineage插件实现思路( 四 ) 2023-10-08 生活百科逻辑 > rdd = stage.rdd();List nodes = new ArrayList<>();nodes.addAll(Arrays.asList(event.stageInfo(), stage));nodes.addAll(Rdds.flattenRDDs(rdd));return populateRun(parentRunFacet, runEventBuilder, jobBuilder, nodes);} 核心代码逻辑在 中,它负责包装元数据,拿取元数据信息,并且调用先前的注册好的访问器进行逻辑计划的解析 。仅拿获取逻辑计划中的Input数据集举例,在类加载时拿取(访问器)的逻辑如下: @Overridepublic Collection>>createInputDatasetQueryPlanVisitors(OpenLineageContext context) {List>> inputDatasets =visitorFactory.getInputVisitors(context);ImmutableList>> inputDatasetVisitors =ImmutableList.>>builder().addAll(generate(eventHandlerFactories,factory -> factory.createInputDatasetQueryPlanVisitors(context))).addAll(inputDatasets).build();context.getInputDatasetQueryPlanVisitors().addAll(inputDatasetVisitors);return inputDatasetVisitors;} 每次触发监听时,会调用 中的 收取 Input数据集的逻辑(即map()): Function1> inputVisitor =visitLogicalPlan(PlanUtils.merge(inputDatasetQueryPlanVisitors));List datasets =Stream.concat(buildDatasets(nodes, inputDatasetBuilders),openLineageContext.getQueryExecution().map(qe ->fromSeq(qe.optimizedPlan().map(inputVisitor)).stream().flatMap(Collection::stream).map(((Class) InputDataset.class)::cast)).orElse(Stream.empty())).collect(Collectors.toList()); 2.5 获取元数据 在获取完所有元数据后,会包装成一个 private RunEvent populateRun(Optional parentRunFacet,RunEventBuilder runEventBuilder,JobBuilder jobBuilder,List nodes) {OpenLineage openLineage = openLineageContext.getOpenLineage();RunFacetsBuilder runFacetsBuilder = openLineage.newRunFacetsBuilder();OpenLineage.JobFacetsBuilder jobFacetsBuilder =openLineageContext.getOpenLineage().newJobFacetsBuilder();parentRunFacet.ifPresent(runFacetsBuilder::parent);OpenLineage.JobFacets jobFacets = buildJobFacets(nodes, jobFacetBuilders, jobFacetsBuilder);List inputDatasets = buildInputDatasets(nodes);List outputDatasets = buildOutputDatasets(nodes);··· ······ ···return runEventBuilder.build(); 最后调用emit方法,进行的元信息发送 public void emit(OpenLineage.RunEvent event) {try {this.client.emit(event);log.debug("Emitting lineage completed successfully: {}", OpenLineageClientUtils.toJson(event));} catch (OpenLineageClientException exception) {log.error("Could not emit lineage w/ exception", exception);}} 这里报红可以忽略,因为采用@slf4j注解,代码会在后生成 三、的具体实现思路(后续) 这块留个新坑,的主要逻辑就是Spark逻辑计划(树)的遍历操作,涉及到的内容会比较复杂,不同的插件遍历思路不一样,所以博主会先从Spark本身如何遍历逻辑计划来作铺垫,后续逐步拓展到每个插件的实现思路 。 四、总结 【SparkListener血缘——Openlineage插件实现思路】后续还会介绍的有:Atlas 血缘插件实现思路、血缘插件实现思路、数据血缘解析、Atlas--多系统元数据同步 。欢迎大家留言评论~ 上一页1234下一页 字节跳动DataLeap数据血缘实践 USB摄像头——v4l2打开设备、获取设备支持的格式【以t113-s3为例】 【大数据day04】——JVM优化 上(jvm的内存模型 身份鉴别——Linux配置登录失败处理功能 IE11 —— F12 开发者工具 AI与游戏——引言 量化交易——波动率 OSChina 周六乱弹 ——淘宝上买U盘 你们猜店家送的啥礼品 ascll码 — c语言转换 电脑小技巧系列——私密文件的保护措施