Skip to main content
持久执行是一种技术,它允许进程或工作流在关键点保存其进度,从而使其能够暂停并在稍后从中断的确切位置恢复。这在需要人机交互的场景中特别有用,用户可以在继续之前检查、验证或修改进程;也适用于可能遇到中断或错误(例如,调用 LLM 超时)的长时间运行的任务。通过保存已完成的工作,持久执行使进程能够恢复而无需重新处理之前的步骤——即使是在相当长的延迟之后(例如,一周后)。 LangGraph 内置的持久化层为工作流提供了持久执行,确保每个执行步骤的状态都保存到持久存储中。此功能保证了如果工作流被中断——无论是由于系统故障还是为了人机交互——它可以从最后记录的状态恢复。
如果你正在使用带有 checkpointer 的 LangGraph,你已经启用了持久执行。你可以随时暂停和恢复工作流,即使在中断或故障之后也是如此。 为了充分利用持久执行,请确保你的工作流被设计为确定性的幂等的,并将任何副作用或非确定性操作包装在任务中。你可以使用来自 StateGraph (Graph API)Functional API任务

要求

要在 LangGraph 中利用持久执行,你需要:
  1. 通过指定将保存工作流进度的 checkpointer 在你的工作流中启用持久化
  2. 在执行工作流时指定一个线程标识符。这将跟踪特定工作流实例的执行历史。
  3. 将任何非确定性操作(例如,随机数生成)或具有副作用的操作(例如,文件写入、API 调用)包装在任务中,以确保当工作流恢复时,这些操作不会为特定的运行重复执行,而是从持久化层检索其结果。有关更多信息,请参阅确定性和一致重放

确定性和一致重放

当你恢复工作流运行时,代码会从执行停止的同一行代码恢复;相反,它将识别一个适当的起点,从那里开始继续执行。这意味着工作流将从起点重放所有步骤,直到到达停止点。 因此,当你编写用于持久执行的工作流时,你必须将任何非确定性操作(例如,随机数生成)和任何具有副作用的操作(例如,文件写入、API 调用)包装在任务节点中。 为确保你的工作流是确定性的并且可以一致地重放,请遵循以下准则:
  • 避免重复工作:如果一个节点包含多个具有副作用的操作(例如,日志记录、文件写入或网络调用),请将每个操作包装在一个单独的 task 中。这确保了当工作流恢复时,操作不会重复,并且从持久化层检索其结果。
  • 封装非确定性操作: 将任何可能产生非确定性结果的代码(例如,随机数生成)包装在 tasksnodes 中。这确保了在恢复时,工作流遵循确切记录的步骤序列并产生相同的结果。
  • 使用幂等操作:尽可能确保副作用(例如,API 调用、文件写入)是幂等的。这意味着如果操作在工作流失败后重试,它将具有与第一次执行时相同的效果。这对于导致数据写入的操作尤为重要。如果 task 启动但未能成功完成,工作流的恢复将重新运行 task,依靠记录的结果来保持一致性。使用幂等键或验证现有结果以避免意外重复,确保平滑和可预测的工作流执行。
有关要避免的陷阱的一些示例,请参阅功能 API 中的常见陷阱部分,其中展示了 如何使用 tasks 构建代码以避免这些问题。相同的原则适用于 StateGraph (Graph API)

持久性模式

LangGraph 支持三种持久性模式,允许你根据应用程序的要求平衡性能和数据一致性。更高的持久性模式会给工作流执行增加更多的开销。你可以在调用任何图执行方法时指定持久性模式:
await graph.stream(
  { input: "test" },
  { durability: "sync" }
)
持久性模式从最低到最高如下:
  • "exit":LangGraph 仅在图执行成功退出、出错或由于人机交互中断时持久化更改。这为长时间运行的图提供了最佳性能,但意味着中间状态不会保存,因此你无法从执行过程中发生的系统故障(如进程崩溃)中恢复。
  • "async":LangGraph 在下一步执行时异步持久化更改。这提供了良好的性能和持久性,但在执行期间进程崩溃时,LangGraph 存在不写入检查点的小风险。
  • "sync":LangGraph 在下一步开始之前同步持久化更改。这确保了 LangGraph 在继续执行之前写入每个检查点,以一定的性能开销为代价提供高持久性。

在节点中使用任务

如果一个节点包含多个操作,你可能会发现将每个操作转换为一个 task 比将操作重构为单个节点更容易。
import { StateGraph, StateSchema, GraphNode, START, END, MemorySaver } from "@langchain/langgraph";
import { v4 as uuidv4 } from "uuid";
import * as z from "zod";

// 定义 StateSchema 以表示状态
const State = new StateSchema({
  url: z.string(),
  result: z.string().optional(),
});

const callApi: GraphNode<typeof State> = async (state) => {
  const response = await fetch(state.url);
  const text = await response.text();
  const result = text.slice(0, 100); // 副作用
  return {
    result,
  };
};

// 创建 StateGraph 构建器并为 callApi 函数添加节点
const builder = new StateGraph(State)
  .addNode("callApi", callApi)
  .addEdge(START, "callApi")
  .addEdge("callApi", END);

// 指定 checkpointer
const checkpointer = new MemorySaver();

// 使用 checkpointer 编译图
const graph = builder.compile({ checkpointer });

// 定义带有线程 ID 的配置。
const threadId = uuidv4();
const config = { configurable: { thread_id: threadId } };

// 调用图
await graph.invoke({ url: "https://www.example.com" }, config);

恢复工作流

一旦你在工作流中启用了持久执行,你可以在以下场景中恢复执行:
  • 暂停和恢复工作流: 使用 interrupt 函数在特定点暂停工作流,并使用 Command 原语以更新的状态恢复它。有关更多详细信息,请参阅中断
  • 从故障中恢复: 在异常(例如,LLM 提供商中断)后,从最后成功的检查点自动恢复工作流。这涉及通过提供 null 作为输入值来使用相同的线程标识符执行工作流(请参阅功能 API 的此示例)。

恢复工作流的起点

  • 如果你使用的是 StateGraph (Graph API),起点是执行停止的节点的开始。
  • 如果你在节点内进行子图调用,起点将是调用已停止子图的节点。 在子图内部,起点将是执行停止的特定节点
  • 如果你使用的是 Functional API,起点是执行停止的入口点的开始。