找回密码
 立即注册
首页 业界区 业界 3.1.9 生产“稳”担当:Master 服务启动源码全方位解析 ...

3.1.9 生产“稳”担当:Master 服务启动源码全方位解析

杠氯 2025-9-24 11:04:39
作者 | 李杰 移动云,Apache DolphinScheduler贡献者
1.jpeg

在现代数据驱动的企业中,工作流调度系统是数据管道(Data Pipeline)的“中枢神经”。从 ETL 任务到机器学习训练,从报表生成到实时监控,几乎所有关键业务都依赖于一个稳定、高效、易扩展的调度引擎。
笔者认为Apache DolphinScheduler 3.1.9是稳定且广泛使用的版本,故本文将聚焦于这一版本,解析 Master 服务启动时相关流程,深入其源码核心,剖析其架构设计、模块划分与关键实现机制,帮助开发者理解 Master “如何工作”,并为进一步二次开发或性能优化打下基础。
本系列文章分为 3 个部分,分别为 Master Server 启动流程、Worker server 启动流程,以及相关流程图,本文为第一部分。
1. Master Server启动核心概览


  • 代码入口:org.apache.dolphinscheduler.server.master.MasterServer#run
  1. public void run() throws SchedulerException {
  2.         // 1、init rpc server
  3.         this.masterRPCServer.start();
  4.         // 2、install task plugin
  5.         this.taskPluginManager.loadPlugin();
  6.         // 3、self tolerant
  7.         this.masterRegistryClient.start();
  8.         this.masterRegistryClient.setRegistryStoppable(this);
  9.         // 4、master 调度
  10.         this.masterSchedulerBootstrap.init();
  11.         this.masterSchedulerBootstrap.start();
  12.         // 5、事件执行服务
  13.         this.eventExecuteService.start();
  14.         // 6、容错机制
  15.         this.failoverExecuteThread.start();
  16.         // 7、Quartz调度
  17.         this.schedulerApi.start();
  18.         ...
  19.     }
复制代码
1.1 rpc启动:


  • 描述:注册相关命令的process处理器,如task执行中、task执行结果、终止task等。
  • 代码入口:org.apache.dolphinscheduler.server.master.rpc.MasterRPCServer#start
  1. public void start() {
  2.          ...
  3.         // 任务执行中的请求处理器
  4.         this.nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_RUNNING, taskExecuteRunningProcessor);
  5.         // 任务执行结果的请求处理器
  6.         this.nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_RESULT, taskExecuteResponseProcessor);
  7.         // 任务终止的请求处理器
  8.         this.nettyRemotingServer.registerProcessor(CommandType.TASK_KILL_RESPONSE, taskKillResponseProcessor);
  9.         this.nettyRemotingServer.registerProcessor(CommandType.STATE_EVENT_REQUEST, stateEventProcessor);
  10.         this.nettyRemotingServer.registerProcessor(CommandType.TASK_FORCE_STATE_EVENT_REQUEST, taskEventProcessor);
  11.         this.nettyRemotingServer.registerProcessor(CommandType.TASK_WAKEUP_EVENT_REQUEST, taskEventProcessor);
  12.         this.nettyRemotingServer.registerProcessor(CommandType.CACHE_EXPIRE, cacheProcessor);
  13.         this.nettyRemotingServer.registerProcessor(CommandType.TASK_REJECT, taskRecallProcessor);
  14.         this.nettyRemotingServer.registerProcessor(CommandType.WORKFLOW_EXECUTING_DATA_REQUEST,
  15.                 workflowExecutingDataRequestProcessor);
  16.         // 流式任务启动请求处理器
  17.         this.nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_START, taskExecuteStartProcessor);
  18.         // logger server
  19.         // log相关,查看或者获取日志等操作的处理器
  20.         this.nettyRemotingServer.registerProcessor(CommandType.GET_LOG_BYTES_REQUEST, loggerRequestProcessor);
  21.         this.nettyRemotingServer.registerProcessor(CommandType.ROLL_VIEW_LOG_REQUEST, loggerRequestProcessor);
  22.         this.nettyRemotingServer.registerProcessor(CommandType.VIEW_WHOLE_LOG_REQUEST, loggerRequestProcessor);
  23.         this.nettyRemotingServer.registerProcessor(CommandType.REMOVE_TAK_LOG_REQUEST, loggerRequestProcessor);
  24.         this.nettyRemotingServer.start();
  25.         logger.info("Started Master RPC Server...");
  26.     }
