找回密码
 立即注册
首页 业界区 安全 深入解析Apache DolphinScheduler容错机制

深入解析Apache DolphinScheduler容错机制

訾懵 2025-6-10 15:47:25
简述

Apache Dolphinscheduler Master和Worker都是支持多节点部署,无中心化的设计。

  • Master主要负责是流程DAG的切分,最终通过RPC将任务分发到Worker节点上以及Worker上任务状态的处理
  • Worker主要负责是真正任务的执行,最后将任务状态汇报给Master,Master进行状态处理
那问题来了:

  • Master掉了怎么办?它是负责流程实例的管理的。这样Worker就没有办法给它汇报任务状态,当然它也不能做状态处理了?
  • Worker掉了又怎么办?要知道Worker是真正任务执行的载体,它如果掉了。Master要怎么处理?
来来来,一张图说清楚它们。
容错

1.webp

总结

其实说白了就是如果Master掉了,其他Master分布式锁来对Master进行容错。也就是流程实例由之前的down掉的Master切换到要接管的Master上,这个时候是需要给Worker下发新Master的host的,让Worker可以重新给新Master上报信息
而Worker掉了就是任务的重试,但是任务重试之前是有前提的,那就是要kill掉正在运行YARN上的任务,当前DS做不到。为什么?因为对于在非客户端分离模式下,是需要ProcessBuilder的waitFor一直等待客户端进程退出的。而applicationId的解析是在客户端进程退出(也就是waitFor退出)之后做的。
那意思就是说只能等待程序运行完毕,我才能获取到applicationId。
org.apache.dolphinscheduler.server.master.service.WorkerFailoverService#killYarnTask
  1. private void killYarnTask(TaskInstance taskInstance, ProcessInstance processInstance) {
  2.     try {
  3.         if (!masterConfig.isKillApplicationWhenTaskFailover()) {
  4.             return;
  5.         }
  6.         if (StringUtils.isEmpty(taskInstance.getHost()) || StringUtils.isEmpty(taskInstance.getLogPath())) {
  7.             return;
  8.         }
  9.         TaskExecutionContext taskExecutionContext = TaskExecutionContextBuilder.get()
  10.                 .buildWorkflowInstanceHost(masterConfig.getMasterAddress())
  11.                 .buildTaskInstanceRelatedInfo(taskInstance)
  12.                 .buildProcessInstanceRelatedInfo(processInstance)
  13.                 .buildProcessDefinitionRelatedInfo(processInstance.getProcessDefinition())
  14.                 .create();
  15.         // only kill yarn/k8s job if exists , the local thread has exited
  16.         log.info("TaskInstance failover begin kill the task related yarn or k8s job");
  17.         ILogService iLogService =
  18.                 SingletonJdkDynamicRpcClientProxyFactory.getProxyClient(taskInstance.getHost(), ILogService.class);
  19.         GetAppIdResponse getAppIdResponse =
  20.                 iLogService.getAppId(new GetAppIdRequest(taskInstance.getId(), taskInstance.getLogPath()));
  21.         ProcessUtils.killApplication(getAppIdResponse.getAppIds(), taskExecutionContext);
  22.     } catch (Exception ex) {
  23.         log.error("Kill yarn task error", ex);
  24.     }
  25. }
复制代码
怎么办?回顾 1.3.3 版本,是LoggerServer和Master是分离模式的,所以只要Master节点有yarn客户端,是可以通过master对yarn上的applicationId进行干掉的。而现在怎么办?
两种解决思路 :

  • Master上kill,使用yarn rest api
  1. curl -X PUT -d '{"state":"KILLED"}' \
  2.     -H "Content-Type: application/json" \
  3.     http://xx.xx.xx.xx:8088/ws/v1/cluster/apps/application_1694766249884_1098/state?user.name=hdfs
复制代码
注意 : 需要加用户。

  • Worker上kill
    这个是需要标识该任务是容错任务,然后在任务重试运行的时候,调度到指定的Worker上。需要先kill当前运行的applicationId,然后再任务重试。其实这里有一个优化点就是,是Worker掉了,但是任务还在,所以需要判断的是yarn上的状态,如果异常,再kill也不迟,而不是上来就kill。如果是RUNNING,等待就好,可以设置等待超时时间。
转载自journey
原文链接:https://segmentfault.com/a/1190000045084857
本文由 白鲸开源 提供发布支持!

来源:程序园用户自行投稿发布,如果侵权,请联系站长删除
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!
您需要登录后才可以回帖 登录 | 立即注册