Skip to main content

图 (Graphs)

从核心上讲,LangGraph 将代理工作流建模为图。您使用三个关键组件定义代理的行为:
  1. State (状态):一个共享的数据结构,代表您的应用程序的当前快照。它可以是任何数据类型,但通常使用共享状态模式定义。
  2. Nodes (节点):编码代理逻辑的函数。它们接收当前状态作为输入,执行一些计算或副作用,并返回更新的状态。
  3. Edges (边):根据当前状态确定下一个执行哪个 Node 的函数。它们可以是条件分支或固定转换。
通过组合 NodesEdges,您可以创建复杂的、循环的工作流,随时间推移演变状态。然而,真正的力量来自于 LangGraph 如何管理该状态。 要强调的是:NodesEdges 只不过是函数——它们可以包含 LLM 或只是普通的旧代码。 简而言之:节点做工作,边告诉下一步做什么 LangGraph 的底层图算法使用 消息传递 来定义通用程序。当节点完成其操作时,它会沿着一条或多条边向其他节点发送消息。这些接收节点随后执行其函数,将结果消息传递给下一组节点,过程继续进行。受 Google 的 Pregel 系统启发,程序在离散的“超级步骤 (super-steps)”中进行。 超级步骤可以被认为是图节点的一次单次迭代。并行运行的节点属于同一个超级步骤,而顺序运行的节点属于单独的超级步骤。在图执行开始时,所有节点都以 inactive(非活动)状态开始。当节点在其任何传入边(或“通道”)上收到新消息(状态)时,它变为 active(活动)。活动节点随后运行其函数并以更新进行响应。在每个超级步骤结束时,没有传入消息的节点通过将自己标记为 inactive 来投票 halt(停止)。当所有节点都处于 inactive 状态且没有消息在传输中时,图执行终止。

StateGraph

StateGraph 类是要使用的主要图类。这是由用户定义的 State 对象参数化的。

编译您的图

要构建您的图,您首先定义 状态,然后添加 节点,然后编译它。究竟什么是编译图,为什么需要它? 编译是一个非常简单的步骤。它对图的结构进行一些基本检查(没有孤立节点等)。这也是您可以指定运行时参数(如 检查点器 和断点)的地方。您只需调用 .compile 方法即可编译您的图:
const graph = new StateGraph(StateAnnotation)
  .addNode("nodeA", nodeA)
  .addEdge(START, "nodeA")
  .addEdge("nodeA", END)
  .compile();
必须 在使用之前编译您的图。

状态 (State)

定义图时要做的第一件事是定义图的 StateState图的模式 以及指定如何将更新应用于状态的 reducer 函数 组成。State 的模式将是图中所有 NodesEdges 的输入模式。您使用 StateSchema 类定义状态,该类接受任何 标准模式(如 Zod)用于各个字段,以及特殊值类型如 ReducedValueMessagesValue。所有 Nodes 将发出对 State 的更新,然后使用指定的 reducer 函数应用这些更新。

模式 (Schema)

指定图模式的主要方法是使用 StateSchema 类。模式中的每个字段可以是:
  • 用于简单字段的 标准模式(变为“最后值”通道,更新时覆盖)
  • 用于需要自定义 reducer 函数的字段的 ReducedValue(当节点并行运行时)
  • 用于聊天消息列表的 MessagesValue(预构建了消息感知 reducer)
  • 用于不应进行检查点的瞬态状态的 UntrackedValue
import {
  StateSchema,
  ReducedValue,
  MessagesValue,
  UntrackedValue
} from "@langchain/langgraph";
import { z } from "zod/v4";

const AgentState = new StateSchema({
  // 带有内置 reducer 的预构建消息值
  messages: MessagesValue,

  // 简单字段直接使用 Zod 模式
  currentStep: z.string(),

  // 带有默认值的字段
  retryCount: z.number().default(0),

  // 用于累积值的自定义 reducer
  allSteps: new ReducedValue(
    z.array(z.string()).default(() => []),
    {
      inputSchema: z.string(),
      reducer: (current, newStep) => [...current, newStep],
    }
  ),

  // 未保存到检查点的瞬态状态
  tempCache: new UntrackedValue(z.record(z.string(), z.unknown())),
});

