Skip to main content
本页面提供将 Apache Cassandra® 用作向量存储的快速入门指南。
Cassandra 是一个 NoSQL、面向行、高度可扩展且高可用的数据库。从 5.0 版本开始,该数据库内置向量搜索功能
注意:除了访问数据库外,运行完整示例还需要 OpenAI API Key。

设置和通用依赖

使用本集成需要以下 Python 包。
pip install -qU langchain-community "cassio>=0.1.4"
注意:根据您的 LangChain 设置,您可能需要安装/升级本演示所需的其他依赖项 (具体来说,需要最新版本的 datasetsopenaipypdftiktoken,以及 langchain-community)。
import os
from getpass import getpass

from datasets import (
    load_dataset,
)
from langchain_community.document_loaders import PyPDFLoader
from langchain_core.documents import Document
from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.runnables import RunnablePassthrough
from langchain_openai import ChatOpenAI, OpenAIEmbeddings
from langchain_text_splitters import RecursiveCharacterTextSplitter
if "OPENAI_API_KEY" not in os.environ:
    os.environ["OPENAI_API_KEY"] = getpass("OPENAI_API_KEY = ")
embe = OpenAIEmbeddings()

导入向量存储

from langchain_community.vectorstores import Cassandra

连接参数

本页面展示的向量存储集成可与 Cassandra 以及其他使用 CQL(Cassandra 查询语言)协议的衍生数据库(如 Astra DB)配合使用。
DataStax Astra DB 是基于 Cassandra 构建的托管无服务器数据库,提供相同的接口和优势。
根据您连接的是 Cassandra 集群还是通过 CQL 连接到 Astra DB,在创建向量存储对象时需要提供不同的参数。

连接到 Cassandra 集群

您首先需要按照 Cassandra 驱动程序文档中的说明创建一个 cassandra.cluster.Session 对象。具体细节因情况而异(例如网络设置和身份验证),但大致如下:
from cassandra.cluster import Cluster

cluster = Cluster(["127.0.0.1"])
session = cluster.connect()
现在您可以将 session 以及所需的 keyspace 名称设置为全局 CassIO 参数:
import cassio

CASSANDRA_KEYSPACE = input("CASSANDRA_KEYSPACE = ")

cassio.init(session=session, keyspace=CASSANDRA_KEYSPACE)
现在您可以创建向量存储:
vstore = Cassandra(
    embedding=embe,
    table_name="cassandra_vector_demo",
    # session=None, keyspace=None  # Uncomment on older versions of LangChain
)
注意:您也可以在创建向量存储时直接将 session 和 keyspace 作为参数传入。但使用全局 cassio.init 设置更为便捷,特别是当您的应用程序以多种方式使用 Cassandra 时(例如用于向量存储、聊天记忆和 LLM 响应缓存),因为它允许将凭据和数据库连接管理集中在一处。

通过 CQL 连接到 Astra DB

在这种情况下,使用以下连接参数初始化 CassIO:
  • 数据库 ID,例如 01234567-89ab-cdef-0123-456789abcdef
  • 令牌,例如 AstraCS:6gBhNmsk135....(必须是”数据库管理员”令牌)
  • 可选的 Keyspace 名称(如果省略,将使用数据库的默认 Keyspace)
ASTRA_DB_ID = input("ASTRA_DB_ID = ")
ASTRA_DB_APPLICATION_TOKEN = getpass("ASTRA_DB_APPLICATION_TOKEN = ")

desired_keyspace = input("ASTRA_DB_KEYSPACE (optional, can be left empty) = ")
if desired_keyspace:
    ASTRA_DB_KEYSPACE = desired_keyspace
else:
    ASTRA_DB_KEYSPACE = None
import cassio

cassio.init(
    database_id=ASTRA_DB_ID,
    token=ASTRA_DB_APPLICATION_TOKEN,
    keyspace=ASTRA_DB_KEYSPACE,
)
现在您可以创建向量存储:
vstore = Cassandra(
    embedding=embe,
    table_name="cassandra_vector_demo",
    # session=None, keyspace=None  # Uncomment on older versions of LangChain
)

加载数据集

将源数据集中的每个条目转换为 Document,然后写入向量存储:
philo_dataset = load_dataset("datastax/philosopher-quotes")["train"]

docs = []
for entry in philo_dataset:
    metadata = {"author": entry["author"]}
    doc = Document(page_content=entry["quote"], metadata=metadata)
    docs.append(doc)

