Skip to main content

LangGraph 的核心是将代理工作流建模为图。您使用三个关键组件定义代理的行为:
  1. State:表示应用程序当前快照的共享数据结构。它可以是任何数据类型,但通常使用共享状态模式定义。
  2. Nodes:编码代理逻辑的函数。它们接收当前状态作为输入,执行一些计算或副作用,并返回更新后的状态。
  3. Edges:根据当前状态确定下一个要执行的 Node 的函数。它们可以是条件分支或固定转换。
通过组合 NodesEdges,您可以创建复杂的、循环的工作流,使状态随时间演变。然而,真正的力量来自于 LangGraph 如何管理该状态。 需要强调的是:NodesEdges 不过是函数——它们可以包含 LLM,也可以只是普通的代码。 简而言之:节点执行工作,边决定下一步做什么 LangGraph 的底层图算法使用消息传递来定义通用程序。当一个节点完成其操作时,它会沿着一条或多条边向其他节点发送消息。这些接收节点然后执行它们的函数,将结果消息传递给下一组节点,该过程继续进行。受 Google 的 Pregel 系统启发,程序以离散的“超级步骤”进行。 一个超级步骤可以被视为对图节点的一次迭代。并行运行的节点属于同一个超级步骤,而顺序运行的节点属于不同的超级步骤。在图执行开始时,所有节点都处于 inactive 状态。当节点在其任何传入边(或“通道”)上接收到新消息(状态)时,它变为 active。活动节点然后运行其函数并响应更新。在每个超级步骤结束时,没有传入消息的节点通过将自己标记为 inactive 来投票 halt。当所有节点都 inactive 且没有消息在传输中时,图执行终止。

StateGraph

StateGraph 类是要使用的主要图类。它由用户定义的 State 对象参数化。

编译您的图

要构建您的图,您首先定义状态,然后添加节点,然后编译它。编译您的图到底是什么,为什么需要它? 编译是一个相当简单的步骤。它对图的结构提供一些基本检查(没有孤立节点等)。它也是您可以指定运行时参数(如检查点器和断点)的地方。您只需调用 .compile 方法来编译您的图:
const graph = new StateGraph(StateAnnotation)
  .addNode("nodeA", nodeA)
  .addEdge(START, "nodeA")
  .addEdge("nodeA", END)
  .compile();
必须在使用图之前编译它。

状态

定义图时,首先要定义图的 StateState 包括图的模式以及指定如何将更新应用于状态的 reducer 函数State 的模式将是图中所有 NodesEdges 的输入模式。您使用 StateSchema 类定义状态,该类接受任何标准模式(如 Zod)用于单个字段,以及特殊值类型,如 ReducedValueMessagesValue。所有 Nodes 都会将更新发射到 State,然后使用指定的 reducer 函数应用这些更新。

模式

指定图模式的主要方法是使用 StateSchema 类。模式中的每个字段可以是:
  • 用于简单字段的标准模式(成为“最后值”通道,在更新时覆盖)
  • 用于需要自定义 reducer 函数的字段的 ReducedValue(当节点并行运行时)
  • 用于聊天消息列表的 MessagesValue(预构建了消息感知的 reducer)
  • 用于不应被检查点的瞬态状态的 UntrackedValue
import {
  StateSchema,
  ReducedValue,
  MessagesValue,
  UntrackedValue
} from "@langchain/langgraph";
import { z } from "zod/v4";

const AgentState = new StateSchema({
  // 预构建的消息值,带有内置 reducer
  messages: MessagesValue,

  // 简单字段直接使用 Zod 模式
  currentStep: z.string(),

  // 带有默认值的字段
  retryCount: z.number().default(0),

  // 用于累积值的自定义 reducer
  allSteps: new ReducedValue(
    z.array(z.string()).default(() => []),
    {
      inputSchema: z.string(),
      reducer: (current, newStep) => [...current, newStep],
    }
  ),

  // 不保存到检查点的瞬态状态
  tempCache: new UntrackedValue(z.record(z.string(), z.unknown())),
});

// 类型提取
type State = typeof AgentState.State;   // 完整状态类型
type Update = typeof AgentState.Update; // 部分更新类型

