Skip to main content
函数式 API 允许您以最小的代码更改将 LangGraph 的关键功能(持久化内存人机回环流式处理)添加到您的应用程序中。
有关函数式 API 的概念性信息,请参阅 函数式 API

创建简单工作流

定义 entrypoint 时,输入仅限于函数的第一个参数。要传递多个输入,可以使用字典。
const checkpointer = new MemorySaver();

const myWorkflow = entrypoint(
  { checkpointer, name: "myWorkflow" },
  async (inputs: { value: number; anotherValue: number }) => {
    const value = inputs.value;
    const anotherValue = inputs.anotherValue;
    // ...
  }
);

await myWorkflow.invoke({ value: 1, anotherValue: 2 });
import { v7 as uuid7 } from "uuid";
import { entrypoint, task, MemorySaver } from "@langchain/langgraph";

// 检查数字是否为偶数的任务
const isEven = task("isEven", async (number: number) => {
  return number % 2 === 0;
});

// 格式化消息的任务
const formatMessage = task("formatMessage", async (isEven: boolean) => {
  return isEven ? "The number is even." : "The number is odd.";
});

// 创建用于持久化的检查点保存器
const checkpointer = new MemorySaver();

const workflow = entrypoint(
  { checkpointer, name: "workflow" },
  async (inputs: { number: number }) => {
    // 用于对数字进行分类的简单工作流
    const even = await isEven(inputs.number);
    return await formatMessage(even);
  }
);

// 使用唯一的线程 ID 运行工作流
const config = { configurable: { thread_id: uuid7() } };
const result = await workflow.invoke({ number: 7 }, config);
console.log(result);
此示例演示了如何使用 @task@entrypoint 装饰器进行语法编写。由于提供了检查点保存器,工作流结果将持久保存在检查点保存器中。
import { v7 as uuid7 } from "uuid";
import { ChatOpenAI } from "@langchain/openai";
import { entrypoint, task, MemorySaver } from "@langchain/langgraph";

const model = new ChatOpenAI({ model: "gpt-3.5-turbo" });

// 任务:使用 LLM 生成文章
const composeEssay = task("composeEssay", async (topic: string) => {
  // 生成关于给定主题的文章
  const response = await model.invoke([
    { role: "system", content: "You are a helpful assistant that writes essays." },
    { role: "user", content: `Write an essay about ${topic}.` }
  ]);
  return response.content as string;
});

// 创建用于持久化的检查点保存器
const checkpointer = new MemorySaver();

const workflow = entrypoint(
  { checkpointer, name: "workflow" },
  async (topic: string) => {
    // 使用 LLM 生成文章的简单工作流
    return await composeEssay(topic);
  }
);

// 执行工作流
const config = { configurable: { thread_id: uuid7() } };
const result = await workflow.invoke("the history of flight", config);
console.log(result);

并行执行

可以通过并发调用任务并等待结果来并行执行任务。这对于提高 I/O 密集型任务(例如,调用 LLM 的 API)的性能非常有用。
const addOne = task("addOne", async (number: number) => {
  return number + 1;
});

const graph = entrypoint(
  { checkpointer, name: "graph" },
  async (numbers: number[]) => {
    return await Promise.all(numbers.map(addOne));
  }
);
此示例演示了如何使用 @task 并行运行多个 LLM 调用。每个调用生成一个关于不同主题的段落,并将结果合并为单个文本输出。
import { v7 as uuid7 } from "uuid";
import { ChatOpenAI } from "@langchain/openai";
import { entrypoint, task, MemorySaver } from "@langchain/langgraph";

// 初始化 LLM 模型
const model = new ChatOpenAI({ model: "gpt-3.5-turbo" });

// 生成关于给定主题的段落的任务
const generateParagraph = task("generateParagraph", async (topic: string) => {
  const response = await model.invoke([
    { role: "system", content: "You are a helpful assistant that writes educational paragraphs." },
    { role: "user", content: `Write a paragraph about ${topic}.` }
  ]);
  return response.content as string;
});

