迭婵椟 发表于 前天 21:15

大疆不同任务类型执行逻辑,上云API源码分析

大疆不同任务类型执行逻辑,上云API源码分析

大疆司空2中有不同的任务类型:立即任务、定时任务、条件任务。
最初我们实现时,选择的是用Quartz创建定时任务,调用API中executeFlightTask接口实现任务下发。
在功能实现之后,随着对API的深入了解,发现大疆API中有相关的任务下发逻辑。
当时只看了实现逻辑,因为活比较多,加上懒,就一直拖到现在。
本文是对源码的学习总结。
所有任务下发的逻辑都从publishFlightTask()方法开始。
com.dji.sample.wayline.service.impl.FlightTaskServiceImpl#publishFlightTask
立即任务


[*]fillImmediateTime(param)方法,如果是 “立即任务(IMMEDIATE)”,就把任务的执行日期taskDays与执行时间段taskPeriods强制设置为当前服务器时间。
[*]由于taskDays = ,taskPeriods = [ ],所以只会进入循环一次,并且beginTime = 当前毫秒时间点,endTime = beginTime。
[*]创建一条飞行任务记录存到数据库中---- waylineJobOpt。
[*]调用publishOneFlight()方法发布任务
publishFlightTask
    /**
   * 发布飞行任务的核心入口方法。
   * 功能:
   * 1. 处理立即任务(IMMEDIATE),强制以服务器当前时间作为任务执行时间;
   * 2. 根据用户提交的任务日期(taskDays)与时间段(taskPeriods),
   *    生成多个 beginTime/endTime 的任务实例;
   * 3. 为每个时间段创建一条 WaylineJob(航线任务记录);
   * 4. 若为条件任务,写入对应的触发条件;
   * 5. 立即调用 publishOneFlightTask 将任务下发给飞行设备(Dock);
   * 6. 如果任何一次下发失败,则返回失败并终止流程;
   * 7. 所有任务下发成功后返回成功响应。
   */
    @Override
    public HttpResultResponse publishFlightTask(CreateJobParam param, CustomClaim customClaim) throws SQLException {
      fillImmediateTime(param);

      //立即任务只会进入循环一次
      for (Long taskDay : param.getTaskDays()) {
            LocalDate date = LocalDate.ofInstant(Instant.ofEpochSecond(taskDay), ZoneId.systemDefault());
            for (List<Long> taskPeriod : param.getTaskPeriods()) {
                //立即任务的beginTime = endTime = 当前毫秒时间
                long beginTime = LocalDateTime.of(date, LocalTime.ofInstant(Instant.ofEpochSecond(taskPeriod.get(0)), ZoneId.systemDefault()))
                        .atZone(ZoneId.systemDefault()).toInstant().toEpochMilli();
                long endTime = taskPeriod.size() > 1 ?
                        LocalDateTime.of(date, LocalTime.ofInstant(Instant.ofEpochSecond(taskPeriod.get(1)), ZoneId.systemDefault()))
                              .atZone(ZoneId.systemDefault()).toInstant().toEpochMilli() : beginTime;
                //立即任务直接跳过这个判断
                if (TaskTypeEnum.IMMEDIATE != param.getTaskType() && endTime < System.currentTimeMillis()) {
                  continue;
                }

                //创建一条飞行任务记录
                Optional<WaylineJobDTO> waylineJobOpt = waylineJobService.createWaylineJob(param, customClaim.getWorkspaceId(), customClaim.getUsername(), beginTime, endTime);
                if (waylineJobOpt.isEmpty()) {
                  throw new SQLException("Failed to create wayline job.");
                }
                WaylineJobDTO waylineJob = waylineJobOpt.get();
               
                //立即任务直接跳过这个方法
                // If it is a conditional task type, add conditions to the job parameters.
                addConditions(waylineJob, param, beginTime, endTime);

                //发布任务
                HttpResultResponse response = this.publishOneFlightTask(waylineJob);
                if (HttpResultResponse.CODE_SUCCESS != response.getCode()) {
                  return response;
                }
            }
      }
      return HttpResultResponse.success();
    }
   
    /**
   * 如果任务类型为 IMMEDIATE(立即任务),则忽略用户传入的任何日期和时间段,
   * 将任务的执行日期(taskDays)与执行时间段(taskPeriods)强制设置为当前服务器时间。
   * 立即任务必须由服务器当前时间触发,不允许由客户端自定义时间。
   */
    private void fillImmediateTime(CreateJobParam param) {
      if (TaskTypeEnum.IMMEDIATE != param.getTaskType()) {
            return;
      }
      long now = System.currentTimeMillis() / 1000;
      param.setTaskDays(List.of(now));
      param.setTaskPeriods(List.of(List.of(now)));
    }

