圄旧剖 发表于 2025-10-1 18:10:07

通过延时队列和mysql uodate原子性实现延迟任务

通过延时队列和mysql uodate原子性实现延迟任务

背景:公司内部通过at群聊机器人来实现一些功能,例如:@机器人 查询 user 1 这个命令就是查询用户表 id为1的操作,后端对接了机器人,由后端来解析命令 再进行相应的操作。
新需求:延迟执行命令任务,例如:@机器人 延迟执行 1s/1m/1h 查询 user 1 通过延迟执行命令来延迟执行后面的命令。
解决方案【简单版】

使用Java自带的延迟队列和mysql的update原子性来实现

[*]延迟队列:延迟执行任务
[*]mysql update原子性:服务多实例部署的时候 保证只有一个实例能执行任务
mysql任务表

create table core_delay_task
(
    id                  int auto_increment comment '主键' primary key,
    task_param          varchar(4096) null comment '任务参数',
    task_status         int default 0 null comment '任务状态 0未执行 1执行中 2执行成功 3执行失败',
    task_execute_time   datetime      null comment '任务执行时间',
    task_create_time    datetime      null comment '任务创建时间',
    task_update_time    datetime      null comment '任务更新时间',
    task_execute_result varchar(2048) null comment '任务执行结果'
) comment '延迟任务表';业务代码

package xxx.service.impl;

import com.alibaba.fastjson2.JSON;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.util.ObjectUtils;

import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.temporal.ChronoUnit;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;

@Cmd(value = {"延迟执行"}, order = 301, admin = true)
public class DelayTaskImpl implements CmdService, InitializingBean {

    @Autowired
    private CoreDelayTaskMapper coreDelayTaskMapper;

    private static final Logger LOGGER = LoggerFactory.getLogger(DelayTaskImpl.class);

    private final static DelayQueue<CmdServiceDelay> DELAY_QUEUE = new DelayQueue<>();

    @Override
    public RobotResp execute(RobotReq req) {

      ArgsUtil argsUtil = ArgsUtil.of(req);
      String[] args = req.getText().split("\\s+");
      args = null;
      args = null;
      String targetText = Arrays.stream(args).filter(e -> !ObjectUtils.isEmpty(e)).collect(Collectors.joining(" "));
      req.setText(targetText);

      String delay = argsUtil.index(2);
      String delayValue = delay.substring(0, delay.length() - 1);
      Long delayLongValue = 0L;
      if (delay.endsWith("s")) {
            delayLongValue = Long.parseLong(delayValue) * 1000;
      } else if (delay.endsWith("m")) {
            delayLongValue = Long.parseLong(delayValue) * 1000 * 60;
      } else if (delay.endsWith("h")) {
            delayLongValue = Long.parseLong(delayValue) * 1000 * 60 * 60;
      } else {
            return RobotResp.ok("仅支持 秒 分钟 小时");
      }

      CoreDelayTask coreDelayTask = new CoreDelayTask();
      coreDelayTask.setTaskParam(JSON.toJSONString(req));
      coreDelayTask.setTaskStatus(0);
      coreDelayTask.setTaskExecuteTime(LocalDateTime.now().plus(delayLongValue, ChronoUnit.MILLIS));
      coreDelayTask.setTaskCreateTime(LocalDateTime.now());
      coreDelayTask.setTaskUpdateTime(LocalDateTime.now());

      coreDelayTaskMapper.insertSelective(coreDelayTask);

      DELAY_QUEUE.add(new CmdServiceDelay(coreDelayTask.getId(), System.currentTimeMillis() + delayLongValue, req));

      return RobotResp.md(String.format("命令 [%s] 将在 [%s] 开始执行", targetText, coreDelayTask.getTaskExecuteTime()));
    }

    @Override
    public String help() {
      return "延迟执行 时间 命令";
    }

    @Override
    public void afterPropertiesSet() throws Exception {
      // bean初始化完毕后 查询所有等待执行的任务
      List<CoreDelayTask> coreDelayTasks = coreDelayTaskMapper.listCoreDelayTaskForWaitExecute();
      // 将等待执行的任务放入到延迟队列
      for (CoreDelayTask coreDelayTask : coreDelayTasks) {
            DELAY_QUEUE.add(new CmdServiceDelay(coreDelayTask.getId(), coreDelayTask.getTaskExecuteTime().atZone(ZoneId.systemDefault()).toInstant().toEpochMilli(), JSON.parseObject(coreDelayTask.getTaskParam(), RobotReq.class)));
      }

      ExecutorService executor = Executors.newFixedThreadPool(1);
      executor.submit(() -> {
            while (true) {
                try {
                  CmdServiceDelay cmdServiceDelay = DELAY_QUEUE.take();
                  Integer taskId = cmdServiceDelay.getTaskId();
                  // 利用update原子性来保证只有一个实例能获取到执行任务的锁,然后执行任务
                  int i = coreDelayTaskMapper.updateCoreDelayTaskForExecute(taskId, LocalDateTime.now());
                  if (i > 0) {
                        LOGGER.info("抢到锁 开始执行延迟任务 任务ID {}", taskId);
                        RobotReq robotReq = cmdServiceDelay.getRobotReq();
                        CoreDelayTask coreDelayTask = new CoreDelayTask();
                        coreDelayTask.setId(taskId);

                        try {
                            // 这里可以执行具体的任务
                            coreDelayTask.setTaskStatus(DelayTaskStatusEnum.EXECUTE_SUCCESS.getCode());
                        } catch (Exception e) {
                            coreDelayTask.setTaskStatus(DelayTaskStatusEnum.EXECUTE_FAILED.getCode());
                            coreDelayTask.setTaskExecuteResult(e.getMessage());
                        }

                        coreDelayTask.setTaskUpdateTime(LocalDateTime.now());
                     coreDelayTaskMapper.updateByPrimaryKeySelective(coreDelayTask);
                  } else {
                        LOGGER.info("未抢到锁 放弃延迟任务 {}", taskId);
                  }
                } catch (Exception e) {
                  e.printStackTrace();
                }
            }
      });
    }
}最后

上述代码基本上可以完成延迟执行任务的需求了,可以再加一点防御性代码保证代码的健壮性。

来源:程序园用户自行投稿发布,如果侵权,请联系站长删除
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!
页: [1]
查看完整版本: 通过延时队列和mysql uodate原子性实现延迟任务