Skip to main content
本指南解释了使用子图的机制。子图是一个,它被用作另一个图中的节点 子图对于以下方面很有用:
  • 构建多代理系统
  • 在多个图中重用一组节点
  • 分布式开发:当你希望不同的团队独立处理图的不同部分时,你可以将每个部分定义为子图,只要遵守子图接口(输入和输出模式),父图就可以在不知道子图任何细节的情况下构建

设置

npm install @langchain/langgraph
为 LangGraph 开发设置 LangSmith 注册 LangSmith 以快速发现问题并提高 LangGraph 项目的性能。LangSmith 允许你使用跟踪数据来调试、测试和监控使用 LangGraph 构建的 LLM 应用程序——在此处阅读有关如何开始的更多信息 here

定义子图通信

添加子图时,你需要定义父图和子图如何通信:
模式何时使用状态模式
在节点内调用子图父图和子图具有不同的状态模式(没有共享键),或者你需要在它们之间转换状态你编写一个包装函数,将父状态映射到子图输入,并将子图输出映射回父状态
将子图添加为节点父图和子图共享状态键——子图读取和写入与父图相同的通道你将编译后的子图直接传递给 add_node——不需要包装函数

在节点内调用子图

当父图和子图具有不同的状态模式(没有共享键)时,在节点函数内调用子图。当你想要为多代理系统中的每个代理保留私有消息历史记录时,这很常见。 节点函数在调用子图之前将父状态转换为子图状态,并在返回之前将结果转换回父状态。
import { StateGraph, StateSchema, START } from "@langchain/langgraph";
import * as z from "zod";

const SubgraphState = new StateSchema({
  bar: z.string(),
});

// Subgraph
const subgraphBuilder = new StateGraph(SubgraphState)
  .addNode("subgraphNode1", (state) => {
    return { bar: "hi! " + state.bar };
  })
  .addEdge(START, "subgraphNode1");

const subgraph = subgraphBuilder.compile();

// Parent graph
const State = new StateSchema({
  foo: z.string(),
});

// Transform the state to the subgraph state and back
const builder = new StateGraph(State)
  .addNode("node1", async (state) => {
    const subgraphOutput = await subgraph.invoke({ bar: state.foo });
    return { foo: subgraphOutput.bar };
  })
  .addEdge(START, "node1");

const graph = builder.compile();
import { StateGraph, StateSchema, START } from "@langchain/langgraph";
import * as z from "zod";

// Define subgraph
const SubgraphState = new StateSchema({
  // note that none of these keys are shared with the parent graph state
  bar: z.string(),
  baz: z.string(),
});

const subgraphBuilder = new StateGraph(SubgraphState)
  .addNode("subgraphNode1", (state) => {
    return { baz: "baz" };
  })
  .addNode("subgraphNode2", (state) => {
    return { bar: state.bar + state.baz };
  })
  .addEdge(START, "subgraphNode1")
  .addEdge("subgraphNode1", "subgraphNode2");

const subgraph = subgraphBuilder.compile();

// Define parent graph
const ParentState = new StateSchema({
  foo: z.string(),
});

const builder = new StateGraph(ParentState)
  .addNode("node1", (state) => {
    return { foo: "hi! " + state.foo };
  })
  .addNode("node2", async (state) => {
    const response = await subgraph.invoke({ bar: state.foo });
    return { foo: response.bar };
  })
  .addEdge(START, "node1")
  .addEdge("node1", "node2");

const graph = builder.compile();

for await (const chunk of await graph.stream(
  { foo: "foo" },
  { subgraphs: true }
)) {
  console.log(chunk);
}
  1. 将状态转换为子图状态
  2. 将响应转换回父状态
