Skip to main content

概览

路由器模式是一种多代理架构,其中路由步骤对输入进行分类并将其定向到专门的代理,结果被综合成一个组合响应。当您的组织的知识存在于不同的垂直领域——每个都需要自己代理的独立知识领域(带有专门的工具和提示)时,这种模式非常出色。 在本教程中,您将构建一个多源知识库路由器,通过一个现实的企业场景展示这些优势。该系统将协调三位专家:
  • GitHub 代理:搜索代码、问题和拉取请求。
  • Notion 代理:搜索内部文档和维基。
  • Slack 代理:搜索相关线程和讨论。
当用户问“我如何验证 API 请求?”时,路由器将查询分解为特定于来源的子问题,并行将它们路由到相关代理,并将结果综合成一个连贯的答案。

为什么使用路由器?

路由器模式提供了几个优势:
  • 并行执行:同时查询多个来源,与顺序方法相比减少了延迟。
  • 专门代理:每个垂直领域都有针对其领域优化的专注工具和提示。
  • 选择性路由:并非每个查询都需要每个来源——路由器智能地选择相关的垂直领域。
  • 针对性子问题:每个代理接收针对其领域定制的问题,提高结果质量。
  • 清晰综合:来自多个来源的结果被组合成一个单一、连贯的响应。

概念

我们将涵盖以下概念:
路由器与子代理子代理模式也可以路由到多个代理。当您需要专门的预处理、自定义路由逻辑或希望显式控制并行执行时,请使用路由器模式。当您希望 LLM 动态决定调用哪些代理时,请使用子代理模式。

设置

安装

本教程需要 langchainlanggraph 包:
npm install langchain @langchain/langgraph
有关更多详细信息,请参阅我们的安装指南

LangSmith

设置 LangSmith 以检查代理内部发生的情况。然后设置以下环境变量:
export LANGSMITH_TRACING="true"
export LANGSMITH_API_KEY="..."

选择 LLM

从 LangChain 的集成套件中选择一个聊天模型:
👉 Read the OpenAI chat model integration docs
npm install @langchain/openai
import { initChatModel } from "langchain";

process.env.OPENAI_API_KEY = "your-api-key";

const model = await initChatModel("gpt-5.2");

1. 定义状态

首先,定义状态模式。我们使用三种类型:
  • AgentInput:传递给每个子代理的简单状态(仅查询)
  • AgentOutput:每个子代理返回的结果(来源名称 + 结果)
  • RouterState:跟踪查询、分类、结果和最终答案的主工作流状态
import { StateSchema, ReducedValue } from "@langchain/langgraph";
import { z } from "zod/v4";

const AgentOutput = z.object({
  source: z.string(),
  result: z.string(),
});

const RouterState = new StateSchema({
  query: z.string(),
  classifications: z.array(
    z.object({
      source: z.enum(["github", "notion", "slack"]),
      query: z.string(),
    })
  ),
  results: new ReducedValue(
    z.array(AgentOutput).default(() => []),
    { reducer: (current, update) => current.concat(update) }
  ),
  finalAnswer: z.string(),
});
results 字段使用reducer(Python 中的 operator.add,JS 中的 concat 函数)将来自并行代理执行的输出收集到单个列表中。

2. 为每个垂直领域定义工具

为每个知识领域创建工具。在生产系统中,这些将调用实际 API。对于本教程,我们使用返回模拟数据的存根实现。我们定义了跨越 3 个垂直领域的 7 个工具:GitHub(搜索代码、问题、PR)、Notion(搜索文档、获取页面)和 Slack(搜索消息、获取线程)。
import { tool } from "langchain";
import { z } from "zod";

const searchCode = tool(
  async ({ query, repo }) => {
    return `Found code matching '${query}' in ${repo || "main"}: authentication middleware in src/auth.py`;
  },
  {
    name: "search_code",
    description: "Search code in GitHub repositories.",
    schema: z.object({
      query: z.string(),
      repo: z.string().optional().default("main"),
    }),
  }
);

const searchIssues = tool(
  async ({ query }) => {
    return `Found 3 issues matching '${query}': #142 (API auth docs), #89 (OAuth flow), #203 (token refresh)`;
  },
  {
    name: "search_issues",
    description: "Search GitHub issues and pull requests.",
    schema: z.object({
      query: z.string(),
    }),
  }
);

