Skip to main content
LangChain 实现了一个流式处理系统,用于呈现实时更新。 流式处理对于提升基于大语言模型构建的应用程序的响应速度至关重要。通过渐进式地显示输出,甚至在完整响应准备好之前,流式处理显著改善了用户体验(UX),尤其是在处理大语言模型的延迟时。

概述

LangChain 的流式处理系统允许你将智能体运行的实时反馈呈现到你的应用程序中。 使用 LangChain 流式处理可以实现: 有关更多端到端示例,请参阅下面的常见模式部分。

支持的流式模式

将以下一个或多个流式模式作为列表传递给 stream 方法:
模式描述
updates在每个智能体步骤后流式传输状态更新。如果在同一步骤中进行了多次更新(例如,运行了多个节点),则这些更新将被分别流式传输。
messages从任何调用了 LLM 的图节点流式传输 (token, metadata) 元组。
custom使用流式写入器从图节点内部流式传输自定义数据。

智能体进度

要流式传输智能体进度,请使用带有 streamMode: "updates"stream 方法。这会在每个智能体步骤后发出一个事件。 例如,如果你有一个调用一次工具的智能体,你应该会看到以下更新:
  • LLM 节点:带有工具调用请求的 AIMessage
  • 工具节点:带有执行结果的 ToolMessage
  • LLM 节点:最终的 AI 响应
import z from "zod";
import { createAgent, tool } from "langchain";

const getWeather = tool(
    async ({ city }) => {
        return `The weather in ${city} is always sunny!`;
    },
    {
        name: "get_weather",
        description: "Get weather for a given city.",
        schema: z.object({
        city: z.string(),
        }),
    }
);

const agent = createAgent({
    model: "gpt-5-nano",
    tools: [getWeather],
});

for await (const chunk of await agent.stream(
    { messages: [{ role: "user", content: "what is the weather in sf" }] },
    { streamMode: "updates" }
)) {
    const [step, content] = Object.entries(chunk)[0];
    console.log(`step: ${step}`);
    console.log(`content: ${JSON.stringify(content, null, 2)}`);
}
/**
 * step: model
 * content: {
 *   "messages": [
 *     {
 *       "kwargs": {
 *         // ...
 *         "tool_calls": [
 *           {
 *             "name": "get_weather",
 *             "args": {
 *               "city": "San Francisco"
 *             },
 *             "type": "tool_call",
 *             "id": "call_0qLS2Jp3MCmaKJ5MAYtr4jJd"
 *           }
 *         ],
 *         // ...
 *       }
 *     }
 *   ]
 * }
 * step: tools
 * content: {
 *   "messages": [
 *     {
 *       "kwargs": {
 *         "content": "The weather in San Francisco is always sunny!",
 *         "name": "get_weather",
 *         // ...
 *       }
 *     }
 *   ]
 * }
 * step: model
 * content: {
 *   "messages": [
 *     {
 *       "kwargs": {
 *         "content": "The latest update says: The weather in San Francisco is always sunny!\n\nIf you'd like real-time details (current temperature, humidity, wind, and today's forecast), I can pull the latest data for you. Want me to fetch that?",
 *         // ...
 *       }
 *     }
 *   ]
 * }
 */

LLM 标记

要流式传输 LLM 生成的标记,请使用 streamMode: "messages"
import z from "zod";
import { createAgent, tool } from "langchain";

const getWeather = tool(
    async ({ city }) => {
        return `The weather in ${city} is always sunny!`;
    },
    {
        name: "get_weather",
        description: "Get weather for a given city.",
        schema: z.object({
        city: z.string(),
        }),
    }
);

const agent = createAgent({
    model: "gpt-5.4-mini",
    tools: [getWeather],
});

for await (const [token, metadata] of await agent.stream(
    { messages: [{ role: "user", content: "what is the weather in sf" }] },
    { streamMode: "messages" }
)) {
    console.log(`node: ${metadata.langgraph_node}`);
    console.log(`content: ${JSON.stringify(token.contentBlocks, null, 2)}`);
}

自定义更新

要流式传输工具执行时的更新,你可以使用配置中的 writer 参数。
import z from "zod";
import { tool, createAgent } from "langchain";
import { LangGraphRunnableConfig } from "@langchain/langgraph";

const getWeather = tool(
    async (input, config: LangGraphRunnableConfig) => {
        // 流式传输任意数据
        config.writer?.(`正在查询城市数据: ${input.city}`);
        // ... 获取城市数据
        config.writer?.(`已获取城市数据: ${input.city}`);
        return `${input.city} 总是阳光明媚!`;
    },
    {
        name: "get_weather",
        description: "Get weather for a given city.",
        schema: z.object({
        city: z.string().describe("要获取天气信息的城市。"),
        }),
    }
);

const agent = createAgent({
    model: "gpt-5.4-mini",
    tools: [getWeather],
});

