Skip to main content
LangGraph 内置了一个持久化层,通过检查点(checkpointer)实现。当你使用检查点编译图时,检查点会在每个超步(super-step)保存图状态的 checkpoint。这些检查点被保存到 thread(线程)中,可以在图执行完成后进行访问。由于 threads 允许在执行后访问图的状态,因此可以实现多种强大的功能,包括人机协作(human-in-the-loop)、记忆(memory)、时间旅行(time travel)和容错(fault-tolerance)。下面,我们将更详细地讨论这些概念。 检查点
Agent Server 自动处理检查点 当使用 Agent Server 时,你无需手动实现或配置检查点。服务器会在后台为你处理所有持久化基础设施。

线程

线程是分配给检查点保存的每个检查点的唯一 ID 或线程标识符。它包含一系列运行的累积状态。当一个运行被执行时,助手底层图的状态将被持久化到线程中。 当使用检查点调用图时,你必须在配置的 configurable 部分指定 thread_id
{"configurable": {"thread_id": "1"}}
线程的当前和历史状态可以被检索。要持久化状态,必须在执行运行之前创建线程。LangSmith API 提供了多个用于创建和管理线程及线程状态的端点。有关详细信息,请参见 API 参考 检查点使用 thread_id 作为存储和检索检查点的主键。没有它,检查点就无法保存状态或在中断后恢复执行,因为检查点使用 thread_id 来加载已保存的状态。

检查点

线程在特定时间点的状态称为检查点。检查点是在每个超步保存的图状态快照,由 StateSnapshot 对象表示,具有以下关键属性:
  • config:与此检查点关联的配置。
  • metadata:与此检查点关联的元数据。
  • values:此时间点的状态通道值。
  • next:图中下一个要执行的节点名称的元组。
  • tasks:包含下一个要执行任务信息的 PregelTask 对象元组。如果该步骤之前已尝试过,它将包含错误信息。如果图在节点内动态中断,tasks 将包含与中断相关的额外数据。
检查点会被持久化,并可用于在稍后恢复线程的状态。 让我们看看当简单图如下调用时保存了哪些检查点:
from langgraph.graph import StateGraph, START, END
from langgraph.checkpoint.memory import InMemorySaver
from langchain_core.runnables import RunnableConfig
from typing import Annotated
from typing_extensions import TypedDict
from operator import add

class State(TypedDict):
    foo: str
    bar: Annotated[list[str], add]

def node_a(state: State):
    return {"foo": "a", "bar": ["a"]}

def node_b(state: State):
    return {"foo": "b", "bar": ["b"]}


workflow = StateGraph(State)
workflow.add_node(node_a)
workflow.add_node(node_b)
workflow.add_edge(START, "node_a")
workflow.add_edge("node_a", "node_b")
workflow.add_edge("node_b", END)

checkpointer = InMemorySaver()
graph = workflow.compile(checkpointer=checkpointer)

config: RunnableConfig = {"configurable": {"thread_id": "1"}}
graph.invoke({"foo": "", "bar":[]}, config)
运行图后,我们预期会看到恰好 4 个检查点:
  • START 为下一个要执行节点的空检查点
  • 包含用户输入 {'foo': '', 'bar': []}node_a 为下一个要执行节点的检查点
  • 包含 node_a 输出 {'foo': 'a', 'bar': ['a']}node_b 为下一个要执行节点的检查点
  • 包含 node_b 输出 {'foo': 'b', 'bar': ['a', 'b']} 且没有下一个要执行节点的检查点
注意,bar 通道值包含两个节点的输出,因为我们为 bar 通道定义了 reducer。

获取状态

在与保存的图状态交互时,你必须指定线程标识符。你可以通过调用 graph.get_state(config) 查看图的_最新_状态。这将返回一个 StateSnapshot 对象,对应于配置中提供的线程 ID 的最新检查点,或者如果提供了检查点 ID,则对应于该线程的特定检查点。
# 获取最新状态快照
config = {"configurable": {"thread_id": "1"}}
graph.get_state(config)

