找回密码
 立即注册
首页 业界区 业界 MySQL同步ES的 5 种方案

MySQL同步ES的 5 种方案

毕余馥 2025-10-1 19:01:51
前言

有些小伙伴在工作中可能遇到过数据库查询慢的问题,特别是模糊查询和复杂聚合查询,这时候引入ES(Elasticsearch)作为搜索引擎是个不错的选择。
今天我们来聊聊MySQL同步到ES(Elasticsearch)的5种常见方案。
希望对你会有所帮助。
一、为什么需要MySQL同步到ES?

在我们深入讨论方案之前,先明确一下为什么需要将MySQL数据同步到ES:

  • 全文搜索能力:ES提供强大的全文搜索功能,远超MySQL的LIKE查询。
  • 复杂聚合分析:ES支持复杂的聚合查询,适合大数据分析。
  • 高性能查询:ES的倒排索引设计使查询速度极快。
  • 水平扩展:ES天生支持分布式,易于水平扩展。
先来看一下整体的同步架构图:
1.png

接下来,我们详细分析每种方案的实现原理和优缺点。
二、方案一:双写方案

双写方案是最直接的同步方式,即在业务代码中同时向MySQL和ES写入数据。
示例代码:
  1. @Service
  2. public class UserService {
  3.     @Autowired
  4.     private UserMapper userMapper;
  5.    
  6.     @Autowired
  7.     private ElasticsearchTemplate elasticsearchTemplate;
  8.    
  9.     @Transactional
  10.     public void addUser(User user) {
  11.         // 写入MySQL
  12.         userMapper.insert(user);
  13.         
  14.         // 写入Elasticsearch
  15.         IndexQuery indexQuery = new IndexQueryBuilder()
  16.             .withObject(user)
  17.             .withId(user.getId().toString())
  18.             .build();
  19.         elasticsearchTemplate.index(indexQuery);
  20.     }
  21.    
  22.     @Transactional
  23.     public void updateUser(User user) {
  24.         // 更新MySQL
  25.         userMapper.updateById(user);
  26.         
  27.         // 更新Elasticsearch
  28.         IndexRequest request = new IndexRequest("user_index")
  29.             .id(user.getId().toString())
  30.             .source(JSON.toJSONString(user), XContentType.JSON);
  31.         elasticsearchTemplate.getClient().index(request, RequestOptions.DEFAULT);
  32.     }
  33. }
复制代码
优缺点分析

优点:

  • 实现简单,不需要引入额外组件
  • 实时性高,数据立即同步
缺点:

  • 数据一致性难保证,需要处理分布式事务问题
  • 代码侵入性强,业务逻辑复杂
  • 性能受影响,每次写操作都要等待ES响应
适用场景

适合数据量不大,对实时性要求高,且能够接受一定数据不一致的业务场景。
三、方案二:定时任务方案

定时任务方案通过定期扫描MySQL数据变化来同步到ES。
示例代码:
  1. @Component
  2. public class UserSyncTask {
  3.     @Autowired
  4.     private UserMapper userMapper;
  5.    
  6.     @Autowired
  7.     private UserESRepository userESRepository;
  8.    
  9.     // 每5分钟执行一次
  10.     @Scheduled(fixedRate = 5 * 60 * 1000)
  11.     public void syncUserToES() {
  12.         // 查询最近更新的数据
  13.         Date lastSyncTime = getLastSyncTime();
  14.         List<User> updatedUsers = userMapper.selectUpdatedAfter(lastSyncTime);
  15.         
  16.         // 同步到ES
  17.         for (User user : updatedUsers) {
  18.             userESRepository.save(user);
  19.         }
  20.         
  21.         // 更新最后同步时间
  22.         updateLastSyncTime(new Date());
  23.     }
  24.    
  25.     // 获取最后同步时间
  26.     private Date getLastSyncTime() {
  27.         // 从数据库或Redis中获取
  28.         // ...
  29.     }
  30. }
复制代码
数据更新追踪策略

为了提高同步效率,通常需要设计良好的数据变更追踪机制:
2.png

优缺点分析

优点:

  • 实现简单,不需要修改现有业务代码
  • 对数据库压力可控,可以调整同步频率
缺点:

  • 实时性差,数据同步有延迟
  • 可能遗漏数据,如果系统崩溃会丢失部分数据
  • 扫描全表可能对数据库造成压力
适用场景

适合对实时性要求不高,数据变更不频繁的场景。
四、方案三:Binlog同步方案

