Interrupts let you pause graph execution at specific points and wait for external input before continuing. This enables human-in-the-loop patterns, where progress depends on input from outside the graph. When an interrupt triggers, LangGraph saves the graph state through its persistence layer and waits indefinitely until you resume.

Interrupts work by calling the interrupt() function anywhere inside a graph node. The function accepts any JSON-serializable value and surfaces it to the caller. When you are ready to continue, you invoke the graph again with a Command, and that value becomes the return value of the interrupt() call inside the node.

Unlike static breakpoints, which pause before or after specific nodes, interrupts are dynamic: they can be placed anywhere in your code and made conditional on application logic.
  • Checkpoints save your place: the checkpointer writes the exact graph state, so you can resume later, even from an error state.
  • thread_id is your pointer: set config={"configurable": {"thread_id": ...}} to tell the checkpointer which state to load.
  • Interrupt payloads surface as __interrupt__: the value you pass to interrupt() is returned to the caller under the __interrupt__ field, so you know what the graph is waiting for.
The thread_id you choose is effectively your persistence cursor. Reusing it resumes the same checkpoint; a new value starts a fresh thread with empty state.

Pausing with interrupt

The interrupt function pauses graph execution and returns a value to the caller. When you call interrupt in a node, LangGraph saves the current graph state and waits for you to resume with input. Using interrupt requires:
  1. A checkpointer to persist graph state (use a durable checkpointer in production)
  2. A thread ID in the config so the runtime knows which state to resume from
  3. A call to interrupt() wherever you want to pause (the payload must be JSON-serializable)
from langgraph.types import interrupt

def approval_node(state: State):
    # Pause and ask for approval
    approved = interrupt("Do you approve this action?")

    # When you resume, Command(resume=...) returns that value here
    return {"approved": approved}
When you call interrupt, the following happens:
  1. Graph execution pauses at the exact point where interrupt was called
  2. State is saved using the checkpointer so execution can resume later. In production, use a durable checkpointer (e.g. backed by a database)
  3. The value is returned to the caller under __interrupt__; it can be any JSON-serializable value (string, object, array, etc.)
  4. The graph waits indefinitely until you resume with a response
  5. The response is passed back into the node as the return value of the interrupt() call

Resuming from an interrupt

After an interrupt pauses execution, you resume the graph by invoking it again with a Command that carries the resume value. That value is delivered back to the interrupt call site, letting the node continue with the external input.
from langgraph.types import Command

# Initial run - hits the interrupt and pauses
# thread_id is the persistent pointer (store a stable ID in production)
config = {"configurable": {"thread_id": "thread-1"}}
result = graph.invoke({"input": "data"}, config=config)

# Check what was interrupted
# __interrupt__ contains the payload that was passed to interrupt()
print(result["__interrupt__"])
# > [Interrupt(value='Do you approve this action?')]

# Resume with the human's response
# The resume payload becomes the return value of interrupt() inside the node
graph.invoke(Command(resume=True), config=config)
Key points about resuming:
  • You must resume with the same thread ID that was active when the interrupt occurred
  • The value passed to Command(resume=...) becomes the return value of the interrupt call
  • On resume, the node restarts from the beginning of the node containing the interrupt, so code before the interrupt runs again
  • Any JSON-serializable value can be used as the resume value
Command(resume=...) is the only Command pattern designed as input to invoke()/stream(). The other Command parameters (update, goto, graph) are designed for returning from node functions. Do not pass Command(update=...) as input to continue a multi-turn conversation; pass a plain input dict instead.

Common patterns

The core capability interrupts unlock is pausing execution to wait for external input. This is useful in many scenarios, including:
  • Approval workflows: pause before executing critical operations (API calls, database mutations, financial transactions)
  • Handling multiple interrupts: map interrupt IDs to resume values when resuming several interrupts in a single call
  • Review and edit: let a human review and modify LLM output or tool calls before continuing
  • Interrupting tool calls: pause before a tool call executes so it can be reviewed and edited first
  • Validating human input: pause to validate human input before moving to the next step

