Skip to main content
中断允许您在特定点暂停图执行,并在继续之前等待外部输入。这实现了人机回环模式,当您需要外部输入才能继续时非常有用。当触发中断时,LangGraph 使用其持久化层保存图状态,并无限期等待,直到您恢复执行。 中断通过在图节点中的任何位置调用 interrupt() 函数来工作。该函数接受任何 JSON 可序列化值,该值将呈现给调用者。当您准备好继续时,通过使用 Command 重新调用图来恢复执行,然后该值将成为节点内部 interrupt() 调用的返回值。 与静态断点(在特定节点之前或之后暂停)不同,中断是动态的:它们可以放置在代码中的任何位置,并且可以根据应用程序逻辑有条件地触发。
  • 检查点保存您的位置:检查点写入确切的图状态,因此即使处于错误状态,您也可以稍后恢复。
  • thread_id 是您的指针:设置 config={"configurable": {"thread_id": ...}} 以告诉检查点器加载哪个状态。
  • 中断负载通过 chunk["interrupts"] 呈现:当使用 version="v2" 流式传输时,您传递给 interrupt() 的值会出现在 values 流部分的 interrupts 字段中,因此您知道图正在等待什么。
您选择的 thread_id 实际上是您的持久化游标。重用它将恢复相同的检查点;使用新值将启动一个具有空状态的全新线程。

使用 interrupt 暂停

interrupt 函数暂停图执行并向调用者返回一个值。当您在节点内调用 interrupt 时,LangGraph 保存当前图状态并等待您使用输入恢复执行。 要使用 interrupt,您需要:
  1. 一个检查点器来持久化图状态(在生产中使用持久的检查点器)
  2. 配置中的线程 ID,以便运行时知道从哪个状态恢复
  3. 在您想要暂停的位置调用 interrupt()(负载必须是 JSON 可序列化的)
from langgraph.types import interrupt

def approval_node(state: State):
    # 暂停并请求批准
    approved = interrupt("Do you approve this action?")

    # 当您恢复时,Command(resume=...) 在此处返回该值
    return {"approved": approved}
当您调用 interrupt 时,会发生以下情况:
  1. 图执行在调用 interrupt 的确切点被挂起
  2. 状态被保存使用检查点器,以便稍后可以恢复执行。在生产中,这应该是一个持久的检查点器(例如,由数据库支持)
  3. 值被返回给调用者,位于 __interrupt__ 下;它可以是任何 JSON 可序列化值(字符串、对象、数组等)
  4. 图无限期等待,直到您使用响应恢复执行
  5. 响应被传回节点,当您恢复时,成为 interrupt() 调用的返回值

恢复中断

中断暂停执行后,您可以通过使用包含恢复值的 Command 再次调用图来恢复图。恢复值被传回 interrupt 调用,允许节点使用外部输入继续执行。
from langgraph.types import Command

# 初始运行 - 触达中断并暂停
# thread_id 是持久化指针(在生产中存储稳定的 ID)
config = {"configurable": {"thread_id": "thread-1"}}
result = graph.invoke({"input": "data"}, config=config, version="v2")

# result 是一个 GraphOutput,包含 .value 和 .interrupts
# .interrupts 包含传递给 interrupt() 的负载
print(result.interrupts)
# > (Interrupt(value='Do you approve this action?'),)

# 使用人类的响应恢复
# 恢复负载成为节点内部 interrupt() 的返回值
graph.invoke(Command(resume=True), config=config, version="v2")
关于恢复的关键点:
  • 您必须使用与中断发生时相同的线程 ID 进行恢复
  • 传递给 Command(resume=...) 的值成为 interrupt 调用的返回值
  • 恢复时,节点从调用 interrupt 的节点开头重新启动,因此 interrupt 之前的任何代码将再次运行
  • 您可以传递任何 JSON 可序列化值作为恢复值
Command(resume=...)唯一旨在作为 invoke()/stream() 输入的 Command 模式。其他 Command 参数(updategotograph)设计用于从节点函数返回。不要将 Command(update=...) 作为输入传递以继续多轮对话——而是传递一个普通的输入字典。

常见模式