// 创建用于持久化的检查点保存器
const checkpointer = new MemorySaver();

const workflow = entrypoint(
  { checkpointer, name: "workflow" },
  async (topics: string[]) => {
    // 并行生成多个段落并合并它们
    const paragraphs = await Promise.all(topics.map(generateParagraph));
    return paragraphs.join("\n\n");
  }
);

// 运行工作流
const config = { configurable: { thread_id: uuid7() } };
const result = await workflow.invoke(["quantum computing", "climate change", "history of aviation"], config);
console.log(result);
此示例使用 LangGraph 的并发模型来提高执行时间,尤其是在任务涉及 I/O(如 LLM 补全)时。

调用图

函数式 API图 API 可以在同一应用程序中一起使用,因为它们共享相同的底层运行时。
import { entrypoint } from "@langchain/langgraph";
import { StateGraph } from "@langchain/langgraph";

const builder = new StateGraph(/* ... */);
// ...
const someGraph = builder.compile();

const someWorkflow = entrypoint(
  { name: "someWorkflow" },
  async (someInput: Record<string, any>) => {
    // 调用使用图 API 定义的图
    const result1 = await someGraph.invoke(/* ... */);
    // 调用另一个使用图 API 定义的图
    const result2 = await anotherGraph.invoke(/* ... */);
    return {
      result1,
      result2,
    };
  }
);
import { v7 as uuid7 } from "uuid";
import { entrypoint, MemorySaver, StateGraph, StateSchema } from "@langchain/langgraph";
import * as z from "zod";

// 定义共享状态类型
const State = new StateSchema({
  foo: z.number(),
});

// 使用图 API 构建图
const builder = new StateGraph(State)
  .addNode("double", (state) => {
    return { foo: state.foo * 2 };
  })
  .addEdge("__start__", "double");
const graph = builder.compile();

// 定义函数式 API 工作流
const checkpointer = new MemorySaver();

const workflow = entrypoint(
  { checkpointer, name: "workflow" },
  async (x: number) => {
    const result = await graph.invoke({ foo: x });
    return { bar: result.foo };
  }
);

// 执行工作流
const config = { configurable: { thread_id: uuid7() } };
console.log(await workflow.invoke(5, config)); // 输出: { bar: 10 }

调用其他入口点

您可以从 入口点任务 内部调用其他 入口点
// 将自动使用父入口点的检查点保存器
const someOtherWorkflow = entrypoint(
  { name: "someOtherWorkflow" },
  async (inputs: { value: number }) => {
    return inputs.value;
  }
);

const myWorkflow = entrypoint(
  { checkpointer, name: "myWorkflow" },
  async (inputs: { value: number }) => {
    const value = await someOtherWorkflow.invoke({ value: 1 });
    return value;
  }
);
import { v7 as uuid7 } from "uuid";
import { entrypoint, MemorySaver } from "@langchain/langgraph";

// 初始化检查点保存器
const checkpointer = new MemorySaver();

// 一个可重用的子工作流,用于乘以一个数字
const multiply = entrypoint(
  { name: "multiply" },
  async (inputs: { a: number; b: number }) => {
    return inputs.a * inputs.b;
  }
);

// 调用子工作流的主工作流
const main = entrypoint(
  { checkpointer, name: "main" },
  async (inputs: { x: number; y: number }) => {
    const result = await multiply.invoke({ a: inputs.x, b: inputs.y });
    return { product: result };
  }
);

// 执行主工作流
const config = { configurable: { thread_id: uuid7() } };
console.log(await main.invoke({ x: 6, y: 7 }, config)); // 输出: { product: 42 }

流式处理

函数式 API 使用与 图 API 相同的流式处理机制。请阅读 流式处理指南 部分以获取更多详细信息。 使用流式 API 同时流式传输更新和自定义数据的示例。
import {
  entrypoint,
  MemorySaver,
  LangGraphRunnableConfig,
} from "@langchain/langgraph";

const checkpointer = new MemorySaver();

