Skip to main content
LangGraph 内置了持久化层,通过 checkpointers 实现。当你用 checkpointer 编译图时,checkpointer 会在每个 super-step 保存图状态的 checkpoint。这些检查点保存在一个 thread 下,该线程可以在图执行后被访问。由于 threads 允许在执行后访问图的状态,一些强大的功能(包括人工介入、记忆、时间旅行和容错)才成为可能。下面我们将更详细地讨论这些概念。 Checkpoints
Agent Server 会自动处理检查点 当使用 Agent Server 时,无需手动实现或配置 checkpointer。服务器在后台为你处理所有持久化基础设施。

Threads(线程)

线程是由 checkpointer 为每个保存的检查点分配的唯一标识符。它包含一系列 运行 的累计状态。当运行被执行时,助手底层图的 state 将被持久化到该线程。 在使用带有 checkpointer 的图时,必须在配置的 configurable 部分指定 thread_id
{
  configurable: {
    thread_id: "1";
  }
}
线程的当前和历史状态可以被检索。要持久化状态,线程必须在执行运行之前被创建。LangSmith API 提供了若干端点用于创建和管理线程及其状态。详情见 API 参考 checkpointer 使用 thread_id 作为存储和检索检查点的主键。没有它,checkpointer 无法保存状态或在 interrupt 后恢复执行。

检查点(Checkpoints)

线程在某一时刻的状态称为检查点。检查点是保存于每个 super-step 的图状态快照,由 StateSnapshot 对象表示,具有以下主要属性:
  • config: 与此检查点关联的配置。
  • metadata: 与此检查点关联的元数据。
  • values: 在该时间点的状态通道值。
  • next: 一个将执行的下一个节点名称的元组。
  • tasks: 一个包含下一步将执行任务信息的 PregelTask 对象元组。如果该步骤之前尝试过,它将包含错误信息;如果图在节点内部被动态中断,tasks 会包含与中断相关的附加数据。
检查点被持久化,可用于稍后恢复线程的状态。 下面演示当简单图被调用时会保存哪些检查点:
import { StateGraph, StateSchema, ReducedValue, START, END, MemorySaver } from "@langchain/langgraph";
import { z } from "zod/v4";

const State = new StateSchema({
  foo: z.string(),
  bar: new ReducedValue(
    z.array(z.string()).default(() => []),
    {
      inputSchema: z.array(z.string()),
      reducer: (x, y) => x.concat(y),
    }
  ),
});

const workflow = new StateGraph(State)
  .addNode("nodeA", (state) => {
    return { foo: "a", bar: ["a"] };
  })
  .addNode("nodeB", (state) => {
    return { foo: "b", bar: ["b"] };
  })
  .addEdge(START, "nodeA")
  .addEdge("nodeA", "nodeB")
  .addEdge("nodeB", END);

const checkpointer = new MemorySaver();
const graph = workflow.compile({ checkpointer });

const config = { configurable: { thread_id: "1" } };
await graph.invoke({ foo: "", bar: [] }, config);
执行后,我们期望看到恰好 4 个检查点:
  • 空检查点,START 为下一个要执行的节点。
  • 包含用户输入 {'foo': '', 'bar': []}nodeA 为下一个要执行节点的检查点。
  • 包含 nodeA 输出 {'foo': 'a', 'bar': ['a']}nodeB 为下一个要执行节点的检查点。
  • 包含 nodeB 输出 {'foo': 'b', 'bar': ['a', 'b']} 且没有下一个要执行的节点的检查点。
注意 bar 通道包含来自两个节点的输出,因为我们为 bar 通道定义了 reducer。

获取状态

与已保存图状态交互时,必须指定 线程标识符。可以通过调用 graph.getState(config) 查看图的 最新 状态,该调用将返回与所提供线程 ID 关联的最新检查点或指定 checkpoint_id 的检查点对应的 StateSnapshot 对象。
// 获取最新状态快照
const config = { configurable: { thread_id: "1" } };
await graph.getState(config);

// 获取特定 checkpoint_id 的状态快照
const config = {
  configurable: {
    thread_id: "1",
    checkpoint_id: "1ef663ba-28fe-6528-8002-5a559208592c",
  },
};
await graph.getState(config);
在我们的示例中,getState 的输出类似:
StateSnapshot {
  values: { foo: 'b', bar: ['a', 'b'] },
  next: [],
  config: { ... },
  metadata: { source: 'loop', writes: { nodeB: { foo: 'b', bar: ['b'] } }, step: 2 },
  createdAt: '2024-08-29T19:19:38.821749+00:00',
  parentConfig: { ... },
  tasks: []
}

获取状态历史

