找回密码
 立即注册
首页 业界区 业界 聊聊动态数据源

聊聊动态数据源

诈知 2025-11-23 12:00:51
前言

咱们星球中的商城系统中使用了动态数据源的功能,实现了分库分表的订单库的读库和写库的自动切换。
有球友反馈说,对动态数据源不太熟悉。
今天这篇文章就专门跟大家一起聊聊动态数据源,希望对你会有所帮助。
一、为什么需要动态数据源?

有些小伙伴在开发中可能会遇到这样的场景:一个系统需要同时访问多个数据库,或者需要根据业务参数动态选择数据源。这
时候,传统的单数据源配置就显得力不从心了。
1.1 传统多数据源的问题

传统方式的多个数据源配置,硬编码,不灵活。
例如下面这样:
  1. @Configuration
  2. public class TraditionalDataSourceConfig {
  3.    
  4.     @Bean
  5.     @Primary
  6.     public DataSource primaryDataSource() {
  7.         HikariDataSource dataSource = new HikariDataSource();
  8.         dataSource.setJdbcUrl("jdbc:mysql://localhost:3306/db1");
  9.         dataSource.setUsername("user1");
  10.         dataSource.setPassword("pass1");
  11.         return dataSource;
  12.     }
  13.    
  14.     @Bean
  15.     public DataSource secondaryDataSource() {
  16.         HikariDataSource dataSource = new HikariDataSource();
  17.         dataSource.setJdbcUrl("jdbc:mysql://localhost:3306/db2");
  18.         dataSource.setUsername("user2");
  19.         dataSource.setPassword("pass2");
  20.         return dataSource;
  21.     }
  22. }
复制代码
使用时需要手动管理数据源。
  1. @Repository
  2. public class TraditionalUserDao {
  3.    
  4.     @Autowired
  5.     @Qualifier("primaryDataSource")
  6.     private DataSource primaryDataSource;
  7.    
  8.     @Autowired
  9.     @Qualifier("secondaryDataSource")
  10.     private DataSource secondaryDataSource;
  11.    
  12.     public User findUserFromPrimary(Long id) {
  13.         // 需要手动获取连接、处理异常、关闭连接
  14.         try (Connection conn = primaryDataSource.getConnection();
  15.              PreparedStatement stmt = conn.prepareStatement("SELECT * FROM users WHERE id = ?")) {
  16.             stmt.setLong(1, id);
  17.             ResultSet rs = stmt.executeQuery();
  18.             // 处理结果集...
  19.         } catch (SQLException e) {
  20.             throw new RuntimeException("查询失败", e);
  21.         }
  22.     }
  23.   
  24. }
复制代码
每个方法都要重复这样的模板代码,需要手动指定数据源,很麻烦。
那么,如何做优化呢?
1.2 动态数据源的优势

接下来,我们一起看看使用动态数据源后的优雅代码。
  1. @Service
  2. public class UserService {
  3.    
  4.     @Autowired
  5.     private UserMapper userMapper;
  6.    
  7.     // 根据租户ID自动选择数据源
  8.     public User findUserByTenant(Long userId, String tenantId) {
  9.         // 设置数据源上下文
  10.         DataSourceContextHolder.setDataSource(tenantId);
  11.         try {
  12.             return userMapper.selectById(userId);
  13.         } finally {
  14.             // 清理上下文
  15.             DataSourceContextHolder.clear();
  16.         }
  17.     }
  18.    
  19.     // 多租户数据聚合查询
  20.     public UserAggregateInfo getUserAggregateInfo(Long userId) {
  21.         UserAggregateInfo result = new UserAggregateInfo();
  22.         
  23.         // 查询主库
  24.         DataSourceContextHolder.setDataSource("master");
  25.         result.setBaseInfo(userMapper.selectById(userId));
  26.         
  27.         // 查询归档库
  28.         DataSourceContextHolder.setDataSource("archive");
  29.         result.setHistory(userMapper.selectHistory(userId));
  30.         
  31.         // 查询统计库
  32.         DataSourceContextHolder.setDataSource("stats");
  33.         result.setStatistics(userMapper.selectStats(userId));
  34.         
  35.         return result;
  36.     }
  37. }
复制代码
代码中能根据租户ID自动选择数据源。
代码一下子变得更优雅了。
二、动态数据源的原理

有些小伙伴在使用动态数据源时,可能只是简单配置使用,并不清楚其底层工作原理。
理解核心原理对于排查问题和性能优化至关重要。
下面跟大家一起聊聊动态数据源的核心原理,希望对你会有所帮助。
数据源路由的核心机制

