仁夹篇 发表于 2026-1-12 22:10:00

【大数据 & AI】Flink Agents 源码解读 --- (6) --- ActionTask

【大数据 & AI】Flink Agents 源码解读 --- (6) ---ActionTask


目录

[*]【大数据 & AI】Flink Agents 源码解读 --- (6) ---ActionTask

[*]0x00 概要
[*]0x01 基础知识

[*]1.1 相关组件
[*]1.2 ActionTask
[*]1.3 PythonActionTask

[*]1.3.1 定义
[*]1.3.2 PythonActionTask 与 Function 的关系
[*]1.3.3 与其他组件的关系
[*]1.3.4 调用流程

[*]1.4 PythonGeneratorActionTask
[*]1.5 JavaActionTask
[*]1.6 ActionTaskResult 结构

[*]0x02 ActionTask 切分机制

[*]2.1 切分方式
[*]2.2 实现细节
[*]2.3 关键点



0x00 概要

ActionTask 是 Action 执行的基本单位,代表一个可执行的任务块。一个完整的 Action 可能会被切分成多个 ActionTask 来执行。ActionTask 在整体流程的位置如下:
Action Code → Agent → AgentPlan → ActionExecutionOperator → ActionTask → Flink Runtime0x01 基础知识

ActionTask 是 Action 执行过程中的一个片段,用于支持复杂的执行逻辑(如异步处理),对应关系如下:

[*]一个 Action 对应一个函数
在 AgentPlan 中,每个 Action 包含一个执行函数 (exec),通常是 PythonFunction 或 JavaFunction,例如在 tool_call_action.py 中:
TOOL_CALL_ACTION = Action (
    name="tool_call_action",
    exec=PythonFunction.from_callable (process_tool_request), // 一个函数
    listen_event_types=[...]
)

[*]一个 Action 可能产生多个 ActionTask

[*]ActionTask 是 Action 的执行时表示,可以看作是 Action 的 “执行片段”
[*]一个 Action 可能在执行过程中被拆分为多个 ActionTask,特别是在处理异步操作时

1.1 相关组件

ActionTask 概念的相关组件如下
组件核心功能JavaActionTask执行 Java 函数PythonActionTask执行 Python 函数,支持异步 / 生成器模式,桥接 Java 与 Python 生态LocalRunnerContext本地执行上下文,模拟 Flink 分布式状态,管理事件队列、key 隔离状态、资源访问ActionTaskResult动作执行结果载体,包含是否完成、输出事件、下一个待执行任务(若有)PythonGeneratorActionTask处理 Python 生成器的异步任务,持续执行直到完成所有异步操作Tool 相关机制支持装饰器(@tool)、add_resource 等方式注册工具,通过 TOOL_CALL_ACTION 触发执行在系统中的架构如下

1.2 ActionTask

我们接下来看看 ActionTask 的具体实现。
ActionTask 是基类。
/**
* This class represents a task related to the execution of an action in {@link
* ActionExecutionOperator}.
*
* <p>An action is split into multiple code blocks, and each code block is represented by an {@code
* ActionTask}. You can call {@link #invoke()} to execute a code block and obtain invoke result
* {@link ActionTaskResult}. If the action contains additional code blocks, you can obtain the next
* {@code ActionTask} via {@link ActionTaskResult#getGeneratedActionTask()} and continue executing
* it.
*/
public abstract class ActionTask {
    protected final Object key;
    protected final Event event;
    protected final Action action;
    /**
   * Since RunnerContextImpl contains references to the Operator and state, it should not be
   * serialized and included in the state with ActionTask. Instead, we should check if a valid
   * RunnerContext exists before each ActionTask invocation and create a new one if necessary.
   */
    protected transient RunnerContextImpl runnerContext;

    public ActionTask(Object key, Event event, Action action) {
      this.key = key;
      this.event = event;
      this.action = action;
    }

    public RunnerContextImpl getRunnerContext() {
      return runnerContext;
    }

    public void setRunnerContext(RunnerContextImpl runnerContext) {
      this.runnerContext = runnerContext;
    }

    public Object getKey() {
      return key;
    }

    /** Invokes the action task. */
    public abstract ActionTaskResult invoke() throws Exception;