中断解锁的关键功能是暂停执行并等待外部输入。这对于各种用例非常有用,包括:
  • 审批工作流:在执行关键操作(API 调用、数据库更改、金融交易)之前暂停
  • 处理多个中断:在单次调用中恢复多个中断时,将中断 ID 与恢复值配对
  • 审查和编辑:让人类在继续之前审查和修改 LLM 输出或工具调用
  • 中断工具调用:在执行工具调用之前暂停,以便在执行前审查和编辑工具调用
  • 验证人类输入:在继续下一步之前暂停以验证人类输入

使用人机回环 (HITL) 中断流式传输

在构建具有人机回环工作流的交互式代理时,您可以同时流式传输消息块和节点更新,以在处理中断时提供实时反馈。 使用多个流模式("messages""updates")以及 subgraphs=True(如果存在子图)来:
  • 实时流式传输 AI 响应
  • 检测图何时遇到中断
  • 无缝处理用户输入并恢复执行
async for chunk in graph.astream(
    initial_input,
    stream_mode=["messages", "updates"],
    subgraphs=True,
    config=config,
    version="v2",
):
    if chunk["type"] == "messages":
        # 处理流式消息内容
        msg, _ = chunk["data"]
        if isinstance(msg, AIMessageChunk) and msg.content:
            display_streaming_content(msg.content)

    elif chunk["type"] == "updates":
        # 检查更新数据中的中断
        if "__interrupt__" in chunk["data"]:
            interrupt_info = chunk["data"]["__interrupt__"][0].value
            user_response = get_user_input(interrupt_info)
            initial_input = Command(resume=user_response)
            break
        else:
            current_node = list(chunk["data"].keys())[0]
  • version="v2":所有块都是具有 typensdata 键的 StreamPart 字典
  • chunk["type"]:根据流模式("messages""updates" 等)缩小范围以进行类型推断
  • chunk["ns"]:标识源图(根图为空元组,子图则填充)
  • subgraphs=True:在嵌套图中检测中断所必需
  • Command(resume=...):使用用户提供的数据恢复图执行

处理多个中断

当并行分支同时中断时(例如,分发到多个节点,每个节点都调用 interrupt()),您可能需要在单次调用中恢复多个中断。 当在单次调用中恢复多个中断时,将每个中断 ID 映射到其恢复值。 这确保每个响应在运行时与正确的中断配对。
from typing import Annotated, TypedDict
import operator

from langgraph.checkpoint.memory import InMemorySaver
from langgraph.graph import START, END, StateGraph
from langgraph.types import Command, interrupt


class State(TypedDict):
    vals: Annotated[list[str], operator.add]


def node_a(state):
    answer = interrupt("question_a")
    return {"vals": [f"a:{answer}"]}


def node_b(state):
    answer = interrupt("question_b")
    return {"vals": [f"b:{answer}"]}


graph = (
    StateGraph(State)
    .add_node("a", node_a)
    .add_node("b", node_b)
    .add_edge(START, "a")
    .add_edge(START, "b")
    .add_edge("a", END)
    .add_edge("b", END)
    .compile(checkpointer=InMemorySaver())
)

config = {"configurable": {"thread_id": "1"}}

# 步骤 1:调用 - 两个并行节点触达 interrupt() 并暂停
interrupted_result = graph.invoke({"vals": []}, config)
print(interrupted_result)
"""
{
    'vals': [],
    '__interrupt__': [
        Interrupt(value='question_a', id='bd4f3183600f2c41dddafbf8f0f7be7b'),
        Interrupt(value='question_b', id='29963e3d3585f0cef025dd0f14323f55')
    ]
}
"""

# 步骤 2:一次性恢复所有挂起的中断
resume_map = {
    i.id: f"answer for {i.value}"
    for i in interrupted_result["__interrupt__"]
}
result = graph.invoke(Command(resume=resume_map), config)

print("Final state:", result)
#> Final state: {'vals': ['a:answer for question_a', 'b:answer for question_b']}

批准或拒绝

中断最常见的用途之一是在关键操作之前暂停并请求批准。例如,您可能希望让人类批准 API 调用、数据库更改或任何其他重要决策。
from typing import Literal
from langgraph.types import interrupt, Command