# 获取特定 checkpoint_id 的状态快照
config = {"configurable": {"thread_id": "1", "checkpoint_id": "1ef663ba-28fe-6528-8002-5a559208592c"}}
graph.get_state(config)
在我们的示例中,get_state 的输出将如下所示:
StateSnapshot(
    values={'foo': 'b', 'bar': ['a', 'b']},
    next=(),
    config={'configurable': {'thread_id': '1', 'checkpoint_ns': '', 'checkpoint_id': '1ef663ba-28fe-6528-8002-5a559208592c'}},
    metadata={'source': 'loop', 'writes': {'node_b': {'foo': 'b', 'bar': ['b']}}, 'step': 2},
    created_at='2024-08-29T19:19:38.821749+00:00',
    parent_config={'configurable': {'thread_id': '1', 'checkpoint_ns': '', 'checkpoint_id': '1ef663ba-28f9-6ec4-8001-31981c2c39f8'}}, tasks=()
)

获取状态历史

你可以通过调用 graph.get_state_history(config) 获取给定线程的完整图执行历史。这将返回与配置中提供的线程 ID 关联的 StateSnapshot 对象列表。重要的是,检查点将按时间顺序排列,最新的检查点/StateSnapshot 排在列表的第一位。
config = {"configurable": {"thread_id": "1"}}
list(graph.get_state_history(config))
在我们的示例中,get_state_history 的输出将如下所示:
[
    StateSnapshot(
        values={'foo': 'b', 'bar': ['a', 'b']},
        next=(),
        config={'configurable': {'thread_id': '1', 'checkpoint_ns': '', 'checkpoint_id': '1ef663ba-28fe-6528-8002-5a559208592c'}},
        metadata={'source': 'loop', 'writes': {'node_b': {'foo': 'b', 'bar': ['b']}}, 'step': 2},
        created_at='2024-08-29T19:19:38.821749+00:00',
        parent_config={'configurable': {'thread_id': '1', 'checkpoint_ns': '', 'checkpoint_id': '1ef663ba-28f9-6ec4-8001-31981c2c39f8'}},
        tasks=(),
    ),
    StateSnapshot(
        values={'foo': 'a', 'bar': ['a']},
        next=('node_b',),
        config={'configurable': {'thread_id': '1', 'checkpoint_ns': '', 'checkpoint_id': '1ef663ba-28f9-6ec4-8001-31981c2c39f8'}},
        metadata={'source': 'loop', 'writes': {'node_a': {'foo': 'a', 'bar': ['a']}}, 'step': 1},
        created_at='2024-08-29T19:19:38.819946+00:00',
        parent_config={'configurable': {'thread_id': '1', 'checkpoint_ns': '', 'checkpoint_id': '1ef663ba-28f4-6b4a-8000-ca575a13d36a'}},
        tasks=(PregelTask(id='6fb7314f-f114-5413-a1f3-d37dfe98ff44', name='node_b', error=None, interrupts=()),),
    ),
    StateSnapshot(
        values={'foo': '', 'bar': []},
        next=('node_a',),
        config={'configurable': {'thread_id': '1', 'checkpoint_ns': '', 'checkpoint_id': '1ef663ba-28f4-6b4a-8000-ca575a13d36a'}},
        metadata={'source': 'loop', 'writes': None, 'step': 0},
        created_at='2024-08-29T19:19:38.817813+00:00',
        parent_config={'configurable': {'thread_id': '1', 'checkpoint_ns': '', 'checkpoint_id': '1ef663ba-28f0-6c66-bfff-6723431e8481'}},
        tasks=(PregelTask(id='f1b14528-5ee5-579c-949b-23ef9bfbed58', name='node_a', error=None, interrupts=()),),
    ),
    StateSnapshot(
        values={'bar': []},
        next=('__start__',),
        config={'configurable': {'thread_id': '1', 'checkpoint_ns': '', 'checkpoint_id': '1ef663ba-28f0-6c66-bfff-6723431e8481'}},
        metadata={'source': 'input', 'writes': {'foo': ''}, 'step': -1},
        created_at='2024-08-29T19:19:38.816205+00:00',
        parent_config=None,
        tasks=(PregelTask(id='6d27aa2e-d72b-5504-a36f-8620e54a76dd', name='__start__', error=None, interrupts=()),),
    )
]
状态

重放