// 在图中使用
const graph = new StateGraph(AgentState)
  .addNode("myNode", ...)
  .compile();
默认情况下,图将具有相同的输入和输出模式。如果您想更改此设置,也可以直接指定显式的输入和输出模式。当您有很多键,并且一些键明确用于输入而另一些用于输出时,这很有用。

多个模式

通常,所有图节点都使用单一模式进行通信。这意味着它们将读取和写入相同的状态通道。但是,在某些情况下,我们希望对此有更多控制:
  • 内部节点可以传递图输入/输出不需要的信息。
  • 我们可能还想为图使用不同的输入/输出模式。例如,输出可能只包含一个相关的输出键。
可以让节点在图内写入私有状态通道以进行内部节点通信。我们可以简单地定义一个私有模式 PrivateState 也可以为图定义显式的输入和输出模式。在这些情况下,我们定义一个包含图操作所有相关键的“内部”模式。但是,我们还定义 inputoutput 模式,它们是“内部”模式的子集,以约束图的输入和输出。有关更多详细信息,请参阅定义输入和输出模式 让我们看一个例子:
import { StateSchema, GraphNode } from "@langchain/langgraph";
import * as z from "zod";

const InputState = new StateSchema({
  userInput: z.string(),
});

const OutputState = new StateSchema({
  graphOutput: z.string(),
});

const OverallState = new StateSchema({
  foo: z.string(),
  userInput: z.string(),
  graphOutput: z.string(),
});

const PrivateState = new StateSchema({
  bar: z.string(),
});

const graph = new StateGraph({
  state: OverallState,
  input: InputState,
  output: OutputState,
})
  .addNode("node1", (state) => {
    // 写入 OverallState
    return { foo: state.userInput + " name" };
  })
  .addNode("node2", (state) => {
    // 从 OverallState 读取,写入 PrivateState
    return { bar: state.foo + " is" };
  })
  .addNode(
    "node3",
    (state) => {
      // 从 PrivateState 读取,写入 OutputState
      return { graphOutput: state.bar + " Lance" };
    },
    { input: PrivateState }
  )
  .addEdge(START, "node1")
  .addEdge("node1", "node2")
  .addEdge("node2", "node3")
  .addEdge("node3", END)
  .compile();

await graph.invoke({ userInput: "My" });
// { graphOutput: 'My name is Lance' }
这里有两个微妙而重要的点需要注意:
  1. 我们将 state 作为输入模式传递给 node1。但是,我们写入 foo,这是 OverallState 中的一个通道。我们如何写入未包含在输入模式中的状态通道?这是因为节点_可以写入图状态中的任何状态通道_。图状态是初始化时定义的状态通道的并集,其中包括 OverallState 以及过滤器 InputStateOutputState
  2. 我们使用 StateGraph({ state: OverallState, input: InputState, output: OutputState }) 初始化图。我们如何在 node2 中写入 PrivateState?如果未在 StateGraph 初始化中传递,图如何获得对此模式的访问权限?我们可以这样做,因为_节点也可以声明额外的状态通道_,只要状态模式定义存在。在这种情况下,PrivateState 模式已定义,因此我们可以将 bar 添加为图中的新状态通道并写入它。

Reducer

Reducer 是理解节点更新如何应用于 State 的关键。State 中的每个键都有其独立的 reducer 函数。如果没有显式指定 reducer 函数,则假定对该键的所有更新都应覆盖它。有几种不同类型的 reducer,从默认类型的 reducer 开始:

默认 reducer

这两个示例展示了如何使用默认 reducer:
示例 A
import { StateSchema } from "@langchain/langgraph";
import * as z from "zod";

const State = new StateSchema({
  foo: z.number(),
  bar: z.array(z.string()),
});
在此示例中,没有为任何键指定 reducer 函数。假设图的输入是:{ foo: 1, bar: ["hi"] }。然后假设第一个 Node 返回 { foo: 2 }。这被视为对状态的更新。请注意,Node 不需要返回整个 State 模式——只需一个更新。应用此更新后,State 将变为 { foo: 2, bar: ["hi"] }。如果第二个节点返回 { bar: ["bye"] },则 State 将变为 { foo: 2, bar: ["bye"] }
示例 B
import { StateSchema, ReducedValue } from "@langchain/langgraph";
import { z } from "zod/v4";

