找回密码
 立即注册
首页 业界区 安全 (二)3.1.9 生产“稳”担当:Apache DolphinScheduler ...

(二)3.1.9 生产“稳”担当:Apache DolphinScheduler Worker 服务源码全方位解析

呼延含玉 2025-9-25 08:48:48
作者 | 李杰 移动云,Apache DolphinScheduler贡献者
1.png

在现代数据驱动的企业中,工作流调度系统是数据管道(Data Pipeline)的“中枢神经”。从 ETL 任务到机器学习训练,从报表生成到实时监控,几乎所有关键业务都依赖于一个稳定、高效、易扩展的调度引擎。
笔者认为 Apache DolphinScheduler 3.1.9 是稳定且广泛使用的版本,故本系列文章将深入其源码核心,剖析其架构设计、模块划分与关键实现机制,帮助开发者理解 Master 和 Worker “如何工作”,并为进一步二次开发或性能优化打下基础。
我们之前解读了 Apache DolphinScheduler 3.1.9版本源码的 Master server 启动流程,感兴趣的可以去查看。本文是 Apache DolphinScheduler 3.1.9 版本源码解读的第二篇:Worker Server 启动流程源码解读以及相关流程设计。结尾处附有相关流程图,供大家参考。
2. Worker Server启动核心概览


  • 代码入口:org.apache.dolphinscheduler.server.worker.WorkerServer#run
  1. public void run() {
  2.         // 1. rpc启动
  3.         this.workerRpcServer.start();
  4.         // 忽略,因为workerRpcServer初始化时包含workerRpcClient初始化的功能
  5.         this.workerRpcClient.start();
  6.         // 2. 任务插件初始化
  7.         this.taskPluginManager.loadPlugin();
  8.         this.workerRegistryClient.setRegistryStoppable(this);
  9.         // 3. worker 注册
  10.         this.workerRegistryClient.start();
  11.         // 4. worker管理线程,不断从任务队列中waitSubmitQueue领取任务,提交到线程池处理
  12.         this.workerManagerThread.start();
  13.         // 5. 消息重试线程。负责轮询通过RPC发送服务,如当task在运行中,若未收到master的ack信息,会周期给master发送“运行中”信号
  14.         this.messageRetryRunner.start();
  15.         ...
  16.     }
复制代码
2.1 rpc启动:


  • 描述:注册相关命令的process处理器,如接收任务请求、停止任务请求等。
  • 代码入口:org.apache.dolphinscheduler.server.worker.rpc.WorkerRpcServer#start
  1. public void start() {
  2.         LOGGER.info("Worker rpc server starting");
  3.         NettyServerConfig serverConfig = new NettyServerConfig();
  4.         serverConfig.setListenPort(workerConfig.getListenPort());
  5.         this.nettyRemotingServer = new NettyRemotingServer(serverConfig);
  6.         // 接收派发任务请求。然后将任务放置到任务队列waitSubmitQueue中,等待workerManagerThread去处理
  7.         this.nettyRemotingServer.registerProcessor(CommandType.TASK_DISPATCH_REQUEST, taskDispatchProcessor);
  8.         // 停止任务请求
  9.         this.nettyRemotingServer.registerProcessor(CommandType.TASK_KILL_REQUEST, taskKillProcessor);
  10.         // 接收任务运行中的ack请求
  11.         this.nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_RUNNING_ACK,
  12.                 taskExecuteRunningAckProcessor);
  13.         // 接收任务结果的ack请求
  14.         this.nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_RESULT_ACK, taskExecuteResultAckProcessor);
  15.         this.nettyRemotingServer.registerProcessor(CommandType.TASK_REJECT_ACK, taskRejectAckProcessor);
  16.         this.nettyRemotingServer.registerProcessor(CommandType.PROCESS_HOST_UPDATE_REQUEST, hostUpdateProcessor);
  17.         this.nettyRemotingServer.registerProcessor(CommandType.TASK_SAVEPOINT_REQUEST, taskSavePointProcessor);
  18.         // logger server
  19.         this.nettyRemotingServer.registerProcessor(CommandType.GET_APP_ID_REQUEST, loggerRequestProcessor);
  20.         this.nettyRemotingServer.registerProcessor(CommandType.GET_LOG_BYTES_REQUEST, loggerRequestProcessor);
  21.         this.nettyRemotingServer.registerProcessor(CommandType.ROLL_VIEW_LOG_REQUEST, loggerRequestProcessor);
  22.         this.nettyRemotingServer.registerProcessor(CommandType.VIEW_WHOLE_LOG_REQUEST, loggerRequestProcessor);
  23.         this.nettyRemotingServer.registerProcessor(CommandType.REMOVE_TAK_LOG_REQUEST, loggerRequestProcessor);
  24.         this.nettyRemotingServer.start();
  25.         LOGGER.info("Worker rpc server started");
  26.     }