[[], { node1: { foo: 'hi! foo' } }]
[['node2:9c36dd0f-151a-cb42-cbad-fa2f851f9ab7'], { subgraphNode1: { baz: 'baz' } }]
[['node2:9c36dd0f-151a-cb42-cbad-fa2f851f9ab7'], { subgraphNode2: { bar: 'hi! foobaz' } }]
[[], { node2: { foo: 'hi! foobaz' } }]
这是一个具有两级子图的示例:父 -> 子 -> 孙。
import { StateGraph, StateSchema, START, END } from "@langchain/langgraph";
import * as z from "zod";

// Grandchild graph
const GrandChildState = new StateSchema({
  myGrandchildKey: z.string(),
});

const grandchild = new StateGraph(GrandChildState)
  .addNode("grandchild1", (state) => {
    // NOTE: child or parent keys will not be accessible here
    return { myGrandchildKey: state.myGrandchildKey + ", how are you" };
  })
  .addEdge(START, "grandchild1")
  .addEdge("grandchild1", END);

const grandchildGraph = grandchild.compile();

// Child graph
const ChildState = new StateSchema({
  myChildKey: z.string(),
});

const child = new StateGraph(ChildState)
  .addNode("child1", async (state) => {
    // NOTE: parent or grandchild keys won't be accessible here
    const grandchildGraphInput = { myGrandchildKey: state.myChildKey };
    const grandchildGraphOutput = await grandchildGraph.invoke(grandchildGraphInput);
    return { myChildKey: grandchildGraphOutput.myGrandchildKey + " today?" };
  })   
  .addEdge(START, "child1")
  .addEdge("child1", END);

const childGraph = child.compile();

// Parent graph
const ParentState = new StateSchema({
  myKey: z.string(),
});

const parent = new StateGraph(ParentState)
  .addNode("parent1", (state) => {
    // NOTE: child or grandchild keys won't be accessible here
    return { myKey: "hi " + state.myKey };
  })
  .addNode("child", async (state) => {
    const childGraphInput = { myChildKey: state.myKey };
    const childGraphOutput = await childGraph.invoke(childGraphInput);
    return { myKey: childGraphOutput.myChildKey };
  })   
  .addNode("parent2", (state) => {
    return { myKey: state.myKey + " bye!" };
  })
  .addEdge(START, "parent1")
  .addEdge("parent1", "child")
  .addEdge("child", "parent2")
  .addEdge("parent2", END);

const parentGraph = parent.compile();

for await (const chunk of await parentGraph.stream(
  { myKey: "Bob" },
  { subgraphs: true }
)) {
  console.log(chunk);
}
  1. 我们正在将状态从子状态通道 (myChildKey) 转换为孙状态通道 (myGrandchildKey)
  2. 我们正在将状态从孙状态通道 (myGrandchildKey) 转换回子状态通道 (myChildKey)
  3. 我们在这里传递一个函数而不是仅传递编译后的图 (grandchildGraph)
  4. 我们正在将状态从父状态通道 (myKey) 转换为子状态通道 (myChildKey)
  5. 我们正在将状态从子状态通道 (myChildKey) 转换回父状态通道 (myKey)
  6. 我们在这里传递一个函数而不是仅传递编译后的图 (childGraph)
[[], { parent1: { myKey: 'hi Bob' } }]
[['child:2e26e9ce-602f-862c-aa66-1ea5a4655e3b', 'child1:781bb3b1-3971-84ce-810b-acf819a03f9c'], { grandchild1: { myGrandchildKey: 'hi Bob, how are you' } }]
[['child:2e26e9ce-602f-862c-aa66-1ea5a4655e3b'], { child1: { myChildKey: 'hi Bob, how are you today?' } }]
[[], { child: { myKey: 'hi Bob, how are you today?' } }]
[[], { parent2: { myKey: 'hi Bob, how are you today? bye!' } }]

将子图添加为节点

当父图和子图共享状态键时,你可以将编译后的子图直接传递给 add_node。无需包装函数——子图自动读取和写入父图的状态通道。例如,在多代理系统中,代理通常通过共享的 messages 键进行通信。 SQL agent graph 如果你的子图与父图共享状态键,你可以按照以下步骤将其添加到你的图中:
  1. 定义子图工作流(下例中的 subgraphBuilder)并编译它
  2. 在定义父图工作流时将编译后的子图传递给 .addNode 方法