const searchPrs = tool(
  async ({ query }) => {
    return `PR #156 added JWT authentication, PR #178 updated OAuth scopes`;
  },
  {
    name: "search_prs",
    description: "Search pull requests for implementation details.",
    schema: z.object({
      query: z.string(),
    }),
  }
);

const searchNotion = tool(
  async ({ query }) => {
    return `Found documentation: 'API Authentication Guide' - covers OAuth2 flow, API keys, and JWT tokens`;
  },
  {
    name: "search_notion",
    description: "Search Notion workspace for documentation.",
    schema: z.object({
      query: z.string(),
    }),
  }
);

const getPage = tool(
  async ({ pageId }) => {
    return `Page content: Step-by-step authentication setup instructions`;
  },
  {
    name: "get_page",
    description: "Get a specific Notion page by ID.",
    schema: z.object({
      pageId: z.string(),
    }),
  }
);

const searchSlack = tool(
  async ({ query }) => {
    return `Found discussion in #engineering: 'Use Bearer tokens for API auth, see docs for refresh flow'`;
  },
  {
    name: "search_slack",
    description: "Search Slack messages and threads.",
    schema: z.object({
      query: z.string(),
    }),
  }
);

const getThread = tool(
  async ({ threadId }) => {
    return `Thread discusses best practices for API key rotation`;
  },
  {
    name: "get_thread",
    description: "Get a specific Slack thread.",
    schema: z.object({
      threadId: z.string(),
    }),
  }
);

3. 创建专门代理

为每个垂直领域创建一个代理。每个代理都有特定领域的工具和针对其知识来源优化的提示。所有三个都遵循相同的模式——只有工具和系统提示不同。
import { createAgent } from "langchain";
import { ChatOpenAI } from "@langchain/openai";

const llm = new ChatOpenAI({ model: "gpt-4.1" });

const githubAgent = createAgent({
  model: llm,
  tools: [searchCode, searchIssues, searchPrs],
  systemPrompt: `
You are a GitHub expert. Answer questions about code,
API references, and implementation details by searching
repositories, issues, and pull requests.
  `.trim(),
});

const notionAgent = createAgent({
  model: llm,
  tools: [searchNotion, getPage],
  systemPrompt: `
You are a Notion expert. Answer questions about internal
processes, policies, and team documentation by searching
the organization's Notion workspace.
  `.trim(),
});

const slackAgent = createAgent({
  model: llm,
  tools: [searchSlack, getThread],
  systemPrompt: `
You are a Slack expert. Answer questions by searching
relevant threads and discussions where team members have
shared knowledge and solutions.
  `.trim(),
});

4. 构建路由器工作流

现在使用 StateGraph 构建路由器工作流。工作流有四个主要步骤:
  1. 分类:分析查询并确定要调用哪些代理以及使用什么子问题
  2. 路由:使用 Send 并行扇出到选定的代理
  3. 查询代理:每个代理接收一个简单的 AgentInput 并返回一个 AgentOutput
  4. 综合:将收集的结果组合成一个连贯的响应
import { StateGraph, START, END, Send } from "@langchain/langgraph";
import { z } from "zod";

const routerLlm = new ChatOpenAI({ model: "gpt-4.1-mini" });


// Define structured output schema for the classifier
const ClassificationResultSchema = z.object({
  classifications: z.array(z.object({
    source: z.enum(["github", "notion", "slack"]),
    query: z.string(),
  })).describe("List of agents to invoke with their targeted sub-questions"),
});


async function classifyQuery(state: typeof RouterState.State) {
  const structuredLlm = routerLlm.withStructuredOutput(ClassificationResultSchema);

  const result = await structuredLlm.invoke([
    {
      role: "system",
      content: `Analyze this query and determine which knowledge bases to consult.
For each relevant source, generate a targeted sub-question optimized for that source.

Available sources:
- github: Code, API references, implementation details, issues, pull requests
- notion: Internal documentation, processes, policies, team wikis
- slack: Team discussions, informal knowledge sharing, recent conversations

Return ONLY the sources that are relevant to the query. Each source should have
a targeted sub-question optimized for that specific knowledge domain.

Example for "How do I authenticate API requests?":
- github: "What authentication code exists? Search for auth middleware, JWT handling"
- notion: "What authentication documentation exists? Look for API auth guides"
(slack omitted because it's not relevant for this technical question)`
    },
    { role: "user", content: state.query }
  ]);

  return { classifications: result.classifications };
}