    public class ActionTaskResult {
      private final boolean finished;
      private final List<Event> outputEvents;
      private final Optional generatedActionTaskOpt;

      public ActionTaskResult(
                boolean finished,
                List<Event> outputEvents,
                @Nullable ActionTask generatedActionTask) {
            this.finished = finished;
            this.outputEvents = outputEvents;
            this.generatedActionTaskOpt = Optional.ofNullable(generatedActionTask);
      }

      public boolean isFinished() {
            return finished;
      }

      public List<Event> getOutputEvents() {
            return outputEvents;
      }

      public Optional getGeneratedActionTask() {
            return generatedActionTaskOpt;
      }
    }
}1.3 PythonActionTask

PythonActionTask 是一个专门用于执行 Python 动作任务的特殊 ActionTask 实现。它的主要作用包括:

[*]执行 Python 函数:调用 Python 函数来处理事件
[*]处理异步操作:支持 Python 中的异步操作,通过生成器机制实现
[*]桥接 Java 和 Python:作为 Java 端和 Python 端之间的桥梁,协调两者间的交互
1.3.1 定义

PythonActionTask 对应一个 Python 函数(更准确地说是一个 PythonFunction 对象),这个函数是在创建 Action 时定义的,存储在 action.getExec() 中。但PythonActionTask 不仅仅是简单的函数封装,而是使其能够在 Flink Agents 框架中正确执行,并支持框架所需的高级特性。它提供了以下附加价值:

[*]复杂逻辑:PythonActionTask 不仅仅是执行函数,还负责处理复杂的交互逻辑
[*]执行环境管理:为函数提供合适的执行上下文
[*]异步支持:通过生成器机制支持长时间运行的操作
[*]事件处理:管理和传递执行过程中产生的事件
[*]状态维护:在整个执行过程中维护必要的状态信息
PythonActionTask 在系统架构中的位置和交互关系如下:

代码如下:
public class PythonActionTask extends ActionTask {
    public ActionTaskResult invoke() throws Exception {
      PythonActionExecutor pythonActionExecutor = getPythonActionExecutor();

      // 这里执行实际的 Python 函数
      String pythonGeneratorRef =
            pythonActionExecutor.executePythonFunction(
                (PythonFunction) action.getExec(), // <-- 这就是对应的函数
                (PythonEvent) event,
                runnerContext);

      // 处理异步情况
      if (pythonGeneratorRef != null) {
            // 如果函数返回了生成器,则创建新的任务继续执行
            ActionTask tempGeneratedActionTask =
                new PythonGeneratorActionTask(key, event, action, pythonGeneratorRef);
            tempGeneratedActionTask.setRunnerContext(runnerContext);
      if (pythonGeneratorRef != null) {
            // 如果函数返回了生成器,则创建新的任务继续执行
            ActionTask tempGeneratedActionTask =
                new PythonGeneratorActionTask(key, event, action, pythonGeneratorRef);
            tempGeneratedActionTask.setRunnerContext(runnerContext);
            return tempGeneratedActionTask.invoke();
      }
      // 否则表示函数已执行完毕
      return new ActionTaskResult(
            true,
            runnerContext.drainEvents(event.getSourceTimestamp()),
            null);            这种设计允许将带有异步操作的复杂动作分解为可管理的单元,同时保持执行语义和状态一致性。

来源:程序园用户自行投稿发布,如果侵权,请联系站长删除
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!

届表 发表于 2026-1-19 01:44:26

感谢分享,学习下。

伯绮梦 发表于 2026-1-21 20:40:06

不错,里面软件多更新就更好了

秦晓曼 发表于 2026-1-22 04:31:41

不错,里面软件多更新就更好了

胆饬 发表于 2026-1-23 07:10:47

过来提前占个楼

准挝 发表于 2026-1-23 19:15:26

用心讨论,共获提升!

萨瑞饨 发表于 2026-1-24 07:22:41

感谢分享

缑娅瑛 发表于 2026-1-24 08:26:00

感谢,下载保存了

颜清华 发表于 4 天前

东西不错很实用谢谢分享

裸历 发表于 17 小时前

前排留名,哈哈哈

肿圬后 发表于 2 小时前

谢谢分享,试用一下
页: [1]
查看完整版本: 【大数据 & AI】Flink Agents 源码解读 --- (6) --- ActionTask