CockroachDB 是一个分布式 SQL 数据库,构建于事务性强且高度一致的键值存储之上。它支持水平扩展,能够在磁盘、机器、机架甚至数据中心故障时保持运行,延迟中断极小且无需人工干预。
主要特性:
- 分布式 SQL:在扩展的同时保持 ACID 保证
- 原生向量支持:内置
VECTOR 类型(v24.2+)和 C-SPANN 索引(v25.2+)
- 兼容 PostgreSQL:可直接替换 PostgreSQL 应用程序
- 全局复制:多区域部署,低延迟
- 自动分片:数据自动分布在节点间
- 可序列化隔离:默认使用最强隔离级别
安装与设置
安装 LangChain 集成:
pip install langchain-cockroachdb
获取 CockroachDB 连接字符串
你需要一个 CockroachDB 集群。选择以下任一选项:
选项 1:CockroachDB Cloud(推荐)
- 在 cockroachlabs.cloud 注册
- 创建一个免费集群
- 获取连接字符串:
cockroachdb://user:pass@host:26257/db?sslmode=verify-full
选项 2:Docker(开发环境)
docker run -d --name cockroachdb -p 26257:26257 \
cockroachdb/cockroach:latest start-single-node --insecure
连接字符串:cockroachdb://root@localhost:26257/defaultdb?sslmode=disable
选项 3:本地二进制文件
从 cockroachlabs.com/docs/releases 下载
向量存储
CockroachDB 可用作向量存储,支持原生 VECTOR 类型和 C-SPANN 分布式索引。
主要特性:
- 原生向量支持(v24.2+)
- 针对分布式系统优化的 C-SPANN 索引(v25.2+)
- 高级元数据过滤
- 使用前缀列实现多租户
- 水平可扩展性
详细用法请参阅 CockroachDB 向量存储文档。
快速示例:
from langchain_cockroachdb import AsyncCockroachDBVectorStore, CockroachDBEngine
from langchain_openai import OpenAIEmbeddings
# 初始化
engine = CockroachDBEngine.from_connection_string(
"cockroachdb://user:pass@host:26257/db"
)
await engine.ainit_vectorstore_table(
table_name="documents",
vector_dimension=1536,
)
vectorstore = AsyncCockroachDBVectorStore(
engine=engine,
embeddings=OpenAIEmbeddings(),
collection_name="documents",
)
# 使用它
ids = await vectorstore.aadd_texts(["Hello world"])
results = await vectorstore.asimilarity_search("Hi", k=1)
聊天消息历史
在 CockroachDB 中存储对话历史,用于持久化、分布式的聊天应用。
主要特性:
- 带自动复制的分布式存储
- 强一致性(可序列化)
- 基于会话的组织
- 高可用性
详细用法请参阅 CockroachDB 聊天历史文档。
快速示例:
from langchain_cockroachdb import CockroachDBChatMessageHistory
import uuid
chat_history = CockroachDBChatMessageHistory(
session_id=str(uuid.uuid4()),
connection_string=CONNECTION_STRING,
table_name="chat_history",
)
from langchain.messages import HumanMessage, AIMessage
await chat_history.aadd_message(HumanMessage(content="Hello!"))
await chat_history.aadd_message(AIMessage(content="Hi there!"))
messages = await chat_history.aget_messages()
LangGraph 检查点存储器
在 CockroachDB 中持久化 LangGraph 工作流状态,用于短期记忆、人机交互和容错。
同步(CockroachDBSaver)和异步(AsyncCockroachDBSaver)实现均可用。
首次使用 CockroachDB 检查点存储器时,请调用 checkpointer.setup()(或 await checkpointer.setup())以创建所需的表。
import os
from langchain.chat_models import init_chat_model
from langgraph.graph import StateGraph, MessagesState, START
from langchain_cockroachdb import CockroachDBSaver
model = init_chat_model(model="claude-haiku-4-5-20251001")
DB_URI = os.environ["COCKROACHDB_URI"]
# 示例:"cockroachdb://user:password@host:26257/defaultdb?sslmode=verify-full"
with CockroachDBSaver.from_conn_string(DB_URI) as checkpointer:
# checkpointer.setup()
def call_model(state: MessagesState):
response = model.invoke(state["messages"])
return {"messages": response}
builder = StateGraph(MessagesState)
builder.add_node(call_model)
builder.add_edge(START, "call_model")
graph = builder.compile(checkpointer=checkpointer)
config = {"configurable": {"thread_id": "1"}}
for chunk in graph.stream(
{"messages": [{"role": "user", "content": "hi! I'm bob"}]},
config,
stream_mode="values"
):
chunk["messages"][-1].pretty_print()
for chunk in graph.stream(
{"messages": [{"role": "user", "content": "what's my name?"}]},
config,
stream_mode="values"
):
chunk["messages"][-1].pretty_print()
import os
from langchain.chat_models import init_chat_model
from langgraph.graph import StateGraph, MessagesState, START
from langchain_cockroachdb import AsyncCockroachDBSaver
model = init_chat_model(model="claude-haiku-4-5-20251001")
DB_URI = os.environ["COCKROACHDB_URI"]
# 示例:"cockroachdb://user:password@host:26257/defaultdb?sslmode=verify-full"
async with AsyncCockroachDBSaver.from_conn_string(DB_URI) as checkpointer:
# await checkpointer.setup()
async def call_model(state: MessagesState):
response = await model.ainvoke(state["messages"])
return {"messages": response}
builder = StateGraph(MessagesState)
builder.add_node(call_model)
builder.add_edge(START, "call_model")
graph = builder.compile(checkpointer=checkpointer)
config = {"configurable": {"thread_id": "1"}}
async for chunk in graph.astream(
{"messages": [{"role": "user", "content": "hi! I'm bob"}]},
config,
stream_mode="values"
):
chunk["messages"][-1].pretty_print()
async for chunk in graph.astream(
{"messages": [{"role": "user", "content": "what's my name?"}]},
config,
stream_mode="values"
):
chunk["messages"][-1].pretty_print()
连接选项(包括 CockroachDB Cloud(推荐用于生产环境)、Docker 和本地二进制文件安装)请参阅上方的获取 CockroachDB 连接字符串。对于本地开发,sslmode=disable 是可接受的;在生产环境中请始终使用 sslmode=verify-full。
检查点存储器使用原始 psycopg3 连接(而非 SQLAlchemy)以兼容 LangGraph 的检查点接口。from_conn_string 工厂方法接受 cockroachdb:// URL 并自动转换。
行级 TTL(v0.2.1+)
使用 CockroachDB 的行级 TTL 自动过期旧检查点数据:
with CockroachDBSaver.from_conn_string(DB_URI) as checkpointer:
checkpointer.setup()
# 使超过 30 天的检查点过期,每天清理一次
checkpointer.enable_ttl(ttl_interval="30 days", cron="@daily")
# 正常使用检查点存储器 -- 旧数据会自动清理
graph = builder.compile(checkpointer=checkpointer)
# 稍后禁用 TTL:
# checkpointer.disable_ttl()
异步变体:await checkpointer.aenable_ttl(ttl_interval="7 days", cron="@hourly")
TTL 删除是最终的 — 过期的行在 CockroachDB 的后台作业按指定 cron 计划运行之前仍可查询。
性能优化(v0.2.1+)
检查点存储器包含多项针对低延迟读取的优化:
- 批量获取:
list() 使用 2 个批量查询获取所有 blob 和写入,而不是每个检查点 2 个查询
- 原始 BYTEA:使用 psycopg3 二进制协议,而非在 SQL 中对 blob 进行十六进制编码
- 预处理语句:
from_conn_string() 启用服务器端查询计划缓存(prepare_threshold=5)
多租户
使用可选的命名空间列按租户隔离向量数据。启用后,所有 CRUD 和搜索操作都限定在指定的命名空间内。
CockroachDB 的 C-SPANN 索引支持前缀列,因此命名空间过滤直接使用向量索引,无需单独扫描。
from langchain_cockroachdb import AsyncCockroachDBVectorStore, CockroachDBEngine
from langchain_openai import OpenAIEmbeddings
engine = CockroachDBEngine.from_connection_string(CONNECTION_STRING)
# 创建带有命名空间列的表
await engine.ainit_vectorstore_table(
table_name="documents",
vector_dimension=1536,
namespace_column="namespace",
)
# 创建限定于特定租户的向量存储
vectorstore = AsyncCockroachDBVectorStore(
engine=engine,
embeddings=OpenAIEmbeddings(),
collection_name="documents",
namespace="tenant_a",
)
# 所有操作都限定在 tenant_a 内
ids = await vectorstore.aadd_texts(["Tenant A document"])
results = await vectorstore.asimilarity_search("query", k=5)
为什么选择 CockroachDB 用于 AI 应用?
设计即分布式
- 水平可扩展性:添加节点以处理更多负载
- 多区域部署:为全球用户提供低延迟服务
- 自动再平衡:数据自动在节点间分布
生产就绪的可靠性
- 高可用性:能够承受节点、机架和数据中心故障
- 零停机升级:滚动更新,无停机时间
- 备份与恢复:时间点恢复
大规模向量搜索
- C-SPANN 索引:分布式近似最近邻搜索
- 原生向量类型:对嵌入向量的一流支持
- 实时索引:新向量无需重建
- 多租户:使用前缀列实现高效租户隔离
PostgreSQL 兼容性
- 轻松迁移:可直接替换 PostgreSQL
- 熟悉的 SQL:标准 PostgreSQL 语法
- 现有工具:兼容 PostgreSQL 驱动程序和工具
将这些文档通过 MCP 连接到 Claude、VSCode 等,以获取实时答案。