def approval_node(state: State) -> Command[Literal["proceed", "cancel"]]:
    # 暂停执行;负载显示在 result["__interrupt__"] 下
    is_approved = interrupt({
        "question": "Do you want to proceed with this action?",
        "details": state["action_details"]
    })

    # 根据响应路由
    if is_approved:
        return Command(goto="proceed")  # 在提供恢复负载后运行
    else:
        return Command(goto="cancel")
当您恢复图时,传递 True 以批准或 False 以拒绝:
# 批准
graph.invoke(Command(resume=True), config=config)

# 拒绝
graph.invoke(Command(resume=False), config=config)
from typing import Literal, Optional, TypedDict

from langgraph.checkpoint.memory import MemorySaver
from langgraph.graph import StateGraph, START, END
from langgraph.types import Command, interrupt


class ApprovalState(TypedDict):
    action_details: str
    status: Optional[Literal["pending", "approved", "rejected"]]


def approval_node(state: ApprovalState) -> Command[Literal["proceed", "cancel"]]:
    # 暴露详细信息,以便调用者可以在 UI 中呈现它们
    decision = interrupt({
        "question": "Approve this action?",
        "details": state["action_details"],
    })

    # 恢复后路由到适当的节点
    return Command(goto="proceed" if decision else "cancel")


def proceed_node(state: ApprovalState):
    return {"status": "approved"}


def cancel_node(state: ApprovalState):
    return {"status": "rejected"}


builder = StateGraph(ApprovalState)
builder.add_node("approval", approval_node)
builder.add_node("proceed", proceed_node)
builder.add_node("cancel", cancel_node)
builder.add_edge(START, "approval")
builder.add_edge("proceed", END)
builder.add_edge("cancel", END)

# 在生产中使用更持久的检查点器
checkpointer = MemorySaver()
graph = builder.compile(checkpointer=checkpointer)

config = {"configurable": {"thread_id": "approval-123"}}
initial = graph.invoke(
    {"action_details": "Transfer $500", "status": "pending"},
    config=config,
)
print(initial["__interrupt__"])  # -> [Interrupt(value={'question': ..., 'details': ...})]

# 使用决策恢复;True 路由到 proceed,False 路由到 cancel
resumed = graph.invoke(Command(resume=True), config=config)
print(resumed["status"])  # -> "approved"

审查和编辑状态

有时您希望让人类在继续之前审查和编辑图状态的一部分。这对于纠正 LLM、添加缺失信息或进行调整非常有用。
from langgraph.types import interrupt

def review_node(state: State):
    # 暂停并显示当前内容以供审查(显示在 result["__interrupt__"] 中)
    edited_content = interrupt({
        "instruction": "Review and edit this content",
        "content": state["generated_text"]
    })

    # 使用编辑后的版本更新状态
    return {"generated_text": edited_content}
恢复时,提供编辑后的内容:
graph.invoke(
    Command(resume="The edited and improved text"),  # 值成为 interrupt() 的返回值
    config=config
)
import sqlite3
from typing import TypedDict

from langgraph.checkpoint.memory import MemorySaver
from langgraph.graph import StateGraph, START, END
from langgraph.types import Command, interrupt


class ReviewState(TypedDict):
    generated_text: str


def review_node(state: ReviewState):
    # 要求审阅者编辑生成的内容
    updated = interrupt({
        "instruction": "Review and edit this content",
        "content": state["generated_text"],
    })
    return {"generated_text": updated}


builder = StateGraph(ReviewState)
builder.add_node("review", review_node)
builder.add_edge(START, "review")
builder.add_edge("review", END)

checkpointer = MemorySaver()
graph = builder.compile(checkpointer=checkpointer)

config = {"configurable": {"thread_id": "review-42"}}
initial = graph.invoke({"generated_text": "Initial draft"}, config=config)
print(initial["__interrupt__"])  # -> [Interrupt(value={'instruction': ..., 'content': ...})]

# 使用审阅者编辑后的文本恢复
final_state = graph.invoke(
    Command(resume="Improved draft after review"),
    config=config,
)
print(final_state["generated_text"])  # -> "Improved draft after review"

工具中的中断

