LangGraph通过检查点 支持时间旅行:
重放 :从先前的检查点重试。
分叉 :从先前的检查点分叉,使用修改后的状态探索替代路径。
两者都通过从先前的检查点恢复来工作。检查点之前的节点不会重新执行(结果已保存)。检查点之后的节点会重新执行,包括任何LLM调用、API请求和中断 (这可能会产生不同的结果)。
使用先前检查点的配置调用图,以从该点重放。
重放会重新执行节点——它不仅仅是从缓存中读取。LLM调用、API请求和中断 会再次触发,并可能返回不同的结果。从最终检查点(没有next节点)重放是一个空操作。
使用get_state_history 找到要从中重放的检查点,然后使用该检查点的配置调用invoke :
from langgraph . graph import StateGraph , START
from langgraph . checkpoint . memory import InMemorySaver
from typing_extensions import TypedDict , NotRequired
from langchain_core . utils . uuid import uuid7
class State ( TypedDict ):
topic : NotRequired [ str ]
joke : NotRequired [ str ]
def generate_topic ( state : State ):
return { "topic" : "socks in the dryer" }
def write_joke ( state : State ):
return { "joke" : f "Why do { state [ ' topic ' ] } disappear? They elope!" }
checkpointer = InMemorySaver ()
graph = (
StateGraph ( State )
. add_node ( "generate_topic" , generate_topic )
. add_node ( "write_joke" , write_joke )
. add_edge ( START , "generate_topic" )
. add_edge ( "generate_topic" , "write_joke" )
. compile ( checkpointer = checkpointer )
)
# 步骤 1:运行图
config = { "configurable" : { "thread_id" : str ( uuid7 ())}}
result = graph . invoke ({}, config )
# 步骤 2:找到要从中重放的检查点
history = list ( graph . get_state_history ( config ))
# 历史记录按时间倒序排列
for state in history :
print ( f "next= { state . next } , checkpoint_id= { state . config [ ' configurable ' ][ 'checkpoint_id' ] } " )
# 步骤 3:从特定检查点重放
# 找到 write_joke 之前的检查点
before_joke = next ( s for s in history if s . next == ( "write_joke" ,))
replay_result = graph . invoke ( None , before_joke . config )
# write_joke 重新执行(再次运行),generate_topic 不执行
分叉从过去的检查点创建一个带有修改状态的新分支。在先前的检查点上调用update_state 以创建分叉,然后使用None调用invoke 以继续执行。
update_state不会 回滚线程。它创建一个从指定点分叉的新检查点。原始执行历史记录保持不变。
# 找到 write_joke 之前的检查点
history = list ( graph . get_state_history ( config ))
before_joke = next ( s for s in history if s . next == ( "write_joke" ,))
# 分叉:更新状态以更改主题
fork_config = graph . update_state (
before_joke . config ,
values = { "topic" : "chickens" },
)
# 从分叉处恢复——write_joke 使用新主题重新执行
fork_result = graph . invoke ( None , fork_config )
print ( fork_result [ " joke " ]) # 关于鸡的笑话,而不是袜子
从特定节点
当你调用update_state 时,值会使用指定节点的写入器(包括归约器 )应用。检查点记录该节点已产生更新,执行从该节点的后继节点恢复。
默认情况下,LangGraph从检查点的版本历史推断as_node。从特定检查点分叉时,此推断几乎总是正确的。
在以下情况下显式指定as_node:
并行分支 :多个节点在同一步骤中更新了状态,而LangGraph无法确定哪个是最后的(InvalidUpdateError)。
无执行历史 :在全新线程上设置状态(在测试 中常见)。
跳过节点 :将as_node设置为后面的节点,使图认为该节点已经运行。
# 图:generate_topic -> write_joke
# 将此更新视为由 generate_topic 产生。
# 执行在 write_joke(generate_topic 的后继节点)处恢复。
fork_config = graph . update_state (
before_joke . config ,
values = { "topic" : "chickens" },
as_node = "generate_topic" ,
)
如果你的图使用interrupt 进行Human in the Loop 的工作流,中断在时间旅行期间总是会重新触发。包含中断的节点会重新执行,interrupt()会暂停以等待新的Command(resume=...)。
from langgraph . types import interrupt , Command
class State ( TypedDict ):
value : list [ str ]
def ask_human ( state : State ):
answer = interrupt ( "What is your name?" )
return { "value" : [ f "Hello, { answer } !" ]}
def final_step ( state : State ):
return { "value" : [ "Done" ]}
graph = (
StateGraph ( State )
. add_node ( "ask_human" , ask_human )
. add_node ( "final_step" , final_step )
. add_edge ( START , "ask_human" )
. add_edge ( "ask_human" , "final_step" )
. compile ( checkpointer = InMemorySaver ())
)
config = { "configurable" : { "thread_id" : "1" }}
# 第一次运行:遇到中断
graph . invoke ({ "value" : []}, config )
# 使用答案恢复
graph . invoke ( Command ( resume = "Alice" ), config )
# 从 ask_human 之前重放
history = list ( graph . get_state_history ( config ))
before_ask = [ s for s in history if s . next == ( "ask_human" ,)][ - 1 ]
replay_result = graph . invoke ( None , before_ask . config )
# 在中断处暂停——等待新的 Command(resume=...)
# 从 ask_human 之前分叉
fork_config = graph . update_state ( before_ask . config , { "value" : [ "forked" ]})
fork_result = graph . invoke ( None , fork_config )
# 在中断处暂停——等待新的 Command(resume=...)
# 使用不同的答案恢复分叉的中断
graph . invoke ( Command ( resume = "Bob" ), fork_config )
# 结果:{"value": ["forked", "Hello, Bob!", "Done"]}
多个中断
如果你的图在多个点收集输入(例如,一个多步骤表单),你可以在中断之间分叉,以更改后续答案而无需重新询问之前的问题。
def ask_name ( state ):
name = interrupt ( "What is your name?" )
return { "value" : [ f "name: { name } " ]}
def ask_age ( state ):
age = interrupt ( "How old are you?" )
return { "value" : [ f "age: { age } " ]}
# 图:ask_name -> ask_age -> final
# 完成两个中断后:
# 在两个中断之间分叉(ask_name 之后,ask_age 之前)
history = list ( graph . get_state_history ( config ))
between = [ s for s in history if s . next == ( "ask_age" ,)][ - 1 ]
fork_config = graph . update_state ( between . config , { "value" : [ "modified" ]})
result = graph . invoke ( None , fork_config )
# ask_name 结果保留("name:Alice")
# ask_age 在中断处暂停——等待新答案
使用子图 进行时间旅行取决于子图是否拥有自己的检查点。这决定了你可以从中进行时间旅行的检查点的粒度。
默认情况下,子图继承父图的检查点。父图将整个子图视为单个超级步骤 ——整个子图执行只有一个父级检查点。从子图之前进行时间旅行会从头开始重新执行它。 你无法在默认子图中时间旅行到节点之间 的点——你只能从父级进行时间旅行。 # 没有自己检查点的子图(默认)
subgraph = (
StateGraph ( State )
. add_node ( "step_a" , step_a ) # 有 interrupt()
. add_node ( "step_b" , step_b ) # 有 interrupt()
. add_edge ( START , "step_a" )
. add_edge ( "step_a" , "step_b" )
. compile () # 没有检查点——从父图继承
)
graph = (
StateGraph ( State )
. add_node ( "subgraph_node" , subgraph )
. add_edge ( START , "subgraph_node" )
. compile ( checkpointer = InMemorySaver ())
)
config = { "configurable" : { "thread_id" : "1" }}
# 完成两个中断
graph . invoke ({ "value" : []}, config ) # 遇到 step_a 中断
graph . invoke ( Command ( resume = "Alice" ), config ) # 遇到 step_b 中断
graph . invoke ( Command ( resume = "30" ), config ) # 完成
# 从子图之前进行时间旅行
history = list ( graph . get_state_history ( config ))
before_sub = [ s for s in history if s . next == ( "subgraph_node" ,)][ - 1 ]
fork_config = graph . update_state ( before_sub . config , { "value" : [ "forked" ]})
result = graph . invoke ( None , fork_config )
# 整个子图从头开始重新执行
# 你无法时间旅行到 step_a 和 step_b 之间的点
在子图上设置checkpointer=True以赋予其自己的检查点历史记录。这会在子图内部 的每个步骤创建检查点,允许你从其内部的特定点进行时间旅行——例如,在两个中断之间。 使用get_state 并设置subgraphs=True来访问子图自己的检查点配置,然后从中分叉: # 拥有自己检查点的子图
subgraph = (
StateGraph ( State )
. add_node ( "step_a" , step_a ) # 有 interrupt()
. add_node ( "step_b" , step_b ) # 有 interrupt()
. add_edge ( START , "step_a" )
. add_edge ( "step_a" , "step_b" )
. compile ( checkpointer = True ) # 自己的检查点历史记录
)
graph = (
StateGraph ( State )
. add_node ( "subgraph_node" , subgraph )
. add_edge ( START , "subgraph_node" )
. compile ( checkpointer = InMemorySaver ())
)
config = { "configurable" : { "thread_id" : "1" }}
# 运行直到 step_a 中断
graph . invoke ({ "value" : []}, config )
# 恢复 step_a -> 遇到 step_b 中断
graph . invoke ( Command ( resume = "Alice" ), config )
# 获取子图自己的检查点(在 step_a 和 step_b 之间)
parent_state = graph . get_state ( config , subgraphs = True )
sub_config = parent_state . tasks [ 0 ]. state . config
# 从子图检查点分叉
fork_config = graph . update_state ( sub_config , { "value" : [ "forked" ]})
result = graph . invoke ( None , fork_config )
# step_b 重新执行,step_a 的结果保留
有关配置子图检查点的更多信息,请参阅子图持久化 。
将这些文档连接 到Claude、VSCode等,通过MCP获取实时答案。