找回密码
 立即注册
首页 业界区 安全 在 Spring AI 中自定义 Tool 调用返回值——实现 TodoLi ...

在 Spring AI 中自定义 Tool 调用返回值——实现 TodoList 提醒注入

谷江雪 昨天 14:20
最近发现了一个极简 Claude Code 的文档:https://learn.shareai.run/en/s03/
其中有一个实用技巧:如何在适当时机提醒 AI 更新 TodoList? 文档中的做法是:当连续三次工具调用都没有触发 todo 更新操作时,在 Function Call 返回值的第一个位置插入一条提醒:
  1. <reminder>Update your todos.</reminder>
复制代码
对应的 Python 实现如下:
  1. if rounds_since_todo >= 3 and messages:    last = messages[-1]    if last["role"] == "user" and isinstance(last.get("content"), list):        last["content"].insert(0, {            "type": "text",            "text": "<reminder>Update your todos.</reminder>",        })
复制代码
那么在 Spring AI 中能否实现同样的效果?经过一番研究,答案是可以的。本文记录实现过程。
代码仓库

项目完整 LangChain4j 代码实现:https://www.codefather.cn/course/1948291549923344386
完整的代码地址:https://github.com/lieeew/leikooo-code-mother
依赖版本

对应的 SpringAI 版本和 SpringBoot 依赖:
  1. <properties>
  2.     <java.version>21</java.version>
  3.     <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
  4.     <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
  5.     <spring-boot.version>4.0.1</spring-boot.version>
  6.     <spring-ai.version>2.0.0-M2</spring-ai.version>
  7. </properties>
  8. <dependencies>
  9.     <dependency>
  10.         <groupId>org.springframework.ai</groupId>
  11.         spring-ai-starter-model-minimax</artifactId>
  12.     </dependency>
  13. </dependencies>
  14. <dependencyManagement>
  15.     <dependencies>
  16.         <dependency>
  17.             <groupId>org.springframework.boot</groupId>
  18.             spring-boot-dependencies</artifactId>
  19.             <version>${spring-boot.version}</version>
  20.             <type>pom</type>
  21.             <scope>import</scope>
  22.         </dependency>
  23.         <dependency>
  24.             <groupId>org.springframework.ai</groupId>
  25.             spring-ai-bom</artifactId>
  26.             <version>${spring-ai.version}</version>
  27.             <type>pom</type>
  28.             <scope>import</scope>
  29.         </dependency>
  30.     </dependencies>
  31. </dependencyManagement>
  32.        
复制代码
定义 TodoList Tool