复制代码
1.2 任务插件初始化:


  • 描述:task的相关模板操作,如创建task、解析task参数、获取task资源信息等。对于该插件,api、master、worker都需要进行注册,在master的作用是设置数据源和UDF信息等。
1.3 Self Tolerant(Master注册):


  • 描述:将自身信息注册至注册中心(本文以zookeeper为例),同时监听自身、其他master和所有worker节点的注册情况变化,从而做相应的容错处理。
  • 代码入口:org.apache.dolphinscheduler.server.master.registry.MasterRegistryClient#start
  1. public void start() {
  2.         try {
  3.             this.masterHeartBeatTask = new MasterHeartBeatTask(masterConfig, registryClient);
  4.             //  1、将自身信息注册至注册中心;
  5.             registry();
  6.             //  2、监听自身与注册中心的连接情况;
  7.             registryClient.addConnectionStateListener(
  8.                     new MasterConnectionStateListener(masterConfig, registryClient, masterConnectStrategy));
  9.             //  3、监听其他master与所有worker在注册中心的活跃情况,做相应的容错工作处理
  10.             //  如对灭亡的master上面的任务进行容错,同时将在worker节点上kill任务
  11.             registryClient.subscribe(REGISTRY_DOLPHINSCHEDULER_NODE, new MasterRegistryDataListener());
  12.         } catch (Exception e) {
  13.             throw new RegistryException("Master registry client start up error", e);
  14.         }
  15.     }
复制代码
1.4 Master 调度


  • 描述:一个扫描线程,定时扫描数据库中的 command 表,根据不同的命令类型进行不同的业务操作,是工作流启动、实例容错等处理的核心逻辑。
  • 代码入口:org.apache.dolphinscheduler.server.master.runner.MasterSchedulerBootstrap#run
  1. public void run() {
  2.         while (!ServerLifeCycleManager.isStopped()) {
  3.             try {
  4.                 if (!ServerLifeCycleManager.isRunning()) {
  5.                     // the current server is not at running status, cannot consume command.
  6.                     logger.warn("The current server {} is not at running status, cannot consumes commands.", this.masterAddress);
  7.                     Thread.sleep(Constants.SLEEP_TIME_MILLIS);
  8.                 }
  9.                 // todo: if the workflow event queue is much, we need to handle the back pressure
  10.                 boolean isOverload =
  11.                         OSUtils.isOverload(masterConfig.getMaxCpuLoadAvg(), masterConfig.getReservedMemory());
  12.                 // 如果cpu以及memory负载过高,那么就暂时不处理命令
  13.                 if (isOverload) {
  14.                     logger.warn("The current server {} is overload, cannot consumes commands.", this.masterAddress);
  15.                     MasterServerMetrics.incMasterOverload();
  16.                     Thread.sleep(Constants.SLEEP_TIME_MILLIS);
  17.                     continue;
  18.                 }
  19.                 // 从数据库中获取commands执行命令,如启动工作流,容错工作流实例等
  20.                 List<Command> commands = findCommands();
  21.                 if (CollectionUtils.isEmpty(commands)) {
  22.                     // indicate that no command ,sleep for 1s
  23.                     Thread.sleep(Constants.SLEEP_TIME_MILLIS);
  24.                     continue;
  25.                 }
  26.                 // 将相应的commands转为工作流实例,转换成功后删除相应的commands  
  27.                 List<ProcessInstance> processInstances = command2ProcessInstance(commands);
  28.                 if (CollectionUtils.isEmpty(processInstances)) {
  29.                     // indicate that the command transform to processInstance error, sleep for 1s
  30.                     Thread.sleep(Constants.SLEEP_TIME_MILLIS);
  31.                     continue;
  32.                 }
  33.                 MasterServerMetrics.incMasterConsumeCommand(commands.size());
  34.                 processInstances.forEach(processInstance -> {
  35.                     try {
  36.                         LoggerUtils.setWorkflowInstanceIdMDC(processInstance.getId());
  37.                         if (processInstanceExecCacheManager.contains(processInstance.getId())) {
  38.                             logger.error(
  39.                                     "The workflow instance is already been cached, this case shouldn't be happened");
  40.                         }
  41.                         // 创建工作流执行线程,负责DAG任务切分、任务提交监控、各种不同事件类型的逻辑处理
  42.                         WorkflowExecuteRunnable workflowRunnable = new WorkflowExecuteRunnable(processInstance,
  43.                                 processService,
  44.                                 processInstanceDao,
  45.                                 nettyExecutorManager,
  46.                                 processAlertManager,
  47.                                 masterConfig,
  48.                                 stateWheelExecuteThread,
  49.                                 curingGlobalParamsService);
  50.                         // 此处将每个工作流执行线程进行缓存,后续从缓存中获取该线程进行执行
  51.                         processInstanceExecCacheManager.cache(processInstance.getId(), workflowRunnable);
  52.                         // 将启动工作流事件放入工作流事件队列中,然后workflowEventLooper不断从队列中获取事件进行处理
  53.                         workflowEventQueue.addEvent(new WorkflowEvent(WorkflowEventType.START_WORKFLOW,
  54.                                 processInstance.getId()));
  55.                     } finally {
  56.                         LoggerUtils.removeWorkflowInstanceIdMDC();
  57.                     }
  58.                 });
  59.             } catch (InterruptedException interruptedException) {
  60.                 logger.warn("Master schedule bootstrap interrupted, close the loop", interruptedException);
  61.                 Thread.currentThread().interrupt();
  62.                 break;
  63.             } catch (Exception e) {
  64.                 logger.error("Master schedule workflow error", e);
  65.                 // sleep for 1s here to avoid the database down cause the exception boom
  66.                 ThreadUtils.sleep(Constants.SLEEP_TIME_MILLIS);
  67.             }
  68.         }
  69.     }
