CockroachDB 是一种分布式 SQL 数据库,构建于事务性和强一致性键值存储之上。它支持水平扩展,能够在磁盘、机器、机架甚至数据中心故障时保持极低的延迟影响,且无需人工干预。
主要特性:
- 分布式 SQL:在保持 ACID 保证的同时实现横向扩展
- 原生向量支持:内置
VECTOR 类型(v24.2+)和 C-SPANN 索引(v25.2+)
- 兼容 PostgreSQL:可无缝替换 PostgreSQL 应用程序
- 全球复制:支持低延迟的多区域部署
- 自动分片:数据自动分布在各个节点上
- SERIALIZABLE 隔离级别:默认提供最强的隔离级别
安装与配置
安装 LangChain 集成包:
pip install langchain-cockroachdb
获取 CockroachDB 连接字符串
你需要一个 CockroachDB 集群。选择以下任一选项:
选项 1:CockroachDB Cloud(推荐)
- 在 cockroachlabs.cloud 注册账号
- 创建一个免费集群
- 获取你的连接字符串:
cockroachdb://user:pass@host:26257/db?sslmode=verify-full
选项 2:Docker(开发环境)
docker run -d --name cockroachdb -p 26257:26257 \
cockroachdb/cockroach:latest start-single-node --insecure
连接字符串:cockroachdb://root@localhost:26257/defaultdb?sslmode=disable
选项 3:本地二进制文件
从 cockroachlabs.com/docs/releases 下载
向量存储
CockroachDB 可作为向量存储使用,支持原生 VECTOR 类型和 C-SPANN 分布式索引。
主要特性:
- 原生向量支持(v24.2+)
- 针对分布式系统优化的 C-SPANN 索引(v25.2+)
- 高级元数据过滤
- 使用前缀列实现多租户隔离
- 水平可扩展性
有关详细用法,请参阅 CockroachDB 向量存储文档。
快速示例:
from langchain_cockroachdb import AsyncCockroachDBVectorStore, CockroachDBEngine
from langchain_openai import OpenAIEmbeddings
# 初始化
engine = CockroachDBEngine.from_connection_string(
"cockroachdb://user:pass@host:26257/db"
)
await engine.ainit_vectorstore_table(
table_name="documents",
vector_dimension=1536,
)
vectorstore = AsyncCockroachDBVectorStore(
engine=engine,
embeddings=OpenAIEmbeddings(),
collection_name="documents",
)
# 使用它
ids = await vectorstore.aadd_texts(["Hello world"])
results = await vectorstore.asimilarity_search("Hi", k=1)
聊天消息历史
将对话历史存储在 CockroachDB 中,以构建持久化、分布式的聊天应用程序。
主要特性:
- 支持自动复制的分布式存储
- 强一致性(SERIALIZABLE)
- 基于会话的组织结构
- 高可用性
有关详细用法,请参阅 CockroachDB 聊天历史文档。
快速示例:
from langchain_cockroachdb import CockroachDBChatMessageHistory
import uuid
chat_history = CockroachDBChatMessageHistory(
session_id=str(uuid.uuid4()),
connection_string=CONNECTION_STRING,
table_name="chat_history",
)
from langchain.messages import HumanMessage, AIMessage
await chat_history.aadd_message(HumanMessage(content="Hello!"))
await chat_history.aadd_message(AIMessage(content="Hi there!"))
messages = await chat_history.aget_messages()
LangGraph 检查点器
将 LangGraph 工作流状态持久化到 CockroachDB 中,以实现短期记忆、人机交互和容错能力。
同时提供同步(CockroachDBSaver)和异步(AsyncCockroachDBSaver)实现。
首次使用 CockroachDB 检查点器时,请调用 checkpointer.setup()(或 await checkpointer.setup())以创建所需的表。
import os
from langchain.chat_models import init_chat_model
from langgraph.graph import StateGraph, MessagesState, START
from langchain_cockroachdb import CockroachDBSaver
model = init_chat_model(model="claude-haiku-4-5-20251001")
DB_URI = os.environ["COCKROACHDB_URI"]
# 示例:"cockroachdb://user:password@host:26257/defaultdb?sslmode=verify-full"
with CockroachDBSaver.from_conn_string(DB_URI) as checkpointer:
# checkpointer.setup()
def call_model(state: MessagesState):
response = model.invoke(state["messages"])
return {"messages": response}
builder = StateGraph(MessagesState)
builder.add_node(call_model)
builder.add_edge(START, "call_model")
graph = builder.compile(checkpointer=checkpointer)
config = {"configurable": {"thread_id": "1"}}
for chunk in graph.stream(
{"messages": [{"role": "user", "content": "hi! I'm bob"}]},
config,
stream_mode="values"
):
chunk["messages"][-1].pretty_print()
for chunk in graph.stream(
{"messages": [{"role": "user", "content": "what's my name?"}]},
config,
stream_mode="values"
):
chunk["messages"][-1].pretty_print()
import os
from langchain.chat_models import init_chat_model
from langgraph.graph import StateGraph, MessagesState, START
from langchain_cockroachdb import AsyncCockroachDBSaver
model = init_chat_model(model="claude-haiku-4-5-20251001")
DB_URI = os.environ["COCKROACHDB_URI"]
# 示例:"cockroachdb://user:password@host:26257/defaultdb?sslmode=verify-full"
async with AsyncCockroachDBSaver.from_conn_string(DB_URI) as checkpointer:
# await checkpointer.setup()
async def call_model(state: MessagesState):
response = await model.ainvoke(state["messages"])
return {"messages": response}
builder = StateGraph(MessagesState)
builder.add_node(call_model)
builder.add_edge(START, "call_model")
graph = builder.compile(checkpointer=checkpointer)
config = {"configurable": {"thread_id": "1"}}
async for chunk in graph.astream(
{"messages": [{"role": "user", "content": "hi! I'm bob"}]},
config,
stream_mode="values"
):
chunk["messages"][-1].pretty_print()
async for chunk in graph.astream(
{"messages": [{"role": "user", "content": "what's my name?"}]},
config,
stream_mode="values"
):
chunk["messages"][-1].pretty_print()
请参阅上方的 获取 CockroachDB 连接字符串 以了解连接选项,包括 CockroachDB Cloud(生产环境推荐)、Docker 和本地二进制安装。对于本地开发,使用 sslmode=disable 是可以接受的;在生产环境中请务必使用 sslmode=verify-full。
该检查点器使用原始 psycopg3 连接(而非 SQLAlchemy),以兼容 LangGraph 的检查点接口。from_conn_string 工厂方法接受 cockroachdb:// URL 并自动进行转换。
行级 TTL(v0.2.1+)
利用 CockroachDB 的 行级 TTL 自动使旧检查点数据过期:
with CockroachDBSaver.from_conn_string(DB_URI) as checkpointer:
checkpointer.setup()
# 使超过 30 天的检查点过期,并每日清理
checkpointer.enable_ttl(ttl_interval="30 days", cron="@daily")
# 正常使用检查点器——旧数据会自动清理
graph = builder.compile(checkpointer=checkpointer)
# 若需后续禁用 TTL:
# checkpointer.disable_ttl()
异步变体:await checkpointer.aenable_ttl(ttl_interval="7 days", cron="@hourly")
TTL 删除是最终一致的——过期行在被 CockroachDB 后台作业按指定 cron 计划执行前,仍可进行查询。
性能优化(v0.2.1+)
该检查点器包含多项针对低延迟读取的优化:
- 批量获取:
list() 通过 2 次批量查询获取所有 blob 和写入记录,而非每个检查点执行 2 次查询
- 原始 BYTEA:使用 psycopg3 二进制协议,而非在 SQL 中对 blob 进行十六进制编码
- 预编译语句:
from_conn_string() 启用服务端查询计划缓存(prepare_threshold=5)
多租户
通过使用可选的命名空间列按租户隔离向量数据。启用后,所有 CRUD 和搜索操作都将限定在指定的命名空间内。
CockroachDB 的 C-SPANN 索引支持前缀列,因此命名空间过滤可直接利用向量索引,无需单独扫描。
from langchain_cockroachdb import AsyncCockroachDBVectorStore, CockroachDBEngine
from langchain_openai import OpenAIEmbeddings
engine = CockroachDBEngine.from_connection_string(CONNECTION_STRING)
# Create the table with a namespace column
await engine.ainit_vectorstore_table(
table_name="documents",
vector_dimension=1536,
namespace_column="namespace",
)
# Create a vectorstore scoped to a specific tenant
vectorstore = AsyncCockroachDBVectorStore(
engine=engine,
embeddings=OpenAIEmbeddings(),
collection_name="documents",
namespace="tenant_a",
)
# All operations are scoped to tenant_a
ids = await vectorstore.aadd_texts(["Tenant A document"])
results = await vectorstore.asimilarity_search("query", k=5)
为什么在 AI 应用中使用 CockroachDB?
原生分布式架构
- 水平可扩展性:添加节点以处理更多负载
- 多区域部署:为全球用户提供低延迟服务
- 自动重新平衡:数据自动分布在各个节点上
生产就绪的可靠性
- 高可用性:能够承受节点、机架和数据中心故障
- 零停机升级:滚动更新,无需停机
- 备份与恢复:支持时间点恢复
大规模向量搜索
- C-SPANN 索引:分布式近似最近邻搜索
- 原生向量类型:对嵌入向量的一等公民支持
- 实时索引:新增向量无需重建索引
- 多租户:使用前缀列实现高效的租户隔离
PostgreSQL 兼容性
- 轻松迁移:可无缝替换 PostgreSQL
- 熟悉的 SQL:标准 PostgreSQL 语法
- 现有工具:兼容 PostgreSQL 驱动和工具