| 一、Canal简介: 
 Canal 是阿里巴巴开源的一款基于数据库增量日志解析的中间件,主要用于实现数据库变更数据的实时同步。
 
 Canal源码
 
 二、工作原理:
 
 1、MySQL主备复制原理:
 
 
 (1)、MySQL master 将数据变更写入二进制日志( binary log, 其中记录叫做二进制日志事件binary log events,可以通过 show binlog events 进行查看)
 (2)、MySQL slave 将 master 的 binary log events 拷贝到它的中继日志(relay log)
 (3)、MySQL slave 重放 relay log 中事件,将数据变更反映它自己的数据
 2、canal工作原理:
 
 (1)、canal 模拟 MySQL slave 的交互协议,伪装自己为 MySQL slave ,向 MySQL master 发送dump 协议
 (2)、MySQL master 收到 dump 请求,开始推送 binary log 给 slave (即 canal )
 (3)、canal 解析 binary log 对象(原始为 byte 流)
 
 三、MySQL 配置(开启 Binlog):
 
 1、开启 Binlog(ROW 模式):
 
 复制代码# MySQL 配置文件# Linux:my.cnf配置文件(/etc/mysql/)# Window:my.ini配置文件(C:\ProgramData\MySQL\MySQL Server 5.7\)# 开启 Binloglog_bin = mysql-bin# 选择 ROW 模式(记录行级变更)binlog-format = ROW# 配置数据库唯一 ID(与 Canal 服务端的 slaveId 不同)server-id = 1
 2、重启 MySQL 并验证:
 
 复制代码# 打开命令提示符(cmd/services.msc):# 按 Win + R 键,输入 cmd,然后按 Enter 键打开命令提示符窗口。# 停止MySQL服务:net stop MySQL57# 启动MySQL服务:net start MySQL57# 验证SHOW VARIABLES LIKE 'log_bin';SHOW VARIABLES LIKE 'binlog_format';
 3、创建 Canal 专用账号(权限最小化):
 
 复制代码-- 1. 创建支持远程连接的用户(% 表示任意 IP)-- CREATE USER 'canal'@'%' IDENTIFIED BY 'canal';-- 授予权限-- GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';-- 2. 创建支持本地连接的用户(localhost)CREATE USER 'canal'@'localhost' IDENTIFIED BY 'canal';-- 授予相同权限GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'localhost';-- 刷新权限,使配置生效FLUSH PRIVILEGES;
四、Canal 服务端配置:
 
 1、下载并解压 Canal 服务端:
 
 github-canal包
 
 2、配置 Canal 实例:
 
 (1)、instance.properties配置:
 复制代码# MySQL 主库地址(Canal 连接的 MySQL 地址)canal.instance.master.address=127.0.0.1:3306# MySQL 账号密码canal.instance.dbUsername=canalcanal.instance.dbPassword=canal
