找回密码
 立即注册
首页 业界区 安全 17、Canal监听MySQL-Binlog实现数据监听

17、Canal监听MySQL-Binlog实现数据监听

钦娅芬 10 小时前
一、Canal简介:

Canal 是阿里巴巴开源的一款基于数据库增量日志解析的中间件,主要用于实现数据库变更数据的实时同步。
1.png

Canal源码
 
二、工作原理:

1、MySQL主备复制原理:

 
2.png

(1)、MySQL master 将数据变更写入二进制日志( binary log, 其中记录叫做二进制日志事件binary log events,可以通过 show binlog events 进行查看)
(2)、MySQL slave 将 master 的 binary log events 拷贝到它的中继日志(relay log)
(3)、MySQL slave 重放 relay log 中事件,将数据变更反映它自己的数据
2、canal工作原理:

(1)、canal 模拟 MySQL slave 的交互协议,伪装自己为 MySQL slave ,向 MySQL master 发送dump 协议
(2)、MySQL master 收到 dump 请求,开始推送 binary log 给 slave (即 canal )
(3)、canal 解析 binary log 对象(原始为 byte 流)
 
三、MySQL 配置(开启 Binlog):

1、开启 Binlog(ROW 模式):
  1. # MySQL 配置文件
  2. # Linux:my.cnf配置文件(/etc/mysql/)
  3. # Window:my.ini配置文件(C:\ProgramData\MySQL\MySQL Server 5.7\)
  4. # 开启 Binlog
  5. log_bin = mysql-bin
  6. # 选择 ROW 模式(记录行级变更)
  7. binlog-format = ROW
  8. # 配置数据库唯一 ID(与 Canal 服务端的 slaveId 不同)
  9. server-id = 1
复制代码
3.png

4.png

2、重启 MySQL 并验证:
  1. # 打开命令提示符(cmd/services.msc):
  2. # 按 Win + R 键,输入 cmd,然后按 Enter 键打开命令提示符窗口。
  3. # 停止MySQL服务:
  4. net stop MySQL57
  5. # 启动MySQL服务:
  6. net start MySQL57
  7. # 验证
  8. SHOW VARIABLES LIKE 'log_bin';
  9. SHOW VARIABLES LIKE 'binlog_format';
复制代码
5.png

6.png

3、创建 Canal 专用账号(权限最小化):
  1. -- 1. 创建支持远程连接的用户(% 表示任意 IP)
  2. -- CREATE USER 'canal'@'%' IDENTIFIED BY 'canal';
  3. -- 授予权限
  4. -- GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
  5. -- 2. 创建支持本地连接的用户(localhost)
  6. CREATE USER 'canal'@'localhost' IDENTIFIED BY 'canal';
  7. -- 授予相同权限
  8. GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'localhost';
  9. -- 刷新权限,使配置生效
  10. FLUSH PRIVILEGES;
复制代码
 
四、Canal 服务端配置:

1、下载并解压 Canal 服务端:

github-canal包
7.png

2、配置 Canal 实例:

(1)、instance.properties配置:
  1. # MySQL 主库地址(Canal 连接的 MySQL 地址)
  2. canal.instance.master.address=127.0.0.1:3306
  3. # MySQL 账号密码
  4. canal.instance.dbUsername=canal
  5. canal.instance.dbPassword=canal
复制代码
8.png

(2)、windows启动 Canal 服务端:
1)、双击启动bin/startup.bat:
 
9.png

2)、存在黑屏闪退,修改bin/startup.bat,重启:
10.png

3)、日志:
 
11.png

12.png

13.png

14.png

 
五、SpringBoot整合Canal实现MySQL数据监听:

1、POM配置:
  1.         <dependency>
  2.             <groupId>com.alibaba.otter</groupId>
  3.             canal.client</artifactId>
  4.             <version>1.1.8</version>
  5.         </dependency>
  6.         <dependency>
  7.             <groupId>com.alibaba.otter</groupId>
  8.             canal.protocol</artifactId>
  9.             <version>1.1.8</version>
  10.         </dependency>
复制代码
2、YML配置:
  1. canal:
  2.   # 自动启动同步标志位
  3.   auto-sync: true
  4.   instances:
  5.     # 第一个实例
  6.     instance1:
  7.       host: 127.0.0.1
  8.       port: 11111
  9.       # canal server 中配置的实例名(canal.destinations = example)
  10.       name: example
  11.       # 批量拉取条数
  12.       batch-size: 100
  13.       # 无数据时休眠时间(ms)
  14.       sleep-time: 1000