复制代码
上述步骤产生工作流事件后,WorkflowEventLooper不断地消费处理:
  1. public void run() {
  2.         WorkflowEvent workflowEvent = null;
  3.         while (!ServerLifeCycleManager.isStopped()) {
  4.                 ...
  5.                 workflowEvent = workflowEventQueue.poolEvent();
  6.                 LoggerUtils.setWorkflowInstanceIdMDC(workflowEvent.getWorkflowInstanceId());
  7.                 logger.info("Workflow event looper receive a workflow event: {}, will handle this", workflowEvent);
  8.                 WorkflowEventHandler workflowEventHandler =
  9.                         workflowEventHandlerMap.get(workflowEvent.getWorkflowEventType());
  10.                 // 相应的事件处理器来处理工作流事件,主要功能是执行上述中缓存的工作流执行线程WorkflowExecuteRunnable
  11.                 workflowEventHandler.handleWorkflowEvent(workflowEvent);
  12.                ...
  13.         }
  14.     }
复制代码
启动WorkflowExecuteRunnable时,主要功能是初始化DAG、提交且分发task等:
  1. public WorkflowSubmitStatue call() {
  2.              ...
  3.             LoggerUtils.setWorkflowInstanceIdMDC(processInstance.getId());
  4.             if (workflowRunnableStatus == WorkflowRunnableStatus.CREATED) {
  5.                // 构建工作流的DAG
  6.                 buildFlowDag();
  7.                 workflowRunnableStatus = WorkflowRunnableStatus.INITIALIZE_DAG;
  8.                 logger.info("workflowStatue changed to :{}", workflowRunnableStatus);
  9.             }
  10.             if (workflowRunnableStatus == WorkflowRunnableStatus.INITIALIZE_DAG) {
  11.                 // 初始化相关队列, 将相关队列都清空
  12.                 initTaskQueue();
  13.                 workflowRunnableStatus = WorkflowRunnableStatus.INITIALIZE_QUEUE;
  14.                 logger.info("workflowStatue changed to :{}", workflowRunnableStatus);
  15.             }
  16.             if (workflowRunnableStatus == WorkflowRunnableStatus.INITIALIZE_QUEUE) {
  17.                 // 从起始节点开始执行,提交所有节点任务
  18.                 submitPostNode(null);
  19.                 workflowRunnableStatus = WorkflowRunnableStatus.STARTED;
  20.                 logger.info("workflowStatue changed to :{}", workflowRunnableStatus);
  21.             }
  22.             return WorkflowSubmitStatue.SUCCESS;
  23.             ...
  24.     }
