本指南解释了使用子图的机制。子图是一个图,它在另一个图中被用作节点。
子图对于以下场景非常有用:
- 构建多智能体系统
- 在多个图中复用一组节点
- 分布式开发:当您希望不同团队独立处理图的不同部分时,您可以将每个部分定义为一个子图,只要遵守子图接口(输入和输出模式),父图就可以在不了解子图任何细节的情况下构建。
npm install @langchain/langgraph
为 LangGraph 开发设置 LangSmith 注册
LangSmith 以快速发现问题并提升您的 LangGraph
项目性能。LangSmith 允许您使用跟踪数据来调试、测试和监控使用 LangGraph 构建的
LLM 应用程序——阅读更多关于如何开始使用
LangSmith的信息。
定义子图通信
添加子图时,您需要定义父图和子图之间的通信方式:
| 模式 | 使用时机 | 状态模式 |
|---|
| 在节点内调用子图 | 父图和子图具有不同的状态模式(无共享键),或者您需要在它们之间转换状态 | 您编写一个包装函数,将父状态映射到子图输入,并将子图输出映射回父状态 |
| 将子图添加为节点 | 父图和子图共享状态键——子图从与父图相同的通道读取和写入 | 您直接将编译后的子图传递给 add_node——不需要包装函数 |
在节点内调用子图
当父图和子图具有不同的状态模式(无共享键)时,在节点函数内调用子图。这在您希望为多智能体系统中的每个智能体保留私有消息历史时很常见。
节点函数在调用子图之前将父状态转换为子图状态,并在返回之前将结果转换回父状态。
import { StateGraph, StateSchema, START } from "@langchain/langgraph";
import * as z from "zod";
const SubgraphState = new StateSchema({
bar: z.string(),
});
// 子图
const subgraphBuilder = new StateGraph(SubgraphState)
.addNode("subgraphNode1", (state) => {
return { bar: "hi! " + state.bar };
})
.addEdge(START, "subgraphNode1");
const subgraph = subgraphBuilder.compile();
// 父图
const State = new StateSchema({
foo: z.string(),
});
// 将状态转换为子图状态,然后再转换回来
const builder = new StateGraph(State)
.addNode("node1", async (state) => {
const subgraphOutput = await subgraph.invoke({ bar: state.foo });
return { foo: subgraphOutput.bar };
})
.addEdge(START, "node1");
const graph = builder.compile();
import { StateGraph, StateSchema, START } from "@langchain/langgraph";
import * as z from "zod";
// 定义子图
const SubgraphState = new StateSchema({
// 注意,这些键均未与父图状态共享
bar: z.string(),
baz: z.string(),
});
const subgraphBuilder = new StateGraph(SubgraphState)
.addNode("subgraphNode1", (state) => {
return { baz: "baz" };
})
.addNode("subgraphNode2", (state) => {
return { bar: state.bar + state.baz };
})
.addEdge(START, "subgraphNode1")
.addEdge("subgraphNode1", "subgraphNode2");
const subgraph = subgraphBuilder.compile();
// 定义父图
const ParentState = new StateSchema({
foo: z.string(),
});
const builder = new StateGraph(ParentState)
.addNode("node1", (state) => {
return { foo: "hi! " + state.foo };
})
.addNode("node2", async (state) => {
const response = await subgraph.invoke({ bar: state.foo });
return { foo: response.bar };
})
.addEdge(START, "node1")
.addEdge("node1", "node2");
const graph = builder.compile();
for await (const chunk of await graph.stream(
{ foo: "foo" },
{ subgraphs: true },
)) {
console.log(chunk);
}
- 将状态转换为子图状态
- 将响应转换回父状态
[[], { node1: { foo: 'hi! foo' } }]
[['node2:9c36dd0f-151a-cb42-cbad-fa2f851f9ab7'], { subgraphNode1: { baz: 'baz' } }]
[['node2:9c36dd0f-151a-cb42-cbad-fa2f851f9ab7'], { subgraphNode2: { bar: 'hi! foobaz' } }]
[[], { node2: { foo: 'hi! foobaz' } }]
将子图添加为节点
当父图和子图共享状态键时,您可以将编译后的子图直接传递给 add_node。不需要包装函数——子图会自动从父图的状态通道读取和写入。例如,在多智能体系统中,智能体通常通过共享的消息键进行通信。
如果您的子图与父图共享状态键,您可以按照以下步骤将其添加到您的图中:
- 定义子图工作流(在下面的示例中为
subgraphBuilder)并编译它
- 在定义父图工作流时,将编译后的子图传递给
.addNode 方法
import { StateGraph, StateSchema, START } from "@langchain/langgraph";
import * as z from "zod";
const State = new StateSchema({
foo: z.string(),
});
// 子图
const subgraphBuilder = new StateGraph(State)
.addNode("subgraphNode1", (state) => {
return { foo: "hi! " + state.foo };
})
.addEdge(START, "subgraphNode1");
const subgraph = subgraphBuilder.compile();
// 父图
const builder = new StateGraph(State)
.addNode("node1", subgraph)
.addEdge(START, "node1");
const graph = builder.compile();
import { StateGraph, StateSchema, START } from "@langchain/langgraph";
import * as z from "zod";
// 定义子图
const SubgraphState = new StateSchema({
foo: z.string(),
bar: z.string(),
});
const subgraphBuilder = new StateGraph(SubgraphState)
.addNode("subgraphNode1", (state) => {
return { bar: "bar" };
})
.addNode("subgraphNode2", (state) => {
// 注意,此节点使用仅在子图中可用的状态键 ('bar')
// 并在共享状态键 ('foo') 上发送更新
return { foo: state.foo + state.bar };
})
.addEdge(START, "subgraphNode1")
.addEdge("subgraphNode1", "subgraphNode2");
const subgraph = subgraphBuilder.compile();
// 定义父图
const ParentState = new StateSchema({
foo: z.string(),
});
const builder = new StateGraph(ParentState)
.addNode("node1", (state) => {
return { foo: "hi! " + state.foo };
})
.addNode("node2", subgraph)
.addEdge(START, "node1")
.addEdge("node1", "node2");
const graph = builder.compile();
for await (const chunk of await graph.stream({ foo: "foo" })) {
console.log(chunk);
}
- 此键与父图状态共享
- 此键是
SubgraphState 私有的,父图不可见
{ node1: { foo: 'hi! foo' } }
{ node2: { foo: 'hi! foobar' } }
子图持久化
使用子图时,您需要决定其内部数据在调用之间的处理方式。考虑一个将任务委托给专业子智能体的客户支持机器人:“计费专家”子智能体应该记住客户之前的问题,还是每次调用时都重新开始?
.compile() 上的 checkpointer 参数控制子图持久化:
| 模式 | checkpointer= | 行为 |
|---|
| 每次调用 | None (默认) | 每次调用都重新开始,并继承父图的检查点以支持单次调用内的中断和持久执行。 |
| 每个线程 | True | 状态在同一线程上的多次调用之间累积。每次调用从上一次调用结束的地方继续。 |
| 无状态 | False | 完全没有检查点——像普通函数调用一样运行。没有中断或持久执行。 |
每次调用是大多数应用程序的正确选择,包括子智能体处理独立请求的多智能体系统。当子智能体需要多轮对话记忆时(例如,一个研究助理在多次交流中建立上下文),请使用每个线程。
父图必须使用检查点进行编译,子图持久化功能(中断、状态检查、每个线程内存)才能工作。请参阅持久化。
下面的示例使用 LangChain 的
create_agent,这是构建智能体的常用方法。create_agent
在底层生成一个 LangGraph
图,因此所有子图持久化概念都直接适用。如果您使用原始的
LangGraph StateGraph 构建,相同的模式和配置选项也适用——请参阅图
API 了解详细信息。
有状态
有状态子图继承父图的检查点,从而支持中断、持久执行和状态检查。两种有状态模式在状态保留时间上有所不同。
每次调用(默认)
这是大多数应用程序的推荐模式,包括子智能体作为工具调用的多智能体系统。它支持中断、持久执行和并行调用,同时保持每次调用隔离。
当每次对子图的调用都是独立的,并且子智能体不需要记住之前调用的任何内容时,使用每次调用持久化。这是最常见的模式,特别是对于处理一次性请求(如“查找此客户的订单”或“总结此文档”)的多智能体系统。
省略 checkpointer 或将其设置为 None。每次调用都重新开始,但在单次调用内,子图继承父图的检查点,并可以使用 interrupt() 来暂停和恢复。
以下示例使用两个子智能体(水果专家、蔬菜专家)包装为外部智能体的工具:
import { createAgent, tool } from "langchain";
import { MemorySaver, Command, interrupt } from "@langchain/langgraph";
import * as z from "zod";
const fruitInfo = tool((input) => `Info about ${input.fruitName}`, {
name: "fruit_info",
description: "Look up fruit info.",
schema: z.object({ fruitName: z.string() }),
});
const veggieInfo = tool((input) => `Info about ${input.veggieName}`, {
name: "veggie_info",
description: "Look up veggie info.",
schema: z.object({ veggieName: z.string() }),
});
// 子智能体 - 无检查点设置(继承父图)
const fruitAgent = createAgent({
model: "gpt-4.1-mini",
tools: [fruitInfo],
prompt:
"You are a fruit expert. Use the fruit_info tool. Respond in one sentence.",
});
const veggieAgent = createAgent({
model: "gpt-4.1-mini",
tools: [veggieInfo],
prompt:
"You are a veggie expert. Use the veggie_info tool. Respond in one sentence.",
});
// 将子智能体包装为外部智能体的工具
const askFruitExpert = tool(
async (input) => {
const response = await fruitAgent.invoke({
messages: [{ role: "user", content: input.question }],
});
return response.messages[response.messages.length - 1].content;
},
{
name: "ask_fruit_expert",
description: "Ask the fruit expert. Use for ALL fruit questions.",
schema: z.object({ question: z.string() }),
},
);
const askVeggieExpert = tool(
async (input) => {
const response = await veggieAgent.invoke({
messages: [{ role: "user", content: input.question }],
});
return response.messages[response.messages.length - 1].content;
},
{
name: "ask_veggie_expert",
description: "Ask the veggie expert. Use for ALL veggie questions.",
schema: z.object({ question: z.string() }),
},
);
// 带检查点的外部智能体
const agent = createAgent({
model: "gpt-4.1-mini",
tools: [askFruitExpert, askVeggieExpert],
prompt:
"You have two experts: ask_fruit_expert and ask_veggie_expert. " +
"ALWAYS delegate questions to the appropriate expert.",
checkpointer: new MemorySaver(),
});
每次调用都可以使用 interrupt() 来暂停和恢复。在工具函数中添加 interrupt() 以要求用户批准后再继续:const fruitInfo = tool(
(input) => {
interrupt("continue?");
return `Info about ${input.fruitName}`;
},
{
name: "fruit_info",
description: "Look up fruit info.",
schema: z.object({ fruitName: z.string() }),
},
);
const config = { configurable: { thread_id: "1" } };
// 调用 - 子智能体的工具调用 interrupt()
let response = await agent.invoke(
{ messages: [{ role: "user", content: "Tell me about apples" }] },
config,
);
// response 包含 __interrupt__
// 恢复 - 批准中断
response = await agent.invoke(new Command({ resume: true }), config);
// 子智能体消息计数:4
每次调用都从新的子智能体状态开始。子智能体不记得之前的调用:const config = { configurable: { thread_id: "1" } };
// 第一次调用
let response = await agent.invoke(
{ messages: [{ role: "user", content: "Tell me about apples" }] },
config,
);
// 子智能体消息计数:4
// 第二次调用 - 子智能体重新开始,不记得苹果
response = await agent.invoke(
{ messages: [{ role: "user", content: "Now tell me about bananas" }] },
config,
);
// 子智能体消息计数:4(仍然新鲜!)
对同一子图的多次调用可以无冲突地工作,因为每次调用都有自己的检查点命名空间:const config = { configurable: { thread_id: "1" } };
// LLM 调用 ask_fruit_expert 处理苹果和香蕉
const response = await agent.invoke(
{ messages: [{ role: "user", content: "Tell me about apples and bananas" }] },
config,
);
// 子智能体消息计数:4(苹果 - 新鲜)
// 子智能体消息计数:4(香蕉 - 新鲜)
每个线程
当子智能体需要记住之前的交互时,使用每个线程持久化。例如,一个研究助理在多次交流中建立上下文,或者一个编码助理跟踪其已编辑的文件。子智能体的对话历史和数据在同一线程上的多次调用之间累积。每次调用从上一次调用结束的地方继续。
使用 checkpointer=True 编译以启用此行为。
每个线程子图不支持并行工具调用。当 LLM 可以访问每个线程的子智能体作为工具时,它可能会尝试并行多次调用该工具(例如,同时询问水果专家关于苹果和香蕉的问题)。这会导致检查点冲突,因为两个调用都写入相同的命名空间。下面的示例使用 LangChain 的 ToolCallLimitMiddleware 来防止这种情况。如果您使用纯 LangGraph StateGraph 构建,您需要自己防止并行工具调用——例如,通过配置模型禁用并行工具调用,或添加逻辑以确保同一子图不会被并行多次调用。
以下示例使用使用 checkpointer=True 编译的水果专家子智能体:
import { createAgent, tool, toolCallLimitMiddleware } from "langchain";
import { MemorySaver, Command, interrupt } from "@langchain/langgraph";
import * as z from "zod";
const fruitInfo = tool((input) => `Info about ${input.fruitName}`, {
name: "fruit_info",
description: "Look up fruit info.",
schema: z.object({ fruitName: z.string() }),
});
// 带有 checkpointer=true 的子智能体,用于持久状态
const fruitAgent = createAgent({
model: "gpt-4.1-mini",
tools: [fruitInfo],
prompt:
"You are a fruit expert. Use the fruit_info tool. Respond in one sentence.",
checkpointer: true,
});
// 将子智能体包装为外部智能体的工具
const askFruitExpert = tool(
async (input) => {
const response = await fruitAgent.invoke({
messages: [{ role: "user", content: input.question }],
});
return response.messages[response.messages.length - 1].content;
},
{
name: "ask_fruit_expert",
description: "Ask the fruit expert. Use for ALL fruit questions.",
schema: z.object({ question: z.string() }),
},
);
// 带检查点的外部智能体
// 使用 toolCallLimitMiddleware 防止对每个线程子智能体的并行调用,
// 这会导致检查点冲突。
const agent = createAgent({
model: "gpt-4.1-mini",
tools: [askFruitExpert],
prompt:
"You have a fruit expert. ALWAYS delegate fruit questions to ask_fruit_expert.",
middleware: [
toolCallLimitMiddleware({ toolName: "ask_fruit_expert", runLimit: 1 }),
],
checkpointer: new MemorySaver(),
});
每个线程子智能体支持 interrupt(),就像每次调用一样。在工具函数中添加 interrupt() 以要求用户批准:const fruitInfo = tool(
(input) => {
interrupt("continue?");
return `Info about ${input.fruitName}`;
},
{
name: "fruit_info",
description: "Look up fruit info.",
schema: z.object({ fruitName: z.string() }),
},
);
const config = { configurable: { thread_id: "1" } };
// 调用 - 子智能体的工具调用 interrupt()
let response = await agent.invoke(
{ messages: [{ role: "user", content: "Tell me about apples" }] },
config,
);
// response 包含 __interrupt__
// 恢复 - 批准中断
response = await agent.invoke(new Command({ resume: true }), config);
// 子智能体消息计数:4
状态在多次调用之间累积——子智能体记住过去的对话:const config = { configurable: { thread_id: "1" } };
// 第一次调用
let response = await agent.invoke(
{ messages: [{ role: "user", content: "Tell me about apples" }] },
config,
);
// 子智能体消息计数:4
// 第二次调用 - 子智能体记住苹果对话
response = await agent.invoke(
{ messages: [{ role: "user", content: "Now tell me about bananas" }] },
config,
);
// 子智能体消息计数:8(累积!)
当您有多个不同的每个线程子图时(例如,水果专家和蔬菜专家),每个子图都需要自己的存储空间,这样它们的检查点就不会相互覆盖。这称为命名空间隔离。如果您在节点内调用子图,LangGraph 会根据调用顺序(第一次调用、第二次调用等)分配命名空间。这意味着重新排序调用可能会混淆哪个子图加载哪个状态。为了避免这种情况,将每个子智能体包装在具有唯一节点名称的自己的 StateGraph 中——这为每个子图提供了一个稳定、唯一的命名空间:import {
StateGraph,
StateSchema,
MessagesValue,
START,
} from "@langchain/langgraph";
function createSubAgent(
model: string,
{ name, ...kwargs }: { name: string; [key: string]: any },
) {
const agent = createAgent({ model, name, ...kwargs });
return new StateGraph(new StateSchema({ messages: MessagesValue }))
.addNode(name, agent) // 唯一名称 → 稳定命名空间
.addEdge(START, name)
.compile();
}
const fruitAgent = createSubAgent("gpt-4.1-mini", {
name: "fruit_agent",
tools: [fruitInfo],
prompt: "...",
checkpointer: true,
});
const veggieAgent = createSubAgent("gpt-4.1-mini", {
name: "veggie_agent",
tools: [veggieInfo],
prompt: "...",
checkpointer: true,
});
const config = { configurable: { thread_id: "1" } };
// 第一次调用 - LLM 同时调用水果和蔬菜专家
let response = await agent.invoke(
{
messages: [
{ role: "user", content: "Tell me about cherries and broccoli" },
],
},
config,
);
// 水果子智能体消息计数:4
// 蔬菜子智能体消息计数:4
// 第二次调用 - 两个智能体独立累积
response = await agent.invoke(
{
messages: [
{ role: "user", content: "Now tell me about oranges and carrots" },
],
},
config,
);
// 水果子智能体消息计数:8(记住樱桃!)
// 蔬菜子智能体消息计数:8(记住西兰花!)
添加为节点的子图已经自动获得基于名称的命名空间,因此不需要此包装器。
无状态
当您希望像普通函数调用一样运行子智能体而没有检查点开销时,使用此模式。子图无法暂停/恢复,并且无法从持久执行中受益。使用 checkpointer=False 编译。
没有检查点,子图就没有持久执行。如果进程在运行过程中崩溃,子图无法恢复,必须从头开始重新运行。
const subgraphBuilder = new StateGraph(...);
const subgraph = subgraphBuilder.compile({ checkpointer: false });
检查点参考
使用 .compile() 上的 checkpointer 参数控制子图持久化:
const subgraph = builder.compile({ checkpointer: false }); // 或 true,或 null
| 功能 | 每次调用(默认) | 每个线程 | 无状态 |
|---|
checkpointer= | None | True | False |
| 中断 (HITL) | ✅ | ✅ | ❌ |
| 多轮记忆 | ❌ | ✅ | ❌ |
| 多次调用(不同子图) | ✅ | | ✅ |
| 多次调用(同一子图) | ✅ | ❌ | ✅ |
| 状态检查 | | ✅ | ❌ |
- 中断 (HITL):子图可以使用 interrupt() 来暂停执行并等待用户输入,然后从暂停处恢复。
- 多轮记忆:子图在同一线程 (thread) 内的多次调用之间保留其状态。每次调用从上一次调用结束的地方继续,而不是重新开始。
- 多次调用(不同子图):可以在单个节点内调用多个不同的子图实例,而不会出现检查点命名空间冲突。
- 多次调用(同一子图):可以在单个节点内多次调用同一子图实例。使用有状态持久化时,这些调用会写入相同的检查点命名空间并发生冲突——请改用每次调用持久化。
- 状态检查:子图的状态可通过
get_state(config, subgraphs=True) 用于调试和监控。
查看子图状态
启用持久化后,您可以使用子图选项检查子图状态。使用无状态检查点 (checkpointer=False) 时,不会保存子图检查点,因此子图状态不可用。
查看子图状态要求 LangGraph
能够静态发现子图——即它被添加为节点或在节点内调用。当子图在工具函数或其他间接调用(例如子智能体模式)中调用时,此功能无效。无论嵌套如何,中断都会传播到顶层图。
返回当前调用的子图状态。每次调用都重新开始。import {
StateGraph,
StateSchema,
START,
MemorySaver,
interrupt,
Command,
} from "@langchain/langgraph";
import * as z from "zod";
const State = new StateSchema({
foo: z.string(),
});
// 子图
const subgraphBuilder = new StateGraph(State)
.addNode("subgraphNode1", (state) => {
const value = interrupt("Provide value:");
return { foo: state.foo + value };
})
.addEdge(START, "subgraphNode1");
const subgraph = subgraphBuilder.compile(); // 继承父图检查点
// 父图
const builder = new StateGraph(State)
.addNode("node1", subgraph)
.addEdge(START, "node1");
const checkpointer = new MemorySaver();
const graph = builder.compile({ checkpointer });
const config = { configurable: { thread_id: "1" } };
await graph.invoke({ foo: "" }, config);
// 查看当前调用的子图状态
const subgraphState = (await graph.getState(config, { subgraphs: true }))
.tasks[0].state;
// 恢复子图
await graph.invoke(new Command({ resume: "bar" }), config);
返回此线程上所有调用的累积子图状态。import {
StateGraph,
StateSchema,
MessagesValue,
START,
MemorySaver,
} from "@langchain/langgraph";
// 具有自身持久状态的子图
const SubgraphState = new StateSchema({
messages: MessagesValue,
});
const subgraphBuilder = new StateGraph(SubgraphState);
// ... 添加节点和边
const subgraph = subgraphBuilder.compile({ checkpointer: true });
// 父图
const builder = new StateGraph(SubgraphState)
.addNode("agent", subgraph)
.addEdge(START, "agent");
const checkpointer = new MemorySaver();
const graph = builder.compile({ checkpointer });
const config = { configurable: { thread_id: "1" } };
await graph.invoke({ messages: [{ role: "user", content: "hi" }] }, config);
await graph.invoke(
{ messages: [{ role: "user", content: "what did I say?" }] },
config,
);
// 查看累积的子图状态(包括两次调用的消息)
const subgraphState = (await graph.getState(config, { subgraphs: true }))
.tasks[0].state;
流式传输子图输出
要将子图的输出包含在流式输出中,您可以在父图的流方法中设置子图选项。这将流式传输父图和任何子图的输出。
for await (const chunk of await graph.stream(
{ foo: "foo" },
{
subgraphs: true,
streamMode: "updates",
},
)) {
console.log(chunk);
}
- 设置
subgraphs: true 以流式传输子图的输出。
import { StateGraph, StateSchema, START } from "@langchain/langgraph";
import * as z from "zod";
// 定义子图
const SubgraphState = new StateSchema({
foo: z.string(),
bar: z.string(),
});
const subgraphBuilder = new StateGraph(SubgraphState)
.addNode("subgraphNode1", (state) => {
return { bar: "bar" };
})
.addNode("subgraphNode2", (state) => {
// 注意,此节点使用仅在子图中可用的状态键 ('bar')
// 并在共享状态键 ('foo') 上发送更新
return { foo: state.foo + state.bar };
})
.addEdge(START, "subgraphNode1")
.addEdge("subgraphNode1", "subgraphNode2");
const subgraph = subgraphBuilder.compile();
// 定义父图
const ParentState = new StateSchema({
foo: z.string(),
});
const builder = new StateGraph(ParentState)
.addNode("node1", (state) => {
return { foo: "hi! " + state.foo };
})
.addNode("node2", subgraph)
.addEdge(START, "node1")
.addEdge("node1", "node2");
const graph = builder.compile();
for await (const chunk of await graph.stream(
{ foo: "foo" },
{
streamMode: "updates",
subgraphs: true,
},
)) {
console.log(chunk);
}
- 设置
subgraphs: true 以流式传输子图的输出。
[[], { node1: { foo: 'hi! foo' } }]
[['node2:e58e5673-a661-ebb0-70d4-e298a7fc28b7'], { subgraphNode1: { bar: 'bar' } }]
[['node2:e58e5673-a661-ebb0-70d4-e298a7fc28b7'], { subgraphNode2: { foo: 'hi! foobar' } }]
[[], { node2: { foo: 'hi! foobar' } }]