动态数据源的核心在于AbstractRoutingDataSource,它是Spring框架提供的抽象类:
  1. // Spring AbstractRoutingDataSource 源码分析
  2. public abstract class AbstractRoutingDataSource extends AbstractDataSource implements InitializingBean {
  3.    
  4.     // 目标数据源映射表
  5.     private Map<Object, Object> targetDataSources;
  6.    
  7.     // 默认数据源
  8.     private Object defaultTargetDataSource;
  9.    
  10.     // 解析后的数据源映射
  11.     private Map<Object, DataSource> resolvedDataSources;
  12.    
  13.     // 解析后的默认数据源
  14.     private DataSource resolvedDefaultDataSource;
  15.    
  16.     // 关键方法:确定当前查找键
  17.     protected abstract Object determineCurrentLookupKey();
  18.    
  19.     // 获取连接时选择数据源
  20.     @Override
  21.     public Connection getConnection() throws SQLException {
  22.         return determineTargetDataSource().getConnection();
  23.     }
  24.    
  25.     @Override
  26.     public Connection getConnection(String username, String password) throws SQLException {
  27.         return determineTargetDataSource().getConnection(username, password);
  28.     }
  29.    
  30.     // 确定目标数据源
  31.     protected DataSource determineTargetDataSource() {
  32.         // 获取查找键
  33.         Object lookupKey = determineCurrentLookupKey();
  34.         
  35.         // 根据查找键获取数据源
  36.         DataSource dataSource = this.resolvedDataSources.get(lookupKey);
  37.         if (dataSource == null && (this.resolvedDefaultDataSource != null || lookupKey == null)) {
  38.             dataSource = this.resolvedDefaultDataSource;
  39.         }
  40.         if (dataSource == null) {
  41.             throw new IllegalStateException("Cannot determine target DataSource for lookup key [" + lookupKey + "]");
  42.         }
  43.         return dataSource;
  44.     }
  45. }
复制代码
线程安全的数据源上下文管理
  1. /**
  2. * 数据源上下文管理器 - 核心组件
  3. * 使用ThreadLocal保证线程安全
  4. */
  5. public class DataSourceContextHolder {
  6.    
  7.     // 使用ThreadLocal保证线程隔离
  8.     private static final ThreadLocal<String> CONTEXT_HOLDER = new ThreadLocal<>();
  9.    
  10.     // 支持嵌套数据源切换的栈
  11.     private static final ThreadLocal<Deque<String>> DATASOURCE_STACK = ThreadLocal.withInitial(ArrayDeque::new);
  12.    
  13.     // 设置数据源
  14.     public static void setDataSource(String dataSource) {
  15.         if (dataSource == null) {
  16.             throw new IllegalArgumentException("数据源不能为null");
  17.         }
  18.         CONTEXT_HOLDER.set(dataSource);
  19.         
  20.         // 同时压入栈,支持嵌套调用
  21.         DATASOURCE_STACK.get().push(dataSource);
  22.     }
  23.    
  24.     // 获取当前数据源
  25.     public static String getDataSource() {
  26.         return CONTEXT_HOLDER.get();
  27.     }
  28.    
  29.     // 清除数据源
  30.     public static void clear() {
  31.         CONTEXT_HOLDER.remove();
  32.         Deque<String> stack = DATASOURCE_STACK.get();
  33.         if (!stack.isEmpty()) {
  34.             stack.pop();
  35.             // 如果栈中还有元素,恢复到上一个数据源
  36.             if (!stack.isEmpty()) {
  37.                 CONTEXT_HOLDER.set(stack.peek());
  38.             }
  39.         }
  40.     }
  41.    
  42.     // 强制清除所有上下文(用于线程池场景)
  43.     public static void clearCompletely() {
  44.         CONTEXT_HOLDER.remove();
  45.         DATASOURCE_STACK.get().clear();
  46.     }
  47.    
  48.     // 判断是否已设置数据源
  49.     public static boolean hasDataSource() {
  50.         return CONTEXT_HOLDER.get() != null;
  51.     }
  52. }
  53. /**
  54. * 自定义路由数据源
  55. */
  56. @Component
  57. public class DynamicRoutingDataSource extends AbstractRoutingDataSource {
  58.    
  59.     // 所有可用的数据源
  60.     private final Map<Object, Object> targetDataSources = new ConcurrentHashMap<>();
  61.    
  62.     @Override
  63.     protected Object determineCurrentLookupKey() {
  64.         String dataSourceKey = DataSourceContextHolder.getDataSource();
  65.         
  66.         if (dataSourceKey == null) {
  67.             // 返回默认数据源
  68.             return "default";
  69.         }
  70.         
  71.         // 验证数据源是否存在
  72.         if (!targetDataSources.containsKey(dataSourceKey)) {
  73.             throw new IllegalArgumentException("数据源 " + dataSourceKey + " 不存在");
  74.         }
  75.         
  76.         logger.debug("当前使用数据源: {}", dataSourceKey);
  77.         return dataSourceKey;
  78.     }
  79.    
  80.     // 添加数据源
  81.     public void addDataSource(String key, DataSource dataSource) {
  82.         this.targetDataSources.put(key, dataSource);
  83.         // 更新目标数据源映射
  84.         setTargetDataSources(new HashMap<>(this.targetDataSources));
  85.         // 重新初始化
  86.         afterPropertiesSet();
  87.     }
  88.    
  89.     // 移除数据源
  90.     public void removeDataSource(String key) {
  91.         if (this.targetDataSources.containsKey(key)) {
  92.             DataSource dataSource = (DataSource) this.targetDataSources.remove(key);
  93.             // 关闭数据源连接池
  94.             closeDataSource(dataSource);
  95.             // 更新目标数据源映射
  96.             setTargetDataSources(new HashMap<>(this.targetDataSources));
  97.             afterPropertiesSet();
  98.         }
  99.     }
  100.    
  101.     // 获取所有数据源
  102.     public Map<Object, Object> getTargetDataSources() {
  103.         return Collections.unmodifiableMap(targetDataSources);
  104.     }
  105.    
  106.     private void closeDataSource(DataSource dataSource) {
  107.         if (dataSource instanceof HikariDataSource) {
  108.             ((HikariDataSource) dataSource).close();
  109.         } else if (dataSource instanceof org.apache.tomcat.jdbc.pool.DataSource) {
  110.             ((org.apache.tomcat.jdbc.pool.DataSource) dataSource).close();
  111.         }
  112.         // 其他类型的数据源关闭逻辑...
  113.     }
  114. }