您也可以直接在工具函数中放置中断。这使得工具本身在每次调用时暂停以请求批准,并允许在执行前对工具调用进行人类审查和编辑。 首先,定义一个使用 interrupt 的工具:
from langchain.tools import tool
from langgraph.types import interrupt

@tool
def send_email(to: str, subject: str, body: str):
    """Send an email to a recipient."""

    # 发送前暂停;负载显示在 result["__interrupt__"] 中
    response = interrupt({
        "action": "send_email",
        "to": to,
        "subject": subject,
        "body": body,
        "message": "Approve sending this email?"
    })

    if response.get("action") == "approve":
        # 恢复值可以在执行前覆盖输入
        final_to = response.get("to", to)
        final_subject = response.get("subject", subject)
        final_body = response.get("body", body)
        return f"Email sent to {final_to} with subject '{final_subject}'"
    return "Email cancelled by user"
当您希望批准逻辑与工具本身共存,使其可在图的不同部分重用时,这种方法非常有用。LLM 可以自然地调用工具,中断将在工具被调用时暂停执行,允许您批准、编辑或取消操作。
import sqlite3
from typing import TypedDict

from langchain.tools import tool
from langchain_anthropic import ChatAnthropic
from langgraph.checkpoint.sqlite import SqliteSaver
from langgraph.graph import StateGraph, START, END
from langgraph.types import Command, interrupt


class AgentState(TypedDict):
    messages: list[dict]


@tool
def send_email(to: str, subject: str, body: str):
    """Send an email to a recipient."""

    # 发送前暂停;负载显示在 result["__interrupt__"] 中
    response = interrupt({
        "action": "send_email",
        "to": to,
        "subject": subject,
        "body": body,
        "message": "Approve sending this email?",
    })

    if response.get("action") == "approve":
        final_to = response.get("to", to)
        final_subject = response.get("subject", subject)
        final_body = response.get("body", body)

        # 实际发送电子邮件(此处为您的实现)
        print(f"[send_email] to={final_to} subject={final_subject} body={final_body}")
        return f"Email sent to {final_to}"

    return "Email cancelled by user"


model = ChatAnthropic(model="claude-sonnet-4-6").bind_tools([send_email])


def agent_node(state: AgentState):
    # LLM 可能决定调用工具;中断在发送前暂停
    result = model.invoke(state["messages"])
    return {"messages": state["messages"] + [result]}


builder = StateGraph(AgentState)
builder.add_node("agent", agent_node)
builder.add_edge(START, "agent")
builder.add_edge("agent", END)

checkpointer = SqliteSaver(sqlite3.connect("tool-approval.db"))
graph = builder.compile(checkpointer=checkpointer)

config = {"configurable": {"thread_id": "email-workflow"}}
initial = graph.invoke(
    {
        "messages": [
            {"role": "user", "content": "Send an email to alice@example.com about the meeting"}
        ]
    },
    config=config,
)
print(initial["__interrupt__"])  # -> [Interrupt(value={'action': 'send_email', ...})]

# 使用批准和可选的编辑参数恢复
resumed = graph.invoke(
    Command(resume={"action": "approve", "subject": "Updated subject"}),
    config=config,
)
print(resumed["messages"][-1])  # -> 由 send_email 返回的工具结果

验证人类输入

有时您需要验证来自人类的输入,并在无效时再次询问。您可以使用循环中的多个 interrupt 调用来实现这一点。
from langgraph.types import interrupt

def get_age_node(state: State):
    prompt = "What is your age?"

    while True:
        answer = interrupt(prompt)  # 负载显示在 result["__interrupt__"] 中

        # 验证输入
        if isinstance(answer, int) and answer > 0:
            # 有效输入 - 继续
            break
        else:
            # 无效输入 - 使用更具体的提示再次询问
            prompt = f"'{answer}' is not a valid age. Please enter a positive number."

    return {"age": answer}
每次您使用无效输入恢复图时,它都会使用更清晰的消息再次询问。一旦提供有效输入,节点完成,图继续执行。
import sqlite3
from typing import TypedDict

from langgraph.checkpoint.sqlite import SqliteSaver
from langgraph.graph import StateGraph, START, END
from langgraph.types import Command, interrupt


