Skip to main content
当你用LangGraph构建智能体时,你会首先将其分解为称为节点的离散步骤。然后,你将描述每个节点的不同决策和转换。最后,你通过一个共享的状态将节点连接起来,每个节点都可以从该状态读取和写入。 在本指南中,我们将引导你了解使用LangGraph构建客户支持邮件智能体的思考过程。

从你想要自动化的过程开始

假设你需要构建一个处理客户支持邮件的AI智能体。你的产品团队给你提供了以下要求:
智能体应该:

- 读取传入的客户邮件
- 按紧急程度和主题对它们进行分类
- 搜索相关文档以回答问题
- 起草适当的回复
- 将复杂问题升级给人工客服
- 在需要时安排后续跟进

要处理的示例场景:

1. 简单的产品问题:"我如何重置密码?"
2. 错误报告:"当我选择PDF格式时,导出功能会崩溃"
3. 紧急的账单问题:"我的订阅被重复扣费了!"
4. 功能请求:"你能为移动应用添加深色模式吗?"
5. 复杂的技术问题:"我们的API集成间歇性地出现504错误"
要在LangGraph中实现一个智能体,你通常会遵循相同的五个步骤。

步骤1:将你的工作流映射为离散步骤

首先识别你流程中的不同步骤。每个步骤将成为一个节点(一个执行特定功能的函数)。然后,草拟这些步骤如何相互连接。 此图中的箭头显示了可能的路径,但实际选择哪条路径的决策发生在每个节点内部。 现在我们已经识别了工作流中的组件,让我们了解每个节点需要做什么:
  • 读取邮件:提取和解析邮件内容
  • 分类意图:使用LLM对紧急程度和主题进行分类,然后路由到适当的操作
  • 文档搜索:查询你的知识库以获取相关信息
  • 错误跟踪:在跟踪系统中创建或更新问题
  • 起草回复:生成适当的回复
  • 人工审核:升级给人工客服进行批准或处理
  • 发送回复:发送邮件回复
注意,一些节点决定下一步去哪里(分类意图起草回复人工审核),而其他节点总是进行到相同的下一步(读取邮件总是到分类意图文档搜索总是到起草回复)。

步骤2:识别每个步骤需要做什么

对于图中的每个节点,确定它代表什么类型的操作以及它需要什么上下文才能正常工作。

LLM步骤

当你需要理解、分析、生成文本或进行推理决策时使用

数据步骤

当你需要从外部源检索信息时使用

操作步骤

当你需要执行外部操作时使用

用户输入步骤

当你需要人工干预时使用

LLM步骤

当一个步骤需要理解、分析、生成文本或进行推理决策时:
  • 静态上下文(提示):分类类别、紧急程度定义、回复格式
  • 动态上下文(来自状态):邮件内容、发件人信息
  • 期望结果:决定路由的结构化分类
  • 静态上下文(提示):语气指南、公司政策、回复模板
  • 动态上下文(来自状态):分类结果、搜索结果、客户历史
  • 期望结果:可供审核的专业邮件回复

数据步骤

当一个步骤需要从外部源检索信息时:
  • 参数:根据意图和主题构建的查询
  • 重试策略:是,对于瞬时故障使用指数退避
  • 缓存:可以缓存常见查询以减少API调用
  • 参数:来自状态的客户电子邮件或ID
  • 重试策略:是,但如果不可用则回退到基本信息
  • 缓存:是,使用生存时间来平衡新鲜度和性能

操作步骤

当一个步骤需要执行外部操作时:
  • 何时执行节点:批准后(人工或自动)
  • 重试策略:是,对于网络问题使用指数退避
  • 不应缓存:每次发送都是一个独特的操作
  • 何时执行节点:当意图为”错误”时总是执行
  • 重试策略:是,关键在于不丢失错误报告
  • 返回:要包含在回复中的工单ID

用户输入步骤

当一个步骤需要人工干预时:
  • 决策上下文:原始邮件、草稿回复、紧急程度、分类 - 期望输入格式:批准布尔值加上可选的编辑后回复 - 何时触发:高紧急程度、复杂问题或质量问题

步骤3:设计你的状态

状态是智能体中所有节点可访问的共享内存。可以将其视为你的智能体用来跟踪其在处理过程中学习和决定的所有内容的笔记本。

什么属于状态?

对每条数据问自己这些问题:

包含在状态中

它需要跨步骤持久化吗?如果是,就放入状态中。

不要存储

你能从其他数据推导出它吗?如果是,就在需要时计算它,而不是存储在状态中。
对于我们的邮件智能体,我们需要跟踪:
  • 原始邮件和发件人信息(以后无法重建)
  • 分类结果(多个后续/下游节点需要)
  • 搜索结果和客户数据(重新获取成本高)
  • 草稿回复(需要在审核过程中持久化)
  • 执行元数据(用于调试和恢复)