const main = entrypoint(
  { checkpointer, name: "main" },
  async (
    inputs: { x: number },
    config: LangGraphRunnableConfig
  ): Promise<number> => {
    config.writer?.("Started processing");
    const result = inputs.x * 2;
    config.writer?.(`Result is ${result}`);
    return result;
  }
);

const config = { configurable: { thread_id: "abc" } };

for await (const [mode, chunk] of await main.stream(
  { x: 5 },
  { streamMode: ["custom", "updates"], ...config }
)) {
  console.log(`${mode}: ${JSON.stringify(chunk)}`);
}
  1. 在计算开始前发出自定义数据。
  2. 计算结果后发出另一个自定义消息。
  3. 使用 .stream() 处理流式输出。
  4. 指定要使用的流式处理模式。
updates: {"addOne": 2}
updates: {"addTwo": 3}
custom: "hello"
custom: "world"
updates: {"main": 5}

重试策略

import {
  MemorySaver,
  entrypoint,
  task,
  RetryPolicy,
} from "@langchain/langgraph";

// 此变量仅用于演示目的,以模拟网络故障。
// 在实际代码中不会出现。
let attempts = 0;

// 让我们配置 RetryPolicy 以在 ValueError 上重试。
// 默认的 RetryPolicy 针对特定网络错误进行了优化。
const retryPolicy: RetryPolicy = { retryOn: (error) => error instanceof Error };

const getInfo = task(
  {
    name: "getInfo",
    retry: retryPolicy,
  },
  () => {
    attempts += 1;

    if (attempts < 2) {
      throw new Error("Failure");
    }
    return "OK";
  }
);

const checkpointer = new MemorySaver();

const main = entrypoint(
  { checkpointer, name: "main" },
  async (inputs: Record<string, any>) => {
    return await getInfo();
  }
);

const config = {
  configurable: {
    thread_id: "1",
  },
};

await main.invoke({ any_input: "foobar" }, config);
'OK'

缓存任务

import {
  InMemoryCache,
  entrypoint,
  task,
  CachePolicy,
} from "@langchain/langgraph";

const slowAdd = task(
  {
    name: "slowAdd",
    cache: { ttl: 120 },
  },
  async (x: number) => {
    await new Promise((resolve) => setTimeout(resolve, 1000));
    return x * 2;
  }
);

const main = entrypoint(
  { cache: new InMemoryCache(), name: "main" },
  async (inputs: { x: number }) => {
    const result1 = await slowAdd(inputs.x);
    const result2 = await slowAdd(inputs.x);
    return { result1, result2 };
  }
);

for await (const chunk of await main.stream(
  { x: 5 },
  { streamMode: "updates" }
)) {
  console.log(chunk);
}

//> { slowAdd: 10 }
//> { slowAdd: 10, '__metadata__': { cached: true } }
//> { main: { result1: 10, result2: 10 } }
  1. ttl 以秒为单位指定。缓存将在此时间后失效。

错误后恢复

import { entrypoint, task, MemorySaver } from "@langchain/langgraph";

// 此变量仅用于演示目的,以模拟网络故障。
// 在实际代码中不会出现。
let attempts = 0;

const getInfo = task("getInfo", async () => {
  /**
   * 模拟一个在成功前失败一次的任务。
   * 在第一次尝试时抛出异常,然后在后续尝试中返回 "OK"。
   */
  attempts += 1;

  if (attempts < 2) {
    throw new Error("Failure"); // 模拟第一次尝试失败
  }
  return "OK";
});

// 初始化内存检查点保存器以进行持久化
const checkpointer = new MemorySaver();

const slowTask = task("slowTask", async () => {
  /**
   * 通过引入 1 秒延迟来模拟运行缓慢的任务。
   */
  await new Promise((resolve) => setTimeout(resolve, 1000));
  return "Ran slow task.";
});