首先需要定义供 LLM 调用的 Tool。以下是完整实现,包含读取和写入两个操作,并通过 Caffeine 本地缓存按会话隔离存储:
  1. /**
  2. * @author leikooo
  3. * @date 2025/12/31
  4. */
  5. @Component
  6. public class TodolistTools extends BaseTools {
  7.     private static final int MAX_TODOS = 20;
  8.     private static final Set<String> VALID_STATUSES = Set.of("pending", "in_progress", "completed");
  9.     private static final Map<String, String> STATUS_MARKERS = Map.of(
  10.             "pending", "[ ]",
  11.             "in_progress", "[>]",
  12.             "completed", "[x]"
  13.     );
  14.     private record TodoItem(String id, String text, String status) {}
  15.     private static final Cache<String, List<TodoItem>> TODOLIST_CACHE = Caffeine.newBuilder()
  16.             .maximumSize(1000)
  17.             .expireAfterWrite(Duration.ofMinutes(30))
  18.             .build();
  19.     @Tool(description = "Update task list. Track progress on multi-step tasks. Pass the full list of items each time (replaces previous list). " +
  20.             "Each item must have id, text, status. Status: pending, in_progress, completed. Only one item can be in_progress."
  21.     )
  22.     public String todoUpdate(
  23.             @ToolParam(description = "Full list of todo items. Each item: id (string), text (string), status (pending|in_progress|completed).")
  24.             List<Map<String, Object>> items,
  25.             ToolContext toolContext
  26.     ) {
  27.         try {
  28.             String conversationId = ConversationUtils.getToolsContext(toolContext).appId();
  29.             if (items == null || items.isEmpty()) {
  30.                 TODOLIST_CACHE.invalidate(conversationId);
  31.                 return "No todos.";
  32.             }
  33.             if (items.size() > MAX_TODOS) {
  34.                 return "Error: Max " + MAX_TODOS + " todos allowed";
  35.             }
  36.             List<TodoItem> validated = validateAndConvert(items);
  37.             TODOLIST_CACHE.put(conversationId, validated);
  38.             return render(validated);
  39.         } catch (IllegalArgumentException e) {
  40.             return "Error: " + e.getMessage();
  41.         }
  42.     }
  43.     private List<TodoItem> validateAndConvert(List<Map<String, Object>> items) {
  44.         int inProgressCount = 0;
  45.         List<TodoItem> result = new ArrayList<>(items.size());
  46.         for (int i = 0; i < items.size(); i++) {
  47.             Map<String, Object> item = items.get(i);
  48.             String id = String.valueOf(item.getOrDefault("id", String.valueOf(i + 1))).trim();
  49.             String text = String.valueOf(item.getOrDefault("text", "")).trim();
  50.             String status = String.valueOf(item.getOrDefault("status", "pending")).toLowerCase();
  51.             if (StringUtils.isBlank(text)) {
  52.                 throw new IllegalArgumentException("Item " + id + ": text required");
  53.             }
  54.             if (!VALID_STATUSES.contains(status)) {
  55.                 throw new IllegalArgumentException("Item " + id + ": invalid status '" + status + "'");
  56.             }
  57.             if ("in_progress".equals(status)) {
  58.                 inProgressCount++;
  59.             }
  60.             result.add(new TodoItem(id, text, status));
  61.         }
  62.         if (inProgressCount > 1) {
  63.             throw new IllegalArgumentException("Only one task can be in_progress at a time");
  64.         }
  65.         return result;
  66.     }
  67.     @Tool(description = "Read the current todo list for this conversation. Use this to check progress and see what tasks remain.")
  68.     public String todoRead(ToolContext toolContext) {
  69.         String conversationId = ConversationUtils.getToolsContext(toolContext).appId();
  70.         List<TodoItem> items = TODOLIST_CACHE.getIfPresent(conversationId);
  71.         return items == null || items.isEmpty() ? "No todos." : render(items);
  72.     }
  73.     private String render(List<TodoItem> items) {
  74.         if (items == null || items.isEmpty()) {
  75.             return "No todos.";
  76.         }
  77.         StringBuilder sb = new StringBuilder("\n\n");
  78.         for (TodoItem item : items) {
  79.             String marker = STATUS_MARKERS.getOrDefault(item.status(), "[ ]");
  80.             sb.append(marker).append(" #").append(item.id()).append(": ").append(item.text()).append("\n\n");
  81.         }
  82.         long done = items.stream().filter(t -> "completed".equals(t.status())).count();
  83.         sb.append("\n(").append(done).append("/").append(items.size()).append(" completed)");
  84.         return sb.append("\n\n").toString();
  85.     }
  86.     @Override
  87.     String getToolName() { return "Todo List Tool"; }
  88.     @Override
  89.     String getToolDes() { return "Read and write task todo lists to track progress"; }
  90. }
复制代码
问题分析:为什么不能在普通 Advisor 中拦截工具调用?