复制代码
此处以TASK_DISPATCH_REQUEST为例进行描述。当有任务从master派发请求时,worker会接受TASK_DISPATCH_REQUEST的RPC请求,然后触发process处理器taskDispatchProcessor(org.apache.dolphinscheduler.server.worker.processor.TaskDispatchProcessor#process)的处理:
  1. public void process(Channel channel, Command command) {
  2.             ...
  3.             TaskExecutionContext taskExecutionContext = taskDispatchCommand.getTaskExecutionContext();
  4.             ...
  5.             // set cache, it will be used when kill task
  6.             TaskExecutionContextCacheManager.cacheTaskExecutionContext(taskExecutionContext);
  7.             // 设置执行任务的worker地址
  8.             taskExecutionContext.setHost(workerConfig.getWorkerAddress());
  9.             // 设置任务执行日志的目录
  10.             taskExecutionContext.setLogPath(LogUtils.getTaskLogPath(taskExecutionContext));
  11.             // 构建任务执行线程。整个任务执行需要依赖该线程
  12.             WorkerDelayTaskExecuteRunnable workerTaskExecuteRunnable = WorkerTaskExecuteRunnableFactoryBuilder
  13.                     .createWorkerDelayTaskExecuteRunnableFactory(
  14.                             taskExecutionContext,
  15.                             workerConfig,
  16.                             workflowMasterAddress,
  17.                             workerMessageSender,
  18.                             alertClientService,
  19.                             taskPluginManager,
  20.                             storageOperate)
  21.                     .createWorkerTaskExecuteRunnable();
  22.             // submit task to manager
  23.             // 提交到一个task队列,然后有消费者消费该队列
  24.             boolean offer = workerManager.offer(workerTaskExecuteRunnable);
  25.             ...
  26.     }
复制代码
最终会提交给waitSubmitQueue队列,后续有消费者不断进行消费。
  1. public boolean offer(WorkerDelayTaskExecuteRunnable workerDelayTaskExecuteRunnable) {
  2.         if (workerConfig.getTaskExecuteThreadsFullPolicy() == TaskExecuteThreadsFullPolicy.CONTINUE) {
  3.             return waitSubmitQueue.offer(workerDelayTaskExecuteRunnable);
  4.         }
  5.         if (waitSubmitQueue.size() > workerExecThreads) {
  6.             logger.warn("Wait submit queue is full, will retry submit task later");
  7.             WorkerServerMetrics.incWorkerSubmitQueueIsFullCount();
  8.             // if waitSubmitQueue is full, it will wait 1s, then try add
  9.             ThreadUtils.sleep(Constants.SLEEP_TIME_MILLIS);
  10.             if (waitSubmitQueue.size() > workerExecThreads) {
  11.                 return false;
  12.             }
  13.         }
  14.         return waitSubmitQueue.offer(workerDelayTaskExecuteRunnable);
  15.     }
复制代码
2.2 任务插件初始化:


  • 描述:task的相关模板操作,如创建task、解析task参数、获取task资源信息等。对于该插件,api、master、worker都需要进行注册,在worker的作用是获取文件资源、创建任务信息等。
2.3 worker 注册:


  • 描述:将worker信息注册至注册中心(本文以zookeeper为例),同时监听注册变化情况。
  • 代码入口:org.apache.dolphinscheduler.server.worker.registry.WorkerRegistryClient#start
  1. public void start() {
  2.         try {
  3.             // 1、将worker信息注册至注册中心(本文以zookeeper为例)
  4.             registry();
  5.             // 2、监听自身与注册中心的连接情况;
  6.             registryClient.addConnectionStateListener(
  7.                     new WorkerConnectionStateListener(workerConfig, registryClient, workerConnectStrategy));
  8.         } catch (Exception ex) {
  9.             throw new RegistryException("Worker registry client start up error", ex);
  10.         }
  11.     }
复制代码
2.4 worker管理线程:


  • 描述:不断从任务队列中waitSubmitQueue领取任务,提交到线程池处理。
  • 代码入口:org.apache.dolphinscheduler.server.worker.runner.WorkerManagerThread#run
[code]public void run() {        Thread.currentThread().setName("Worker-Execute-Manager-Thread");        while (!ServerLifeCycleManager.isStopped()) {            try {                if (!ServerLifeCycleManager.isRunning()) {                    Thread.sleep(Constants.SLEEP_TIME_MILLIS);                }                // 1、如果任务线程池线程个数够用,则处理任务                if (this.getThreadPoolQueueSize()

相关推荐

您需要登录后才可以回帖 登录 | 立即注册