当您使用 LangGraph 构建智能体时,首先会将其分解为离散的步骤,称为节点。然后,您将描述每个节点的不同决策和转换。最后,您通过一个共享的状态将节点连接起来,每个节点都可以从中读取和写入。
在本指南中,我们将引导您完成使用 LangGraph 构建客户支持邮件智能体的思维过程。
从您想要自动化的过程开始
想象您需要构建一个处理客户支持邮件的 AI 智能体。您的产品团队给出了以下要求:
智能体应:
- 读取收到的客户邮件
- 按紧急程度和主题分类
- 搜索相关文档以回答问题
- 起草适当的回复
- 将复杂问题升级给人工坐席
- 在需要时安排后续跟进
需处理的示例场景:
1. 简单的产品问题:"如何重置我的密码?"
2. 错误报告:"当我选择 PDF 格式时,导出功能会崩溃"
3. 紧急计费问题:"我的订阅被重复收费了!"
4. 功能请求:"能否在移动应用中添加深色模式?"
5. 复杂技术问题:"我们的 API 集成偶尔失败,出现 504 错误"
要在 LangGraph 中实现智能体,您通常需要遵循以下五个步骤。
步骤 1:将工作流程映射为离散步骤
首先识别流程中的不同步骤。每个步骤将成为一个节点(执行特定功能的函数)。然后,草拟这些步骤如何相互连接。
此图中的箭头显示了可能的路径,但实际选择哪条路径的决策发生在每个节点内部。
现在我们已经识别了工作流程中的组件,让我们了解每个节点需要做什么:
读取邮件:提取和解析邮件内容
分类意图:使用 LLM 对紧急程度和主题进行分类,然后路由到适当的操作
文档搜索:查询您的知识库以获取相关信息
错误跟踪:在跟踪系统中创建或更新问题
起草回复:生成适当的回复
人工审核:升级给人工坐席以进行批准或处理
发送回复:发送邮件回复
请注意,有些节点会决定下一步去哪里(分类意图、起草回复、人工审核),而其他节点总是进入相同的下一步(读取邮件 总是进入 分类意图,文档搜索 总是进入 起草回复)。
步骤 2:确定每个步骤需要做什么
对于图中的每个节点,确定它代表什么类型的操作以及它需要什么上下文才能正常工作。
LLM 步骤
当您需要理解、分析、生成文本或做出推理决策时使用
LLM 步骤
当一个步骤需要理解、分析、生成文本或做出推理决策时:
- 静态上下文(提示):分类类别、紧急程度定义、回复格式
- 动态上下文(来自状态):邮件内容、发件人信息
- 期望结果:确定路由的结构化分类
- 静态上下文(提示):语气指南、公司政策、回复模板
- 动态上下文(来自状态):分类结果、搜索结果、客户历史
- 期望结果:可供审核的专业邮件回复
数据步骤
当一个步骤需要从外部源检索信息时:
- 参数:根据意图和主题构建的查询
- 重试策略:是,对临时故障使用指数退避
- 缓存:可以缓存常见查询以减少 API 调用
- 参数:来自状态的客户电子邮件或 ID
- 重试策略:是,但如果不可用则回退到基本信息
- 缓存:是,使用生存时间以平衡新鲜度和性能
操作步骤
当一个步骤需要执行外部操作时:
- 何时执行节点:批准后(人工或自动)
- 重试策略:是,对网络问题使用指数退避
- 不应缓存:每次发送都是唯一操作
- 何时执行节点:当意图是“错误”时始终执行
- 重试策略:是,对于不丢失错误报告至关重要
- 返回:包含在回复中的工单 ID
用户输入步骤
当一个步骤需要人工干预时:
- 决策上下文:原始邮件、草稿回复、紧急程度、分类
- 期望输入格式:批准布尔值加上可选的编辑回复
- 触发时机:高紧急程度、复杂问题或质量问题
步骤 3:设计您的状态
状态是智能体中所有节点可访问的共享内存。可以将其视为智能体用来跟踪其在处理过程中学习和决定的一切内容的笔记本。
什么属于状态?
关于每个数据片段,问自己这些问题:
包含在状态中
它是否需要跨步骤持久化?如果是,则放入状态。
不存储
您能否从其他数据派生它?如果是,则在需要时计算而不是存储在状态中。
对于我们的邮件智能体,我们需要跟踪:
- 原始邮件和发件人信息(以后无法重建)
- 分类结果(后续/下游节点需要)
- 搜索结果和客户数据(重新获取成本高)
- 草稿回复(需要在审核过程中持久化)
- 执行元数据(用于调试和恢复)
保持状态原始,按需格式化提示
一个关键原则:您的状态应存储原始数据,而不是格式化文本。在节点内部需要时格式化提示。
这种分离意味着:
- 不同节点可以根据需要以不同方式格式化相同数据
- 您可以更改提示模板而无需修改状态模式
- 调试更清晰——您可以看到每个节点接收的确切数据
- 您的智能体可以演进而不会破坏现有状态
让我们定义我们的状态:
import { StateSchema } from "@langchain/langgraph";
import * as z from "zod";
// 定义邮件分类的结构
const EmailClassificationSchema = z.object({
intent: z.enum(["question", "bug", "billing", "feature", "complex"]),
urgency: z.enum(["low", "medium", "high", "critical"]),
topic: z.string(),
summary: z.string(),
});
const EmailAgentState = new StateSchema({
// 原始邮件数据
emailContent: z.string(),
senderEmail: z.string(),
emailId: z.string(),
// 分类结果
classification: EmailClassificationSchema.optional(),
// 原始搜索/API 结果
searchResults: z.array(z.string()).optional(), // 原始文档块列表
customerHistory: z.record(z.string(), z.any()).optional(), // 来自 CRM 的原始客户数据
// 生成的内容
responseText: z.string().optional(),
});
type EmailClassificationType = z.infer<typeof EmailClassificationSchema>;
请注意,状态仅包含原始数据——没有提示模板、没有格式化字符串、没有指令。分类输出作为单个字典存储,直接来自 LLM。
步骤 4:构建您的节点
现在我们将每个步骤实现为一个函数。LangGraph 中的节点只是一个 JavaScript 函数,它接受当前状态并返回对其的更新。
适当处理错误
不同的错误需要不同的处理策略:
| 错误类型 | 修复者 | 策略 | 使用时机 |
|---|
| 临时错误(网络问题、速率限制) | 系统(自动) | 重试策略 | 通常重试即可解决的临时故障 |
| LLM 可恢复错误(工具故障、解析问题) | LLM | 将错误存储在状态中并循环返回 | LLM 可以看到错误并调整其方法 |
| 用户可修复错误(缺少信息、指令不明确) | 人工 | 使用 interrupt() 暂停 | 需要用户输入才能继续 |
| 意外错误 | 开发人员 | 让它们冒泡 | 需要调试的未知问题 |
添加重试策略以自动重试网络问题和速率限制:import type { RetryPolicy } from "@langchain/langgraph";
workflow.addNode(
"searchDocumentation",
searchDocumentation,
{
retryPolicy: { maxAttempts: 3, initialInterval: 1.0 },
},
);
将错误存储在状态中并循环返回,以便 LLM 可以看到哪里出错并重试:import { Command, GraphNode } from "@langchain/langgraph";
const executeTool: GraphNode<typeof State> = async (state, config) => {
try {
const result = await runTool(state.toolCall);
return new Command({
update: { toolResult: result },
goto: "agent",
});
} catch (error) {
// 让 LLM 看到哪里出错并重试
return new Command({
update: { toolResult: `Tool error: ${error}` },
goto: "agent"
});
}
}
在需要时暂停并从用户收集信息(如账户 ID、订单号或澄清):import { Command, GraphNode, interrupt } from "@langchain/langgraph";
const lookupCustomerHistory: GraphNode<typeof State> = async (state, config) => {
if (!state.customerId) {
const userInput = interrupt({
message: "需要客户 ID",
request: "请提供客户的账户 ID 以查询其订阅历史",
});
return new Command({
update: { customerId: userInput.customerId },
goto: "lookupCustomerHistory",
});
}
// 现在继续查询
const customerData = await fetchCustomerHistory(state.customerId);
return new Command({
update: { customerHistory: customerData },
goto: "draftResponse",
});
}
让它们冒泡以进行调试。不要捕获无法处理的内容:import { Command, GraphNode } from "@langchain/langgraph";
const sendReply: GraphNode<typeof EmailAgentState> = async (state, config) => {
try {
await emailService.send(state.responseText);
} catch (error) {
throw error; // 显现意外错误
}
}
实现我们的邮件智能体节点
我们将每个节点实现为一个简单的函数。记住:节点接受状态,执行工作,并返回更新。
import { StateGraph, START, END, GraphNode, Command } from "@langchain/langgraph";
import { HumanMessage } from "@langchain/core/messages";
import { ChatAnthropic } from "@langchain/anthropic";
const llm = new ChatAnthropic({ model: "claude-sonnet-4-6" });
const readEmail: GraphNode<typeof EmailAgentState> = async (state, config) => {
// 提取和解析邮件内容
// 在生产环境中,这将连接到您的邮件服务
console.log(`处理邮件:${state.emailContent}`);
return {};
}
const classifyIntent: GraphNode<typeof EmailAgentState> = async (state, config) => {
// 使用 LLM 对邮件意图和紧急程度进行分类,然后相应路由
// 创建返回 EmailClassification 对象的结构化 LLM
const structuredLlm = llm.withStructuredOutput(EmailClassificationSchema);
// 按需格式化提示,不存储在状态中
const classificationPrompt = `
分析此客户邮件并进行分类:
邮件:${state.emailContent}
来自:${state.senderEmail}
提供分类,包括意图、紧急程度、主题和摘要。
`;
// 直接获取结构化响应作为对象
const classification = await structuredLlm.invoke(classificationPrompt);
// 根据分类确定下一个节点
let nextNode: "searchDocumentation" | "humanReview" | "draftResponse" | "bugTracking";
if (classification.intent === "billing" || classification.urgency === "critical") {
nextNode = "humanReview";
} else if (classification.intent === "question" || classification.intent === "feature") {
nextNode = "searchDocumentation";
} else if (classification.intent === "bug") {
nextNode = "bugTracking";
} else {
nextNode = "draftResponse";
}
// 将分类作为单个对象存储在状态中
return new Command({
update: { classification },
goto: nextNode,
});
}
import { Command, GraphNode } from "@langchain/langgraph";
const searchDocumentation: GraphNode<typeof EmailAgentState> = async (state, config) => {
// 搜索知识库以获取相关信息
// 根据分类构建搜索查询
const classification = state.classification!;
const query = `${classification.intent} ${classification.topic}`;
let searchResults: string[];
try {
// 在此处实现您的搜索逻辑
// 存储原始搜索结果,而不是格式化文本
searchResults = [
"通过设置 > 安全 > 更改密码重置密码",
"密码必须至少 12 个字符",
"包括大写、小写、数字和符号",
];
} catch (error) {
// 对于可恢复的搜索错误,存储错误并继续
searchResults = [`搜索暂时不可用:${error}`];
}
return new Command({
update: { searchResults }, // 存储原始结果或错误
goto: "draftResponse",
});
}
const bugTracking: GraphNode<typeof EmailAgentState> = async (state, config) => {
// 创建或更新错误跟踪工单
// 在您的错误跟踪系统中创建工单
const ticketId = "BUG-12345"; // 将通过 API 创建
return new Command({
update: { searchResults: [`错误工单 ${ticketId} 已创建`] },
goto: "draftResponse",
});
}
import { Command, interrupt } from "@langchain/langgraph";
const draftResponse: GraphNode<typeof EmailAgentState> = async (state, config) => {
// 使用上下文生成回复并根据质量路由
const classification = state.classification!;
// 按需从原始状态数据格式化上下文
const contextSections: string[] = [];
if (state.searchResults) {
// 为提示格式化搜索结果
const formattedDocs = state.searchResults.map(doc => `- ${doc}`).join("\n");
contextSections.push(`相关文档:\n${formattedDocs}`);
}
if (state.customerHistory) {
// 为提示格式化客户数据
contextSections.push(`客户层级:${state.customerHistory.tier ?? "标准"}`);
}
// 使用格式化上下文构建提示
const draftPrompt = `
起草对此客户邮件的回复:
${state.emailContent}
邮件意图:${classification.intent}
紧急程度:${classification.urgency}
${contextSections.join("\n\n")}
指南:
- 保持专业和乐于助人
- 解决他们的具体问题
- 在相关时使用提供的文档
`;
const response = await llm.invoke([new HumanMessage(draftPrompt)]);
// 根据紧急程度和意图确定是否需要人工审核
const needsReview = (
classification.urgency === "high" ||
classification.urgency === "critical" ||
classification.intent === "complex"
);
// 路由到适当的下一个节点
const nextNode = needsReview ? "humanReview" : "sendReply";
return new Command({
update: { responseText: response.content.toString() }, // 仅存储原始回复
goto: nextNode,
});
}
const humanReview: GraphNode<typeof EmailAgentState> = async (state, config) => {
// 使用 interrupt 暂停以进行人工审核,并根据决策路由
const classification = state.classification!;
// interrupt() 必须首先出现 - 之前的任何代码在恢复时都会重新运行
const humanDecision = interrupt({
emailId: state.emailId,
originalEmail: state.emailContent,
draftResponse: state.responseText,
urgency: classification.urgency,
intent: classification.intent,
action: "请审核并批准/编辑此回复",
});
// 现在处理人工决策
if (humanDecision.approved) {
return new Command({
update: { responseText: humanDecision.editedResponse || state.responseText },
goto: "sendReply",
});
} else {
// 拒绝意味着人工将直接处理
return new Command({ update: {}, goto: END });
}
}
const sendReply: GraphNode<typeof EmailAgentState> = async (state, config) => {
// 发送邮件回复
// 与邮件服务集成
console.log(`发送回复:${state.responseText!.substring(0, 100)}...`);
return {};
}
步骤 5:将其连接在一起
现在我们将节点连接成一个工作图。由于我们的节点处理自己的路由决策,我们只需要一些基本的边。
要启用带 interrupt() 的人机协作,我们需要使用检查点进行编译,以在运行之间保存状态:
import { MemorySaver, RetryPolicy } from "@langchain/langgraph";
// 创建图
const workflow = new StateGraph(EmailAgentState)
// 添加具有适当错误处理的节点
.addNode("readEmail", readEmail)
.addNode("classifyIntent", classifyIntent)
// 为可能具有临时故障的节点添加重试策略
.addNode(
"searchDocumentation",
searchDocumentation,
{ retryPolicy: { maxAttempts: 3 } },
)
.addNode("bugTracking", bugTracking)
.addNode("draftResponse", draftResponse)
.addNode("humanReview", humanReview)
.addNode("sendReply", sendReply)
// 仅添加必要的边
.addEdge(START, "readEmail")
.addEdge("readEmail", "classifyIntent")
.addEdge("sendReply", END);
// 使用检查点进行编译以实现持久化
const memory = new MemorySaver();
const app = workflow.compile({ checkpointer: memory });
图结构是最小的,因为路由通过 Command 对象在节点内部发生。每个节点声明它可以去的地方,使流程明确且可追踪。
试用您的智能体
让我们使用一个需要人工审核的紧急计费问题来运行我们的智能体:
// 使用紧急计费问题进行测试
const initialState: EmailAgentStateType = {
emailContent: "我的订阅被重复收费了!这很紧急!",
senderEmail: "customer@example.com",
emailId: "email_123"
};
// 使用 thread_id 运行以实现持久化
const config = { configurable: { thread_id: "customer_123" } };
const result = await app.invoke(initialState, config);
// 图将在 human_review 处暂停
console.log(`草稿已准备好审核:${result.responseText?.substring(0, 100)}...`);
import { Command } from "@langchain/langgraph";
// 准备好后,提供人工输入以恢复
const humanResponse = new Command({
resume: {
approved: true,
editedResponse: "我们为重复收费深表歉意。我已启动立即退款...",
}
});
// 恢复执行
const finalResult = await app.invoke(humanResponse, config);
console.log("邮件发送成功!");
当图遇到 interrupt() 时会暂停,将所有内容保存到检查点并等待。它可以几天后恢复,从离开的地方继续。thread_id 确保此对话的所有状态都一起保存。
总结和后续步骤
关键见解
构建此邮件智能体向我们展示了 LangGraph 的思维方式:
分解为离散步骤
每个节点都做好一件事。这种分解支持流式进度更新、可暂停和恢复的持久执行,以及清晰的调试,因为您可以在步骤之间检查状态。
状态是共享内存
存储原始数据,而不是格式化文本。这允许不同节点以不同方式使用相同信息。
节点是函数
它们接受状态、执行工作并返回更新。当它们需要做出路由决策时,它们会指定状态更新和下一个目标。
错误是流程的一部分
临时故障会重试,LLM 可恢复错误会循环返回上下文,用户可修复问题会暂停等待输入,意外错误会冒泡以进行调试。
人工输入是一等公民
interrupt() 函数会无限期暂停执行,保存所有状态,并在您提供输入时从离开的地方恢复。当与节点中的其他操作结合使用时,它必须首先出现。
图结构自然出现
您定义基本连接,您的节点处理自己的路由逻辑。这使控制流明确且可追踪——您总是可以通过查看当前节点来了解智能体下一步要做什么。
高级考虑因素
本节探讨节点粒度设计中的权衡。大多数应用程序可以跳过此部分并使用上面显示的模式。
您可能想知道:为什么不将 读取邮件 和 分类意图 合并到一个节点中?或者为什么将文档搜索与起草回复分开?答案涉及弹性和可观察性之间的权衡。弹性考虑: LangGraph 的持久执行在节点边界创建检查点。当工作流在中断或故障后恢复时,它会从执行停止的节点开头开始。较小的节点意味着更频繁的检查点,这意味着如果出现问题需要重复的工作更少。如果您将多个操作合并到一个大节点中,靠近末尾的故障意味着从该节点开头重新执行所有内容。我们为邮件智能体选择此分解的原因:
-
外部服务隔离: 文档搜索和错误跟踪是单独的节点,因为它们调用外部 API。如果搜索服务缓慢或失败,我们希望将其与 LLM 调用隔离。我们可以向这些特定节点添加重试策略,而不会影响其他节点。
-
中间可见性: 将
分类意图 作为其自己的节点,让我们可以在采取行动之前检查 LLM 的决定。这对于调试和监控很有价值——您可以看到智能体何时以及为何路由到人工审核。
-
不同的故障模式: LLM 调用、数据库查找和邮件发送具有不同的重试策略。单独的节点允许您独立配置这些。
-
可重用性和测试: 较小的节点更容易单独测试并在其他工作流中重用。
另一种有效的方法:您可以将 读取邮件 和 分类意图 合并到一个节点中。您将失去在分类前检查原始邮件的能力,并且在该节点中的任何故障都会重复这两个操作。对于大多数应用程序,单独节点的可观察性和调试优势值得权衡。应用程序级问题:步骤 2 中的缓存讨论(是否缓存搜索结果)是应用程序级决策,而不是 LangGraph 框架功能。您根据特定需求在节点函数内实现缓存——LangGraph 不规定这一点。性能考虑:更多节点并不意味着执行速度更慢。LangGraph 默认在后台写入检查点(异步持久模式),因此您的图会继续运行,而无需等待检查点完成。这意味着您可以在最小性能影响下获得频繁的检查点。如果需要,您可以调整此行为——使用 "exit" 模式仅在完成时检查点,或使用 "sync" 模式在每次检查点写入时阻止执行。
后续步骤
这是关于使用 LangGraph 构建智能体的思维介绍。您可以通过以下方式扩展此基础:
人机协作模式
学习如何在执行前添加工具批准、批量批准和其他模式
可观察性
使用 LangSmith 添加可观察性以进行调试和监控
工具集成
集成更多工具以进行网络搜索、数据库查询和 API 调用