复制代码
3、Entity类声明:

CanalProperties.class
  1. import lombok.Data;
  2. import org.springframework.boot.context.properties.ConfigurationProperties;
  3. import org.springframework.stereotype.Component;
  4. import java.util.HashMap;
  5. import java.util.Map;
  6. /**
  7. * Canal配置属性类(映射YAML配置)
  8. */
  9. @Data
  10. @Component
  11. @ConfigurationProperties(prefix = "canal")
  12. public class CanalProperties {
  13.     // 是否自动启动同步
  14.     private boolean autoSync = true;
  15.     // 多实例配置
  16.     private Map<String, InstanceConfig> instances = new HashMap<>();
  17.     @Data
  18.     public static class InstanceConfig {
  19.         private String host;
  20.         private Integer port;
  21.         private String name;
  22.         private Integer batchSize = 100;
  23.         private Integer sleepTime = 1000;
  24.     }
  25. }
复制代码
DataEventTypeEnum.enum
  1. import org.springframework.util.StringUtils;
  2. import java.util.Arrays;
  3. import java.util.Map;
  4. import java.util.function.Function;
  5. import java.util.stream.Collectors;
  6. public enum DataEventTypeEnum {
  7.     INSERT("INSERT"),
  8.     UPDATE("UPDATE"),
  9.     DELETE("DELETE");
  10.     private final String name;
  11.     DataEventTypeEnum(String name) {
  12.         this.name = name;
  13.     }
  14.     public String NAME() {
  15.         return name;
  16.     }
  17.     private static final Map<String, DataEventTypeEnum> NAME_MAP =
  18.             Arrays.stream(DataEventTypeEnum.values())
  19.                     .collect(Collectors.toMap(DataEventTypeEnum::NAME, Function.identity()));
  20.     public static DataEventTypeEnum getEnum(String name) {
  21.         if (!StringUtils.hasText(name)) {
  22.             return null;
  23.         }
  24.         return NAME_MAP.get(name);
  25.     }
  26. }
复制代码
JsonMessageType.class
  1. import lombok.Data;
  2. @Data
  3. public class JsonMessageType {
  4.     /**
  5.      * 库名
  6.      */
  7.     private String schemaName;
  8.     /**
  9.      * 表名
  10.      */
  11.     private String tableName;
  12.     /**
  13.      * 事件类型
  14.      * (INSERT/UPDATE/DELETE)
  15.      */
  16.     private String eventType;
  17.     /**
  18.      * 数据JSON字符串
  19.      */
  20.     private String data;
  21. }
复制代码
4、CanalRunnerAutoConfig启动Canal配置:
  1. import com.iven.canal.entity.CanalProperties;
  2. import com.iven.canal.handle.CanalWorkRegistry;
  3. import com.iven.canal.utils.JsonMessageParser;
  4. import lombok.RequiredArgsConstructor;
  5. import lombok.extern.slf4j.Slf4j;
  6. import org.springframework.boot.ApplicationRunner;
  7. import org.springframework.context.annotation.Bean;
  8. import org.springframework.context.annotation.Configuration;
  9. /**
  10. * Canal自动配置
  11. */
  12. @Slf4j
  13. @Configuration
  14. @RequiredArgsConstructor
  15. public class CanalRunnerAutoConfig {
  16.     private final CanalProperties canalProperties;
  17.     private final JsonMessageParser jsonMessageParser;
  18.     private final CanalWorkRegistry workRegistry;
  19.     @Bean
  20.     public ApplicationRunner canalApplicationRunner() {
  21.         return args -> {
  22.             if (!canalProperties.isAutoSync()) {
  23.                 log.info("Canal自动同步已关闭");
  24.                 return;
  25.             }
  26.             // 如果没有任何Work,则不启动Canal
  27.             if (!workRegistry.hasWork()) {
  28.                 log.info("无表同步处理器,不启动Canal");
  29.                 return;
  30.             }
  31.             // 启动所有配置的Canal实例
  32.             canalProperties.getInstances().forEach((instanceKey, config) -> {
  33.                 CanalRunner runner = new CanalRunner(
  34.                         config.getHost(),
  35.                         config.getPort(),
  36.                         config.getName(),
  37.                         config.getBatchSize(),
  38.                         config.getSleepTime(),
  39.                         jsonMessageParser,
  40.                         workRegistry
  41.                 );
  42.                 runner.start();
  43.             });
  44.         };
  45.     }
  46. }