复制代码
动态数据源执行流程

1.png

三、基于Spring Boot的完整实现

有些小伙伴在配置动态数据源时可能会遇到各种问题,下面我提供一个生产级别的完整实现。
完整配置实现
  1. /**
  2. * 动态数据源配置类
  3. */
  4. @Configuration
  5. @EnableTransactionManagement
  6. @EnableConfigurationProperties(DynamicDataSourceProperties.class)
  7. public class DynamicDataSourceConfig {
  8.    
  9.     @Autowired
  10.     private DynamicDataSourceProperties properties;
  11.    
  12.     /**
  13.      * 主数据源(默认数据源)
  14.      */
  15.     @Bean
  16.     @ConfigurationProperties(prefix = "spring.datasource.master")
  17.     public DataSource masterDataSource() {
  18.         return DataSourceBuilder.create().build();
  19.     }
  20.    
  21.     /**
  22.      * 从数据源1
  23.      */
  24.     @Bean
  25.     @ConfigurationProperties(prefix = "spring.datasource.slave1")
  26.     public DataSource slave1DataSource() {
  27.         return DataSourceBuilder.create().build();
  28.     }
  29.    
  30.     /**
  31.      * 从数据源2
  32.      */
  33.     @Bean
  34.     @ConfigurationProperties(prefix = "spring.datasource.slave2")
  35.     public DataSource slave2DataSource() {
  36.         return DataSourceBuilder.create().build();
  37.     }
  38.    
  39.     /**
  40.      * 动态数据源
  41.      */
  42.     @Bean
  43.     @Primary
  44.     public DataSource dynamicDataSource(DataSource masterDataSource,
  45.                                        DataSource slave1DataSource,
  46.                                        DataSource slave2DataSource) {
  47.         
  48.         Map<Object, Object> targetDataSources = new HashMap<>(8);
  49.         targetDataSources.put("master", masterDataSource);
  50.         targetDataSources.put("slave1", slave1DataSource);
  51.         targetDataSources.put("slave2", slave2DataSource);
  52.         
  53.         DynamicRoutingDataSource dynamicDataSource = new DynamicRoutingDataSource();
  54.         // 设置默认数据源
  55.         dynamicDataSource.setDefaultTargetDataSource(masterDataSource);
  56.         // 设置目标数据源
  57.         dynamicDataSource.setTargetDataSources(targetDataSources);
  58.         
  59.         return dynamicDataSource;
  60.     }
  61.    
  62.     /**
  63.      * 事务管理器
  64.      */
  65.     @Bean
  66.     public PlatformTransactionManager transactionManager(DataSource dynamicDataSource) {
  67.         return new DataSourceTransactionManager(dynamicDataSource);
  68.     }
  69.    
  70.     /**
  71.      * MyBatis配置
  72.      */
  73.     @Bean
  74.     public SqlSessionFactory sqlSessionFactory(DataSource dynamicDataSource) throws Exception {
  75.         SqlSessionFactoryBean sessionFactory = new SqlSessionFactoryBean();
  76.         sessionFactory.setDataSource(dynamicDataSource);
  77.         
  78.         // 配置MyBatis
  79.         org.apache.ibatis.session.Configuration configuration =
  80.             new org.apache.ibatis.session.Configuration();
  81.         configuration.setMapUnderscoreToCamelCase(true);
  82.         configuration.setCacheEnabled(true);
  83.         configuration.setLazyLoadingEnabled(false);
  84.         configuration.setAggressiveLazyLoading(false);
  85.         sessionFactory.setConfiguration(configuration);
  86.         
  87.         return sessionFactory.getObject();
  88.     }
  89. }
  90. /**
  91. * 数据源配置属性类
  92. */
  93. @ConfigurationProperties(prefix = "spring.datasource")
  94. @Data
  95. public class DynamicDataSourceProperties {
  96.    
  97.     /**
  98.      * 主数据源配置
  99.      */
  100.     private HikariConfig master = new HikariConfig();
  101.    
  102.     /**
  103.      * 从数据源1配置
  104.      */
  105.     private HikariConfig slave1 = new HikariConfig();
  106.    
  107.     /**
  108.      * 从数据源2配置
  109.      */
  110.     private HikariConfig slave2 = new HikariConfig();
  111.    
  112.     /**
  113.      * 动态数据源配置
  114.      */
  115.     private DynamicConfig dynamic = new DynamicConfig();
  116.    
  117.     @Data
  118.     public static class DynamicConfig {
  119.         /**
  120.          * 默认数据源
  121.          */
  122.         private String primary = "master";
  123.         
  124.         /**
  125.          * 是否开启严格模式
  126.          */
  127.         private boolean strict = false;
  128.         
  129.         /**
  130.          * 数据源健康检查间隔(秒)
  131.          */
  132.         private long healthCheckInterval = 30;
  133.     }
  134. }
