人机回环(Human-in-the-Loop, HITL)中间件 允许您为代理工具调用添加人工监督。
当模型提出可能需要审查的操作时——例如写入文件或执行 SQL——中间件可以暂停执行并等待决策。
它通过根据可配置的策略检查每个工具调用来实现这一点。如果需要干预,中间件会发出一个中断来停止执行。图状态使用 LangGraph 的持久化层保存,因此执行可以安全暂停并在稍后恢复。
然后,人工决策决定接下来发生什么:操作可以按原样批准(approve)、在运行前修改(edit)或拒绝并提供反馈(reject)。
中断决策类型
中间件 定义了三种内置方式,供人类响应中断:
| 决策类型 | 描述 | 示例用例 |
|---|
✅ approve | 操作按原样批准并执行,无需更改。 | 按原样发送电子邮件草稿 |
✏️ edit | 工具调用在修改后执行。 | 在发送电子邮件前更改收件人 |
❌ reject | 工具调用被拒绝,并向对话中添加解释。 | 拒绝电子邮件草稿并解释如何重写它 |
每个工具可用的决策类型取决于您在 interrupt_on 中配置的策略。
当多个工具调用同时暂停时,每个操作都需要单独的决策。
决策必须按照中断请求中操作出现的顺序提供。
当编辑工具参数时,请保守地进行更改。对原始参数的重大修改可能导致模型重新评估其方法,并可能多次执行工具或采取意外操作。
配置中断
要使用 HITL,在创建代理时将中间件 添加到代理的 middleware 列表中。
您通过将工具操作映射到每个操作允许的决策类型来配置它。当中断与映射中的操作匹配时,中间件将中断执行。
import { createAgent, humanInTheLoopMiddleware } from "langchain";
import { MemorySaver } from "@langchain/langgraph";
const agent = createAgent({
model: "gpt-4.1",
tools: [writeFileTool, executeSQLTool, readDataTool],
middleware: [
humanInTheLoopMiddleware({
interruptOn: {
write_file: true, // 允许所有决策 (approve, edit, reject)
execute_sql: {
allowedDecisions: ["approve", "reject"],
// 不允许编辑
description: "🚨 SQL 执行需要 DBA 批准",
},
// 安全操作,无需批准
read_data: false,
},
// 中断消息的前缀 - 与工具名称和参数组合形成完整消息
// 例如,"工具执行待批准:execute_sql with query='DELETE FROM...'"
// 单个工具可以通过在其中断配置中指定 "description" 来覆盖此设置
descriptionPrefix: "工具执行待批准",
}),
],
// 人机回环需要检查点来处理中断。
// 在生产环境中,使用持久化检查点器,如 AsyncPostgresSaver。
checkpointer: new MemorySaver(),
});
响应中断
调用代理时,它会运行直到完成或引发中断。当中断与您在 interrupt_on 中配置的策略匹配时,会触发中断。使用 version="v2" 时,结果是一个带有 interrupts 属性的 GraphOutput,其中包含需要审查的操作。然后,您可以将这些操作呈现给审查者,并在提供决策后恢复执行。
import { HumanMessage } from "@langchain/core/messages";
import { Command } from "@langchain/langgraph";
// 您必须提供线程 ID 以将执行与对话线程关联,
// 以便对话可以暂停和恢复(人工审查时需要)。
const config = { configurable: { thread_id: "some_id" } };
// 运行图直到命中中断。
const result = await agent.invoke(
{
messages: [new HumanMessage("从数据库中删除旧记录")],
},
config
);
// 中断包含完整的 HITL 请求,包含 action_requests 和 review_configs
console.log(result.__interrupt__);
// > [
// > Interrupt(
// > value: {
// > action_requests: [
// > {
// > name: 'execute_sql',
// > arguments: { query: 'DELETE FROM records WHERE created_at < NOW() - INTERVAL \'30 days\';' },
// > description: '工具执行待批准\n\n工具: execute_sql\n参数: {...}'
// > }
// > ],
// > review_configs: [
// > {
// > action_name: 'execute_sql',
// > allowed_decisions: ['approve', 'reject']
// > }
// > ]
// > }
// > )
// > ]
// 使用批准决策恢复
await agent.invoke(
new Command({
resume: { decisions: [{ type: "approve" }] }, // 或 "reject"
}),
config // 相同的线程 ID 以恢复暂停的对话
);
决策类型
✅ approve
✏️ edit
❌ reject
使用 approve 来批准工具调用并按原样执行,无需更改。await agent.invoke(
new Command({
// 决策以列表形式提供,每个审查操作一个。
// 决策的顺序必须与中断请求中操作的顺序匹配。
resume: {
decisions: [
{
type: "approve",
}
]
}
}),
config // 相同的线程 ID 以恢复暂停的对话
);
使用 edit 在执行前修改工具调用。
提供编辑后的操作,包含新的工具名称和参数。await agent.invoke(
new Command({
// 决策以列表形式提供,每个审查操作一个。
// 决策的顺序必须与中断请求中操作的顺序匹配。
resume: {
decisions: [
{
type: "edit",
// 编辑后的操作,包含工具名称和参数
editedAction: {
// 要调用的工具名称。
// 通常与原始操作相同。
name: "new_tool_name",
// 传递给工具的参数。
args: { key1: "new_value", key2: "original_value" },
}
}
]
}
}),
config // 相同的线程 ID 以恢复暂停的对话
);
当编辑工具参数时,请保守地进行更改。对原始参数的重大修改可能导致模型重新评估其方法,并可能多次执行工具或采取意外操作。
使用 reject 拒绝工具调用并提供反馈,而不是执行。await agent.invoke(
new Command({
// 决策以列表形式提供,每个审查操作一个。
// 决策的顺序必须与中断请求中操作的顺序匹配。
resume: {
decisions: [
{
type: "reject",
// 关于操作被拒绝原因的解释
message: "不,这是错误的,因为...,相反应该这样做...",
}
]
}
}),
config // 相同的线程 ID 以恢复暂停的对话
);
message 被添加到对话中作为反馈,以帮助代理理解操作被拒绝的原因以及应该做什么。
多个决策
当有多个操作需要审查时,为每个操作提供一个决策,顺序与中断中出现的顺序相同:{
decisions: [
{ type: "approve" },
{
type: "edit",
editedAction: {
name: "tool_name",
args: { param: "new_value" }
}
},
{
type: "reject",
message: "此操作不被允许"
}
]
}
使用人机回环进行流式传输
您可以使用 stream() 代替 invoke(),以便在代理运行和处理中断时获取实时更新。使用 stream_mode=['updates', 'messages'] 和 version="v2" 来以统一的 v2 格式流式传输代理进度和 LLM 令牌。
import { Command } from "@langchain/langgraph";
const config = { configurable: { thread_id: "some_id" } };
// 流式传输代理进度和 LLM 令牌直到中断
for await (const [mode, chunk] of await agent.stream(
{ messages: [{ role: "user", content: "从数据库中删除旧记录" }] },
{ ...config, streamMode: ["updates", "messages"] }
)) {
if (mode === "messages") {
// LLM 令牌
const [token, metadata] = chunk;
if (token.content) {
process.stdout.write(token.content);
}
} else if (mode === "updates") {
// 检查中断
if ("__interrupt__" in chunk) {
console.log(`\n\n中断: ${JSON.stringify(chunk.__interrupt__)}`);
}
}
}
// 人工决策后使用流式传输恢复
for await (const [mode, chunk] of await agent.stream(
new Command({ resume: { decisions: [{ type: "approve" }] } }),
{ ...config, streamMode: ["updates", "messages"] }
)) {
if (mode === "messages") {
const [token, metadata] = chunk;
if (token.content) {
process.stdout.write(token.content);
}
}
}
有关流模式的更多详细信息,请参阅流式传输指南。
执行生命周期
中间件定义了一个 after_model 钩子,该钩子在模型生成响应后但在任何工具调用执行前运行:
- 代理调用模型以生成响应。
- 中间件检查响应中的工具调用。
- 如果任何调用需要人工输入,中间件会构建一个带有
action_requests 和 review_configs 的 HITLRequest,并调用中断。
- 代理等待人工决策。
- 基于
HITLResponse 决策,中间件执行批准或编辑的调用,为拒绝的调用合成ToolMessage,并恢复执行。
自定义 HITL 逻辑
对于更专业的工作流,您可以直接使用中断 原语和中间件 抽象构建自定义 HITL 逻辑。
查看上面的执行生命周期以了解如何将中断集成到代理操作中。