[*]deviceRedisService.checkDeviceOnline判断机场是否在线
[*]调用prepareFlightTask方法,下发飞行准备指令
[*]调用executeFlightTask方法,下发飞行任务执行指令
publishOneFlight
    public HttpResultResponse publishOneFlightTask(WaylineJobDTO waylineJob) throws SQLException {

      //判断机场是否在线
      boolean isOnline = deviceRedisService.checkDeviceOnline(waylineJob.getDockSn());
      if (!isOnline) {
            throw new RuntimeException("Dock is offline.");
      }

      //下发飞行准备指令
      boolean isSuccess = this.prepareFlightTask(waylineJob);
      if (!isSuccess) {
            return HttpResultResponse.error("Failed to prepare job.");
      }

      //下发立即任务执行指令
      // Issue an immediate task execution command.
      if (TaskTypeEnum.IMMEDIATE == waylineJob.getTaskType()) {
            if (!executeFlightTask(waylineJob.getWorkspaceId(), waylineJob.getJobId())) {
                return HttpResultResponse.error("Failed to execute job.");
            }
      }

      ..............(省略部分代码)

      return HttpResultResponse.success();
    }定时任务


[*]遍历定时任务设置中每一天的每一个时间段
[*]忽略已经过期的时间段
[*]创建waylineJob并存到数据库中。
[*]调用publishOneFlightTask()发布任务
publishFlightTask
    @Override
    public HttpResultResponse publishFlightTask(CreateJobParam param, CustomClaim customClaim) throws SQLException {
      //定时任务在这个方法中直接返回,不会进行任何处理
      fillImmediateTime(param);

      //遍历每一天每一个时间段
      for (Long taskDay : param.getTaskDays()) {
            LocalDate date = LocalDate.ofInstant(Instant.ofEpochSecond(taskDay), ZoneId.systemDefault());
            for (List<Long> taskPeriod : param.getTaskPeriods()) {
                long beginTime = LocalDateTime.of(date, LocalTime.ofInstant(Instant.ofEpochSecond(taskPeriod.get(0)), ZoneId.systemDefault()))
                        .atZone(ZoneId.systemDefault()).toInstant().toEpochMilli();
                long endTime = taskPeriod.size() > 1 ?
                        LocalDateTime.of(date, LocalTime.ofInstant(Instant.ofEpochSecond(taskPeriod.get(1)), ZoneId.systemDefault()))
                              .atZone(ZoneId.systemDefault()).toInstant().toEpochMilli() : beginTime;
               
                //忽略已经过期的时间段
                if (TaskTypeEnum.IMMEDIATE != param.getTaskType() && endTime < System.currentTimeMillis()) {
                  continue;
                }

                //创建waylineJob
                Optional<WaylineJobDTO> waylineJobOpt = waylineJobService.createWaylineJob(param, customClaim.getWorkspaceId(), customClaim.getUsername(), beginTime, endTime);
                if (waylineJobOpt.isEmpty()) {
                  throw new SQLException("Failed to create wayline job.");
                }
                WaylineJobDTO waylineJob = waylineJobOpt.get();
               
                //不是条件任务,进入这个方法会直接返回。
                // If it is a conditional task type, add conditions to the job parameters.
                addConditions(waylineJob, param, beginTime, endTime);

                //发布任务
                HttpResultResponse response = this.publishOneFlightTask(waylineJob);
                if (HttpResultResponse.CODE_SUCCESS != response.getCode()) {
                  return response;
                }
            }
      }
      return HttpResultResponse.success();
    }