class FormState(TypedDict):
    age: int | None


def get_age_node(state: FormState):
    prompt = "What is your age?"

    while True:
        answer = interrupt(prompt)  # 负载显示在 result["__interrupt__"] 中

        if isinstance(answer, int) and answer > 0:
            return {"age": answer}

        prompt = f"'{answer}' is not a valid age. Please enter a positive number."


builder = StateGraph(FormState)
builder.add_node("collect_age", get_age_node)
builder.add_edge(START, "collect_age")
builder.add_edge("collect_age", END)

checkpointer = SqliteSaver(sqlite3.connect("forms.db"))
graph = builder.compile(checkpointer=checkpointer)

config = {"configurable": {"thread_id": "form-1"}}
first = graph.invoke({"age": None}, config=config)
print(first["__interrupt__"])  # -> [Interrupt(value='What is your age?', ...)]

# 提供无效数据;节点重新提示
retry = graph.invoke(Command(resume="thirty"), config=config)
print(retry["__interrupt__"])  # -> [Interrupt(value="'thirty' is not a valid age...", ...)]

# 提供有效数据;循环退出并更新状态
final = graph.invoke(Command(resume=30), config=config)
print(final["age"])  # -> 30

中断规则

当您在节点内调用 interrupt 时,LangGraph 通过引发异常来挂起执行,该异常向运行时发出暂停信号。此异常通过调用堆栈向上传播,并由运行时捕获,运行时通知图保存当前状态并等待外部输入。 当执行恢复(在您提供请求的输入后),运行时从头开始重新启动整个节点——它不会从调用 interrupt 的确切行恢复。这意味着 interrupt 之前的任何代码将再次执行。因此,在使用中断时,有一些重要的规则需要遵循,以确保它们按预期行为。

不要在 interrupt 调用中包装 try/except

interrupt 在调用点暂停执行的方式是抛出一个特殊的异常。如果您将 interrupt 调用包装在 try/except 块中,您将捕获此异常,中断将不会被传回图。
  • ✅ 将 interrupt 调用与易出错的代码分开
  • ✅ 在 try/except 块中使用特定的异常类型
def node_a(state: State):
    # ✅ 好:先中断,然后单独处理
    # 错误条件
    interrupt("What's your name?")
    try:
        fetch_data()  # 这可能会失败
    except Exception as e:
        print(e)
    return state
  • 🔴 不要在裸 try/except 块中包装 interrupt 调用
def node_a(state: State):
    # ❌ 坏:在裸 try/except 中包装中断
    # 将捕获中断异常
    try:
        interrupt("What's your name?")
    except Exception as e:
        print(e)
    return state

不要在节点内重新排序 interrupt 调用

在单个节点中使用多个中断很常见,但如果处理不当,可能会导致意外行为。 当节点包含多个中断调用时,LangGraph 会为执行节点的任务维护一个特定的恢复值列表。每当执行恢复时,它都会从节点的开头开始。对于遇到的每个中断,LangGraph 会检查任务的恢复列表中是否存在匹配值。匹配是严格基于索引的,因此节点内中断调用的顺序很重要。
  • ✅ 保持 interrupt 调用在节点执行之间一致
def node_a(state: State):
    # ✅ 好:中断调用每次都以相同的顺序发生
    name = interrupt("What's your name?")
    age = interrupt("What's your age?")
    city = interrupt("What's your city?")

    return {
        "name": name,
        "age": age,
        "city": city
    }
  • 🔴 不要在节点内有条件地跳过 interrupt 调用
  • 🔴 不要使用在执行之间非确定性的逻辑循环 interrupt 调用
def node_a(state: State):
    # ❌ 坏:有条件地跳过中断会改变顺序
    name = interrupt("What's your name?")

    # 在第一次运行时,这可能会跳过中断
    # 在恢复时,它可能不会跳过 - 导致索引不匹配
    if state.get("needs_age"):
        age = interrupt("What's your age?")

    city = interrupt("What's your city?")

    return {"name": name, "city": city}

不要在 interrupt 调用中返回复杂值

