找回密码
 立即注册
首页 业界区 业界 基于Redission实现分布式调度任务选主

基于Redission实现分布式调度任务选主

蝌棚煌 2025-6-2 21:45:31
在Spring Cloud微服务架构中,基于Redisson实现分布式调度任务的选主和心跳监听,可以通过以下完整方案实现。该方案结合了Redisson的分布式锁和发布/订阅功能,确保高可靠性和实时性:
一、整体架构设计

graph TD    A[微服务实例1] -->|竞争锁| B(Redisson)    C[微服务实例2] -->|竞争锁| B    D[微服务实例3] -->|竞争锁| B    B -->|心跳事件| E[Redis Pub/Sub]    E --> A    E --> C    E --> D二、核心实现步骤

1. 添加依赖
  1. <dependency>
  2.     <groupId>org.redisson</groupId>
  3.     redisson-spring-boot-starter</artifactId>
  4.     <version>3.23.2</version>
  5. </dependency>
复制代码
2. 选主服务实现
  1. import org.redisson.api.RLock;
  2. import org.redisson.api.RedissonClient;
  3. import org.springframework.beans.factory.annotation.Autowired;
  4. import org.springframework.stereotype.Service;
  5. import javax.annotation.PostConstruct;
  6. import javax.annotation.PreDestroy;
  7. import java.util.concurrent.TimeUnit;
  8. @Service
  9. public class LeaderElectionService {
  10.     private static final String LEADER_LOCK_KEY = "scheduler:leader:lock";
  11.     private static final String HEARTBEAT_CHANNEL = "scheduler:heartbeat";
  12.     @Autowired
  13.     private RedissonClient redisson;
  14.    
  15.     private volatile boolean isLeader = false;
  16.     private RLock leaderLock;
  17.     private Thread heartbeatThread;
  18.     @PostConstruct
  19.     public void init() {
  20.         leaderLock = redisson.getLock(LEADER_LOCK_KEY);
  21.         startElection();
  22.         startHeartbeatListener();
  23.     }
  24.     private void startElection() {
  25.         // 尝试获取领导权(非阻塞式)
  26.         new Thread(() -> {
  27.             while (!Thread.currentThread().isInterrupted()) {
  28.                 try {
  29.                     // 尝试获取锁,锁过期时间30秒
  30.                     boolean acquired = leaderLock.tryLock(0, 30, TimeUnit.SECONDS);
  31.                     if (acquired) {
  32.                         isLeader = true;
  33.                         System.out.println("当前节点当选为Leader");
  34.                         startHeartbeatTask(); // 启动心跳任务
  35.                         break;
  36.                     }
  37.                     Thread.sleep(5000); // 每5秒重试一次
  38.                 } catch (InterruptedException e) {
  39.                     Thread.currentThread().interrupt();
  40.                 }
  41.             }
  42.         }).start();
  43.     }
  44.     private void startHeartbeatTask() {
  45.         heartbeatThread = new Thread(() -> {
  46.             while (isLeader && !Thread.currentThread().isInterrupted()) {
  47.                 try {
  48.                     // 1. 续期锁(看门狗机制会自动处理)
  49.                     // 2. 发布心跳
  50.                     redisson.getTopic(HEARTBEAT_CHANNEL)
  51.                            .publish(System.currentTimeMillis());
  52.                     
  53.                     Thread.sleep(10000); // 每10秒发送一次心跳
  54.                 } catch (InterruptedException e) {
  55.                     Thread.currentThread().interrupt();
  56.                 }
  57.             }
  58.         });
  59.         heartbeatThread.start();
  60.     }
  61.     private void startHeartbeatListener() {
  62.         // 监听Leader心跳
  63.         redisson.getTopic(HEARTBEAT_CHANNEL)
  64.                .addListener(Long.class, (channel, heartbeatTime) -> {
  65.                    System.out.println("收到Leader心跳: " + heartbeatTime);
  66.                    // 可在此更新最后一次心跳时间
  67.                });
  68.     }
  69.     @PreDestroy
  70.     public void shutdown() {
  71.         if (isLeader && leaderLock.isHeldByCurrentThread()) {
  72.             leaderLock.unlock();
  73.             isLeader = false;
  74.             if (heartbeatThread != null) {
  75.                 heartbeatThread.interrupt();
  76.             }
  77.         }
  78.     }
  79.     public boolean isLeader() {
  80.         return isLeader;
  81.     }
  82. }