复制代码
5、CanalRunner拉取数据:
  1. import com.alibaba.fastjson2.JSON;
  2. import com.alibaba.otter.canal.client.CanalConnector;
  3. import com.alibaba.otter.canal.client.CanalConnectors;
  4. import com.alibaba.otter.canal.protocol.CanalEntry;
  5. import com.alibaba.otter.canal.protocol.Message;
  6. import com.iven.canal.entity.JsonMessageType;
  7. import com.iven.canal.handle.CanalWork;
  8. import com.iven.canal.handle.CanalWorkRegistry;
  9. import com.iven.canal.utils.JsonMessageParser;
  10. import lombok.extern.slf4j.Slf4j;
  11. import java.net.InetSocketAddress;
  12. import java.util.List;
  13. import java.util.Map;
  14. import java.util.stream.Collectors;
  15. /**
  16. * Canal运行器
  17. * 手动管理生命周期
  18. *
  19. * 1、启动Canal实例
  20. * 2、处理解析后的数据
  21. */
  22. @Slf4j
  23. public class CanalRunner {
  24.     private Thread thread;
  25.     private final String canalIp;
  26.     private final Integer canalPort;
  27.     private final String canalInstance;
  28.     private final Integer batchSize;
  29.     private final Integer sleepTime;
  30.     private final JsonMessageParser jsonMessageParser;
  31.     private final CanalWorkRegistry workRegistry;
  32.     public CanalRunner(String canalIp, Integer canalPort, String canalInstance, Integer batchSize,
  33.                        Integer sleepTime, JsonMessageParser jsonMessageParser, CanalWorkRegistry workRegistry) {
  34.         this.canalIp = canalIp;
  35.         this.canalPort = canalPort;
  36.         this.canalInstance = canalInstance;
  37.         this.batchSize = batchSize;
  38.         this.sleepTime = sleepTime;
  39.         this.jsonMessageParser = jsonMessageParser;
  40.         this.workRegistry = workRegistry;
  41.     }
  42.     /**
  43.      * 启动Canal实例
  44.      */
  45.     public void start() {
  46.         if (thread == null || !thread.isAlive()) {
  47.             thread = new Thread(this::run, "canal-runner-" + canalInstance);
  48.             thread.start();
  49.             log.info("Canal实例[{}]启动成功", canalInstance);
  50.         }
  51.     }
  52.     /**
  53.      * 停止Canal实例
  54.      */
  55.     public void stop() {
  56.         if (thread != null && !thread.isInterrupted()) {
  57.             thread.interrupt();
  58.         }
  59.     }
  60.     private void run() {
  61.         log.info("Canal实例[{}]启动中...", canalInstance);
  62.         CanalConnector connector = CanalConnectors.newSingleConnector(
  63.                 new InetSocketAddress(canalIp, canalPort), canalInstance, "", "");
  64.         try {
  65.             connector.connect();
  66.             // 订阅所有表(后续通过Work过滤)
  67.             connector.subscribe();
  68.             connector.rollback();
  69.             while (!thread.isInterrupted()) {
  70.                 Message message = connector.getWithoutAck(batchSize);
  71.                 long batchId = message.getId();
  72.                 List<CanalEntry.Entry> entries = message.getEntries();
  73.                 if (batchId == -1 || entries.isEmpty()) {
  74.                     Thread.sleep(sleepTime);
  75.                 } else {
  76.                     // 解析数据并处理
  77.                     Map<String, List<JsonMessageType>> parsedData = jsonMessageParser.parse(entries);
  78.                     processParsedData(parsedData);
  79.                     // 确认处理成功
  80.                     connector.ack(batchId);
  81.                 }
  82.             }
  83.         } catch (InterruptedException e) {
  84.             log.info("Canal实例[{}]被中断", canalInstance);
  85.         } catch (Exception e) {
  86.             log.error("Canal实例[{}]运行异常", canalInstance, e);
  87.             // 处理失败回滚
  88.             connector.rollback();
  89.         } finally {
  90.             connector.disconnect();
  91.             log.info("Canal实例[{}]已停止", canalInstance);
  92.         }
  93.     }
  94.     /**
  95.      * 调用Work处理解析后的数据
  96.      *
  97.      * @param parsedData
  98.      */
  99.     private void processParsedData(Map<String, List<JsonMessageType>> parsedData) {
  100.         parsedData.forEach((tableKey, dataList) -> {
  101.             // 获取该表的所有Work
  102.             List<CanalWork> works = workRegistry.getWorksByTable(tableKey);
  103.             if (!works.isEmpty() && !dataList.isEmpty()) {
  104.                 // 转换数据格式(Json字符串 -> Map)
  105.                 List<Map<String, Object>> dataMaps = dataList.stream()
  106.                         .map(item -> JSON.<Map<String, Object>>parseObject(item.getData(), Map.class))
  107.                         .collect(Collectors.toList());
  108.                 String schemaName = dataList.get(0).getSchemaName();
  109.                 // 调用每个Work的处理方法
  110.                 works.forEach(work -> work.handle(dataMaps, dataList.get(0).getEventType(), schemaName));
  111.             }
  112.         });
  113.     }
  114.    
  115. }