复制代码
应用配置文件
  1. # application.yml
  2. spring:
  3.   datasource:
  4.     # 动态数据源配置
  5.     dynamic:
  6.       primary: master
  7.       strict: true
  8.       health-check-interval: 30
  9.    
  10.     # 主数据源
  11.     master:
  12.       jdbc-url: jdbc:mysql://localhost:3306/master_db?useUnicode=true&characterEncoding=utf8&rewriteBatchedStatements=true
  13.       username: root
  14.       password: master_password
  15.       driver-class-name: com.mysql.cj.jdbc.Driver
  16.       maximum-pool-size: 20
  17.       minimum-idle: 5
  18.       connection-timeout: 30000
  19.       idle-timeout: 600000
  20.       max-lifetime: 1800000
  21.       pool-name: MasterHikariPool
  22.    
  23.     # 从数据源1
  24.     slave1:
  25.       jdbc-url: jdbc:mysql://slave1:3306/slave_db?useUnicode=true&characterEncoding=utf8
  26.       username: root
  27.       password: slave1_password
  28.       driver-class-name: com.mysql.cj.jdbc.Driver
  29.       maximum-pool-size: 15
  30.       minimum-idle: 3
  31.       connection-timeout: 30000
  32.       idle-timeout: 600000
  33.       max-lifetime: 1800000
  34.       pool-name: Slave1HikariPool
  35.    
  36.     # 从数据源2
  37.     slave2:
  38.       jdbc-url: jdbc:mysql://slave2:3306/slave_db?useUnicode=true&characterEncoding=utf8
  39.       username: root
  40.       password: slave2_password
  41.       driver-class-name: com.mysql.cj.jdbc.Driver
  42.       maximum-pool-size: 15
  43.       minimum-idle: 3
  44.       connection-timeout: 30000
  45.       idle-timeout: 600000
  46.       max-lifetime: 1800000
  47.       pool-name: Slave2HikariPool
  48. # MyBatis配置
  49. mybatis:
  50.   configuration:
  51.     map-underscore-to-camel-case: true
  52.     cache-enabled: true
  53.     lazy-loading-enabled: false
  54.     aggressive-lazy-loading: false