import { StateGraph, StateSchema, START } from "@langchain/langgraph";
import * as z from "zod";

const State = new StateSchema({
  foo: z.string(),
});

// Subgraph
const subgraphBuilder = new StateGraph(State)
  .addNode("subgraphNode1", (state) => {
    return { foo: "hi! " + state.foo };
  })
  .addEdge(START, "subgraphNode1");

const subgraph = subgraphBuilder.compile();

// Parent graph
const builder = new StateGraph(State)
  .addNode("node1", subgraph)
  .addEdge(START, "node1");

const graph = builder.compile();
import { StateGraph, StateSchema, START } from "@langchain/langgraph";
import * as z from "zod";

// Define subgraph
const SubgraphState = new StateSchema({
  foo: z.string(),
  bar: z.string(),
});

const subgraphBuilder = new StateGraph(SubgraphState)
  .addNode("subgraphNode1", (state) => {
    return { bar: "bar" };
  })
  .addNode("subgraphNode2", (state) => {
    // note that this node is using a state key ('bar') that is only available in the subgraph
    // and is sending update on the shared state key ('foo')
    return { foo: state.foo + state.bar };
  })
  .addEdge(START, "subgraphNode1")
  .addEdge("subgraphNode1", "subgraphNode2");

const subgraph = subgraphBuilder.compile();

// Define parent graph
const ParentState = new StateSchema({
  foo: z.string(),
});

const builder = new StateGraph(ParentState)
  .addNode("node1", (state) => {
    return { foo: "hi! " + state.foo };
  })
  .addNode("node2", subgraph)
  .addEdge(START, "node1")
  .addEdge("node1", "node2");

const graph = builder.compile();

for await (const chunk of await graph.stream({ foo: "foo" })) {
  console.log(chunk);
}
  1. 此键与父图状态共享
  2. 此键对于 SubgraphState 是私有的,对父图不可见
{ node1: { foo: 'hi! foo' } }
{ node2: { foo: 'hi! foobar' } }

子图持久化

当你使用子图时,你需要决定在调用之间其内部数据会发生什么。考虑一个委托给专家子代理的客户支持机器人:“计费专家”子代理应该记住客户之前的问题,还是每次被调用时都重新开始? 默认情况下,子图是无状态的(无记忆):每次调用都从空白状态开始。对于大多数应用程序来说,这是正确的选择,包括子代理处理独立请求的多代理系统。如果子代理需要多轮对话记忆(例如,一个通过多次交流建立上下文的研究助手),你可以使其有状态(持久记忆),以便其对话历史和数据在同一线程上的调用之间累积。
父图必须使用 checkpointer 进行编译,以便子图持久化功能(中断、状态检查、有状态记忆)工作。请参阅持久化
下面的示例使用 LangChain 的 @[create_agent],这是构建代理的常用方法。create_agent 在底层生成一个 LangGraph 图,因此所有子图持久化概念都直接适用。如果你使用原始 LangGraph StateGraph 构建,则适用相同的模式和配置选项——有关详细信息,请参阅 Graph API

无状态

当每次对子图的调用都是独立的并且子代理不需要记住先前调用中的任何内容时,使用无状态子图。这是最常见的模式,特别是对于多代理系统,其中子代理处理一次性请求,如“查找此客户的订单”或“总结此文档”。 根据你是否需要子图中的中断(人机交互暂停)和持久执行,有两个无状态选项。

带中断

这是大多数应用程序的推荐模式,包括将子代理作为工具调用的多代理系统。它支持中断、持久执行和并行调用,同时保持每次调用隔离。
当你想要一个在调用之间没有记忆的子代理,但你仍然需要持久执行和在运行中途暂停以供用户输入的能力(例如,在采取行动之前请求批准)时,请使用此选项。这是默认行为:省略 checkpointer 或将其设置为 None。每次调用都重新开始,但在单次调用中,子图可以使用 interrupt() 来暂停和恢复。 以下示例使用两个包装为外部代理工具的子代理(水果专家、蔬菜专家):
import { createAgent, tool } from "langchain";
import { MemorySaver, Command, interrupt } from "@langchain/langgraph";
import * as z from "zod";

