Skip to main content
本指南提供了开始使用 Drasi 工具的快速概览。有关所有 Drasi 功能、参数和配置的详细列表,请访问 Drasi 文档langchain_drasi 仓库。

概述

Drasi 是一个变更检测平台,使检测和响应数据库中的变更变得简单高效。LangChain-Drasi 集成通过将外部数据变更与工作流执行连接起来,创建响应式、变更驱动的 AI 智能体。这允许智能体通过桥接外部数据变更与智能体工作流来发现、订阅和响应实时查询更新。Drasi 持续查询流式传输实时更新,触发智能体状态转换、修改记忆或动态控制工作流执行——将静态智能体转变为环境感知的、长期运行的响应式系统。

详情

可序列化JS 支持下载量版本
DrasiToollangchain-drasiPyPI - DownloadsPyPI - Version

功能

  • 查询发现 - 自动识别可用的 Drasi 查询
  • 实时订阅 - 监控持续查询更新
  • 通知处理器 - 六个内置处理器,适用于不同用例
    • 控制台
    • 日志
    • 记忆
    • 缓冲区
    • LangChain 记忆
    • LangGraph 记忆
  • 自定义处理器 - 扩展基础处理器以实现特定领域逻辑

设置

要访问 Drasi 工具,您需要运行 Drasi 和 Drasi MCP 服务器。

先决条件

凭据(可选)

如果您的 Drasi MCP 服务器需要身份验证,您可以使用 Bearer 令牌或其他身份验证方法配置标头:
配置身份验证
from langchain_drasi import MCPConnectionConfig

config = MCPConnectionConfig(
    server_url="http://localhost:8083",
    headers={"Authorization": "Bearer your-token"},
    timeout=30.0
)

安装

Drasi 工具位于 langchain-drasi 包中:
pip install -U langchain-drasi

实例化

现在我们可以实例化一个 Drasi 工具实例。您需要配置 MCP 连接,并可选择添加通知处理器来处理实时更新:
初始化工具实例
from langchain_drasi import create_drasi_tool, MCPConnectionConfig, ConsoleHandler

# 配置到 Drasi MCP 服务器的连接
config = MCPConnectionConfig(
    server_url="http://localhost:8083",
    timeout=30.0
)

# 创建一个通知处理器
handler = ConsoleHandler()

# 创建工具
tool = create_drasi_tool(
    mcp_config=config,
    notification_handlers=[handler]
)

调用

直接调用

以下是直接调用工具的简单示例。
调用工具
# 发现可用查询
queries = await tool.discover_queries()
# 返回: [QueryInfo, QueryInfo, ...]

# 订阅特定查询
await tool.subscribe("hot-freezers")
# 通知路由到已注册的处理器

# 从查询中读取当前结果
result = await tool.read_query("active-orders")
# 返回: 包含当前数据的 QueryResult

作为 ToolCall

我们也可以使用模型生成的 ToolCall 来调用工具,在这种情况下,将返回一个 ToolMessage

在智能体中使用

我们可以在 LangGraph 智能体中使用 Drasi 工具来创建响应式、事件驱动的工作流。为此,我们需要一个具有工具调用能力的模型。
带工具的智能体
from langchain_anthropic import ChatAnthropic
from langchain.agents import create_agent

# 初始化模型
model = ChatAnthropic(model="claude-sonnet-4-6")

# 使用 Drasi 工具创建智能体
agent = create_agent(model, [tool])

# 运行智能体
result = agent.invoke(
    {"messages": [{"role": "user", "content": "有哪些可用的查询?"}]}
)

print(result["messages"][-1].content)

result = agent.invoke(
    {"messages": [{"role": "user", "content": "订阅 customer-orders 查询"}]}
)

print(result["messages"][-1].content)

通知处理器

Drasi 的关键功能之一是其内置的通知处理器,用于处理实时查询结果变更。您可以使用这些处理器根据数据变更采取特定操作。

内置处理器

ConsoleHandler - 将格式化的通知输出到标准输出:
from langchain_drasi import ConsoleHandler

handler = ConsoleHandler()
LoggingHandler - 使用 Python 的日志框架记录通知:
from langchain_drasi import LoggingHandler
import logging

handler = LoggingHandler(
    logger_name="drasi.notifications",
    log_level=logging.INFO
)
MemoryHandler - 在内存中存储通知,支持可选过滤:
from langchain_drasi import MemoryHandler

handler = MemoryHandler(max_size=100)

# 检索通知
all_notifs = handler.get_all()
freezer_notifs = handler.get_by_query("hot-freezers")
added_events = handler.get_by_type("added")
BufferHandler - 用于顺序处理的 FIFO 队列: 当您的工作流正忙于其他事情时,这对于缓冲传入的变更通知非常有用;然后您可以在工作流中设置一个循环,在准备好时从缓冲区消费通知。
from langchain_drasi import BufferHandler

handler = BufferHandler(max_size=100)
# 稍后,消费通知
notification = handler.consume()  # 移除并返回下一个通知
notification = handler.peek()     # 查看下一个通知而不移除
LangGraphMemoryHandler - 将更新直接注入 LangGraph 检查点:
from langchain_drasi import LangGraphMemoryHandler
from langgraph.checkpoint.memory import MemorySaver

checkpoint_manager = MemorySaver()
handler = LangGraphMemoryHandler(
    checkpointer=checkpoint_manager,
    thread_id="your-thread-id"
)

自定义处理器

您可以通过扩展 BaseDrasiNotificationHandler 来创建自定义处理器:
from langchain_drasi import BaseDrasiNotificationHandler

class CustomHandler(BaseDrasiNotificationHandler):
    def on_result_added(self, query_name: str, added_data: dict):
        # 处理新结果
        print(f"{query_name} 中的新结果: {added_data}")

    def on_result_updated(self, query_name: str, updated_data: dict):
        # 处理更新的结果
        print(f"{query_name} 中更新的结果: {updated_data}")

    def on_result_deleted(self, query_name: str, deleted_data: dict):
        # 处理删除的结果
        print(f"{query_name} 中删除的结果: {deleted_data}")

handler = CustomHandler()
tool = create_drasi_tool(
    mcp_config=config,
    notification_handlers=[handler]
)

示例

  • 交互式聊天:一个使用 Drasi 进行实时记忆更新的聊天应用程序。
  • 终结者游戏:一个利用 Drasi 实现动态 NPC 行为的游戏。

用例

Drasi 对于构建需要响应实时数据变更的环境智能体特别有用。一些示例用例包括:
  • AI 副驾驶 - 监控和响应系统事件的助手
  • AI 游戏玩家 - 适应游戏内事件的 NPC
  • 物联网监控 - 处理传感器数据流的智能体
  • 客户支持 - 响应工单更新或客户操作的机器人
  • DevOps 助手 - 监控基础设施变更的工具
  • 协作编辑 - 响应文档或代码变更的系统

API 参考

有关所有 Drasi 功能和配置的详细文档,请访问 API 参考