找回密码
 立即注册
首页 业界区 业界 通过延时队列和mysql uodate原子性实现延迟任务 ...

通过延时队列和mysql uodate原子性实现延迟任务

圄旧剖 2025-10-1 18:10:07
通过延时队列和mysql uodate原子性实现延迟任务

背景:公司内部通过at群聊机器人来实现一些功能,例如:@机器人 查询 user 1 这个命令就是查询用户表 id为1的操作,后端对接了机器人,由后端来解析命令 再进行相应的操作。
新需求:延迟执行命令任务,例如:@机器人 延迟执行 1s/1m/1h 查询 user 1 通过延迟执行命令来延迟执行后面的命令。
解决方案【简单版】

使用Java自带的延迟队列和mysql的update原子性来实现

  • 延迟队列:延迟执行任务
  • mysql update原子性:服务多实例部署的时候 保证只有一个实例能执行任务
mysql任务表
  1. create table core_delay_task
  2. (
  3.     id                  int auto_increment comment '主键' primary key,
  4.     task_param          varchar(4096) null comment '任务参数',
  5.     task_status         int default 0 null comment '任务状态 0未执行 1执行中 2执行成功 3执行失败',
  6.     task_execute_time   datetime      null comment '任务执行时间',
  7.     task_create_time    datetime      null comment '任务创建时间',
  8.     task_update_time    datetime      null comment '任务更新时间',
  9.     task_execute_result varchar(2048) null comment '任务执行结果'
  10. ) comment '延迟任务表';