复制代码
此时parentNodeCode为null,表示从根节点开始启动所有node:
  1. private void submitPostNode(String parentNodeCode) throws StateEventHandleException {
  2.        ...
  3.        // 根据起点节点parentNodeCode获取其后续待执行的task
  4.        List<TaskInstance> taskInstances=...
  5.         for (TaskInstance task : taskInstances) {
  6.             ...
  7.             // 将task放到 “预提交”队列 readyToSubmitTaskQueue
  8.             addTaskToStandByList(task);
  9.         }
  10.         // 处理“预提交”队列readyToSubmitTaskQueue,提交task
  11.         submitStandByTask();
  12.         ...
  13.     }
复制代码
  1. public void submitStandByTask() throws StateEventHandleException {
  2.         int length = readyToSubmitTaskQueue.size();
  3.         for (int i = 0; i < length; i++) {
  4.             TaskInstance task = readyToSubmitTaskQueue.peek();
  5.             ...
  6.             // 检测task的依赖关系是否构建成功,如果成功,则进行提交操作
  7.             DependResult dependResult = getDependResultForTask(task);
  8.             if (DependResult.SUCCESS == dependResult) {
  9.                 logger.info("The dependResult of task {} is success, so ready to submit to execute", task.getName());
  10.                 // 提交task
  11.                 Optional<TaskInstance> taskInstanceOptional = submitTaskExec(task);
  12.                 // 提交失败
  13.                 if (!taskInstanceOptional.isPresent()) {
  14.                 ...
  15.                 } else {
  16.                     // 提交成功,从“预提交”队里中清除该task
  17.                     removeTaskFromStandbyList(task);
  18.                 }
  19.             }
  20.             ...
  21.         }
  22.     }
复制代码
  1. private Optional<TaskInstance> submitTaskExec(TaskInstance taskInstance) {
  2.             ...
  3.             // 根据master侧任务类型(不是shell、spark那种, 此处是例如Common、Condition、SubTask、SwitchTask等),做相应的初始化操作,为了便于理解,本文采用通用task来处理
  4.             ITaskProcessor taskProcessor = TaskProcessorFactory.getTaskProcessor(taskInstance.getTaskType());
  5.             taskProcessor.init(taskInstance, processInstance);
  6.             ...
  7.             // 补充taskInstance参数,且提交保存至db
  8.             boolean submit = taskProcessor.action(TaskAction.SUBMIT);
  9.             ...
  10.             // 若为通用task类型,则将任务提交到一个待dispatch的task队列taskPriorityQueue中,有消费者TaskPriorityQueueConsumer专门消费该队列
  11.             boolean dispatchSuccess = taskProcessor.action(TaskAction.DISPATCH);
  12.             ...
  13.             // 若为通用task类型,则不做任何处理
  14.             taskProcessor.action(TaskAction.RUN);
  15.             // 增加超时检测,若是超时,会发生告警
  16.             stateWheelExecuteThread.addTask4TimeoutCheck(processInstance, taskInstance);
  17.             // 增加状态检查,当成功或者其他状态时,会进行相应的处理
  18.             stateWheelExecuteThread.addTask4StateCheck(processInstance, taskInstance);
  19.             ...
  20.             return Optional.of(taskInstance);
  21.             ...
  22.     }
复制代码
TaskPriorityQueueConsumer是一个专门消费上述taskPriorityQueue队列的线程,在程序启动时开始监听taskPriorityQueue队列:
  1. public void run() {
  2.         int fetchTaskNum = masterConfig.getDispatchTaskNumber();
  3.         while (!ServerLifeCycleManager.isStopped()) {
  4.             try {
  5.                 // 消费需要dispatch的task
  6.                 // 为task挑选可用worker节点,然后将task分配至该worker节点
  7.                 List<TaskPriority> failedDispatchTasks = this.batchDispatch(fetchTaskNum);
  8.                 ...
  9.             } catch (Exception e) {
  10.                 TaskMetrics.incTaskDispatchError();
  11.                 logger.error("dispatcher task error", e);
  12.             }
  13.         }
  14.     }
复制代码
  1. public List<TaskPriority> batchDispatch(int fetchTaskNum) throws TaskPriorityQueueException, InterruptedException {
  2.         ...
  3.         // 利用多线程并发消费task
  4.         CountDownLatch latch = new CountDownLatch(fetchTaskNum);
  5.         for (int i = 0; i < fetchTaskNum; i++) {
  6.             TaskPriority taskPriority = taskPriorityQueue.poll(Constants.SLEEP_TIME_MILLIS, TimeUnit.MILLISECONDS);
  7.             ...
  8.             consumerThreadPoolExecutor.submit(() -> {
  9.                 try {
  10.                     // 为task进行分发操作
  11.                     boolean dispatchResult = this.dispatchTask(taskPriority);
  12.                     ...
  13.                 } finally {
  14.                     // make sure the latch countDown
  15.                     latch.countDown();
  16.                 }
  17.             });
  18.         }
  19.         latch.await();
  20.         ...
  21.     }
