Skip to main content
功能性 API 允许你在最少改动现有代码的情况下,将 LangGraph 的关键功能(持久化记忆人工介入流式处理)添加到你的应用中。 该 API 设计用于将这些功能整合到可能使用标准控制流语法(例如 if 语句、for 循环和函数调用)的现有代码中。与许多要求将代码重构为明确管道或 DAG 的数据编排框架不同,功能性 API 允许你在不强制采用刚性执行模型的情况下集成这些能力。 功能性 API 使用两个关键构建块:
  • entrypoint – 封装工作流逻辑并管理执行流,包括处理长时任务和中断。
  • task – 表示一个独立的工作单元(如 API 调用或数据处理步骤),可在 entrypoint 内异步执行。任务返回类似 future 的对象,可被 await 或同步解析。
这为具有状态管理和流式能力的工作流提供了一个最小抽象。
有关如何使用功能性 API 的信息,请参见 Use Functional API

功能性 API 与 图 API

对于喜欢声明式方式的用户,LangGraph 的 图 API 允许你使用图的范式来定义工作流。两个 API 使用相同的运行时,因此可以在同一应用中混合使用。 关键差异:
  • 控制流:功能性 API 不要求考虑图结构,你可以使用常规 TypeScript/JavaScript 结构来定义工作流,通常能减少需编写的代码量。
  • 短期记忆Graph API 要求声明 State,并可能需要定义 reducers 来管理状态更新。@entrypoint@tasks 不要求显式状态管理,因为它们的状态作用域限于函数本身,函数间不共享状态。
  • 检查点:两者都会生成并使用检查点。在 Graph API 中,每个 superstep 后都会生成新的检查点。而在功能性 API 中,当任务执行时,其结果会保存到与给定 entrypoint 关联的现有检查点中,而不是创建新的检查点。
  • 可视化:Graph API 便于将工作流可视化为图,便于调试和交流。功能性 API 没有此可视化支持,因为图是在运行时动态生成的。

示例

下面演示一个简单应用:写一篇文章并通过 interrupts 暂停以请求人工审核。
import { MemorySaver, entrypoint, task, interrupt } from "@langchain/langgraph";

const writeEssay = task("writeEssay", async (topic: string) => {
  // 模拟一个长时间运行的任务。
  await new Promise((resolve) => setTimeout(resolve, 1000));
  return `An essay about topic: ${topic}`;
});

const workflow = entrypoint(
  { checkpointer: new MemorySaver(), name: "workflow" },
  async (topic: string) => {
    const essay = await writeEssay(topic);
    const isApproved = interrupt({
      // 传入到 interrupt 的任何可 JSON 序列化的负载。
      // 在从工作流流式传输数据到客户端时,会作为 Interrupt 被展示。
      essay, // 我们希望人工审核的文章。
      // 可以添加任何附加信息,例如添加一个名为 "action" 的键作为指示。
      action: "Please approve/reject the essay",
    });

    return {
      essay, // 生成的文章
      isApproved, // 来自人工的响应
    };
  },
);
该工作流将为主题 “cat” 写一篇文章,然后暂停以等待人工审核。工作流可以无限期中断,直到提供审核结果。当工作流恢复时,它会从开始处执行,但由于 writeEssay 任务的结果已经保存,任务结果将从检查点加载,而不会重新计算。
import { v4 as uuidv4 } from "uuid";
import { MemorySaver, entrypoint, task, interrupt } from "@langchain/langgraph";

const writeEssay = task("writeEssay", async (topic: string) => {
  // 这是一个长时间运行任务的占位符。
  await new Promise((resolve) => setTimeout(resolve, 1000));
  return `An essay about topic: ${topic}`;
});

const workflow = entrypoint(
  { checkpointer: new MemorySaver(), name: "workflow" },
  async (topic: string) => {
    const essay = await writeEssay(topic);
    const isApproved = interrupt({
      essay,
      action: "Please approve/reject the essay",
    });

    return {
      essay,
      isApproved,
    };
  },
);

const threadId = uuidv4();

const config = {
  configurable: {
    thread_id: threadId,
  },
};

for await (const item of workflow.stream("cat", config)) {
  console.log(item);
}
{ writeEssay: 'An essay about topic: cat' }
{
  __interrupt__: [{
    value: { essay: 'An essay about topic: cat', action: 'Please approve/reject the essay' },
    resumable: true,
    ns: ['workflow:f7b8508b-21c0-8b4c-5958-4e8de74d2684'],
    when: 'during'
  }]
}
一篇文章已生成并等待审核。一旦提供审核,我们可以恢复工作流:
import { Command } from "@langchain/langgraph";