(2)、windows启动 Canal 服务端:
 1)、双击启动bin/startup.bat:
 
 2)、存在黑屏闪退,修改bin/startup.bat,重启:
 
 3)、日志:
 
 
 
 
 
 五、SpringBoot整合Canal实现MySQL数据监听:
 
 1、POM配置:
 
 2、YML配置:复制代码        <dependency>            <groupId>com.alibaba.otter</groupId>            canal.client</artifactId>            <version>1.1.8</version>        </dependency>        <dependency>            <groupId>com.alibaba.otter</groupId>            canal.protocol</artifactId>            <version>1.1.8</version>        </dependency>
 3、Entity类声明:复制代码canal:  # 自动启动同步标志位  auto-sync: true  instances:    # 第一个实例    instance1:      host: 127.0.0.1      port: 11111      # canal server 中配置的实例名(canal.destinations = example)      name: example      # 批量拉取条数      batch-size: 100      # 无数据时休眠时间(ms)      sleep-time: 1000
 CanalProperties.class
 DataEventTypeEnum.enum复制代码import lombok.Data;import org.springframework.boot.context.properties.ConfigurationProperties;import org.springframework.stereotype.Component;import java.util.HashMap;import java.util.Map;/** * Canal配置属性类(映射YAML配置) */@Data@Component@ConfigurationProperties(prefix = "canal")public class CanalProperties {    // 是否自动启动同步    private boolean autoSync = true;    // 多实例配置    private Map<String, InstanceConfig> instances = new HashMap<>();    @Data    public static class InstanceConfig {        private String host;        private Integer port;        private String name;        private Integer batchSize = 100;        private Integer sleepTime = 1000;    }}
JsonMessageType.class复制代码import org.springframework.util.StringUtils;import java.util.Arrays;import java.util.Map;import java.util.function.Function;import java.util.stream.Collectors;public enum DataEventTypeEnum {    INSERT("INSERT"),    UPDATE("UPDATE"),    DELETE("DELETE");    private final String name;    DataEventTypeEnum(String name) {        this.name = name;    }    public String NAME() {        return name;    }    private static final Map<String, DataEventTypeEnum> NAME_MAP =            Arrays.stream(DataEventTypeEnum.values())                    .collect(Collectors.toMap(DataEventTypeEnum::NAME, Function.identity()));    public static DataEventTypeEnum getEnum(String name) {        if (!StringUtils.hasText(name)) {            return null;        }        return NAME_MAP.get(name);    }}
4、CanalRunnerAutoConfig启动Canal配置:复制代码import lombok.Data;@Datapublic class JsonMessageType {    /**     * 库名     */    private String schemaName;    /**     * 表名     */    private String tableName;    /**     * 事件类型     * (INSERT/UPDATE/DELETE)     */    private String eventType;    /**     * 数据JSON字符串     */    private String data;}
 5、CanalRunner拉取数据:复制代码import com.iven.canal.entity.CanalProperties;import com.iven.canal.handle.CanalWorkRegistry;import com.iven.canal.utils.JsonMessageParser;import lombok.RequiredArgsConstructor;import lombok.extern.slf4j.Slf4j;import org.springframework.boot.ApplicationRunner;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;/** * Canal自动配置 */@Slf4j@Configuration@RequiredArgsConstructorpublic class CanalRunnerAutoConfig {    private final CanalProperties canalProperties;    private final JsonMessageParser jsonMessageParser;    private final CanalWorkRegistry workRegistry;    @Bean    public ApplicationRunner canalApplicationRunner() {        return args -> {            if (!canalProperties.isAutoSync()) {                log.info("Canal自动同步已关闭");                return;            }            // 如果没有任何Work,则不启动Canal            if (!workRegistry.hasWork()) {                log.info("无表同步处理器,不启动Canal");                return;            }            // 启动所有配置的Canal实例            canalProperties.getInstances().forEach((instanceKey, config) -> {                CanalRunner runner = new CanalRunner(                        config.getHost(),                        config.getPort(),                        config.getName(),                        config.getBatchSize(),                        config.getSleepTime(),                        jsonMessageParser,                        workRegistry                );                runner.start();            });        };    }}
 6、JsonMessageParser解析数据:复制代码import com.alibaba.fastjson2.JSON;import com.alibaba.otter.canal.client.CanalConnector;import com.alibaba.otter.canal.client.CanalConnectors;import com.alibaba.otter.canal.protocol.CanalEntry;import com.alibaba.otter.canal.protocol.Message;import com.iven.canal.entity.JsonMessageType;import com.iven.canal.handle.CanalWork;import com.iven.canal.handle.CanalWorkRegistry;import com.iven.canal.utils.JsonMessageParser;import lombok.extern.slf4j.Slf4j;import java.net.InetSocketAddress;import java.util.List;import java.util.Map;import java.util.stream.Collectors;/** * Canal运行器 * 手动管理生命周期 * * 1、启动Canal实例 * 2、处理解析后的数据 */@Slf4jpublic class CanalRunner {    private Thread thread;    private final String canalIp;    private final Integer canalPort;    private final String canalInstance;    private final Integer batchSize;    private final Integer sleepTime;    private final JsonMessageParser jsonMessageParser;    private final CanalWorkRegistry workRegistry;    public CanalRunner(String canalIp, Integer canalPort, String canalInstance, Integer batchSize,                       Integer sleepTime, JsonMessageParser jsonMessageParser, CanalWorkRegistry workRegistry) {        this.canalIp = canalIp;        this.canalPort = canalPort;        this.canalInstance = canalInstance;        this.batchSize = batchSize;        this.sleepTime = sleepTime;        this.jsonMessageParser = jsonMessageParser;        this.workRegistry = workRegistry;    }    /**     * 启动Canal实例     */    public void start() {        if (thread == null || !thread.isAlive()) {            thread = new Thread(this::run, "canal-runner-" + canalInstance);            thread.start();            log.info("Canal实例[{}]启动成功", canalInstance);        }    }    /**     * 停止Canal实例     */    public void stop() {        if (thread != null && !thread.isInterrupted()) {            thread.interrupt();        }    }    private void run() {        log.info("Canal实例[{}]启动中...", canalInstance);        CanalConnector connector = CanalConnectors.newSingleConnector(                new InetSocketAddress(canalIp, canalPort), canalInstance, "", "");        try {            connector.connect();            // 订阅所有表(后续通过Work过滤)            connector.subscribe();            connector.rollback();            while (!thread.isInterrupted()) {                Message message = connector.getWithoutAck(batchSize);                long batchId = message.getId();                List<CanalEntry.Entry> entries = message.getEntries();                if (batchId == -1 || entries.isEmpty()) {                    Thread.sleep(sleepTime);                } else {                    // 解析数据并处理                    Map<String, List<JsonMessageType>> parsedData = jsonMessageParser.parse(entries);                    processParsedData(parsedData);                    // 确认处理成功                    connector.ack(batchId);                }            }        } catch (InterruptedException e) {            log.info("Canal实例[{}]被中断", canalInstance);        } catch (Exception e) {            log.error("Canal实例[{}]运行异常", canalInstance, e);            // 处理失败回滚            connector.rollback();        } finally {            connector.disconnect();            log.info("Canal实例[{}]已停止", canalInstance);        }    }    /**     * 调用Work处理解析后的数据     *     * @param parsedData     */    private void processParsedData(Map<String, List<JsonMessageType>> parsedData) {        parsedData.forEach((tableKey, dataList) -> {            // 获取该表的所有Work            List<CanalWork> works = workRegistry.getWorksByTable(tableKey);            if (!works.isEmpty() && !dataList.isEmpty()) {                // 转换数据格式(Json字符串 -> Map)                List<Map<String, Object>> dataMaps = dataList.stream()                        .map(item -> JSON.<Map<String, Object>>parseObject(item.getData(), Map.class))                        .collect(Collectors.toList());                String schemaName = dataList.get(0).getSchemaName();                // 调用每个Work的处理方法                works.forEach(work -> work.handle(dataMaps, dataList.get(0).getEventType(), schemaName));            }        });    }    }
 MessageParser
 JsonMessageParser复制代码import com.alibaba.otter.canal.protocol.CanalEntry;import java.util.List;/** * 消息解析器接口 * */public interface MessageParser<T> {    T parse(List<CanalEntry.Entry> canalEntryList);}
7、CanalWorkRegistry匹配处理器:复制代码import com.alibaba.fastjson2.JSON;import com.alibaba.otter.canal.protocol.CanalEntry;import com.iven.canal.entity.DataEventTypeEnum;import com.iven.canal.entity.JsonMessageType;import com.iven.canal.handle.CanalWorkRegistry;import lombok.RequiredArgsConstructor;import lombok.extern.slf4j.Slf4j;import org.springframework.stereotype.Component;import org.springframework.util.CollectionUtils;import java.util.*;/** * Json消息解析器 * * 1、遍历原始数据列表接收 * 2、解析行级变更数据 * 3、封装为 JsonParseType */@Slf4j@Component@RequiredArgsConstructorpublic class JsonMessageParser implements MessageParser<Map<String, List<JsonMessageType>>> {    private final CanalWorkRegistry workRegistry;    @Override    public Map<String, List<JsonMessageType>> parse(List<CanalEntry.Entry> canalEntryList) {        Map<String, List<JsonMessageType>> dataMap = new HashMap<>();        for (CanalEntry.Entry entry : canalEntryList) {            if (!CanalEntry.EntryType.ROWDATA.equals(entry.getEntryType())) {                continue;            }            // 1. 获取库名、表名、带库名的表标识            String schemaName = entry.getHeader().getSchemaName();            String tableName = entry.getHeader().getTableName();            String fullTableName = schemaName + "." + tableName;            // 2. 检查是否有对应的处理器(支持两种格式)            boolean hasFullTableWork = !CollectionUtils.isEmpty(workRegistry.getWorksByTable(fullTableName));            boolean hasSimpleTableWork = !CollectionUtils.isEmpty(workRegistry.getWorksByTable(tableName));            if (!hasFullTableWork && !hasSimpleTableWork) {                log.debug("表[{}]和[{}]均无同步处理器,跳过", fullTableName, tableName);                continue;            }            try {                CanalEntry.RowChange rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());                rowChange.getRowDatasList().forEach(rowData -> {                    JsonMessageType jsonMessageType = parseRowData(entry.getHeader(), rowChange.getEventType(), rowData);                    if (jsonMessageType != null) {                        // 3. 按存在的处理器类型,分别添加到数据映射中                        if (hasFullTableWork) {                            dataMap.computeIfAbsent(fullTableName, k -> new ArrayList<>()).add(jsonMessageType);                        }                        if (hasSimpleTableWork) {                            dataMap.computeIfAbsent(tableName, k -> new ArrayList<>()).add(jsonMessageType);                        }                    }                });            } catch (Exception e) {                log.error("解析数据失败", e);            }        }        return dataMap;    }    private JsonMessageType parseRowData(CanalEntry.Header header, CanalEntry.EventType eventType,                                         CanalEntry.RowData rowData) {        // 获取库名        String schemaName = header.getSchemaName();        // 获取表名        String tableName = header.getTableName();        if (eventType == CanalEntry.EventType.DELETE) {            return dataWrapper(schemaName, tableName, DataEventTypeEnum.DELETE.NAME(), rowData.getBeforeColumnsList());        } else if (eventType == CanalEntry.EventType.INSERT) {            return dataWrapper(schemaName, tableName, DataEventTypeEnum.INSERT.NAME(), rowData.getAfterColumnsList());        } else if (eventType == CanalEntry.EventType.UPDATE) {            return dataWrapper(schemaName, tableName, DataEventTypeEnum.UPDATE.NAME(), rowData.getAfterColumnsList());        }        return null;    }    private JsonMessageType dataWrapper(String schemaName, String tableName, String eventType,                                        List<CanalEntry.Column> columns) {        Map<String, String> data = new HashMap<>();        columns.forEach(column -> data.put(column.getName(), column.getValue()));        JsonMessageType result = new JsonMessageType();        result.setSchemaName(schemaName);        result.setTableName(tableName);        result.setEventType(eventType);        result.setData(JSON.toJSONString(data));        return result;    }}
 CanalWork
 CanalWorkRegistry复制代码import java.util.List;import java.util.Map;/** * Canal-Work处理器 * */public interface CanalWork {    /**     * 返回需要处理的表名(如:tb_user)     */    String getTableName();    /**     * 处理表数据的方法     * @param dataList 表数据列表(每条数据是字段名-值的Map)     * @param eventType 事件类型(INSERT/UPDATE/DELETE)     * @param schemaName 库名(用于区分不同库的表)     */    void handle(List<Map<String, Object>> dataList, String eventType, String schemaName);}
8、CanalWork实现类处理数据:复制代码import lombok.extern.slf4j.Slf4j;import org.springframework.beans.BeansException;import org.springframework.context.ApplicationContext;import org.springframework.context.ApplicationContextAware;import org.springframework.stereotype.Component;import java.util.Collections;import java.util.HashMap;import java.util.List;import java.util.Map;import java.util.stream.Collectors;/** * 处理器注册器, * 扫描并缓存所有CanalWork实现类,按表名分组管理,提供查询表对应处理器的方法 */@Slf4j@Componentpublic class CanalWorkRegistry implements ApplicationContextAware {    /**     * 表名 -> Work列表(支持一个表多个Work)     */    private final Map<String, List<CanalWork>> tableWorkMap = new HashMap<>();    @Override    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {        // 扫描所有CanalWork实现类        Map<String, CanalWork> workMap = applicationContext.getBeansOfType(CanalWork.class);        // 按表名分组        tableWorkMap.putAll(workMap.values().stream()                .collect(Collectors.groupingBy(CanalWork::getTableName)));        log.info("已注册的表同步处理器: {}", tableWorkMap.keySet());    }    /**     * 获取指定表的Work列表     *     * @param tableName     * @return     */    public List<CanalWork> getWorksByTable(String tableName) {        return tableWorkMap.getOrDefault(tableName, Collections.emptyList());    }    /**     * 判断是否有表需要处理     *     * @return     */    public boolean hasWork() {        return !tableWorkMap.isEmpty();    }}
 复制代码import com.iven.canal.entity.DataEventTypeEnum;import lombok.extern.slf4j.Slf4j;import org.springframework.stereotype.Component;import java.util.List;import java.util.Map;/** * tb_user表数据处理 * * Canal服务 → 变更数据 → CanalRunner 拉取 → JsonMessageParser 解析 → * 筛选出 tb_user 数据 → CanalWorkRegistry 获取 TbUserCanalWorkHandle → * 调用 handle 方法 → 按事件类型(INSERT/UPDATE/DELETE)执行对应逻辑 */@Slf4j@Componentpublic class TbUserCanalWorkHandle implements CanalWork {    @Override    public String getTableName() {        return "demo.tb_user";    }    @Override    public void handle(List<Map<String, Object>> dataList, String eventType, String schemaName) {        log.info("开始处理[{}库]的tb_user表数据,事件类型:{},数据量:{}", schemaName, eventType, dataList.size());        DataEventTypeEnum dataEventTypeEnum = DataEventTypeEnum.getEnum(eventType);                // 根据事件类型分别处理        switch (dataEventTypeEnum) {            case INSERT:                handleInsert(dataList, schemaName);                break;            case UPDATE:                handleUpdate(dataList, schemaName);                break;            case DELETE:                handleDelete(dataList, schemaName);                break;            default:                log.warn("未处理的事件类型:{}", eventType);        }    }    /**     * 处理新增数据     */    private void handleInsert(List<Map<String, Object>> dataList, String schemaName) {        log.info("处理[{}库]的tb_user新增数据,共{}条", schemaName, dataList.size());        dataList.forEach(data -> {            Object userId = data.get("id");            Object username = data.get("name");            // 新增逻辑:如同步到ES、缓存初始化等            log.info("新增用户 - ID: {}, 用户名: {}", userId, username);        });    }    /**     * 处理更新数据     */    private void handleUpdate(List<Map<String, Object>> dataList, String schemaName) {        log.info("处理[{}库]的tb_user更新数据,共{}条", schemaName, dataList.size());        dataList.forEach(data -> {            Object userId = data.get("id");            Object newPhone = data.get("phone"); // 假设更新了手机号            // 更新逻辑:如更新ES文档、刷新缓存等            log.info("更新用户 - ID: {}, 新手机号: {}", userId, newPhone);        });    }    /**     * 处理删除数据     */    private void handleDelete(List<Map<String, Object>> dataList, String schemaName) {        log.info("处理[{}库]的tb_user删除数据,共{}条", schemaName, dataList.size());        dataList.forEach(data -> {            Object userId = data.get("id");            // 删除逻辑:如从ES删除、清除缓存等            log.info("删除用户 - ID: {}", userId);        });    }}
 调度流程:
 整个流程通过注册器管理处理器、解析器转换数据格式、运行器控制 Canal 客户端生命周期,最终将数据库变更事件分发到对应表的处理器,实现了变更数据的监听与业务处理解耦。用户只需实现CanalWork接口,即可自定义任意表的变更处理逻辑。
 (1)、初始化阶段
 1)、Spring 容器启动时,CanalWorkRegistry 扫描所有 CanalWork 实现类(如 TbUserCanalWorkHandle),按表名分组缓存到 tableWorkMap 中。
 2)、CanalRunnerAutoConfig 检查配置(CanalProperties),若开启自动同步且存在 CanalWork,则为每个 Canal 实例创建 CanalRunner 并启动。
 (2)、运行阶段
 1)、CanalRunner 建立与 Canal 服务的连接,订阅数据库变更事件。
 2)、循环拉取变更数据(Message),通过 JsonMessageParser 解析为表名 - 数据列表的映射(Map)。
 3)、调用 processParsedData 方法,根据表名从 CanalWorkRegistry 获取对应的 CanalWork 列表,执行 handle 方法处理数据。
 (3)、销毁阶段
 程序停止时,CanalRunner 中断线程,断开与 Canal 服务的连接。
 
 
 来源:程序园用户自行投稿发布,如果侵权,请联系站长删除
 免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!
 |