Binlog是MySQL的二进制日志,记录了所有数据变更操作。
通过解析Binlog可以实现数据同步。
示例代码:
  1. public class BinlogSyncService {
  2.     public void startSync() {
  3.         BinaryLogClient client = new BinaryLogClient("localhost", 3306, "username", "password");
  4.         
  5.         client.registerEventListener(new BinaryLogClient.EventListener() {
  6.             @Override
  7.             public void onEvent(Event event) {
  8.                 EventData eventData = event.getData();
  9.                
  10.                 if (eventData instanceof WriteRowsEventData) {
  11.                     // 插入操作
  12.                     WriteRowsEventData writeData = (WriteRowsEventData) eventData;
  13.                     processInsertEvent(writeData);
  14.                 } else if (eventData instanceof UpdateRowsEventData) {
  15.                     // 更新操作
  16.                     UpdateRowsEventData updateData = (UpdateRowsEventData) eventData;
  17.                     processUpdateEvent(updateData);
  18.                 } else if (eventData instanceof DeleteRowsEventData) {
  19.                     // 删除操作
  20.                     DeleteRowsEventData deleteData = (DeleteRowsEventData) eventData;
  21.                     processDeleteEvent(deleteData);
  22.                 }
  23.             }
  24.         });
  25.         
  26.         client.connect();
  27.     }
  28.    
  29.     private void processInsertEvent(WriteRowsEventData eventData) {
  30.         // 处理插入事件,同步到ES
  31.         for (Serializable[] row : eventData.getRows()) {
  32.             User user = convertRowToUser(row);
  33.             syncToElasticsearch(user, "insert");
  34.         }
  35.     }
  36.    
  37.     private void syncToElasticsearch(User user, String operation) {
  38.         // 同步到ES的实现
  39.         // ...
  40.     }
  41. }
复制代码
优缺点分析

优点:

  • 实时性高,几乎实时同步
  • 对业务代码无侵入,不需要修改现有代码
  • 性能好,不影响数据库性能
缺点:

  • 实现复杂,需要解析Binlog格式
  • 需要考虑Binlog格式变更的兼容性问题
  • 主从切换时可能需要重新同步
适用场景

适合对实时性要求高,数据量大的场景。
五、方案四:Canal方案

Canal是阿里巴巴开源的MySQL Binlog增量订阅&消费组件,简化了Binlog同步的复杂性。
示例代码:
  1. # canal.properties 配置
  2. canal.instance.master.address = 127.0.0.1:3306
  3. canal.instance.dbUsername = username
  4. canal.instance.dbPassword = password
  5. canal.instance.connectionCharset = UTF-8
  6. canal.instance.filter.regex = .*\\..*
复制代码
  1. public class CanalClientExample {
  2.     public static void main(String[] args) {
  3.         // 创建Canal连接
  4.         CanalConnector connector = CanalConnectors.newSingleConnector(
  5.             new InetSocketAddress("127.0.0.1", 11111), "example", "", "");
  6.         
  7.         try {
  8.             connector.connect();
  9.             connector.subscribe(".*\\..*");
  10.             
  11.             while (true) {
  12.                 Message message = connector.getWithoutAck(100);
  13.                 long batchId = message.getId();
  14.                
  15.                 if (batchId != -1 && !message.getEntries().isEmpty()) {
  16.                     processEntries(message.getEntries());
  17.                     connector.ack(batchId); // 提交确认
  18.                 }
  19.                
  20.                 Thread.sleep(1000);
  21.             }
  22.         } finally {
  23.             connector.disconnect();
  24.         }
  25.     }
  26.    
  27.     private static void processEntries(List<CanalEntry.Entry> entries) {
  28.         for (CanalEntry.Entry entry : entries) {
  29.             if (entry.getEntryType() == CanalEntry.EntryType.ROWDATA) {
  30.                 CanalEntry.RowChange rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
  31.                
  32.                 for (CanalEntry.RowData rowData : rowChange.getRowDatasList()) {
  33.                     if (rowChange.getEventType() == CanalEntry.EventType.INSERT) {
  34.                         processInsert(rowData);
  35.                     } else if (rowChange.getEventType() == CanalEntry.EventType.UPDATE) {
  36.                         processUpdate(rowData);
  37.                     } else if (rowChange.getEventType() == CanalEntry.EventType.DELETE) {
  38.                         processDelete(rowData);
  39.                     }
  40.                 }
  41.             }
  42.         }
  43.     }
  44. }
复制代码
架构设计

Calan方案的架构如下:
3.png

优缺点分析

优点:

  • 实时性高,延迟低
  • 对业务系统无侵入
  • 阿里巴巴开源项目,社区活跃
缺点:

  • 需要部署维护Canal服务器
  • 需要处理网络分区和故障恢复
  • 可能产生数据重复同步问题
适用场景