复制代码
业务代码
  1. package xxx.service.impl;
  2. import com.alibaba.fastjson2.JSON;
  3. import org.slf4j.Logger;
  4. import org.slf4j.LoggerFactory;
  5. import org.springframework.beans.factory.InitializingBean;
  6. import org.springframework.beans.factory.annotation.Autowired;
  7. import org.springframework.util.ObjectUtils;
  8. import java.time.LocalDateTime;
  9. import java.time.ZoneId;
  10. import java.time.temporal.ChronoUnit;
  11. import java.util.Arrays;
  12. import java.util.List;
  13. import java.util.concurrent.DelayQueue;
  14. import java.util.concurrent.ExecutorService;
  15. import java.util.concurrent.Executors;
  16. import java.util.stream.Collectors;
  17. @Cmd(value = {"延迟执行"}, order = 301, admin = true)
  18. public class DelayTaskImpl implements CmdService, InitializingBean {
  19.     @Autowired
  20.     private CoreDelayTaskMapper coreDelayTaskMapper;
  21.     private static final Logger LOGGER = LoggerFactory.getLogger(DelayTaskImpl.class);
  22.     private final static DelayQueue<CmdServiceDelay> DELAY_QUEUE = new DelayQueue<>();
  23.     @Override
  24.     public RobotResp execute(RobotReq req) {
  25.         ArgsUtil argsUtil = ArgsUtil.of(req);
  26.         String[] args = req.getText().split("\\s+");
  27.         args[1] = null;
  28.         args[2] = null;
  29.         String targetText = Arrays.stream(args).filter(e -> !ObjectUtils.isEmpty(e)).collect(Collectors.joining(" "));
  30.         req.setText(targetText);
  31.         String delay = argsUtil.index(2);
  32.         String delayValue = delay.substring(0, delay.length() - 1);
  33.         Long delayLongValue = 0L;
  34.         if (delay.endsWith("s")) {
  35.             delayLongValue = Long.parseLong(delayValue) * 1000;
  36.         } else if (delay.endsWith("m")) {
  37.             delayLongValue = Long.parseLong(delayValue) * 1000 * 60;
  38.         } else if (delay.endsWith("h")) {
  39.             delayLongValue = Long.parseLong(delayValue) * 1000 * 60 * 60;
  40.         } else {
  41.             return RobotResp.ok("仅支持 秒 分钟 小时");
  42.         }
  43.         CoreDelayTask coreDelayTask = new CoreDelayTask();
  44.         coreDelayTask.setTaskParam(JSON.toJSONString(req));
  45.         coreDelayTask.setTaskStatus(0);
  46.         coreDelayTask.setTaskExecuteTime(LocalDateTime.now().plus(delayLongValue, ChronoUnit.MILLIS));
  47.         coreDelayTask.setTaskCreateTime(LocalDateTime.now());
  48.         coreDelayTask.setTaskUpdateTime(LocalDateTime.now());
  49.         coreDelayTaskMapper.insertSelective(coreDelayTask);
  50.         DELAY_QUEUE.add(new CmdServiceDelay(coreDelayTask.getId(), System.currentTimeMillis() + delayLongValue, req));
  51.         return RobotResp.md(String.format("命令 [%s] 将在 [%s] 开始执行", targetText, coreDelayTask.getTaskExecuteTime()));
  52.     }
  53.     @Override
  54.     public String help() {
  55.         return "延迟执行 时间 命令";
  56.     }
  57.     @Override
  58.     public void afterPropertiesSet() throws Exception {
  59.         // bean初始化完毕后 查询所有等待执行的任务
  60.         List<CoreDelayTask> coreDelayTasks = coreDelayTaskMapper.listCoreDelayTaskForWaitExecute();
  61.         // 将等待执行的任务放入到延迟队列
  62.         for (CoreDelayTask coreDelayTask : coreDelayTasks) {
  63.             DELAY_QUEUE.add(new CmdServiceDelay(coreDelayTask.getId(), coreDelayTask.getTaskExecuteTime().atZone(ZoneId.systemDefault()).toInstant().toEpochMilli(), JSON.parseObject(coreDelayTask.getTaskParam(), RobotReq.class)));
  64.         }
  65.         ExecutorService executor = Executors.newFixedThreadPool(1);
  66.         executor.submit(() -> {
  67.             while (true) {
  68.                 try {
  69.                     CmdServiceDelay cmdServiceDelay = DELAY_QUEUE.take();
  70.                     Integer taskId = cmdServiceDelay.getTaskId();
  71.                     // 利用update原子性来保证只有一个实例能获取到执行任务的锁,然后执行任务
  72.                     int i = coreDelayTaskMapper.updateCoreDelayTaskForExecute(taskId, LocalDateTime.now());
  73.                     if (i > 0) {
  74.                         LOGGER.info("抢到锁 开始执行延迟任务 任务ID {}", taskId);
  75.                         RobotReq robotReq = cmdServiceDelay.getRobotReq();
  76.                         CoreDelayTask coreDelayTask = new CoreDelayTask();
  77.                         coreDelayTask.setId(taskId);
  78.                         try {
  79.                             // 这里可以执行具体的任务
  80.                             coreDelayTask.setTaskStatus(DelayTaskStatusEnum.EXECUTE_SUCCESS.getCode());
  81.                         } catch (Exception e) {
  82.                             coreDelayTask.setTaskStatus(DelayTaskStatusEnum.EXECUTE_FAILED.getCode());
  83.                             coreDelayTask.setTaskExecuteResult(e.getMessage());
  84.                         }
  85.                         coreDelayTask.setTaskUpdateTime(LocalDateTime.now());
  86.                        coreDelayTaskMapper.updateByPrimaryKeySelective(coreDelayTask);
  87.                     } else {
  88.                         LOGGER.info("未抢到锁 放弃延迟任务 {}", taskId);
  89.                     }
  90.                 } catch (Exception e) {
  91.                     e.printStackTrace();
  92.                 }
  93.             }
  94.         });
  95.     }
  96. }
复制代码
最后

上述代码基本上可以完成延迟执行任务的需求了,可以再加一点防御性代码保证代码的健壮性。

来源:程序园用户自行投稿发布,如果侵权,请联系站长删除
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!

相关推荐

您需要登录后才可以回帖 登录 | 立即注册