保持状态原始,按需格式化提示

一个关键原则:你的状态应该存储原始数据,而不是格式化的文本。在需要时在节点内格式化提示。
这种分离意味着:
  • 不同的节点可以根据自己的需求以不同方式格式化相同的数据
  • 你可以更改提示模板而无需修改状态模式
  • 调试更清晰——你可以准确看到每个节点接收了什么数据
  • 你的智能体可以在不破坏现有状态的情况下演进
让我们定义我们的状态:
from typing import TypedDict, Literal

# 定义邮件分类的结构
class EmailClassification(TypedDict):
    intent: Literal["question", "bug", "billing", "feature", "complex"]
    urgency: Literal["low", "medium", "high", "critical"]
    topic: str
    summary: str

class EmailAgentState(TypedDict):
    # 原始邮件数据
    email_content: str
    sender_email: str
    email_id: str

    # 分类结果
    classification: EmailClassification | None

    # 原始搜索/API结果
    search_results: list[str] | None  # 原始文档块列表
    customer_history: dict | None  # 来自CRM的原始客户数据

    # 生成的内容
    draft_response: str | None
    messages: list[str] | None
注意状态只包含原始数据——没有提示模板、没有格式化字符串、没有指令。分类输出作为单个字典存储,直接来自LLM。

步骤4:构建你的节点

现在我们将每个步骤实现为一个函数。LangGraph中的节点只是一个Python函数,它接受当前状态并返回对其的更新。

适当地处理错误

不同的错误需要不同的处理策略:
错误类型谁修复它策略何时使用
瞬时错误(网络问题、速率限制)系统(自动)重试策略通常重试后会解决的临时故障
LLM可恢复错误(工具故障、解析问题)LLM将错误存储在状态中并循环回去LLM可以看到错误并调整其方法
用户可修复错误(信息缺失、指令不清)人工使用interrupt()暂停需要用户输入才能继续
重试后可恢复的故障开发者(声明式)error_handler在重试耗尽后运行补偿/恢复分支
意外错误开发者让它们冒泡上去需要调试的未知问题
添加重试策略以自动重试网络问题和速率限制。结合timeout=来限制每次尝试。有关完整生命周期,请参阅容错性
from langgraph.types import RetryPolicy

workflow.add_node(
    "search_documentation",
    search_documentation,
    retry_policy=RetryPolicy(max_attempts=3, initial_interval=1.0)
)

实现我们的邮件智能体节点

我们将每个节点实现为一个简单的函数。记住:节点接受状态,执行工作,并返回更新。
from typing import Literal
from langgraph.graph import StateGraph, START, END
from langgraph.types import interrupt, Command, RetryPolicy
from langchain_openai import ChatOpenAI
from langchain.messages import HumanMessage

llm = ChatOpenAI(model="gpt-5-nano")

def read_email(state: EmailAgentState) -> dict:
    """提取和解析邮件内容"""
    # 在生产环境中,这将连接到你的邮件服务
    return {
        "messages": [HumanMessage(content=f"Processing email: {state['email_content']}")]
    }

def classify_intent(state: EmailAgentState) -> Command[Literal["search_documentation", "human_review", "draft_response", "bug_tracking"]]:
    """使用LLM对邮件意图和紧急程度进行分类,然后相应路由"""

    # 创建返回EmailClassification字典的结构化LLM
    structured_llm = llm.with_structured_output(EmailClassification)

    # 按需格式化提示,不存储在状态中
    classification_prompt = f"""
    分析这封客户邮件并对其进行分类:

    邮件:{state['email_content']}
    来自:{state['sender_email']}

    提供包括意图、紧急程度、主题和摘要的分类。
    """

    # 直接获取结构化响应作为字典
    classification = structured_llm.invoke(classification_prompt)

    # 根据分类确定下一个节点
    if classification['intent'] == 'billing' or classification['urgency'] == 'critical':
        goto = "human_review"
    elif classification['intent'] in ['question', 'feature']:
        goto = "search_documentation"
    elif classification['intent'] == 'bug':
        goto = "bug_tracking"
    else:
        goto = "draft_response"

    # 将分类作为单个字典存储在状态中
    return Command(
        update={"classification": classification},
        goto=goto
    )
def search_documentation(state: EmailAgentState) -> Command[Literal["draft_response"]]:
    """搜索知识库以获取相关信息"""

    # 根据分类构建搜索查询
    classification = state.get('classification', {})
    query = f"{classification.get('intent', '')} {classification.get('topic', '')}"

    try:
        # 在这里实现你的搜索逻辑
        # 存储原始搜索结果,而不是格式化文本
        search_results = [
            "通过设置 > 安全 > 更改密码重置密码",
            "密码必须至少12个字符",
            "包括大写、小写字母、数字和符号"
        ]
    except SearchAPIError as e:
        # 对于可恢复的搜索错误,存储错误并继续
        search_results = [f"Search temporarily unavailable: {str(e)}"]

    return Command(
        update={"search_results": search_results},  # 存储原始结果或错误
        goto="draft_response"
    )