Streaming with human-in-the-loop (HITL) interrupts

When building interactive agents with human-in-the-loop workflows, you can stream message chunks and node updates at the same time, giving real-time feedback while still handling interrupts. Using multiple stream modes ("messages" and "updates"), plus subgraphs=True if subgraphs are present, lets you:
  • Stream AI responses in real time
  • Detect when the graph hits an interrupt
  • Handle user input and resume execution seamlessly
async for namespace, mode, chunk in graph.astream(  # (namespace, mode, chunk) with subgraphs=True
    initial_input,
    stream_mode=["messages", "updates"],
    subgraphs=True,
    config=config
):
    if mode == "messages":
        # Handle streaming message content
        msg, _ = chunk
        if isinstance(msg, AIMessageChunk) and msg.content:
            # Display content in real-time
            display_streaming_content(msg.content)

    elif mode == "updates":
        # Check for interrupts
        if "__interrupt__" in chunk:
            # Stop streaming display
            interrupt_info = chunk["__interrupt__"][0].value

            # Handle user input
            user_response = get_user_input(interrupt_info)

            # Resume graph with updated input
            initial_input = Command(resume=user_response)
            break

        else:
            # Track node transitions
            current_node = list(chunk.keys())[0]
  • stream_mode=["messages", "updates"]: enables dual streaming of message chunks and graph state updates
  • subgraphs=True: required to detect interrupts inside nested graphs
  • "__interrupt__" detection: signals that human input is needed
  • Command(resume=...): resumes graph execution with the user-provided data

Handling multiple interrupts

When parallel branches interrupt at the same time (for example, a fan-out to several nodes that each call interrupt()), you may need to resume multiple interrupts in a single call. To do so, map each interrupt ID to its resume value. This ensures the runtime pairs each response with the correct interrupt.
from typing import Annotated, TypedDict
import operator

from langgraph.checkpoint.memory import InMemorySaver
from langgraph.graph import START, END, StateGraph
from langgraph.types import Command, interrupt


class State(TypedDict):
    vals: Annotated[list[str], operator.add]


def node_a(state):
    answer = interrupt("question_a")
    return {"vals": [f"a:{answer}"]}


def node_b(state):
    answer = interrupt("question_b")
    return {"vals": [f"b:{answer}"]}


graph = (
    StateGraph(State)
    .add_node("a", node_a)
    .add_node("b", node_b)
    .add_edge(START, "a")
    .add_edge(START, "b")
    .add_edge("a", END)
    .add_edge("b", END)
    .compile(checkpointer=InMemorySaver())
)

config = {"configurable": {"thread_id": "1"}}

# Step 1: invoke — both parallel nodes hit interrupt() and pause
interrupted_result = graph.invoke({"vals": []}, config)
print(interrupted_result)
"""
{
    'vals': [],
    '__interrupt__': [
        Interrupt(value='question_a', id='bd4f3183600f2c41dddafbf8f0f7be7b'),
        Interrupt(value='question_b', id='29963e3d3585f0cef025dd0f14323f55')
    ]
}
"""

# Step 2: resume all pending interrupts at once
resume_map = {
    i.id: f"answer for {i.value}"
    for i in interrupted_result["__interrupt__"]
}
result = graph.invoke(Command(resume=resume_map), config)

print("Final state:", result)
#> Final state: {'vals': ['a:answer for question_a', 'b:answer for question_b']}

Approve or reject

One of the most common uses of interrupts is pausing to ask for approval before a critical action. For example, you might want a human to approve an API call, a database mutation, or any other significant decision.
from typing import Literal
from langgraph.types import interrupt, Command

def approval_node(state: State) -> Command[Literal["proceed", "cancel"]]:
    # Pause execution; payload shows up under result["__interrupt__"]
    is_approved = interrupt({
        "question": "Do you want to proceed with this action?",
        "details": state["action_details"]
    })

    # Route based on the response
    if is_approved:
        return Command(goto="proceed")  # Runs after the resume payload is provided
    else:
        return Command(goto="cancel")
