
本文介绍如何在 langchain.js 中自定义回调处理器,过滤掉 agent 执行过程中的思考链(如 action、observation 等),仅将 response.output 的最终答案以流式方式逐 Token 返回给客户端。
本文介绍如何在 langchain.js 中自定义回调处理器,过滤掉 agent 执行过程中的思考链(如 action、observation 等),仅将 response.output 的最终答案以流式方式逐 token 返回给客户端。
在使用 langchain.js 构建基于 Agent 的流式问答服务时,一个常见痛点是:默认的 handleLLMNewToken 回调会将 LLM 生成的所有 token(包括推理过程中的中间步骤,如 “Thought:…”、”Action:…”、”Observation:…”)全部推送至客户端,导致前端接收到大量非用户所需的冗余内容。而用户真正关心的,仅是最终自然语言形式的答案(即 response.output 字段)。
LangChain.js 当前(v0.1.x)尚未提供开箱即用的 FinalStreamingStdOutCallbackHandler 类似物(该功能在 Python 版本中已原生支持),因此需手动实现一个轻量级、状态感知的自定义回调处理器。
✅ 核心思路:状态机式 Token 过滤
我们通过维护一个内部状态标志(isInFinalAnswer),在 Agent 执行流程中识别“最终答案开始”的信号(通常是 Final Answer: 后缀或 response.output 确认阶段),此后才启用 token 流式输出。
以下是一个生产就绪的自定义处理器示例:
class FinalAnswerStreamingHandler { private res: NodeJS.WritableStream; private isInFinalAnswer = false; private buffer = ""; // 缓冲未确认的 token,用于匹配起始标记 constructor(res: NodeJS.WritableStream) { this.res = res; } handleLLMNewToken(token: string): void { // Step 1: 检测是否进入最终答案阶段(兼容常见 Agent 输出格式) if (!this.isInFinalAnswer) { this.buffer += token; // 常见触发条件(可按实际 Agent prefix 调整): // - "Final Answer:"(Zero-shot React) // - "Answer:"(某些自定义 agent) // - 或更鲁棒地:等待 response.output 已确定后才开启(需结合 onAgentEnd) if (/Finals+Answers*:/i.test(this.buffer) || /Answers*:/i.test(this.buffer)) { this.isInFinalAnswer = true; // 清除前缀(如 "Final Answer: "),只流后续内容 const cleanToken = this.buffer.replace(/.*?(Finals+Answers*:|Answers*:)s*/i, ""); if (cleanToken) { this.res.write(cleanToken); } this.buffer = ""; return; } return; // 仍在前导阶段,暂不输出 } // Step 2: 已进入最终答案 → 直接流式写入 this.res.write(token); } // 【重要】配合 onAgentEnd 确保兜底(推荐启用) handleAgentEnd(): void { // 若因流式延迟导致最后 token 未 flush,此处可强制结束 this.res.write("n"); } }
? 使用方式(集成到 express/http Server)
app.post("/chat", async (req, res) => { res.setHeader("Content-Type", "text/event-stream"); res.setHeader("Cache-Control", "no-cache"); res.setHeader("Connection", "keep-alive"); const handler = new FinalAnswerStreamingHandler(res); const model = new ChatOpenAI({ modelName: "gpt-3.5-turbo", temperature: 0.5, streaming: true, callbacks: [handler], // 注入自定义处理器 }); const executor = await initializeAgentExecutorWithOptions( [qaTool], model, { agentType: "zero-shot-react-description", agentArgs: { prefix }, // 确保 prefix 中包含明确的 "Final Answer:" 提示 } ); try { const response = await executor.call({ input: req.body.prompt }); // 注意:response.output 是完整答案字符串,但流式已由 handler 分发 res.end(); } catch (err) { console.error(err); res.status(500).end(); } });
⚠️ 关键注意事项
- Agent Prompt 必须规范:确保你使用的 prefix 显式要求模型以 “Final Answer:” 开头输出答案(这是 Zero-shot React 的标准约定)。例如:
... You have access to the following tools: ... Use the following format: Thought: ... Action: ... Observation: ... ... Final Answer: <your answer here> - 不要依赖 onLLMEnd 或 onChainEnd:它们在流式过程中不保证时序,且无法访问 response.output;真正的答案边界应由 LLM 生成的 token 序列本身定义。
- 缓冲区大小控制:上述示例中 buffer 仅用于匹配起始标记,实际生产环境建议限制最大长度(如 buffer.Length
- 多轮/复杂 Agent 场景:若使用 Plan-and-Execute、Self-Ask 等高级 Agent,需相应扩展 isInFinalAnswer 的检测逻辑(例如监听特定 tool 名称后的 Answer:)。
通过该方案,你将获得干净、可控的流式响应——前端接收到的每个 chunk 都是最终答案的一部分,无需二次解析或丢弃脏数据,显著提升用户体验与前端处理效率。