复制代码
3. 健康检查增强
  1. @Service
  2. public class HealthCheckService {
  3.    
  4.     @Autowired
  5.     private RedissonClient redisson;
  6.    
  7.     private volatile long lastHeartbeatTime = 0;
  8.    
  9.     @PostConstruct
  10.     public void init() {
  11.         // 定时检查Leader状态
  12.         Executors.newSingleThreadScheduledExecutor()
  13.                 .scheduleAtFixedRate(this::checkLeaderStatus, 0, 5, TimeUnit.SECONDS);
  14.     }
  15.    
  16.     private void checkLeaderStatus() {
  17.         Long currentTime = redisson.getBucket("scheduler:leader:heartbeat").get();
  18.         if (currentTime != null) {
  19.             lastHeartbeatTime = currentTime;
  20.         }
  21.         
  22.         // 超过30秒未收到心跳认为Leader失效
  23.         if (System.currentTimeMillis() - lastHeartbeatTime > 30000) {
  24.             System.out.println("Leader可能已宕机,触发重新选举");
  25.             // 可在此触发主动抢锁逻辑
  26.         }
  27.     }
  28. }
复制代码
三、关键优化点

1. 多级故障检测

检测方式触发条件恢复动作Redisson看门狗超时锁续期失败(默认30秒)自动释放锁,其他节点可竞争主动心跳超时自定义阈值(如30秒)强制释放锁并重新选举Redis连接断开ConnectionState.LOST暂停选举直到连接恢复2. 选举性能优化配置
  1. # application.yml
  2. redisson:
  3.   lock:
  4.     watchdog-timeout: 30000 # 看门狗超时时间(ms)
  5.   threads: 16 # 事件处理线程数
  6.   netty-threads: 32 # Netty工作线程数
复制代码
3. 脑裂防护方案
  1. // 使用Redisson的MultiLock实现多Redis节点锁
  2. RLock lock1 = redissonClient1.getLock(LEADER_LOCK_KEY);
  3. RLock lock2 = redissonClient2.getLock(LEADER_LOCK_KEY);
  4. RLock multiLock = redisson.getMultiLock(lock1, lock2);
  5. boolean acquired = multiLock.tryLock(0, 30, TimeUnit.SECONDS);
复制代码
四、生产环境部署建议

1. Redis架构选择

部署模式适用场景建议配置哨兵模式高可用要求高3哨兵+3Redis实例Cluster模式大数据量+高性能至少6节点(3主3从)单节点仅开发测试不推荐生产使用2. 监控指标
  1. // 暴露Redisson指标(配合Spring Boot Actuator)
  2. @Bean
  3. public RedissonMetricsBinder redissonMetrics(RedissonClient redisson) {
  4.     return new RedissonMetricsBinder(redisson);
  5. }
复制代码
监控关键指标:

  • redisson.executor.active_threads:活跃线程数
  • redisson.pubsub.subscriptions:订阅数量
  • redisson.connections.active:活跃连接数
3. 灾备方案


  • 双活数据中心:通过RedissonClient配置多区域端点
    1. Config config = new Config();
    2. config.useClusterServers()
    3.     .addNodeAddress("redis://dc1-node1:6379")
    4.     .addNodeAddress("redis://dc2-node1:6379");
    复制代码
  • 降级策略:本地缓存最后已知状态
    1. @Bean
    2. @Primary
    3. public LeaderService fallbackLeaderService() {
    4.     return new FallbackLeaderService(redisLeaderService, localCache);
    5. }
    复制代码
五、与Spring Cloud集成

1. 调度任务控制
  1. @Scheduled(fixedRate = 5000)
  2. public void scheduledTask() {
  3.     if (leaderElectionService.isLeader()) {
  4.         // 只有Leader执行的逻辑
  5.         processBatchData();
  6.     }
  7. }
复制代码
2. 动态配置更新
  1. @RefreshScope
  2. @RestController
  3. @RequestMapping("/leader")
  4. public class LeaderController {
  5.    
  6.     @Value("${election.timeout:30000}")
  7.     private long electionTimeout;
  8.    
  9.     @Autowired
  10.     private LeaderElectionService electionService;
  11.    
  12.     @PostMapping("/timeout")
  13.     public void updateTimeout(@RequestParam long timeout) {
  14.         // 动态调整选举超时
  15.         electionService.setElectionTimeout(timeout);
  16.     }
  17. }
复制代码
六、方案优势总结


  • 亚秒级故障检测:通过Redis Pub/Sub实现实时通知
  • 自动故障转移:Redisson看门狗机制保障锁释放
  • 弹性扩展:支持动态增减微服务实例
  • 最小依赖:仅需Redis集群,无需额外组件
  • 与Spring生态无缝集成:完美配合@Scheduled等组件

来源:程序园用户自行投稿发布,如果侵权,请联系站长删除
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!

相关推荐

您需要登录后才可以回帖 登录 | 立即注册