const State = new StateSchema({
  foo: z.number(),
  bar: new ReducedValue(
    z.array(z.string()).default(() => []),
    { reducer: (x, y) => x.concat(y) }
  ),
});
在此示例中,我们使用 ReducedValue 为第二个键 (bar) 指定 reducer 函数。请注意,第一个键保持不变。假设图的输入是 { foo: 1, bar: ["hi"] }。然后假设第一个 Node 返回 { foo: 2 }。这被视为对状态的更新。请注意,Node 不需要返回整个 State 模式——只需一个更新。应用此更新后,State 将变为 { foo: 2, bar: ["hi"] }。如果第二个节点返回 { bar: ["bye"] },则 State 将变为 { foo: 2, bar: ["hi", "bye"] }。请注意,这里 bar 键通过连接两个数组进行更新。

未跟踪值

UntrackedValue 用于在图执行期间存在但永不被检查点的状态字段。当图从检查点恢复时,未跟踪值将重置为其初始状态(或不可用)。 这对于以下情况很有用:
  • 无法序列化的数据库连接
  • 应在恢复时重建的临时缓存
  • 您不想持久化的大型对象
  • 每次都应传递新鲜的仅运行时配置
import { StateSchema, UntrackedValue, MessagesValue } from "@langchain/langgraph";
import { z } from "zod/v4";

const State = new StateSchema({
  messages: MessagesValue,

  // 未跟踪:如果多个节点在同一步骤写入则抛出错误(默认 guard: true)
  dbConnection: new UntrackedValue<DatabaseConnection>(),

  // 未跟踪且 guard: false 允许多次写入,保留最后一个值
  tempCache: new UntrackedValue(
    z.record(z.string(), z.unknown()),
    { guard: false }
  ),

  // 未跟踪且无模式(用于最大灵活性)
  runtimeConfig: new UntrackedValue(),
});
行为:
  • 执行期间:值像正常状态一样存储和可访问
  • 检查点时:未跟踪值不包括在检查点数据中
  • 恢复时:未跟踪值从头开始(空或具有其默认值)
  • 使用 guard: true(默认):如果多个节点在同一步骤写入则抛出错误
  • 使用 guard: false:允许多次写入,最后一个值获胜
不要将 UntrackedValue 用于需要跨中断或时间旅行持久化的数据。对于持久数据,请使用常规状态字段或 ReducedValue

类型实用程序

LangGraph 提供了几种类型实用程序,用于在定义节点和条件边时提供更好的 TypeScript 类型安全性。

GraphNode

使用 GraphNode 为在图构建器外部定义的节点函数添加类型:
import { GraphNode, StateSchema, Command } from "@langchain/langgraph";
import { z } from "zod/v4";

const State = new StateSchema({
  count: z.number().default(0),
  result: z.string(),
});

// 基本节点 - 接收状态,返回部分更新
const incrementNode: GraphNode<typeof State> = (state) => {
  return { count: state.count + 1 };
};

// 异步节点
const fetchNode: GraphNode<typeof State> = async (state, config) => {
  const response = await fetch(`/api/data/${state.count}`);
  return { result: await response.text() };
};

// 带有 Command 路由的节点 - 指定有效目标
const routerNode: GraphNode<typeof State, "process" | "done"> = (state) => {
  if (state.count >= 10) {
    return new Command({ goto: "done" });
  }
  return new Command({
    update: { count: state.count + 1 },
    goto: "process"
  });
};

State.Node 简写

每个 StateSchema 实例都有一个 Node 属性,为节点提供简写类型:
const State = new StateSchema({
  messages: MessagesValue,
  step: z.string(),
});

// 这些是等效的:
const myNode1: GraphNode<typeof State> = (state) => ({ step: "done" });
const myNode2: typeof State.Node = (state) => ({ step: "done" });

ConditionalEdgeRouter

使用 ConditionalEdgeRouter 用于条件边中的路由函数(无状态更新,仅路由):
import { ConditionalEdgeRouter, END } from "@langchain/langgraph";

const State = new StateSchema({
  shouldContinue: z.boolean(),
  step: z.string(),
});

