The agent should:
- Read incoming customer emails
- Classify them by urgency and topic
- Search relevant documentation to answer questions
- Draft appropriate responses
- Escalate complex issues to human agents
- Schedule follow-ups when needed

Example scenarios to handle:
1. Simple product question: "How do I reset my password?"
2. Bug report: "The export feature crashes when I select PDF format"
3. Urgent billing issue: "I was charged twice for my subscription!"
4. Feature request: "Can you add dark mode to the mobile app?"
5. Complex technical issue: "Our API integration fails intermittently with 504 errors"
- Context for the decision: the original email, the draft response, urgency, and classification
- Expected input format: an approval boolean plus an optional edited response
- When triggered: high urgency, complex issues, or quality concerns
from typing import TypedDict, Literal# Define the structure for email classificationclass EmailClassification(TypedDict): intent: Literal["question", "bug", "billing", "feature", "complex"] urgency: Literal["low", "medium", "high", "critical"] topic: str summary: strclass EmailAgentState(TypedDict): # Raw email data email_content: str sender_email: str email_id: str # Classification result classification: EmailClassification | None # Raw search/API results search_results: list[str] | None # List of raw document chunks customer_history: dict | None # Raw customer data from CRM # Generated content draft_response: str | None messages: list[str] | None
Notice that the state contains only raw data – no prompt templates, no formatted strings, no instructions. The classification output is stored as a single dictionary, straight from the LLM.
from langgraph.types import RetryPolicy

# Retry this node automatically on transient failures (e.g. a flaky search
# backend): up to 3 attempts, waiting 1 second before the first retry.
workflow.add_node(
    "search_documentation",
    search_documentation,
    retry_policy=RetryPolicy(max_attempts=3, initial_interval=1.0)
)
Store the error in state and loop back, so the LLM can see what went wrong and retry:
Copy
from langgraph.types import Command


def execute_tool(state: State) -> Command[Literal["agent", "execute_tool"]]:
    """Run the pending tool call and hand the outcome back to the agent.

    On failure the error text is stored in state instead of raising, so the
    LLM can read what went wrong and decide how to retry.
    """
    try:
        outcome = run_tool(state["tool_call"])
    except ToolError as exc:
        # Let the LLM see what went wrong and try again
        outcome = f"Tool error: {str(exc)}"
    return Command(update={"tool_result": outcome}, goto="agent")
Pause when needed and collect information from the user (such as an account ID, order number, or a clarification):
Copy
from langgraph.types import Command


def lookup_customer_history(state: State) -> Command[Literal["draft_response"]]:
    """Fetch the customer's subscription history, pausing for a missing ID.

    If no customer_id is in state, interrupt() suspends the graph until a
    human supplies one, then the node re-runs with the ID filled in.
    """
    customer_id = state.get("customer_id")
    if customer_id:
        # We already have the identifier — do the lookup and move on.
        customer_data = fetch_customer_history(customer_id)
        return Command(update={"customer_history": customer_data}, goto="draft_response")

    # Pause here and ask for the missing identifier.
    user_input = interrupt({
        "message": "Customer ID needed",
        "request": "Please provide the customer's account ID to look up their subscription history"
    })
    # Loop back into this same node so the lookup runs with the supplied ID.
    return Command(
        update={"customer_id": user_input["customer_id"]},
        goto="lookup_customer_history"
    )
from typing import Literal
from langgraph.graph import StateGraph, START, END
from langgraph.types import interrupt, Command, RetryPolicy
from langchain_openai import ChatOpenAI
from langchain.messages import HumanMessage

llm = ChatOpenAI(model="gpt-5-nano")


def read_email(state: EmailAgentState) -> dict:
    """Extract and parse email content.

    In production this would connect to your email service; here it just
    records the raw email into the message history.
    """
    note = HumanMessage(content=f"Processing email: {state['email_content']}")
    return {"messages": [note]}


def classify_intent(
    state: EmailAgentState,
) -> Command[Literal["search_documentation", "human_review", "draft_response", "bug_tracking"]]:
    """Classify the email with the LLM, then route to the matching node."""
    # Structured output returns an EmailClassification dict directly.
    structured_llm = llm.with_structured_output(EmailClassification)

    # The prompt is formatted on demand — never stored in state.
    classification_prompt = f"""
    Analyze this customer email and classify it:

    Email: {state['email_content']}
    From: {state['sender_email']}

    Provide classification including intent, urgency, topic, and summary.
    """

    classification = structured_llm.invoke(classification_prompt)

    # Choose the next node from the classification.
    intent = classification["intent"]
    if intent == "billing" or classification["urgency"] == "critical":
        destination = "human_review"
    elif intent in ("question", "feature"):
        destination = "search_documentation"
    elif intent == "bug":
        destination = "bug_tracking"
    else:
        destination = "draft_response"

    # The whole classification is stored as a single dict in state.
    return Command(update={"classification": classification}, goto=destination)
Search and tracking nodes
Copy
def search_documentation(state: EmailAgentState) -> Command[Literal["draft_response"]]:
    """Search the knowledge base for information relevant to the email.

    Stores raw search results (or an error string) in state; formatting for
    the prompt happens later in draft_response.
    """
    # Build a search query from the stored classification.
    classification = state.get('classification', {})
    query = f"{classification.get('intent', '')} {classification.get('topic', '')}"

    try:
        # Implement your search logic here (using `query`).
        # Store raw search results, not formatted text.
        search_results = [
            "Reset password via Settings > Security > Change Password",
            "Password must be at least 12 characters",
            "Include uppercase, lowercase, numbers, and symbols"
        ]
    except SearchAPIError as e:
        # NOTE(review): SearchAPIError must be defined/imported elsewhere in
        # the project — confirm. Recoverable failure: record it and continue.
        search_results = [f"Search temporarily unavailable: {str(e)}"]

    return Command(
        update={"search_results": search_results},  # Store raw results or error
        goto="draft_response"
    )