通过阅读源码 org.springframework.ai.minimax.MiniMaxChatModel#stream 可以发现,框架内部会在 ChatModel 层直接执行 Tool 调用,而不是将其透传给 Advisor 链。核心执行逻辑如下:
  1. // Tool 调用的核心逻辑,如下:
  2. Flux<ChatResponse> flux = chatResponse.flatMap(response -> {
  3.     if (this.toolExecutionEligibilityPredicate.isToolExecutionRequired(requestPrompt.getOptions(), response)) {
  4.         // FIXME: bounded elastic needs to be used since tool calling
  5.         //  is currently only synchronous
  6.         return Flux.deferContextual(ctx -> {
  7.             ToolExecutionResult toolExecutionResult;
  8.             try {
  9.                 ToolCallReactiveContextHolder.setContext(ctx);
  10.                 toolExecutionResult = this.toolCallingManager.executeToolCalls(prompt, response);
  11.             }
  12.             finally {
  13.                 ToolCallReactiveContextHolder.clearContext();
  14.             }
  15.             return Flux.just(ChatResponse.builder().from(response)
  16.                     .generations(ToolExecutionResult.buildGenerations(toolExecutionResult))
  17.                     .build());
  18.         }).subscribeOn(Schedulers.boundedElastic());
  19.     }
  20.     return Flux.just(response);
  21. })
  22. .doOnError(observation::error)
  23. .doFinally(signalType -> observation.stop())
  24. .contextWrite(ctx -> ctx.put(ObservationThreadLocalAccessor.KEY, observation));
复制代码
这意味着,如果我们在外层 Advisor 中尝试拦截 tool_call,此时工具已经执行完毕,并且无法识别到工具调用。所以我准备使用我自己写的 MiniMaxChatModel 覆盖掉这个源码的逻辑,之后再 Advisor 接管这个 Tool 执行。
验证思路:能否通过 Advisor 接管工具执行?

我们需要在自己的项目目录创建一个 org.springframework.ai.minimax.MiniMaxChatModel 具体文件内容可以访问 https://www.codecopy.cn/post/7lonmm 获取完整的代码。详细代码位置如下图所示:
1.png

这样写好之后就可以让工具调用信号透传到 Advisor 层,判断是否有 Tool 调用。验证用的 Advisor 如下:
  1. @Slf4j
  2. public class FindToolAdvisor implements StreamAdvisor {
  3.     @Override
  4.     public Flux<ChatClientResponse> adviseStream(ChatClientRequest chatClientRequest, StreamAdvisorChain streamAdvisorChain) {
  5.         return Flux.deferContextual(contextView -> {
  6.             log.info("Advising stream");
  7.             return streamAdvisorChain.nextStream(chatClientRequest).doOnNext(streamResponse -> {
  8.                 boolean hasToolCalls = streamResponse.chatResponse().hasToolCalls();
  9.                 log.info("Found tool calls: {}", hasToolCalls);
  10.             });
  11.         });
  12.     }
  13.     @Override
  14.     public String getName() { return "FindToolAdvisor"; }
  15.     @Override
  16.     public int getOrder() { return 0; }
  17. }
复制代码
  1. @Component
  2. public class StreamApplication implements CommandLineRunner {
  3.     @Resource
  4.     private ChatModel chatModel;
  5.     @Override
  6.     public void run(String... args) throws Exception {
  7.         ChatClient chatClient = ChatClient.builder(chatModel)
  8.                 .defaultTools(FileSystemTools.builder().build())
  9.                 .defaultAdvisors(new FindToolAdvisor())
  10.                 .build();
  11.         ChatClient.StreamResponseSpec stream = chatClient.prompt("""
  12.                 帮我写一个简单的 HTML 页面,路径是 E:\\TEMPLATE\\spring-skills 不超过 300 行代码
  13.                 """).stream();
  14.         stream.content().subscribe(System.out::println);
  15.     }
  16. }
复制代码
配置文件:
  1. spring:
  2.   ai:
  3.     minimax:
  4.       api-key: sk-cp-xxxxx
  5.       chat:
  6.         options:
  7.           model: MiniMax-M2.5
复制代码
测试结果证明工具调用信号可以被成功拦截,方案可行:
2.png

改造项目:实现 ExecuteToolAdvisor