// 类型提取
type State = typeof AgentState.State;   // 完整状态类型
type Update = typeof AgentState.Update; // 部分更新类型

// 在图中使用
const graph = new StateGraph(AgentState)
  .addNode("myNode", ...)
  .compile();
默认情况下,图将具有相同的输入和输出模式。如果您想更改这一点,您也可以直接指定显式的输入和输出模式。这在您有很多键,有些明确用于输入,其他用于输出时很有用。

多个模式

通常,所有图节点都使用单个模式进行通信。这意味着它们将读取和写入相同的状态通道。但是,有些情况下我们希望对此进行更多控制:
  • 内部节点可以传递图的输入/输出中不需要的信息。
  • 我们可能还希望为图使用不同的输入/输出模式。例如,输出可能仅包含单个相关输出键。
可以让节点写入图内部的私有状态通道以进行内部节点通信。我们可以简单地定义一个私有模式,PrivateState 也可以为图定义显式的输入和输出模式。在这些情况下,我们定义一个包含与图操作相关的 所有 键的“内部”模式。但是,我们也定义 inputoutput 模式,它们是“内部”模式的子集,以约束图的输入和输出。有关更多详细信息,请参阅 本指南 让我们看一个例子:
import { StateSchema, GraphNode } from "@langchain/langgraph";
import * as z from "zod";

const InputState = new StateSchema({
  userInput: z.string(),
});

const OutputState = new StateSchema({
  graphOutput: z.string(),
});

const OverallState = new StateSchema({
  foo: z.string(),
  userInput: z.string(),
  graphOutput: z.string(),
});

const PrivateState = new StateSchema({
  bar: z.string(),
});

const graph = new StateGraph({
  state: OverallState,
  input: InputState,
  output: OutputState,
})
  .addNode("node1", (state) => {
    // 写入 OverallState
    return { foo: state.userInput + " name" };
  })
  .addNode("node2", (state) => {
    // 读取 OverallState,写入 PrivateState
    return { bar: state.foo + " is" };
  })
  .addNode(
    "node3",
    (state) => {
      // 读取 PrivateState,写入 OutputState
      return { graphOutput: state.bar + " Lance" };
    },
    { input: PrivateState }
  )
  .addEdge(START, "node1")
  .addEdge("node1", "node2")
  .addEdge("node2", "node3")
  .addEdge("node3", END)
  .compile();

await graph.invoke({ userInput: "My" });
// { graphOutput: 'My name is Lance' }
这里有两个微妙且重要的点需要注意:
  1. 我们将 state 作为输入模式传递给 node1。但是,我们写入 foo,这是 OverallState 中的一个通道。我们如何能够写入未包含在输入模式中的状态通道?这是因为节点 可以写入图状态中的任何状态通道。图状态是初始化时定义的状态通道的并集,包括 OverallState 和过滤器 InputStateOutputState
  2. 我们使用 StateGraph({ state: OverallState, input: InputState, output: OutputState }) 初始化图。那么,我们如何能在 node2 中写入 PrivateState?如果未在 StateGraph 初始化中传递该模式,图如何获得对该模式的访问权限?我们可以这样做,因为 只要状态模式定义存在,节点也可以声明额外的状态通道。在这种情况下,定义了 PrivateState 模式,因此我们可以将 bar 作为新的状态通道添加到图中并写入它。

Reducers

Reducer 是理解节点更新如何应用于 State 的关键。State 中的每个键都有其自己独立的 reducer 函数。如果未明确指定 reducer 函数,则假定对该键的所有更新都应覆盖它。有几种不同类型的 reducer,从默认类型的 reducer 开始:

默认 reducer

这两个示例展示了如何使用默认 reducer:
Example A
import { StateSchema } from "@langchain/langgraph";
import * as z from "zod";

const State = new StateSchema({
  foo: z.number(),
  bar: z.array(z.string()),
});
在这个例子中,没有为任何键指定 reducer 函数。让我们假设图的输入是: { foo: 1, bar: ["hi"] }。然后假设第一个 Node 返回 { foo: 2 }。这被视为对状态的更新。请注意,Node 不需要返回整个 State 模式 - 只需返回更新。应用此更新后,State 将变为 { foo: 2, bar: ["hi"] }。如果第二个节点返回 { bar: ["bye"] },则 State 将变为 { foo: 2, bar: ["bye"] }
Example B
import { StateSchema, ReducedValue } from "@langchain/langgraph";
import { z } from "zod/v4";