复制代码
注解式数据源切换
  1. /**
  2. * 数据源注解
  3. */
  4. @Target({ElementType.METHOD, ElementType.TYPE})
  5. @Retention(RetentionPolicy.RUNTIME)
  6. @Documented
  7. public @interface DataSource {
  8.    
  9.     /**
  10.      * 数据源名称
  11.      */
  12.     String value() default "master";
  13.    
  14.     /**
  15.      * 是否在方法执行后清除数据源(默认清除)
  16.      */
  17.     boolean clear() default true;
  18. }
  19. /**
  20. * 数据源切面
  21. */
  22. @Aspect
  23. @Component
  24. @Slf4j
  25. public class DataSourceAspect {
  26.    
  27.     /**
  28.      * 定义切点:所有标注@DataSource注解的方法
  29.      */
  30.     @Pointcut("@annotation(com.example.annotation.DataSource)")
  31.     public void dataSourcePointCut() {}
  32.    
  33.     /**
  34.      * 环绕通知:在方法执行前后切换数据源
  35.      */
  36.     @Around("dataSourcePointCut()")
  37.     public Object around(ProceedingJoinPoint point) throws Throwable {
  38.         MethodSignature signature = (MethodSignature) point.getSignature();
  39.         Method method = signature.getMethod();
  40.         
  41.         DataSource dataSourceAnnotation = method.getAnnotation(DataSource.class);
  42.         if (dataSourceAnnotation == null) {
  43.             // 类级别注解
  44.             dataSourceAnnotation = point.getTarget().getClass().getAnnotation(DataSource.class);
  45.         }
  46.         
  47.         if (dataSourceAnnotation != null) {
  48.             String dataSourceKey = dataSourceAnnotation.value();
  49.             boolean clearAfter = dataSourceAnnotation.clear();
  50.             
  51.             try {
  52.                 log.debug("切换数据源到: {}", dataSourceKey);
  53.                 DataSourceContextHolder.setDataSource(dataSourceKey);
  54.                
  55.                 // 执行原方法
  56.                 return point.proceed();
  57.                
  58.             } finally {
  59.                 if (clearAfter) {
  60.                     DataSourceContextHolder.clear();
  61.                     log.debug("清除数据源上下文");
  62.                 }
  63.             }
  64.         }
  65.         
  66.         // 没有注解,使用默认数据源
  67.         return point.proceed();
  68.     }
  69. }
复制代码
四、高级特性

