Skip to main content
持久化执行是一种技术,其中进程或工作流在关键点保存其进度,允许其暂停并在之后准确地从上次中断的地方恢复。这在需要Human in the Loop的场景中特别有用,用户可以在继续之前检查、验证或修改流程;也适用于可能遇到中断或错误(例如,调用大语言模型超时)的长时间运行任务。通过保存已完成的工作,持久化执行使进程无需重新处理之前的步骤即可恢复——即使经过显著延迟(例如,一周后)。 LangGraph 内置的持久化层为工作流提供持久化执行,确保每个执行步骤的状态都保存到持久化存储中。此功能保证,如果工作流被中断——无论是由于系统故障还是Human in the Loop交互——都可以从其最后记录的状态恢复。
如果你正在使用带有检查点保存器的 LangGraph,那么你已经启用了持久化执行。你可以在任何点暂停和恢复工作流,即使在中断或故障之后。 为了充分利用持久化执行,请确保你的工作流设计为确定性幂等的,并将任何副作用或非确定性操作包装在任务中。你可以使用来自 StateGraph (Graph API)函数式 API任务

要求

要在 LangGraph 中利用持久化执行,你需要:
  1. 通过指定一个检查点保存器来启用工作流中的持久化,该保存器将保存工作流进度。
  2. 在执行工作流时指定一个线程标识符。这将跟踪特定工作流实例的执行历史。
  3. 将任何非确定性操作(例如,随机数生成)或具有副作用的操作(例如,文件写入、API 调用)包装在任务中,以确保当工作流恢复时,这些操作不会为特定运行重复执行,而是从持久化层检索其结果。更多信息,请参阅确定性与一致重放

确定性与一致重放

当你恢复工作流运行时,代码不会从执行停止的同一行代码恢复;相反,它将识别一个合适的起始点以从中断处继续。这意味着工作流将从起始点重放所有步骤,直到达到停止的点。 因此,当你为持久化执行编写工作流时,必须将任何非确定性操作(例如,随机数生成)和任何具有副作用的操作(例如,文件写入、API 调用)包装在任务节点中。 为确保你的工作流是确定性的并且可以一致重放,请遵循以下准则:
  • 避免重复工作:如果一个节点包含多个具有副作用的操作(例如,日志记录、文件写入或网络调用),请将每个操作包装在单独的任务中。这确保了当工作流恢复时,这些操作不会重复执行,其结果从持久化层检索。
  • 封装非确定性操作: 将任何可能产生非确定性结果的代码(例如,随机数生成)包装在任务节点中。这确保了在恢复时,工作流遵循完全相同的记录步骤序列并产生相同的结果。
  • 使用幂等操作:尽可能确保副作用(例如,API 调用、文件写入)是幂等的。这意味着如果一个操作在工作流失败后重试,它将与第一次执行时具有相同的效果。这对于导致数据写入的操作尤为重要。如果一个任务开始但未能成功完成,工作流的恢复将重新运行该任务,依赖于记录的结果来保持一致性。使用幂等键或验证现有结果以避免意外重复,确保工作流执行顺畅且可预测。
有关需要避免的陷阱示例,请参阅函数式 API 中的常见陷阱部分,该部分展示了如何使用任务构建代码以避免这些问题。相同的原则适用于 StateGraph (Graph API)

持久化模式

LangGraph 支持三种持久化模式,允许你根据应用程序的需求平衡性能和数据一致性。更高的持久化模式会为工作流执行增加更多开销。你可以在调用任何图执行方法时指定持久化模式:
graph.stream(
    {"input": "test"},
    durability="sync"
)
持久化模式从最低到最高持久性如下:
  • "exit":LangGraph 仅在图执行成功退出、出错或由于Human in the Loop中断时持久化更改。这为长时间运行的图提供了最佳性能,但意味着中间状态未保存,因此你无法从执行过程中发生的系统故障(如进程崩溃)中恢复。
  • "async":LangGraph 在下一步执行时异步持久化更改。这提供了良好的性能和持久性,但如果进程在执行过程中崩溃,存在 LangGraph 未写入检查点的小风险。
  • "sync":LangGraph 在下一步开始之前同步持久化更改。这确保 LangGraph 在继续执行之前写入每个检查点,以一定的性能开销为代价提供高持久性。

在节点中使用任务

如果一个节点包含多个操作,你可能会发现将每个操作转换为任务比将操作重构为单独的节点更容易。
from typing import NotRequired
from typing_extensions import TypedDict
from langchain_core.utils.uuid import uuid7