// 路由器返回节点名称或 END
const router: ConditionalEdgeRouter<typeof State, "process" | "summarize"> = (state) => {
  if (!state.shouldContinue) {
    return END;
  }
  return state.step === "initial" ? "process" : "summarize";
};

// 在图中使用
graph.addConditionalEdges("check", router);

StateSchema.StateStateSchema.Update

从模式中提取状态和更新类型,用于自定义类型定义:
import { StateSchema } from "@langchain/langgraph";

const MyStateSchema = new StateSchema({
  messages: MessagesValue,
  count: z.number().default(0),
});

// 提取完整状态类型
type MyState = typeof MyStateSchema.State;
// { messages: BaseMessage[], count: number }

// 提取更新类型(部分,带有 reducer 输入类型)
type MyUpdate = typeof MyStateSchema.Update;
// { messages?: Messages, count?: number }

在图状态中处理消息

为什么使用消息?

大多数现代 LLM 提供商都有一个聊天模型接口,该接口接受消息列表作为输入。LangChain 的聊天模型接口特别接受消息对象列表作为输入。这些消息有多种形式,例如 HumanMessage(用户输入)或 AIMessage(LLM 响应)。 要了解更多关于消息对象的信息,请参阅消息概念指南

在图中使用消息

在许多情况下,将先前的对话历史存储为图状态中的消息列表很有帮助。为此,您可以使用预构建的 MessagesValue,它提供了一个消息感知的 reducer,可以自动处理消息 ID、更新和删除。 MessagesValue reducer 对于告诉图如何使用每个状态更新来更新状态中的 Message 对象列表至关重要。如果您不指定 reducer,每个状态更新都会用最近提供的值覆盖消息列表。MessagesValue 正确处理此问题:对于全新消息,它会追加到现有列表;对于现有消息(通过 ID 匹配),它会就地更新它们。
MessagesValue 实际上是 ReducedValue 的一个特例,预配置了内部的 messagesStateReducer,用于处理消息列表和更新。这为 LangGraph 图中的聊天消息历史提供了方便、消息感知的状态管理。

序列化

除了跟踪消息 ID 之外,MessagesValue 还会在收到 messages 通道上的状态更新时尝试将消息反序列化为 LangChain Message 对象。这允许以以下格式发送图输入/状态更新:
// 这是受支持的
{
  messages: [new HumanMessage("message")];
}

// 这也是受支持的
{
  messages: [{ role: "human", content: "message" }];
}
由于在使用 MessagesValue 时,状态更新总是被反序列化为 LangChain Messages,因此您应该使用点表示法访问消息属性,如 state.messages.at(-1).content。以下是使用 MessagesValue 的图的示例:
import { StateGraph, StateSchema, MessagesValue } from "@langchain/langgraph";

const State = new StateSchema({
  messages: MessagesValue,
});

const graph = new StateGraph(State)
  ...
messages 字段定义为 MessagesValue,这是一个带有内置 reducer 的 BaseMessage 对象列表。通常,有比消息更多的状态需要跟踪,因此人们会扩展此状态并添加更多字段,例如:
import { StateSchema, MessagesValue } from "@langchain/langgraph";
import * as z from "zod";

const State = new StateSchema({
  messages: MessagesValue,
  documents: z.array(z.string()),
});

节点

在 LangGraph 中,节点通常是函数(同步或异步),接受以下参数:
  1. state—图的状态
  2. config—包含配置信息(如 thread_id)和跟踪信息(如 tags)的 RunnableConfig 对象
您可以使用 addNode 方法将节点添加到图中。为了更好的类型安全性,请使用 GraphNode 类型实用程序或 State.Node 为节点函数添加类型:
import { StateGraph, StateSchema, GraphNode } from "@langchain/langgraph";
import * as z from "zod";

const State = new StateSchema({
  input: z.string(),
  results: z.string(),
});

// 选项 1:使用 GraphNode 类型实用程序
const myNode: GraphNode<typeof State> = (state, config) => {
  console.log("In node: ", config?.configurable?.user_id);
  return { results: `Hello, ${state.input}!` };
};

// 选项 2:使用 State.Node 简写
const otherNode: typeof State.Node = (state) => {
  return state;
};