const State = new StateSchema({
  foo: z.number(),
  bar: new ReducedValue(
    z.array(z.string()).default(() => []),
    { reducer: (x, y) => x.concat(y) }
  ),
});
在这个例子中,我们使用 ReducedValue 为第二个键 (bar) 指定了一个 reducer 函数。请注意,第一个键保持不变。让我们假设图的输入是 { foo: 1, bar: ["hi"] }。然后假设第一个 Node 返回 { foo: 2 }。这被视为对状态的更新。请注意,Node 不需要返回整个 State 模式 - 只需返回更新。应用此更新后,State 将变为 { foo: 2, bar: ["hi"] }。如果第二个节点返回 { bar: ["bye"] },则 State 将变为 { foo: 2, bar: ["hi", "bye"] }。请注意,这里的 bar 键通过将两个数组连接在一起进行了更新。

未跟踪的值 (Untracked values)

UntrackedValue 用于应该在图执行期间存在但 永远不应进行检查点 的状态字段。当图从检查点恢复时,未跟踪的值将重置为其初始状态(或不可用)。 这对于以下情况很有用:
  • 无法序列化的 数据库连接
  • 应该在恢复时重建的 临时缓存
  • 您不想持久化的 大对象
  • 应该每次都新鲜传递的 仅运行时配置
import { StateSchema, UntrackedValue, MessagesValue } from "@langchain/langgraph";
import { z } from "zod/v4";

const State = new StateSchema({
  messages: MessagesValue,

  // 未跟踪:如果在同一步骤中有多个节点写入则抛出异常(guard: true 是默认值)
  dbConnection: new UntrackedValue<DatabaseConnection>(),

  // 带有 guard: false 的未跟踪允许其多次写入,保留最后一个值
  tempCache: new UntrackedValue(
    z.record(z.string(), z.unknown()),
    { guard: false }
  ),

  // 没有模式的未跟踪(为了最大的灵活性)
  runtimeConfig: new UntrackedValue(),
});
行为:
  • 在执行期间:值像正常状态一样存储和访问
  • 在检查点时:未跟踪的值被 排除 在检查点数据之外
  • 在恢复时:未跟踪的值重新开始(空或使用其默认值)
  • 使用 guard: true(默认):如果在同一步骤中有多个节点写入,则抛出错误
  • 使用 guard: false:允许多次写入,最后一个值胜出
不要将 UntrackedValue 用于需要在中断或时间旅行中持久化的数据。对持久数据使用常规状态字段或 ReducedValue

类型实用程序

LangGraph 提供了几个类型实用程序,以便在定义节点和条件边时获得更好的 TypeScript 类型安全性。

GraphNode

使用 GraphNode 来为在图构建器外部定义的节点函数进行类型定义:
import { GraphNode, StateSchema, Command } from "@langchain/langgraph";
import { z } from "zod/v4";

const State = new StateSchema({
  count: z.number().default(0),
  result: z.string(),
});

// 基本节点 - 接收状态,返回部分更新
const incrementNode: GraphNode<typeof State> = (state) => {
  return { count: state.count + 1 };
};

// 异步节点
const fetchNode: GraphNode<typeof State> = async (state, config) => {
  const response = await fetch(`/api/data/${state.count}`);
  return { result: await response.text() };
};

// 带有 Command 路由的节点 - 指定有效目的地
const routerNode: GraphNode<typeof State, "process" | "done"> = (state) => {
  if (state.count >= 10) {
    return new Command({ goto: "done" });
  }
  return new Command({
    update: { count: state.count + 1 },
    goto: "process"
  });
};

State.Node 简写

每个 StateSchema 实例都有一个 Node 属性,提供用于键入节点的简写:
const State = new StateSchema({
  messages: MessagesValue,
  step: z.string(),
});

// 这些是等效的:
const myNode1: GraphNode<typeof State> = (state) => ({ step: "done" });
const myNode2: typeof State.Node = (state) => ({ step: "done" });

ConditionalEdgeRouter

使用 ConditionalEdgeRouter 用于条件边中的路由函数(没有状态更新,只是路由):
import { ConditionalEdgeRouter, END } from "@langchain/langgraph";