复制代码
6、JsonMessageParser解析数据:

MessageParser
  1. import com.alibaba.otter.canal.protocol.CanalEntry;
  2. import java.util.List;
  3. /**
  4. * 消息解析器接口
  5. *
  6. */
  7. public interface MessageParser<T> {
  8.     T parse(List<CanalEntry.Entry> canalEntryList);
  9. }
复制代码
 JsonMessageParser
  1. import com.alibaba.fastjson2.JSON;
  2. import com.alibaba.otter.canal.protocol.CanalEntry;
  3. import com.iven.canal.entity.DataEventTypeEnum;
  4. import com.iven.canal.entity.JsonMessageType;
  5. import com.iven.canal.handle.CanalWorkRegistry;
  6. import lombok.RequiredArgsConstructor;
  7. import lombok.extern.slf4j.Slf4j;
  8. import org.springframework.stereotype.Component;
  9. import org.springframework.util.CollectionUtils;
  10. import java.util.*;
  11. /**
  12. * Json消息解析器
  13. *
  14. * 1、遍历原始数据列表接收
  15. * 2、解析行级变更数据
  16. * 3、封装为 JsonParseType
  17. */
  18. @Slf4j
  19. @Component
  20. @RequiredArgsConstructor
  21. public class JsonMessageParser implements MessageParser<Map<String, List<JsonMessageType>>> {
  22.     private final CanalWorkRegistry workRegistry;
  23.     @Override
  24.     public Map<String, List<JsonMessageType>> parse(List<CanalEntry.Entry> canalEntryList) {
  25.         Map<String, List<JsonMessageType>> dataMap = new HashMap<>();
  26.         for (CanalEntry.Entry entry : canalEntryList) {
  27.             if (!CanalEntry.EntryType.ROWDATA.equals(entry.getEntryType())) {
  28.                 continue;
  29.             }
  30.             // 1. 获取库名、表名、带库名的表标识
  31.             String schemaName = entry.getHeader().getSchemaName();
  32.             String tableName = entry.getHeader().getTableName();
  33.             String fullTableName = schemaName + "." + tableName;
  34.             // 2. 检查是否有对应的处理器(支持两种格式)
  35.             boolean hasFullTableWork = !CollectionUtils.isEmpty(workRegistry.getWorksByTable(fullTableName));
  36.             boolean hasSimpleTableWork = !CollectionUtils.isEmpty(workRegistry.getWorksByTable(tableName));
  37.             if (!hasFullTableWork && !hasSimpleTableWork) {
  38.                 log.debug("表[{}]和[{}]均无同步处理器,跳过", fullTableName, tableName);
  39.                 continue;
  40.             }
  41.             try {
  42.                 CanalEntry.RowChange rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
  43.                 rowChange.getRowDatasList().forEach(rowData -> {
  44.                     JsonMessageType jsonMessageType = parseRowData(entry.getHeader(), rowChange.getEventType(), rowData);
  45.                     if (jsonMessageType != null) {
  46.                         // 3. 按存在的处理器类型,分别添加到数据映射中
  47.                         if (hasFullTableWork) {
  48.                             dataMap.computeIfAbsent(fullTableName, k -> new ArrayList<>()).add(jsonMessageType);
  49.                         }
  50.                         if (hasSimpleTableWork) {
  51.                             dataMap.computeIfAbsent(tableName, k -> new ArrayList<>()).add(jsonMessageType);
  52.                         }
  53.                     }
  54.                 });
  55.             } catch (Exception e) {
  56.                 log.error("解析数据失败", e);
  57.             }
  58.         }
  59.         return dataMap;
  60.     }
  61.     private JsonMessageType parseRowData(CanalEntry.Header header, CanalEntry.EventType eventType,
  62.                                          CanalEntry.RowData rowData) {
  63.         // 获取库名
  64.         String schemaName = header.getSchemaName();
  65.         // 获取表名
  66.         String tableName = header.getTableName();
  67.         if (eventType == CanalEntry.EventType.DELETE) {
  68.             return dataWrapper(schemaName, tableName, DataEventTypeEnum.DELETE.NAME(), rowData.getBeforeColumnsList());
  69.         } else if (eventType == CanalEntry.EventType.INSERT) {
  70.             return dataWrapper(schemaName, tableName, DataEventTypeEnum.INSERT.NAME(), rowData.getAfterColumnsList());
  71.         } else if (eventType == CanalEntry.EventType.UPDATE) {
  72.             return dataWrapper(schemaName, tableName, DataEventTypeEnum.UPDATE.NAME(), rowData.getAfterColumnsList());
  73.         }
  74.         return null;
  75.     }
  76.     private JsonMessageType dataWrapper(String schemaName, String tableName, String eventType,
  77.                                         List<CanalEntry.Column> columns) {
  78.         Map<String, String> data = new HashMap<>();
  79.         columns.forEach(column -> data.put(column.getName(), column.getValue()));
  80.         JsonMessageType result = new JsonMessageType();
  81.         result.setSchemaName(schemaName);
  82.         result.setTableName(tableName);
  83.         result.setEventType(eventType);
  84.         result.setData(JSON.toJSONString(data));
  85.         return result;
  86.     }
  87. }