const builder = new StateGraph(State)
  .addNode("myNode", myNode)
  .addNode("otherNode", otherNode)
  ...
在幕后,函数被转换为 RunnableLambda,它为您的函数添加了批处理和异步支持,以及原生跟踪和调试 如果您在添加节点时未指定名称,它将被赋予一个默认名称,该名称等同于函数名称。
builder.addNode(myNode);
// 然后您可以通过引用 `"myNode"` 来创建到此节点和从此节点的边

START 节点

START 节点是一个特殊节点,表示将用户输入发送到图的节点。引用此节点的主要目的是确定应首先调用哪些节点。
import { START } from "@langchain/langgraph";

graph.addEdge(START, "nodeA");

END 节点

END 节点是一个特殊节点,表示终端节点。当您想表示哪些边在完成后没有操作时,会引用此节点。
import { END } from "@langchain/langgraph";

graph.addEdge("nodeA", END);

节点缓存

LangGraph 支持基于节点输入缓存任务/节点。要使用缓存:
  • 在编译图时指定缓存(或指定入口点)
  • 为节点指定缓存策略。每个缓存策略支持:
    • keyFunc,用于根据节点输入生成缓存键。
    • ttl,缓存的生存时间(以秒为单位)。如果未指定,缓存将永不过期。
import { StateGraph, StateSchema, GraphNode, START } from "@langchain/langgraph";
import { InMemoryCache } from "@langchain/langgraph-checkpoint";
import { z } from "zod/v4";

const State = new StateSchema({
  x: z.number(),
  result: z.number(),
});

const expensiveNode: GraphNode<typeof State> = async (state) => {
  // 模拟昂贵的操作
  await new Promise((resolve) => setTimeout(resolve, 3000));
  return { result: state.x * 2 };
};

const graph = new StateGraph(State)
  .addNode("expensive_node", expensiveNode, { cachePolicy: { ttl: 3 } })
  .addEdge(START, "expensive_node")
  .compile({ cache: new InMemoryCache() });

await graph.invoke({ x: 5 }, { streamMode: "updates" });
// [{"expensive_node": {"result": 10}}]
await graph.invoke({ x: 5 }, { streamMode: "updates" });
// [{"expensive_node": {"result": 10}, "__metadata__": {"cached": true}}]

边定义了逻辑如何路由以及图如何决定停止。这是代理工作方式以及不同节点如何相互通信的重要组成部分。边有几种关键类型:
  • 普通边:直接从一个节点到下一个节点。
  • 条件边:调用函数以确定下一个要转到的节点。
  • 入口点:用户输入到达时首先调用的节点。
  • 条件入口点:调用函数以确定用户输入到达时首先调用的节点。
一个节点可以有多个传出边。如果一个节点有多个传出边,所有这些目标节点将作为下一个超级步骤的一部分并行执行。

普通边

如果您总是想从节点 A 转到节点 B,可以直接使用 addEdge 方法。
graph.addEdge("nodeA", "nodeB");

条件边

如果您想可选地路由到一个或多个边(或可选地终止),可以使用 addConditionalEdges 方法。此方法接受节点名称和在该节点执行后调用的“路由函数”:
graph.addConditionalEdges("nodeA", routingFunction);
与节点类似,routingFunction 接受图的当前 state 并返回一个值。 默认情况下,routingFunction 的返回值用作下一个要发送状态的节点(或节点列表)的名称。所有这些节点将作为下一个超级步骤的一部分并行运行。 您可以选择提供一个对象,将 routingFunction 的输出映射到下一个节点的名称。
graph.addConditionalEdges("nodeA", routingFunction, {
  true: "nodeB",
  false: "nodeC",
});
如果您想在一个函数中组合状态更新和路由,请使用 Command 代替条件边。

入口点

入口点是图启动时运行的第一个节点。您可以使用虚拟 START 节点的 addEdge 方法来指定进入图的位置。
import { START } from "@langchain/langgraph";

graph.addEdge(START, "nodeA");

条件入口点

条件入口点允许您根据自定义逻辑从不同的节点开始。您可以使用虚拟 START 节点的 addConditionalEdges 来实现这一点。
import { START } from "@langchain/langgraph";