from langgraph.checkpoint.memory import InMemorySaver
from langgraph.graph import StateGraph, START, END
import requests

# 定义一个 TypedDict 来表示状态
class State(TypedDict):
    url: str
    result: NotRequired[str]

def call_api(state: State):
    """发出 API 请求的示例节点。"""
    result = requests.get(state['url']).text[:100]  # 副作用  #
    return {
        "result": result
    }

# 创建一个 StateGraph 构建器并为 call_api 函数添加一个节点
builder = StateGraph(State)
builder.add_node("call_api", call_api)

# 将开始和结束节点连接到 call_api 节点
builder.add_edge(START, "call_api")
builder.add_edge("call_api", END)

# 指定一个检查点保存器
checkpointer = InMemorySaver()

# 使用检查点保存器编译图
graph = builder.compile(checkpointer=checkpointer)

# 定义一个带有线程 ID 的配置。
thread_id = str(uuid7())
config = {"configurable": {"thread_id": thread_id}}

# 调用图
graph.invoke({"url": "https://www.example.com"}, config)

恢复工作流

一旦你在工作流中启用了持久化执行,你可以在以下场景中恢复执行:
  • 暂停和恢复工作流: 使用 interrupt 函数在特定点暂停工作流,并使用 Command 原语以更新的状态恢复它。更多详情,请参阅中断
  • 从故障中恢复: 在异常(例如,大语言模型提供商中断)后,自动从最后一个成功的检查点恢复工作流。这涉及使用相同的线程标识符执行工作流,为其提供 None 作为输入值(参见此示例使用函数式 API)。

恢复工作流的起始点

  • 如果你使用的是 StateGraph (Graph API),起始点是执行停止的节点的开头。
  • 如果你在节点内进行子图调用,起始点将是调用被中止子图的节点。 在子图内部,起始点将是执行停止的特定节点
  • 如果你使用的是函数式 API,起始点是执行停止的入口点的开头。

优雅关闭

需要 langgraph>=1.2,目前处于 alpha 阶段。
优雅关闭允许你在当前超级步骤完成后协作地停止正在进行的图运行,并保存一个可恢复的检查点。这对于处理 SIGTERM 信号或任何需要回收资源而不丢失工作的外部监督器很有用。 创建一个 RunControl 并将其作为 control= 传递给 invokestream。从任何线程调用 request_drain() 以发出运行应停止的信号:
from langgraph.runtime import RunControl
from langgraph.errors import GraphDrained

control = RunControl()

# 在信号处理程序或监督器中:
# control.request_drain("sigterm")

try:
    result = graph.invoke(inputs, config, control=control)
except GraphDrained as e:
    # 图提前停止并保存了检查点。
    # 稍后使用相同的配置恢复。
    print(f"Drained: {e.reason}")

语义

Drain 是协作式的,在超级步骤之间运行,从不抢占已经运行的工作:
场景行为
节点正在执行中运行至完成。Drain 在下一个超级步骤生效。
节点具有重试策略且当前正在重试重试循环运行至耗尽或成功。之后 Drain 生效。
图在 drain 的同一 tick 自然完成正常返回。检查 control.drain_requested 以与正常运行区分。
还有更多超级步骤抛出 GraphDrained(reason)。检查点已保存且可恢复。
子图请求 drainGraphDrained 向上传播到父图,并在其自身的下一个超级步骤边界停止它。

drain 后恢复

使用 invoke(None, config) 并使用相同的 thread_id 恢复已 drain 的运行:
result = graph.invoke(None, config)

在节点内读取 drain 状态

通过 runtime 参数访问 drain 状态,以便在达到超级步骤边界之前调整节点行为:
from langgraph.runtime import Runtime

async def my_node(state: State, runtime: Runtime) -> State:
    if runtime.drain_requested:
        # 跳过昂贵的工作并返回最小结果
        return {"status": "skipped", "reason": runtime.drain_reason}
    return {"status": await do_work()}

SIGTERM 钩子模式

处理进程关闭的推荐模式:
import signal
from langgraph.runtime import RunControl
from langgraph.errors import GraphDrained

control = RunControl()
signal.signal(signal.SIGTERM, lambda *_: control.request_drain("sigterm"))

try:
    result = graph.invoke(inputs, config, control=control)
except GraphDrained as e:
    log.info("graph drained: %s", e.reason)
    # 在下次启动时使用相同的配置恢复
request_drain() 不会取消正在运行的 asyncio 任务或终止线程。对于硬性上限,请将 drain 与优雅超时和任务取消结合使用。