参考 Spring AI 社区中一个尚未合并的 PR(#5383),我们实现了 ExecuteToolAdvisor,主要做了两件事:

  • 工具调用 JSON 格式容错:捕获 JSON 解析异常,最多重试 3 次再抛出,提升大模型调用 Tool 时格式不规范的容错能力。
  • TodoList 提醒注入:连续 3 次工具调用均未触发 todoUpdate 时,在 ToolResponseMessage 的第一个位置注入提醒,引导 AI 及时更新任务列表。
⚠️ 注意 order 顺序:由于该 Advisor 接管了工具执行,它的 order 值应尽量大(即靠后执行)。若 order 较小,可能导致后续 Advisor 的 doFinally 在每次工具调用时都被触发(比如后面的 buildAdvisor、versionAdvisor 只需要执行一次),而非在整个对话结束时触发一次。本实现中使用 Integer.MAX_VALUE - 100。
  1. /** * 手动执行 tool 的 StreamAdvisor:关闭框架内部执行,自行执行并可在工具返回值中注入提醒(如更新 todo)。 * * @author leikooo * @date 2026/3/14 */@Slf4j@Componentpublic class ExecuteToolAdvisor implements StreamAdvisor {    private static final String TODO_REMINDER = "<reminder>Update your todos.</reminder>";    private static final String JSON_ERROR_MESSAGE =            "Tool call JSON parse failed. Fix and retry.\n"                    + "Rules: strict RFC8259 JSON, no trailing commas, no comments, "                    + "no unescaped control chars in strings (escape newlines as \\n, tabs as \\t), "                    + "all keys double-quoted.";    private static final int MAX_TOOL_RETRY = 3;    private static final int ORDER = Integer.MAX_VALUE - 100;    private static final String TODO_METHOD = "todoUpdate";    private static final int REMINDER_THRESHOLD = 3;    /**     * 三次工具没有使用 todoTool 那么就在 tool_result[0] 位置添加 TODO_REMINDER     */    private final Cache roundsSinceTodo = Caffeine.newBuilder()            .maximumSize(10_00)            .expireAfterWrite(Duration.ofMinutes(30))            .build();                @Resource    private ToolCallingManager toolCallingManager;    @Override    public Flux adviseStream(ChatClientRequest chatClientRequest, StreamAdvisorChain streamAdvisorChain) {        Assert.notNull(streamAdvisorChain, "streamAdvisorChain must not be null");        Assert.notNull(chatClientRequest, "chatClientRequest must not be null");        if (chatClientRequest.prompt().getOptions() == null                || !(chatClientRequest.prompt().getOptions() instanceof ToolCallingChatOptions)) {            throw new IllegalArgumentException(                    "ExecuteToolAdvisor requires ToolCallingChatOptions to be set in the ChatClientRequest options.");        }        var optionsCopy = (ToolCallingChatOptions) chatClientRequest.prompt().getOptions().copy();        optionsCopy.setInternalToolExecutionEnabled(false);        return internalStream(streamAdvisorChain, chatClientRequest, optionsCopy,                chatClientRequest.prompt().getInstructions(), 0);    }    private Flux internalStream(            StreamAdvisorChain streamAdvisorChain,            ChatClientRequest originalRequest,            ToolCallingChatOptions optionsCopy,            List instructions,            int jsonRetryCount) {        return Flux.deferContextual(contextView -> {            var processedRequest = ChatClientRequest.builder()                    .prompt(new Prompt(instructions, optionsCopy))                    .context(originalRequest.context())                    .build();            StreamAdvisorChain chainCopy = streamAdvisorChain.copy(this);            Flux responseFlux = chainCopy.nextStream(processedRequest);            AtomicReference aggregatedResponseRef = new AtomicReference();            AtomicReference chunksRef = new AtomicReference(new ArrayList());            return new ChatClientMessageAggregator()                    .aggregateChatClientResponse(responseFlux, aggregatedResponseRef::set)                    .doOnNext(chunk -> chunksRef.get().add(chunk))                    .ignoreElements()                    .cast(ChatClientResponse.class)                    .concatWith(Flux.defer(() -> processAggregatedResponse(                            aggregatedResponseRef.get(), chunksRef.get(), processedRequest,                            streamAdvisorChain, originalRequest, optionsCopy, jsonRetryCount)));        });    }    private Flux processAggregatedResponse(            ChatClientResponse aggregatedResponse,            List chunks,            ChatClientRequest finalRequest,            StreamAdvisorChain streamAdvisorChain,            ChatClientRequest originalRequest,            ToolCallingChatOptions optionsCopy,            int retryCount) {        if (aggregatedResponse == null) {            return Flux.fromIterable(chunks);        }        ChatResponse chatResponse = aggregatedResponse.chatResponse();        boolean isToolCall = chatResponse != null && chatResponse.hasToolCalls();        if (isToolCall) {            Assert.notNull(chatResponse, "chatResponse must not be null when hasToolCalls is true");            ChatClientResponse finalAggregatedResponse = aggregatedResponse;            Flux toolCallFlux = Flux.deferContextual(ctx -> {                ToolExecutionResult toolExecutionResult;                try {                    ToolCallReactiveContextHolder.setContext(ctx);                    toolExecutionResult = toolCallingManager.executeToolCalls(finalRequest.prompt(), chatResponse);                } catch (Exception e) {                    if (retryCount < MAX_TOOL_RETRY) {                        List retryInstructions = buildRetryInstructions(finalRequest, chatResponse, e);                        if (retryInstructions != null) {                            return internalStream(streamAdvisorChain, originalRequest, optionsCopy,                                    retryInstructions, retryCount + 1);                        }                    }                    throw e;                } finally {                    ToolCallReactiveContextHolder.clearContext();                }                List historyWithReminder = injectReminderIntoConversationHistory(                        toolExecutionResult.conversationHistory(), getAppId(finalRequest));                if (toolExecutionResult.returnDirect()) {                    return Flux.just(buildReturnDirectResponse(finalAggregatedResponse, chatResponse,                            toolExecutionResult, historyWithReminder));                }                return internalStream(streamAdvisorChain, originalRequest, optionsCopy, historyWithReminder, 0);            });            return toolCallFlux.subscribeOn(Schedulers.boundedElastic());        }        return Flux.fromIterable(chunks);    }    /**     * 获取 AppId     */    private String getAppId(ChatClientRequest finalRequest) {        if (finalRequest.prompt().getOptions() instanceof ToolCallingChatOptions toolCallingChatOptions) {            return toolCallingChatOptions.getToolContext().get(CONVERSATION_ID).toString();        }        throw new BusinessException(ErrorCode.SYSTEM_ERROR);    }    private static List buildRetryInstructions(ChatClientRequest finalRequest,                                                        ChatResponse chatResponse,                                                        Throwable error) {        AssistantMessage assistantMessage = extractAssistantMessage(chatResponse);        if (assistantMessage == null || assistantMessage.getToolCalls() == null                || assistantMessage.getToolCalls().isEmpty()) {            return null;        }        List instructions = new ArrayList(finalRequest.prompt().getInstructions());        instructions.add(assistantMessage);        String errorMessage = buildJsonErrorMessage(error);        List responses = assistantMessage.getToolCalls().stream()                .map(toolCall -> new ToolResponseMessage.ToolResponse(                        toolCall.id(),                        toolCall.name(),                        errorMessage))                .toList();        instructions.add(ToolResponseMessage.builder().responses(responses).build());        return instructions;    }    private static AssistantMessage extractAssistantMessage(ChatResponse chatResponse) {        if (chatResponse == null) {            return null;        }        Generation result = chatResponse.getResult();        if (result != null && result.getOutput() != null) {            return result.getOutput();        }        List results = chatResponse.getResults();        if (results != null && !results.isEmpty() && results.get(0).getOutput() != null) {            return results.get(0).getOutput();        }        return null;    }    private static String buildJsonErrorMessage(Throwable error) {        String detail = ExceptionUtils.getRootCauseMessage(error);        if (detail.isBlank()) {            return JSON_ERROR_MESSAGE;        }        return JSON_ERROR_MESSAGE + "\nError: " + detail;    }    /**     * 对 conversationHistory 中的 TOOL 类消息,在其每个 ToolResponse 的 responseData 后追加提醒。     */    private List injectReminderIntoConversationHistory(List conversationHistory, String appId) {        if (conversationHistory == null || conversationHistory.isEmpty()) {            return conversationHistory;        }        if (!(conversationHistory.getLast() instanceof ToolResponseMessage toolMsg)) {            return conversationHistory;        }        List responses = toolMsg.getResponses();        if (responses.isEmpty()) {            return conversationHistory;        }        ToolResponseMessage.ToolResponse firstResponse = responses.getFirst();        if (!updateRoundsAndCheckReminder(appId, firstResponse.name())) {            return conversationHistory;        }        List newResponses = new ArrayList(responses);        ToolResponseMessage.ToolResponse actualRes = newResponses.removeFirst();        newResponses.add(new ToolResponseMessage.ToolResponse(                firstResponse.id(), "text", TODO_REMINDER));        newResponses.add(actualRes);        List result = new ArrayList(                conversationHistory.subList(0, conversationHistory.size() - 1));        result.add(ToolResponseMessage.builder().responses(newResponses).build());        return result;    }    /**     * 构造 returnDirect 时的 ChatClientResponse,使用注入提醒后的 conversationHistory 生成 generations。     */    private static ChatClientResponse buildReturnDirectResponse(            ChatClientResponse aggregatedResponse,            ChatResponse chatResponse,            ToolExecutionResult originalResult,            List historyWithReminder) {        ToolExecutionResult resultWithReminder = ToolExecutionResult.builder()                .conversationHistory(historyWithReminder)                .returnDirect(originalResult.returnDirect())                .build();        ChatResponse newChatResponse = ChatResponse.builder()                .from(chatResponse)                .generations(ToolExecutionResult.buildGenerations(resultWithReminder))                .build();        return aggregatedResponse.mutate().chatResponse(newChatResponse).build();    }    /**     * updateRoundsAndCheckReminder     * @param appId appId     * @param methodName methodName     * @return 是否需要更新     */    private boolean updateRoundsAndCheckReminder(String appId, String methodName) {        if (TODO_METHOD.equals(methodName)) {            roundsSinceTodo.put(appId, 0);            return false;        }        int count = roundsSinceTodo.asMap().merge(appId, 1, Integer::sum);        return count >= REMINDER_THRESHOLD;    }    @Override    public String getName() {        return "ExecuteToolAdvisor";    }    @Override    public int getOrder() {        return ORDER;    }}
复制代码
因为这个 Advisor 也使用到了 StreamAdvisorChain  接口的 copy 所以我们需要覆盖源码的这个 StreamAdvisorChain 并且实现对应的接口,下面的代码包路径是 org.springframework.ai.chat.client.advisor.api 具体的代码:
  1. public interface StreamAdvisorChain extends AdvisorChain {
  2.     /**
  3.      * Invokes the next {@link StreamAdvisor} in the {@link StreamAdvisorChain} with the
  4.      * given request.
  5.      */
  6.     Flux<ChatClientResponse> nextStream(ChatClientRequest chatClientRequest);
  7.     /**
  8.      * Returns the list of all the {@link StreamAdvisor} instances included in this chain
  9.      * at the time of its creation.
  10.      */
  11.     List<StreamAdvisor> getStreamAdvisors();
  12.     /**
  13.      * Creates a new StreamAdvisorChain copy that contains all advisors after the
  14.      * specified advisor.
  15.      * @param after the StreamAdvisor after which to copy the chain
  16.      * @return a new StreamAdvisorChain containing all advisors after the specified
  17.      * advisor
  18.      * @throws IllegalArgumentException if the specified advisor is not part of the chain
  19.      */
  20.     StreamAdvisorChain copy(StreamAdvisor after);
  21. }
复制代码
下面的包位置是 org.springframework.ai.chat.client.advisor具体的实现代码:
[code]/** * Default implementation for the {@link BaseAdvisorChain}. Used by the {@link ChatClient} * to delegate the call to the next {@link CallAdvisor} or {@link StreamAdvisor} in the * chain. * * @author Christian Tzolov * @author Dariusz Jedrzejczyk * @author Thomas Vitale * @since 1.0.0 */public class DefaultAroundAdvisorChain implements BaseAdvisorChain {        public static final AdvisorObservationConvention DEFAULT_OBSERVATION_CONVENTION = new DefaultAdvisorObservationConvention();        private static final ChatClientMessageAggregator CHAT_CLIENT_MESSAGE_AGGREGATOR = new ChatClientMessageAggregator();        private final List originalCallAdvisors;        private final List originalStreamAdvisors;        private final Deque callAdvisors;        private final Deque streamAdvisors;        private final ObservationRegistry observationRegistry;        private final AdvisorObservationConvention observationConvention;        DefaultAroundAdvisorChain(ObservationRegistry observationRegistry, Deque callAdvisors,                        Deque streamAdvisors, @Nullable AdvisorObservationConvention observationConvention) {                Assert.notNull(observationRegistry, "the observationRegistry must be non-null");                Assert.notNull(callAdvisors, "the callAdvisors must be non-null");                Assert.notNull(streamAdvisors, "the streamAdvisors must be non-null");                this.observationRegistry = observationRegistry;                this.callAdvisors = callAdvisors;                this.streamAdvisors = streamAdvisors;                this.originalCallAdvisors = List.copyOf(callAdvisors);                this.originalStreamAdvisors = List.copyOf(streamAdvisors);                this.observationConvention = observationConvention != null ? observationConvention                                : DEFAULT_OBSERVATION_CONVENTION;        }        public static Builder builder(ObservationRegistry observationRegistry) {                return new Builder(observationRegistry);        }        @Override        public ChatClientResponse nextCall(ChatClientRequest chatClientRequest) {                Assert.notNull(chatClientRequest, "the chatClientRequest cannot be null");                if (this.callAdvisors.isEmpty()) {                        throw new IllegalStateException("No CallAdvisors available to execute");                }                var advisor = this.callAdvisors.pop();                var observationContext = AdvisorObservationContext.builder()                        .advisorName(advisor.getName())                        .chatClientRequest(chatClientRequest)                        .order(advisor.getOrder())                        .build();                return AdvisorObservationDocumentation.AI_ADVISOR                        .observation(this.observationConvention, DEFAULT_OBSERVATION_CONVENTION, () -> observationContext,                                        this.observationRegistry)                        .observe(() -> {                                var chatClientResponse = advisor.adviseCall(chatClientRequest, this);                                observationContext.setChatClientResponse(chatClientResponse);                                return chatClientResponse;                        });        }        @Override        public Flux nextStream(ChatClientRequest chatClientRequest) {                Assert.notNull(chatClientRequest, "the chatClientRequest cannot be null");                return Flux.deferContextual(contextView -> {                        if (this.streamAdvisors.isEmpty()) {                                return Flux.error(new IllegalStateException("No StreamAdvisors available to execute"));                        }                        var advisor = this.streamAdvisors.pop();                        AdvisorObservationContext observationContext = AdvisorObservationContext.builder()                                .advisorName(advisor.getName())                                .chatClientRequest(chatClientRequest)                                .order(advisor.getOrder())                                .build();                        var observation = AdvisorObservationDocumentation.AI_ADVISOR.observation(this.observationConvention,                                        DEFAULT_OBSERVATION_CONVENTION, () -> observationContext, this.observationRegistry);                        observation.parentObservation(contextView.getOrDefault(ObservationThreadLocalAccessor.KEY, null)).start();                        // @formatterff                        Flux chatClientResponse = Flux.defer(() -> advisor.adviseStream(chatClientRequest, this)                                                .doOnError(observation::error)                                                .doFinally(s -> observation.stop())                                                .contextWrite(ctx -> ctx.put(ObservationThreadLocalAccessor.KEY, observation)));                        // @formattern                        return CHAT_CLIENT_MESSAGE_AGGREGATOR.aggregateChatClientResponse(chatClientResponse,                                        observationContext::setChatClientResponse);                });        }        @Override        public CallAdvisorChain copy(CallAdvisor after) {                return this.copyAdvisorsAfter(this.getCallAdvisors(), after);        }        @Override        public StreamAdvisorChain copy(StreamAdvisor after) {                return this.copyAdvisorsAfter(this.getStreamAdvisors(), after);        }        private DefaultAroundAdvisorChain copyAdvisorsAfter(List

相关推荐

您需要登录后才可以回帖 登录 | 立即注册