可以通过调用 graph.getStateHistory(config) 获取给定线程的完整执行历史。它将按时间顺序返回与线程 ID 关联的一系列 StateSnapshot,最新的检查点在列表首位。
const config = { configurable: { thread_id: "1" } };
for await (const state of graph.getStateHistory(config)) {
  console.log(state);
}

回放(Replay)

也可以回放先前的图执行。如果使用 thread_idcheckpoint_id 调用 invoke,那么会在对应 checkpoint_id 的检查点之前 重放 之前执行的步骤,并仅执行该检查点之后的步骤。
  • thread_id 是线程的 ID。
  • checkpoint_id 指向线程内特定检查点。
const config = {
  configurable: {
    thread_id: "1",
    checkpoint_id: "0c62ca34-ac19-445d-bbb0-5b4984975b2a",
  },
};
await graph.invoke(null, config);
重要的是,LangGraph 知道某一步骤是否已被先前执行。如果已执行,LangGraph 会对该步骤进行 重放(而不是重新执行),但仅对在提供的 checkpoint_id 之前的步骤;checkpoint_id 之后的步骤将被执行(即发生新的分叉),即使它们此前也已执行。详情请参见时间旅行的指南 use-time-travel

更新状态(Update state)

除了从特定 checkpoints 回放图外,还可以 编辑 图的状态,使用 graph.updateState() 方法。此方法接受三个参数:

config

config 应包含指定要更新的线程的 thread_id。如果只传递 thread_id,则更新(或分叉)当前状态。可选地,如果包含 checkpoint_id 字段,则对选定的检查点进行分叉。

values

这些是要用于更新状态的值。注意,此更新将像来自节点的任何更新一样被处理。这意味着这些值将传递给通道的 reducer 函数(如果为某些通道定义了 reducer)。也就是说,update_state(或 updateState)并不会自动覆盖每个通道的值,而只会覆盖没有 reducer 的通道。下面通过示例说明。 假设你用如下 schema 定义了图状态(见上方完整示例):
import { StateSchema, ReducedValue } from "@langchain/langgraph";
import * as z from "zod";

const State = new StateSchema({
  foo: z.number(),
  bar: new ReducedValue(
    z.array(z.string()).default(() => []),
    {
      inputSchema: z.array(z.string()),
      reducer: (x, y) => x.concat(y),
    }
  ),
});
假设当前图状态为:
{ foo: 1, bar: ["a"] }
如果你如下更新状态:
await graph.updateState(config, { foo: 2, bar: ["b"] });
那么新的状态将为:
{ foo: 2, bar: ["a", "b"] }
foo 键被完全替换(因为该通道没有 reducer,updateState 会覆盖它)。而 bar 键有 reducer,因此它将 “b” 追加到 bar 的状态中。

as_node

调用 updateState 时,你可以可选地指定 asNode。如果提供,该更新将被视为来自节点 asNode。如果未提供,默认会设置为最后更新状态的节点(如果不歧义)。之所以重要,是因为下一个要执行的步骤取决于最后一个提供更新的节点,因此可以通过此参数控制下一步执行的节点。有关更多信息,请参见时间旅行指南 use-time-travel

存储(Memory store)

Model of shared state state schema 指定了一组在图执行过程中会被填充的键。如上所述,状态可以由 checkpointer 在每个图步骤写入到线程,从而实现状态持久化。 但如果我们希望保留跨线程的信息,例如在聊天机器人中,我们希望在与某用户的所有对话(线程)间保留关于该用户的特定信息(例如:喜欢的食物)? 仅靠 checkpointers 无法在线程间共享信息,这就引出了 Store 接口的需求。作为示例,我们可以定义一个 InMemoryStore 来跨线程存储关于用户的信息。我们照常用 checkpointer 编译图,并传入 store。
LangGraph API 会自动处理 stores 当使用 LangGraph API 时,通常无需手动实现或配置 stores,API 在后台管理这些存储基础设施。
InMemoryStore 适用于开发和测试;在生产中请使用持久化存储(如 PostgresStoreRedisStore)。所有实现都扩展自 BaseStore,在节点函数签名中使用该类型注释。

基本使用

import { MemoryStore } from "@langchain/langgraph";

const memoryStore = new MemoryStore();
记忆按 tuple 命名空间分组,例如 (<user_id>, "memories")。命名空间可以任意长度且不必仅限于用户相关。
const userId = "1";
const namespaceForMemory = [userId, "memories"];
使用 store.put 方法将记忆保存到命名空间:传入命名空间、唯一记忆标识 memory_id 以及记忆值(字典)。
import { v4 as uuidv4 } from "uuid";

const memoryId = uuidv4();
const memory = { food_preference: "I like pizza" };
await memoryStore.put(namespaceForMemory, memoryId, memory);
使用 store.search 方法可以按命名空间读取记忆,返回该用户的记忆列表,最近的记忆位于列表末尾。
const memories = await memoryStore.search(namespaceForMemory);
memories[memories.length - 1];
返回的对象属性包括:
  • value: 记忆的值
  • key: 该命名空间下记忆的唯一键
  • namespace: 字符串元组,表示记忆的命名空间
  • createdAt: 创建时间戳
  • updatedAt: 更新时间戳