const fruitInfo = tool(
  (input) => `Info about ${input.fruitName}`,
  {
    name: "fruit_info",
    description: "Look up fruit info.",
    schema: z.object({ fruitName: z.string() }),
  }
);

const veggieInfo = tool(
  (input) => `Info about ${input.veggieName}`,
  {
    name: "veggie_info",
    description: "Look up veggie info.",
    schema: z.object({ veggieName: z.string() }),
  }
);

// Subagents — no checkpointer setting (inherits parent)
const fruitAgent = createAgent({
  model: "gpt-4.1-mini",
  tools: [fruitInfo],
  prompt: "You are a fruit expert. Use the fruit_info tool. Respond in one sentence.",
});

const veggieAgent = createAgent({
  model: "gpt-4.1-mini",
  tools: [veggieInfo],
  prompt: "You are a veggie expert. Use the veggie_info tool. Respond in one sentence.",
});

// Wrap subagents as tools for the outer agent
const askFruitExpert = tool(
  async (input) => {
    const response = await fruitAgent.invoke({
      messages: [{ role: "user", content: input.question }],
    });
    return response.messages[response.messages.length - 1].content;
  },
  {
    name: "ask_fruit_expert",
    description: "Ask the fruit expert. Use for ALL fruit questions.",
    schema: z.object({ question: z.string() }),
  }
);

const askVeggieExpert = tool(
  async (input) => {
    const response = await veggieAgent.invoke({
      messages: [{ role: "user", content: input.question }],
    });
    return response.messages[response.messages.length - 1].content;
  },
  {
    name: "ask_veggie_expert",
    description: "Ask the veggie expert. Use for ALL veggie questions.",
    schema: z.object({ question: z.string() }),
  }
);

// Outer agent with checkpointer
const agent = createAgent({
  model: "gpt-4.1-mini",
  tools: [askFruitExpert, askVeggieExpert],
  prompt:
    "You have two experts: ask_fruit_expert and ask_veggie_expert. " +
    "ALWAYS delegate questions to the appropriate expert.",
  checkpointer: new MemorySaver(),
});
每次调用都可以使用 interrupt() 来暂停和恢复。将 interrupt() 添加到工具函数中以在继续之前要求用户批准:
const fruitInfo = tool(
  (input) => {
    interrupt("continue?");
    return `Info about ${input.fruitName}`;
  },
  {
    name: "fruit_info",
    description: "Look up fruit info.",
    schema: z.object({ fruitName: z.string() }),
  }
);
const config = { configurable: { thread_id: "1" } };

// Invoke — the subagent's tool calls interrupt()
let response = await agent.invoke(
  { messages: [{ role: "user", content: "Tell me about apples" }] },
  config,
);
// response contains __interrupt__

// Resume — approve the interrupt
response = await agent.invoke(new Command({ resume: true }), config);
// Subagent message count: 4

不带中断

当你想要像普通函数调用一样运行子代理且没有检查点开销时,请使用此选项。子图无法暂停/恢复,也无法从持久执行中受益。使用 checkpointer=False 进行编译。
没有检查点,子图就没有持久执行。如果进程在运行中途崩溃,子图无法恢复,必须从头开始重新运行。
const subgraphBuilder = new StateGraph(...);
const subgraph = subgraphBuilder.compile({ checkpointer: false });

有状态