有些小伙伴在基础功能实现后,可能会遇到一些高级场景的需求。
下面介绍几个生产环境中常用的高级特性。
读写分离自动路由
  1. /**
  2. * 读写分离数据源路由器
  3. */
  4. @Component
  5. @Slf4j
  6. public class ReadWriteDataSourceRouter {
  7.    
  8.     // 读数据源列表
  9.     private final List<String> readDataSources = Arrays.asList("slave1", "slave2");
  10.    
  11.     // 轮询计数器
  12.     private final AtomicInteger counter = new AtomicInteger(0);
  13.    
  14.     /**
  15.      * 根据SQL自动选择数据源
  16.      */
  17.     public String determineDataSource(boolean isReadOperation) {
  18.         if (isReadOperation && !readDataSources.isEmpty()) {
  19.             // 读操作:轮询选择从库
  20.             int index = counter.getAndIncrement() % readDataSources.size();
  21.             if (counter.get() > 9999) {
  22.                 counter.set(0); // 防止溢出
  23.             }
  24.             String readDataSource = readDataSources.get(index);
  25.             log.debug("读操作选择数据源: {}", readDataSource);
  26.             return readDataSource;
  27.         } else {
  28.             // 写操作:选择主库
  29.             log.debug("写操作选择数据源: master");
  30.             return "master";
  31.         }
  32.     }
  33.    
  34.     /**
  35.      * 根据SQL语句判断是否为读操作
  36.      */
  37.     public boolean isReadOperation(String sql) {
  38.         if (sql == null) {
  39.             return true; // 默认为读操作
  40.         }
  41.         
  42.         String trimmedSql = sql.trim().toLowerCase();
  43.         return trimmedSql.startsWith("select") ||
  44.                trimmedSql.startsWith("show") ||
  45.                trimmedSql.startsWith("explain");
  46.     }
  47. }
  48. /**
  49. * MyBatis拦截器 - 自动读写分离
  50. */
  51. @Intercepts({
  52.     @Signature(type = Executor.class, method = "update", args = {MappedStatement.class, Object.class}),
  53.     @Signature(type = Executor.class, method = "query", args = {MappedStatement.class, Object.class, RowBounds.class, ResultHandler.class})
  54. })
  55. @Component
  56. @Slf4j
  57. public class ReadWriteInterceptor implements Interceptor {
  58.    
  59.     @Autowired
  60.     private ReadWriteDataSourceRouter dataSourceRouter;
  61.    
  62.     @Override
  63.     public Object intercept(Invocation invocation) throws Throwable {
  64.         String methodName = invocation.getMethod().getName();
  65.         MappedStatement ms = (MappedStatement) invocation.getArgs()[0];
  66.         
  67.         boolean isReadOperation = "query".equals(methodName);
  68.         String sql = getSql(ms, invocation.getArgs()[1]);
  69.         
  70.         // 如果当前没有手动设置数据源,则自动选择
  71.         if (!DataSourceContextHolder.hasDataSource()) {
  72.             String dataSource = dataSourceRouter.determineDataSource(isReadOperation);
  73.             DataSourceContextHolder.setDataSource(dataSource);
  74.             
  75.             try {
  76.                 return invocation.proceed();
  77.             } finally {
  78.                 DataSourceContextHolder.clear();
  79.             }
  80.         }
  81.         
  82.         return invocation.proceed();
  83.     }
  84.    
  85.     private String getSql(MappedStatement ms, Object parameter) {
  86.         BoundSql boundSql = ms.getBoundSql(parameter);
  87.         return boundSql.getSql();
  88.     }
  89. }
复制代码
多租户数据源管理
  1. /**
  2. * 多租户数据源管理器
  3. */
  4. @Component
  5. @Slf4j
  6. public class TenantDataSourceManager {
  7.    
  8.     @Autowired
  9.     private DynamicRoutingDataSource dynamicRoutingDataSource;
  10.    
  11.     @Autowired
  12.     private DataSourceProperties dataSourceProperties;
  13.    
  14.     // 租户数据源配置缓存
  15.     private final Map<String, TenantDataSourceConfig> tenantConfigCache = new ConcurrentHashMap<>();
  16.    
  17.     /**
  18.      * 根据租户ID获取数据源
  19.      */
  20.     public DataSource getDataSourceForTenant(String tenantId) {
  21.         String dataSourceKey = "tenant_" + tenantId;
  22.         
  23.         // 检查是否已存在数据源
  24.         if (dynamicRoutingDataSource.getTargetDataSources().containsKey(dataSourceKey)) {
  25.             return (DataSource) dynamicRoutingDataSource.getTargetDataSources().get(dataSourceKey);
  26.         }
  27.         
  28.         // 动态创建数据源
  29.         synchronized (this) {
  30.             if (!dynamicRoutingDataSource.getTargetDataSources().containsKey(dataSourceKey)) {
  31.                 DataSource dataSource = createTenantDataSource(tenantId);
  32.                 dynamicRoutingDataSource.addDataSource(dataSourceKey, dataSource);
  33.                 log.info("为租户 {} 创建数据源: {}", tenantId, dataSourceKey);
  34.             }
  35.         }
  36.         
  37.         return (DataSource) dynamicRoutingDataSource.getTargetDataSources().get(dataSourceKey);
  38.     }
  39.    
  40.     /**
  41.      * 动态创建租户数据源
  42.      */
  43.     private DataSource createTenantDataSource(String tenantId) {
  44.         TenantDataSourceConfig config = getTenantConfig(tenantId);
  45.         
  46.         HikariDataSource dataSource = new HikariDataSource();
  47.         dataSource.setJdbcUrl(buildJdbcUrl(config));
  48.         dataSource.setUsername(config.getUsername());
  49.         dataSource.setPassword(config.getPassword());
  50.         dataSource.setDriverClassName("com.mysql.cj.jdbc.Driver");
  51.         dataSource.setMaximumPoolSize(10);
  52.         dataSource.setMinimumIdle(2);
  53.         dataSource.setConnectionTimeout(30000);
  54.         dataSource.setIdleTimeout(600000);
  55.         dataSource.setMaxLifetime(1800000);
  56.         dataSource.setPoolName("TenantPool_" + tenantId);
  57.         
  58.         return dataSource;
  59.     }
  60.    
  61.     /**
  62.      * 获取租户数据源配置(可从配置中心或数据库获取)
  63.      */
  64.     private TenantDataSourceConfig getTenantConfig(String tenantId) {
  65.         return tenantConfigCache.computeIfAbsent(tenantId, key -> {
  66.             // 这里可以从配置中心、数据库或缓存中获取租户配置
  67.             // 简化实现,实际项目中需要完善
  68.             TenantDataSourceConfig config = new TenantDataSourceConfig();
  69.             config.setHost("tenant-" + tenantId + ".db.example.com");
  70.             config.setPort(3306);
  71.             config.setDatabase("tenant_" + tenantId);
  72.             config.setUsername("tenant_" + tenantId);
  73.             config.setPassword("password_" + tenantId);
  74.             return config;
  75.         });
  76.     }
  77.    
  78.     private String buildJdbcUrl(TenantDataSourceConfig config) {
  79.         return String.format("jdbc:mysql://%s:%d/%s?useUnicode=true&characterEncoding=utf8&rewriteBatchedStatements=true",
  80.                 config.getHost(), config.getPort(), config.getDatabase());
  81.     }
  82.    
  83.     @Data
  84.     public static class TenantDataSourceConfig {
  85.         private String host;
  86.         private int port;
  87.         private String database;
  88.         private String username;
  89.         private String password;
  90.     }
  91. }