复制代码
7、CanalWorkRegistry匹配处理器:

CanalWork
  1. import java.util.List;
  2. import java.util.Map;
  3. /**
  4. * Canal-Work处理器
  5. *
  6. */
  7. public interface CanalWork {
  8.     /**
  9.      * 返回需要处理的表名(如:tb_user)
  10.      */
  11.     String getTableName();
  12.     /**
  13.      * 处理表数据的方法
  14.      * @param dataList 表数据列表(每条数据是字段名-值的Map)
  15.      * @param eventType 事件类型(INSERT/UPDATE/DELETE)
  16.      * @param schemaName 库名(用于区分不同库的表)
  17.      */
  18.     void handle(List<Map<String, Object>> dataList, String eventType, String schemaName);
  19. }
复制代码
CanalWorkRegistry
  1. import lombok.extern.slf4j.Slf4j;
  2. import org.springframework.beans.BeansException;
  3. import org.springframework.context.ApplicationContext;
  4. import org.springframework.context.ApplicationContextAware;
  5. import org.springframework.stereotype.Component;
  6. import java.util.Collections;
  7. import java.util.HashMap;
  8. import java.util.List;
  9. import java.util.Map;
  10. import java.util.stream.Collectors;
  11. /**
  12. * 处理器注册器,
  13. * 扫描并缓存所有CanalWork实现类,按表名分组管理,提供查询表对应处理器的方法
  14. */
  15. @Slf4j
  16. @Component
  17. public class CanalWorkRegistry implements ApplicationContextAware {
  18.     /**
  19.      * 表名 -> Work列表(支持一个表多个Work)
  20.      */
  21.     private final Map<String, List<CanalWork>> tableWorkMap = new HashMap<>();
  22.     @Override
  23.     public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
  24.         // 扫描所有CanalWork实现类
  25.         Map<String, CanalWork> workMap = applicationContext.getBeansOfType(CanalWork.class);
  26.         // 按表名分组
  27.         tableWorkMap.putAll(workMap.values().stream()
  28.                 .collect(Collectors.groupingBy(CanalWork::getTableName)));
  29.         log.info("已注册的表同步处理器: {}", tableWorkMap.keySet());
  30.     }
  31.     /**
  32.      * 获取指定表的Work列表
  33.      *
  34.      * @param tableName
  35.      * @return
  36.      */
  37.     public List<CanalWork> getWorksByTable(String tableName) {
  38.         return tableWorkMap.getOrDefault(tableName, Collections.emptyList());
  39.     }
  40.     /**
  41.      * 判断是否有表需要处理
  42.      *
  43.      * @return
  44.      */
  45.     public boolean hasWork() {
  46.         return !tableWorkMap.isEmpty();
  47.     }
  48. }