graph.addConditionalEdges(START, routingFunction);
您可以选择提供一个对象,将 routingFunction 的输出映射到下一个节点的名称。
graph.addConditionalEdges(START, routingFunction, {
  true: "nodeB",
  false: "nodeC",
});

Send

默认情况下,NodesEdges 是预先定义的,并在相同的共享状态上操作。但是,在某些情况下,确切的边可能不是预先知道的,和/或您可能希望同时存在不同版本的 State。一个常见的例子是 map-reduce 设计模式。在此设计模式中,第一个节点可能生成一个对象列表,并且您可能希望将另一个节点应用于所有这些对象。对象的数量可能预先未知(意味着边的数量可能未知),并且下游 Node 的输入 State 应该不同(每个生成的对象一个)。 为了支持这种设计模式,LangGraph 支持从条件边返回 Send 对象。Send 接受两个参数:第一个是节点的名称,第二个是要传递给该节点的状态。
import { Send } from "@langchain/langgraph";

graph.addConditionalEdges("nodeA", (state) => {
  return state.subjects.map(
    (subject) => new Send("generateJoke", { subject })
  );
});

Command

Command 是一个多功能的原语,用于控制图执行。它接受四个参数:
  • update:应用状态更新(类似于从节点返回更新)。
  • goto:导航到特定节点(类似于条件边)。
  • graph:在从子图导航时定位父图。
  • resume:在中断后提供值以恢复执行。
Command 在三种上下文中使用:

从节点返回

updategoto

从节点函数返回 Command 以更新状态并路由到下一个节点:
import { Command } from "@langchain/langgraph";

graph.addNode("myNode", (state) => {
  return new Command({
    update: { foo: "bar" },
    goto: "myOtherNode",
  });
});
使用 Command 您也可以实现动态控制流行为(与条件边相同):
import { Command } from "@langchain/langgraph";

graph.addNode("myNode", (state) => {
  if (state.foo === "bar") {
    return new Command({
      update: { foo: "baz" },
      goto: "myOtherNode",
    });
  }
});
当您需要同时更新状态路由到不同的节点时,请使用 Command。如果您只需要路由而不更新状态,请改用条件边 在节点函数中使用 Command 时,必须在添加节点时添加 ends 参数以指定它可以路由到的节点:
builder.addNode("myNode", myNode, {
  ends: ["myOtherNode", END],
});
Command 仅添加动态边——使用 add_edge / addEdge 定义的静态边仍会执行。例如,如果 node_a 返回 Command(goto="my_other_node") 并且您还有 graph.add_edge("node_a", "node_b"),则 node_bmy_other_node 都将运行。
查看此操作指南 以获取如何使用 Command 的端到端示例。

graph

如果您使用子图,可以通过在 Command 中指定 graph: Command.PARENT 来从子图内的节点导航到父图中的不同节点:
import { Command } from "@langchain/langgraph";

graph.addNode("myNode", (state) => {
  return new Command({
    update: { foo: "bar" },
    goto: "otherSubgraph", // 其中 `otherSubgraph` 是父图中的一个节点
    graph: Command.PARENT,
  });
});
graph 设置为 Command.PARENT 将导航到最近的父图。当您从子图节点向父图节点发送更新,且该键由父图和子图状态模式共享时,您必须为父图状态中要更新的键定义一个reducer
这对于实现多代理交接特别有用。有关详细信息,请查看导航到父图中的节点

输入到 invokestream

new Command({ resume: ... })唯一旨在作为 invoke()/stream() 输入的 Command 模式。不要使用 new Command({ update: ... }) 作为输入来继续多轮对话——因为传递任何 Command 作为输入都会从最新检查点恢复(即最后运行的步骤,而不是 __start__),如果图已经完成,它将看起来卡住。要在现有线程上继续对话,请传递一个普通输入对象:
// 错误 - 图从最新检查点恢复
// (最后运行的步骤),看起来卡住
await graph.invoke(new Command({ update: { messages: [{ role: "user", content: "follow up" }] } }), config);

// 正确 - 普通对象从 __start__ 重新开始
await graph.invoke({ messages: [{ role: "user", content: "follow up" }] }, config);