const main = entrypoint(
  { checkpointer, name: "main" },
  async (inputs: Record<string, any>) => {
    /**
     * 运行 slowTask 和 getInfo 任务的主工作流函数。
     *
     * 参数:
     * - inputs: Record<string, any> 包含工作流输入值。
     *
     * 工作流首先执行 `slowTask`,然后尝试执行 `getInfo`,
     * 该任务将在第一次调用时失败。
     */
    const slowTaskResult = await slowTask(); // 对 slowTask 的阻塞调用
    await getInfo(); // 第一次尝试时将在此处引发异常
    return slowTaskResult;
  }
);

// 具有唯一线程标识符的工作流执行配置
const config = {
  configurable: {
    thread_id: "1", // 用于跟踪工作流执行的唯一标识符
  },
};

// 由于 slowTask 执行,此调用将花费约 1 秒
try {
  // 第一次调用将因 `getInfo` 任务失败而引发异常
  await main.invoke({ any_input: "foobar" }, config);
} catch (err) {
  // 优雅地处理失败
}
当我们恢复执行时,不需要重新运行 slowTask,因为其结果已保存在检查点中。
await main.invoke(null, config);
'Ran slow task.'

人机回环

函数式 API 支持使用 interrupt 函数和 Command 原语的人机回环 工作流。

基本人机回环工作流

我们将创建三个任务
  1. 追加 "bar"
  2. 暂停以等待人工输入。恢复时,追加人工输入。
  3. 追加 "qux"
import { entrypoint, task, interrupt, Command } from "@langchain/langgraph";

const step1 = task("step1", async (inputQuery: string) => {
  // 追加 bar
  return `${inputQuery} bar`;
});

const humanFeedback = task("humanFeedback", async (inputQuery: string) => {
  // 追加用户输入
  const feedback = interrupt(`Please provide feedback: ${inputQuery}`);
  return `${inputQuery} ${feedback}`;
});

const step3 = task("step3", async (inputQuery: string) => {
  // 追加 qux
  return `${inputQuery} qux`;
});
我们现在可以在入口点中组合这些任务:
import { MemorySaver } from "@langchain/langgraph";

const checkpointer = new MemorySaver();

const graph = entrypoint(
  { checkpointer, name: "graph" },
  async (inputQuery: string) => {
    const result1 = await step1(inputQuery);
    const result2 = await humanFeedback(result1);
    const result3 = await step3(result2);

    return result3;
  }
);
interrupt() 在任务内部调用,使人类能够审查和编辑前一个任务的输出。先前任务的结果(在本例中为 step_1)会被持久化,因此在 interrupt 之后不会再次运行。 让我们发送一个查询字符串:
const config = { configurable: { thread_id: "1" } };

for await (const event of await graph.stream("foo", config)) {
  console.log(event);
  console.log("\n");
}
注意,我们在 step_1 之后使用 interrupt 暂停。中断提供了恢复运行的说明。要恢复,我们发出一个包含 human_feedback 任务预期数据的 Command
// 继续执行
for await (const event of await graph.stream(
  new Command({ resume: "baz" }),
  config
)) {
  console.log(event);
  console.log("\n");
}
恢复后,运行将通过剩余步骤并按预期终止。

审查工具调用

要在执行前审查工具调用,我们添加一个 review_tool_call 函数,该函数调用 interrupt。调用此函数时,执行将暂停,直到我们发出恢复命令。 给定一个工具调用,我们的函数将 interrupt 以供人工审查。此时我们可以:
  • 接受工具调用
  • 修改工具调用并继续
  • 生成自定义工具消息(例如,指示模型重新格式化其工具调用)
import { ToolCall } from "@langchain/core/messages/tool";
import { ToolMessage } from "@langchain/core/messages";

