概述
路由器模式是一种多智能体架构,其中路由步骤对输入进行分类,并将其定向到专门的智能体,最后将结果合成为一个组合响应。当您的组织知识分布在不同的垂直领域(每个领域都需要自己的智能体,配备专门的工具和提示)时,此模式表现出色。 在本教程中,您将构建一个多源知识库路由器,通过一个真实的企业场景来展示这些优势。该系统将协调三个专家:- GitHub 智能体:搜索代码、问题和拉取请求。
- Notion 智能体:搜索内部文档和维基。
- Slack 智能体:搜索相关讨论和对话。
为什么使用路由器?
路由器模式提供以下优势:- 并行执行:同时查询多个源,与顺序方法相比可减少延迟。
- 专门的智能体:每个垂直领域都有针对其领域优化的专注工具和提示。
- 选择性路由:并非每个查询都需要所有源——路由器会智能地选择相关垂直领域。
- 针对性的子问题:每个智能体都会收到一个为其领域量身定制的问题,从而提高结果质量。
- 清晰的合成:来自多个源的结果被组合成一个单一、连贯的响应。
概念
我们将涵盖以下概念:- 多智能体系统
- 用于工作流编排的 StateGraph
- 用于并行执行的 Send API
路由器与子智能体:子智能体模式也可以路由到多个智能体。当您需要专门的预处理、自定义路由逻辑或想要显式控制并行执行时,请使用路由器模式。当您希望 LLM 动态决定调用哪些智能体时,请使用子智能体模式。
设置
安装
本教程需要langchain 和 langgraph 包:
npm install langchain @langchain/langgraph
LangSmith
设置 LangSmith 以检查智能体内部发生的情况。然后设置以下环境变量:export LANGSMITH_TRACING="true"
export LANGSMITH_API_KEY="..."
选择 LLM
从 LangChain 的集成套件中选择一个聊天模型:- OpenAI
- Anthropic
- Azure
- Google Gemini
- Bedrock Converse
👉 阅读 OpenAI 聊天模型集成文档
npm install @langchain/openai
import { initChatModel } from "langchain";
process.env.OPENAI_API_KEY = "your-api-key";
const model = await initChatModel("gpt-5.2");
👉 阅读 Anthropic 聊天模型集成文档
npm install @langchain/anthropic
import { initChatModel } from "langchain";
process.env.ANTHROPIC_API_KEY = "your-api-key";
const model = await initChatModel("claude-sonnet-4-6");
👉 阅读 Azure 聊天模型集成文档
npm install @langchain/azure
import { initChatModel } from "langchain";
process.env.AZURE_OPENAI_API_KEY = "your-api-key";
process.env.AZURE_OPENAI_ENDPOINT = "your-endpoint";
process.env.OPENAI_API_VERSION = "your-api-version";
const model = await initChatModel("azure_openai:gpt-5.2");
👉 阅读 Google GenAI 聊天模型集成文档
npm install @langchain/google-genai
import { initChatModel } from "langchain";
process.env.GOOGLE_API_KEY = "your-api-key";
const model = await initChatModel("google-genai:gemini-2.5-flash-lite");
👉 阅读 AWS Bedrock 聊天模型集成文档
npm install @langchain/aws
import { initChatModel } from "langchain";
// 按照此处步骤配置您的凭据:
// https://docs.aws.amazon.com/bedrock/latest/userguide/getting-started.html
const model = await initChatModel("bedrock:gpt-5.2");
1. 定义状态
首先,定义状态模式。我们使用三种类型:AgentInput:传递给每个子智能体的简单状态(仅查询)AgentOutput:每个子智能体返回的结果(源名称 + 结果)RouterState:跟踪查询、分类、结果和最终答案的主工作流状态
import { StateSchema, ReducedValue } from "@langchain/langgraph";
import { z } from "zod/v4";
const AgentOutput = z.object({
source: z.string(),
result: z.string(),
});
const RouterState = new StateSchema({
query: z.string(),
classifications: z.array(
z.object({
source: z.enum(["github", "notion", "slack"]),
query: z.string(),
})
),
results: new ReducedValue(
z.array(AgentOutput).default(() => []),
{ reducer: (current, update) => current.concat(update) }
),
finalAnswer: z.string(),
});
results 字段使用归约器(Python 中的 operator.add,JS 中的 concat 函数)将并行智能体执行的输出收集到单个列表中。
2. 为每个垂直领域定义工具
为每个知识领域创建工具。在生产系统中,这些工具会调用实际的 API。在本教程中,我们使用返回模拟数据的存根实现。我们在 3 个垂直领域定义了 7 个工具:GitHub(搜索代码、问题、PR)、Notion(搜索文档、获取页面)和 Slack(搜索消息、获取讨论串)。import { tool } from "langchain";
import { z } from "zod";
const searchCode = tool(
async ({ query, repo }) => {
return `Found code matching '${query}' in ${repo || "main"}: authentication middleware in src/auth.py`;
},
{
name: "search_code",
description: "Search code in GitHub repositories.",
schema: z.object({
query: z.string(),
repo: z.string().optional().default("main"),
}),
}
);
const searchIssues = tool(
async ({ query }) => {
return `Found 3 issues matching '${query}': #142 (API auth docs), #89 (OAuth flow), #203 (token refresh)`;
},
{
name: "search_issues",
description: "Search GitHub issues and pull requests.",
schema: z.object({
query: z.string(),
}),
}
);
const searchPrs = tool(
async ({ query }) => {
return `PR #156 added JWT authentication, PR #178 updated OAuth scopes`;
},
{
name: "search_prs",
description: "Search pull requests for implementation details.",
schema: z.object({
query: z.string(),
}),
}
);
const searchNotion = tool(
async ({ query }) => {
return `Found documentation: 'API Authentication Guide' - covers OAuth2 flow, API keys, and JWT tokens`;
},
{
name: "search_notion",
description: "Search Notion workspace for documentation.",
schema: z.object({
query: z.string(),
}),
}
);
const getPage = tool(
async ({ pageId }) => {
return `Page content: Step-by-step authentication setup instructions`;
},
{
name: "get_page",
description: "Get a specific Notion page by ID.",
schema: z.object({
pageId: z.string(),
}),
}
);
const searchSlack = tool(
async ({ query }) => {
return `Found discussion in #engineering: 'Use Bearer tokens for API auth, see docs for refresh flow'`;
},
{
name: "search_slack",
description: "Search Slack messages and threads.",
schema: z.object({
query: z.string(),
}),
}
);
const getThread = tool(
async ({ threadId }) => {
return `Thread discusses best practices for API key rotation`;
},
{
name: "get_thread",
description: "Get a specific Slack thread.",
schema: z.object({
threadId: z.string(),
}),
}
);
3. 创建专门的智能体
为每个垂直领域创建一个智能体。每个智能体都有领域特定的工具和针对其知识源优化的提示。所有三个智能体都遵循相同的模式——只有工具和系统提示不同。import { createAgent } from "langchain";
import { ChatOpenAI } from "@langchain/openai";
const llm = new ChatOpenAI({ model: "gpt-4.1" });
const githubAgent = createAgent({
model: llm,
tools: [searchCode, searchIssues, searchPrs],
systemPrompt: `
You are a GitHub expert. Answer questions about code,
API references, and implementation details by searching
repositories, issues, and pull requests.
`.trim(),
});
const notionAgent = createAgent({
model: llm,
tools: [searchNotion, getPage],
systemPrompt: `
You are a Notion expert. Answer questions about internal
processes, policies, and team documentation by searching
the organization's Notion workspace.
`.trim(),
});
const slackAgent = createAgent({
model: llm,
tools: [searchSlack, getThread],
systemPrompt: `
You are a Slack expert. Answer questions by searching
relevant threads and discussions where team members have
shared knowledge and solutions.
`.trim(),
});
4. 构建路由器工作流
现在使用 StateGraph 构建路由器工作流。该工作流有四个主要步骤:- 分类:分析查询并确定要调用哪些智能体以及使用什么子问题
- 路由:使用
Send并行分发到选定的智能体 - 查询智能体:每个智能体接收一个简单的
AgentInput并返回一个AgentOutput - 合成:将收集的结果组合成一个连贯的响应
import { StateGraph, START, END, Send } from "@langchain/langgraph";
import { z } from "zod";
const routerLlm = new ChatOpenAI({ model: "gpt-4.1-mini" });
// Define structured output schema for the classifier
const ClassificationResultSchema = z.object({
classifications: z.array(z.object({
source: z.enum(["github", "notion", "slack"]),
query: z.string(),
})).describe("List of agents to invoke with their targeted sub-questions"),
});
async function classifyQuery(state: typeof RouterState.State) {
const structuredLlm = routerLlm.withStructuredOutput(ClassificationResultSchema);
const result = await structuredLlm.invoke([
{
role: "system",
content: `Analyze this query and determine which knowledge bases to consult.
For each relevant source, generate a targeted sub-question optimized for that source.
Available sources:
- github: Code, API references, implementation details, issues, pull requests
- notion: Internal documentation, processes, policies, team wikis
- slack: Team discussions, informal knowledge sharing, recent conversations
Return ONLY the sources that are relevant to the query. Each source should have
a targeted sub-question optimized for that specific knowledge domain.
Example for "How do I authenticate API requests?":
- github: "What authentication code exists? Search for auth middleware, JWT handling"
- notion: "What authentication documentation exists? Look for API auth guides"
(slack omitted because it's not relevant for this technical question)`
},
{ role: "user", content: state.query }
]);
return { classifications: result.classifications };
}
function routeToAgents(state: typeof RouterState.State): Send[] {
return state.classifications.map(
(c) => new Send(c.source, { query: c.query })
);
}
async function queryGithub(state: AgentInput) {
const result = await githubAgent.invoke({
messages: [{ role: "user", content: state.query }]
});
return { results: [{ source: "github", result: result.messages.at(-1)?.content }] };
}
async function queryNotion(state: AgentInput) {
const result = await notionAgent.invoke({
messages: [{ role: "user", content: state.query }]
});
return { results: [{ source: "notion", result: result.messages.at(-1)?.content }] };
}
async function querySlack(state: AgentInput) {
const result = await slackAgent.invoke({
messages: [{ role: "user", content: state.query }]
});
return { results: [{ source: "slack", result: result.messages.at(-1)?.content }] };
}
async function synthesizeResults(state: typeof RouterState.State) {
if (state.results.length === 0) {
return { finalAnswer: "No results found from any knowledge source." };
}
// Format results for synthesis
const formatted = state.results.map(
(r) => `**From ${r.source.charAt(0).toUpperCase() + r.source.slice(1)}:**\n${r.result}`
);
const synthesisResponse = await routerLlm.invoke([
{
role: "system",
content: `Synthesize these search results to answer the original question: "${state.query}"
- Combine information from multiple sources without redundancy
- Highlight the most relevant and actionable information
- Note any discrepancies between sources
- Keep the response concise and well-organized`
},
{ role: "user", content: formatted.join("\n\n") }
]);
return { finalAnswer: synthesisResponse.content };
}
5. 编译工作流
现在通过连接节点和边来组装工作流。关键是使用add_conditional_edges 和路由函数来实现并行执行:
const workflow = new StateGraph(RouterState)
.addNode("classify", classifyQuery)
.addNode("github", queryGithub)
.addNode("notion", queryNotion)
.addNode("slack", querySlack)
.addNode("synthesize", synthesizeResults)
.addEdge(START, "classify")
.addConditionalEdges("classify", routeToAgents, ["github", "notion", "slack"])
.addEdge("github", "synthesize")
.addEdge("notion", "synthesize")
.addEdge("slack", "synthesize")
.addEdge("synthesize", END)
.compile();
add_conditional_edges 调用通过 route_to_agents 函数将分类节点连接到智能体节点。当 route_to_agents 返回多个 Send 对象时,这些节点会并行执行。
6. 使用路由器
使用跨越多个知识领域的查询测试您的路由器:const result = await workflow.invoke({
query: "How do I authenticate API requests?"
});
console.log("Original query:", result.query);
console.log("\nClassifications:");
for (const c of result.classifications) {
console.log(` ${c.source}: ${c.query}`);
}
console.log("\n" + "=".repeat(60) + "\n");
console.log("Final Answer:");
console.log(result.finalAnswer);
Original query: How do I authenticate API requests?
Classifications:
github: What authentication code exists? Search for auth middleware, JWT handling
notion: What authentication documentation exists? Look for API auth guides
============================================================
Final Answer:
To authenticate API requests, you have several options:
1. **JWT Tokens**: The recommended approach for most use cases.
Implementation details are in `src/auth.py` (PR #156).
2. **OAuth2 Flow**: For third-party integrations, follow the OAuth2
flow documented in Notion's 'API Authentication Guide'.
3. **API Keys**: For server-to-server communication, use Bearer tokens
in the Authorization header.
For token refresh handling, see issue #203 and PR #178 for the latest
OAuth scope updates.
7. 理解架构
路由器工作流遵循一个清晰的模式:分类阶段
classify_query 函数使用结构化输出来分析用户查询并确定要调用哪些智能体。这是路由智能所在之处:
- 使用 Pydantic 模型(Python)或 Zod 模式(JS)确保有效输出
- 返回一个
Classification对象列表,每个对象都有source和目标query - 仅包含相关源——不相关的源会被省略
使用 send 进行并行执行
route_to_agents 函数将分类映射到 Send 对象。每个 Send 指定目标节点和要传递的状态:
// Classifications: [{ source: "github", query: "..." }, { source: "notion", query: "..." }]
// Becomes:
[new Send("github", { query: "..." }), new Send("notion", { query: "..." })]
// Both agents execute simultaneously, each receiving only the query it needs
AgentInput,其中仅包含 query 字段——而不是完整的路由器状态。这保持了接口的清晰和明确。
使用归约器收集结果
智能体结果通过归约器流回主状态。每个智能体返回:{ results: [{ source: "github", result: "..." }] }
operator.add)连接这些列表,将所有并行结果收集到 state["results"] 中。
合成阶段
所有智能体完成后,synthesize_results 函数会遍历收集的结果:
- 等待所有并行分支完成(LangGraph 会自动处理此问题)
- 引用原始查询以确保答案解决用户提出的问题
- 组合来自所有源的信息,避免冗余
部分结果:在本教程中,所有选定的智能体必须在合成之前完成。
8. 完整的工作示例
以下是可运行脚本中的所有内容:Show 查看完整代码
Show 查看完整代码
/**
* Multi-Source Knowledge Router Example
*
* This example demonstrates the router pattern for multi-agent systems.
* A router classifies queries, routes them to specialized agents in parallel,
* and synthesizes results into a combined response.
*/
import { z } from "zod/v4";
import { tool } from "langchain";
import { StateGraph, START, END, Send, StateSchema, ReducedValue } from "@langchain/langgraph";
const AgentOutput = z.object({
source: z.string(),
result: z.string(),
});
const RouterState = new StateSchema({
query: z.string(),
classifications: z.array(
z.object({
source: z.enum(["github", "notion", "slack"]),
query: z.string(),
})
),
results: new ReducedValue(
z.array(AgentOutput).default(() => []),
{ reducer: (current, update) => current.concat(update) }
),
finalAnswer: z.string(),
});
const searchCode = tool(
async ({ query, repo }) => {
return `Found code matching '${query}' in ${repo || "main"}: authentication middleware in src/auth.py`;
},
{
name: "search_code",
description: "Search code in GitHub repositories.",
schema: z.object({
query: z.string(),
repo: z.string().optional().default("main"),
}),
}
);
const searchIssues = tool(
async ({ query }) => {
return `Found 3 issues matching '${query}': #142 (API auth docs), #89 (OAuth flow), #203 (token refresh)`;
},
{
name: "search_issues",
description: "Search GitHub issues and pull requests.",
schema: z.object({
query: z.string(),
}),
}
);
const searchPrs = tool(
async ({ query }) => {
return `PR #156 added JWT authentication, PR #178 updated OAuth scopes`;
},
{
name: "search_prs",
description: "Search pull requests for implementation details.",
schema: z.object({
query: z.string(),
}),
}
);
const searchNotion = tool(
async ({ query }) => {
return `Found documentation: 'API Authentication Guide' - covers OAuth2 flow, API keys, and JWT tokens`;
},
{
name: "search_notion",
description: "Search Notion workspace for documentation.",
schema: z.object({
query: z.string(),
}),
}
);
const getPage = tool(
async ({ pageId }) => {
return `Page content: Step-by-step authentication setup instructions`;
},
{
name: "get_page",
description: "Get a specific Notion page by ID.",
schema: z.object({
pageId: z.string(),
}),
}
);
const searchSlack = tool(
async ({ query }) => {
return `Found discussion in #engineering: 'Use Bearer tokens for API auth, see docs for refresh flow'`;
},
{
name: "search_slack",
description: "Search Slack messages and threads.",
schema: z.object({
query: z.string(),
}),
}
);
const getThread = tool(
async ({ threadId }) => {
return `Thread discusses best practices for API key rotation`;
},
{
name: "get_thread",
description: "Get a specific Slack thread.",
schema: z.object({
threadId: z.string(),
}),
}
);
import { createAgent } from "langchain";
import { ChatOpenAI } from "@langchain/openai";
const llm = new ChatOpenAI({ model: "gpt-4.1" });
const githubAgent = createAgent({
model: llm,
tools: [searchCode, searchIssues, searchPrs],
systemPrompt: `
You are a GitHub expert. Answer questions about code,
API references, and implementation details by searching
repositories, issues, and pull requests.
`.trim(),
});
const notionAgent = createAgent({
model: llm,
tools: [searchNotion, getPage],
systemPrompt: `
You are a Notion expert. Answer questions about internal
processes, policies, and team documentation by searching
the organization's Notion workspace.
`.trim(),
});
const slackAgent = createAgent({
model: llm,
tools: [searchSlack, getThread],
systemPrompt: `
You are a Slack expert. Answer questions by searching
relevant threads and discussions where team members have
shared knowledge and solutions.
`.trim(),
});
const routerLlm = new ChatOpenAI({ model: "gpt-4.1-mini" });
// Define structured output schema for the classifier
const ClassificationResultSchema = z.object({
classifications: z
.array(
z.object({
source: z.enum(["github", "notion", "slack"]),
query: z.string(),
})
)
.describe("List of agents to invoke with their targeted sub-questions"),
});
async function classifyQuery(state: typeof RouterState.State) {
const structuredLlm = routerLlm.withStructuredOutput(
ClassificationResultSchema
);
const result = await structuredLlm.invoke([
{
role: "system",
content: `Analyze this query and determine which knowledge bases to consult.
For each relevant source, generate a targeted sub-question optimized for that source.
Available sources:
- github: Code, API references, implementation details, issues, pull requests
- notion: Internal documentation, processes, policies, team wikis
- slack: Team discussions, informal knowledge sharing, recent conversations
Return ONLY the sources that are relevant to the query. Each source should have
a targeted sub-question optimized for that specific knowledge domain.
Example for "How do I authenticate API requests?":
- github: "What authentication code exists? Search for auth middleware, JWT handling"
- notion: "What authentication documentation exists? Look for API auth guides"
(slack omitted because it's not relevant for this technical question)`,
},
{ role: "user", content: state.query },
]);
return { classifications: result.classifications };
}
function routeToAgents(state: typeof RouterState.State): Send[] {
return state.classifications.map(
(c) => new Send(c.source, { query: c.query })
);
}
async function queryGithub(state: typeof RouterState.State) {
const result = await githubAgent.invoke({
messages: [{ role: "user", content: state.query }],
});
return {
results: [{ source: "github", result: result.messages.at(-1)?.content }],
};
}
async function queryNotion(state: typeof RouterState.State) {
const result = await notionAgent.invoke({
messages: [{ role: "user", content: state.query }],
});
return {
results: [{ source: "notion", result: result.messages.at(-1)?.content }],
};
}
async function querySlack(state: typeof RouterState.State) {
const result = await slackAgent.invoke({
messages: [{ role: "user", content: state.query }],
});
return {
results: [{ source: "slack", result: result.messages.at(-1)?.content }],
};
}
async function synthesizeResults(state: typeof RouterState.State) {
if (state.results.length === 0) {
return { finalAnswer: "No results found from any knowledge source." };
}
// Format results for synthesis
const formatted = state.results.map(
(r) =>
`**From ${r.source.charAt(0).toUpperCase() + r.source.slice(1)}:**\n${r.result}`
);
const synthesisResponse = await routerLlm.invoke([
{
role: "system",
content: `Synthesize these search results to answer the original question: "${state.query}"
- Combine information from multiple sources without redundancy
- Highlight the most relevant and actionable information
- Note any discrepancies between sources
- Keep the response concise and well-organized`,
},
{ role: "user", content: formatted.join("\n\n") },
]);
return { finalAnswer: synthesisResponse.content };
}
const workflow = new StateGraph(RouterState)
.addNode("classify", classifyQuery)
.addNode("github", queryGithub)
.addNode("notion", queryNotion)
.addNode("slack", querySlack)
.addNode("synthesize", synthesizeResults)
.addEdge(START, "classify")
.addConditionalEdges("classify", routeToAgents, ["github", "notion", "slack"])
.addEdge("github", "synthesize")
.addEdge("notion", "synthesize")
.addEdge("slack", "synthesize")
.addEdge("synthesize", END)
.compile();
const result = await workflow.invoke({
query: "How do I authenticate API requests?",
});
console.log("Original query:", result.query);
console.log("\nClassifications:");
for (const c of result.classifications) {
console.log(` ${c.source}: ${c.query}`);
}
console.log(`\n${"=".repeat(60)}\n`);
console.log("Final Answer:");
console.log(result.finalAnswer);
9. 高级:有状态路由器
到目前为止,我们构建的路由器是无状态的(每个请求都是独立处理的,调用之间没有记忆)。对于多轮对话,您需要一种有状态的方法。工具包装器方法
添加会话记忆的最简单方法是将无状态路由器包装为一个工具,供对话智能体调用:import { MemorySaver } from "@langchain/langgraph";
const searchKnowledgeBase = tool(
async ({ query }) => {
const result = await workflow.invoke({ query });
return result.finalAnswer;
},
{
name: "search_knowledge_base",
description: `Search across multiple knowledge sources (GitHub, Notion, Slack).
Use this to find information about code, documentation, or team discussions.`,
schema: z.object({
query: z.string().describe("The search query"),
}),
}
);
const conversationalAgent = createAgent({
model: llm,
tools: [searchKnowledgeBase],
systemPrompt: `
You are a helpful assistant that answers questions about our organization.
Use the search_knowledge_base tool to find information across our code,
documentation, and team discussions.
`.trim(),
checkpointer: new MemorySaver(),
});
const config = { configurable: { thread_id: "user-123" } };
let conversationalAgentResult = await conversationalAgent.invoke(
{
messages: [
{ role: "user", content: "How do I authenticate API requests?" },
],
},
config
);
console.log(conversationalAgentResult.messages.at(-1)?.content);
conversationalAgentResult = await conversationalAgent.invoke(
{
messages: [
{
role: "user",
content: "What about rate limiting for those endpoints?",
},
],
},
config
);
console.log(conversationalAgentResult.messages.at(-1)?.content);
工具包装器方法适用于大多数用例。它提供了清晰的分离:路由器处理多源查询,而对话智能体处理上下文和记忆。
完全持久化方法
如果路由器本身需要维护状态——例如,在路由决策中使用先前的搜索结果——请使用持久化在路由器级别存储消息历史。10. 关键要点
当您具备以下条件时,路由器模式表现出色:- 不同的垂直领域:每个领域都需要专门的工具和提示的独立知识领域
- 并行查询需求:受益于同时查询多个源的问题
- 合成要求:需要将来自多个源的结果组合成连贯的响应
后续步骤
连接这些文档 到 Claude、VSCode 等,通过 MCP 获取实时答案。