def bug_tracking(state: EmailAgentState) -> Command[Literal["draft_response"]]:
    """Create or update a bug tracking ticket for a reported defect."""
    # Create ticket in your bug tracking system (would be an API call).
    ticket_id = "BUG-12345"

    # Fix: the original update also wrote "current_step", which is not a key
    # declared in EmailAgentState — LangGraph rejects updates to undeclared
    # state keys, so that entry has been removed.
    return Command(
        update={"search_results": [f"Bug ticket {ticket_id} created"]},
        goto="draft_response"
    )
Response nodes
Copy
def draft_response(state: EmailAgentState) -> Command[Literal["human_review", "send_reply"]]:
    """Generate a reply with the LLM and route it for review or sending."""
    classification = state.get('classification', {})

    # Assemble prompt context from raw state data on demand.
    context_sections = []
    search_results = state.get('search_results')
    if search_results:
        formatted_docs = "\n".join(f"- {doc}" for doc in search_results)
        context_sections.append(f"Relevant documentation:\n{formatted_docs}")
    history = state.get('customer_history')
    if history:
        context_sections.append(f"Customer tier: {history.get('tier', 'standard')}")

    draft_prompt = f"""
    Draft a response to this customer email:

    {state['email_content']}

    Email intent: {classification.get('intent', 'unknown')}
    Urgency level: {classification.get('urgency', 'medium')}

    {chr(10).join(context_sections)}

    Guidelines:
    - Be professional and helpful
    - Address their specific concern
    - Use the provided documentation when relevant
    """

    response = llm.invoke(draft_prompt)

    # High urgency or complex issues always get a human in the loop.
    needs_review = (
        classification.get('urgency') in ['high', 'critical']
        or classification.get('intent') == 'complex'
    )
    destination = "human_review" if needs_review else "send_reply"

    # Only the raw LLM response is stored in state.
    return Command(update={"draft_response": response.content}, goto=destination)


def human_review(state: EmailAgentState) -> Command[Literal["send_reply", END]]:
    """Pause for human review via interrupt() and route on the decision."""
    classification = state.get('classification', {})

    # interrupt() must come first — any code before it will re-run on resume.
    human_decision = interrupt({
        "email_id": state.get('email_id', ''),
        "original_email": state.get('email_content', ''),
        "draft_response": state.get('draft_response', ''),
        "urgency": classification.get('urgency'),
        "intent": classification.get('intent'),
        "action": "Please review and approve/edit this response"
    })

    if not human_decision.get("approved"):
        # Rejection means a human will handle the reply directly.
        return Command(update={}, goto=END)

    # Prefer the human's edited text; fall back to the original draft.
    final_text = human_decision.get("edited_response", state.get('draft_response', ''))
    return Command(update={"draft_response": final_text}, goto="send_reply")


def send_reply(state: EmailAgentState) -> dict:
    """Send the email response."""
    # Integrate with your email service here.
    print(f"Sending reply: {state['draft_response'][:100]}...")
    return {}
from langgraph.checkpoint.memory import MemorySaver
from langgraph.types import RetryPolicy

# Create the graph
workflow = StateGraph(EmailAgentState)

# Add nodes with appropriate error handling
workflow.add_node("read_email", read_email)
workflow.add_node("classify_intent", classify_intent)
# Add retry policy for nodes that might have transient failures
workflow.add_node(
    "search_documentation",
    search_documentation,
    retry_policy=RetryPolicy(max_attempts=3)
)
workflow.add_node("bug_tracking", bug_tracking)
workflow.add_node("draft_response", draft_response)
workflow.add_node("human_review", human_review)
workflow.add_node("send_reply", send_reply)

# Add only the essential edges — all other routing happens inside nodes
# via Command(goto=...)
workflow.add_edge(START, "read_email")
workflow.add_edge("read_email", "classify_intent")
workflow.add_edge("send_reply", END)

# Compile with a checkpointer so interrupt() can pause and later resume.
# NOTE: when serving the graph via a local dev server / LangGraph Platform,
# compile WITHOUT a checkpointer — the platform supplies persistence itself.
memory = MemorySaver()
app = workflow.compile(checkpointer=memory)
The graph structure is minimal because routing happens inside nodes through Command objects. Each node declares where it can go using type hints like Command[Literal["node1", "node2"]], making the flow explicit and traceable.
from langgraph.types import Command

# Test with an urgent billing issue.
initial_state = {
    "email_content": "I was charged twice for my subscription! This is urgent!",
    "sender_email": "customer@example.com",
    "email_id": "email_123",
    "messages": []
}

# The thread_id keys all checkpointed state for this conversation.
config = {"configurable": {"thread_id": "customer_123"}}

# First run: the graph pauses at human_review.
result = app.invoke(initial_state, config)
print(f"human review interrupt:{result['__interrupt__']}")

# When ready, provide human input to resume execution.
human_response = Command(
    resume={
        "approved": True,
        "edited_response": "We sincerely apologize for the double charge. I've initiated an immediate refund..."
    }
)
final_result = app.invoke(human_response, config)
print("Email sent successfully!")
The graph pauses when it hits interrupt(), saves everything to the checkpointer, and waits. It can resume days later, picking up exactly where it left off. The thread_id ensures all state for this conversation is preserved together.