也可以回放之前的图执行。如果我们使用 thread_idcheckpoint_id 调用图,那么我们将_重放_对应 checkpoint_id 检查点_之前_已执行的步骤,并且只执行检查点_之后_的步骤。
  • thread_id 是线程的 ID。
  • checkpoint_id 是引用线程中特定检查点的标识符。
在调用图时,你必须将这些作为配置的 configurable 部分传入:
config = {"configurable": {"thread_id": "1", "checkpoint_id": "0c62ca34-ac19-445d-bbb0-5b4984975b2a"}}
graph.invoke(None, config=config)
重要的是,LangGraph 知道某个特定步骤之前是否已经执行过。如果已经执行过,LangGraph 只是_重放_图中的那个特定步骤,不会重新执行该步骤,但只对提供的 checkpoint_id _之前_的步骤执行此操作。checkpoint_id _之后_的所有步骤都将被执行(即创建新的分支),即使它们之前已经执行过。请参见时间旅行使用指南以了解更多关于重放的信息 重放

更新状态

除了从特定 checkpoints 重放图之外,我们还可以_编辑_图状态。我们使用 update_state 来实现这一点。此方法接受三个不同的参数:

config

config 应包含指定要更新哪个线程的 thread_id。当仅传入 thread_id 时,我们更新(或分叉)当前状态。可选地,如果我们包含 checkpoint_id 字段,则我们分叉所选的检查点。

values

这些是将用于更新状态的值。请注意,此更新与来自节点的任何更新完全相同。这意味着这些值将传递给reducer 函数(如果为图状态中的某些通道定义了 reducer)。这意味着 update_state 不会自动覆盖每个通道的值,而只覆盖没有 reducer 的通道。让我们来看一个例子。 假设你已用以下 schema 定义了图的状态(见上面的完整示例):
from typing import Annotated
from typing_extensions import TypedDict
from operator import add

class State(TypedDict):
    foo: int
    bar: Annotated[list[str], add]
现在假设图的当前状态为
{"foo": 1, "bar": ["a"]}
如果你如下更新状态:
graph.update_state(config, {"foo": 2, "bar": ["b"]})
那么图的新状态将是:
{"foo": 2, "bar": ["a", "b"]}
foo 键(通道)被完全替换(因为该通道没有指定 reducer,所以 update_state 会覆盖它)。但是,bar 键指定了一个 reducer,因此它会将 "b" 追加到 bar 的状态中。

as_node

在调用 update_state 时,你可以选择指定的最后一个参数是 as_node。如果你提供了它,更新将被应用,就像它来自节点 as_node 一样。如果未提供 as_node,它将被设置为最后一个更新状态的节点(如果不存在歧义)。这很重要,因为下一个要执行的步骤取决于最后一个给出更新的节点,所以这可以用来控制接下来执行哪个节点。请参见时间旅行使用指南以了解更多关于分叉状态的信息 更新

内存存储

共享状态模型 状态 schema 指定了一组在图执行时填充的键。如上所述,状态可以由检查点在每个图步骤写入线程,从而实现状态持久化。 但是,如果我们想在_跨线程_保留某些信息怎么办?考虑这样一个聊天机器人的案例,我们希望在与该用户的_所有_聊天对话(例如线程)中保留有关用户的特定信息! 仅使用检查点,我们无法跨线程共享信息。这就引出了对 Store 接口的需求。作为说明,我们可以定义一个 InMemoryStore 来跨线程存储用户信息。我们只需像以前一样使用检查点编译图,并传入 store。
LangGraph API 自动处理存储 当使用 LangGraph API 时,你无需手动实现或配置 store。API 会在后台为你处理所有存储基础设施。
InMemoryStore 适用于开发和测试。对于生产环境,请使用持久化存储,如 PostgresStoreRedisStore。所有实现都扩展了 BaseStore,这是在节点函数签名中使用的类型注解。

基本用法

