【大数据 & AI】Flink Agents 源码解读 --- (4) --- AgentPlan
【大数据 & AI】Flink Agents 源码解读 --- (4) ---AgentPlan目录
[*]【大数据 & AI】Flink Agents 源码解读 --- (4) ---AgentPlan
[*]0x00 概要
[*]0x01 基本概念
[*]1.1 定义
[*]1.2 在系统中的位置和作用
[*]1.3 流程
[*]0x02 具体功能
[*]2.1 编译
[*]2.2 提供数据
[*]0x03 与 Flink 原生的关系
[*]3.1 核心对应点(AgentPlan ↔ JobGraph)
[*]3.2 非对应点
[*]3.3 总结
[*]0x04 与智能 Agent “规划(Planning)” 的核心相似性
[*]4.1 核心目标
[*]4.2 核心特征
[*]4.2.1 Agent规划
[*]4.2.2 AgentPlan
[*]0xFF 参考
0x00 概要
AgentPlan 是 Flink Agent 框架中的一个核心组件,它起到连接用户定义的 Agent 和实际执行环境之间的桥梁作用。AgentPlan 的核心职责如下:
[*]解析用户定义的 Agent 中的动作(Action)、事件监听规则、资源提供者(ResourceProvider);
[*]提供 AgentPlan 运行时的核心接口(获取事件对应的动作、获取资源、读取动作配置等);
[*]统一管理 Agent 依赖的各类资源(如模型、工具、Prompt、MCP 服务等)的创建与复用。
0x01 基本概念
AgentPlan 本质上是将用户友好的Agent定义转换为运行时可执行的结构化表示,使得执行引擎可以高效地处理Agent逻辑。
1.1 定义
定义如下:
class AgentPlan(BaseModel):
"""
从用户自定义 Agent 编译得到的智能体执行计划
核心作用:封装 Agent 运行所需的动作、事件映射、资源、配置等核心信息
"""
# 动作名称到动作对象的映射
actions: Dict
# 事件类型(字符串格式)到监听该事件的动作名称列表的映射
actions_by_event: Dict]
# 资源提供者映射:第一层是资源类型,第二层是资源名称,值为对应资源提供者
resource_providers: Dict] | None = None
# Agent 的全局配置
config: AgentConfiguration | None = None
# 私有缓存:已创建的资源实例(避免重复初始化),键为 (资源类型, 资源名称)
__resources: Dict] = {} 1.2 在系统中的位置和作用
AgentPlan 在系统中的位置和作用如下:
用户定义的Agent
↓
↓ (AgentPlan.from_agent)
AgentPlan(编译后的执行计划)
↓
↓(LocalRunner 使用)
LocalRunner(实际执行) AgentPlan 与 Flink 集群的关系如下:
[*]部署阶段:AgentPlan 被序列化并通过 CompileUtils 传递给运行时操作符(如 ActionExecutionOperator)。
[*]运行时行为:操作符使用 AgentPlan 决定如何处理事件和执行动作,但不会修改 AgentPlan 本身。
[*]资源配置:实际资源(如模型连接、工具等)在运行时通过 ResourceProvider 提供,这不改变 AgentPlan 的结构。
并行度调整的影响如下:
[*]状态恢复:故障恢复或手动调整并行度时,Flink 会重新分配键控状态,AgentPlan 结构保持一致。
[*]检查点兼容性:若需更改 AgentPlan(如添加新动作),必须考虑检查点兼容性和状态演化策略。
1.3 流程
AgentPlan 的流程如下:
[*]编译:将用户定义的Agent 转换为可执行的计划
[*]映射管理:维护事件类型和动作之间的映射关系
[*]资源提供:管理和提供各种资源(模型、工具、提示等)
[*]配置存储:存储和提供动作的配置参数
[*]解耦:将用户接口和执行实现解耦,提供统一的执行计划接口
0x02 具体功能
2.1 编译
from_agent 将用户定义的高级 Agent 对象转换为可执行的计划 AgentPlan。流程如下:
代码如下:
@staticmethod
def from_agent(agent: Agent, config: AgentConfiguration) -> "AgentPlan":
"""
核心工厂方法:将用户自定义的 Agent 编译为 AgentPlan
:param agent: 用户定义的 Agent 实例
:param config: Agent 的全局配置
:return: 编译后的 AgentPlan 实例
"""
# 1. 收集 Agent 中的所有动作(自定义动作 + 内置动作)
actions = {}
actions_by_event = {}
# 合并用户自定义动作和框架内置动作
for action in _get_actions(agent) + BUILT_IN_ACTIONS:
# 校验动作名称唯一性,避免重复
assert action.name not in actions, f"Duplicate action name: {action.name}"
actions = action
# 构建事件类型到动作名称的映射
for event_type in action.listen_event_types:
if event_type not in actions_by_event:
actions_by_event = []
actions_by_event.append(action.name)
# 2. 收集 Agent 中的所有资源提供者
resource_providers = {}
for provider in _get_resource_providers(agent):
type = provider.type
if type not in resource_providers:
resource_providers = {}
name = provider.name
# 校验资源名称在同类型下的唯一性
assert name not in resource_providers, f"Duplicate resource name: {name}"
resource_providers = provider
# 3. 创建并返回 AgentPlan 实例
return AgentPlan(
actions=actions,
actions_by_event=actions_by_event,
resource_providers=resource_providers,
config=config,
)2.2 提供数据
AgentPlan 维护了事件类型到动作的映射关系,用于驱动执行流程。
# 存储结构
actions_by_event: Dict] # 事件类型 --> 动作名称列表
# 查询方法
def get_actions(self, event_type: str) -> List:
"""
获取监听指定事件类型的所有动作
:param event_type: 事件类型(字符串格式)
:return: 动作实例列表
"""
return for name in self.actions_by_event] AgentPlan 管理所有资源的提供者,并在需要时实例化资源。具体代码如下:
def get_resource(self, name: str, type: ResourceType) -> Resource:
"""
获取指定类型+名称的资源实例(懒加载 + 缓存复用)
:param name: 资源名称
:param type: 资源类型
:return: 资源实例
"""
# 初始化该类型的资源缓存
if type not in self.__resources:
self.__resources = {}
# 若资源未创建,则通过资源提供者创建并缓存
if name not in self.__resources:
resource_provider = self.resource_providers
# 调用资源提供者的 provide 方法创建资源(支持递归获取依赖资源)
resource = resource_provider.provide(
get_resource=self.get_resource, config=self.config
)
self.__resources = resource
# 返回缓存的资源实例
return self.__resourcesAgentPlan 存储和提供动作的配置信息。
def get_action_config(self, action_name: str) -> Dict:
"""
获取指定动作的配置
:param action_name: 动作名称
:return: 动作配置字典
"""
return self.actions.config
def get_action_config_value(self, action_name: str, key: str) -> Any:
"""
获取指定动作配置中的某个键值
:param action_name: 动作名称
:param key: 配置键名
:return: 配置值(不存在则返回 None)
"""
return self.actions.config.get(key, None)0x03 与 Flink 原生的关系
Flink 原生的 “Plan 体系”(StreamGraph → JobGraph → ExecutionGraph)是 AgentPlan 的底层支撑,其中 AgentPlan 最直接对应原生 Flink 的 JobGraph,而非笼统的 “Flink Plan”。
这是因为,Flink 中 “Plan” 不是单一组件,而是从 “逻辑定义” 到 “物理执行” 的三层编译产物,这是理解对应关系的基础。AgentPlan 是 Flink Agents 对 Agent 逻辑编译后的 “可执行计划”,其核心定位和原生 Flink 的 JobGraph 完全对齐,而非笼统的 “Plan”。
原生 Plan 层级核心作用关键特征StreamGraph逻辑拓扑(用户视角)由 DataStream API 代码生成,描述 “数据处理的逻辑步骤”,不涉及并行度、资源分配JobGraph可执行计划(集群视角)StreamGraph 编译后的产物,补充并行度、算子链、中间结果传递规则,是 “系统能识别的执行计划”ExecutionGraph物理执行图(运行时视角)JobGraph 提交到集群后生成,绑定 TaskManager、Slot、物理资源,是 “正在运行的拓扑”3.1 核心对应点(AgentPlan ↔ JobGraph)
我们来看看AgentPlan和JobGraph之间的关联。
[*]角色一致:都是 “编译层产物”,连接 “高层业务定义” 和 “底层物理执行”:
[*]原生:StreamGraph(用户写的 DataStream 逻辑)→ JobGraph(集群可执行计划);
[*]Flink Agents:Agent(用户定义的 Agent 行为逻辑)→ AgentPlan(系统可调度的动作执行计划)。
[*]内容一致:都包含 “执行规则” 而非 “纯逻辑”:
[*]JobGraph 包含算子并行度、输入输出流、状态存储规则;
[*]AgentPlan 包含动作触发规则、资源映射关系、Action 与 Event 的绑定规则。
[*]作用一致:都是 “中间层”,不直接运行,而是给底层执行组件(Operator/TaskManager)提供执行依据。
3.2 非对应点
我们再来看看AgentPlan 不对应哪些。
[*]AgentPlan ≠ StreamGraph:StreamGraph 是 “用户逻辑的直接映射”,无编译优化;AgentPlan 是 Agent 逻辑的 “编译优化后产物”,已包含系统可执行的规则(如动作调度优先级)。
[*]AgentPlan ≠ ExecutionGraph:ExecutionGraph 是 “运行时物理拓扑”,绑定具体资源;AgentPlan 是 “预执行计划”,不涉及物理资源分配(由 Flink 集群自动处理)。
3.3 总结
[*]Flink 原生的 “Plan” 是三层体系(StreamGraph/JobGraph/ExecutionGraph),AgentPlan 并非对应 “所有 Plan”,而是精准对应 JobGraph;
[*]核心逻辑:AgentPlan 是 Flink Agents 框架对 Agent 高层逻辑的 “编译产物”,和 JobGraph 一样承担 “承上启下” 的作用 —— 把用户的业务语义转换为系统可执行的规则;
[*]边界:AgentPlan 是 “逻辑可执行计划”,不涉及物理资源分配(这部分仍由原生 Flink 的 ExecutionGraph 处理);
0x04 与智能 Agent “规划(Planning)” 的核心相似性
我们再来看看 Flink Agents 的 AgentPlan 与智能 Agent “规划” 的核心相似性
Flink Agents 中的 AgentPlan 是面向流式计算场景的任务执行规划,而智能 Agent 领域的 “规划(Planning)” 是 Agent 为达成目标制定行动序列的核心能力,二者虽应用场景(流式计算 vs 通用智能)不同,但底层逻辑、核心特征高度契合,具体相似性可从以下维度拆解:
4.1 核心目标
核心目标都是:为 “达成既定目标” 制定可执行路径。
[*]智能 Agent 的规划:核心是基于自身目标(如 “导航到目的地”“完成订单处理”)、环境状态(如 “当前位置”“订单队列长度”)和可用动作(如 “左转”“分配订单”),生成一套有序的行动序列,本质是 “从目标反推可落地的步骤”,确保 Agent 能通过执行该序列完成任务。
[*]Flink Agents 的 AgentPlan:核心是基于流式计算任务目标(如 “实时统计订单量”“清洗异常日志”)、Flink 集群的资源状态(如 “算子并行度”“节点负载”)和可用计算动作(如 “数据分片”“窗口聚合”“状态存储”),生成一套结构化的流式任务执行方案,本质是 “将抽象的计算目标转化为 Flink 可执行的算子调度、资源分配、数据流转步骤”。
相似性:二者均以 “目标导向” 为核心,规划的最终目的是将抽象目标拆解为可落地、可验证的执行路径,而非单纯的 “任务罗列”。
4.2 核心特征
4.2.1 Agent规划
Agent规划是动态逻辑。
智能 Agent 的规划的执行逻辑为:遵循 “规划生成→执行动作→感知环境反馈→调整规划” 的闭环(如机器人 “规划抓取动作→执行抓取→感知未抓稳→重新规划抓取角度”)。
智能 Agent 的规划不是 “一次性规划”,而是与执行过程形成闭环,通过反馈持续优化规划,确保目标最终达成。
4.2.2 AgentPlan
AgentPlan 是静态执行蓝图,在作业部署时确定,运行期间不因 Flink 集群配置变化而动态调整。任何代理逻辑更改都需重新编译并部署新的 AgentPlan。
AgentPlan 在编译时生成:AgentPlan 在作业提交前从用户定义的 Agent 类编译而来,是静态计划。它包含所有动作(actions)、资源配置(resource providers)以及事件监听关系(actions_by_event)。
静态结构:一旦创建,AgentPlan 的结构在整个作业生命周期内保持不变。
0xFF 参考
来源:程序园用户自行投稿发布,如果侵权,请联系站长删除
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作! 这个好,看起来很实用 收藏一下 不知道什么时候能用到 感谢分享,下载保存了,貌似很强大 谢谢分享,试用一下 感谢,下载保存了 喜欢鼓捣这些软件,现在用得少,谢谢分享! 谢谢分享,辛苦了 感谢分享,下载保存了,貌似很强大 不错,里面软件多更新就更好了 感谢分享 新版吗?好像是停更了吧。 鼓励转贴优秀软件安全工具和文档! 这个好,看起来很实用 这个有用。 这个好,看起来很实用 很好很强大我过来先占个楼 待编辑 过来提前占个楼 懂技术并乐意极积无私分享的人越来越少。珍惜 收藏一下 不知道什么时候能用到
页:
[1]
2