When resuming the graph, pass True to approve or False to reject:
# To approve
graph.invoke(Command(resume=True), config=config)

# To reject
graph.invoke(Command(resume=False), config=config)
from typing import Literal, Optional, TypedDict

from langgraph.checkpoint.memory import MemorySaver
from langgraph.graph import StateGraph, START, END
from langgraph.types import Command, interrupt


class ApprovalState(TypedDict):
    action_details: str
    status: Optional[Literal["pending", "approved", "rejected"]]


def approval_node(state: ApprovalState) -> Command[Literal["proceed", "cancel"]]:
    # Expose details so the caller can render them in a UI
    decision = interrupt({
        "question": "Approve this action?",
        "details": state["action_details"],
    })

    # Route to the appropriate node after resume
    return Command(goto="proceed" if decision else "cancel")


def proceed_node(state: ApprovalState):
    return {"status": "approved"}


def cancel_node(state: ApprovalState):
    return {"status": "rejected"}


builder = StateGraph(ApprovalState)
builder.add_node("approval", approval_node)
builder.add_node("proceed", proceed_node)
builder.add_node("cancel", cancel_node)
builder.add_edge(START, "approval")
builder.add_edge("proceed", END)
builder.add_edge("cancel", END)

# Use a more durable checkpointer in production
checkpointer = MemorySaver()
graph = builder.compile(checkpointer=checkpointer)

config = {"configurable": {"thread_id": "approval-123"}}
initial = graph.invoke(
    {"action_details": "Transfer $500", "status": "pending"},
    config=config,
)
print(initial["__interrupt__"])  # -> [Interrupt(value={'question': ..., 'details': ...})]

# Resume with the decision; True routes to proceed, False to cancel
resumed = graph.invoke(Command(resume=True), config=config)
print(resumed["status"])  # -> "approved"

Review and edit state

Sometimes you want a human to review and edit part of the graph state before continuing. This is useful for correcting LLM output, filling in missing information, or making adjustments.
from langgraph.types import interrupt

def review_node(state: State):
    # Pause and show the current content for review (surfaces in result["__interrupt__"])
    edited_content = interrupt({
        "instruction": "Review and edit this content",
        "content": state["generated_text"]
    })

    # Update the state with the edited version
    return {"generated_text": edited_content}
When resuming, provide the edited content:
graph.invoke(
    Command(resume="The edited and improved text"),  # Value becomes the return from interrupt()
    config=config
)
from typing import TypedDict

from langgraph.checkpoint.memory import MemorySaver
from langgraph.graph import StateGraph, START, END
from langgraph.types import Command, interrupt


class ReviewState(TypedDict):
    generated_text: str


def review_node(state: ReviewState):
    # Ask a reviewer to edit the generated content
    updated = interrupt({
        "instruction": "Review and edit this content",
        "content": state["generated_text"],
    })
    return {"generated_text": updated}


builder = StateGraph(ReviewState)
builder.add_node("review", review_node)
builder.add_edge(START, "review")
builder.add_edge("review", END)

checkpointer = MemorySaver()
graph = builder.compile(checkpointer=checkpointer)

config = {"configurable": {"thread_id": "review-42"}}
initial = graph.invoke({"generated_text": "Initial draft"}, config=config)
print(initial["__interrupt__"])  # -> [Interrupt(value={'instruction': ..., 'content': ...})]

# Resume with the edited text from the reviewer
final_state = graph.invoke(
    Command(resume="Improved draft after review"),
    config=config,
)
print(final_state["generated_text"])  # -> "Improved draft after review"

Interrupts in tools

You can also place interrupts directly inside tool functions. This makes the tool pause for approval every time it is called, and allows a human to review and edit the tool call before it executes. First, define a tool that uses interrupt:
from langchain.tools import tool
from langgraph.types import interrupt

