LangGraph 的核心是将智能体工作流建模为图。您可以使用三个关键组件定义智能体的行为:
-
State:表示应用程序当前快照的共享数据结构。它可以是任何数据类型,但通常使用共享状态模式定义。
-
Nodes:编码智能体逻辑的函数。它们接收当前状态作为输入,执行一些计算或副作用,并返回更新后的状态。
-
Edges:根据当前状态确定下一个要执行的 Node 的函数。它们可以是条件分支或固定转换。
通过组合 Nodes 和 Edges,您可以创建复杂的、循环的工作流,使状态随时间演变。然而,真正的威力来自于 LangGraph 如何管理该状态。
需要强调的是:Nodes 和 Edges 不过是函数——它们可以包含 LLM,也可以只是普通的代码。
简而言之:节点执行工作,边决定下一步做什么。
LangGraph 的底层图算法使用消息传递来定义通用程序。当一个节点完成其操作时,它会沿着一条或多条边向其他节点发送消息。这些接收节点然后执行它们的函数,将结果消息传递给下一组节点,该过程继续进行。受 Google 的 Pregel 系统启发,程序以离散的“超级步骤”进行。
一个超级步骤可以被视为对图节点的一次迭代。并行运行的节点属于同一个超级步骤,而顺序运行的节点属于不同的超级步骤。在图执行开始时,所有节点都处于 inactive(非活动)状态。当节点在其任何传入边(或“通道”)上接收到新消息(状态)时,它变为 active(活动)状态。活动节点然后运行其函数并响应更新。在每个超级步骤结束时,没有传入消息的节点通过将自身标记为 inactive 来投票 halt(停止)。当所有节点都处于 inactive 状态且没有消息在传输中时,图执行终止。
StateGraph
StateGraph 类是要使用的主要图类。它由用户定义的 State 对象参数化。
编译您的图
要构建您的图,您首先定义状态,然后添加节点和边,然后编译它。编译图到底是什么,为什么需要它?
编译是一个相当简单的步骤。它对图的结构提供一些基本检查(例如没有孤立节点等)。它也是您可以指定运行时参数(如检查点器和断点)的地方。您只需调用 .compile 方法来编译您的图:
graph = graph_builder.compile(...)
定义图时,首先要定义图的 State(状态)。State 包括图的模式以及指定如何将更新应用于状态的reducer 函数。State 的模式将是图中所有 Nodes 和 Edges 的输入模式,并且可以是 TypedDict 或 Pydantic 模型。所有 Nodes 都会向 State 发出更新,然后使用指定的 reducer 函数应用这些更新。
指定图模式的主要文档化方法是使用 TypedDict。如果您想在状态中提供默认值,请使用 dataclass。如果您想要递归数据验证,我们还支持使用 Pydantic BaseModel 作为您的图状态(但请注意,Pydantic 的性能不如 TypedDict 或 dataclass)。
默认情况下,图将具有相同的输入和输出模式。如果您想更改此设置,也可以直接指定显式的输入和输出模式。当您有很多键,并且其中一些明确用于输入而另一些用于输出时,这非常有用。有关更多信息,请参阅指南。
多种模式
通常,所有图节点都使用单一模式进行通信。这意味着它们将读写相同的状态通道。但是,在某些情况下,我们希望对此有更多控制:
- 内部节点可以传递图输入/输出不需要的信息。
- 我们可能还想为图使用不同的输入/输出模式。例如,输出可能只包含一个相关的输出键。
可以在图内部让节点写入私有状态通道以进行内部节点通信。我们可以简单地定义一个私有模式 PrivateState。
也可以为图定义显式的输入和输出模式。在这些情况下,我们定义一个包含图操作所有相关键的“内部”模式。但是,我们还定义 input 和 output 模式,它们是“内部”模式的子集,以约束图的输入和输出。有关更多详细信息,请参阅定义输入和输出模式。
让我们看一个例子:
class InputState(TypedDict):
user_input: str
class OutputState(TypedDict):
graph_output: str
class OverallState(TypedDict):
foo: str
user_input: str
graph_output: str
class PrivateState(TypedDict):
bar: str
def node_1(state: InputState) -> OverallState:
# 写入 OverallState
return {"foo": state["user_input"] + " name"}
def node_2(state: OverallState) -> PrivateState:
# 从 OverallState 读取,写入 PrivateState
return {"bar": state["foo"] + " is"}
def node_3(state: PrivateState) -> OutputState:
# 从 PrivateState 读取,写入 OutputState
return {"graph_output": state["bar"] + " Lance"}
builder = StateGraph(OverallState,input_schema=InputState,output_schema=OutputState)
builder.add_node("node_1", node_1)
builder.add_node("node_2", node_2)
builder.add_node("node_3", node_3)
builder.add_edge(START, "node_1")
builder.add_edge("node_1", "node_2")
builder.add_edge("node_2", "node_3")
builder.add_edge("node_3", END)
graph = builder.compile()
graph.invoke({"user_input":"My"})
# {'graph_output': 'My name is Lance'}
这里有两点微妙而重要的注意事项:
-
我们将
state: InputState 作为输入模式传递给 node_1。但是,我们写入 foo,这是 OverallState 中的一个通道。我们如何写入输入模式中未包含的状态通道?这是因为节点_可以写入图状态中的任何状态通道。_ 图状态是初始化时定义的状态通道的并集,其中包括 OverallState 以及过滤器 InputState 和 OutputState。
-
我们使用以下方式初始化图:
StateGraph(
OverallState,
input_schema=InputState,
output_schema=OutputState
)
我们如何在 node_2 中写入 PrivateState?如果 PrivateState 没有在 StateGraph 初始化中传递,图如何获得对该模式的访问权限?
我们可以这样做,因为 _nodes 也可以声明额外的状态通道_,只要状态模式定义存在。在这种情况下,PrivateState 模式已定义,因此我们可以将 bar 添加为图中的新状态通道并写入它。
Reducer
Reducer 对于理解节点更新如何应用于 State 至关重要。State 中的每个键都有其自己独立的 reducer 函数。如果没有显式指定 reducer 函数,则假定对该键的所有更新都应覆盖它。有几种不同类型的 reducer,从默认类型的 reducer 开始:
默认 reducer
这两个示例展示了如何使用默认 reducer:
from typing_extensions import TypedDict
class State(TypedDict):
foo: int
bar: list[str]
在此示例中,没有为任何键指定 reducer 函数。假设图的输入是:
{"foo": 1, "bar": ["hi"]}。然后假设第一个 Node 返回 {"foo": 2}。这被视为对状态的更新。请注意,Node 不需要返回整个 State 模式——只需一个更新。应用此更新后,State 将变为 {"foo": 2, "bar": ["hi"]}。如果第二个节点返回 {"bar": ["bye"]},则 State 将变为 {"foo": 2, "bar": ["bye"]}。
from typing import Annotated
from typing_extensions import TypedDict
from operator import add
class State(TypedDict):
foo: int
bar: Annotated[list[str], add]
在此示例中,我们使用 Annotated 类型为第二个键 (bar) 指定 reducer 函数 (operator.add)。请注意,第一个键保持不变。假设图的输入是 {"foo": 1, "bar": ["hi"]}。然后假设第一个 Node 返回 {"foo": 2}。这被视为对状态的更新。请注意,Node 不需要返回整个 State 模式——只需一个更新。应用此更新后,State 将变为 {"foo": 2, "bar": ["hi"]}。如果第二个节点返回 {"bar": ["bye"]},则 State 将变为 {"foo": 2, "bar": ["hi", "bye"]}。请注意,这里 bar 键通过将两个列表相加来更新。
在图状态中处理消息
为什么使用消息?
大多数现代 LLM 提供商都有一个聊天模型接口,该接口接受消息列表作为输入。LangChain 的聊天模型接口尤其接受消息对象列表作为输入。这些消息有多种形式,例如 HumanMessage(用户输入)或 AIMessage(LLM 响应)。
要了解更多关于消息对象的信息,请参阅消息概念指南。
在图中使用消息
在许多情况下,将先前的对话历史记录作为消息列表存储在图状态中很有帮助。为此,我们可以在图状态中添加一个键(通道)来存储 Message 对象列表,并使用 reducer 函数对其进行注释(请参阅下面示例中的 messages 键)。reducer 函数对于告诉图如何使用每个状态更新(例如,当节点发送更新时)更新状态中的 Message 对象列表至关重要。如果您不指定 reducer,每个状态更新都会用最近提供的值覆盖消息列表。如果您只想将消息附加到现有列表,可以使用 operator.add 作为 reducer。
但是,您可能还想手动更新图状态中的消息(例如,人机回环)。如果您使用 operator.add,您发送给图的手动状态更新将被附加到现有消息列表中,而不是更新现有消息。为了避免这种情况,您需要一个可以跟踪消息 ID 并在更新时覆盖现有消息的 reducer。为此,您可以使用预构建的 add_messages 函数。对于全新消息,它只会附加到现有列表,但它也会正确处理现有消息的更新。
序列化
除了跟踪消息 ID 之外,add_messages 函数还会在收到 messages 通道上的状态更新时,尝试将消息反序列化为 LangChain Message 对象。
有关更多信息,请参阅 LangChain 序列化/反序列化。这允许以以下格式发送图输入/状态更新:
# 这是受支持的
{"messages": [HumanMessage(content="message")]}
# 这也是受支持的
{"messages": [{"type": "human", "content": "message"}]}
由于在使用 add_messages 时,状态更新总是被反序列化为 LangChain Messages,因此您应该使用点表示法来访问消息属性,例如 state["messages"][-1].content。
下面是一个使用 add_messages 作为其 reducer 函数的图示例。
from langchain.messages import AnyMessage
from langgraph.graph.message import add_messages
from typing import Annotated
from typing_extensions import TypedDict
class GraphState(TypedDict):
messages: Annotated[list[AnyMessage], add_messages]
MessagesState
由于在状态中拥有消息列表非常常见,因此存在一个预构建的状态 MessagesState,可以轻松使用消息。MessagesState 定义了一个单一的 messages 键,该键是 AnyMessage 对象的列表,并使用 add_messages reducer。通常,除了消息之外,还有更多状态需要跟踪,因此我们看到人们会子类化此状态并添加更多字段,例如:
from langgraph.graph import MessagesState
class State(MessagesState):
documents: list[str]
在 LangGraph 中,节点是 Python 函数(同步或异步),接受以下参数:
state—图的状态
config—一个 RunnableConfig 对象,包含配置信息(如 thread_id)和跟踪信息(如 tags)
runtime—一个 Runtime 对象,包含运行时 context 以及其他信息,如 store、stream_writer、execution_info 和 server_info
类似于 NetworkX,您可以使用 add_node 方法将这些节点添加到图中:
from dataclasses import dataclass
from typing_extensions import TypedDict
from langgraph.graph import StateGraph
from langgraph.runtime import Runtime
class State(TypedDict):
input: str
results: str
@dataclass
class Context:
user_id: str
builder = StateGraph(State)
def plain_node(state: State):
return state
def node_with_runtime(state: State, runtime: Runtime[Context]):
print("In node: ", runtime.context.user_id)
return {"results": f"Hello, {state['input']}!"}
def node_with_execution_info(state: State, runtime: Runtime):
print("In node with thread_id: ", runtime.execution_info.thread_id)
return {"results": f"Hello, {state['input']}!"}
builder.add_node("plain_node", plain_node)
builder.add_node("node_with_runtime", node_with_runtime)
builder.add_node("node_with_execution_info", node_with_execution_info)
...
在幕后,函数被转换为 RunnableLambda,它为您的函数添加了批处理和异步支持,以及原生跟踪和调试。
如果您在添加节点时未指定名称,它将被赋予一个默认名称,该名称等同于函数名称。
builder.add_node(my_node)
# 然后您可以通过引用 `"my_node"` 来创建到此节点的边
START 节点
START 节点是一个特殊节点,表示将用户输入发送到图的节点。引用此节点的主要目的是确定应首先调用哪些节点。
from langgraph.graph import START
graph.add_edge(START, "node_a")
END 节点
END 节点是一个特殊节点,表示终端节点。当您想表示哪些边在完成后没有操作时,会引用此节点。
from langgraph.graph import END
graph.add_edge("node_a", END)
节点缓存
LangGraph 支持基于节点输入缓存任务/节点。要使用缓存:
- 在编译图时指定缓存(或指定入口点)
- 为节点指定缓存策略。每个缓存策略支持:
key_func 用于根据节点输入生成缓存键,默认使用 pickle 对输入进行 hash。
ttl,缓存的生存时间(以秒为单位)。如果未指定,缓存将永不过期。
例如:
import time
from typing_extensions import TypedDict
from langgraph.graph import StateGraph
from langgraph.cache.memory import InMemoryCache
from langgraph.types import CachePolicy
class State(TypedDict):
x: int
result: int
builder = StateGraph(State)
def expensive_node(state: State) -> dict[str, int]:
# 昂贵的计算
time.sleep(2)
return {"result": state["x"] * 2}
builder.add_node("expensive_node", expensive_node, cache_policy=CachePolicy(ttl=3))
builder.set_entry_point("expensive_node")
builder.set_finish_point("expensive_node")
graph = builder.compile(cache=InMemoryCache())
print(graph.invoke({"x": 5}, stream_mode='updates'))
# [{'expensive_node': {'result': 10}}]
print(graph.invoke({"x": 5}, stream_mode='updates'))
# [{'expensive_node': {'result': 10}, '__metadata__': {'cached': True}}]
- 第一次运行需要两秒钟(由于模拟的昂贵计算)。
- 第二次运行利用缓存并快速返回。
边定义了逻辑如何路由以及图如何决定停止。这是智能体工作方式以及不同节点如何相互通信的重要组成部分。边有几种关键类型:
- 普通边:直接从一个节点到下一个节点。
- 条件边:调用函数以确定下一个要转到的节点。
- 入口点:用户输入到达时首先调用的节点。
- 条件入口点:调用函数以确定用户输入到达时首先调用的节点。
一个节点可以有多个传出边。如果一个节点有多个传出边,所有这些目标节点将作为下一个超级步骤的一部分并行执行。
普通边
如果您总是想从节点 A 转到节点 B,可以直接使用 add_edge 方法。
graph.add_edge("node_a", "node_b")
条件边
如果您想可选地路由到一个或多个边(或可选地终止),可以使用 add_conditional_edges 方法。此方法接受节点名称和在该节点执行后调用的“路由函数”:
graph.add_conditional_edges("node_a", routing_function)
类似于节点,routing_function 接受图的当前 state 并返回一个值。
默认情况下,routing_function 的返回值用作下一个要发送状态的节点(或节点列表)的名称。所有这些节点将作为下一个超级步骤的一部分并行运行。
您可以选择提供一个字典,将 routing_function 的输出映射到下一个节点的名称。
graph.add_conditional_edges("node_a", routing_function, {True: "node_b", False: "node_c"})
如果您想在一个函数中结合状态更新和路由,请使用 Command 代替条件边。
入口点
入口点是图启动时首先运行的节点。您可以使用虚拟 START 节点的 add_edge 方法来指定进入图的位置。
from langgraph.graph import START
graph.add_edge(START, "node_a")
条件入口点
条件入口点允许您根据自定义逻辑从不同的节点开始。您可以使用虚拟 START 节点的 add_conditional_edges 来实现这一点。
from langgraph.graph import START
graph.add_conditional_edges(START, routing_function)
您可以选择提供一个字典,将 routing_function 的输出映射到下一个节点的名称。
graph.add_conditional_edges(START, routing_function, {True: "node_b", False: "node_c"})
Send
默认情况下,Nodes 和 Edges 是预先定义的,并在相同的共享状态上操作。但是,在某些情况下,确切的边可能不是预先知道的,和/或您可能希望同时存在不同版本的 State。一个常见的例子是map-reduce 设计模式。在此设计模式中,第一个节点可能生成一个对象列表,并且您可能希望将某个其他节点应用于所有这些对象。对象的数量可能预先未知(意味着边的数量可能未知),并且下游 Node 的输入 State 应该不同(每个生成的对象一个)。
为了支持这种设计模式,LangGraph 支持从条件边返回 Send 对象。Send 接受两个参数:第一个是节点的名称,第二个是要传递给该节点的状态。
def continue_to_jokes(state: OverallState):
return [Send("generate_joke", {"subject": s}) for s in state['subjects']]
graph.add_conditional_edges("node_a", continue_to_jokes)
Command
Command 是一个多功能的原语,用于控制图执行。它接受四个参数:
update:应用状态更新(类似于从节点返回更新)。
goto:导航到特定节点(类似于条件边)。
graph:在从子图导航时定位父图。
resume:在中断后提供一个值以恢复执行。
Command 在三种上下文中使用:
从节点返回
update 和 goto
从节点函数返回 Command 以更新状态并路由到下一个节点:
def my_node(state: State) -> Command[Literal["my_other_node"]]:
return Command(
# 状态更新
update={"foo": "bar"},
# 控制流
goto="my_other_node"
)
使用 Command 您还可以实现动态控制流行为(与条件边相同):
def my_node(state: State) -> Command[Literal["my_other_node"]]:
if state["foo"] == "bar":
return Command(update={"foo": "baz"}, goto="my_other_node")
当您需要同时更新状态并路由到不同节点时,请使用 Command。如果您只需要路由而不需要更新状态,请改用条件边。
当在节点函数中返回 Command 时,您必须添加带有节点路由到的节点名称列表的返回类型注释,例如 Command[Literal["my_other_node"]]。这对于图渲染是必要的,并告诉 LangGraph my_node 可以导航到 my_other_node。
Command 仅添加动态边——使用 add_edge / addEdge 定义的静态边仍会执行。例如,如果 node_a 返回 Command(goto="my_other_node") 并且您还有 graph.add_edge("node_a", "node_b"),则 node_b 和 my_other_node 都将运行。
查看此操作指南 以获取如何使用 Command 的端到端示例。
graph
如果您正在使用子图,您可以通过在 Command 中指定 graph=Command.PARENT 来从子图内的节点导航到父图中的不同节点:
def my_node(state: State) -> Command[Literal["other_subgraph"]]:
return Command(
update={"foo": "bar"},
goto="other_subgraph", # 其中 `other_subgraph` 是父图中的一个节点
graph=Command.PARENT
)
将 graph 设置为 Command.PARENT 将导航到最近的父图。当您从子图节点向父图节点发送更新时,如果该键由父图和子图状态模式共享,您必须为父图状态中要更新的键定义一个reducer。请参阅此示例。
这在实现多智能体交接时特别有用。请查看导航到父图中的节点 以获取详细信息。
输入到 invoke 或 stream
Command(resume=...) 是唯一旨在作为 invoke()/stream() 输入的 Command 模式。不要使用 Command(update=...) 作为输入来继续多轮对话——因为传递任何 Command 作为输入都会从最新检查点(即最后运行的步骤,而不是 __start__)恢复,如果图已经完成,它将看起来卡住。要在现有线程上继续对话,请传递一个普通的输入字典:# 错误 - 图从最新检查点恢复
# (最后运行的步骤),看起来卡住
graph.invoke(Command(update={
"messages": [{"role": "user", "content": "follow up"}]
}), config)
# 正确 - 普通字典从 __start__ 重新开始
graph.invoke( {
"messages": [{"role": "user", "content": "follow up"}]
}, config)
resume
使用 Command(resume=...) 提供一个值并在中断后恢复图执行。传递给 resume 的值将成为暂停节点内 interrupt() 调用的返回值:
from langgraph.types import Command, interrupt
def human_review(state: State):
# 暂停图并等待一个值
answer = interrupt("Do you approve?")
return {"messages": [{"role": "user", "content": answer}]}
# 第一次调用 - 触发中断并暂停
result = graph.invoke({"messages": [...]}, config)
# 使用值恢复 - interrupt() 调用返回 "yes"
result = graph.invoke(Command(resume="yes"), config)
查看中断概念指南 以获取有关中断模式的完整详细信息,包括多个中断和验证循环。
从工具返回
您可以从工具返回 Command 以更新图状态和控制流。使用 update 修改状态(例如,保存在对话期间查找的客户信息),并在工具完成后使用 goto 路由到特定节点。
当在工具内部使用时,goto 会添加一个动态边——在调用工具的节点上定义的任何静态边仍将执行。
有关详细信息,请参阅在工具内部使用。
图迁移
LangGraph 可以轻松处理图定义(节点、边和状态)的迁移,即使使用检查点器来跟踪状态也是如此。
- 对于图末尾的线程(即未中断),您可以更改图的整个拓扑(即所有节点和边,删除、添加、重命名等)
- 对于当前中断的线程,我们支持除重命名/删除节点之外的所有拓扑更改(因为该线程可能即将进入一个不再存在的节点)——如果这是一个障碍,请联系我们,我们可以优先考虑解决方案。
- 对于修改状态,我们对添加和删除键具有完全的向后和向前兼容性
- 重命名的状态键在现有线程中会丢失其保存的状态
- 以不兼容方式更改类型的状态键目前可能导致在更改前具有状态的线程中出现问题——如果这是一个障碍,请联系我们,我们可以优先考虑解决方案。
运行时上下文
创建图时,您可以为传递给节点的运行时上下文指定 context_schema。这对于向节点传递不属于图状态的信息非常有用。例如,您可能想传递依赖项,如模型名称或数据库连接。
@dataclass
class ContextSchema:
llm_provider: str = "openai"
graph = StateGraph(State, context_schema=ContextSchema)
然后,您可以使用 invoke 方法的 context 参数将此上下文传递到图中。
graph.invoke(inputs, context={"llm_provider": "anthropic"})
然后,您可以在节点或条件边中访问和使用此上下文:
from langgraph.runtime import Runtime
def node_a(state: State, runtime: Runtime[ContextSchema]):
llm = get_llm(runtime.context.llm_provider)
# ...
请参阅添加运行时配置 以获取配置的完整细分。
递归限制
递归限制设置了图在单次执行期间可以执行的超级步骤的最大数量。达到限制后,LangGraph 将引发 GraphRecursionError。从版本 1.0.6 开始,默认递归限制设置为 1000 步。递归限制可以在任何图上运行时设置,并通过配置字典传递给 invoke/stream。重要的是,recursion_limit 是一个独立的 config 键,不应像所有其他用户定义的配置一样传递到 configurable 键内。请参阅下面的示例:
graph.invoke(inputs, config={"recursion_limit": 5}, context={"llm": "anthropic"})
阅读递归限制 以了解递归限制的工作原理。
访问和处理递归计数器
当前步骤计数器可在任何节点内的 config["metadata"]["langgraph_step"] 中访问,允许在达到递归限制之前主动处理递归。这使您能够在图逻辑中实现优雅的降级策略。
工作原理
步骤计数器存储在 config["metadata"]["langgraph_step"] 中。递归限制检查遵循逻辑:step > stop,其中 stop = step + recursion_limit + 1。当超过限制时,LangGraph 会引发 GraphRecursionError。
访问当前步骤计数器
您可以在任何节点内访问当前步骤计数器以监视执行进度。
from langchain_core.runnables import RunnableConfig
from langgraph.graph import StateGraph
def my_node(state: dict, config: RunnableConfig) -> dict:
current_step = config["metadata"]["langgraph_step"]
print(f"Currently on step: {current_step}")
return state
主动递归处理
LangGraph 提供了一个 RemainingSteps 管理值,用于跟踪在达到递归限制之前剩余的步骤数。这允许在图内进行优雅的降级。
from typing import Annotated, Literal
from langgraph.graph import StateGraph, START, END
from langgraph.managed import RemainingSteps
class State(TypedDict):
messages: Annotated[list, lambda x, y: x + y]
remaining_steps: RemainingSteps # 管理值 - 跟踪直到限制的步骤
def reasoning_node(state: State) -> dict:
# RemainingSteps 由 LangGraph 自动填充
remaining = state["remaining_steps"]
# 检查步骤是否不足
if remaining <= 2:
return {"messages": ["Approaching limit, wrapping up..."]}
# 正常处理
return {"messages": ["thinking..."]}
def route_decision(state: State) -> Literal["reasoning_node", "fallback_node"]:
"""基于剩余步骤路由"""
if state["remaining_steps"] <= 2:
return "fallback_node"
return "reasoning_node"
def fallback_node(state: State) -> dict:
"""处理递归限制接近的情况"""
return {"messages": ["Reached complexity limit, providing best effort answer"]}
# 构建图
builder = StateGraph(State)
builder.add_node("reasoning_node", reasoning_node)
builder.add_node("fallback_node", fallback_node)
builder.add_edge(START, "reasoning_node")
builder.add_conditional_edges("reasoning_node", route_decision)
builder.add_edge("fallback_node", END)
graph = builder.compile()
# RemainingSteps 适用于任何 recursion_limit
result = graph.invoke({"messages": []}, {"recursion_limit": 10})
主动与被动方法
处理递归限制主要有两种方法:主动(在图内监控)和被动(在外部捕获错误)。
from typing import Annotated, Literal, TypedDict
from langgraph.graph import StateGraph, START, END
from langgraph.managed import RemainingSteps
from langgraph.errors import GraphRecursionError
class State(TypedDict):
messages: Annotated[list, lambda x, y: x + y]
remaining_steps: RemainingSteps
# 主动方法(推荐) - 使用 RemainingSteps
def agent_with_monitoring(state: State) -> dict:
"""主动监控并在图内处理递归"""
remaining = state["remaining_steps"]
# 早期检测 - 路由到内部处理
if remaining <= 2:
return {
"messages": ["Approaching limit, returning partial result"]
}
# 正常处理
return {"messages": [f"Processing... ({remaining} steps remaining)"]}
def route_decision(state: State) -> Literal["agent", END]:
if state["remaining_steps"] <= 2:
return END
return "agent"
# 构建图
builder = StateGraph(State)
builder.add_node("agent", agent_with_monitoring)
builder.add_edge(START, "agent")
builder.add_conditional_edges("agent", route_decision)
graph = builder.compile()
# 主动:图优雅完成
result = graph.invoke({"messages": []}, {"recursion_limit": 10})
# 被动方法(后备) - 在外部捕获错误
try:
result = graph.invoke({"messages": []}, {"recursion_limit": 10})
except GraphRecursionError as e:
# 在图执行失败后在外部处理
result = {"messages": ["Fallback: recursion limit exceeded"]}
这些方法之间的主要区别如下:
| 方法 | 检测 | 处理 | 控制流 |
|---|
主动(使用 RemainingSteps) | 在达到限制之前 | 通过条件路由在图内 | 图继续到完成节点 |
被动(捕获 GraphRecursionError) | 在超过限制之后 | 在图外使用 try/catch | 图执行终止 |
主动优势:
- 在图内优雅降级
- 可以在检查点保存中间状态
- 更好的用户体验,带有部分结果
- 图正常完成(无异常)
被动优势:
其他可用元数据
除了 langgraph_step 之外,以下元数据也可在 config["metadata"] 中使用:
def inspect_metadata(state: dict, config: RunnableConfig) -> dict:
metadata = config["metadata"]
print(f"Step: {metadata['langgraph_step']}")
print(f"Node: {metadata['langgraph_node']}")
print(f"Triggers: {metadata['langgraph_triggers']}")
print(f"Path: {metadata['langgraph_path']}")
print(f"Checkpoint NS: {metadata['langgraph_checkpoint_ns']}")
return state
可视化
能够可视化图通常很有用,尤其是在图变得更加复杂时。LangGraph 附带了几种内置的可视化图的方法。有关更多信息,请参阅可视化您的图。
可观察性和跟踪
要跟踪、调试和评估您的智能体,请使用 LangSmith。
了解更多