复制代码
数据源健康监控
  1. /**
  2. * 数据源健康监控器
  3. */
  4. @Component
  5. @Slf4j
  6. public class DataSourceHealthMonitor {
  7.    
  8.     @Autowired
  9.     private DynamicRoutingDataSource dynamicRoutingDataSource;
  10.    
  11.     private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
  12.    
  13.     // 健康状态缓存
  14.     private final Map<String, Boolean> healthStatus = new ConcurrentHashMap<>();
  15.    
  16.     @PostConstruct
  17.     public void init() {
  18.         // 启动健康检查任务
  19.         scheduler.scheduleAtFixedRate(this::checkAllDataSources, 0, 30, TimeUnit.SECONDS);
  20.     }
  21.    
  22.     /**
  23.      * 检查所有数据源的健康状态
  24.      */
  25.     public void checkAllDataSources() {
  26.         Map<Object, Object> dataSources = dynamicRoutingDataSource.getTargetDataSources();
  27.         
  28.         for (Map.Entry<Object, Object> entry : dataSources.entrySet()) {
  29.             String dataSourceKey = (String) entry.getKey();
  30.             DataSource dataSource = (DataSource) entry.getValue();
  31.             
  32.             boolean isHealthy = checkDataSourceHealth(dataSource);
  33.             healthStatus.put(dataSourceKey, isHealthy);
  34.             
  35.             if (!isHealthy) {
  36.                 log.warn("数据源 {} 健康检查失败", dataSourceKey);
  37.                 // 可以发送告警通知
  38.             }
  39.         }
  40.     }
  41.    
  42.     /**
  43.      * 检查单个数据源健康状态
  44.      */
  45.     private boolean checkDataSourceHealth(DataSource dataSource) {
  46.         try (Connection conn = dataSource.getConnection();
  47.              Statement stmt = conn.createStatement()) {
  48.             
  49.             ResultSet rs = stmt.executeQuery("SELECT 1");
  50.             return rs.next() && rs.getInt(1) == 1;
  51.             
  52.         } catch (SQLException e) {
  53.             log.error("数据源健康检查异常", e);
  54.             return false;
  55.         }
  56.     }
  57.    
  58.     /**
  59.      * 获取数据源健康状态
  60.      */
  61.     public boolean isDataSourceHealthy(String dataSourceKey) {
  62.         return healthStatus.getOrDefault(dataSourceKey, true);
  63.     }
  64.    
  65.     /**
  66.      * 获取健康的数据源列表
  67.      */
  68.     public List<String> getHealthyDataSources() {
  69.         return healthStatus.entrySet().stream()
  70.                 .filter(Map.Entry::getValue)
  71.                 .map(Map.Entry::getKey)
  72.                 .collect(Collectors.toList());
  73.     }
  74.    
  75.     @PreDestroy
  76.     public void destroy() {
  77.         scheduler.shutdown();
  78.     }
  79. }
复制代码
五、动态数据源的应用场景