const State = new StateSchema({
  shouldContinue: z.boolean(),
  step: z.string(),
});

// 路由器返回节点名称或 END
const router: ConditionalEdgeRouter<typeof State, "process" | "summarize"> = (state) => {
  if (!state.shouldContinue) {
    return END;
  }
  return state.step === "initial" ? "process" : "summarize";
};

// 在图中使用
graph.addConditionalEdges("check", router);

StateSchema.StateStateSchema.Update

从模式中提取状态和更新类型以用于自定义类型定义:
import { StateSchema } from "@langchain/langgraph";

const MyStateSchema = new StateSchema({
  messages: MessagesValue,
  count: z.number().default(0),
});

// 提取完整状态类型
type MyState = typeof MyStateSchema.State;
// { messages: BaseMessage[], count: number }

// 提取更新类型(部分,带有 reducer 输入类型)
type MyUpdate = typeof MyStateSchema.Update;
// { messages?: Messages, count?: number }

在图状态中使用消息

为什么要使用消息?

大多数现代 LLM 提供商都有一个接受消息列表作为输入的聊天模型接口。LangChain 的 聊天模型接口 特别接受消息对象列表作为输入。这些消息有多种形式,例如 HumanMessage(用户输入)或 AIMessage(LLM 响应)。 要阅读更多关于消息对象的内容,请参阅 消息概念指南

在您的图中使用消息

在许多情况下,将之前的对话历史记录作为消息列表存储在图状态中是很有帮助的。为此,您可以使用预构建的 MessagesValue,它提供了一个消息感知 reducer,可以自动处理消息 ID、更新和删除。 MessagesValue reducer 对于告诉图如何在每次状态更新时更新状态中的 Message 对象列表至关重要。如果您不指定 reducer,每次状态更新都会用最近提供的值覆盖消息列表。MessagesValue 正确处理了这一点:对于全新的消息,它会追加到现有列表中;对于现有消息(通过 ID 匹配),它会就地更新它们。
MessagesValue 实际上是 ReducedValue 的一个特例,预配置了一个内部 messagesStateReducer 来处理消息列表和更新。这为 LangGraph 图中的聊天消息历史记录提供了方便的、消息感知的状态管理。

序列化

除了跟踪消息 ID 之外,只要在 messages 通道上收到状态更新,MessagesValue 还会尝试将消息反序列化为 LangChain Message 对象。这允许以下列格式发送图输入/状态更新:
// 这是支持的
{
  messages: [new HumanMessage("message")];
}

// 这也是支持的
{
  messages: [{ role: "human", content: "message" }];
}
由于在使用 MessagesValue 时状态更新总是反序列化为 LangChain Messages,因此您应该使用点表示法访问消息属性,如 state.messages.at(-1).content。下面是一个使用 MessagesValue 的图示例:
import { StateGraph, StateSchema, MessagesValue } from "@langchain/langgraph";

const State = new StateSchema({
  messages: MessagesValue,
});

const graph = new StateGraph(State)
  ...
messages 字段被定义为 MessagesValue,这是一个带有内置 reducer 的 BaseMessage 对象列表。通常,要跟踪的状态不仅仅是消息,所以我们看到人们扩展此状态并添加更多字段,如:
import { StateSchema, MessagesValue } from "@langchain/langgraph";
import * as z from "zod";

const State = new StateSchema({
  messages: MessagesValue,
  documents: z.array(z.string()),
});

节点 (Nodes)

