CockroachDBChatMessageHistory 将聊天对话历史存储在 CockroachDB 的分布式 SQL 数据库中。
代码位于集成包中:langchain-cockroachdb。
CockroachDBChatMessageHistory 提供:
- 分布式存储:聊天历史自动在节点间复制
- 强一致性:可序列化事务防止消息排序问题
- 高可用性:自动故障转移,无数据丢失
- 会话隔离:每个对话具有唯一的会话 ID
- PostgreSQL 兼容性:易于从基于 PostgreSQL 的系统迁移
pip install -qU langchain-cockroachdb
连接字符串
# CockroachDB Cloud
CONNECTION_STRING = "cockroachdb://user:password@host:26257/database?sslmode=verify-full"
# 本地不安全集群
CONNECTION_STRING = "cockroachdb://root@localhost:26257/defaultdb?sslmode=disable"
初始化
创建消息历史表
import asyncio
from langchain_cockroachdb import CockroachDBChatMessageHistory
# 创建表(仅需执行一次)
async def setup():
chat_history = CockroachDBChatMessageHistory(
session_id="setup",
connection_string=CONNECTION_STRING,
table_name="chat_history",
)
await chat_history._acreate_table_if_not_exists()
asyncio.run(setup())
可选:使用 schema 参数指定模式名称(默认:“public”)
为会话初始化
为特定对话创建聊天历史实例:
import uuid
session_id = str(uuid.uuid4()) # 此对话的唯一 ID
chat_history = CockroachDBChatMessageHistory(
session_id=session_id,
connection_string=CONNECTION_STRING,
table_name="chat_history",
)
添加消息
from langchain.messages import HumanMessage, AIMessage
# 向对话添加消息
await chat_history.aadd_message(HumanMessage(content="你好!什么是 CockroachDB?"))
await chat_history.aadd_message(AIMessage(content="CockroachDB 是一个分布式 SQL 数据库..."))
一次添加多条消息
messages = [
HumanMessage(content="什么是向量索引?"),
AIMessage(content="向量索引支持快速相似性搜索..."),
HumanMessage(content="它们如何工作?"),
AIMessage(content="它们使用近似最近邻算法..."),
]
await chat_history.aadd_messages(messages)
检索消息
获取会话的所有消息:
messages = await chat_history.aget_messages()
for msg in messages:
if isinstance(msg, HumanMessage):
print(f"用户: {msg.content}")
elif isinstance(msg, AIMessage):
print(f"AI: {msg.content}")
清除对话历史
删除会话的所有消息:
await chat_history.aclear()
同步接口
使用同步 API:
# 同步用法
chat_history_sync = CockroachDBChatMessageHistory(
session_id=session_id,
connection_string=CONNECTION_STRING,
table_name="chat_history",
)
# 同步操作
chat_history_sync.add_message(HumanMessage(content="你好!"))
messages = chat_history_sync.messages
chat_history_sync.clear()
API 参考
详细文档:
附加资源