让我们通过架构图来理解动态数据源的典型应用场景:
2.png

六、优缺点

优点


  • 灵活性高:支持运行时动态添加、移除数据源
  • 解耦性好:业务代码与具体数据源解耦
  • 扩展性强:易于实现读写分离、多租户等复杂场景
  • 维护方便:数据源配置集中管理,便于维护
缺点


  • 复杂度增加:系统架构变得更加复杂
  • 事务管理复杂:跨数据源事务需要特殊处理
  • 连接池开销:每个数据源都需要独立的连接池
  • 调试困难:数据源切换增加了调试复杂度
七、生产环境注意事项

事务管理策略
  1. /**
  2. * 多数据源事务管理器
  3. */
  4. @Component
  5. @Slf4j
  6. public class MultiDataSourceTransactionManager {
  7.    
  8.     /**
  9.      * 在多个数据源上执行事务性操作
  10.      */
  11.     @Transactional(rollbackFor = Exception.class)
  12.     public void executeInTransaction(Runnable task, String... dataSources) {
  13.         if (dataSources.length == 1) {
  14.             // 单数据源事务
  15.             DataSourceContextHolder.setDataSource(dataSources[0]);
  16.             try {
  17.                 task.run();
  18.             } finally {
  19.                 DataSourceContextHolder.clear();
  20.             }
  21.         } else {
  22.             // 多数据源伪事务(最终一致性)
  23.             executeWithCompensation(task, dataSources);
  24.         }
  25.     }
  26.    
  27.     /**
  28.      * 使用补偿机制实现多数据源"事务"
  29.      */
  30.     private void executeWithCompensation(Runnable task, String[] dataSources) {
  31.         List<Runnable> compensationTasks = new ArrayList<>();
  32.         
  33.         try {
  34.             // 按顺序执行各个数据源的操作
  35.             for (String dataSource : dataSources) {
  36.                 DataSourceContextHolder.setDataSource(dataSource);
  37.                 try {
  38.                     // 执行实际业务操作
  39.                     task.run();
  40.                     
  41.                     // 记录补偿操作
  42.                     compensationTasks.add(0, createCompensationTask(dataSource));
  43.                     
  44.                 } finally {
  45.                     DataSourceContextHolder.clear();
  46.                 }
  47.             }
  48.         } catch (Exception e) {
  49.             // 执行补偿操作
  50.             log.error("多数据源操作失败,执行补偿操作", e);
  51.             executeCompensation(compensationTasks);
  52.             throw e;
  53.         }
  54.     }
  55.    
  56.     private void executeCompensation(List<Runnable> compensationTasks) {
  57.         for (Runnable compensation : compensationTasks) {
  58.             try {
  59.                 compensation.run();
  60.             } catch (Exception ex) {
  61.                 log.error("补偿操作执行失败", ex);
  62.                 // 记录补偿失败,需要人工介入
  63.             }
  64.         }
  65.     }
  66. }
复制代码
性能优化建议


  • 连接池优化:根据业务特点调整各数据源连接池参数
  • 数据源预热:应用启动时预热常用数据源
  • 缓存策略:缓存数据源配置和路由信息
  • 监控告警:建立完善的数据源监控体系
总结

动态数据源是一个强大的技术方案,能够很好地解决多数据源管理的复杂性。
通过本文的详细解析,我们可以看到:

  • 核心原理:基于AbstractRoutingDataSource和ThreadLocal的上下文管理
  • 实现方式:注解+AOP的声明式数据源切换
  • 高级特性:读写分离、多租户、健康监控等生产级功能
  • 适用场景:多租户、读写分离、分库分表等复杂数据架构
在实际项目中,建议根据具体业务需求选择合适的实现方案,不要过度设计。
同时,要建立完善的监控和运维体系,确保动态数据源的稳定运行。
最后说一句(求关注,别白嫖我)

如果这篇文章对您有所帮助,或者有所启发的话,帮忙关注一下我的同名公众号:苏三说技术,您的支持是我坚持写作最大的动力。
求一键三连:点赞、转发、在看。
关注公众号:【苏三说技术】,在公众号中回复:进大厂,可以免费获取我最近整理的10万字的面试宝典,好多小伙伴靠这个宝典拿到了多家大厂的offer。
更多项目实战在我的技术网站:http://www.susan.net.cn/project

来源:程序园用户自行投稿发布,如果侵权,请联系站长删除
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!

相关推荐

4 天前

举报

您需要登录后才可以回帖 登录 | 立即注册