Skip to main content
Pregel 实现了 LangGraph 的运行时,管理 LangGraph 应用程序的执行。 编译 StateGraph 或创建 @entrypoint 会生成一个 Pregel 实例,该实例可以接受输入并被调用。 本指南从较高层次解释运行时,并提供直接使用 Pregel 实现应用程序的说明。
注意: Pregel 运行时以 Google 的 Pregel 算法命名,该算法描述了一种使用图进行大规模并行计算的高效方法。

概述

在 LangGraph 中,Pregel 将ActorChannel组合成单个应用程序。Actor 从 Channel 读取数据并向 Channel 写入数据。Pregel 按照 Pregel 算法/批量同步并行模型将应用程序的执行组织成多个步骤。 每个步骤包含三个阶段:
  • 计划:确定在此步骤中执行哪些 Actor。例如,在第一步中,选择订阅特殊输入 Channel 的 Actor;在后续步骤中,选择订阅上一步更新的 Channel 的 Actor
  • 执行:并行执行所有选定的 Actor,直到所有 Actor 完成,或某个 Actor 失败,或达到超时。在此阶段,Channel 更新对 Actor 不可见,直到下一步。
  • 更新:使用此步骤中 Actor 写入的值更新 Channel。
重复以上过程,直到没有 Actor 被选中执行,或达到最大步骤数。

Actor

Actor 是一个 PregelNode。它订阅 Channel、从 Channel 读取数据并向 Channel 写入数据。可以将其视为 Pregel 算法中的一个 ActorPregelNodes 实现了 LangChain 的 Runnable 接口。

Channel

Channel 用于在 Actor(PregelNode)之间通信。每个 Channel 具有值类型、更新类型和更新函数——该函数接受一系列更新并修改存储的值。Channel 可用于将数据从一个链发送到另一个链,或在未来步骤中将数据发送给自身。LangGraph 提供了多种内置 Channel:
  • LastValue:默认 Channel,存储发送到 Channel 的最后一个值,适用于输入和输出值,或用于将数据从一步传递到下一步。
  • Topic:可配置的发布/订阅主题,适用于在 Actor 之间发送多个值或累积输出。可以配置为去重值或在多个步骤中累积值。
  • BinaryOperatorAggregate:存储持久值,通过将二元运算符应用于当前值和发送到 Channel 的每个更新来更新,适用于计算多个步骤的聚合;例如,total = BinaryOperatorAggregate(int, operator.add)

示例

虽然大多数用户会通过 StateGraph API 或 @entrypoint 装饰器与 Pregel 交互,但也可以直接与 Pregel 交互。 以下是一些不同的示例,让您了解 Pregel API。
from langgraph.channels import EphemeralValue
from langgraph.pregel import Pregel, NodeBuilder

node1 = (
    NodeBuilder().subscribe_only("a")
    .do(lambda x: x + x)
    .write_to("b")
)

app = Pregel(
    nodes={"node1": node1},
    channels={
        "a": EphemeralValue(str),
        "b": EphemeralValue(str),
    },
    input_channels=["a"],
    output_channels=["b"],
)

app.invoke({"a": "foo"})
{'b': 'foofoo'}

高层 API

LangGraph 提供两种用于创建 Pregel 应用程序的高层 API:StateGraph(图 API)函数式 API
StateGraph(图 API) 是一种简化 Pregel 应用程序创建的高层抽象。它允许您定义节点和边的图。编译图时,StateGraph API 会自动为您创建 Pregel 应用程序。
from typing import TypedDict

from langgraph.constants import START
from langgraph.graph import StateGraph

class Essay(TypedDict):
    topic: str
    content: str | None
    score: float | None

def write_essay(essay: Essay):
    return {
        "content": f"Essay about {essay['topic']}",
    }

def score_essay(essay: Essay):
    return {
        "score": 10
    }

builder = StateGraph(Essay)
builder.add_node(write_essay)
builder.add_node(score_essay)
builder.add_edge(START, "write_essay")
builder.add_edge("write_essay", "score_essay")

# Compile the graph.
# This will return a Pregel instance.
graph = builder.compile()
已编译的 Pregel 实例将与节点和 Channel 的列表相关联。您可以通过打印来检查节点和 Channel。
print(graph.nodes)
您将看到类似如下的内容:
{'__start__': <langgraph.pregel.read.PregelNode at 0x7d05e3ba1810>,
 'write_essay': <langgraph.pregel.read.PregelNode at 0x7d05e3ba14d0>,
 'score_essay': <langgraph.pregel.read.PregelNode at 0x7d05e3ba1710>}
print(graph.channels)
您应该看到类似如下的内容:
{'topic': <langgraph.channels.last_value.LastValue at 0x7d05e3294d80>,
 'content': <langgraph.channels.last_value.LastValue at 0x7d05e3295040>,
 'score': <langgraph.channels.last_value.LastValue at 0x7d05e3295980>,
 '__start__': <langgraph.channels.ephemeral_value.EphemeralValue at 0x7d05e3297e00>,
 'write_essay': <langgraph.channels.ephemeral_value.EphemeralValue at 0x7d05e32960c0>,
 'score_essay': <langgraph.channels.ephemeral_value.EphemeralValue at 0x7d05e2d8ab80>,
 'branch:__start__:__self__:write_essay': <langgraph.channels.ephemeral_value.EphemeralValue at 0x7d05e32941c0>,
 'branch:__start__:__self__:score_essay': <langgraph.channels.ephemeral_value.EphemeralValue at 0x7d05e2d88800>,
 'branch:write_essay:__self__:write_essay': <langgraph.channels.ephemeral_value.EphemeralValue at 0x7d05e3295ec0>,
 'branch:write_essay:__self__:score_essay': <langgraph.channels.ephemeral_value.EphemeralValue at 0x7d05e2d8ac00>,
 'branch:score_essay:__self__:write_essay': <langgraph.channels.ephemeral_value.EphemeralValue at 0x7d05e2d89700>,
 'branch:score_essay:__self__:score_essay': <langgraph.channels.ephemeral_value.EphemeralValue at 0x7d05e2d8b400>,
 'start:write_essay': <langgraph.channels.ephemeral_value.EphemeralValue at 0x7d05e2d8b280>}