Skip to main content
Pregel 实现了 LangGraph 的运行时,负责管理 LangGraph 应用程序的执行。 编译一个 StateGraph 或创建一个 入口点 会产生一个 Pregel 实例,该实例可以使用输入进行调用。 本指南从高层次解释了运行时,并提供了直接使用 Pregel 实现应用程序的说明。
注意: Pregel 运行时的名称来源于 Google 的 Pregel 算法,该算法描述了一种使用图进行大规模并行计算的高效方法。

概述

在 LangGraph 中,Pregel 将 参与者通道 结合成一个单一的应用程序。参与者 从通道读取数据并向通道写入数据。Pregel 将应用程序的执行组织成多个步骤,遵循 Pregel 算法/批量同步并行 模型。 每个步骤包含三个阶段:
  • 计划:确定此步骤要执行哪些 参与者。例如,在第一步中,选择订阅特殊 输入 通道的 参与者;在后续步骤中,选择订阅在上一步中更新的通道的 参与者
  • 执行:并行执行所有选定的 参与者,直到全部完成、某个失败或达到超时。在此阶段,通道更新对参与者不可见,直到下一步。
  • 更新:使用此步骤中 参与者 写入的值更新通道。
重复此过程,直到没有选定的 参与者 需要执行,或达到最大步骤数。

参与者

一个 参与者 是一个 PregelNode。它订阅通道,从通道读取数据,并向通道写入数据。它可以被视为 Pregel 算法中的一个 参与者PregelNodes 实现了 LangChain 的 Runnable 接口。

通道

通道用于参与者(PregelNodes)之间的通信。每个通道有一个值类型、一个更新类型和一个更新函数——该函数接受一系列更新并修改存储的值。通道可用于将数据从一个链发送到另一个链,或在未来的步骤中将数据从链发送到自身。

LastValue

LastValue 是默认的通道类型。它存储最后写入的值,覆盖任何先前的值。用于输入和输出值,或用于将数据从一个步骤传递到下一个步骤。
import { LastValue } from "@langchain/langgraph/channels";

const channel = new LastValue<number>();

Topic

Topic 是一个可配置的 PubSub 通道,适用于在参与者之间发送多个值或跨步骤累积输出。可以配置为对值进行去重,或累积运行期间写入的所有值。
import { Topic } from "@langchain/langgraph/channels";

// 累积跨步骤写入的所有值
const channel = new Topic<string>({ accumulate: true });

BinaryOperatorAggregate

BinaryOperatorAggregate 存储一个持久值,该值通过将二元运算符应用于当前值和每个新更新来更新。用于跨步骤计算运行聚合。
import { BinaryOperatorAggregate } from "@langchain/langgraph/channels";

// 运行总计:每次写入都添加到当前值
const total = new BinaryOperatorAggregate<number>({ operator: (a, b) => a + b });

示例

虽然大多数用户将通过 StateGraph API 或 入口点 装饰器与 Pregel 交互,但也可以直接与 Pregel 交互。 以下是一些不同的示例,让您了解 Pregel API。
import { EphemeralValue } from "@langchain/langgraph/channels";
import { Pregel, NodeBuilder } from "@langchain/langgraph/pregel";

const node1 = new NodeBuilder()
  .subscribeOnly("a")
  .do((x: string) => x + x)
  .writeTo("b");

const app = new Pregel({
  nodes: { node1 },
  channels: {
    a: new EphemeralValue<string>(),
    b: new EphemeralValue<string>(),
  },
  inputChannels: ["a"],
  outputChannels: ["b"],
});

await app.invoke({ a: "foo" });
{ b: 'foofoo' }

高级 API

LangGraph 提供了两个用于创建 Pregel 应用程序的高级 API:StateGraph (Graph API)Functional API
StateGraph (Graph API) 是一个更高级别的抽象,简化了 Pregel 应用程序的创建。它允许您定义节点和边的图。当您编译图时,StateGraph API 会自动为您创建 Pregel 应用程序。
import { START, StateGraph } from "@langchain/langgraph";

interface Essay {
  topic: string;
  content?: string;
  score?: number;
}

const writeEssay = (essay: Essay) => {
  return {
    content: `Essay about ${essay.topic}`,
  };
};

const scoreEssay = (essay: Essay) => {
  return {
    score: 10
  };
};

const builder = new StateGraph<Essay>({
  channels: {
    topic: null,
    content: null,
    score: null,
  }
})
  .addNode("writeEssay", writeEssay)
  .addNode("scoreEssay", scoreEssay)
  .addEdge(START, "writeEssay")
  .addEdge("writeEssay", "scoreEssay");

// 编译图。
// 这将返回一个 Pregel 实例。
const graph = builder.compile();
编译后的 Pregel 实例将与节点和通道的列表相关联。您可以通过打印它们来检查节点和通道。
console.log(graph.nodes);
您将看到类似这样的内容:
{
  __start__: PregelNode { ... },
  writeEssay: PregelNode { ... },
  scoreEssay: PregelNode { ... }
}
console.log(graph.channels);
您应该看到类似这样的内容
{
  topic: LastValue { ... },
  content: LastValue { ... },
  score: LastValue { ... },
  __start__: EphemeralValue { ... },
  writeEssay: EphemeralValue { ... },
  scoreEssay: EphemeralValue { ... },
  'branch:__start__:__self__:writeEssay': EphemeralValue { ... },
  'branch:__start__:__self__:scoreEssay': EphemeralValue { ... },
  'branch:writeEssay:__self__:writeEssay': EphemeralValue { ... },
  'branch:writeEssay:__self__:scoreEssay': EphemeralValue { ... },
  'branch:scoreEssay:__self__:writeEssay': EphemeralValue { ... },
  'branch:scoreEssay:__self__:scoreEssay': EphemeralValue { ... },
  'start:writeEssay': EphemeralValue { ... }
}