function routeToAgents(state: typeof RouterState.State): Send[] {
  return state.classifications.map(
    (c) => new Send(c.source, { query: c.query })  
  );
}


async function queryGithub(state: AgentInput) {
  const result = await githubAgent.invoke({
    messages: [{ role: "user", content: state.query }]  
  });
  return { results: [{ source: "github", result: result.messages.at(-1)?.content }] };
}


async function queryNotion(state: AgentInput) {
  const result = await notionAgent.invoke({
    messages: [{ role: "user", content: state.query }]  
  });
  return { results: [{ source: "notion", result: result.messages.at(-1)?.content }] };
}


async function querySlack(state: AgentInput) {
  const result = await slackAgent.invoke({
    messages: [{ role: "user", content: state.query }]  
  });
  return { results: [{ source: "slack", result: result.messages.at(-1)?.content }] };
}


async function synthesizeResults(state: typeof RouterState.State) {
  if (state.results.length === 0) {
    return { finalAnswer: "No results found from any knowledge source." };
  }

  // Format results for synthesis
  const formatted = state.results.map(
    (r) => `**From ${r.source.charAt(0).toUpperCase() + r.source.slice(1)}:**\n${r.result}`
  );

  const synthesisResponse = await routerLlm.invoke([
    {
      role: "system",
      content: `Synthesize these search results to answer the original question: "${state.query}"

- Combine information from multiple sources without redundancy
- Highlight the most relevant and actionable information
- Note any discrepancies between sources
- Keep the response concise and well-organized`
    },
    { role: "user", content: formatted.join("\n\n") },
  ]);

  return { finalAnswer: synthesisResponse.content };
}

5. 编译工作流

现在通过用边连接节点来组装工作流。关键是使用带有路由函数的 add_conditional_edges 来启用并行执行:
const workflow = new StateGraph(RouterState)
  .addNode("classify", classifyQuery)
  .addNode("github", queryGithub)
  .addNode("notion", queryNotion)
  .addNode("slack", querySlack)
  .addNode("synthesize", synthesizeResults)
  .addEdge(START, "classify")
  .addConditionalEdges("classify", routeToAgents, ["github", "notion", "slack"])
  .addEdge("github", "synthesize")
  .addEdge("notion", "synthesize")
  .addEdge("slack", "synthesize")
  .addEdge("synthesize", END)
  .compile();
add_conditional_edges 调用通过 route_to_agents 函数将分类节点连接到代理节点。当 route_to_agents 返回多个 Send 对象时,这些节点并行执行。

6. 使用路由器

使用跨越多个知识领域的查询测试您的路由器:
const result = await workflow.invoke({
  query: "How do I authenticate API requests?"
});

console.log("Original query:", result.query);
console.log("\nClassifications:");
for (const c of result.classifications) {
  console.log(`  ${c.source}: ${c.query}`);
}
console.log("\n" + "=".repeat(60) + "\n");
console.log("Final Answer:");
console.log(result.finalAnswer);
预期输出:
Original query: How do I authenticate API requests?

Classifications:
  github: What authentication code exists? Search for auth middleware, JWT handling
  notion: What authentication documentation exists? Look for API auth guides

============================================================

Final Answer:
To authenticate API requests, you have several options:

1. **JWT Tokens**: The recommended approach for most use cases.
   Implementation details are in `src/auth.py` (PR #156).

2. **OAuth2 Flow**: For third-party integrations, follow the OAuth2
   flow documented in Notion's 'API Authentication Guide'.

3. **API Keys**: For server-to-server communication, use Bearer tokens
   in the Authorization header.

For token refresh handling, see issue #203 and PR #178 for the latest
OAuth scope updates.
路由器分析了查询,对其进行分类以确定要调用哪些代理(GitHub 和 Notion,但对于此技术问题不包括 Slack),并行查询两个代理,并将结果综合成一个连贯的答案。

7. 理解架构

路由器工作流遵循清晰的模式:

分类阶段

classify_query 函数使用结构化输出来分析用户的查询并确定要调用哪些代理。这是路由智能所在:
  • 使用 Pydantic 模型 (Python) 或 Zod 模式 (JS) 确保有效输出
  • 返回 Classification 对象列表,每个对象都有 source 和目标 query
  • 仅包含相关来源——不相关的来源被简单地省略
这种结构化方法比自由形式的 JSON 解析更可靠,并使路由逻辑明确。

使用 send 的并行执行

route_to_agents 函数将分类映射到 Send 对象。每个 Send 指定目标节点和要传递的状态:
// Classifications: [{ source: "github", query: "..." }, { source: "notion", query: "..." }]
// Becomes:
[new Send("github", { query: "..." }), new Send("notion", { query: "..." })]
// Both agents execute simultaneously, each receiving only the query it needs
每个代理节点接收一个只有 query 字段的简单 AgentInput——而不是完整的路由器状态。这保持了接口的清晰和明确。

使用 reducer 收集结果

代理结果通过reducer流回主状态。每个代理返回:
{ results: [{ source: "github", result: "..." }] }
Reducer(Python 中的 operator.add)连接这些列表,将所有并行结果收集到 state["results"] 中。

综合阶段

所有代理完成后,synthesize_results 函数迭代收集的结果:
  • 等待所有并行分支完成(LangGraph 自动处理此操作)
  • 引用原始查询以确保答案解决了用户的问题
  • 组合来自多个来源的信息,没有冗余
部分结果:在本教程中,所有选定的代理必须在综合之前完成。

8. 完整工作示例

这是一个可运行脚本中的所有内容:

9. 高级:有状态路由器

我们目前构建的路由器是无状态的——每个请求都是独立处理的,调用之间没有记忆。对于多轮对话,您需要有状态的方法。

工具包装器方法

添加对话记忆的最简单方法是将无状态路由器包装为一个对话代理可以调用的工具:
import { MemorySaver } from "@langchain/langgraph";

const searchKnowledgeBase = tool(
  async ({ query }) => {
    const result = await workflow.invoke({ query });
    return result.finalAnswer;
  },
  {
    name: "search_knowledge_base",
    description: `Search across multiple knowledge sources (GitHub, Notion, Slack).
Use this to find information about code, documentation, or team discussions.`,
    schema: z.object({
      query: z.string().describe("The search query"),
    }),
  }
);

const conversationalAgent = createAgent({
  model: llm,
  tools: [searchKnowledgeBase],
  systemPrompt: `
You are a helpful assistant that answers questions about our organization.
Use the search_knowledge_base tool to find information across our code,
documentation, and team discussions.
  `.trim(),
  checkpointer: new MemorySaver(),
});
这种方法保持路由器无状态,而对话代理处理记忆和上下文。用户可以进行多轮对话,代理将根据需要调用路由器工具。
const config = { configurable: { thread_id: "user-123" } };
let conversationalAgentResult = await conversationalAgent.invoke(
  {
    messages: [
      { role: "user", content: "How do I authenticate API requests?" },
    ],
  },
  config
);
console.log(conversationalAgentResult.messages.at(-1)?.content);

conversationalAgentResult = await conversationalAgent.invoke(
  {
    messages: [
      {
        role: "user",
        content: "What about rate limiting for those endpoints?",
      },
    ],
  },
  config
);
console.log(conversationalAgentResult.messages.at(-1)?.content);
工具包装器方法推荐用于大多数用例。它提供了清晰的分离:路由器处理多源查询,而对话代理处理上下文和记忆。

完全持久化方法

如果您需要路由器本身维护状态——例如,在路由决策中使用先前的搜索结果——请使用持久化在路由器级别存储消息历史。
有状态路由器增加了复杂性。 当跨轮次路由到不同代理时,如果代理具有不同的语气或提示,对话可能会感觉不一致。考虑改用交接模式子代理模式——两者都为与不同代理的多轮对话提供了更清晰的语义。

10. 关键要点

当您拥有以下情况时,路由器模式表现出色:
  • 不同的垂直领域:每个都需要专门工具和提示的独立知识领域
  • 并行查询需求:受益于同时查询多个来源的问题
  • 综合要求:来自多个来源的结果需要组合成一个连贯的响应
该模式有三个阶段:分解(分析查询并生成针对性的子问题)、路由(并行执行查询)和综合(组合结果)。
何时使用路由器模式当您有多个独立的知识来源,需要低延迟并行查询,并希望显式控制路由逻辑时,请使用路由器模式。对于具有动态工具选择的更简单情况,请考虑子代理模式。对于代理需要顺序与用户交谈的工作流,请考虑交接

下一步