通过延时队列和mysql uodate原子性实现延迟任务
背景:公司内部通过at群聊机器人来实现一些功能,例如:@机器人 查询 user 1 这个命令就是查询用户表 id为1的操作,后端对接了机器人,由后端来解析命令 再进行相应的操作。
新需求:延迟执行命令任务,例如:@机器人 延迟执行 1s/1m/1h 查询 user 1 通过延迟执行命令来延迟执行后面的命令。
解决方案【简单版】
使用Java自带的延迟队列和mysql的update原子性来实现
- 延迟队列:延迟执行任务
- mysql update原子性:服务多实例部署的时候 保证只有一个实例能执行任务
mysql任务表
- create table core_delay_task
- (
- id int auto_increment comment '主键' primary key,
- task_param varchar(4096) null comment '任务参数',
- task_status int default 0 null comment '任务状态 0未执行 1执行中 2执行成功 3执行失败',
- task_execute_time datetime null comment '任务执行时间',
- task_create_time datetime null comment '任务创建时间',
- task_update_time datetime null comment '任务更新时间',
- task_execute_result varchar(2048) null comment '任务执行结果'
- ) comment '延迟任务表';
复制代码 业务代码
- package xxx.service.impl;
- import com.alibaba.fastjson2.JSON;
- import org.slf4j.Logger;
- import org.slf4j.LoggerFactory;
- import org.springframework.beans.factory.InitializingBean;
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.util.ObjectUtils;
- import java.time.LocalDateTime;
- import java.time.ZoneId;
- import java.time.temporal.ChronoUnit;
- import java.util.Arrays;
- import java.util.List;
- import java.util.concurrent.DelayQueue;
- import java.util.concurrent.ExecutorService;
- import java.util.concurrent.Executors;
- import java.util.stream.Collectors;
- @Cmd(value = {"延迟执行"}, order = 301, admin = true)
- public class DelayTaskImpl implements CmdService, InitializingBean {
- @Autowired
- private CoreDelayTaskMapper coreDelayTaskMapper;
- private static final Logger LOGGER = LoggerFactory.getLogger(DelayTaskImpl.class);
- private final static DelayQueue<CmdServiceDelay> DELAY_QUEUE = new DelayQueue<>();
- @Override
- public RobotResp execute(RobotReq req) {
- ArgsUtil argsUtil = ArgsUtil.of(req);
- String[] args = req.getText().split("\\s+");
- args[1] = null;
- args[2] = null;
- String targetText = Arrays.stream(args).filter(e -> !ObjectUtils.isEmpty(e)).collect(Collectors.joining(" "));
- req.setText(targetText);
- String delay = argsUtil.index(2);
- String delayValue = delay.substring(0, delay.length() - 1);
- Long delayLongValue = 0L;
- if (delay.endsWith("s")) {
- delayLongValue = Long.parseLong(delayValue) * 1000;
- } else if (delay.endsWith("m")) {
- delayLongValue = Long.parseLong(delayValue) * 1000 * 60;
- } else if (delay.endsWith("h")) {
- delayLongValue = Long.parseLong(delayValue) * 1000 * 60 * 60;
- } else {
- return RobotResp.ok("仅支持 秒 分钟 小时");
- }
- CoreDelayTask coreDelayTask = new CoreDelayTask();
- coreDelayTask.setTaskParam(JSON.toJSONString(req));
- coreDelayTask.setTaskStatus(0);
- coreDelayTask.setTaskExecuteTime(LocalDateTime.now().plus(delayLongValue, ChronoUnit.MILLIS));
- coreDelayTask.setTaskCreateTime(LocalDateTime.now());
- coreDelayTask.setTaskUpdateTime(LocalDateTime.now());
- coreDelayTaskMapper.insertSelective(coreDelayTask);
- DELAY_QUEUE.add(new CmdServiceDelay(coreDelayTask.getId(), System.currentTimeMillis() + delayLongValue, req));
- return RobotResp.md(String.format("命令 [%s] 将在 [%s] 开始执行", targetText, coreDelayTask.getTaskExecuteTime()));
- }
- @Override
- public String help() {
- return "延迟执行 时间 命令";
- }
- @Override
- public void afterPropertiesSet() throws Exception {
- // bean初始化完毕后 查询所有等待执行的任务
- List<CoreDelayTask> coreDelayTasks = coreDelayTaskMapper.listCoreDelayTaskForWaitExecute();
- // 将等待执行的任务放入到延迟队列
- for (CoreDelayTask coreDelayTask : coreDelayTasks) {
- DELAY_QUEUE.add(new CmdServiceDelay(coreDelayTask.getId(), coreDelayTask.getTaskExecuteTime().atZone(ZoneId.systemDefault()).toInstant().toEpochMilli(), JSON.parseObject(coreDelayTask.getTaskParam(), RobotReq.class)));
- }
- ExecutorService executor = Executors.newFixedThreadPool(1);
- executor.submit(() -> {
- while (true) {
- try {
- CmdServiceDelay cmdServiceDelay = DELAY_QUEUE.take();
- Integer taskId = cmdServiceDelay.getTaskId();
- // 利用update原子性来保证只有一个实例能获取到执行任务的锁,然后执行任务
- int i = coreDelayTaskMapper.updateCoreDelayTaskForExecute(taskId, LocalDateTime.now());
- if (i > 0) {
- LOGGER.info("抢到锁 开始执行延迟任务 任务ID {}", taskId);
- RobotReq robotReq = cmdServiceDelay.getRobotReq();
- CoreDelayTask coreDelayTask = new CoreDelayTask();
- coreDelayTask.setId(taskId);
- try {
- // 这里可以执行具体的任务
- coreDelayTask.setTaskStatus(DelayTaskStatusEnum.EXECUTE_SUCCESS.getCode());
- } catch (Exception e) {
- coreDelayTask.setTaskStatus(DelayTaskStatusEnum.EXECUTE_FAILED.getCode());
- coreDelayTask.setTaskExecuteResult(e.getMessage());
- }
- coreDelayTask.setTaskUpdateTime(LocalDateTime.now());
- coreDelayTaskMapper.updateByPrimaryKeySelective(coreDelayTask);
- } else {
- LOGGER.info("未抢到锁 放弃延迟任务 {}", taskId);
- }
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
- });
- }
- }
复制代码 最后
上述代码基本上可以完成延迟执行任务的需求了,可以再加一点防御性代码保证代码的健壮性。
来源:程序园用户自行投稿发布,如果侵权,请联系站长删除
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作! |