[*]deviceRedisService.checkDeviceOnline检查机场是否在线。
[*]调用prepareFlightTask方法,下发飞行准备指令.
[*]把定时任务添加到Redis的一个Sorted Set(有序集合)里,用任务的开始时间作为排序依据。
publishOneFlightTask
    public HttpResultResponse publishOneFlightTask(WaylineJobDTO waylineJob) throws SQLException {

      //检查机场是否在线
      boolean isOnline = deviceRedisService.checkDeviceOnline(waylineJob.getDockSn());
      if (!isOnline) {
            throw new RuntimeException("Dock is offline.");
      }

      //下发飞行准备指令
      boolean isSuccess = this.prepareFlightTask(waylineJob);
      if (!isSuccess) {
            return HttpResultResponse.error("Failed to prepare job.");
      }
      
      ..........(省略部分代码)
      
      //把定时任务添加到Redis有序集合里,用开始时间排序
      if (TaskTypeEnum.TIMED == waylineJob.getTaskType()) {
            // key: wayline_job_timed, value: {workspace_id}:{dock_sn}:{job_id}
            boolean isAdd = RedisOpsUtils.zAdd(RedisConst.WAYLINE_JOB_TIMED_EXECUTE,
                  waylineJob.getWorkspaceId() + RedisConst.DELIMITER + waylineJob.getDockSn() + RedisConst.DELIMITER + waylineJob.getJobId(),
                  waylineJob.getBeginTime().atZone(ZoneId.systemDefault()).toInstant().toEpochMilli());
            if (!isAdd) {
                return HttpResultResponse.error("Failed to create scheduled job.");
            }
      }

      return HttpResultResponse.success();
    }
[*]checkScheduledJob定时任务,每5s对 Redis 中的定时任务集合扫描一次。
[*]获取最早的定时任务,拆解任务信息。
[*]如果任务信息开始比现在早30s (offset)以上,认为任务过期,删除 Redis 队列中的任务,更新任务状态为失败。
[*]如果任务执行时间在now + offset这个时间区间内,调用executeFlightTask下发执行任务命令,且无论成功或失败,都从 Redis 队列中删除任务信息。
checkScheduledJob定时任务
    @Scheduled(initialDelay = 10, fixedRate = 5, timeUnit = TimeUnit.SECONDS)    public void checkScheduledJob() {                //获取最早的定时任务,并拆解任务信息      Object jobIdValue = RedisOpsUtils.zGetMin(RedisConst.WAYLINE_JOB_TIMED_EXECUTE);      if (Objects.isNull(jobIdValue)) {            return;      }      log.info("Check the timed tasks of the wayline. {}", jobIdValue);      // format: {workspace_id}:{dock_sn}:{job_id}      String[] jobArr = String.valueOf(jobIdValue).split(RedisConst.DELIMITER);      double time = RedisOpsUtils.zScore(RedisConst.WAYLINE_JOB_TIMED_EXECUTE, jobIdValue);      long now = System.currentTimeMillis();      int offset = 30_000;                //任务信息开始比现在早30s以上,认为任务过期,删除Redis队列中的任务,更新任务状态为失败。      // Expired tasks are deleted directly.      if (time < now - offset) {            RedisOpsUtils.zRemove(RedisConst.WAYLINE_JOB_TIMED_EXECUTE, jobIdValue);            waylineJobService.updateJob(WaylineJobDTO.builder()                  .jobId(jobArr)                  .status(WaylineJobStatusEnum.FAILED.getVal())                  .executeTime(LocalDateTime.now())                  .completedTime(LocalDateTime.now())                  .code(HttpStatus.SC_REQUEST_TIMEOUT).build());            return;      }      //判断任务执行时间在now + offset这个时间区间内,下发执行任务命令。      if (now
页: [1]
查看完整版本: 大疆不同任务类型执行逻辑,上云API源码分析