复制代码
8、CanalWork实现类处理数据:
  1. import com.iven.canal.entity.DataEventTypeEnum;
  2. import lombok.extern.slf4j.Slf4j;
  3. import org.springframework.stereotype.Component;
  4. import java.util.List;
  5. import java.util.Map;
  6. /**
  7. * tb_user表数据处理
  8. *
  9. * Canal服务 → 变更数据 → CanalRunner 拉取 → JsonMessageParser 解析 →
  10. * 筛选出 tb_user 数据 → CanalWorkRegistry 获取 TbUserCanalWorkHandle →
  11. * 调用 handle 方法 → 按事件类型(INSERT/UPDATE/DELETE)执行对应逻辑
  12. */
  13. @Slf4j
  14. @Component
  15. public class TbUserCanalWorkHandle implements CanalWork {
  16.     @Override
  17.     public String getTableName() {
  18.         return "demo.tb_user";
  19.     }
  20.     @Override
  21.     public void handle(List<Map<String, Object>> dataList, String eventType, String schemaName) {
  22.         log.info("开始处理[{}库]的tb_user表数据,事件类型:{},数据量:{}", schemaName, eventType, dataList.size());
  23.         DataEventTypeEnum dataEventTypeEnum = DataEventTypeEnum.getEnum(eventType);
  24.         
  25.         // 根据事件类型分别处理
  26.         switch (dataEventTypeEnum) {
  27.             case INSERT:
  28.                 handleInsert(dataList, schemaName);
  29.                 break;
  30.             case UPDATE:
  31.                 handleUpdate(dataList, schemaName);
  32.                 break;
  33.             case DELETE:
  34.                 handleDelete(dataList, schemaName);
  35.                 break;
  36.             default:
  37.                 log.warn("未处理的事件类型:{}", eventType);
  38.         }
  39.     }
  40.     /**
  41.      * 处理新增数据
  42.      */
  43.     private void handleInsert(List<Map<String, Object>> dataList, String schemaName) {
  44.         log.info("处理[{}库]的tb_user新增数据,共{}条", schemaName, dataList.size());
  45.         dataList.forEach(data -> {
  46.             Object userId = data.get("id");
  47.             Object username = data.get("name");
  48.             // 新增逻辑:如同步到ES、缓存初始化等
  49.             log.info("新增用户 - ID: {}, 用户名: {}", userId, username);
  50.         });
  51.     }
  52.     /**
  53.      * 处理更新数据
  54.      */
  55.     private void handleUpdate(List<Map<String, Object>> dataList, String schemaName) {
  56.         log.info("处理[{}库]的tb_user更新数据,共{}条", schemaName, dataList.size());
  57.         dataList.forEach(data -> {
  58.             Object userId = data.get("id");
  59.             Object newPhone = data.get("phone"); // 假设更新了手机号
  60.             // 更新逻辑:如更新ES文档、刷新缓存等
  61.             log.info("更新用户 - ID: {}, 新手机号: {}", userId, newPhone);
  62.         });
  63.     }
  64.     /**
  65.      * 处理删除数据
  66.      */
  67.     private void handleDelete(List<Map<String, Object>> dataList, String schemaName) {
  68.         log.info("处理[{}库]的tb_user删除数据,共{}条", schemaName, dataList.size());
  69.         dataList.forEach(data -> {
  70.             Object userId = data.get("id");
  71.             // 删除逻辑:如从ES删除、清除缓存等
  72.             log.info("删除用户 - ID: {}", userId);
  73.         });
  74.     }
  75. }
复制代码
15.png

 
调度流程:
整个流程通过注册器管理处理器、解析器转换数据格式、运行器控制 Canal 客户端生命周期,最终将数据库变更事件分发到对应表的处理器,实现了变更数据的监听与业务处理解耦。用户只需实现CanalWork接口,即可自定义任意表的变更处理逻辑。
(1)、初始化阶段
1)、Spring 容器启动时,CanalWorkRegistry 扫描所有 CanalWork 实现类(如 TbUserCanalWorkHandle),按表名分组缓存到 tableWorkMap 中。
2)、CanalRunnerAutoConfig 检查配置(CanalProperties),若开启自动同步且存在 CanalWork,则为每个 Canal 实例创建 CanalRunner 并启动。
(2)、运行阶段
1)、CanalRunner 建立与 Canal 服务的连接,订阅数据库变更事件。
2)、循环拉取变更数据(Message),通过 JsonMessageParser 解析为表名 - 数据列表的映射(Map)。
3)、调用 processParsedData 方法,根据表名从 CanalWorkRegistry 获取对应的 CanalWork 列表,执行 handle 方法处理数据。
(3)、销毁阶段
程序停止时,CanalRunner 中断线程,断开与 Canal 服务的连接。
 

来源:程序园用户自行投稿发布,如果侵权,请联系站长删除
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!

相关推荐

您需要登录后才可以回帖 登录 | 立即注册