Skip to main content
函数式 API 允许您以最小的代码更改将 LangGraph 的关键功能(持久化内存人机回环流式处理)添加到您的应用程序中。 它旨在将这些功能集成到可能使用标准语言原语(如 if 语句、for 循环和函数调用)进行分支和控制流的现有代码中。与许多需要将代码重构为显式管道或 DAG 的数据编排框架不同,函数式 API 允许您在不强制执行严格执行模型的情况下整合这些功能。 函数式 API 使用两个关键构建块:
  • entrypoint:一个入口点封装了工作流逻辑并管理执行流,包括处理长时间运行的任务和中断。
  • task:表示一个离散的工作单元,例如 API 调用或数据处理步骤,可以在入口点内异步执行。任务返回一个类似 future 的对象,可以被等待或同步解析。
这为构建具有状态管理和流式处理的工作流提供了一个最小的抽象。
有关如何使用函数式 API 的信息,请参阅 使用函数式 API

函数式 API 与图 API

对于更喜欢声明式方法的用户,LangGraph 的 图 API 允许您使用图范式定义工作流。两个 API 共享相同的底层运行时,因此您可以在同一应用程序中一起使用它们。 以下是一些关键区别:
  • 控制流:函数式 API 不需要考虑图结构。您可以使用标准的 Python 构造来定义工作流。这通常会减少您需要编写的代码量。
  • 短期记忆图 API 需要声明一个 状态,并且可能需要定义 归约器 来管理图状态的更新。@entrypoint@tasks 不需要显式状态管理,因为它们的状态是函数作用域内的,并且不会在函数之间共享。
  • 检查点:两个 API 都会生成和使用检查点。在 图 API 中,每个 超级步骤 之后都会生成一个新的检查点。在 函数式 API 中,当任务执行时,其结果会保存到与给定入口点关联的现有检查点中,而不是创建新的检查点。
  • 可视化:图 API 可以轻松地将工作流可视化为图,这对于调试、理解工作流和与他人共享非常有用。函数式 API 不支持可视化,因为图是在运行时动态生成的。

示例

下面我们演示一个简单的应用程序,它撰写一篇文章并 中断 以请求人工审查。
import { MemorySaver, entrypoint, task, interrupt } from "@langchain/langgraph";

const writeEssay = task("writeEssay", async (topic: string) => {
  // 长时间运行任务的占位符。
  await new Promise((resolve) => setTimeout(resolve, 1000));
  return `An essay about topic: ${topic}`;
});

const workflow = entrypoint(
  { checkpointer: new MemorySaver(), name: "workflow" },
  async (topic: string) => {
    const essay = await writeEssay(topic);
    const isApproved = interrupt({
      // 任何可 JSON 序列化的有效负载作为参数提供给 interrupt。
      // 当从工作流流式传输数据时,它将在客户端显示为 Interrupt。
      essay, // 我们想要审查的文章。
      // 我们可以添加任何需要的额外信息。
      // 例如,引入一个名为 "action" 的键并附带一些说明。
      action: "Please approve/reject the essay",
    });

    return {
      essay, // 生成的文章
      isApproved, // 来自 HIL 的响应
    };
  }
);
此工作流将撰写一篇关于主题 “cat” 的文章,然后暂停以获取人工审查。工作流可以中断无限时间,直到提供审查。当工作流恢复时,它会从头开始执行,但由于 writeEssay 任务的结果已保存,任务结果将从检查点加载,而不是重新计算。
import { v7 as uuid7 } from "uuid";
import { MemorySaver, entrypoint, task, interrupt } from "@langchain/langgraph";

const writeEssay = task("writeEssay", async (topic: string) => {
  // 这是长时间运行任务的占位符。
  await new Promise(resolve => setTimeout(resolve, 1000));
  return `An essay about topic: ${topic}`;
});

const workflow = entrypoint(
  { checkpointer: new MemorySaver(), name: "workflow" },
  async (topic: string) => {
    const essay = await writeEssay(topic);
    const isApproved = interrupt({
      // 任何可 JSON 序列化的有效负载作为参数提供给 interrupt。
      // 当从工作流流式传输数据时,它将在客户端显示为 Interrupt。
      essay, // 我们想要审查的文章。
      // 我们可以添加任何需要的额外信息。
      // 例如,引入一个名为 "action" 的键并附带一些说明。
      action: "Please approve/reject the essay",
    });

    return {
      essay, // 生成的文章
      isApproved, // 来自 HIL 的响应
    };
  }
);

const threadId = uuid7();

const config = {
  configurable: {
    thread_id: threadId
  }
};

for await (const item of workflow.stream("cat", config)) {
  console.log(item);
}
{ writeEssay: 'An essay about topic: cat' }
{
  __interrupt__: [{
    value: { essay: 'An essay about topic: cat', action: 'Please approve/reject the essay' },
    resumable: true,
    ns: ['workflow:f7b8508b-21c0-8b4c-5958-4e8de74d2684'],
    when: 'during'
  }]
}
一篇文章已撰写完毕,准备接受审查。一旦提供审查,我们可以恢复工作流:
import { Command } from "@langchain/langgraph";