复制代码
  1. protected boolean dispatchTask(TaskPriority taskPriority) {
  2.         ...
  3.         try {
  4.             WorkflowExecuteRunnable workflowExecuteRunnable =
  5.                     processInstanceExecCacheManager.getByProcessInstanceId(taskPriority.getProcessInstanceId());
  6.             ...
  7.             Optional<TaskInstance> taskInstanceOptional =
  8.                     workflowExecuteRunnable.getTaskInstance(taskPriority.getTaskId());
  9.             ...
  10.             TaskInstance taskInstance = taskInstanceOptional.get();
  11.             TaskExecutionContext context = taskPriority.getTaskExecutionContext();
  12.             ExecutionContext executionContext =
  13.                     new ExecutionContext(toCommand(context), ExecutorType.WORKER, context.getWorkerGroup(),
  14.                             taskInstance);
  15.             ...
  16.             // 挑选可用worker节点,然后将task分配至该worker节点
  17.             result = dispatcher.dispatch(executionContext);
  18.             ...
  19.         } catch (RuntimeException | ExecuteException e) {
  20.             logger.error("Master dispatch task to worker error, taskPriority: {}", taskPriority, e);
  21.         }
  22.         return result;
  23.     }
复制代码
具体的分发操作:
  1. public Boolean dispatch(final ExecutionContext context) throws ExecuteException {
  2.          ...
  3.         // host select
  4.         // 根据配置的选择器,筛选符合要求的worker节点信息
  5.         Host host = hostManager.select(context);
  6.         ...
  7.         context.setHost(host);
  8.         ...
  9.         // 将task信息通过RPC发送给挑选的worker节点,要是发送失败,则往其他可用的worker节点发送
  10.         return executorManager.execute(context);
  11.         ...
  12.     }
复制代码
1.5 事件执行服务:


  • 描述:主要负责工作流实例的事件队列的轮询,因为工作流在执行过程中会不断产生事件,如工作流提交失败、任务状态变更等,下面方法就是处理产生的的相关事件。
  • 代码入口:org.apache.dolphinscheduler.server.master.runner.EventExecuteService#run
  1. public void run() {
  2.         while (!ServerLifeCycleManager.isStopped()) {
  3.             try {
  4.                 // 处理工作流执行线程的相关事件,最终会触发WorkflowExecuteRunnable#handleEvents方法
  5.                 workflowEventHandler();
  6.                 // 处理流式任务执行线程的相关事件
  7.                 streamTaskEventHandler();
  8.                 TimeUnit.MILLISECONDS.sleep(Constants.SLEEP_TIME_MILLIS_SHORT);
  9.             } ...
  10.         }
  11.     }
复制代码
工作流和实时任务的事件处理逻辑基本一致,下述只描述工作流的处理过程:
  1. public void handleEvents() {
  2.         ...
  3.         StateEvent stateEvent = null;
  4.         while (!this.stateEvents.isEmpty()) {
  5.             try {
  6.                 stateEvent = this.stateEvents.peek();
  7.                 ...
  8.                 StateEventHandler stateEventHandler =
  9.                         StateEventHandlerManager.getStateEventHandler(stateEvent.getType())
  10.                                 .orElseThrow(() -> new StateEventHandleError(
  11.                                         "Cannot find handler for the given state event"));
  12.                 logger.info("Begin to handle state event, {}", stateEvent);
  13.                 // 根据不同事件处理器做不同的处理逻辑
  14.                 if (stateEventHandler.handleStateEvent(this, stateEvent)) {
  15.                     this.stateEvents.remove(stateEvent);
  16.                 }
  17.             } ...
  18.         }
  19.     }