当子代理需要记住以前的交互时,使用有状态子图。例如,一个通过多次交流建立上下文的研究助手,或一个跟踪已编辑文件的编码助手。通过有状态持久化,子代理的对话历史和数据会在同一线程的调用之间累积。每次调用都会从上一次停止的地方继续。 使用 checkpointer=True 编译以启用此行为。
有状态子图不支持并行工具调用。当 LLM 可以访问有状态子代理作为工具时,它可能会尝试并行多次调用该工具(例如,同时向水果专家询问苹果和香蕉)。这会导致检查点冲突,因为两个调用都写入相同的命名空间。下面的示例使用 LangChain 的 ToolCallLimitMiddleware 来防止这种情况。如果你使用纯 LangGraph StateGraph 构建,你需要自己防止并行工具调用——例如,通过配置你的模型以禁用并行工具调用,或通过添加逻辑以确保同一子图不会被并行调用多次。
以下示例使用编译为 checkpointer=True 的水果专家子代理:
import { createAgent, tool, toolCallLimitMiddleware } from "langchain";
import { MemorySaver, Command, interrupt } from "@langchain/langgraph";
import * as z from "zod";

const fruitInfo = tool(
  (input) => `Info about ${input.fruitName}`,
  {
    name: "fruit_info",
    description: "Look up fruit info.",
    schema: z.object({ fruitName: z.string() }),
  }
);

// Subagent with checkpointer=true for persistent state
const fruitAgent = createAgent({
  model: "gpt-4.1-mini",
  tools: [fruitInfo],
  prompt: "You are a fruit expert. Use the fruit_info tool. Respond in one sentence.",
  checkpointer: true,
});

// Wrap subagent as a tool for the outer agent
const askFruitExpert = tool(
  async (input) => {
    const response = await fruitAgent.invoke({
      messages: [{ role: "user", content: input.question }],
    });
    return response.messages[response.messages.length - 1].content;
  },
  {
    name: "ask_fruit_expert",
    description: "Ask the fruit expert. Use for ALL fruit questions.",
    schema: z.object({ question: z.string() }),
  }
);

// Outer agent with checkpointer
// Use toolCallLimitMiddleware to prevent parallel calls to stateful subagents,
// which would cause checkpoint conflicts.
const agent = createAgent({
  model: "gpt-4.1-mini",
  tools: [askFruitExpert],
  prompt: "You have a fruit expert. ALWAYS delegate fruit questions to ask_fruit_expert.",
  middleware: [  
    toolCallLimitMiddleware({ toolName: "ask_fruit_expert", runLimit: 1 }),
  ],
  checkpointer: new MemorySaver(),
});
有状态子代理支持 interrupt(),就像每次调用一样。将 interrupt() 添加到工具函数中以要求用户批准:
const fruitInfo = tool(
  (input) => {
    interrupt("continue?");
    return `Info about ${input.fruitName}`;
  },
  {
    name: "fruit_info",
    description: "Look up fruit info.",
    schema: z.object({ fruitName: z.string() }),
  }
);
const config = { configurable: { thread_id: "1" } };

// Invoke — the subagent's tool calls interrupt()
let response = await agent.invoke(
  { messages: [{ role: "user", content: "Tell me about apples" }] },
  config,
);
// response contains __interrupt__

// Resume — approve the interrupt
response = await agent.invoke(new Command({ resume: true }), config);
// Subagent message count: 4

Checkpointer 参考

使用 .compile() 上的 checkpointer 参数控制子图持久化:
const subgraph = builder.compile({ checkpointer: false });  // or true, or null
特性不带中断带中断(默认)有状态
checkpointer=FalseNoneTrue
中断 (HITL)
多轮记忆
多次调用(不同子图)
多次调用(同一子图)
状态检查
  • 中断 (HITL):子图可以使用 interrupt() 暂停执行并等待用户输入,然后从中断处恢复。
  • 多轮记忆:子图在同一线程内的多次调用中保留其状态。每次调用都从上一次停止的地方继续,而不是重新开始。
  • 多次调用(不同子图):可以在单个节点内调用多个不同的子图实例,而不会发生检查点命名空间冲突。
  • 多次调用(同一子图):可以在单个节点内多次调用同一个子图实例。对于有状态持久化,这些调用写入相同的检查点命名空间并发生冲突——请改用每次调用持久化。
  • 状态检查:子图的状态可通过 get_state(config, subgraphs=True) 获得,用于调试和监控。