resume

使用 new Command({ resume: ... }) 提供一个值并在中断后恢复图执行。传递给 resume 的值将成为暂停节点内 interrupt() 调用的返回值:
import { Command, interrupt } from "@langchain/langgraph";

const humanReview = async (state: typeof StateAnnotation.State) => {
  // 暂停图并等待一个值
  const answer = interrupt("Do you approve?");
  return { messages: [{ role: "user", content: answer }] };
};

// 第一次调用 - 触及中断并暂停
const result = await graph.invoke({ messages: [...] }, config);

// 使用值恢复 - interrupt() 调用返回 "yes"
const resumed = await graph.invoke(new Command({ resume: "yes" }), config);
查看中断概念指南 以获取有关中断模式的完整详细信息,包括多个中断和验证循环。

从工具返回

您可以从工具返回 Command 以更新图状态和控制流。使用 update 修改状态(例如,保存在对话期间查找的客户信息),并在工具完成后使用 goto 路由到特定节点。
在工具内部使用时,goto 会添加动态边——在调用工具的节点上定义的任何静态边仍将执行。
有关详细信息,请参阅在工具内部使用

图迁移

LangGraph 可以轻松处理图定义(节点、边和状态)的迁移,即使使用检查点器跟踪状态也是如此。
  • 对于图末尾的线程(即未中断),您可以更改图的整个拓扑(即所有节点和边、删除、添加、重命名等)
  • 对于当前中断的线程,我们支持除重命名/删除节点之外的所有拓扑更改(因为该线程可能即将进入一个不再存在的节点)——如果这是一个障碍,请联系我们,我们可以优先考虑解决方案。
  • 对于修改状态,我们对添加和删除键具有完全的向后和向前兼容性
  • 重命名的状态键将在现有线程中丢失其保存的状态
  • 以不兼容方式更改类型的状态键可能会导致在更改前具有状态的线程中出现问题——如果这是一个障碍,请联系我们,我们可以优先考虑解决方案。

运行时上下文

创建图时,您可以为传递给节点的运行时上下文指定 contextSchema。这对于向节点传递不属于图状态的信息很有用。例如,您可能想传递依赖项,如模型名称或数据库连接。
import { StateGraph, StateSchema } from "@langchain/langgraph";
import * as z from "zod";

const State = new StateSchema({
  input: z.string(),
  output: z.string(),
});

const ContextSchema = z.object({
  llm: z.union([z.literal("openai"), z.literal("anthropic")]),
});

const graph = new StateGraph(State, ContextSchema);
然后,您可以使用 context 属性将此配置传递到图中。
const config = { context: { llm: "anthropic" } };

await graph.invoke(inputs, config);
然后,您可以在节点或条件边中访问和使用此上下文:
import { Runtime, GraphNode } from "@langchain/langgraph";
import * as z from "zod";

const nodeA: GraphNode<typeof State> = (state, config) => {
  const llm = getLLM(runtime.context?.llm);
  // ...
  return {};
};
有关配置的完整细分,请参阅添加运行时配置
graph.addNode("myNode", (state, config) => {
  const llmType = config.context?.llm || "openai";
  const llm = getLLM(llmType);
  return { results: `Hello, ${state.input}!` };
});

递归限制

递归限制设置图在单次执行期间可以执行的超级步骤的最大数量。达到限制后,LangGraph 将引发 GraphRecursionError。默认情况下,此值设置为 25 步。递归限制可以在任何图上运行时设置,并通过配置对象传递给 invoke/stream。重要的是,recursionLimit 是一个独立的 config 键,不应像所有其他用户定义的配置一样传递在 configurable 键内。请参见下面的示例:
await graph.invoke(inputs, {
  recursionLimit: 5,
  context: { llm: "anthropic" },
});

访问和处理递归计数器

当前步骤计数器可在任何节点内的 config.metadata.langgraph_step 中访问,从而允许在达到递归限制之前主动进行递归处理。这使您能够在图逻辑中实现优雅的降级策略。

工作原理

步骤计数器存储在 config.metadata.langgraph_step 中。递归限制检查遵循逻辑:step > stop,其中 stop = step + recursionLimit + 1。当超过限制时,LangGraph 会引发 GraphRecursionError