@tool
def send_email(to: str, subject: str, body: str):
    """Send an email to a recipient."""

    # Pause before sending; payload surfaces in result["__interrupt__"]
    response = interrupt({
        "action": "send_email",
        "to": to,
        "subject": subject,
        "body": body,
        "message": "Approve sending this email?"
    })

    if response.get("action") == "approve":
        # Resume value can override inputs before executing
        final_to = response.get("to", to)
        final_subject = response.get("subject", subject)
        final_body = response.get("body", body)
        return f"Email sent to {final_to} with subject '{final_subject}'"
    return "Email cancelled by user"
This approach is useful when you want the approval logic to live with the tool itself, making it reusable across different parts of the graph. The LLM can call the tool naturally, and the interrupt pauses execution whenever the tool is invoked, letting you approve, edit, or cancel the action.
import sqlite3

from langchain.tools import tool
from langchain_anthropic import ChatAnthropic
from langgraph.checkpoint.sqlite import SqliteSaver
from langgraph.graph import StateGraph, START, MessagesState
from langgraph.prebuilt import ToolNode, tools_condition
from langgraph.types import Command, interrupt


@tool
def send_email(to: str, subject: str, body: str):
    """Send an email to a recipient."""

    # Pause before sending; payload surfaces in result["__interrupt__"]
    response = interrupt({
        "action": "send_email",
        "to": to,
        "subject": subject,
        "body": body,
        "message": "Approve sending this email?",
    })

    if response.get("action") == "approve":
        final_to = response.get("to", to)
        final_subject = response.get("subject", subject)
        final_body = response.get("body", body)

        # Actually send the email (your implementation here)
        print(f"[send_email] to={final_to} subject={final_subject} body={final_body}")
        return f"Email sent to {final_to}"

    return "Email cancelled by user"


model = ChatAnthropic(model="claude-sonnet-4-6").bind_tools([send_email])


def agent_node(state: MessagesState):
    # The LLM may request the tool; the tool node below actually executes it
    return {"messages": [model.invoke(state["messages"])]}


builder = StateGraph(MessagesState)
builder.add_node("agent", agent_node)
builder.add_node("tools", ToolNode([send_email]))
builder.add_edge(START, "agent")
# Route to the tool node when the model makes a tool call; otherwise end
builder.add_conditional_edges("agent", tools_condition)
builder.add_edge("tools", "agent")

checkpointer = SqliteSaver(sqlite3.connect("tool-approval.db"))
graph = builder.compile(checkpointer=checkpointer)

config = {"configurable": {"thread_id": "email-workflow"}}
initial = graph.invoke(
    {
        "messages": [
            {"role": "user", "content": "Send an email to alice@example.com about the meeting"}
        ]
    },
    config=config,
)
print(initial["__interrupt__"])  # -> [Interrupt(value={'action': 'send_email', ...})]

# Resume with approval and optionally edited arguments
resumed = graph.invoke(
    Command(resume={"action": "approve", "subject": "Updated subject"}),
    config=config,
)
print(resumed["messages"][-1])  # -> last message in the conversation

Validate human input

Sometimes you need to validate input from a human and ask again if it is invalid. You can do this with multiple interrupt calls in a loop.
from langgraph.types import interrupt

def get_age_node(state: State):
    prompt = "What is your age?"

    while True:
        answer = interrupt(prompt)  # payload surfaces in result["__interrupt__"]

        # Validate the input
        if isinstance(answer, int) and answer > 0:
            # Valid input - continue
            break
        else:
            # Invalid input - ask again with a more specific prompt
            prompt = f"'{answer}' is not a valid age. Please enter a positive number."

    return {"age": answer}
Each time you resume the graph with invalid input, it asks again with a clearer message. Once valid input is provided, the node completes and the graph continues.
import sqlite3
from typing import TypedDict

from langgraph.checkpoint.sqlite import SqliteSaver
from langgraph.graph import StateGraph, START, END
from langgraph.types import Command, interrupt