def bug_tracking(state: EmailAgentState) -> Command[Literal["draft_response"]]:
    """创建或更新错误跟踪工单"""

    # 在你的错误跟踪系统中创建工单
    ticket_id = "BUG-12345"  # 将通过API创建

    return Command(
        update={
            "search_results": [f"Bug ticket {ticket_id} created"],
            "current_step": "bug_tracked"
        },
        goto="draft_response"
    )
def draft_response(state: EmailAgentState) -> Command[Literal["human_review", "send_reply"]]:
    """使用上下文生成回复,并根据质量路由"""

    classification = state.get('classification', {})

    # 按需从原始状态数据格式化上下文
    context_sections = []

    if state.get('search_results'):
        # 为提示格式化搜索结果
        formatted_docs = "\n".join([f"- {doc}" for doc in state['search_results']])
        context_sections.append(f"相关文档:\n{formatted_docs}")

    if state.get('customer_history'):
        # 为提示格式化客户数据
        context_sections.append(f"客户等级:{state['customer_history'].get('tier', 'standard')}")

    # 使用格式化上下文构建提示
    draft_prompt = f"""
    起草对这封客户邮件的回复:
    {state['email_content']}

    邮件意图:{classification.get('intent', 'unknown')}
    紧急程度:{classification.get('urgency', 'medium')}

    {chr(10).join(context_sections)}

    指南:
    - 专业且乐于助人
    - 解决他们的具体问题
    - 在相关时使用提供的文档
    """

    response = llm.invoke(draft_prompt)

    # 根据紧急程度和意图确定是否需要人工审核
    needs_review = (
        classification.get('urgency') in ['high', 'critical'] or
        classification.get('intent') == 'complex'
    )

    # 路由到适当的下一个节点
    goto = "human_review" if needs_review else "send_reply"

    return Command(
        update={"draft_response": response.content},  # 只存储原始回复
        goto=goto
    )

def human_review(state: EmailAgentState) -> Command[Literal["send_reply", END]]:
    """使用interrupt暂停进行人工审核,并根据决策路由"""

    classification = state.get('classification', {})

    # interrupt()必须首先出现——它之前的任何代码在恢复时都会重新运行
    human_decision = interrupt({
        "email_id": state.get('email_id',''),
        "original_email": state.get('email_content',''),
        "draft_response": state.get('draft_response',''),
        "urgency": classification.get('urgency'),
        "intent": classification.get('intent'),
        "action": "请审核并批准/编辑此回复"
    })

    # 现在处理人工决策
    if human_decision.get("approved"):
        return Command(
            update={"draft_response": human_decision.get("edited_response", state.get('draft_response',''))},
            goto="send_reply"
        )
    else:
        # 拒绝意味着人工将直接处理
        return Command(update={}, goto=END)

def send_reply(state: EmailAgentState) -> dict:
    """发送邮件回复"""
    # 与邮件服务集成
    print(f"Sending reply: {state['draft_response'][:100]}...")
    return {}

步骤5:将它们连接在一起

现在我们将节点连接成一个工作图。由于我们的节点处理自己的路由决策,我们只需要几个基本边。 要启用使用interrupt()Human in the Loop,我们需要使用检查点编译以在运行之间保存状态:

图编译代码

from langgraph.checkpoint.memory import MemorySaver
from langgraph.types import RetryPolicy

# 创建图
workflow = StateGraph(EmailAgentState)

# 添加具有适当错误处理的节点
workflow.add_node("read_email", read_email)
workflow.add_node("classify_intent", classify_intent)

# 为可能有瞬时故障的节点添加重试策略
workflow.add_node(
    "search_documentation",
    search_documentation,
    retry_policy=RetryPolicy(max_attempts=3)
)
workflow.add_node("bug_tracking", bug_tracking)
workflow.add_node("draft_response", draft_response)
workflow.add_node("human_review", human_review)
workflow.add_node("send_reply", send_reply)

# 只添加基本边
workflow.add_edge(START, "read_email")
workflow.add_edge("read_email", "classify_intent")
workflow.add_edge("send_reply", END)

# 使用检查点编译以实现持久化,如果使用Local_Server运行图 --> 请在没有检查点的情况下编译
memory = MemorySaver()
app = workflow.compile(checkpointer=memory)
图结构是最小的,因为路由通过节点内的Command对象发生。每个节点使用类型提示(如Command[Literal["node1", "node2"]])声明它可以去哪里,使流程明确且可追溯。

试用你的智能体

