Skip to main content
持久化执行是一种技术,其中进程或工作流在关键点保存其进度,允许其暂停并在之后从中断处恢复执行。这在需要人工介入的场景中特别有用,用户可以在继续之前检查、验证或修改流程;同时也适用于可能遇到中断或错误的长时间运行任务(例如,调用 LLM 超时)。通过保存已完成的工作,持久化执行使进程能够在不重新处理先前步骤的情况下恢复——即使经过了较长时间(例如一周后)。 LangGraph 内置的持久化层为工作流提供了持久化执行功能,确保每个执行步骤的状态都保存到持久化存储中。此功能保证了如果工作流被中断——无论是由于系统故障还是人工介入交互——都可以从其最后记录的状态恢复。
如果您正在使用带有检查点的 LangGraph,您已经启用了持久化执行。您可以在任意时间点暂停和恢复工作流,即使在中断或失败之后。 为了充分利用持久化执行,请确保您的工作流被设计为确定性的幂等的,并将任何副作用或非确定性操作包装在 tasks 中。您可以在 StateGraph(图 API)Functional API 中使用 tasks

前提条件

要在 LangGraph 中使用持久化执行,您需要:
  1. 通过指定一个检查点来为工作流启用持久化,该检查点将保存工作流进度。
  2. 在执行工作流时指定一个线程标识符。这将跟踪工作流特定实例的执行历史。
  3. 将任何非确定性操作(例如随机数生成)或具有副作用的操作(例如文件写入、API 调用)包装在 tasks 中,以确保当工作流恢复时,这些操作不会在特定运行中重复执行,而是从持久化层检索其结果。更多信息请参阅确定性与一致性重放

确定性与一致性重放

当您恢复工作流运行时,代码不会从执行停止的同一行代码处恢复;而是会识别一个合适的起始点,从该处继续执行。这意味着工作流将从起始点重放所有步骤,直到到达停止点为止。 因此,当您编写用于持久化执行的工作流时,必须将任何非确定性操作(例如随机数生成)和任何具有副作用的操作(例如文件写入、API 调用)包装在 tasksnodes 中。 为确保您的工作流具有确定性并能够一致性重放,请遵循以下准则:
  • 避免重复工作:如果一个 node 包含多个具有副作用的操作(例如日志记录、文件写入或网络调用),请将每个操作包装在单独的 task 中。这确保了当工作流恢复时,操作不会重复,其结果将从持久化层检索。
  • 封装非确定性操作:将任何可能产生非确定性结果的代码(例如随机数生成)包装在 tasksnodes 中。这确保了在恢复时,工作流遵循相同结果的精确记录步骤序列。
  • 使用幂等操作:在可能的情况下,确保副作用(例如 API 调用、文件写入)是幂等的。这意味着如果在工作流中某个操作在失败后被重试,它将与第一次执行时产生相同的效果。这对于导致数据写入的操作尤为重要。如果某个 task 开始执行但未能成功完成,工作流恢复时将重新运行该 task,依赖记录的结果来保持一致性。使用幂等性键或验证现有结果以避免意外重复,确保工作流执行顺畅且可预测。
有关需要避免的陷阱示例,请参阅 functional API 中的常见陷阱部分,该部分展示了如何使用 tasks 构建代码以避免这些问题。相同的原则也适用于 StateGraph(图 API)

持久化模式

LangGraph 支持三种持久化模式,允许您根据应用程序的需求在性能和数据一致性之间进行平衡。较高的持久化模式会给工作流执行增加更多开销。您可以在调用任何图执行方法时指定持久化模式:
graph.stream(
    {"input": "test"},
    durability="sync"
)
持久化模式从最低到最高持久性依次如下:
  • "exit":LangGraph 仅在图执行成功退出、出现错误或由于人工介入中断时才持久化更改。这为长时间运行的图提供了最佳性能,但意味着中间状态不会被保存,因此您无法从执行过程中发生的系统故障(如进程崩溃)中恢复。
  • "async":LangGraph 在下一步执行时异步持久化更改。这提供了良好的性能和持久性,但存在一小风险,即如果进程在执行期间崩溃,LangGraph 可能不会写入检查点。
  • "sync":LangGraph 在下一步开始之前同步持久化更改。这确保 LangGraph 在继续执行之前写入每个检查点,以一些性能开销为代价提供高持久性。

在节点中使用 tasks

如果一个 node 包含多个操作,您可能会发现将每个操作转换为 task 比将操作重构为单独的节点更容易。
from typing import NotRequired
from typing_extensions import TypedDict
import uuid

from langgraph.checkpoint.memory import InMemorySaver
from langgraph.graph import StateGraph, START, END
import requests

# Define a TypedDict to represent the state
class State(TypedDict):
    url: str
    result: NotRequired[str]

def call_api(state: State):
    """Example node that makes an API request."""
    result = requests.get(state['url']).text[:100]  # Side-effect  #
    return {
        "result": result
    }

# Create a StateGraph builder and add a node for the call_api function
builder = StateGraph(State)
builder.add_node("call_api", call_api)

# Connect the start and end nodes to the call_api node
builder.add_edge(START, "call_api")
builder.add_edge("call_api", END)

# Specify a checkpointer
checkpointer = InMemorySaver()

# Compile the graph with the checkpointer
graph = builder.compile(checkpointer=checkpointer)

# Define a config with a thread ID.
thread_id = uuid.uuid4()
config = {"configurable": {"thread_id": thread_id}}

# Invoke the graph
graph.invoke({"url": "https://www.example.com"}, config)

恢复工作流

一旦在工作流中启用了持久化执行,您可以在以下场景中恢复执行:
  • 暂停和恢复工作流:使用 interrupt 函数在特定点暂停工作流,并使用 Command 原语以更新后的状态恢复它。更多详情请参阅中断
  • 从失败中恢复:在发生异常(例如 LLM 提供商中断)后,从最后一个成功的检查点自动恢复工作流。这涉及使用相同的线程标识符执行工作流,并将 None 作为输入值提供(参见 functional API 的示例)。

恢复工作流的起始点

  • 如果您使用的是 StateGraph(图 API),起始点是执行停止处的 node 的开头。
  • 如果您在节点内进行子图调用,起始点将是调用了被中断子图的节点。 在子图内部,起始点将是执行停止处的特定 node
  • 如果您使用的是 Functional API,起始点是执行停止处的 entrypoint 的开头。