for await (const chunk of await agent.stream(
    { messages: [{ role: "user", content: "what is the weather in sf" }] },
    { streamMode: "custom" }
)) {
    console.log(chunk);
}
输出
正在查询城市数据: San Francisco
已获取城市数据: San Francisco
如果你将 writer 参数添加到你的工具中,那么在不提供 writer 函数的情况下,你将无法在 LangGraph 执行上下文之外调用该工具。

流式传输多种模式

你可以通过将 streamMode 作为数组传递来指定多种流式模式:streamMode: ["updates", "messages", "custom"] 流式输出将是 [mode, chunk] 元组,其中 mode 是流式模式的名称,chunk 是该模式流式传输的数据。
import z from "zod";
import { tool, createAgent } from "langchain";
import { LangGraphRunnableConfig } from "@langchain/langgraph";

const getWeather = tool(
    async (input, config: LangGraphRunnableConfig) => {
        // 流式传输任意数据
        config.writer?.(`正在查询城市数据: ${input.city}`);
        // ... 获取城市数据
        config.writer?.(`已获取城市数据: ${input.city}`);
        return `${input.city} 总是阳光明媚!`;
    },
    {
        name: "get_weather",
        description: "Get weather for a given city.",
        schema: z.object({
        city: z.string().describe("要获取天气信息的城市。"),
        }),
    }
);

const agent = createAgent({
    model: "gpt-5.4-mini",
    tools: [getWeather],
});

for await (const [streamMode, chunk] of await agent.stream(
    { messages: [{ role: "user", content: "what is the weather in sf" }] },
    { streamMode: ["updates", "messages", "custom"] }
)) {
    console.log(`${streamMode}: ${JSON.stringify(chunk, null, 2)}`);
}

常见模式

以下是展示流式处理常见用例的示例。

流式传输思考/推理标记

一些模型在生成最终答案之前会执行内部推理。你可以通过过滤 标准内容块type"reasoning" 的内容,在生成时流式传输这些思考/推理标记。
必须在模型上启用推理输出。有关配置详情,请参阅 推理部分 和你的提供商集成页面要快速检查模型的推理支持情况,请参阅 models.dev
要从智能体流式传输思考标记,请使用 streamMode: "messages" 并过滤推理内容块。当模型支持时,使用启用了扩展思考的模型实例(例如 ChatAnthropic):
import z from "zod";
import { createAgent, tool } from "langchain";
import { ChatAnthropic } from "@langchain/anthropic";

const getWeather = tool(
  async ({ city }) => {
    return `${city}总是阳光明媚!`;
  },
  {
    name: "get_weather",
    description: "获取给定城市的天气信息。",
    schema: z.object({ city: z.string() }),
  },
);

const agent = createAgent({
  model: new ChatAnthropic({
    model: "claude-sonnet-4-6",
    thinking: { type: "enabled", budget_tokens: 5000 },
  }),
  tools: [getWeather],
});

for await (const [token, metadata] of await agent.stream(
  { messages: [{ role: "user", content: "旧金山的天气怎么样?" }] },
  { streamMode: "messages" },
)) {
  if (!token.contentBlocks) continue;
  const reasoning = token.contentBlocks.filter((b) => b.type === "reasoning");
  const text = token.contentBlocks.filter((b) => b.type === "text");
  if (reasoning.length) {
    process.stdout.write(`[思考中] ${reasoning[0].reasoning}`);
  }
  if (text.length) {
    process.stdout.write(text[0].text);
  }
}
输出
[thinking] The user is asking about the weather in San Francisco. I have a tool
[thinking]  available to get this information. Let me call the get_weather tool
[thinking]  with "San Francisco" as the city parameter.
The weather in San Francisco is: It's always sunny in San Francisco!
无论模型提供商如何,此方法都以相同的方式工作——LangChain 通过 content_blocks 属性将特定于提供商的格式(Anthropic thinking 块、OpenAI reasoning 摘要等)标准化为标准的 "reasoning" 内容块类型。 要直接从聊天模型(不使用智能体)流式传输推理标记,请参阅使用聊天模型进行流式处理

禁用流式处理

在某些应用程序中,你可能需要禁用给定模型的单个标记的流式处理。这在以下情况下很有用:
  • 使用多智能体系统来控制哪些智能体流式传输其输出
  • 混合使用支持流式处理和不支持流式处理的模型
  • 部署到 LangSmith 并希望防止某些模型输出流式传输到客户端
在初始化模型时设置 streaming: false
import { ChatOpenAI } from "@langchain/openai";

const model = new ChatOpenAI({
  model: "gpt-5.4",
  streaming: false,
});
部署到 LangSmith 时,对于任何不希望其输出流式传输到客户端的模型,请设置 streaming=False。这在部署前在你的图代码中进行配置。
并非所有聊天模型集成都支持 streaming 参数。如果你的模型不支持它,请改用 disableStreaming: true。此参数可通过基类在所有聊天模型上使用。
有关更多详细信息,请参阅 LangGraph 流式处理指南

相关内容