修改启动类
需要在项目启动时便持续监听
package com.xx;import com.xx.client.CanalClient;import org.springframework.boot.CommandLineRunner;import org.springframework.boot.SpringApplication;import org.springframework.boot.autoconfigure.SpringBootApplication;import javax.annotation.Resource;/*** @author aqi* DateTime: 2021/2/24 4:43 下午* Description: canal启动类*/@SpringBootApplicationpublic class CanalApplication implements CommandLineRunner {@Resourceprivate CanalClient canalClient;public static void main(String[] args) {SpringApplication.run(CanalApplication.class, args);}@Overridepublic void run(String... args) throws Exception {// 项目启动,执行canal客户端监听canalClient.run();}}
编写canal客户端
这里操作数据库用的是
package com.xx.client;import com.alibaba.otter.canal.client.CanalConnector;import com.alibaba.otter.canal.client.CanalConnectors;import com.alibaba.otter.canal.protocol.CanalEntry.*;import com.alibaba.otter.canal.protocol.Message;import com.google.protobuf.InvalidProtocolBufferException;import org.springframework.jdbc.core.JdbcTemplate;import org.springframework.stereotype.Component;import javax.annotation.Resource;import java.net.InetSocketAddress;import java.util.List;import java.util.Queue;import java.util.concurrent.ConcurrentLinkedQueue;/*** @author aqi* DateTime: 2021/2/24 4:48 下午* Description: No Description*/@Componentpublic class CanalClient {//sql队列private Queue> SQL_QUEUE = new ConcurrentLinkedQueue<>();@Resourceprivate JdbcTemplate jdbcTemplate;/*** canal入库方法*/public void run() {CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress("服务器ipd地址", 11111), "example", "", "");int batchSize = 1000;try {connector.connect();connector.subscribe(".*\\..*");connector.rollback();try {while (true) {//尝试从master那边拉去数据batchSize条记录,有多少取多少Message message = connector.getWithoutAck(batchSize);long batchId = message.getId();int size = message.getEntries().size();if (batchId == -1 || size == 0) {Thread.sleep(1000);} else {dataHandle(message.getEntries());}connector.ack(batchId);//当队列里面堆积的sql大于一定数值的时候就模拟执行if (SQL_QUEUE.size() >= 1) {executeQueueSql();}}} catch (InterruptedException | InvalidProtocolBufferException e) {e.printStackTrace();}} finally {connector.disconnect();}}/*** 模拟执行队列里面的sql语句*/public void executeQueueSql() {int size = SQL_QUEUE.size();for (int i = 0; i < size; i++) {String sql = SQL_QUEUE.poll();System.out.println("[sql]----> " + sql);this.execute(sql);}}/*** 数据处理*/private void dataHandle(List
- 工作两年了,不想做客服,转行学数据分析改变未来!
- 专业的数据分析报告应该这么写!
- Cloudcanal数据同步神器
- 数据分析真的很火吗?真的有很多企业需要这样的岗位吗?求大佬指点。
- android9 无法使用数据库,使用greenDao操作本地数据库
- 用户数据中的幸存者偏差
- 只学mysql能就业吗_如果只是 数据库学的好 以后可以找到什么样的工作
- 宽凳科技公布最新进展:已完成百余座城市数据采集,即将发布首张全自动高精度地图
- 易备数据备份软件,主动防勒索备份 MySQL 和 MariaDB 数据库
- 【全网首发开源教程】【Labview机器人仿真与控制】Labview与Solid