Skip to main content
本指南介绍如何通过 LangSmith Deployment API 取消代理的运行。您可以按 ID 取消单个运行,或按线程或状态取消多个运行。取消操作适用于停止长时间运行或卡住的运行,或当用户放弃请求时。

设置

创建客户端和线程:
from langgraph_sdk import get_client

client = get_client(url=<DEPLOYMENT_URL>)
assistant_id = "agent"
thread = await client.threads.create()

取消单个运行

以下示例创建一个运行,使用不同选项取消它,并打印运行以显示每种情况下的结果。您可以取消处于 pendingrunning 状态的运行。尝试取消不处于 pendingrunning 状态的运行将导致错误。

使用中断取消(默认)

中断 会停止执行运行的工作者,并将运行标记为 interrupted。不会删除任何内容:
  • 运行记录保留(状态为 interrupted)。您可以获取它,检查输入/输出,并查看执行历史记录。
  • 该运行的所有检查点都保留存储。最后完成步骤的线程状态得以保留。
  • 您可以稍后从检查点恢复(例如,使用时间旅行)或检查部分状态。
当您想停止运行但保留它以进行调试、审计或从检查点恢复时,请使用 中断
run = await client.runs.create(
    thread["thread_id"],
    assistant_id,
    input={"messages": [{"role": "user", "content": "Long task"}]},
)
await client.runs.cancel(thread["thread_id"], run["run_id"])

run_after = await client.runs.get(thread["thread_id"], run["run_id"], wait=True)
print(run_after["status"])   # "interrupted"

使用回滚取消

回滚 会停止运行,然后从存储中删除该运行及其检查点:
  • 运行记录被删除。该运行不再出现在该线程的运行列表或历史记录中。
  • 该运行创建的所有检查点都被删除。线程的状态恢复到运行开始之前的状态(就像该运行从未执行过一样)。
  • 回滚后,您无法恢复或检查该运行。
当您想完全丢弃一个运行及其影响时(例如,在用户放弃请求后,您不需要保留部分工作),请使用 回滚
run = await client.runs.create(
    thread["thread_id"],
    assistant_id,
    input={"messages": [{"role": "user", "content": "Long task"}]},
)
await client.runs.cancel(thread["thread_id"], run["run_id"], action="rollback", wait=True)

# 由于运行已删除,会抛出错误
try:
    await client.runs.get(thread["thread_id"], run["run_id"])
except Exception:
    print("Run was correctly deleted")

使用等待取消

默认情况下,取消请求在请求取消后返回,运行被异步取消。wait=True 会使取消请求阻塞,直到运行被完全取消。当您想知道运行被取消后的最终状态(例如,创建了哪些检查点,最终输出是什么)时,这很有用。
run = await client.runs.create(
    thread["thread_id"],
    assistant_id,
    input={"messages": [{"role": "user", "content": "Long task"}]},
)
# 异步取消运行
await client.runs.cancel(thread["thread_id"], run["run_id"])
# 获取运行的状态
run_after = await client.runs.get(thread["thread_id"], run["run_id"])
print(run_after["status"])  # "pending" 或 "running"

# 等待运行被正确取消
await client.runs.join(thread["thread_id"], run["run_id"])
run_after = await client.runs.get(thread["thread_id"], run["run_id"])
print(run_after["status"])  # "interrupted"

取消多个运行

使用批量取消端点在一次请求中取消多个运行。支持中断和回滚操作。

按线程 ID 和运行 ID 取消

通过传递运行 ID 来取消特定运行。
run1 = await client.runs.create(
    thread["thread_id"],
    assistant_id,
    input={"messages": [{"role": "user", "content": "First request"}]},
)
run2 = await client.runs.create(
    thread["thread_id"],
    assistant_id,
    input={"messages": [{"role": "user", "content": "Second request"}]},
    multitask_strategy="enqueue",
)

await client.runs.cancel_many(
    thread_id=thread["thread_id"],
    run_ids=[run1["run_id"], run2["run_id"]]
)

# 等待运行被取消
await client.runs.join(thread["thread_id"], run2["run_id"])
runs_after = await client.runs.list(thread["thread_id"])
for run in runs_after:
    if run["run_id"] in (run1["run_id"], run2["run_id"]):
        print(run["run_id"], run["status"])  # "interrupted"

按状态取消

取消部署中所有线程中匹配状态的所有运行。有效的状态选项是 pendingrunningall
run1 = await client.runs.create(
    thread["thread_id"],
    assistant_id,
    input={"messages": [{"role": "user", "content": "First request"}]},
)
thread2 = await client.threads.create()
run2 = await client.runs.create(
    thread2["thread_id"],
    assistant_id,
    input={"messages": [{"role": "user", "content": "Second request"}]},
)

await client.runs.cancel_many(
    status="running",
)

# 等待运行被取消
await client.runs.join(thread2["thread_id"], run2["run_id"])
run_after = await client.runs.get(thread["thread_id"], run1["run_id"])
print(run_after["status"])  # 正在运行的运行现在为 "interrupted"
run_after2 = await client.runs.get(thread2["thread_id"], run2["run_id"])
print(run_after2["status"])  # 所有线程中的运行都被取消

断开连接时取消

当使用流式处理启动运行或等待运行时,您可以设置 on_disconnect="cancel",以便在客户端断开连接时取消运行。这可以避免在用户关闭应用程序或失去连接时留下正在进行的运行。
# 使用 runs.wait:如果客户端断开连接,运行将被取消
result = await client.runs.wait(
    thread["thread_id"],
    assistant_id,
    input={"messages": [{"role": "user", "content": "Long task"}]},
    on_disconnect="cancel",
)

# 使用 runs.stream:如果客户端断开连接,运行将被取消
async for chunk in client.runs.stream(
    thread["thread_id"],
    assistant_id,
    input={"messages": [{"role": "user", "content": "Long task"}]},
    on_disconnect="cancel",
):
    print(chunk)

# 使用 runs.join:等待现有运行;如果客户端断开连接则取消
run = await client.runs.create(
    thread["thread_id"],
    assistant_id,
    input={"messages": [{"role": "user", "content": "Long task"}]},
)
await client.runs.join(
    thread["thread_id"],
    run["run_id"],
    on_disconnect="cancel",
)

# 使用 runs.join_stream:加入现有运行并流式传输;如果客户端断开连接则取消
async for chunk in client.runs.join_stream(
    thread["thread_id"],
    run["run_id"],
    on_disconnect="cancel",
):
    print(chunk)

常见场景

  • Human in the Loop和中断:代理可以在中断处暂停以获取人类输入。取消运行会停止执行;这与中断不同,中断是运行暂停,可以通过新输入恢复。
  • 时间旅行:使用操作 interrupt 取消后,运行和检查点仍然可用。您可以从检查点恢复(时间旅行)以重放或分支执行。
  • 重复输入:当用户在运行进行中发送新输入时,多任务策略(排队、拒绝、中断、回滚)决定现有运行是被中断还是回滚,以及如何处理新运行。要从应用程序显式取消运行,请使用本页描述的取消 API。
  • Studio:在 Studio 中,使用运行 UI 中的 取消 按钮取消当前运行。