Flume+Kafka+HDFS 业务数据采集_零点漂移处理方法

最近做了一个业务数据采集,总体架构就是
Flume-+ kafka=>
Kafka =>
Flume-kafka++ hdfs sink (标红的地方就是我们处理零点漂移的位置)
需求:采集用户行为数据到HDFS,以天为单位分文件.
分析:因为传输数据需要时间,可能23:59:58的日志,到hdfs已经第二天了.所以一不小心,就漂移了.
如果不考虑零点漂移,可以不使用kafka souce +的组合,直接使用kafka .

Flume+Kafka+HDFS  业务数据采集_零点漂移处理方法

文章插图
但是使用kafka 有一个缺陷,就是没法写拦截器!
因为kafka 读取数据,不需要有端,但是拦截器是绑在端上的,没有就写不了拦截器.
在这里我就使用了一个拦截器,在每条event的Head上加上时间标记. 这样做有什么依据吗? 这合理吗?
【Flume+Kafka+HDFS业务数据采集_零点漂移处理方法】那又要使用秘籍了! 直接看看
这是flume官网上hdfs sink中的一行note.可以看到,要在上加个时间戳!!!
ok,有官方撑腰,这我就不怕了,直接IDEA 启动!!!
Flume+Kafka+HDFS  业务数据采集_零点漂移处理方法

文章插图
先来说一下flume拦截器的书写点
首先实现接口,然后重写其中方法,别忘记还要写一个静态内部类!!
@Overridepublic Event intercept(Event event) {Map headers = event.getHeaders();String log = new String(event.getBody(), StandardCharsets.UTF_8);JSONObject jsonObject = JSONObject.parseObject(log);String ts = jsonObject.getString("ts");headers.put("timestamp", ts);return event;}
重要的就是这个单个Event的方法,毕竟另一个对List的方法其实就是反复调用这个
首先要先拿到event的,然后将json格式的日志文件解析出来,将名为"ts"()对应的数据添加到中,即可!
现在来一个完整的拦截器代码~~
import com.alibaba.fastjson.JSONObject;import org.apache.flume.Context;import org.apache.flume.Event;import org.apache.flume.interceptor.Interceptor;import java.nio.charset.StandardCharsets;import java.util.List;import java.util.Map;public class TimeStampInterceptor implements Interceptor {@Overridepublic void initialize() {}@Overridepublic Event intercept(Event event) {Map headers = event.getHeaders();String log = new String(event.getBody(), StandardCharsets.UTF_8);JSONObject jsonObject = JSONObject.parseObject(log);String ts = jsonObject.getString("ts");headers.put("timestamp", ts);return event;}@Overridepublic List intercept(List events) {for (Event event : events) {intercept(event);}return events;}@Overridepublic void close() {}public static class Builder implements Interceptor.Builder {@Overridepublic Interceptor build() {return new TimeStampInterceptor();}@Overridepublic void configure(Context context) {}}}
ok!这样就解决了咱零点漂移的问题!