LangGraph 实现了一个流式传输系统来展示实时更新。流式传输对于增强基于 LLM 构建的应用程序的响应能力至关重要。通过逐步显示输出,甚至在完整响应准备好之前,流式传输显着改善了用户体验 (UX),特别是在处理 LLM 的延迟时。
LangGraph 流式传输可以做什么:
- 流式传输图状态 — 使用
updates 和 values 模式获取状态更新/值。
- 流式传输子图输出 — 包括来自父图和任何嵌套子图的输出。
- 流式传输 LLM 令牌 — 从任何地方捕获令牌流:节点、子图或工具内部。
- 流式传输自定义数据 — 直接从工具函数发送自定义更新或进度信号。
- 流式传输工具进度 — 实时跟踪工具生命周期事件(
start、progress、end、error)。
- 使用多种流式传输模式 — 从
values(完整状态)、updates(状态增量)、messages(LLM 令牌 + 元数据)、custom(任意用户数据)、tools(工具生命周期事件)或 debug(详细跟踪)中进行选择。
支持的流式传输模式
将以下一个或多个流式传输模式作为列表传递给 stream 方法:
| 模式 | 描述 |
|---|
values | 在图的每一步之后流式传输状态的完整值。 |
updates | 在图的每一步之后流式传输对状态的更新。如果在同一步骤中进行了多次更新(例如,运行了多个节点),则这些更新将单独流式传输。 |
custom | 从图节点内部流式传输自定义数据。 |
messages | 从调用 LLM 的任何图节点流式传输 2 元组(LLM 令牌,元数据)。 |
tools | 从工具执行中流式传输工具调用生命周期事件(on_tool_start、on_tool_event、on_tool_end、on_tool_error)。 |
debug | 在图的整个执行过程中流式传输尽可能多的信息。 |
基本用法示例
LangGraph 图公开了 stream 方法以生成流式输出作为迭代器。
for await (const chunk of await graph.stream(inputs, {
streamMode: "updates",
})) {
console.log(chunk);
}
import { StateGraph, StateSchema, START, END } from "@langchain/langgraph";
import { z } from "zod/v4";
const State = new StateSchema({
topic: z.string(),
joke: z.string(),
});
const graph = new StateGraph(State)
.addNode("refineTopic", (state) => {
return { topic: state.topic + " and cats" };
})
.addNode("generateJoke", (state) => {
return { joke: `This is a joke about ${state.topic}` };
})
.addEdge(START, "refineTopic")
.addEdge("refineTopic", "generateJoke")
.addEdge("generateJoke", END)
.compile();
for await (const chunk of await graph.stream(
{ topic: "ice cream" },
// 设置 streamMode: "updates" 以仅流式传输每个节点后对图状态的更新
// 其他流式传输模式也可用。有关详细信息,请参阅支持的流式传输模式
{ streamMode: "updates" }
)) {
console.log(chunk);
}
{'refineTopic': {'topic': 'ice cream and cats'}}
{'generateJoke': {'joke': 'This is a joke about ice cream and cats'}}
流式传输多种模式
您可以传递一个数组作为 streamMode 参数以一次流式传输多种模式。
流式传输的输出将是 [mode, chunk] 的元组,其中 mode 是流式传输模式的名称,chunk 是该模式流式传输的数据。
for await (const [mode, chunk] of await graph.stream(inputs, {
streamMode: ["updates", "custom"],
})) {
console.log(chunk);
}
流式传输图状态
使用流式传输模式 updates 和 values 在图执行时流式传输图的状态。
updates 在图的每一步之后流式传输对状态的 更新。
values 在图的每一步之后流式传输状态的 完整值。
import { StateGraph, StateSchema, START, END } from "@langchain/langgraph";
import { z } from "zod/v4";
const State = new StateSchema({
topic: z.string(),
joke: z.string(),
});
const graph = new StateGraph(State)
.addNode("refineTopic", (state) => {
return { topic: state.topic + " and cats" };
})
.addNode("generateJoke", (state) => {
return { joke: `This is a joke about ${state.topic}` };
})
.addEdge(START, "refineTopic")
.addEdge("refineTopic", "generateJoke")
.addEdge("generateJoke", END)
.compile();
使用此模式仅流式传输每个步骤后节点返回的 状态更新。流式传输的输出包括节点的名称以及更新。for await (const chunk of await graph.stream(
{ topic: "ice cream" },
{ streamMode: "updates" }
)) {
console.log(chunk);
}
使用此模式流式传输每个步骤后图的 完整状态。for await (const chunk of await graph.stream(
{ topic: "ice cream" },
{ streamMode: "values" }
)) {
console.log(chunk);
}
流式传输子图输出
要在流式传输输出中包含来自 子图 的输出,您可以在父图的 .stream() 方法中设置 subgraphs: true。这将流式传输来自父图和任何子图的输出。
输出将作为元组 [namespace, data] 流式传输,其中 namespace 是一个元组,包含调用子图的节点的路径,例如 ["parent_node:<task_id>", "child_node:<task_id>"]。
for await (const chunk of await graph.stream(
{ foo: "foo" },
{
// 设置 subgraphs: true 以流式传输来自子图的输出
subgraphs: true,
streamMode: "updates",
}
)) {
console.log(chunk);
}
import { StateGraph, StateSchema, START } from "@langchain/langgraph";
import { z } from "zod/v4";
// 定义子图
const SubgraphState = new StateSchema({
foo: z.string(), // 请注意,此键与父图状态共享
bar: z.string(),
});
const subgraphBuilder = new StateGraph(SubgraphState)
.addNode("subgraphNode1", (state) => {
return { bar: "bar" };
})
.addNode("subgraphNode2", (state) => {
return { foo: state.foo + state.bar };
})
.addEdge(START, "subgraphNode1")
.addEdge("subgraphNode1", "subgraphNode2");
const subgraph = subgraphBuilder.compile();
// 定义父图
const ParentState = new StateSchema({
foo: z.string(),
});
const builder = new StateGraph(ParentState)
.addNode("node1", (state) => {
return { foo: "hi! " + state.foo };
})
.addNode("node2", subgraph)
.addEdge(START, "node1")
.addEdge("node1", "node2");
const graph = builder.compile();
for await (const chunk of await graph.stream(
{ foo: "foo" },
{
streamMode: "updates",
// 设置 subgraphs: true 以流式传输来自子图的输出
subgraphs: true,
}
)) {
console.log(chunk);
}
[[], {'node1': {'foo': 'hi! foo'}}]
[['node2:dfddc4ba-c3c5-6887-5012-a243b5b377c2'], {'subgraphNode1': {'bar': 'bar'}}]
[['node2:dfddc4ba-c3c5-6887-5012-a243b5b377c2'], {'subgraphNode2': {'foo': 'hi! foobar'}}]
[[], {'node2': {'foo': 'hi! foobar'}}]
注意 我们不仅接收节点更新,还接收命名空间,这告诉我们正在从哪个图(或子图)流式传输。
使用 debug 流式传输模式在图的整个执行过程中流式传输尽可能多的信息。流式传输的输出包括节点的名称以及完整状态。
for await (const chunk of await graph.stream(
{ topic: "ice cream" },
{ streamMode: "debug" }
)) {
console.log(chunk);
}
LLM 令牌
使用 messages 流式传输模式从图的任何部分(包括节点、工具、子图或任务)逐个令牌 地流式传输大型语言模型 (LLM) 输出。
messages 模式 的流式输出是一个元组 [message_chunk, metadata],其中:
message_chunk:来自 LLM 的令牌或消息段。
metadata:包含有关图节点和 LLM 调用的详细信息的字典。
如果您的 LLM 不作为 LangChain 集成提供,您可以使用 custom 模式来流式传输其输出。有关详细信息,请参阅 与任何 LLM 一起使用。
import { ChatOpenAI } from "@langchain/openai";
import { StateGraph, StateSchema, GraphNode, START } from "@langchain/langgraph";
import * as z from "zod";
const MyState = new StateSchema({
topic: z.string(),
joke: z.string().default(""),
});
const model = new ChatOpenAI({ model: "gpt-4.1-mini" });
const callModel: GraphNode<typeof MyState> = async (state) => {
// 调用 LLM 以生成关于主题的笑话
// 请注意,即使使用 .invoke 而不是 .stream 运行 LLM,也会发出消息事件
const modelResponse = await model.invoke([
{ role: "user", content: `Generate a joke about ${state.topic}` },
]);
return { joke: modelResponse.content };
};
const graph = new StateGraph(MyState)
.addNode("callModel", callModel)
.addEdge(START, "callModel")
.compile();
// "messages" 流式传输模式返回元组 [messageChunk, metadata] 的迭代器
// 其中 messageChunk 是 LLM 流式传输的令牌,metadata 是一个字典
// 包含有关调用 LLM 的图节点的信息和其他信息
for await (const [messageChunk, metadata] of await graph.stream(
{ topic: "ice cream" },
{ streamMode: "messages" }
)) {
if (messageChunk.content) {
console.log(messageChunk.content + "|");
}
}
按 LLM 调用过滤
您可以将 tags 与 LLM 调用相关联,以按 LLM 调用过滤流式传输的令牌。
import { ChatOpenAI } from "@langchain/openai";
// model1 标记为 "joke"
const model1 = new ChatOpenAI({
model: "gpt-4.1-mini",
tags: ['joke']
});
// model2 标记为 "poem"
const model2 = new ChatOpenAI({
model: "gpt-4.1-mini",
tags: ['poem']
});
const graph = // ... define a graph that uses these LLMs
// streamMode 设置为 "messages" 以流式传输 LLM 令牌
// metadata 包含有关 LLM 调用的信息,包括 tags
for await (const [msg, metadata] of await graph.stream(
{ topic: "cats" },
{ streamMode: "messages" }
)) {
// 按 metadata 中的 tags 字段过滤流式传输的令牌,仅包括
// 带有 "joke" 标签的 LLM 调用的令牌
if (metadata.tags?.includes("joke")) {
console.log(msg.content + "|");
}
}
import { ChatOpenAI } from "@langchain/openai";
import { StateGraph, StateSchema, GraphNode, START } from "@langchain/langgraph";
import * as z from "zod";
// jokeModel 标记为 "joke"
const jokeModel = new ChatOpenAI({
model: "gpt-4.1-mini",
tags: ["joke"]
});
// poemModel 标记为 "poem"
const poemModel = new ChatOpenAI({
model: "gpt-4.1-mini",
tags: ["poem"]
});
const State = new StateSchema({
topic: z.string(),
joke: z.string(),
poem: z.string(),
});
const callModel: GraphNode<typeof State> = async (state) => {
const topic = state.topic;
console.log("Writing joke...");
const jokeResponse = await jokeModel.invoke([
{ role: "user", content: `Write a joke about ${topic}` }
]);
console.log("\n\nWriting poem...");
const poemResponse = await poemModel.invoke([
{ role: "user", content: `Write a short poem about ${topic}` }
]);
return {
joke: jokeResponse.content,
poem: poemResponse.content
};
};
const graph = new StateGraph(State)
.addNode("callModel", callModel)
.addEdge(START, "callModel")
.compile();
// streamMode 设置为 "messages" 以流式传输 LLM 令牌
// metadata 包含有关 LLM 调用的信息,包括 tags
for await (const [msg, metadata] of await graph.stream(
{ topic: "cats" },
{ streamMode: "messages" }
)) {
// 按 metadata 中的 tags 字段过滤流式传输的令牌,仅包括
// 带有 "joke" 标签的 LLM 调用的令牌
if (metadata.tags?.includes("joke")) {
console.log(msg.content + "|");
}
}
按节点过滤
要仅从特定节点流式传输令牌,请使用 stream_mode="messages" 并通过流式传输元数据中的 langgraph_node 字段过滤输出:
// "messages" 流式传输模式返回 [messageChunk, metadata] 的元组
// 其中 messageChunk 是 LLM 流式传输的令牌,metadata 是一个字典
// 包含有关调用 LLM 的图节点的信息和其他信息
for await (const [msg, metadata] of await graph.stream(
inputs,
{ streamMode: "messages" }
)) {
// 按 metadata 中的 langgraph_node 字段过滤流式传输的令牌
// 仅包括来自指定节点的令牌
if (msg.content && metadata.langgraph_node === "some_node_name") {
// ...
}
}
import { ChatOpenAI } from "@langchain/openai";
import { StateGraph, StateSchema, GraphNode, START } from "@langchain/langgraph";
import * as z from "zod";
const model = new ChatOpenAI({ model: "gpt-4.1-mini" });
const State = new StateSchema({
topic: z.string(),
joke: z.string(),
poem: z.string(),
});
const writeJoke: GraphNode<typeof State> = async (state) => {
const topic = state.topic;
const jokeResponse = await model.invoke([
{ role: "user", content: `Write a joke about ${topic}` }
]);
return { joke: jokeResponse.content };
};
const writePoem: GraphNode<typeof State> = async (state) => {
const topic = state.topic;
const poemResponse = await model.invoke([
{ role: "user", content: `Write a short poem about ${topic}` }
]);
return { poem: poemResponse.content };
};
const graph = new StateGraph(State)
.addNode("writeJoke", writeJoke)
.addNode("writePoem", writePoem)
// 同时写入笑话和诗歌
.addEdge(START, "writeJoke")
.addEdge(START, "writePoem")
.compile();
// "messages" 流式传输模式返回 [messageChunk, metadata] 的元组
// 其中 messageChunk 是 LLM 流式传输的令牌,metadata 是一个字典
// 包含有关调用 LLM 的图节点的信息和其他信息
for await (const [msg, metadata] of await graph.stream(
{ topic: "cats" },
{ streamMode: "messages" }
)) {
// 按 metadata 中的 langgraph_node 字段过滤流式传输的令牌
// 仅包括来自 writePoem 节点的令牌
if (msg.content && metadata.langgraph_node === "writePoem") {
console.log(msg.content + "|");
}
}
流式传输自定义数据
要从 LangGraph 节点或工具内部发送 自定义用户定义数据,请按照以下步骤操作:
- 使用
LangGraphRunnableConfig 中的 writer 参数发出自定义数据。
- 调用
.stream() 时设置 streamMode: "custom" 以在流中获取自定义数据。您可以组合多种模式(例如 ["updates", "custom"]),但必须至少包含一个 "custom"。
import { StateGraph, StateSchema, GraphNode, START, LangGraphRunnableConfig } from "@langchain/langgraph";
import * as z from "zod";
const State = new StateSchema({
query: z.string(),
answer: z.string(),
});
const node: GraphNode<typeof State> = async (state, config) => {
// 使用 writer 发出自定义键值对(例如,进度更新)
config.writer({ custom_key: "Generating custom data inside node" });
return { answer: "some data" };
};
const graph = new StateGraph(State)
.addNode("node", node)
.addEdge(START, "node")
.compile();
const inputs = { query: "example" };
// 设置 streamMode: "custom" 以在流中接收自定义数据
for await (const chunk of await graph.stream(inputs, { streamMode: "custom" })) {
console.log(chunk);
}
import { tool } from "@langchain/core/tools";
import { LangGraphRunnableConfig } from "@langchain/langgraph";
import * as z from "zod";
const queryDatabase = tool(
async (input, config: LangGraphRunnableConfig) => {
// 使用 writer 发出自定义键值对(例如,进度更新)
config.writer({ data: "Retrieved 0/100 records", type: "progress" });
// 执行查询
// 发出另一个自定义键值对
config.writer({ data: "Retrieved 100/100 records", type: "progress" });
return "some-answer";
},
{
name: "query_database",
description: "Query the database.",
schema: z.object({
query: z.string().describe("The query to execute."),
}),
}
);
const graph = // ... define a graph that uses this tool
// 设置 streamMode: "custom" 以在流中接收自定义数据
for await (const chunk of await graph.stream(inputs, { streamMode: "custom" })) {
console.log(chunk);
}
流式传输工具进度
使用 tools 流式传输模式来接收工具执行的实时生命周期事件。这对于在工具运行时在 UI 中显示进度指示器、部分结果和错误状态非常有用。
tools 流式传输模式发出四种事件类型:
| 事件 | 何时 | 载荷 |
|---|
on_tool_start | 工具调用开始 | name,input,toolCallId |
on_tool_event | 工具产生中间数据 | name,data,toolCallId |
on_tool_end | 工具返回其最终结果 | name,output,toolCallId |
on_tool_error | 工具抛出错误 | name,error,toolCallId |
定义流式传输进度的工具
要发出 on_tool_event 事件,请将您的工具函数定义为 异步生成器 (async function*)。每个 yield 将中间数据发送到流,return 值用作工具的最终结果。
import { tool } from "@langchain/core/tools";
import { z } from "zod/v4";
const searchFlights = tool(
async function* (input) {
const airlines = ["United", "Delta", "American", "JetBlue"];
const completed: string[] = [];
for (let i = 0; i < airlines.length; i++) {
await new Promise((r) => setTimeout(r, 500));
completed.push(airlines[i]);
// 每个 yield 向流发出一个 on_tool_event
yield {
message: `Searching ${airlines[i]}...`,
progress: (i + 1) / airlines.length,
completed,
};
}
// 返回值成为工具结果 (ToolMessage.content)
return JSON.stringify({
flights: [
{ airline: "United", price: 450, duration: "5h 30m" },
{ airline: "Delta", price: 520, duration: "5h 15m" },
],
});
},
{
name: "search_flights",
description: "Search for available flights to a destination.",
schema: z.object({
destination: z.string(),
date: z.string(),
}),
}
);
返回 Promise 的现有工具完全兼容。它们发出 on_tool_start 和 on_tool_end 事件,但不发出 on_tool_event 事件。
在服务器端使用工具事件
将 streamMode: ["tools"](或与其他模式组合)传递给 graph.stream():
for await (const [mode, chunk] of await graph.stream(
{ messages: [{ role: "user", content: "Find flights to Tokyo" }] },
{ streamMode: ["updates", "tools"] }
)) {
if (mode === "tools") {
switch (chunk.event) {
case "on_tool_start":
console.log(`Tool started: ${chunk.name}`, chunk.input);
break;
case "on_tool_event":
console.log(`Tool progress: ${chunk.name}`, chunk.data);
break;
case "on_tool_end":
console.log(`Tool finished: ${chunk.name}`, chunk.output);
break;
case "on_tool_error":
console.error(`Tool failed: ${chunk.name}`, chunk.error);
break;
}
}
}
在 React 中使用 useStream 获得工具进度
来自 @langchain/langgraph-sdk/react 的 useStream 钩子在您的流式传输模式中包含 "tools" 时公开一个 toolProgress 数组。每个条目都是一个跟踪运行工具当前状态的 ToolProgress 对象:
| 字段 | 描述 |
|---|
name | 工具名称 |
state | 当前生命周期状态:"starting"、"running"、"completed" 或 "error" |
toolCallId | 来自 LLM 的工具调用 ID |
input | 工具的输入参数 |
data | 来自 on_tool_event 的最新产生的数据 |
result | 最终结果,在 on_tool_end 上设置 |
error | 错误,在 on_tool_error 上设置 |
import { useStream } from "@langchain/langgraph-sdk/react";
function Chat() {
const stream = useStream({
assistantId: "my-agent",
streamMode: ["values", "tools"],
});
// 过滤正在运行的工具
const activeTools = stream.toolProgress.filter(
(t) => t.state === "starting" || t.state === "running"
);
return (
<div>
{stream.messages.map((msg) => (
<MessageBubble key={msg.id} message={msg} />
))}
{/* 显示正在运行的工具的进度卡 */}
{activeTools.map((tool) => (
<ToolProgressCard
key={tool.toolCallId ?? tool.name}
name={tool.name}
state={tool.state}
data={tool.data}
/>
))}
</div>
);
}
此示例显示了一个完整的代理,该代理具有异步生成器工具,可将搜索进度流式传输到 React UI。代理定义:import { tool } from "@langchain/core/tools";
import { ChatOpenAI } from "@langchain/openai";
import { createAgent } from "@langchain/langgraph";
import { MemorySaver } from "@langchain/langgraph-checkpoint-memory";
import { z } from "zod/v4";
const searchFlights = tool(
async function* (input) {
const airlines = ["United", "Delta", "American", "JetBlue"];
const completed: string[] = [];
for (let i = 0; i < airlines.length; i++) {
await new Promise((r) => setTimeout(r, 600));
completed.push(`${airlines[i]}: checked`);
yield {
message: `Searching ${airlines[i]}...`,
progress: (i + 1) / airlines.length,
completed,
};
}
return JSON.stringify({
flights: [
{ airline: "United", price: 450, duration: "5h 30m" },
{ airline: "Delta", price: 520, duration: "5h 15m" },
],
});
},
{
name: "search_flights",
description: "Search for available flights.",
schema: z.object({
destination: z.string(),
departure_date: z.string(),
}),
}
);
const checkHotels = tool(
async function* (input) {
const hotels = ["Grand Hyatt", "Marriott", "Hilton"];
const completed: string[] = [];
for (let i = 0; i < hotels.length; i++) {
await new Promise((r) => setTimeout(r, 400));
completed.push(`${hotels[i]}: available`);
yield {
message: `Checking ${hotels[i]}...`,
progress: (i + 1) / hotels.length,
completed,
};
}
return JSON.stringify({
hotels: [
{ name: "Grand Hyatt", price: 250, rating: 4.5 },
{ name: "Marriott", price: 180, rating: 4.2 },
],
});
},
{
name: "check_hotels",
description: "Check hotel availability.",
schema: z.object({
city: z.string(),
check_in: z.string(),
nights: z.number(),
}),
}
);
export const agent = createAgent({
model: new ChatOpenAI({ model: "gpt-4o-mini" }),
tools: [searchFlights, checkHotels],
checkpointer: new MemorySaver(),
});
带有进度卡的 React 组件:import { useStream } from "@langchain/langgraph-sdk/react";
function TravelPlanner() {
const stream = useStream<typeof agent>({
assistantId: "travel-agent",
streamMode: ["values", "tools"],
});
const activeTools = stream.toolProgress.filter(
(t) => t.state === "starting" || t.state === "running"
);
return (
<div>
{stream.messages.map((msg) => (
<div key={msg.id}>{msg.content}</div>
))}
{activeTools.map((tool) => {
const data = tool.data as {
message?: string;
progress?: number;
completed?: string[];
} | undefined;
return (
<div key={tool.toolCallId ?? tool.name}>
<strong>{tool.name}</strong>
{data?.message && <p>{data.message}</p>}
{data?.progress != null && (
<div style={{ width: "100%", background: "#eee" }}>
<div
style={{
width: `${data.progress * 100}%`,
background: "#4CAF50",
height: 8,
transition: "width 0.3s ease",
}}
/>
</div>
)}
{data?.completed?.map((step, i) => (
<div key={i}>✓ {step}</div>
))}
</div>
);
})}
</div>
);
}
这两种流式传输模式都可以显示工具进度,但它们服务于不同的目的:
tools — 自动发出结构化的生命周期事件(on_tool_start、on_tool_event、on_tool_end、on_tool_error),除了使用 async function* 外,无需在工具中更改代码。useStream 钩子开箱即用地提供反应式 toolProgress 数组。
custom — 让您完全控制发出的数据以及何时使用 config.writer()。当您需要不映射到工具生命周期的自由格式数据时,或者当您想要从节点(不仅仅是工具)进行流式传输时,请使用此模式。
与任何 LLM 一起使用
您可以使用 streamMode: "custom" 从 任何 LLM API 流式传输数据 — 即使该 API 未 实现 LangChain 聊天模型接口。
这使您可以集成原始 LLM 客户端或提供其自己的流式传输接口的外部服务,使 LangGraph 对于自定义设置具有高度的灵活性。
import { StateGraph, GraphNode, StateSchema } from "@langchain/langgraph";
import * as z from "zod";
const State = new StateSchema({ result: z.string() });
const callArbitraryModel: GraphNode<typeof State> = async (state, config) => {
// 调用任意模型并流式传输输出的示例节点
// 假设您有一个产生块的流式客户端
// 使用您的自定义流式客户端生成 LLM 令牌
for await (const chunk of yourCustomStreamingClient(state.topic)) {
// 使用 writer 将自定义数据发送到流
config.writer({ custom_llm_chunk: chunk });
}
return { result: "completed" };
};
const graph = new StateGraph(State)
.addNode("callArbitraryModel", callArbitraryModel)
// 根据需要添加其他节点和边
.compile();
// 设置 streamMode: "custom" 以在流中接收自定义数据
for await (const chunk of await graph.stream(
{ topic: "cats" },
{ streamMode: "custom" }
)) {
// 该块将包含从 LLM 流式传输的自定义数据
console.log(chunk);
}
import { StateGraph, StateSchema, MessagesValue, GraphNode, START, LangGraphRunnableConfig } from "@langchain/langgraph";
import { tool } from "@langchain/core/tools";
import * as z from "zod";
import OpenAI from "openai";
const openaiClient = new OpenAI();
const modelName = "gpt-4.1-mini";
async function* streamTokens(modelName: string, messages: any[]) {
const response = await openaiClient.chat.completions.create({
messages,
model: modelName,
stream: true,
});
let role: string | null = null;
for await (const chunk of response) {
const delta = chunk.choices[0]?.delta;
if (delta?.role) {
role = delta.role;
}
if (delta?.content) {
yield { role, content: delta.content };
}
}
}
// 这是我们的工具
const getItems = tool(
async (input, config: LangGraphRunnableConfig) => {
let response = "";
for await (const msgChunk of streamTokens(
modelName,
[
{
role: "user",
content: `Can you tell me what kind of items i might find in the following place: '${input.place}'. List at least 3 such items separating them by a comma. And include a brief description of each item.`,
},
]
)) {
response += msgChunk.content;
config.writer?.(msgChunk);
}
return response;
},
{
name: "get_items",
description: "Use this tool to list items one might find in a place you're asked about.",
schema: z.object({
place: z.string().describe("The place to look up items for."),
}),
}
);
const State = new StateSchema({
messages: MessagesValue,
});
const callTool: GraphNode<typeof State> = async (state) => {
const aiMessage = state.messages.at(-1);
const toolCall = aiMessage.tool_calls?.at(-1);
const functionName = toolCall?.function?.name;
if (functionName !== "get_items") {
throw new Error(`Tool ${functionName} not supported`);
}
const functionArguments = toolCall?.function?.arguments;
const args = JSON.parse(functionArguments);
const functionResponse = await getItems.invoke(args);
const toolMessage = {
tool_call_id: toolCall.id,
role: "tool",
name: functionName,
content: functionResponse,
};
return { messages: [toolMessage] };
};
const graph = new StateGraph(State)
// 这是工具调用图节点
.addNode("callTool", callTool)
.addEdge(START, "callTool")
.compile();
让我们使用包含工具调用的 AIMessage 调用该图:const inputs = {
messages: [
{
content: null,
role: "assistant",
tool_calls: [
{
id: "1",
function: {
arguments: '{"place":"bedroom"}',
name: "get_items",
},
type: "function",
}
],
}
]
};
for await (const chunk of await graph.stream(
inputs,
{ streamMode: "custom" }
)) {
console.log(chunk.content + "|");
}
禁用特定聊天模型的流式传输
如果您的应用程序混合使用了支持流式传输和不支持流式传输的模型,您可能需要为不支持流式传输的模型显式禁用流式传输。
初始化模型时设置 streaming: false。
import { ChatOpenAI } from "@langchain/openai";
const model = new ChatOpenAI({
model: "o1-preview",
// 设置 streaming: false 以禁用聊天模型的流式传输
streaming: false,
});
并非所有聊天模型集成都支持 streaming 参数。如果您的模型不支持,请改用 disableStreaming: true。该参数通过基类在所有聊天模型上可用。
通过 MCP 将这些文档连接 到 Claude、VSCode 等以获取实时解答。