适合大数据量、高实时性要求的场景,且有专门团队维护中间件。
六、方案五:MQ异步方案

MQ异步方案通过消息队列解耦MySQL和ES的同步过程,提高系统的可靠性和扩展性。
示例代码:
  1. @Service
  2. public class UserService {
  3.     @Autowired
  4.     private UserMapper userMapper;
  5.    
  6.     @Autowired
  7.     private RabbitTemplate rabbitTemplate;
  8.    
  9.     @Transactional
  10.     public void addUser(User user) {
  11.         // 写入MySQL
  12.         userMapper.insert(user);
  13.         
  14.         // 发送消息到MQ
  15.         rabbitTemplate.convertAndSend("user.exchange", "user.add", user);
  16.     }
  17.    
  18.     @Transactional
  19.     public void updateUser(User user) {
  20.         // 更新MySQL
  21.         userMapper.updateById(user);
  22.         
  23.         // 发送消息到MQ
  24.         rabbitTemplate.convertAndSend("user.exchange", "user.update", user);
  25.     }
  26. }
  27. @Component
  28. public class UserMQConsumer {
  29.     @Autowired
  30.     private UserESRepository userESRepository;
  31.    
  32.     @RabbitListener(queues = "user.queue")
  33.     public void processUserAdd(User user) {
  34.         userESRepository.save(user);
  35.     }
  36.    
  37.     @RabbitListener(queues = "user.queue")
  38.     public void processUserUpdate(User user) {
  39.         userESRepository.save(user);
  40.     }
  41.    
  42.     @RabbitListener(queues = "user.queue")
  43.     public void processUserDelete(Long userId) {
  44.         userESRepository.deleteById(userId);
  45.     }
  46. }
复制代码
消息队列选型对比

不同的消息队列产品有不同特点,下面是常见MQ的对比:
4.png

优缺点分析

优点:

  • 完全解耦,MySQL和ES同步过程相互独立
  • 高可用,MQ本身提供消息持久化和重试机制
  • 可扩展,可以方便地增加消费者处理消息
缺点:

  • 系统复杂度增加,需要维护MQ集群
  • 可能产生消息顺序问题,需要处理消息顺序性
  • 数据一致性延迟,依赖于消息消费速度
适用场景

适合大型分布式系统,对可靠性和扩展性要求高的场景。
七、5种方案对比

为了更直观地比较这5种方案,我们来看一个综合对比表格:
方案名称实时性数据一致性系统复杂度性能影响适用场景双写方案高难保证低高小规模应用定时任务低最终一致低中非实时场景Binlog方案高最终一致高低大数据量高实时Canal方案高最终一致中低大数据量高实时MQ异步方案中最终一致高低分布式大型系统选择建议

有些小伙伴在工作中可能会纠结选择哪种方案,这里给出一些建议:

  • 初创项目或小规模系统:可以选择双写方案或定时任务方案,实现简单。
  • 中大型系统:建议使用Canal方案或MQ异步方案,保证系统的可靠性和扩展性。
  • 大数据量高实时要求:Binlog方案或Canal方案是最佳选择。
  • 已有MQ基础设施:优先考虑MQ异步方案,充分利用现有资源。
注意事项

无论选择哪种方案,都需要注意以下几点:

  • 幂等性处理:同步过程需要保证幂等性,防止重复数据。
  • 监控告警:建立完善的监控体系,及时发现同步延迟或失败。
  • 数据校验:定期校验MySQL和ES中的数据一致性。
  • 容错机制:设计良好的故障恢复机制,避免数据丢失。
总结

MySQL同步到ES(Elasticsearch)是现代应用开发中常见的需求,选择合适的同步方案对系统性能和可靠性至关重要。
本文介绍了5种常见方案,各有优缺点,适用于不同场景。
在实际项目中,可能需要根据具体需求组合使用多种方案,或者对某种方案进行定制化改造。
重要的是要理解每种方案的原理和特点,才能做出合理的技术选型。
希望这篇文章对大家有所帮助,如果有任何问题或建议,欢迎在评论区留言讨论!
最后说一句(求关注,别白嫖我)

如果这篇文章对您有所帮助,或者有所启发的话,帮忙关注一下我的同名公众号:苏三说技术,您的支持是我坚持写作最大的动力。
求一键三连:点赞、转发、在看。
关注公众号:【苏三说技术】,在公众号中回复:进大厂,可以免费获取我最近整理的10万字的面试宝典,好多小伙伴靠这个宝典拿到了多家大厂的offer。
本文收录于我的技术网站:http://www.susan.net.cn

来源:程序园用户自行投稿发布,如果侵权,请联系站长删除
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!

相关推荐

您需要登录后才可以回帖 登录 | 立即注册