Pregel 实现了 LangGraph 的运行时,负责管理 LangGraph 应用程序的执行。
编译一个 StateGraph 或创建一个 @entrypoint 会产生一个 Pregel 实例,该实例可以使用输入进行调用。
本指南从高层次解释了运行时,并提供了直接使用 Pregel 实现应用程序的说明。
注意: Pregel 运行时的名称来源于 Google 的 Pregel 算法 ,该算法描述了一种使用图进行大规模并行计算的高效方法。
在 LangGraph 中,Pregel 将 参与者 和 通道 结合成一个单一的应用程序。参与者 从通道读取数据并向通道写入数据。Pregel 将应用程序的执行组织成多个步骤,遵循 Pregel 算法 /批量同步并行 模型。
每个步骤包含三个阶段:
计划 :确定此步骤要执行哪些 参与者 。例如,在第一步中,选择订阅特殊 输入 通道的 参与者 ;在后续步骤中,选择订阅在上一步中更新的通道的 参与者 。
执行 :并行执行所有选定的 参与者 ,直到全部完成、某个失败或达到超时。在此阶段,通道更新对参与者不可见,直到下一步。
更新 :使用此步骤中 参与者 写入的值更新通道。
重复此过程,直到没有选定的 参与者 需要执行,或达到最大步骤数。
参与者
一个 参与者 是一个 PregelNode。它订阅通道,从通道读取数据,并向通道写入数据。它可以被视为 Pregel 算法中的一个 参与者 。PregelNodes 实现了 LangChain 的 Runnable 接口。
通道用于在参与者(PregelNodes)之间进行通信。每个通道有一个值类型、一个更新类型和一个更新函数——该函数接收一系列更新并修改存储的值。通道可用于将数据从一个链发送到另一个链,或在未来的步骤中将数据从链发送到自身。
LastValue
LastValue 是默认的通道类型。它存储最后写入的值,覆盖任何先前的值。用于输入和输出值,或在步骤之间传递数据。
from langgraph . channels import LastValue
channel : LastValue [ int ] = LastValue ( int )
Topic
Topic 是一个可配置的发布订阅通道,适用于在参与者之间发送多个值或跨步骤累积输出。可以配置为对值进行去重,或累积运行期间写入的所有值。
from langgraph . channels import Topic
# 累积跨步骤写入的所有值
channel : Topic [ str ] = Topic ( str , accumulate = True )
BinaryOperatorAggregate
BinaryOperatorAggregate 存储一个持久值,该值通过将二元运算符应用于当前值和每个新更新来更新。用于计算跨步骤的运行聚合。
import operator
from langgraph . channels import BinaryOperatorAggregate
# 运行总计:每次写入都添加到当前值
total = BinaryOperatorAggregate ( int , operator . add )
DeltaChannel (beta)
DeltaChannel 需要 langgraph>=1.2,目前处于 beta 版本。API 可能在未来版本中更改。
DeltaChannel 在每个步骤仅存储增量变化,而不是完整的累积值。这对于频繁写入且随时间累积大量值的通道最有用——例如,长时间运行线程中的对话消息列表。没有增量存储时,完整列表会在每个检查点重新序列化;使用 DeltaChannel,则仅存储每个步骤写入的新消息。
当通道既频繁写入又随时间增长时,请考虑使用 DeltaChannel。一个好迹象是:如果你注意到某个通道的检查点大小随线程长度线性增长,那么 DeltaChannel 可能很适合。
在 Annotated 类型注解中使用 DeltaChannel,就像使用普通归约器一样:
from typing import Annotated , Sequence
from typing_extensions import TypedDict
from langgraph . channels import DeltaChannel
def my_reducer ( state : list [ str ], writes : Sequence [ list [ str ]]) -> list [ str ]:
result = list ( state )
for write in writes :
result . extend ( write )
return result
class State ( TypedDict ):
messages : Annotated [ list [ str ], DeltaChannel ( my_reducer )]
批量归约器要求
传递给 DeltaChannel 的 reducer 是一个 批量归约器 :它在单次调用中接收当前状态和当前步骤所有写入的 序列 ——而不是像标准归约器那样成对处理。这与在 StateGraph 中与 Annotated 一起使用的按键归约器不同,后者每次更新调用一次。
批量归约器 必须是可结合的 (批处理不变): reducer(reducer(state, [xs]), [ys]) == reducer(state, [xs, ys])
如果你的归约器不是可结合的,重建的状态可能会因 LangGraph 如何跨步骤批处理写入而异,导致不一致的行为。
以下是两种最常见情况的批量归约器:
from typing import Any , Sequence
# 列表:按顺序追加所有写入
def list_reducer ( state : list [ Any ], writes : Sequence [ list [ Any ]]) -> list [ Any ]:
result = list ( state )
for write in writes :
result . extend ( write )
return result
# 字典:合并所有写入,键冲突时最后一次写入生效
def dict_reducer (
state : dict [ str , Any ], writes : Sequence [ dict [ str , Any ]]
) -> dict [ str , Any ]:
result = dict ( state )
for write in writes :
result . update ( write )
return result
两者都是可结合的:逐批应用与一起应用产生相同的结果。
使用 snapshot_frequency 控制有界读取延迟
没有快照时,读取 DeltaChannel 值需要重放完整的写入历史——对于有 N 个步骤的线程,复杂度为 O(N)。设置 snapshot_frequency=K 会每 K 个 pregel 步骤写入一个完整快照,将读取深度限制为最多 K 个步骤:
class State ( TypedDict ):
messages : Annotated [
list [ str ],
DeltaChannel ( my_reducer , snapshot_frequency = 5 ),
]
较高的 snapshot_frequency 值会减少存储开销,但会增加读取延迟。较低的值以更大的检查点为代价更严格地限制延迟。None(默认值)完全跳过快照——适用于读取很少或线程较短的情况。
虽然大多数用户将通过 StateGraph API 或 @entrypoint 装饰器与 Pregel 交互,但也可以直接与 Pregel 交互。
以下是一些不同的示例,让你了解 Pregel API。
单节点
多节点
Topic
BinaryOperatorAggregate
循环
from langgraph . channels import EphemeralValue
from langgraph . pregel import Pregel , NodeBuilder
node1 = (
NodeBuilder (). subscribe_only ( "a" )
. do ( lambda x : x + x )
. write_to ( "b" )
)
app = Pregel (
nodes = { "node1" : node1 },
channels = {
"a" : EphemeralValue ( str ),
"b" : EphemeralValue ( str ),
},
input_channels = [ "a" ],
output_channels = [ "b" ],
)
app . invoke ({ "a" : "foo" })
from langgraph . channels import LastValue , EphemeralValue
from langgraph . pregel import Pregel , NodeBuilder
node1 = (
NodeBuilder (). subscribe_only ( "a" )
. do ( lambda x : x + x )
. write_to ( "b" )
)
node2 = (
NodeBuilder (). subscribe_only ( "b" )
. do ( lambda x : x + x )
. write_to ( "c" )
)
app = Pregel (
nodes = { "node1" : node1 , "node2" : node2 },
channels = {
"a" : EphemeralValue ( str ),
"b" : LastValue ( str ),
"c" : EphemeralValue ( str ),
},
input_channels = [ "a" ],
output_channels = [ "b" , "c" ],
)
app . invoke ({ "a" : "foo" })
{'b': 'foofoo', 'c': 'foofoofoofoo'}
from langgraph . channels import EphemeralValue , Topic
from langgraph . pregel import Pregel , NodeBuilder
node1 = (
NodeBuilder (). subscribe_only ( "a" )
. do ( lambda x : x + x )
. write_to ( "b" , "c" )
)
node2 = (
NodeBuilder (). subscribe_to ( "b" )
. do ( lambda x : x [ " b " ] + x [ " b " ])
. write_to ( "c" )
)
app = Pregel (
nodes = { "node1" : node1 , "node2" : node2 },
channels = {
"a" : EphemeralValue ( str ),
"b" : EphemeralValue ( str ),
"c" : Topic ( str , accumulate = True ),
},
input_channels = [ "a" ],
output_channels = [ "c" ],
)
app . invoke ({ "a" : "foo" })
{'c': ['foofoo', 'foofoofoofoo']}
此示例演示如何使用 BinaryOperatorAggregate 通道实现归约器。 from langgraph . channels import EphemeralValue , BinaryOperatorAggregate
from langgraph . pregel import Pregel , NodeBuilder
node1 = (
NodeBuilder (). subscribe_only ( "a" )
. do ( lambda x : x + x )
. write_to ( "b" , "c" )
)
node2 = (
NodeBuilder (). subscribe_only ( "b" )
. do ( lambda x : x + x )
. write_to ( "c" )
)
def reducer ( current , update ):
if current :
return current + " | " + update
else :
return update
app = Pregel (
nodes = { "node1" : node1 , "node2" : node2 },
channels = {
"a" : EphemeralValue ( str ),
"b" : EphemeralValue ( str ),
"c" : BinaryOperatorAggregate ( str , operator = reducer ),
},
input_channels = [ "a" ],
output_channels = [ "c" ],
)
app . invoke ({ "a" : "foo" })
此示例演示如何在图中引入循环,方法是让一个链写入它订阅的通道。执行将继续,直到向通道写入 None 值。 from langgraph . channels import EphemeralValue
from langgraph . pregel import Pregel , NodeBuilder , ChannelWriteEntry
example_node = (
NodeBuilder (). subscribe_only ( "value" )
. do ( lambda x : x + x if len ( x ) < 10 else None )
. write_to ( ChannelWriteEntry ( "value" , skip_none = True ))
)
app = Pregel (
nodes = { "example_node" : example_node },
channels = {
"value" : EphemeralValue ( str ),
},
input_channels = [ "value" ],
output_channels = [ "value" ],
)
app . invoke ({ "value" : "a" })
{'value': 'aaaaaaaaaaaaaaaa'}
高级 API
LangGraph 提供了两个用于创建 Pregel 应用程序的高级 API:StateGraph (Graph API) 和 Functional API 。
StateGraph (Graph API)
Functional API
StateGraph (Graph API) 是一个更高级的抽象,简化了 Pregel 应用程序的创建。它允许你定义节点和边的图。当你编译图时,StateGraph API 会自动为你创建 Pregel 应用程序。from typing import TypedDict
from langgraph . constants import START
from langgraph . graph import StateGraph
class Essay ( TypedDict ):
topic : str
content : str | None
score : float | None
def write_essay ( essay : Essay ):
return {
"content" : f "Essay about { essay [ ' topic ' ] } " ,
}
def score_essay ( essay : Essay ):
return {
"score" : 10
}
builder = StateGraph ( Essay )
builder . add_node ( write_essay )
builder . add_node ( score_essay )
builder . add_edge ( START , "write_essay" )
builder . add_edge ( "write_essay" , "score_essay" )
# 编译图。
# 这将返回一个 Pregel 实例。
graph = builder . compile ()
编译后的 Pregel 实例将与一组节点和通道相关联。你可以通过打印它们来检查节点和通道。 你会看到类似这样的内容: {'__start__': <langgraph.pregel.read.PregelNode at 0x7d05e3ba1810>,
'write_essay': <langgraph.pregel.read.PregelNode at 0x7d05e3ba14d0>,
'score_essay': <langgraph.pregel.read.PregelNode at 0x7d05e3ba1710>}
你应该会看到类似这样的内容 {'topic': <langgraph.channels.last_value.LastValue at 0x7d05e3294d80>,
'content': <langgraph.channels.last_value.LastValue at 0x7d05e3295040>,
'score': <langgraph.channels.last_value.LastValue at 0x7d05e3295980>,
'__start__': <langgraph.channels.ephemeral_value.EphemeralValue at 0x7d05e3297e00>,
'write_essay': <langgraph.channels.ephemeral_value.EphemeralValue at 0x7d05e32960c0>,
'score_essay': <langgraph.channels.ephemeral_value.EphemeralValue at 0x7d05e2d8ab80>,
'branch:__start__:__self__:write_essay': <langgraph.channels.ephemeral_value.EphemeralValue at 0x7d05e32941c0>,
'branch:__start__:__self__:score_essay': <langgraph.channels.ephemeral_value.EphemeralValue at 0x7d05e2d88800>,
'branch:write_essay:__self__:write_essay': <langgraph.channels.ephemeral_value.EphemeralValue at 0x7d05e3295ec0>,
'branch:write_essay:__self__:score_essay': <langgraph.channels.ephemeral_value.EphemeralValue at 0x7d05e2d8ac00>,
'branch:score_essay:__self__:write_essay': <langgraph.channels.ephemeral_value.EphemeralValue at 0x7d05e2d89700>,
'branch:score_essay:__self__:score_essay': <langgraph.channels.ephemeral_value.EphemeralValue at 0x7d05e2d8b400>,
'start:write_essay': <langgraph.channels.ephemeral_value.EphemeralValue at 0x7d05e2d8b280>}
在 Functional API 中,你可以使用 @entrypoint 创建 Pregel 应用程序。entrypoint 装饰器允许你定义一个接收输入并返回输出的函数。 from typing import TypedDict
from langgraph . checkpoint . memory import InMemorySaver
from langgraph . func import entrypoint
class Essay ( TypedDict ):
topic : str
content : str | None
score : float | None
checkpointer = InMemorySaver ()
@entrypoint ( checkpointer = checkpointer )
def write_essay ( essay : Essay ):
return {
"content" : f "Essay about { essay [ ' topic ' ] } " ,
}
print ( "Nodes: " )
print ( write_essay . nodes )
print ( "Channels: " )
print ( write_essay . channels )
Nodes:
{'write_essay': <langgraph.pregel.read.PregelNode object at 0x7d05e2f9aad0>}
Channels:
{'__start__': <langgraph.channels.ephemeral_value.EphemeralValue object at 0x7d05e2c906c0>, '__end__': <langgraph.channels.last_value.LastValue object at 0x7d05e2c90c40>, '__previous__': <langgraph.channels.last_value.LastValue object at 0x7d05e1007280>}
将这些文档 通过 MCP 连接到 Claude、VSCode 等,以获取实时答案。