让我们用一个需要人工审核的紧急账单问题来运行我们的智能体:
# 用紧急账单问题测试
initial_state = {
    "email_content": "我的订阅被重复扣费了!这很紧急!",
    "sender_email": "customer@example.com",
    "email_id": "email_123",
    "messages": []
}

# 使用thread_id运行以实现持久化
config = {"configurable": {"thread_id": "customer_123"}}
result = app.invoke(initial_state, config)
# 图将在human_review处暂停
print(f"human review interrupt:{result['__interrupt__']}")

# 准备好后,提供人工输入以恢复
from langgraph.types import Command

human_response = Command(
    resume={
        "approved": True,
        "edited_response": "我们对重复扣费深表歉意。我已发起立即退款..."
    }
)

# 恢复执行
final_result = app.invoke(human_response, config)
print(f"邮件发送成功!")
图在遇到interrupt()时暂停,将所有内容保存到检查点,并等待。它可以在几天后恢复,从停止的地方继续。thread_id确保此对话的所有状态都一起保存。

总结和后续步骤

关键见解

构建这个邮件智能体向我们展示了LangGraph的思维方式:

分解为离散步骤

每个节点都做好一件事。这种分解支持流式进度更新、可以暂停和恢复的持久执行,以及清晰的调试,因为你可以检查步骤之间的状态。

状态是共享内存

存储原始数据,而不是格式化文本。这允许不同的节点以不同的方式使用相同的信息。

节点是函数

它们接受状态,执行工作,并返回更新。当它们需要做出路由决策时,它们同时指定状态更新和下一个目的地。

错误是流程的一部分

瞬时故障获得重试,LLM可恢复错误带着上下文循环回去,用户可修复问题暂停等待输入,意外错误冒泡上去以便调试。

人工输入是一等公民

interrupt()函数无限期暂停执行,保存所有状态,并在你提供输入时从停止的地方恢复。当与节点中的其他操作结合使用时,它必须首先出现。

图结构自然涌现

你定义基本连接,你的节点处理自己的路由逻辑。这使控制流明确且可追溯——你总是可以通过查看当前节点来理解你的智能体下一步将做什么。

高级考虑

本节探讨节点粒度设计中的权衡。大多数应用程序可以跳过此部分,使用上面显示的模式。
你可能会想:为什么不将读取邮件分类意图合并为一个节点?或者为什么将文档搜索与起草回复分开?答案涉及弹性和可观察性之间的权衡。弹性考虑: LangGraph的持久执行在节点边界创建检查点。当工作流在中断或故障后恢复时,它从执行停止的节点开头开始。较小的节点意味着更频繁的检查点,这意味着如果出现问题需要重复的工作更少。如果你将多个操作合并到一个大节点中,接近末尾的故障意味着从该节点开头重新执行所有内容。我们为什么为邮件智能体选择这种分解:
  • 外部服务隔离: 文档搜索和错误跟踪是单独的节点,因为它们调用外部API。如果搜索服务缓慢或失败,我们希望将其与LLM调用隔离。我们可以为这些特定节点添加重试策略,而不影响其他节点。
  • 中间可见性:分类意图作为单独的节点让我们可以在采取行动之前检查LLM的决定。这对于调试和监控很有价值——你可以准确看到智能体何时以及为何路由到人工审核。
  • 不同的故障模式: LLM调用、数据库查询和邮件发送具有不同的重试策略。单独的节点允许你独立配置这些。
  • 可重用性和测试: 较小的节点更容易单独测试并在其他工作流中重用。
一种不同的有效方法:你可以将读取邮件分类意图合并为一个节点。你将失去在分类前检查原始邮件的能力,并且在该节点中的任何故障时都会重复这两个操作。对于大多数应用程序,单独节点的可观察性和调试好处值得这种权衡。应用程序级别的考虑:步骤2中的缓存讨论(是否缓存搜索结果)是应用程序级别的决策,而不是LangGraph框架功能。你在节点函数内根据特定需求实现缓存——LangGraph不规定这一点。性能考虑:更多的节点并不意味着更慢的执行。LangGraph默认在后台写入检查点(异步持久性模式),因此你的图继续运行而无需等待检查点完成。这意味着你以最小的性能影响获得频繁的检查点。如果需要,你可以调整此行为——使用"exit"模式仅在完成时检查点,或使用"sync"模式在写入每个检查点之前阻止执行。

从这里去哪里

这是关于用LangGraph思考构建智能体的介绍。你可以用以下内容扩展这个基础:

Human in the Loop模式

学习如何在执行前添加工具批准、批量批准和其他模式

子图

为复杂的多步骤操作创建子图

流式传输

添加流式传输以向用户显示实时进度

可观察性

使用LangSmith添加可观察性以进行调试和监控

工具集成

集成更多工具用于网络搜索、数据库查询和API调用

重试逻辑

为失败操作实现带有指数退避的重试逻辑