首先,让我们在不使用 LangGraph 的情况下单独演示这一点。
from langgraph.store.memory import InMemoryStore
store = InMemoryStore()
记忆(memories)通过 tuple 命名空间化,在此特定示例中将是 (<user_id>, "memories")。命名空间可以是任意长度,表示任何内容,不必是特定于用户的。
user_id = "1"
namespace_for_memory = (user_id, "memories")
我们使用 store.put 方法将记忆保存到存储中的命名空间。执行此操作时,我们指定如上定义的命名空间,以及记忆的键值对:键只是记忆的唯一标识符(memory_id),值(字典)是记忆本身。
memory_id = str(uuid.uuid4())
memory = {"food_preference" : "I like pizza"}
store.put(namespace_for_memory, memory_id, memory)
我们可以使用 store.search 方法读取命名空间中的记忆,该方法将以列表形式返回给定用户的所有记忆。最新的记忆是列表中的最后一个。
memories = store.search(namespace_for_memory)
memories[-1].dict()
{'value': {'food_preference': 'I like pizza'},
 'key': '07e0caf4-1631-47b7-b15f-65515d4c1843',
 'namespace': ['1', 'memories'],
 'created_at': '2024-10-02T17:22:31.590602+00:00',
 'updated_at': '2024-10-02T17:22:31.590605+00:00'}
每种记忆类型都是一个 Python 类(Item),具有特定属性。我们可以通过如上所示的 .dict 转换将其作为字典访问。 它具有的属性有:
  • value:此记忆的值(本身是一个字典)
  • key:此命名空间中此记忆的唯一键
  • namespace:字符串元组,此记忆类型的命名空间
    虽然类型是 tuple[str, ...],但在转换为 JSON 时可能会被序列化为列表(例如,['1', 'memories'])。
  • created_at:此记忆创建时的时间戳
  • updated_at:此记忆更新时的时间戳

语义搜索

除了简单检索之外,store 还支持语义搜索,允许你根据含义而不是精确匹配来查找记忆。要启用此功能,请使用嵌入模型配置 store:
from langchain.embeddings import init_embeddings

store = InMemoryStore(
    index={
        "embed": init_embeddings("openai:text-embedding-3-small"),  # Embedding provider
        "dims": 1536,                              # Embedding dimensions
        "fields": ["food_preference", "$"]              # Fields to embed
    }
)
现在在搜索时,你可以使用自然语言查询来查找相关记忆:
# 查找关于食物偏好的记忆
# (可以在将记忆放入存储后执行此操作)
memories = store.search(
    namespace_for_memory,
    query="What does the user like to eat?",
    limit=3  # Return top 3 matches
)
你可以通过配置 fields 参数或在存储记忆时指定 index 参数来控制哪些记忆部分会被嵌入:
# 存储时指定要嵌入的字段
store.put(
    namespace_for_memory,
    str(uuid.uuid4()),
    {
        "food_preference": "I love Italian cuisine",
        "context": "Discussing dinner plans"
    },
    index=["food_preference"]  # Only embed "food_preferences" field
)

# 不嵌入存储(仍可检索,但不可搜索)
store.put(
    namespace_for_memory,
    str(uuid.uuid4()),
    {"system_info": "Last updated: 2024-01-01"},
    index=False
)

在 LangGraph 中使用

有了这些,我们在 LangGraph 中使用 store。store 与检查点协同工作:检查点将状态保存到线程(如上所述),store 允许我们存储任意信息以供_跨_线程访问。我们如下同时使用检查点和 store 编译图。
from dataclasses import dataclass
from langgraph.checkpoint.memory import InMemorySaver

@dataclass
class Context:
    user_id: str

# 我们需要这个,因为我们想要启用线程(对话)
checkpointer = InMemorySaver()

# ... 定义图 ...

# 使用检查点和 store 编译图
builder = StateGraph(MessagesState, context_schema=Context)
# ... 添加节点和边 ...
graph = builder.compile(checkpointer=checkpointer, store=store)
我们像以前一样使用 thread_id 调用图,还使用 user_id,我们将用它来为此特定用户的记忆命名空间,如上所示。
# 调用图
config = {"configurable": {"thread_id": "1"}}

# 首先让我们向 AI 打个招呼
for update in graph.stream(
    {"messages": [{"role": "user", "content": "hi"}]},
    config,
    stream_mode="updates",
    context=Context(user_id="1"),
):
    print(update)