在 LangGraph 中,节点通常是接受以下参数的函数(同步或异步):
  1. state – 图的 状态
  2. config – 一个 RunnableConfig 对象,包含配置信息(如 thread_id)和跟踪信息(如 tags
您可以使用 addNode 方法向图中添加节点。为了更好的类型安全性,请使用 GraphNode 类型实用程序或 State.Node 来键入您的节点函数:
import { StateGraph, StateSchema, GraphNode } from "@langchain/langgraph";
import * as z from "zod";

const State = new StateSchema({
  input: z.string(),
  results: z.string(),
});

// 选项 1:使用 GraphNode 类型实用程序
const myNode: GraphNode<typeof State> = (state, config) => {
  console.log("In node: ", config?.configurable?.user_id);
  return { results: `Hello, ${state.input}!` };
};

// 选项 2:使用 State.Node 简写
const otherNode: typeof State.Node = (state) => {
  return state;
};

const builder = new StateGraph(State)
  .addNode("myNode", myNode)
  .addNode("otherNode", otherNode)
  ...
在幕后,函数被转换为 @[RunnableLambda],它为您的函数添加批处理和异步支持,以及 原生跟踪和调试 如果您在不指定名称的情况下向图中添加节点,它将被赋予与函数名称等效的默认名称。
builder.addNode(myNode);
// 然后,您可以通过将其引用为 `"myNode"` 来创建进出此节点的边

START 节点

START 节点是一个特殊节点,代表将用户输入发送到图的节点。引用此节点的主要目的是确定应首先调用哪些节点。
import { START } from "@langchain/langgraph";

graph.addEdge(START, "nodeA");

END 节点

END 节点是一个特殊节点,代表终端节点。当您想表示某些边在完成后没有操作时,引用此节点。
import { END } from "@langchain/langgraph";

graph.addEdge("nodeA", END);

节点缓存

LangGraph 支持基于节点输入对任务/节点进行缓存。要使用缓存:
  • 编译图(或指定入口点)时指定缓存
  • 为节点指定缓存策略。每个缓存策略支持:
    • keyFunc,用于根据节点输入生成缓存键。
    • ttl,缓存的生存时间(秒)。如果未指定,缓存将永不过期。
import { StateGraph, StateSchema, GraphNode, START } from "@langchain/langgraph";
import { InMemoryCache } from "@langchain/langgraph-checkpoint";
import { z } from "zod/v4";

const State = new StateSchema({
  x: z.number(),
  result: z.number(),
});

const expensiveNode: GraphNode<typeof State> = async (state) => {
  // 模拟昂贵的操作
  await new Promise((resolve) => setTimeout(resolve, 3000));
  return { result: state.x * 2 };
};

const graph = new StateGraph(State)
  .addNode("expensive_node", expensiveNode, { cachePolicy: { ttl: 3 } })
  .addEdge(START, "expensive_node")
  .compile({ cache: new InMemoryCache() });

await graph.invoke({ x: 5 }, { streamMode: "updates" });
// [{"expensive_node": {"result": 10}}]
await graph.invoke({ x: 5 }, { streamMode: "updates" });
// [{"expensive_node": {"result": 10}, "__metadata__": {"cached": true}}]

边 (Edges)

边定义了逻辑如何路由以及图如何决定停止。这是您的代理如何工作以及不同节点如何相互通信的重要组成部分。有几种关键类型的边:
  • 正常边:直接从一个节点转到下一个节点。
  • 条件边:调用函数以确定下一步转到哪个节点。
  • 入口点:用户输入到达时首先调用哪个节点。
  • 条件入口点:调用函数以确定用户输入到达时首先调用哪个节点。
一个节点可以有多个传出边。如果一个节点有多个传出边,所有这些目标节点都将作为下一个超级步骤的一部分 并行 执行。

正常边

如果您 总是 想从节点 A 转到节点 B,您可以直接使用 addEdge 方法。
graph.addEdge("nodeA", "nodeB");

条件边

如果您想 可选地 路由到一个或多个边(或可选地终止),您可以使用 addConditionalEdges 方法。此方法接受节点的名称和在该节点执行后调用的“路由函数”:
graph.addConditionalEdges("nodeA", routingFunction);
与节点类似,routingFunction 接受图的当前 state 并返回一个值。 默认情况下,返回值 routingFunction 用作将状态发送到的下一个节点(或节点列表)的名称。所有这些节点都将作为下一个超级步骤的一部分并行运行。 您可以选择提供一个对象,将 routingFunction 的输出映射到下一个节点的名称。
graph.addConditionalEdges("nodeA", routingFunction, {
  true: "nodeB",
  false: "nodeC",
});
如果您想在单个函数中组合状态更新和路由,请使用 Command 而不是条件边。

入口点

入口点是图启动时首先运行的节点。您可以使用虚拟 START 节点到要执行的第一个节点的 addEdge 方法来指定从何处进入图。
import { START } from "@langchain/langgraph";

graph.addEdge(START, "nodeA");

条件入口点

条件入口点允许您根据自定义逻辑从不同的节点开始。您可以使用虚拟 START 节点的 addConditionalEdges 来实现这一点。
import { START } from "@langchain/langgraph";

graph.addConditionalEdges(START, routingFunction);
您可以选择提供一个对象,将 routingFunction 的输出映射到下一个节点的名称。
graph.addConditionalEdges(START, routingFunction, {
  true: "nodeB",
  false: "nodeC",
});

Send

默认情况下,NodesEdges 是提前定义的,并对相同的共享状态进行操作。但是,在某些情况下,确切的边无法提前知道和/或您可能希望同时存在不同版本的 State。这方面的一个常见示例是 map-reduce 设计模式。在此设计模式中,第一个节点可能会生成一个对象列表,您可能希望将某个其他节点应用于所有这些对象。对象的数量可能无法提前知道(意味着边的数量可能无法知道),并且下游 Node 的输入 State 应该是不同的(每个生成的对象一个)。 为了支持这种设计模式,LangGraph 支持从条件边返回 Send 对象。Send 接受两个参数:第一个是节点的名称,第二个是传递给该节点的状态。
import { Send } from "@langchain/langgraph";

graph.addConditionalEdges("nodeA", (state) => {
  return state.subjects.map(
    (subject) => new Send("generateJoke", { subject })
  );
});

Command

Command 是控制图执行的多功能原语。它接受四个参数:
  • update:应用状态更新(类似于从节点返回更新)。
  • goto:导航到特定节点(类似于 条件边)。
  • graph:从 子图 导航时定位父图。
  • resume:在 中断 后提供一个值以恢复执行。
Command 用于三种上下文:

从节点返回

updategoto

从节点函数返回 Command 以在单步中更新状态并路由到下一个节点:
import { Command } from "@langchain/langgraph";

graph.addNode("myNode", (state) => {
  return new Command({
    update: { foo: "bar" },
    goto: "myOtherNode",
  });
});
使用 Command,您还可以实现动态控制流行为(与 条件边 相同):
import { Command } from "@langchain/langgraph";

graph.addNode("myNode", (state) => {
  if (state.foo === "bar") {
    return new Command({
      update: { foo: "baz" },
      goto: "myOtherNode",
    });
  }
});
当您需要 同时 更新状态 路由到不同节点时,请使用 Command。如果您只需要路由而不需要更新状态,请改用 条件边 在节点函数中使用 Command 时,添加节点时必须添加 ends 参数以指定它可以路由到哪些节点:
builder.addNode("myNode", myNode, {
  ends: ["myOtherNode", END],
});
Command 仅添加动态边 — 使用 add_edge / addEdge 定义的静态边仍会执行。例如,如果 node_a 返回 Command(goto="my_other_node") 并且您还有 graph.add_edge("node_a", "node_b"),那么 node_bmy_other_node 都会运行。
查看此 操作指南,了解如何使用 Command 的端到端示例。

graph

如果您正在使用 子图,您可以通过在 Command 中指定 graph: Command.PARENT 从子图中的节点导航到父图中的不同节点:
import { Command } from "@langchain/langgraph";

graph.addNode("myNode", (state) => {
  return new Command({
    update: { foo: "bar" },
    goto: "otherSubgraph", // 其中 `otherSubgraph` 是父图中的一个节点
    graph: Command.PARENT,
  });
});
graph 设置为 Command.PARENT 将导航到最近的父图。当您从子图节点向父图节点发送父图和子图 状态模式 共享的键的更新时,您 必须 为父图状态中正在更新的键定义一个 reducer
这在实现 多代理切换 时特别有用。有关详细信息,请参阅 本指南

invoke/stream 的输入

new Command({ resume: ... }) 是打算作为 invoke()/stream() 输入的 唯一 Command 模式。不要使用 new Command({ update: ... }) 作为输入来继续多轮对话——因为传递任何 Command 作为输入都会从最新的检查点(即最后运行的步骤,而不是 __start__)恢复,如果图已经完成,它将显得卡住。要在现有线程上继续对话,请传递一个普通的输入对象:
// 错误 — 图从最新的检查点恢复
// (最后运行的步骤),显得卡住
await graph.invoke(new Command({ update: { messages: [{ role: "user", content: "follow up" }] } }), config);

// 正确 — 普通对象从 __start__ 重新开始
await graph.invoke({ messages: [{ role: "user", content: "follow up" }] }, config);

resume

使用 new Command({ resume: ... }) 提供一个值并在 中断 后恢复图执行。传递给 resume 的值将成为暂停节点内 interrupt() 调用的返回值:
import { Command, interrupt } from "@langchain/langgraph";

const humanReview = async (state: typeof StateAnnotation.State) => {
  // 暂停图并等待一个值
  const answer = interrupt("Do you approve?");
  return { messages: [{ role: "user", content: answer }] };
};

// 第一次调用 — 遇到中断并暂停
const result = await graph.invoke({ messages: [...] }, config);

// 用一个值恢复 — interrupt() 调用返回 "yes"
const resumed = await graph.invoke(new Command({ resume: "yes" }), config);
查看 中断概念指南 了解中断模式的完整详细信息,包括多重中断和验证循环。

从工具返回

您可以从工具返回 Command 以更新图状态和控制流。使用 update 修改状态(例如,保存对话期间查找的客户信息),并使用 goto 在工具完成后路由到特定节点。
在工具内部使用时,goto 添加了一条动态边 — 调用工具的节点上已定义的任何静态边仍将执行。
有关详细信息,请参阅 本指南

图迁移

LangGraph 可以轻松处理图定义(节点、边和状态)的迁移,即使在使用检查点器跟踪状态时也是如此。
  • 对于图末尾的线程(即未中断),您可以更改图的整个拓扑(即所有节点和边,删除、添加、重命名等)
  • 对于当前中断的线程,我们支持除重命名/删除节点以外的所有拓扑更改(因为该线程现在可能即将进入不再存在的节点)— 如果这是一个阻碍,请联系我们,我们可以优先考虑解决方案。
  • 对于修改状态,我们完全支持添加和删除键的向后和向前兼容性
  • 重命名的状态键会丢失现有线程中保存的状态
  • 类型以不兼容方式更改的状态键目前可能会在更改之前具有状态的线程中导致问题 — 如果这是一个阻碍,请联系我们,我们可以优先考虑解决方案。

运行时上下文

创建图时,您可以指定一个 contextSchema 用于传递给节点的运行时上下文。这对于将不属于图状态的信息传递给节点很有用。例如,您可能希望传递诸如模型名称或数据库连接之类的依赖项。
import { StateGraph, StateSchema } from "@langchain/langgraph";
import * as z from "zod";

const State = new StateSchema({
  input: z.string(),
  output: z.string(),
});

const ContextSchema = z.object({
  llm: z.union([z.literal("openai"), z.literal("anthropic")]),
});

const graph = new StateGraph(State, ContextSchema);
然后,您可以使用 context 属性将此配置传递到图中。
const config = { context: { llm: "anthropic" } };

await graph.invoke(inputs, config);
然后,您可以在节点或条件边内访问并使用此上下文:
import { Runtime, GraphNode } from "@langchain/langgraph";
import * as z from "zod";

const nodeA: GraphNode<typeof State> = (state, config) => {
  const llm = getLLM(runtime.context?.llm);
  // ...
  return {};
};
有关配置的完整细分,请参阅 本指南
graph.addNode("myNode", (state, config) => {
  const llmType = config.context?.llm || "openai";
  const llm = getLLM(llmType);
  return { results: `Hello, ${state.input}!` };
});

递归限制

递归限制设置图在单次执行期间可以执行的最大 超级步骤 数。一旦达到限制,LangGraph 将引发 GraphRecursionError。默认情况下,此值设置为 25 步。递归限制可以在运行时在任何图上设置,并通过配置对象传递给 invoke/stream。重要的是,recursionLimit 是一个独立的 config 键,不应像所有其他用户定义配置那样在 configurable 键内传递。请参阅下面的示例:
await graph.invoke(inputs, {
  recursionLimit: 5,
  context: { llm: "anthropic" },
});

访问和处理递归计数器

当前步骤计数器可在任何节点内的 config.metadata.langgraph_step 中访问,允许在达到递归限制之前进行主动递归处理。这使您能够在图逻辑中实施优雅降级策略。

它是如何工作的

步骤计数器存储在 config.metadata.langgraph_step 中。递归限制检查遵循逻辑:step > stop 其中 stop = step + recursionLimit + 1。当超过限制时,LangGraph 引发 GraphRecursionError

访问当前步骤计数器

您可以在任何节点内访问当前步骤计数器以监控执行进度。
import { RunnableConfig } from "@langchain/core/runnables";
import { StateGraph } from "@langchain/langgraph";

const myNode: GraphNode<typeof State> = async (state, config) => {
  const currentStep = config.metadata?.langgraph_step;
  console.log(`Currently on step: ${currentStep}`);
  return state;
}
设计带有显式终止条件的图,并捕获 GraphRecursionError 作为安全网:
import {
  StateGraph,
  StateSchema,
  ReducedValue,
  GraphNode,
  ConditionalEdgeRouter,
  END,
  GraphRecursionError
} from "@langchain/langgraph";
import { z } from "zod/v4";

const State = new StateSchema({
  messages: new ReducedValue(
    z.array(z.string()).default(() => []),
    { reducer: (x, y) => x.concat(y) }
  ),
});

// 构建带有显式终止逻辑的图
const graph = new StateGraph(State)
  .addNode("reasoning", async (state) => {
    // 正常处理 - 设计带有显式终止条件的图
    return {
      messages: ["thinking..."]
    };
  })
  .addConditionalEdges("reasoning", (state) => {
    // 在此处添加您的终止条件
    if (state.messages.length >= 5) {
      return END;
    }
    return "reasoning";
  });

const app = graph.compile();

// 捕获 GraphRecursionError 作为安全网
try {
  const result = await app.invoke(
    { messages: [] },
    { recursionLimit: 10 }
  );
} catch (error) {
  if (error instanceof GraphRecursionError) {
    console.log("Recursion limit reached, handling gracefully");
    // 处理错误 - 返回部分结果,通知用户等
  }
}

主动与被动方法

处理递归限制主要有两种方法:主动(图内监控)和被动(外部捕获错误)。
import {
  StateGraph,
  StateSchema,
  ReducedValue,
  GraphNode,
  ConditionalEdgeRouter,
  END,
  GraphRecursionError
} from "@langchain/langgraph";
import { z } from "zod/v4";

const State = new StateSchema({
  messages: new ReducedValue(
    z.array(z.string()).default(() => []),
    { reducer: (x, y) => x.concat(y) }
  ),
});


// 构建带有显式终止逻辑的图
const builder = new StateGraph(State)
  .addNode("agent", async (state) => {
    return {
      messages: ["Processing..."]
    };
  })
  .addConditionalEdges("agent", (state) => {
    // 在图中设计终止条件
    if (state.messages.length >= 5) {
      return END;
    }
    return "agent";
  });

const graph = builder.compile();

// 被动方法 - 捕获 GraphRecursionError 作为安全网
try {
  const result = await graph.invoke(
    { messages: [] },
    { recursionLimit: 10 }
  );
} catch (error) {
  if (error instanceof GraphRecursionError) {
    // 图执行失败后外部处理
    console.log("Recursion limit exceeded, handling gracefully");
  }
}
被动方法在超过限制后捕获 GraphRecursionError。设计带有显式终止条件的图,以避免首先达到限制。
方法检测处理控制流
被动(捕获 GraphRecursionError超过限制后在 try/catch 中的图外部图执行终止
被动优势:
  • 实现简单
  • 无需修改图逻辑
  • 集中式错误处理

其他可用元数据

除了 langgraph_step,以下元数据也可在 config.metadata 中使用:
const inspectMetadata: GraphNode<typeof State> = async (state, config) => {
  const metadata = config.metadata;

  console.log(`Step: ${metadata?.langgraph_step}`);
  console.log(`Node: ${metadata?.langgraph_node}`);
  console.log(`Triggers: ${metadata?.langgraph_triggers}`);
  console.log(`Path: ${metadata?.langgraph_path}`);
  console.log(`Checkpoint NS: ${metadata?.langgraph_checkpoint_ns}`);

  return state;
}

可视化

能够可视化图通常很棒,尤其是当它们变得越来越复杂时。LangGraph 带有几种内置的可视化图的方法。有关更多信息,请参阅 此操作指南

可观测性和跟踪

要跟踪、调试和评估您的代理,请使用 LangSmith

了解更多