复制代码
下面以工作流提交失败为例:
  1. public boolean handleStateEvent(WorkflowExecuteRunnable workflowExecuteRunnable,
  2.                                     StateEvent stateEvent) throws StateEventHandleException {
  3.         WorkflowStateEvent workflowStateEvent = (WorkflowStateEvent) stateEvent;
  4.         ProcessInstance processInstance = workflowExecuteRunnable.getProcessInstance();
  5.         measureProcessState(workflowStateEvent);
  6.         log.info(
  7.                 "Handle workflow instance submit fail state event, the current workflow instance state {} will be changed to {}",
  8.                 processInstance.getState(), workflowStateEvent.getStatus());
  9.         // 将实例状态改为FAILURE后入库
  10.         workflowExecuteRunnable.updateProcessInstanceState(workflowStateEvent);
  11.         workflowExecuteRunnable.endProcess();
  12.         return true;
  13.     }
复制代码
1.6 容错机制:


  • 描述:主要负责Master容错和Worker容错的相关逻辑。
  • 代码入口:org.apache.dolphinscheduler.server.master.service.MasterFailoverService#checkMasterFailover
  1. public void checkMasterFailover() {
  2.                 // 获取需要容错的master节点
  3.         List<String> needFailoverMasterHosts = processService.queryNeedFailoverProcessInstanceHost()
  4.                 .stream()
  5.                 // failover myself || dead server
  6.                 // 自身或者发生已经灭亡的master
  7.                 .filter(host -> localAddress.equals(host) || !registryClient.checkNodeExists(host, NodeType.MASTER))
  8.                 .distinct()
  9.                 .collect(Collectors.toList());
  10.         if (CollectionUtils.isEmpty(needFailoverMasterHosts)) {
  11.             return;
  12.         }
  13.          ...
  14.         for (String needFailoverMasterHost : needFailoverMasterHosts) {
  15.             failoverMaster(needFailoverMasterHost);
  16.         }
  17.     }
复制代码
  1. private void doFailoverMaster(@NonNull String masterHost) {
  2.         ...
  3.         // 从注册中心获取master的启动时间
  4.         Optional<Date> masterStartupTimeOptional = getServerStartupTime(registryClient.getServerList(NodeType.MASTER),
  5.                 masterHost);
  6.         // 从获取与当前master的需要容错的工作路实例(主要根据需要容错的状态去筛选,如:SUBMITTED_SUCCESS、RUNNING_EXECUTION)
  7.         List<ProcessInstance> needFailoverProcessInstanceList = processService.queryNeedFailoverProcessInstances(
  8.                 masterHost);
  9.         ...
  10.         for (ProcessInstance processInstance : needFailoverProcessInstanceList) {
  11.                 ...
  12.                 // 判断该实例是否需要容错处理,判断逻辑例如:
  13.                 // 1、其他已经灭亡的master还未重新启动,此时需要进行容错
  14.                 // 2、若工作流实例的启动时间比master的启动时间早,说明master重启过,此时需要容错
  15.                 // ...
  16.                 if (!checkProcessInstanceNeedFailover(masterStartupTimeOptional, processInstance)) {
  17.                     LOGGER.info("WorkflowInstance doesn't need to failover");
  18.                     continue;
  19.                 }
  20.                 List<TaskInstance> taskInstanceList =...
  21.                 for (TaskInstance taskInstance : taskInstanceList) {
  22.                     ...
  23.                         if (!checkTaskInstanceNeedFailover(taskInstance)) {
  24.                             LOGGER.info("The taskInstance doesn't need to failover");
  25.                             continue;
  26.                         }
  27.                         // 对于worker侧的任务,需要进行kill处理,同时将任务实例状态标记为NEED_FAULT_TOLERANCE
  28.                         failoverTaskInstance(processInstance, taskInstance);
  29.                     ...
  30.                 }
  31.                 ProcessInstanceMetrics.incProcessInstanceByState("failover");
  32.                 // updateProcessInstance host is null to mark this processInstance has been failover
  33.                 // and insert a failover command
  34.                 processInstance.setHost(Constants.NULL);
  35.                 // 生成需要容错的command入库,待master调度进行扫描
  36.                 processService.processNeedFailoverProcessInstances(processInstance);
  37.             ...
  38.         }
  39.         
  40.     }
复制代码
结语

以上是笔者对 Apache DolphinScheduler 3.1.9 版本特性与架构的初步理解,基于个人学习与实践整理而成,后续还会输出 Worker 启动流程以及 Master 与 Worker 的交互流程相关文章。由于水平有限,文中难免存在理解偏差或疏漏之处,恳请各位读者不吝指正。如有不同见解,欢迎交流讨论,共同进步。

来源:程序园用户自行投稿发布,如果侵权,请联系站长删除
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!

相关推荐

您需要登录后才可以回帖 登录 | 立即注册