class FormState(TypedDict):
    age: int | None


def get_age_node(state: FormState):
    prompt = "What is your age?"

    while True:
        answer = interrupt(prompt)  # payload surfaces in result["__interrupt__"]

        if isinstance(answer, int) and answer > 0:
            return {"age": answer}

        prompt = f"'{answer}' is not a valid age. Please enter a positive number."


builder = StateGraph(FormState)
builder.add_node("collect_age", get_age_node)
builder.add_edge(START, "collect_age")
builder.add_edge("collect_age", END)

checkpointer = SqliteSaver(sqlite3.connect("forms.db"))
graph = builder.compile(checkpointer=checkpointer)

config = {"configurable": {"thread_id": "form-1"}}
first = graph.invoke({"age": None}, config=config)
print(first["__interrupt__"])  # -> [Interrupt(value='What is your age?', ...)]

# Provide invalid data; the node re-prompts
retry = graph.invoke(Command(resume="thirty"), config=config)
print(retry["__interrupt__"])  # -> [Interrupt(value="'thirty' is not a valid age...", ...)]

# Provide valid data; loop exits and state updates
final = graph.invoke(Command(resume=30), config=config)
print(final["age"])  # -> 30

Rules of interrupts

When you call interrupt in a node, LangGraph pauses execution by raising a special exception that signals the runtime to pause. The exception propagates up the call stack, where the runtime catches it, tells the graph to save its current state, and waits for external input. When execution resumes (after you provide the requested input), the runtime restarts the entire node from the beginning rather than continuing from the exact interrupt call site. Any code that ran before the interrupt therefore runs again. Because of this, there are a few important rules to follow so interrupts behave as expected.

Don't wrap interrupt calls in try/except

interrupt pauses execution at the call site by raising a special exception. If you wrap the interrupt call in a try/except block, you will catch that exception and the interrupt will never propagate back to the graph.
  • ✅ Separate interrupt calls from code that can fail
  • ✅ Use specific exception types in try/except blocks
def node_a(state: State):
    # ✅ Good: interrupting first, then handling
    # error conditions separately
    interrupt("What's your name?")
    try:
        fetch_data()  # This can fail
    except Exception as e:
        print(e)
    return state
  • 🔴 Don't wrap interrupt calls in a bare try/except block
def node_a(state: State):
    # ❌ Bad: wrapping interrupt in bare try/except
    # will catch the interrupt exception
    try:
        interrupt("What's your name?")
    except Exception as e:
        print(e)
    return state

Don't reorder interrupt calls within a node

Using multiple interrupts in a single node is common, but can lead to unexpected behavior if handled carelessly. When a node contains multiple interrupt calls, LangGraph maintains a list of resume values scoped to the task executing that node. Each resume starts from the beginning of the node. For each interrupt encountered, LangGraph checks the task's resume list for a matching value. Matching is strictly index-based, so the order of interrupt calls within the node matters.
  • ✅ Keep interrupt calls consistent across executions of a node
def node_a(state: State):
    # ✅ Good: interrupt calls happen in the same order every time
    name = interrupt("What's your name?")
    age = interrupt("What's your age?")
    city = interrupt("What's your city?")

    return {
        "name": name,
        "age": age,
        "city": city
    }
  • 🔴 Don't conditionally skip interrupt calls within a node
  • 🔴 Don't loop over interrupt calls with logic that varies between executions
def node_a(state: State):
    # ❌ Bad: conditionally skipping interrupts changes the order
    name = interrupt("What's your name?")

    # On first run, this might skip the interrupt
    # On resume, it might not skip it - causing index mismatch
    if state.get("needs_age"):
        age = interrupt("What's your age?")

    city = interrupt("What's your city?")

    return {"name": name, "city": city}

Don't pass complex values to interrupt