function reviewToolCall(toolCall: ToolCall): ToolCall | ToolMessage {
  // 审查工具调用,返回经过验证的版本
  const humanReview = interrupt({
    question: "Is this correct?",
    tool_call: toolCall,
  });

  const reviewAction = humanReview.action;
  const reviewData = humanReview.data;

  if (reviewAction === "continue") {
    return toolCall;
  } else if (reviewAction === "update") {
    const updatedToolCall = { ...toolCall, args: reviewData };
    return updatedToolCall;
  } else if (reviewAction === "feedback") {
    return new ToolMessage({
      content: reviewData,
      name: toolCall.name,
      tool_call_id: toolCall.id,
    });
  }

  throw new Error(`Unknown review action: ${reviewAction}`);
}
我们现在可以更新我们的入口点以审查生成的工具调用。如果工具调用被接受或修改,我们以与之前相同的方式执行。否则,我们只追加人类提供的 ToolMessage。先前任务的结果(在本例中为初始模型调用)会被持久化,因此在 interrupt 之后不会再次运行。
import {
  MemorySaver,
  entrypoint,
  interrupt,
  Command,
  addMessages,
} from "@langchain/langgraph";
import { ToolMessage, AIMessage, BaseMessage } from "@langchain/core/messages";

const checkpointer = new MemorySaver();

const agent = entrypoint(
  { checkpointer, name: "agent" },
  async (
    messages: BaseMessage[],
    previous?: BaseMessage[]
  ): Promise<BaseMessage> => {
    if (previous !== undefined) {
      messages = addMessages(previous, messages);
    }

    let modelResponse = await callModel(messages);
    while (true) {
      if (!modelResponse.tool_calls?.length) {
        break;
      }

      // 审查工具调用
      const toolResults: ToolMessage[] = [];
      const toolCalls: ToolCall[] = [];

      for (let i = 0; i < modelResponse.tool_calls.length; i++) {
        const review = reviewToolCall(modelResponse.tool_calls[i]);
        if (review instanceof ToolMessage) {
          toolResults.push(review);
        } else {
          // 是经过验证的工具调用
          toolCalls.push(review);
          if (review !== modelResponse.tool_calls[i]) {
            modelResponse.tool_calls[i] = review; // 更新消息
          }
        }
      }

      // 执行剩余的工具调用
      const remainingToolResults = await Promise.all(
        toolCalls.map((toolCall) => callTool(toolCall))
      );

      // 追加到消息列表
      messages = addMessages(messages, [
        modelResponse,
        ...toolResults,
        ...remainingToolResults,
      ]);

      // 再次调用模型
      modelResponse = await callModel(messages);
    }

    // 生成最终响应
    messages = addMessages(messages, modelResponse);
    return entrypoint.final({ value: modelResponse, save: messages });
  }
);

短期记忆

短期记忆允许在相同线程 ID 的不同调用之间存储信息。有关更多详细信息,请参阅短期记忆

管理检查点

您可以查看和删除检查点保存器存储的信息。

查看线程状态

const config = {
  configurable: {
    thread_id: "1",
    // 可选择提供特定检查点的 ID,
    // 否则将显示最新的检查点
    // checkpoint_id: "1f029ca3-1f5b-6704-8004-820c16b69a5a"
  },
};
await graph.getState(config);
StateSnapshot {
  values: {
    messages: [
      HumanMessage { content: "hi! I'm bob" },
      AIMessage { content: "Hi Bob! How are you doing today?" },
      HumanMessage { content: "what's my name?" },
      AIMessage { content: "Your name is Bob." }
    ]
  },
  next: [],
  config: { configurable: { thread_id: '1', checkpoint_ns: '', checkpoint_id: '1f029ca3-1f5b-6704-8004-820c16b69a5a' } },
  metadata: {
    source: 'loop',
    writes: { call_model: { messages: AIMessage { content: "Your name is Bob." } } },
    step: 4,
    parents: {},
    thread_id: '1'
  },
  createdAt: '2025-05-05T16:01:24.680462+00:00',
  parentConfig: { configurable: { thread_id: '1', checkpoint_ns: '', checkpoint_id: '1f029ca3-1790-6b0a-8003-baf965b6a38f' } },
  tasks: [],
  interrupts: []
}

查看线程的历史记录

