赏勿 发表于 2025-9-24 20:53:23

封装CompletionService的并发任务分发器(优化版)

这个框架代码用了很长时间,使用场景也挺多,初衷是简化CompletionService的编程接口,尽量减少业务代码处的感知。
今天找deepseek做了一版优化,优化点:

[*]整体的超时控制
[*]超时、异常处理和封装
[*]取消未完成的任务
核心代码
public class TaskDispatcher<T> {

    private final CompletionService<T> completionService;

    /**
   * 待处理任务
   */
    private final Set<Future<T>> pending = Sets.newHashSet();

    /**
   * 超时时间, 单位: s
   */
    private long timeout = 10000;

    public TaskDispatcher(Executor executor, long timeout) {
      completionService = new ExecutorCompletionService<>(executor);
      if (timeout > 0) {
            this.timeout = timeout;
      }
    }

    public void submit(Callable<T> task) {
      Future<T> future = completionService.submit(task);
      pending.add(future);
    }

    /**
   * 仅获取执行的任务结果
   *
   * @param ignoreException 忽略执行时发生的异常
   * @return
   */
    public List<T> taskCompletedResult(boolean ignoreException) {
      List<TaskResult<T>> taskResultList = taskCompleted();
      List<T> res = Lists.newArrayList();
      if (CollectionUtils.isEmpty(taskResultList)) {
            return res;
      }
      boolean hasError = false;
      for (TaskResult<T> taskResult : taskResultList) {
            if (!taskResult.isTimeout() && taskResult.getError() == null) {
                res.add(taskResult.getValue());
            } else if (taskResult.isTimeout() && !ignoreException) {
                LoggerUtils.error("执行任务时超时");
                hasError = true;
            } else if (taskResult.getError() != null && !ignoreException) {
                LoggerUtils.error("执行任务时发生异常", taskResult.getError());
                hasError = true;
            }
      }
      if (hasError) {
            throw new ZHException("任务并发处理时发生异常");
      }
      return res;
    }

    /**
   * 获取执行的任务
   *
   * @return
   */
    public List<TaskResult<T>> taskCompleted() {
      long deadline = System.currentTimeMillis() + timeout;
      List<TaskResult<T>> results = Lists.newArrayList();
      int totalTasks = pending.size();

      try {
            for (int i = 0; i < totalTasks; i++) {
                long remaining = Math.max(0, deadline - System.currentTimeMillis());
                Future<T> future = completionService.poll(remaining, TimeUnit.MILLISECONDS);
                TaskResult<T> result = new TaskResult<>();
                if (future == null) {
                  result.setTimeout(true);
                } else {
                  pending.remove(future);
                  try {
                        result.setValue(future.get());
                  } catch (ExecutionException e) {
                        result.setError(e.getCause());
                  }
                }
                results.add(result);
            }
      } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException("任务结果收集中断", e);
      } finally {
            pending.forEach(f -> f.cancel(true));
            pending.clear();
      }
      return results;
    }

    @Data
    static class TaskResult<T> {
      private T value;
      private Throwable error;
      private boolean isTimeout;
    }
}需要自己声明线程池bean,使用方式如下
      TaskDispatcher<Integer> taskDispatcher = new TaskDispatcher<Integer>(threadExecutor, TIME_OUT);
      for (long index: indexList) {
            taskDispatcher.submit(() -> xxxService.count(index));
      }为了便于在计数求和场景使用,进一步实现了一个子类
public class IntSumTaskDispatcher extends TaskDispatcher<Integer> {
    public IntSumTaskDispatcher(Executor executor, long timeout, boolean throwException) {
      super(executor, timeout);
    }

    /**
   * 对所有结果求和
   *
   * @return
   */
    public int takeCompletedSum() {
      List<Integer> countResList = taskCompletedResult(true);
      int count = 0;
      for (Integer countSingle : countResList) {
            if (countSingle == null) {
                continue;
            }
            count += countSingle;
      }
      return count;
    }
}
来源:程序园用户自行投稿发布,如果侵权,请联系站长删除
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!
页: [1]
查看完整版本: 封装CompletionService的并发任务分发器(优化版)