Build custom middleware by implementing hooks that run at specific points in the agent execution flow.

Hooks

Middleware offers two styles of hooks for intercepting agent execution:

Node-style hooks

Run sequentially at specific execution nodes. Ideal for logging, validation, and state updates. Available hooks:
  • before_agent - before the agent starts (runs once per invocation)
  • before_model - before each model call
  • after_model - after each model response
  • after_agent - after the agent completes (runs once per invocation)
Example:
from langchain.agents.middleware import before_model, after_model, AgentState
from langchain.messages import AIMessage
from langgraph.runtime import Runtime
from typing import Any


@before_model(can_jump_to=["end"])
def check_message_limit(state: AgentState, runtime: Runtime) -> dict[str, Any] | None:
    if len(state["messages"]) >= 50:
        return {
            "messages": [AIMessage("Conversation limit reached.")],
            "jump_to": "end"
        }
    return None

@after_model
def log_response(state: AgentState, runtime: Runtime) -> dict[str, Any] | None:
    print(f"Model returned: {state['messages'][-1].content}")
    return None

Wrap-style hooks

Intercept execution and control when the handler is called. Ideal for retries, caching, and transformations. You decide whether the handler is called zero times (short-circuit), once (normal flow), or multiple times (retry logic). Available hooks:
  • wrap_model_call - around each model call
  • wrap_tool_call - around each tool call
Example:
from langchain.agents.middleware import wrap_model_call, ModelRequest, ModelResponse
from typing import Callable


@wrap_model_call
def retry_model(
    request: ModelRequest,
    handler: Callable[[ModelRequest], ModelResponse],
) -> ModelResponse:
    for attempt in range(3):
        try:
            return handler(request)
        except Exception as e:
            if attempt == 2:
                raise
            print(f"Retry {attempt + 1}/3 after error: {e}")
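The retry example above shows the handler being called multiple times. The zero-call (short-circuit) case can be sketched with a caching wrapper; the snippet below is a plain-Python model of the wrap contract for illustration only, not a LangChain API:

```python
# Plain-Python model of the wrap-style contract: the wrapper receives a
# request and a handler, and may call the handler zero times (short-circuit
# via cache) or once (normal flow).

def with_cache(handler, cache):
    def wrapped(request):
        if request in cache:
            return cache[request]      # short-circuit: zero handler calls
        result = handler(request)      # normal flow: one handler call
        cache[request] = result
        return result
    return wrapped

calls = []

def fake_model(request):
    calls.append(request)
    return f"response to {request}"

cache = {}
model = with_cache(fake_model, cache)
model("hi")          # handler runs and the result is cached
model("hi")          # served from cache; the handler is not called again
print(len(calls))    # → 1
```

The same shape applies to `wrap_model_call`: the middleware decides whether `handler(request)` runs at all.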

Creating middleware

You can create middleware in two ways:

Decorator-based middleware

Quick and simple for single-hook middleware: use a decorator to wrap an individual function. Decorators are available in node-style, wrap-style, and convenience variants. Example:
from langchain.agents.middleware import (
    before_model,
    wrap_model_call,
    AgentState,
    ModelRequest,
    ModelResponse,
)
from langchain.agents import create_agent
from langgraph.runtime import Runtime
from typing import Any, Callable


@before_model
def log_before_model(state: AgentState, runtime: Runtime) -> dict[str, Any] | None:
    print(f"About to call model with {len(state['messages'])} messages")
    return None

@wrap_model_call
def retry_model(
    request: ModelRequest,
    handler: Callable[[ModelRequest], ModelResponse],
) -> ModelResponse:
    for attempt in range(3):
        try:
            return handler(request)
        except Exception as e:
            if attempt == 2:
                raise
            print(f"Retry {attempt + 1}/3 after error: {e}")

agent = create_agent(
    model="gpt-4.1",
    middleware=[log_before_model, retry_model],
    tools=[...],
)
When to use decorators:
  • You only need a single hook
  • No complex configuration is required
  • Rapid prototyping

Class-based middleware

More powerful for complex middleware with multiple hooks or configuration. Use a class when you need sync and async implementations of the same hook, or want to combine multiple hooks in a single middleware. Example:
from langchain.agents.middleware import (
    AgentMiddleware,
    AgentState,
    ModelRequest,
    ModelResponse,
)
from langgraph.runtime import Runtime
from typing import Any, Callable

class LoggingMiddleware(AgentMiddleware):
    def before_model(self, state: AgentState, runtime: Runtime) -> dict[str, Any] | None:
        print(f"About to call model with {len(state['messages'])} messages")
        return None

    def after_model(self, state: AgentState, runtime: Runtime) -> dict[str, Any] | None:
        print(f"Model returned: {state['messages'][-1].content}")
        return None

agent = create_agent(
    model="gpt-4.1",
    middleware=[LoggingMiddleware()],
    tools=[...],
)
When to use classes:
  • Defining sync and async implementations of the same hook
  • Multiple hooks in a single middleware
  • Complex configuration (e.g. configurable thresholds, custom models)
  • Reuse across projects via configuration at initialization

Custom state schema

Middleware can extend the agent's state with custom properties. This enables middleware to:
  • Track state across execution: maintain counters, flags, or other values that persist throughout the agent's execution lifecycle
  • Share data between hooks: pass information from before_model to after_model, or between different middleware instances
  • Implement cross-cutting concerns: add features such as rate limiting, usage tracking, user context, or audit logging without modifying core agent logic
  • Make conditional decisions: use accumulated state to decide whether to continue execution, jump to a different node, or modify behavior dynamically
from langchain.agents import create_agent
from langchain.messages import HumanMessage
from langchain.agents.middleware import AgentState, before_model, after_model
from typing_extensions import NotRequired
from typing import Any
from langgraph.runtime import Runtime


class CustomState(AgentState):
    model_call_count: NotRequired[int]
    user_id: NotRequired[str]


@before_model(state_schema=CustomState, can_jump_to=["end"])
def check_call_limit(state: CustomState, runtime: Runtime) -> dict[str, Any] | None:
    count = state.get("model_call_count", 0)
    if count > 10:
        return {"jump_to": "end"}
    return None


@after_model(state_schema=CustomState)
def increment_counter(state: CustomState, runtime: Runtime) -> dict[str, Any] | None:
    return {"model_call_count": state.get("model_call_count", 0) + 1}


agent = create_agent(
    model="gpt-4.1",
    middleware=[check_call_limit, increment_counter],
    tools=[],
)

# Invoke with custom state
result = agent.invoke({
    "messages": [HumanMessage("Hello")],
    "model_call_count": 0,
    "user_id": "user-123",
})

Execution order

When using multiple middleware, understand how they execute:
agent = create_agent(
    model="gpt-4.1",
    middleware=[middleware1, middleware2, middleware3],
    tools=[...],
)
Before hooks run in order:
  1. middleware1.before_agent()
  2. middleware2.before_agent()
  3. middleware3.before_agent()
Agent loop begins
  1. middleware1.before_model()
  2. middleware2.before_model()
  3. middleware3.before_model()
Wrap hooks nest like function calls:
  1. middleware1.wrap_model_call() → middleware2.wrap_model_call() → middleware3.wrap_model_call() → model
After hooks run in reverse order:
  1. middleware3.after_model()
  2. middleware2.after_model()
  3. middleware1.after_model()
Agent loop ends
  1. middleware3.after_agent()
  2. middleware2.after_agent()
  3. middleware1.after_agent()
Key rules:
  • before_* hooks: first to last
  • after_* hooks: last to first (reverse order)
  • wrap_* hooks: nested (the first middleware wraps all the others)
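The three key rules can be demonstrated with a small plain-Python model (illustrative only; this is not the LangChain implementation):

```python
# Illustrative model of hook ordering for [middleware1, middleware2, middleware3]:
# before_* runs first-to-last, wrap_* nests with the first middleware outermost,
# and after_* runs last-to-first.

order = []

def make_wrap(name, handler):
    def wrapped(request):
        order.append(f"{name}.wrap:enter")
        result = handler(request)
        order.append(f"{name}.wrap:exit")
        return result
    return wrapped

middleware = ["middleware1", "middleware2", "middleware3"]

def call_model(request):
    order.append("model")
    return "response"

def run(request):
    for m in middleware:               # before_* hooks: first to last
        order.append(f"{m}.before_model")
    handler = call_model
    for m in reversed(middleware):     # wrap last middleware innermost,
        handler = make_wrap(m, handler)  # so middleware1 ends up outermost
    result = handler(request)
    for m in reversed(middleware):     # after_* hooks: last to first
        order.append(f"{m}.after_model")
    return result

run("hi")
print(order)
```

Running this prints the before hooks in list order, the wrap enters from middleware1 inward to the model, the wrap exits back out, and the after hooks in reverse order.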

Agent jumps

To exit early from middleware, return a dictionary containing jump_to. Available jump targets:
  • 'end': jump to the end of agent execution (or the first after_agent hook)
  • 'tools': jump to the tools node
  • 'model': jump to the model node (or the first before_model hook)
from langchain.agents.middleware import after_model, hook_config, AgentState
from langchain.messages import AIMessage
from langgraph.runtime import Runtime
from typing import Any


@after_model
@hook_config(can_jump_to=["end"])
def check_for_blocked(state: AgentState, runtime: Runtime) -> dict[str, Any] | None:
    last_message = state["messages"][-1]
    if "BLOCKED" in last_message.content:
        return {
            "messages": [AIMessage("I cannot respond to that request.")],
            "jump_to": "end"
        }
    return None

Best practices

  1. Keep middleware focused: each middleware should do one thing well
  2. Handle errors gracefully: don't let middleware errors crash the agent
  3. Use the appropriate hook style:
    • Node-style for sequential logic (logging, validation)
    • Wrap-style for control flow (retries, fallbacks, caching)
  4. Document any custom state properties clearly
  5. Unit-test middleware in isolation before integrating
  6. Consider execution order: place critical middleware first in the list
  7. Use built-in middleware whenever possible
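Testing in isolation is straightforward because a node-style hook is an ordinary function of (state, runtime). The `limit_hook` below is a hypothetical stand-in mirroring the check_message_limit example above, tested with a plain dict as state and a stub runtime:

```python
# A node-style hook is an ordinary function, so it can be unit-tested
# with plain data. limit_hook mirrors the check_message_limit example.

def limit_hook(state, runtime):
    if len(state["messages"]) >= 50:
        return {"jump_to": "end"}
    return None

# Unit tests: a dict stands in for the state, None for the runtime.
assert limit_hook({"messages": ["m"] * 50}, None) == {"jump_to": "end"}
assert limit_hook({"messages": ["m"] * 3}, None) is None
```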

Examples

Dynamic model selection

from langchain.agents.middleware import wrap_model_call, ModelRequest, ModelResponse
from langchain.chat_models import init_chat_model
from typing import Callable


complex_model = init_chat_model("gpt-4.1")
simple_model = init_chat_model("gpt-4.1-mini")

@wrap_model_call
def dynamic_model(
    request: ModelRequest,
    handler: Callable[[ModelRequest], ModelResponse],
) -> ModelResponse:
    # Use different model based on conversation length
    if len(request.messages) > 10:
        model = complex_model
    else:
        model = simple_model
    return handler(request.override(model=model))

Tool call monitoring

from langchain.agents.middleware import wrap_tool_call
from langchain.tools.tool_node import ToolCallRequest
from langchain.messages import ToolMessage
from langgraph.types import Command
from typing import Callable


@wrap_tool_call
def monitor_tool(
    request: ToolCallRequest,
    handler: Callable[[ToolCallRequest], ToolMessage | Command],
) -> ToolMessage | Command:
    print(f"Executing tool: {request.tool_call['name']}")
    print(f"Arguments: {request.tool_call['args']}")
    try:
        result = handler(request)
        print("Tool completed successfully")
        return result
    except Exception as e:
        print(f"Tool failed: {e}")
        raise

Dynamic tool selection

Select relevant tools at runtime to improve performance and accuracy. This section covers filtering pre-registered tools. To register tools discovered at runtime (e.g. from an MCP server), see runtime tool registration. Benefits:
  • Shorter prompts - reduce complexity by exposing only relevant tools
  • Better accuracy - the model chooses correctly from fewer options
  • Permission control - dynamically filter tools based on user access
from langchain.agents import create_agent
from langchain.agents.middleware import wrap_model_call, ModelRequest, ModelResponse
from typing import Callable


@wrap_model_call
def select_tools(
    request: ModelRequest,
    handler: Callable[[ModelRequest], ModelResponse],
) -> ModelResponse:
    """Middleware to select relevant tools based on state/context."""
    # Select a small, relevant subset of tools based on state/context
    relevant_tools = select_relevant_tools(request.state, request.runtime)
    return handler(request.override(tools=relevant_tools))

agent = create_agent(
    model="gpt-4.1",
    tools=all_tools,  # All available tools need to be registered upfront
    middleware=[select_tools],
)

Working with system messages

Use the system_message field on ModelRequest to modify the system message in middleware. The system_message field contains a SystemMessage object (even if the agent was created with a string system_prompt). Example: adding context to the system message
from langchain.agents.middleware import wrap_model_call, ModelRequest, ModelResponse
from langchain.messages import SystemMessage
from typing import Callable


@wrap_model_call
def add_context(
    request: ModelRequest,
    handler: Callable[[ModelRequest], ModelResponse],
) -> ModelResponse:
    # Always work with content blocks
    new_content = list(request.system_message.content_blocks) + [
        {"type": "text", "text": "Additional context."}
    ]
    new_system_message = SystemMessage(content=new_content)
    return handler(request.override(system_message=new_system_message))
Example: using cache control (Anthropic). When using Anthropic models, you can cache large system prompts using structured content blocks with cache-control directives:
from langchain.agents.middleware import wrap_model_call, ModelRequest, ModelResponse
from langchain.messages import SystemMessage
from typing import Callable


@wrap_model_call
def add_cached_context(
    request: ModelRequest,
    handler: Callable[[ModelRequest], ModelResponse],
) -> ModelResponse:
    # Always work with content blocks
    new_content = list(request.system_message.content_blocks) + [
        {
            "type": "text",
            "text": "Here is a large document to analyze:\n\n<document>...</document>",
            # content up until this point is cached
            "cache_control": {"type": "ephemeral"}
        }
    ]

    new_system_message = SystemMessage(content=new_content)
    return handler(request.override(system_message=new_system_message))
Notes:
  • ModelRequest.system_message is always a SystemMessage object, even if the agent was created with system_prompt="string"
  • Use SystemMessage.content_blocks to access the content as a list of blocks, regardless of whether the original content was a string or a list
  • When modifying the system message, use content_blocks and append new blocks to preserve the existing structure
  • For advanced use cases like cache control, you can pass a SystemMessage object directly to create_agent's system_prompt parameter

Additional resources