const config = {
  configurable: {
    thread_id: "1",
  },
};
const history = [];
for await (const state of graph.getStateHistory(config)) {
  history.push(state);
}
[
  StateSnapshot {
    values: {
      messages: [
        HumanMessage { content: "hi! I'm bob" },
        AIMessage { content: "Hi Bob! How are you doing today? Is there anything I can help you with?" },
        HumanMessage { content: "what's my name?" },
        AIMessage { content: "Your name is Bob." }
      ]
    },
    next: [],
    config: { configurable: { thread_id: '1', checkpoint_ns: '', checkpoint_id: '1f029ca3-1f5b-6704-8004-820c16b69a5a' } },
    metadata: { source: 'loop', writes: { call_model: { messages: AIMessage { content: "Your name is Bob." } } }, step: 4, parents: {}, thread_id: '1' },
    createdAt: '2025-05-05T16:01:24.680462+00:00',
    parentConfig: { configurable: { thread_id: '1', checkpoint_ns: '', checkpoint_id: '1f029ca3-1790-6b0a-8003-baf965b6a38f' } },
    tasks: [],
    interrupts: []
  },
  // ... 更多状态快照
]

将返回值与保存值解耦

使用 entrypoint.final 将返回给调用方的内容与检查点中保存的内容解耦。这在以下情况下非常有用:
  • 您想返回计算结果(例如,摘要或状态),但保存不同的内部值以供下次调用使用。
  • 您需要控制下次运行时传递给 previous 参数的内容。
import { entrypoint, MemorySaver } from "@langchain/langgraph";

const checkpointer = new MemorySaver();

const accumulate = entrypoint(
  { checkpointer, name: "accumulate" },
  async (n: number, previous?: number) => {
    const prev = previous || 0;
    const total = prev + n;
    // 将*先前*的值返回给调用方,但将*新*总计保存到检查点。
    return entrypoint.final({ value: prev, save: total });
  }
);

const config = { configurable: { thread_id: "my-thread" } };

console.log(await accumulate.invoke(1, config)); // 0
console.log(await accumulate.invoke(2, config)); // 1
console.log(await accumulate.invoke(3, config)); // 3

聊天机器人示例

使用函数式 API 和 InMemorySaver 检查点保存器的简单聊天机器人示例。 该机器人能够记住之前的对话并从上次中断的地方继续。
import { BaseMessage } from "@langchain/core/messages";
import {
  addMessages,
  entrypoint,
  task,
  MemorySaver,
} from "@langchain/langgraph";
import { ChatAnthropic } from "@langchain/anthropic";

const model = new ChatAnthropic({ model: "claude-sonnet-4-6" });

const callModel = task(
  "callModel",
  async (messages: BaseMessage[]): Promise<BaseMessage> => {
    const response = await model.invoke(messages);
    return response;
  }
);

const checkpointer = new MemorySaver();

const workflow = entrypoint(
  { checkpointer, name: "workflow" },
  async (
    inputs: BaseMessage[],
    previous?: BaseMessage[]
  ): Promise<BaseMessage> => {
    let messages = inputs;
    if (previous) {
      messages = addMessages(previous, inputs);
    }

    const response = await callModel(messages);
    return entrypoint.final({
      value: response,
      save: addMessages(messages, response),
    });
  }
);

const config = { configurable: { thread_id: "1" } };
const inputMessage = { role: "user", content: "hi! I'm bob" };

for await (const chunk of await workflow.stream([inputMessage], {
  ...config,
  streamMode: "values",
})) {
  console.log(chunk.content);
}

const inputMessage2 = { role: "user", content: "what's my name?" };
for await (const chunk of await workflow.stream([inputMessage2], {
  ...config,
  streamMode: "values",
})) {
  console.log(chunk.content);
}

长期记忆

长期记忆 允许在不同线程 ID 之间存储信息。这对于在一个对话中学习有关给定用户的信息并在另一个对话中使用它非常有用。

工作流

  • 工作流和代理 指南,了解有关如何使用函数式 API 构建工作流的更多示例。

与其他库集成