// 从用户获取审查(例如,通过 UI)
// 在本例中,我们使用布尔值,但这可以是任何可 JSON 序列化的值。
const humanReview = true;

const stream = await workflow.stream(
  new Command({ resume: humanReview }),
  config
);
for await (const item of stream) {
  console.log(item);
}
{ workflow: { essay: 'An essay about topic: cat', isApproved: true } }
工作流已完成,审查已添加到文章中。

入口点

entrypoint 函数可用于从函数创建工作流。它封装了工作流逻辑并管理执行流,包括处理 长时间运行的任务中断

定义

入口点 是通过使用配置和函数调用 entrypoint 函数来定义的。 该函数必须接受一个位置参数,该参数用作工作流输入。如果需要传递多个数据片段,请使用对象作为第一个参数的输入类型。 使用函数创建入口点会生成一个工作流实例,该实例有助于管理工作流的执行(例如,处理流式传输、恢复和检查点)。 您通常需要向 entrypoint 函数传递一个 检查点器 以启用持久化并使用 人机回环 等功能。
import { entrypoint } from "@langchain/langgraph";

const myWorkflow = entrypoint(
  { checkpointer, name: "workflow" },
  async (someInput: Record<string, any>): Promise<number> => {
    // 可能涉及长时间运行任务(如 API 调用)的逻辑,
    // 并且可能因人机回环而中断
    return result;
  }
);
序列化 入口点的输入输出必须是可 JSON 序列化的,以支持检查点。请参阅 序列化 部分了解更多详情。

执行

使用 entrypoint 函数将返回一个可以使用 invokestream 方法执行的对象。
const config = {
  configurable: {
    thread_id: "some_thread_id"
  }
};
await myWorkflow.invoke(someInput, config); // 等待结果

恢复

中断 之后恢复执行,可以通过向 Command 原语传递一个 恢复 值来完成。
import { Command } from "@langchain/langgraph";

const config = {
  configurable: {
    thread_id: "some_thread_id"
  }
};

await myWorkflow.invoke(new Command({ resume: someResumeValue }), config);
错误后恢复 要在错误后恢复,请使用 null 和相同的 线程 ID(配置)运行 entrypoint 这假设底层错误已解决,并且执行可以成功继续。
const config = {
  configurable: {
    thread_id: "some_thread_id"
  }
};

await myWorkflow.invoke(null, config);

短期记忆

当使用 checkpointer 定义 entrypoint 时,它会在同一 线程 ID 的连续调用之间将信息存储在 检查点 中。 这允许使用 getPreviousState 函数访问先前调用的状态。 默认情况下,getPreviousState 函数返回先前调用的返回值。
import { entrypoint, getPreviousState } from "@langchain/langgraph";

const myWorkflow = entrypoint(
  { checkpointer, name: "workflow" },
  async (number: number) => {
    const previous = getPreviousState<number>() ?? 0;
    return number + previous;
  }
);

const config = {
  configurable: {
    thread_id: "some_thread_id",
  },
};

await myWorkflow.invoke(1, config); // 1 (previous was undefined)
await myWorkflow.invoke(2, config); // 3 (previous was 1 from the previous invocation)

entrypoint.final

entrypoint.final 是一个特殊的原语,可以从入口点返回,并允许将保存在检查点中的值入口点的返回值解耦。 第一个值是入口点的返回值,第二个值是将保存在检查点中的值。
import { entrypoint, getPreviousState } from "@langchain/langgraph";

const myWorkflow = entrypoint(
  { checkpointer, name: "workflow" },
  async (number: number) => {
    const previous = getPreviousState<number>() ?? 0;
    // 这将把先前的值返回给调用者,将
    // 2 * number 保存到检查点,该值将在下一次调用中
    // 用于 `previous` 参数。
    return entrypoint.final({
      value: previous,
      save: 2 * number,
    });
  }
);

const config = {
  configurable: {
    thread_id: "1",
  },
};

await myWorkflow.invoke(3, config); // 0 (previous was undefined)
await myWorkflow.invoke(1, config); // 6 (previous was 3 * 2 from the previous invocation)

任务

任务 表示一个离散的工作单元,例如 API 调用或数据处理步骤。它有两个关键特征:
  • 异步执行:任务设计为异步执行,允许多个操作并发运行而不会阻塞。
  • 检查点:任务结果保存到检查点,支持从最后保存的状态恢复工作流。(有关更多详情,请参阅 持久化)。

定义

任务使用 task 函数定义,该函数包装一个常规函数。
import { task } from "@langchain/langgraph";

const slowComputation = task("slowComputation", async (inputValue: any) => {
  // 模拟长时间运行的操作
  return result;
});
序列化 任务的输出必须是可 JSON 序列化的,以支持检查点。

执行