Depending on the checkpointer in use, complex values may not be serializable (functions, for example, cannot be serialized). To keep your graph portable across deployments, the best practice is to use only values that can be reliably serialized.
  • ✅ Pass simple JSON-serializable types to interrupt
  • ✅ Pass dicts/objects containing simple values
def node_a(state: State):
    # ✅ Good: passing simple types that are serializable
    name = interrupt("What's your name?")
    count = interrupt(42)
    approved = interrupt(True)

    return {"name": name, "count": count, "approved": approved}
  • 🔴 Don't pass functions, class instances, or other complex objects to interrupt
def validate_input(value):
    return len(value) > 0

def node_a(state: State):
    # ❌ Bad: passing a function to interrupt
    # The function cannot be serialized
    response = interrupt({
        "question": "What's your name?",
        "validator": validate_input  # This will fail
    })
    return {"name": response}

Side effects before an interrupt must be idempotent

Because interrupts work by re-running the node they live in, side effects invoked before the interrupt should be idempotent. Idempotency means the same operation can run multiple times with the same result as the first run. For example, a node might make an API call that updates a record. If that call completes before interrupt is invoked, it will re-execute every time the node resumes, potentially overwriting the initial update or creating duplicate records.
  • ✅ Use idempotent operations before the interrupt
  • ✅ Place side effects after the interrupt call
  • ✅ Where possible, move side effects into separate nodes
def node_a(state: State):
    # ✅ Good: using upsert operation which is idempotent
    # Running this multiple times will have the same result
    db.upsert_user(
        user_id=state["user_id"],
        status="pending_approval"
    )

    approved = interrupt("Approve this change?")

    return {"approved": approved}
  • 🔴 Don't perform non-idempotent operations before the interrupt
  • 🔴 Don't create new records without checking whether they already exist
def node_a(state: State):
    # ❌ Bad: creating a new record before interrupt
    # This will create duplicate records on each resume
    audit_id = db.create_audit_log({
        "user_id": state["user_id"],
        "action": "pending_approval",
        "timestamp": datetime.now()
    })

    approved = interrupt("Approve this change?")

    return {"approved": approved, "audit_id": audit_id}

Using with subgraphs called as functions

When you call a subgraph inside a node and the subgraph triggers an interrupt, the parent graph resumes from the beginning of the node that invoked the subgraph. Likewise, the subgraph resumes from the beginning of the node where interrupt was called.
def node_in_parent_graph(state: State):
    some_code()  # <-- This will re-execute when resumed
    # Invoke a subgraph as a function.
    # The subgraph contains an `interrupt` call.
    subgraph_result = subgraph.invoke(some_input)
    # ...

def node_in_subgraph(state: State):
    some_other_code()  # <-- This will also re-execute when resumed
    result = interrupt("What's your name?")
    # ...

Debugging with interrupts

To debug and test a graph, you can use static interrupts as breakpoints and step through execution node by node. Static interrupts trigger at defined points: before or after a node executes. You set them at compile time by specifying interrupt_before and interrupt_after.
Static interrupts are not recommended for human-in-the-loop workflows. Use the interrupt function instead.
graph = builder.compile(
    interrupt_before=["node_a"],
    interrupt_after=["node_b", "node_c"],
    checkpointer=checkpointer,
)

# Pass a thread ID to the graph
config = {
    "configurable": {
        "thread_id": "some_thread"
    }
}

# Run the graph until the breakpoint
graph.invoke(inputs, config=config)

# Resume the graph
graph.invoke(None, config=config)
  1. Breakpoints are set at compile time.
  2. interrupt_before specifies nodes to pause before executing.
  3. interrupt_after specifies nodes to pause after executing.
  4. A checkpointer is required to enable breakpoints.
  5. The graph runs until it hits the first breakpoint.
  6. Resume the graph by passing None as the input. This runs it until the next breakpoint.
To debug interrupts, use LangSmith Studio.

Using LangSmith Studio

你可以使用 LangSmith Studio 在 UI 中运行图之前设置静态中断。你还可以使用 UI 在执行过程中的任意节点检查图状态。 image