你可以通过使用 Runtime 对象在_任何节点_中访问 store 和 user_id。当你将其作为参数添加到节点函数时,LangGraph 会自动注入 Runtime。以下是如何使用它来保存记忆:
from langgraph.runtime import Runtime
from dataclasses import dataclass

@dataclass
class Context:
    user_id: str

async def update_memory(state: MessagesState, runtime: Runtime[Context]):

    # 从运行时上下文获取用户 id
    user_id = runtime.context.user_id

    # 对记忆命名空间化
    namespace = (user_id, "memories")

    # ... 分析对话并创建新记忆

    # 创建新的记忆 ID
    memory_id = str(uuid.uuid4())

    # 我们创建一条新记忆
    await runtime.store.aput(namespace, memory_id, {"memory": memory})

如上所示,我们还可以在任何节点中访问 store 并使用 store.search 方法获取记忆。回想一下,记忆以可以转换为字典的对象列表形式返回。
memories[-1].dict()
{'value': {'food_preference': 'I like pizza'},
 'key': '07e0caf4-1631-47b7-b15f-65515d4c1843',
 'namespace': ['1', 'memories'],
 'created_at': '2024-10-02T17:22:31.590602+00:00',
 'updated_at': '2024-10-02T17:22:31.590605+00:00'}
我们可以访问记忆并在模型调用中使用它们。
from dataclasses import dataclass
from langgraph.runtime import Runtime

@dataclass
class Context:
    user_id: str

async def call_model(state: MessagesState, runtime: Runtime[Context]):
    # 从运行时上下文获取用户 id
    user_id = runtime.context.user_id

    # 对记忆命名空间化
    namespace = (user_id, "memories")

    # 基于最新消息进行搜索
    memories = await runtime.store.asearch(
        namespace,
        query=state["messages"][-1].content,
        limit=3
    )
    info = "\n".join([d.value["memory"] for d in memories])

    # ... 在模型调用中使用记忆
如果我们创建一个新线程,只要 user_id 相同,我们仍然可以访问相同的记忆。
# 在新线程上调用图
config = {"configurable": {"thread_id": "2"}}

# 再次打招呼
for update in graph.stream(
    {"messages": [{"role": "user", "content": "hi, tell me about my memories"}]},
    config,
    stream_mode="updates",
    context=Context(user_id="1"),
):
    print(update)
当我们使用 LangSmith 时,无论是本地(例如在 Studio 中)还是通过 LangSmith 托管,基础 store 默认可用,在图编译期间无需指定。但是,要启用语义搜索,你确实需要在 langgraph.json 文件中配置索引设置。例如:
{
    ...
    "store": {
        "index": {
            "embed": "openai:text-embeddings-3-small",
            "dims": 1536,
            "fields": ["$"]
        }
    }
}
有关更多详细信息和配置选项,请参见部署指南

检查点库

在底层,检查点由符合 BaseCheckpointSaver 接口的检查点对象提供支持。LangGraph 提供了多种检查点实现,均通过独立的可安装库实现:
  • langgraph-checkpoint:检查点保存器的基础接口(BaseCheckpointSaver)和序列化/反序列化接口(SerializerProtocol)。包含用于实验的内存检查点实现(InMemorySaver)。LangGraph 自带 langgraph-checkpoint
  • langgraph-checkpoint-sqlite:使用 SQLite 数据库的 LangGraph 检查点实现(SqliteSaver / AsyncSqliteSaver)。适用于实验和本地工作流。需要单独安装。
  • langgraph-checkpoint-postgres:使用 Postgres 数据库的高级检查点(PostgresSaver / AsyncPostgresSaver),用于 LangSmith。适用于生产环境。需要单独安装。
  • langgraph-checkpoint-cosmosdb:使用 Azure Cosmos DB 的 LangGraph 检查点实现(@[CosmosDBSaver] / @[AsyncCosmosDBSaver])。适用于在 Azure 中的生产环境。支持同步和异步操作。需要单独安装。

检查点接口

每个检查点都符合 BaseCheckpointSaver 接口并实现以下方法:
  • .put - 存储带有配置和元数据的检查点。
  • .put_writes - 存储链接到检查点的中间写入(即待处理写入)。
  • .get_tuple - 使用给定配置(thread_idcheckpoint_id)获取检查点元组。用于在 graph.get_state() 中填充 StateSnapshot
  • .list - 列出符合给定配置和过滤条件的检查点。用于在 graph.get_state_history() 中填充状态历史。