// 从用户处获取审核(例如通过 UI)
// 这里使用布尔值,但可以是任何可 JSON 序列化的值。
const humanReview = true;

const stream = await workflow.stream(
  new Command({ resume: humanReview }),
  config,
);
for await (const item of stream) {
  console.log(item);
}
{ workflow: { essay: 'An essay about topic: cat', isApproved: true } }
工作流完成,审核结果已添加到文章中。

Entrypoint

entrypoint 函数可用于从函数创建工作流。它封装了工作流逻辑并管理执行流,包括处理长时任务和 中断

定义

entrypoint 通过将配置和函数传入 entrypoint 函数来定义。 该函数必须接受单个位置参数,该参数作为工作流输入。如果需要传递多项数据,请将一个对象作为第一个参数的输入类型。 使用函数创建 entrypoint 将生成一个工作流实例,用于管理工作流执行(例如处理流式、恢复和检查点)。 通常需要将 checkpointer 传递给 entrypoint 以启用持久化并使用人工介入等功能。
import { entrypoint } from "@langchain/langgraph";

const myWorkflow = entrypoint(
  { checkpointer, name: "workflow" },
  async (someInput: Record<string, any>): Promise<number> => {
    // 可能涉及长时任务(如 API 调用),并可能被人工中断
    return result;
  },
);
序列化Entrypoint 的 输入输出 必须是可 JSON 序列化的,以支持检查点。请参阅 序列化 部分了解更多细节。

执行

使用 entrypoint 函数会返回一个对象,可以使用 invokestream 方法执行。
const config = {
  configurable: {
    thread_id: "some_thread_id"
  }
};
await myWorkflow.invoke(someInput, config); // 等待结果

恢复执行

在遇到 interrupt 后恢复执行,可以通过向 Command 原语传入 resume 值来完成。
import { Command } from "@langchain/langgraph";

const config = {
  configurable: {
    thread_id: "some_thread_id"
  }
};

await myWorkflow.invoke(new Command({ resume: someResumeValue }), config);
在错误后恢复 要在错误后恢复,请使用相同的 thread id(config)以 null 作为输入运行 entrypoint。 这假设底层 错误 已被修复并且执行可以继续。
const config = {
  configurable: {
    thread_id: "some_thread_id"
  }
};

await myWorkflow.invoke(null, config);

短期记忆

entrypoint 使用 checkpointer 定义时,它会在相同 thread id 的连续调用之间将信息存储在 检查点 中。 这允许使用 getPreviousState 函数访问前一次调用的状态。 默认情况下,getPreviousState 返回前一次调用的返回值。
import { entrypoint, getPreviousState } from "@langchain/langgraph";

const myWorkflow = entrypoint(
  { checkpointer, name: "workflow" },
  async (number: number) => {
    const previous = getPreviousState<number>() ?? 0;
    return number + previous;
  },
);

const config = {
  configurable: {
    thread_id: "some_thread_id",
  },
};

await myWorkflow.invoke(1, config); // 1 (previous 未定义)
await myWorkflow.invoke(2, config); // 3 (previous 为上一次调用的 1)

entrypoint.final

entrypoint.final 是一个特殊的原语,可以从 entrypoint 返回,允许将保存在检查点中的值entrypoint 的返回值解耦。 第一个值是返回给调用者的值,第二个值将保存到检查点中。
import { entrypoint, getPreviousState } from "@langchain/langgraph";

const myWorkflow = entrypoint(
  { checkpointer, name: "workflow" },
  async (number: number) => {
    const previous = getPreviousState<number>() ?? 0;
    // 这将把 previous 返回给调用者,同时把 2 * number 保存到检查点中,
    // 下次调用时会用作 `previous` 参数。
    return entrypoint.final({
      value: previous,
      save: 2 * number,
    });
  },
);

const config = {
  configurable: {
    thread_id: "1",
  },
};

await myWorkflow.invoke(3, config); // 0 (previous 未定义)
await myWorkflow.invoke(1, config); // 6 (previous 为上一次调用保存的 3 * 2)

Task