根据使用的检查点器,复杂值可能无法序列化(例如,您无法序列化函数)。为了使您的图适应任何部署,最好只使用可以合理序列化的值。
  • ✅ 传递简单、JSON 可序列化的类型给 interrupt
  • ✅ 传递具有简单值的字典/对象
def node_a(state: State):
    # ✅ 好:传递可序列化的简单类型
    name = interrupt("What's your name?")
    count = interrupt(42)
    approved = interrupt(True)

    return {"name": name, "count": count, "approved": approved}
  • 🔴 不要将函数、类实例或其他复杂对象传递给 interrupt
def validate_input(value):
    return len(value) > 0

def node_a(state: State):
    # ❌ 坏:将函数传递给中断
    # 函数无法序列化
    response = interrupt({
        "question": "What's your name?",
        "validator": validate_input  # 这将失败
    })
    return {"name": response}

interrupt 之前调用的副作用必须是幂等的

因为中断通过重新运行它们被调用的节点来工作,所以在 interrupt 之前调用的副作用应该(理想情况下)是幂等的。就上下文而言,幂等性意味着相同的操作可以应用多次,而不会改变初始执行之外的结果。 例如,您可能在节点内有一个更新记录的 API 调用。如果在调用之后调用 interrupt,则在节点恢复时,它将被多次重新运行,可能会覆盖初始更新或创建重复记录。
  • ✅ 在 interrupt 之前使用幂等操作
  • ✅ 将副作用放在 interrupt 调用之后
  • ✅ 在可能的情况下,将副作用分离到单独的节点中
def node_a(state: State):
    # ✅ 好:使用幂等的 upsert 操作
    # 多次运行此操作将产生相同的结果
    db.upsert_user(
        user_id=state["user_id"],
        status="pending_approval"
    )

    approved = interrupt("Approve this change?")

    return {"approved": approved}
  • 🔴 不要在 interrupt 之前执行非幂等操作
  • 🔴 不要在不检查是否存在的情况下创建新记录
def node_a(state: State):
    # ❌ 坏:在中断之前创建新记录
    # 这将在每次恢复时创建重复记录
    audit_id = db.create_audit_log({
        "user_id": state["user_id"],
        "action": "pending_approval",
        "timestamp": datetime.now()
    })

    approved = interrupt("Approve this change?")

    return {"approved": approved, "audit_id": audit_id}

与作为函数调用的子图一起使用

当在节点内调用子图时,父图将从调用子图并触发 interrupt 的节点的开头恢复执行。类似地,子图也将从调用 interrupt 的节点的开头恢复。
def node_in_parent_graph(state: State):
    some_code()  # <-- 恢复时将重新执行
    # 作为函数调用子图。
    # 子图包含一个 `interrupt` 调用。
    subgraph_result = subgraph.invoke(some_input)
    # ...

def node_in_subgraph(state: State):
    some_other_code()  # <-- 恢复时也将重新执行
    result = interrupt("What's your name?")
    # ...

使用中断调试

要调试和测试图,您可以使用静态中断作为断点,以逐步执行图执行,一次一个节点。静态中断在节点执行之前或之后的定义点触发。您可以在编译图时通过指定 interrupt_beforeinterrupt_after 来设置这些。
静态中断推荐用于人机回环工作流。请改用 interrupt 函数。
graph = builder.compile(
    interrupt_before=["node_a"],
    interrupt_after=["node_b", "node_c"],
    checkpointer=checkpointer,
)

# 将线程 ID 传递给图
config = {
    "configurable": {
        "thread_id": "some_thread"
    }
}

# 运行图直到断点
graph.invoke(inputs, config=config)

# 恢复图
graph.invoke(None, config=config)
  1. 断点在 compile 时设置。
  2. interrupt_before 指定在节点执行之前应暂停执行的节点。
  3. interrupt_after 指定在节点执行之后应暂停执行的节点。
  4. 需要检查点器才能启用断点。
  5. 图运行直到命中第一个断点。
  6. 通过为输入传递 None 来恢复图。这将运行图直到命中下一个断点。
要调试您的中断,请使用 LangSmith

使用 LangSmith Studio

您可以使用 LangSmith Studio 在 UI 中设置图中的静态中断,然后再运行图。您还可以使用 UI 在执行的任何点检查图状态。 image