inserted_ids = vstore.add_documents(docs)
print(f"\nInserted {len(inserted_ids)} documents.")
在上面的代码中,metadata 字典从源数据创建,并作为 Document 的一部分。 使用 add_texts 添加更多条目:
texts = ["I think, therefore I am.", "To the things themselves!"]
metadatas = [{"author": "descartes"}, {"author": "husserl"}]
ids = ["desc_01", "huss_xy"]

inserted_ids_2 = vstore.add_texts(texts=texts, metadatas=metadatas, ids=ids)
print(f"\nInserted {len(inserted_ids_2)} documents.")
注意:您可以通过增加并发级别来加快 add_textsadd_documents 的执行速度, 这些批量操作——查看方法的 batch_size 参数 了解更多详情。根据网络和客户端机器规格,最佳参数选择可能有所不同。

运行搜索

本节演示元数据过滤和返回相似度分数:
results = vstore.similarity_search("Our life is what we make of it", k=3)
for res in results:
    print(f"* {res.page_content} [{res.metadata}]")
results_filtered = vstore.similarity_search(
    "Our life is what we make of it",
    k=3,
    filter={"author": "plato"},
)
for res in results_filtered:
    print(f"* {res.page_content} [{res.metadata}]")
results = vstore.similarity_search_with_score("Our life is what we make of it", k=3)
for res, score in results:
    print(f"* [SIM={score:3f}] {res.page_content} [{res.metadata}]")

MMR(最大边际相关性)搜索

results = vstore.max_marginal_relevance_search(
    "Our life is what we make of it",
    k=3,
    filter={"author": "aristotle"},
)
for res in results:
    print(f"* {res.page_content} [{res.metadata}]")

删除已存储的文档

delete_1 = vstore.delete(inserted_ids[:3])
print(f"all_succeed={delete_1}")  # True, all documents deleted
delete_2 = vstore.delete(inserted_ids[2:5])
print(f"some_succeeds={delete_2}")  # True, though some IDs were gone already

一个极简的 RAG 链

以下单元格将实现一个简单的 RAG 管道:
  • 下载一个示例 PDF 文件并将其加载到向量存储中;
  • 使用 LCEL(LangChain 表达式语言)创建一个 RAG 链,向量存储位于其核心;
  • 运行问答链。
!curl -L \
    "https://github.com/awesome-astra/datasets/blob/main/demo-resources/what-is-philosophy/what-is-philosophy.pdf?raw=true" \
    -o "what-is-philosophy.pdf"
pdf_loader = PyPDFLoader("what-is-philosophy.pdf")
splitter = RecursiveCharacterTextSplitter(chunk_size=512, chunk_overlap=64)
docs_from_pdf = pdf_loader.load_and_split(text_splitter=splitter)

print(f"Documents from PDF: {len(docs_from_pdf)}.")
inserted_ids_from_pdf = vstore.add_documents(docs_from_pdf)
print(f"Inserted {len(inserted_ids_from_pdf)} documents.")
retriever = vstore.as_retriever(search_kwargs={"k": 3})

philo_template = """
You are a philosopher that draws inspiration from great thinkers of the past
to craft well-thought answers to user questions. Use the provided context as the basis
for your answers and do not make up new reasoning paths - just mix-and-match what you are given.
Your answers must be concise and to the point, and refrain from answering about other topics than philosophy.

CONTEXT:
{context}

QUESTION: {question}

YOUR ANSWER:"""

philo_prompt = ChatPromptTemplate.from_template(philo_template)

llm = ChatOpenAI()

chain = (
    {"context": retriever, "question": RunnablePassthrough()}
    | philo_prompt
    | llm
    | StrOutputParser()
)
chain.invoke("How does Russel elaborate on Peirce's idea of the security blanket?")
更多内容,请查看使用 Astra DB(通过 CQL)的完整 RAG 模板,点击此处

清理

以下操作实质上是从 CassIO 获取 Session 对象,并用它运行一个 CQL DROP TABLE 语句: (您将丢失存储在其中的数据。)
cassio.config.resolve_session().execute(
    f"DROP TABLE {cassio.config.resolve_keyspace()}.cassandra_vector_demo;"
)

了解更多

有关更多信息、扩展快速入门和其他使用示例,请访问 CassIO 文档,了解更多关于使用 LangChain Cassandra 向量存储的内容。

归属声明

Apache Cassandra、Cassandra 和 Apache 是 Apache 软件基金会在美国和/或其他国家/地区的注册商标或商标。