查看子图状态

当你启用持久化时,你可以使用 subgraphs 选项检查子图状态。使用 checkpointer=False 时,不会保存子图检查点,因此子图状态不可用。
查看子图状态要求 LangGraph 可以静态发现子图——即,它被添加为节点在节点内调用。当子图在工具函数或其他间接方式(例如,子代理模式)中调用时,它不起作用。无论嵌套如何,中断仍会传播到顶层图。
返回仅当前调用的子图状态。每次调用都重新开始。
import { StateGraph, StateSchema, START, MemorySaver, interrupt, Command } from "@langchain/langgraph";
import * as z from "zod";

const State = new StateSchema({
  foo: z.string(),
});

// Subgraph
const subgraphBuilder = new StateGraph(State)
  .addNode("subgraphNode1", (state) => {
    const value = interrupt("Provide value:");
    return { foo: state.foo + value };
  })
  .addEdge(START, "subgraphNode1");

const subgraph = subgraphBuilder.compile();  // inherits parent checkpointer

// Parent graph
const builder = new StateGraph(State)
  .addNode("node1", subgraph)
  .addEdge(START, "node1");

const checkpointer = new MemorySaver();
const graph = builder.compile({ checkpointer });

const config = { configurable: { thread_id: "1" } };

await graph.invoke({ foo: "" }, config);

// View subgraph state for the current invocation
const subgraphState = (await graph.getState(config, { subgraphs: true })).tasks[0].state;

// Resume the subgraph
await graph.invoke(new Command({ resume: "bar" }), config);

流式传输子图输出

要在流式输出中包含来自子图的输出,你可以在父图的 stream 方法中设置 subgraphs 选项。这将从父图和任何子图流式传输输出。
for await (const chunk of await graph.stream(
  { foo: "foo" },
  {
    subgraphs: true,
    streamMode: "updates",
  }
)) {
  console.log(chunk);
}
  1. 设置 subgraphs: true 以从子图流式传输输出。
import { StateGraph, StateSchema, START } from "@langchain/langgraph";
import * as z from "zod";

// Define subgraph
const SubgraphState = new StateSchema({
  foo: z.string(),
  bar: z.string(),
});

const subgraphBuilder = new StateGraph(SubgraphState)
  .addNode("subgraphNode1", (state) => {
    return { bar: "bar" };
  })
  .addNode("subgraphNode2", (state) => {
    // note that this node is using a state key ('bar') that is only available in the subgraph
    // and is sending update on the shared state key ('foo')
    return { foo: state.foo + state.bar };
  })
  .addEdge(START, "subgraphNode1")
  .addEdge("subgraphNode1", "subgraphNode2");

const subgraph = subgraphBuilder.compile();

// Define parent graph
const ParentState = new StateSchema({
  foo: z.string(),
});

const builder = new StateGraph(ParentState)
  .addNode("node1", (state) => {
    return { foo: "hi! " + state.foo };
  })
  .addNode("node2", subgraph)
  .addEdge(START, "node1")
  .addEdge("node1", "node2");

const graph = builder.compile();

for await (const chunk of await graph.stream(
  { foo: "foo" },
  {
    streamMode: "updates",
    subgraphs: true,
  }
)) {
  console.log(chunk);
}
  1. 设置 subgraphs: true 以从子图流式传输输出。
[[], { node1: { foo: 'hi! foo' } }]
[['node2:e58e5673-a661-ebb0-70d4-e298a7fc28b7'], { subgraphNode1: { bar: 'bar' } }]
[['node2:e58e5673-a661-ebb0-70d4-e298a7fc28b7'], { subgraphNode2: { foo: 'hi! foobar' } }]
[[], { node2: { foo: 'hi! foobar' } }]