语义搜索

存储还支持语义搜索,允许基于语义而非精确匹配来查找记忆。要启用此功能,请在 store 中配置嵌入模型:
import { OpenAIEmbeddings } from "@langchain/openai";

const store = new InMemoryStore({
  index: {
    embeddings: new OpenAIEmbeddings({ model: "text-embedding-3-small" }),
    dims: 1536,
    fields: ["food_preference", "$"],
  },
});
随后可用自然语言查询查找相关记忆:
const memories = await store.search(namespaceForMemory, {
  query: "What does the user like to eat?",
  limit: 3,
});
可以通过配置 fields 或在存储时指定 index 参数来控制哪些记忆字段应被嵌入。

在 LangGraph 中使用

将上述内容结合起来,在 LangGraph 中使用 memoryStorememoryStore 与 checkpointer 协同工作:checkpointer 将状态保存到线程,而 memoryStore 允许我们跨线程存取任意信息。编译图时同时传入 checkpointer 和 store:
import { MemorySaver } from "@langchain/langgraph";

const checkpointer = new MemorySaver();

// ... 定义图 ...

const graph = workflow.compile({ checkpointer, store: memoryStore });
调用图时仍需传入 thread_id,并可传入 user_id 以在 store 中为特定用户命名空间化记忆:
const userId = "1";
const config = { configurable: { thread_id: "1" }, context: { userId } };

for await (const update of await graph.stream(
  { messages: [{ role: "user", content: "hi" }] },
  { ...config, streamMode: "updates" }
)) {
  console.log(update);
}
在任何节点中都可以通过 runtime 参数访问 store 和 userId,如下示例展示如何保存记忆:
import { StateSchema, MessagesValue, Runtime } from "@langchain/langgraph";
import { v4 as uuidv4 } from "uuid";

const MessagesState = new StateSchema({
  messages: MessagesValue,
});

const updateMemory: GraphNode<typeof MessagesState> = async (state, runtime) => {
  const userId = runtime.context?.user_id;
  if (!userId) throw new Error("User ID is required");

  const namespace = [userId, "memories"];
  const memory = "Some memory content";
  const memoryId = uuidv4();
  await runtime.store?.put(namespace, memoryId, { memory });
};
同样可以在节点中使用 store.search 获取记忆,并将其用作模型调用的上下文。

Checkpointer 库

检查点功能由符合 BaseCheckpointSaver 接口的 checkpointer 对象提供。LangGraph 提供了多个可安装的 checkpointer 实现:
  • @langchain/langgraph-checkpoint: 基础接口与内存实现 MemorySaver(用于实验)。此包已内置于 LangGraph。
  • @langchain/langgraph-checkpoint-sqlite: 使用 SQLite 的实现,适合本地实验,需要单独安装。
  • @langchain/langgraph-checkpoint-postgres: 使用 Postgres 的实现(LangSmith 使用),适合生产环境,需要单独安装。
  • @langchain/langgraph-checkpoint-mongodb: 使用 MongoDB 的实现,适合生产环境。
  • @langchain/langgraph-checkpoint-redis: 使用 Redis 的实现,适合生产环境。

Checkpointer 接口

每个 checkpointer 遵循 BaseCheckpointSaver 接口并实现以下方法:
  • .put - 存储带配置和元数据的检查点。
  • .putWrites - 存储与检查点关联的中间写入(即 pending writes)。
  • .getTuple - 根据配置(thread_idcheckpoint_id)获取检查点元组,用于 graph.getState()
  • .list - 列出匹配给定配置和过滤条件的检查点,用于 graph.getStateHistory()

能力

人工介入(Human-in-the-loop)

检查点使得 人工介入工作流 成为可能:人类可以检查、中断并批准图步骤。因为需要查看图的任意时刻状态并在人工更新后恢复执行,所以 checkpointer 是必需的。

记忆(Memory)

检查点允许在交互之间保留短期记忆。在重复的人机交互(如对话)场景中,可以向同一线程发送后续消息以保留先前的上下文。

时间旅行(Time travel)

检查点允许回放先前的图执行以审查或调试特定步骤,并可以在任意检查点处分叉状态以探索替代轨迹。

容错(Fault-tolerance)

检查点也提供容错和错误恢复:如果某个 superstep 的一个或多个节点失败,可以从最后成功的步骤重新启动。此外,当图节点在 superstep 中失败时,LangGraph 会保存其他已成功完成节点的挂起写入,这样在恢复执行时就不需要重新运行已成功的节点。