任务 只能从 入口点、另一个 任务状态图节点 内部调用。 任务_不能_直接从主应用程序代码调用。 当您调用任务时,它会返回一个可以等待的 Promise。
const myWorkflow = entrypoint(
  { checkpointer, name: "workflow" },
  async (someInput: number): Promise<number> => {
    return await slowComputation(someInput);
  }
);

何时使用任务

任务 在以下场景中很有用:
  • 检查点:当您需要将长时间运行操作的结果保存到检查点时,这样在恢复工作流时就不需要重新计算。
  • 人机回环:如果您正在构建需要人工干预的工作流,您必须使用任务来封装任何随机性(例如 API 调用),以确保工作流可以正确恢复。有关更多详情,请参阅 确定性 部分。
  • 并行执行:对于 I/O 密集型任务,任务 支持并行执行,允许多个操作并发运行而不会阻塞(例如,调用多个 API)。
  • 可观察性:将操作包装在任务中提供了一种跟踪工作流进度和监控单个操作执行情况的方法,使用 LangSmith
  • 可重试工作:当工作需要重试以处理失败或不一致时,任务 提供了一种封装和管理重试逻辑的方法。

序列化

LangGraph 中的序列化有两个关键方面:
  1. entrypoint 输入和输出必须是可 JSON 序列化的。
  2. task 输出必须是可 JSON 序列化的。
这些要求对于启用检查点和工作流恢复是必要的。使用原语(如对象、数组、字符串、数字和布尔值)来确保您的输入和输出是可序列化的。 序列化确保工作流状态(例如任务结果和中间值)可以可靠地保存和恢复。这对于启用人机回环交互、容错和并行执行至关重要。 提供不可序列化的输入或输出将在配置了检查点器的工作流中导致运行时错误。

确定性

要利用人机回环等功能,任何随机性都应封装在任务内部。这保证了当执行暂停(例如,用于人机回环)然后恢复时,它将遵循相同的_步骤序列_,即使任务结果是非确定性的。 LangGraph 通过在执行时持久化任务子图 结果来实现此行为。设计良好的工作流确保恢复执行遵循_相同的步骤序列_,允许正确检索先前计算的结果,而无需重新执行它们。这对于长时间运行的任务或具有非确定性结果的任务特别有用,因为它避免了重复先前完成的工作,并允许从本质上相同的位置恢复。 虽然工作流的不同运行可能产生不同的结果,但恢复特定运行应始终遵循相同的记录步骤序列。这允许 LangGraph 高效地查找在图中断之前执行的任务子图结果,并避免重新计算它们。

幂等性

幂等性确保多次运行同一操作会产生相同的结果。这有助于防止在步骤因失败而重新运行时出现重复的 API 调用和冗余处理。始终将 API 调用放在任务函数内以进行检查点,并将它们设计为在重新执行时是幂等的。重新执行可能发生在任务开始但未成功完成时。然后,如果工作流恢复,任务将再次运行。使用幂等键或验证现有结果以避免重复。

常见陷阱

处理副作用

将副作用(例如,写入文件、发送电子邮件)封装在任务中,以确保在恢复工作流时不会多次执行。
在此示例中,副作用(写入文件)直接包含在工作流中,因此在恢复工作流时将执行第二次。
import { entrypoint, interrupt } from "@langchain/langgraph";
import fs from "fs";

const myWorkflow = entrypoint(
  { checkpointer, name: "workflow },
  async (inputs: Record<string, any>) => {
    // 此代码将在恢复工作流时执行第二次。
    // 这可能不是您想要的。
    fs.writeFileSync("output.txt", "Side effect executed");
    const value = interrupt("question");
    return value;
  }
);

非确定性控制流

可能每次给出不同结果的操作(例如获取当前时间或随机数)应封装在任务中,以确保在恢复时返回相同的结果。
  • 在任务中:获取随机数 (5) → 中断 → 恢复 → (再次返回 5) → …
  • 不在任务中:获取随机数 (5) → 中断 → 恢复 → 获取新的随机数 (7) → …
当使用具有多个中断调用的人机回环工作流时,这一点尤其重要。LangGraph 为每个任务/入口点保留一个恢复值列表。当遇到中断时,它会与相应的恢复值匹配。此匹配严格基于索引,因此恢复值的顺序应与中断的顺序匹配。 如果恢复时未保持执行顺序,一个 interrupt 调用可能会与错误的 resume 值匹配,从而导致结果不正确。 请阅读 确定性 部分了解更多详情。
在此示例中,工作流使用当前时间来确定要执行哪个任务。这是非确定性的,因为工作流的结果取决于其执行时间。
import { entrypoint, interrupt } from "@langchain/langgraph";

const myWorkflow = entrypoint(
  { checkpointer, name: "workflow" },
  async (inputs: { t0: number }) => {
    const t1 = Date.now();

    const deltaT = t1 - inputs.t0;

    if (deltaT > 1000) {
      const result = await slowTask(1);
      const value = interrupt("question");
      return { result, value };
    } else {
      const result = await slowTask(2);
      const value = interrupt("question");
      return { result, value };
    }
  }
);

了解更多