任务(task) 表示独立的工作单元,例如 API 调用或数据处理步骤。其两个关键特性:
  • 异步执行:任务设计为异步执行,允许多个操作并发运行而不阻塞。
  • 检查点:任务结果会保存到检查点,从而在恢复工作流程时可以从上次保存的状态继续。(详见 持久化)。

定义

任务通过 task 函数定义,它包装了一个常规函数。
import { task } from "@langchain/langgraph";

const slowComputation = task("slowComputation", async (inputValue: any) => {
  // 模拟长时间运行的操作
  return result;
});
序列化 任务的 输出 必须可 JSON 序列化以支持检查点。

执行

任务 只能在 entrypoint、另一个 task状态图节点 内被调用。 任务不能直接从主应用代码中调用。 当调用 task 时,它会返回一个可 await 的 Promise。
const myWorkflow = entrypoint(
  { checkpointer, name: "workflow" },
  async (someInput: number): Promise<number> => {
    return await slowComputation(someInput);
  },
);

何时使用 task

任务 适合以下场景:
  • 检查点:需要将长时操作结果保存到检查点,以便恢复时无需重算。
  • 人工介入:构建需要人工参与的工作流时,必须使用 tasks 来封装任何随机性(例如 API 调用),以确保可以正确恢复。参见 确定性 节了解更多细节。
  • 并行执行:对于 I/O 密集型任务,tasks 支持并行执行,允许多个操作同时运行(例如调用多个 API)。
  • 可观察性:将操作包装为 tasks 可以让 LangSmith 跟踪工作流进度并监控单个操作的执行。
  • 可重试工作:当工作需要重试以应对失败或不一致时,tasks 提供了一种封装并管理重试逻辑的方式。

序列化

LangGraph 中的序列化有两个关键点:
  1. entrypoint 的输入和输出必须可 JSON 序列化。
  2. task 的输出必须可 JSON 序列化。
这些要求对于启用检查点和工作流恢复至关重要。请使用对象、数组、字符串、数字和布尔值等基本类型以保证可序列化。 提供不可序列化的输入或输出会在配置了 checkpointer 的工作流运行时产生错误。

确定性(Determinism)

要利用例如 人工介入 的功能,任何随机性都应被封装在 tasks 内。这样保证在执行被中止(例如等待人工)然后恢复时,会遵循相同的执行步骤序列,即使 task 的结果本身是非确定性的。 LangGraph 通过在执行期间持久化 task子图 的结果来实现这一点。良好设计的工作流确保恢复执行时遵循相同的已记录步骤序列,从而可以正确检索先前计算的结果而无需重新执行。 虽然不同运行可能产生不同结果,但恢复某次特定运行时应始终遵循相同的记录步骤序列,这允许 LangGraph 有效地查找在中断发生前已经执行的 tasksubgraph 结果并避免重复计算。

幂等性

幂等性保证多次运行同一操作会产生相同结果。这有助于防止在步骤重试时出现重复的 API 调用或冗余处理。务必将 API 调用放在 tasks 内以便检查点控制,并设计这些调用在重执行时是幂等的,例如使用幂等键或验证已有结果以避免重复。

常见陷阱

处理副作用

将副作用(例如写文件、发送邮件)封装在 tasks 中,以确保在恢复工作流时不会被多次执行。
在该示例中,副作用(写入文件)直接包含在工作流中,因此在恢复工作流时会被重复执行。
import { entrypoint, interrupt } from "@langchain/langgraph";
import fs from "fs";

const myWorkflow = entrypoint(
  { checkpointer, name: "workflow },
  async (inputs: Record<string, any>) => {
    // 这段代码在恢复工作流时会被重复执行,
    // 这通常不是期望的行为。
    fs.writeFileSync("output.txt", "Side effect executed");
    const value = interrupt("question");
    return value;
  }
);

非确定性控制流

可能每次执行都产生不同结果的操作(如获取当前时间或随机数)应封装在 tasks 中,以确保在恢复时返回相同结果。
  • 在 task 中:获取随机数 (5) → interrupt → 恢复 →(返回 5)→ …
  • 不在 task 中:获取随机数 (5) → interrupt → 恢复 → 获取新的随机数 (7) → …
这在包含多个中断调用的 人工介入 工作流中尤为重要。LangGraph 为每个 task/entrypoint 保持一组 resume 值,当遇到中断时,会基于索引将中断与对应的 resume 值匹配。因此 resume 值的顺序应当与中断的顺序一致。

了解更多