"""
多源知识路由器示例
此示例演示了多智能体系统的路由器模式。
路由器对查询进行分类,将它们并行路由到专门的智能体,
并将结果合成为一个组合响应。
"""
import operator
from typing import Annotated, Literal, TypedDict
from langchain.agents import create_agent
from langchain.chat_models import init_chat_model
from langchain.tools import tool
from langgraph.graph import StateGraph, START, END
from langgraph.types import Send
from pydantic import BaseModel, Field
# 状态定义
class AgentInput(TypedDict):
"""每个子智能体的简单输入状态。"""
query: str
class AgentOutput(TypedDict):
"""每个子智能体的输出。"""
source: str
result: str
class Classification(TypedDict):
"""单个路由决策:使用什么查询调用哪个智能体。"""
source: Literal["github", "notion", "slack"]
query: str
class RouterState(TypedDict):
query: str
classifications: list[Classification]
results: Annotated[list[AgentOutput], operator.add]
final_answer: str
# 分类器的结构化输出模式
class ClassificationResult(BaseModel):
"""将用户查询分类为特定于智能体的子问题的结果。"""
classifications: list[Classification] = Field(
description="要调用的智能体列表及其目标子问题"
)
# 工具
@tool
def search_code(query: str, repo: str = "main") -> str:
"""在 GitHub 仓库中搜索代码。"""
return f"在 {repo} 中找到匹配 '{query}' 的代码:src/auth.py 中的身份验证中间件"
@tool
def search_issues(query: str) -> str:
"""搜索 GitHub 问题和拉取请求。"""
return f"找到 3 个匹配 '{query}' 的问题:#142(API 身份验证文档)、#89(OAuth 流程)、#203(令牌刷新)"
@tool
def search_prs(query: str) -> str:
"""搜索拉取请求以获取实现细节。"""
return f"PR #156 添加了 JWT 身份验证,PR #178 更新了 OAuth 范围"
@tool
def search_notion(query: str) -> str:
"""在 Notion 工作区中搜索文档。"""
return f"找到文档:'API 身份验证指南' - 涵盖 OAuth2 流程、API 密钥和 JWT 令牌"
@tool
def get_page(page_id: str) -> str:
"""通过 ID 获取特定的 Notion 页面。"""
return f"页面内容:分步身份验证设置说明"
@tool
def search_slack(query: str) -> str:
"""搜索 Slack 消息和讨论串。"""
return f"在 #engineering 中找到讨论:'使用 Bearer 令牌进行 API 身份验证,请参阅文档了解刷新流程'"
@tool
def get_thread(thread_id: str) -> str:
"""获取特定的 Slack 讨论串。"""
return f"讨论串讨论了 API 密钥轮换的最佳实践"
# 模型和智能体
model = init_chat_model("openai:gpt-4.1")
router_llm = init_chat_model("openai:gpt-4.1-mini")
github_agent = create_agent(
model,
tools=[search_code, search_issues, search_prs],
system_prompt=(
"您是 GitHub 专家。通过搜索仓库、问题和拉取请求,"
"回答有关代码、API 参考和实现细节的问题。"
),
)
notion_agent = create_agent(
model,
tools=[search_notion, get_page],
system_prompt=(
"您是 Notion 专家。通过搜索组织的 Notion 工作区,"
"回答有关内部流程、政策和团队文档的问题。"
),
)
slack_agent = create_agent(
model,
tools=[search_slack, get_thread],
system_prompt=(
"您是 Slack 专家。通过搜索相关讨论串和对话来回答问题,"
"团队成员在这些地方分享了知识和解决方案。"
),
)
# 工作流节点
def classify_query(state: RouterState) -> dict:
"""分类查询并确定要调用哪些智能体。"""
structured_llm = router_llm.with_structured_output(ClassificationResult)
result = structured_llm.invoke([
{
"role": "system",
"content": """分析此查询并确定要咨询哪些知识库。
对于每个相关源,生成针对该源优化的目标子问题。
可用源:
- github:代码、API 参考、实现细节、问题、拉取请求
- notion:内部文档、流程、政策、团队维基
- slack:团队讨论、非正式知识共享、最近的对话
仅返回与查询相关的源。"""
},
{"role": "user", "content": state["query"]}
])
return {"classifications": result.classifications}
def route_to_agents(state: RouterState) -> list[Send]:
"""根据分类分发到智能体。"""
return [
Send(c["source"], {"query": c["query"]})
for c in state["classifications"]
]
def query_github(state: AgentInput) -> dict:
"""查询 GitHub 智能体。"""
result = github_agent.invoke({
"messages": [{"role": "user", "content": state["query"]}]
})
return {"results": [{"source": "github", "result": result["messages"][-1].content}]}
def query_notion(state: AgentInput) -> dict:
"""查询 Notion 智能体。"""
result = notion_agent.invoke({
"messages": [{"role": "user", "content": state["query"]}]
})
return {"results": [{"source": "notion", "result": result["messages"][-1].content}]}
def query_slack(state: AgentInput) -> dict:
"""查询 Slack 智能体。"""
result = slack_agent.invoke({
"messages": [{"role": "user", "content": state["query"]}]
})
return {"results": [{"source": "slack", "result": result["messages"][-1].content}]}
def synthesize_results(state: RouterState) -> dict:
"""将所有智能体的结果组合成一个连贯的答案。"""
if not state["results"]:
return {"final_answer": "未从任何知识源找到结果。"}
formatted = [
f"**来自 {r['source'].title()}:**\n{r['result']}"
for r in state["results"]
]
synthesis_response = router_llm.invoke([
{
"role": "system",
"content": f"""合成这些搜索结果以回答原始问题:"{state['query']}"
- 组合来自多个源的信息,避免冗余
- 突出最相关和可操作的信息
- 注意源之间的任何差异
- 保持响应简洁且组织良好"""
},
{"role": "user", "content": "\n\n".join(formatted)}
])
return {"final_answer": synthesis_response.content}
# 构建工作流
workflow = (
StateGraph(RouterState)
.add_node("classify", classify_query)
.add_node("github", query_github)
.add_node("notion", query_notion)
.add_node("slack", query_slack)
.add_node("synthesize", synthesize_results)
.add_edge(START, "classify")
.add_conditional_edges("classify", route_to_agents, ["github", "notion", "slack"])
.add_edge("github", "synthesize")
.add_edge("notion", "synthesize")
.add_edge("slack", "synthesize")
.add_edge("synthesize", END)
.compile()
)
if __name__ == "__main__":
result = workflow.invoke({
"query": "How do I authenticate API requests?"
})
print("Original query:", result["query"])
print("\nClassifications:")
for c in result["classifications"]:
print(f" {c['source']}: {c['query']}")
print("\n" + "=" * 60 + "\n")
print("Final Answer:")
print(result["final_answer"])