访问当前步骤计数器

您可以在任何节点内访问当前步骤计数器以监视执行进度。
import { RunnableConfig } from "@langchain/core/runnables";
import { StateGraph } from "@langchain/langgraph";

const myNode: GraphNode<typeof State> = async (state, config) => {
  const currentStep = config.metadata?.langgraph_step;
  console.log(`Currently on step: ${currentStep}`);
  return state;
}
设计具有显式终止条件的图,并捕获 GraphRecursionError 作为安全网:
import {
  StateGraph,
  StateSchema,
  ReducedValue,
  GraphNode,
  ConditionalEdgeRouter,
  END,
  GraphRecursionError
} from "@langchain/langgraph";
import { z } from "zod/v4";

const State = new StateSchema({
  messages: new ReducedValue(
    z.array(z.string()).default(() => []),
    { reducer: (x, y) => x.concat(y) }
  ),
});

// 构建具有显式终止逻辑的图
const graph = new StateGraph(State)
  .addNode("reasoning", async (state) => {
    // 正常处理 - 设计具有显式终止条件的图
    return {
      messages: ["thinking..."]
    };
  })
  .addConditionalEdges("reasoning", (state) => {
    // 在此处添加终止条件
    if (state.messages.length >= 5) {
      return END;
    }
    return "reasoning";
  });

const app = graph.compile();

// 捕获 GraphRecursionError 作为安全网
try {
  const result = await app.invoke(
    { messages: [] },
    { recursionLimit: 10 }
  );
} catch (error) {
  if (error instanceof GraphRecursionError) {
    console.log("Recursion limit reached, handling gracefully");
    // 处理错误 - 返回部分结果、通知用户等。
  }
}

主动与被动方法

处理递归限制有两种主要方法:主动(在图内监控)和被动(在外部捕获错误)。
import {
  StateGraph,
  StateSchema,
  ReducedValue,
  GraphNode,
  ConditionalEdgeRouter,
  END,
  GraphRecursionError
} from "@langchain/langgraph";
import { z } from "zod/v4";

const State = new StateSchema({
  messages: new ReducedValue(
    z.array(z.string()).default(() => []),
    { reducer: (x, y) => x.concat(y) }
  ),
});


// 构建具有显式终止逻辑的图
const builder = new StateGraph(State)
  .addNode("agent", async (state) => {
    return {
      messages: ["Processing..."]
    };
  })
  .addConditionalEdges("agent", (state) => {
    // 将终止条件设计到图中
    if (state.messages.length >= 5) {
      return END;
    }
    return "agent";
  });

const graph = builder.compile();

// 被动方法 - 捕获 GraphRecursionError 作为安全网
try {
  const result = await graph.invoke(
    { messages: [] },
    { recursionLimit: 10 }
  );
} catch (error) {
  if (error instanceof GraphRecursionError) {
    // 在图执行失败后在外部处理
    console.log("Recursion limit exceeded, handling gracefully");
  }
}
被动方法在超过限制后捕获 GraphRecursionError。设计具有显式终止条件的图以避免首先达到限制。
方法检测处理控制流
被动(捕获 GraphRecursionError超过限制后在图外部使用 try/catch图执行终止
被动方法的优点:
  • 实现简单
  • 无需修改图逻辑
  • 集中式错误处理

其他可用的元数据

除了 langgraph_step,以下元数据也可在 config.metadata 中使用:
const inspectMetadata: GraphNode<typeof State> = async (state, config) => {
  const metadata = config.metadata;

  console.log(`Step: ${metadata?.langgraph_step}`);
  console.log(`Node: ${metadata?.langgraph_node}`);
  console.log(`Triggers: ${metadata?.langgraph_triggers}`);
  console.log(`Path: ${metadata?.langgraph_path}`);
  console.log(`Checkpoint NS: ${metadata?.langgraph_checkpoint_ns}`);

  return state;
}

可视化

能够可视化图通常很有用,尤其是在图变得更加复杂时。LangGraph 附带了几种内置的可视化图的方法。有关更多信息,请参阅可视化您的图

可观察性和跟踪

要跟踪、调试和评估您的代理,请使用 LangSmith

了解更多