如果检查点与异步图执行一起使用(即通过 .ainvoke.astream.abatch 执行图),将使用上述方法的异步版本(.aput.aput_writes.aget_tuple.alist)。
对于异步运行图,你可以使用 InMemorySaver,或 Sqlite/Postgres 检查点的异步版本——AsyncSqliteSaver / AsyncPostgresSaver 检查点。

序列化器

当检查点保存图状态时,需要序列化状态中的通道值。这通过序列化器对象完成。 langgraph_checkpoint 定义了用于实现序列化器的协议,并提供了一个默认实现(JsonPlusSerializer),它可以处理各种类型,包括 LangChain 和 LangGraph 原语、日期时间、枚举等。

使用 pickle 进行序列化

默认序列化器 JsonPlusSerializer 底层使用 ormsgpack 和 JSON,不适用于所有类型的对象。 如果你想对当前不受我们的 msgpack 编码器支持的对象(如 Pandas dataframes)回退到 pickle, 可以使用 JsonPlusSerializerpickle_fallback 参数:
from langgraph.checkpoint.memory import InMemorySaver
from langgraph.checkpoint.serde.jsonplus import JsonPlusSerializer

# ... Define the graph ...
graph.compile(
    checkpointer=InMemorySaver(serde=JsonPlusSerializer(pickle_fallback=True))
)

加密

检查点可以选择加密所有持久化状态。要启用此功能,请将 EncryptedSerializer 的实例传递给任何 BaseCheckpointSaver 实现的 serde 参数。创建加密序列化器最简单的方法是通过 from_pycryptodome_aes,它从 LANGGRAPH_AES_KEY 环境变量读取 AES 密钥(或接受 key 参数):
import sqlite3

from langgraph.checkpoint.serde.encrypted import EncryptedSerializer
from langgraph.checkpoint.sqlite import SqliteSaver

serde = EncryptedSerializer.from_pycryptodome_aes()  # reads LANGGRAPH_AES_KEY
checkpointer = SqliteSaver(sqlite3.connect("checkpoint.db"), serde=serde)
from langgraph.checkpoint.serde.encrypted import EncryptedSerializer
from langgraph.checkpoint.postgres import PostgresSaver

serde = EncryptedSerializer.from_pycryptodome_aes()
checkpointer = PostgresSaver.from_conn_string("postgresql://...", serde=serde)
checkpointer.setup()
在 LangSmith 上运行时,只要存在 LANGGRAPH_AES_KEY,加密就会自动启用,因此你只需提供环境变量。其他加密方案可以通过实现 CipherProtocol 并将其提供给 EncryptedSerializer 来使用。

功能

人机协作

首先,检查点通过允许人工检查、中断和批准图步骤来促进人机协作工作流。这些工作流需要检查点,因为人工需要能够在任何时间点查看图的状态,并且在人工对状态进行任何更新后,图必须能够恢复执行。参见使用指南获取示例。

记忆

其次,检查点允许在交互之间实现”记忆”。在反复的人机交互(如对话)的情况下,任何后续消息都可以发送到该线程,该线程将保留之前交互的记忆。参见添加记忆了解如何使用检查点添加和管理对话记忆。

时间旅行

第三,检查点允许”时间旅行”,允许用户回放之前的图执行以检查和/或调试特定的图步骤。此外,检查点使得在任意检查点分叉图状态成为可能,以探索替代轨迹。

容错

最后,检查点还提供容错和错误恢复:如果一个或多个节点在给定的超步失败,你可以从最后一个成功步骤重新启动图。此外,当图节点在给定超步中执行中途失败时,LangGraph 会存储在该超步中成功完成的任何其他节点的待处理检查点写入,这样每当我们从该超步恢复图执行时,就不会重新运行成功的节点。

待处理写入

此外,当图节点在给定超步中执行中途失败时,LangGraph 会存储在该超步中成功完成的任何其他节点的待处理检查点写入,这样每当我们从该超步恢复图执行时,就不会重新运行成功的节点。