Skip to main content
Apache Cassandra® 是一款广泛用于存储事务性应用数据的数据库。大型语言模型中函数和工具的引入,为生成式 AI 应用中现有数据的使用带来了一些令人兴奋的使用场景。
Cassandra Database 工具包使 AI 工程师能够高效地将智能体与 Cassandra 数据集成,提供以下功能:
  • 通过优化查询实现快速数据访问。大多数查询应在个位毫秒或更短时间内完成。
  • 模式自省,以增强 LLM 的推理能力
  • 与各种 Cassandra 部署兼容,包括 Apache Cassandra®、DataStax Enterprise™ 和 DataStax Astra™
  • 目前,该工具包仅限于 SELECT 查询和模式自省操作。(安全第一)
有关创建 Cassandra DB 智能体的更多信息,请参阅 CQL 智能体 cookbook

快速开始

  • 安装 cassio
  • 设置要连接的 Cassandra 数据库的环境变量
  • 初始化 CassandraDatabase
  • 使用 toolkit.get_tools() 将工具传递给您的智能体
  • 坐等它为您完成所有工作

工作原理

Cassandra 查询语言 (CQL) 是与 Cassandra 数据库交互的主要以人为中心方式。虽然在生成查询时提供了一定的灵活性,但它需要了解 Cassandra 数据建模最佳实践。LLM 函数调用赋予智能体推理能力,然后选择工具来满足请求。使用 LLM 的智能体在选择适当的工具包或工具包链时,应使用 Cassandra 特定的逻辑进行推理。这减少了 LLM 被迫提供自上而下解决方案时引入的随机性。您想让 LLM 不受限制地完全访问您的数据库吗?嗯,可能不想。为此,我们提供了在构建智能体问题时使用的提示: 您是一个具有以下功能和规则的 Apache Cassandra 专家查询分析机器人:
  • 您将接受来自最终用户的关于在数据库中查找特定数据的问题。
  • 您将检查数据库的模式并创建查询路径。
  • 您将向用户提供正确的查询以找到他们正在寻找的数据,并显示查询路径提供的步骤。
  • 您将使用 Apache Cassandra 查询的最佳实践,包括分区键和聚类列。
  • 避免在查询中使用 ALLOW FILTERING。
  • 目标是找到查询路径,因此可能需要查询其他表才能得到最终答案。
以下是 JSON 格式的查询路径示例:
 {
  "query_paths": [
    {
      "description": "Direct query to users table using email",
      "steps": [
        {
          "table": "user_credentials",
          "query":
             "SELECT userid FROM user_credentials WHERE email = 'example@example.com';"
        },
        {
          "table": "users",
          "query": "SELECT * FROM users WHERE userid = ?;"
        }
      ]
    }
  ]
}

提供的工具

cassandra_db_schema

收集已连接数据库或特定模式的所有模式信息。对于智能体确定操作至关重要。

cassandra_db_select_table_data

从特定键空间和表中选择数据。智能体可以传递谓词参数和返回记录数量的限制。

cassandra_db_query

cassandra_db_select_table_data 的实验性替代方案,接受完全由智能体构建的查询字符串而非参数。警告:这可能会导致一些不寻常的查询,性能可能不佳(甚至无法工作)。此功能可能会在未来版本中删除。如果它做了一些很酷的事情,我们也想知道。谁知道呢!

环境设置

安装以下 Python 模块:
pip install ipykernel python-dotenv cassio langchain-openai langchain langchain-community langchainhub

.env 文件

连接通过 cassio 使用 auto=True 参数,notebook 使用 OpenAI。您应相应地创建一个 .env 文件。 对于 Cassandra,设置:
CASSANDRA_CONTACT_POINTS
CASSANDRA_USERNAME
CASSANDRA_PASSWORD
CASSANDRA_KEYSPACE
对于 Astra,设置:
ASTRA_DB_APPLICATION_TOKEN
ASTRA_DB_DATABASE_ID
ASTRA_DB_KEYSPACE
例如:
# 连接到 Astra:
ASTRA_DB_DATABASE_ID=a1b2c3d4-...
ASTRA_DB_APPLICATION_TOKEN=AstraCS:...
ASTRA_DB_KEYSPACE=notebooks

# 同时设置
OPENAI_API_KEY=sk-....
(您也可以修改以下代码直接通过 cassio 连接。)
from dotenv import load_dotenv

load_dotenv(override=True)
# Import necessary libraries
import os

import cassio
from langchain_classic import hub
from langchain.agents import AgentExecutor, create_openai_tools_agent
from langchain_community.agent_toolkits.cassandra_database.toolkit import (
    CassandraDatabaseToolkit,
)
from langchain_community.tools.cassandra_database.prompt import QUERY_PATH_PROMPT
from langchain_community.utilities.cassandra_database import CassandraDatabase
from langchain_openai import ChatOpenAI

连接到 Cassandra 数据库

cassio.init(auto=True)
session = cassio.config.resolve_session()
if not session:
    raise Exception(
        "Check environment configuration or manually configure cassio connection parameters"
    )
# Test data pep

session = cassio.config.resolve_session()

session.execute("""DROP KEYSPACE IF EXISTS langchain_agent_test; """)

session.execute(
    """
CREATE KEYSPACE if not exists langchain_agent_test
WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1};
"""
)

session.execute(
    """
    CREATE TABLE IF NOT EXISTS langchain_agent_test.user_credentials (
    user_email text PRIMARY KEY,
    user_id UUID,
    password TEXT
);
"""
)

session.execute(
    """
    CREATE TABLE IF NOT EXISTS langchain_agent_test.users (
    id UUID PRIMARY KEY,
    name TEXT,
    email TEXT
);"""
)

session.execute(
    """
    CREATE TABLE IF NOT EXISTS langchain_agent_test.user_videos (
    user_id UUID,
    video_id UUID,
    title TEXT,
    description TEXT,
    PRIMARY KEY (user_id, video_id)
);
"""
)

user_id = "522b1fe2-2e36-4cef-a667-cd4237d08b89"
video_id = "27066014-bad7-9f58-5a30-f63fe03718f6"

session.execute(
    f"""
    INSERT INTO langchain_agent_test.user_credentials (user_id, user_email)
    VALUES ({user_id}, 'patrick@datastax.com');
"""
)

session.execute(
    f"""
    INSERT INTO langchain_agent_test.users (id, name, email)
    VALUES ({user_id}, 'Patrick McFadin', 'patrick@datastax.com');
"""
)

session.execute(
    f"""
    INSERT INTO langchain_agent_test.user_videos (user_id, video_id, title)
    VALUES ({user_id}, {video_id}, 'Use Langflow to Build a LangChain LLM Application in 5 Minutes');
"""
)

session.set_keyspace("langchain_agent_test")
# 创建 CassandraDatabase 实例
# 使用 cassio 会话连接到数据库
db = CassandraDatabase()
# 选择将驱动智能体的 LLM
# 只有某些模型支持此功能
llm = ChatOpenAI(temperature=0, model="gpt-4-1106-preview")
toolkit = CassandraDatabaseToolkit(db=db)

tools = toolkit.get_tools()

print("可用工具:")
for tool in tools:
    print(tool.name + "\t- " + tool.description)
Available tools:
cassandra_db_schema -
    Input to this tool is a keyspace name, output is a table description
    of Apache Cassandra tables.
    If the query is not correct, an error message will be returned.
    If an error is returned, report back to the user that the keyspace
    doesn't exist and stop.

cassandra_db_query -
    Execute a CQL query against the database and get back the result.
    If the query is not correct, an error message will be returned.
    If an error is returned, rewrite the query, check the query, and try again.

cassandra_db_select_table_data -
    Tool for getting data from a table in an Apache Cassandra database.
    Use the WHERE clause to specify the predicate for the query that uses the
    primary key. A blank predicate will return all rows. Avoid this if possible.
    Use the limit to specify the number of rows to return. A blank limit will
    return all rows.
prompt = hub.pull("hwchase17/openai-tools-agent")

# 构建 OpenAI Tools 智能体
agent = create_openai_tools_agent(llm, tools, prompt)
input = (
    QUERY_PATH_PROMPT
    + "\n\nHere is your task: Find all the videos that the user with the email address 'patrick@datastax.com' has uploaded to the langchain_agent_test keyspace."
)

agent_executor = AgentExecutor(agent=agent, tools=tools, verbose=True)

response = agent_executor.invoke({"input": input})

print(response["output"])
> Entering new AgentExecutor chain...

Invoking: `cassandra_db_schema` with `{'keyspace': 'langchain_agent_test'}`


Table Name: user_credentials
- Keyspace: langchain_agent_test
- Columns
  - password (text)
  - user_email (text)
  - user_id (uuid)
- Partition Keys: (user_email)
- Clustering Keys:

Table Name: user_videos
- Keyspace: langchain_agent_test
- Columns
  - description (text)
  - title (text)
  - user_id (uuid)
  - video_id (uuid)
- Partition Keys: (user_id)
- Clustering Keys: (video_id asc)


Table Name: users
- Keyspace: langchain_agent_test
- Columns
  - email (text)
  - id (uuid)
  - name (text)
- Partition Keys: (id)
- Clustering Keys:


Invoking: `cassandra_db_select_table_data` with `{'keyspace': 'langchain_agent_test', 'table': 'user_credentials', 'predicate': "user_email = 'patrick@datastax.com'", 'limit': 1}`


Row(user_email='patrick@datastax.com', password=None, user_id=UUID('522b1fe2-2e36-4cef-a667-cd4237d08b89'))
Invoking: `cassandra_db_select_table_data` with `{'keyspace': 'langchain_agent_test', 'table': 'user_videos', 'predicate': 'user_id = 522b1fe2-2e36-4cef-a667-cd4237d08b89', 'limit': 10}`


Row(user_id=UUID('522b1fe2-2e36-4cef-a667-cd4237d08b89'), video_id=UUID('27066014-bad7-9f58-5a30-f63fe03718f6'), description='DataStax Academy is a free resource for learning Apache Cassandra.', title='DataStax Academy')To find all the videos that the user with the email address 'patrick@datastax.com' has uploaded to the `langchain_agent_test` keyspace, we can follow these steps:

1. Query the `user_credentials` table to find the `user_id` associated with the email 'patrick@datastax.com'.
2. Use the `user_id` obtained from the first step to query the `user_videos` table to retrieve all the videos uploaded by the user.

Here is the query path in JSON format:

\`\`\`json
{
  "query_paths": [
    {
      "description": "Find user_id from user_credentials and then query user_videos for all videos uploaded by the user",
      "steps": [
        {
          "table": "user_credentials",
          "query": "SELECT user_id FROM user_credentials WHERE user_email = 'patrick@datastax.com';"
        },
        {
          "table": "user_videos",
          "query": "SELECT * FROM user_videos WHERE user_id = 522b1fe2-2e36-4cef-a667-cd4237d08b89;"
        }
      ]
    }
  ]
}
\`\`\`

Following this query path, we found that the user with the user_id `522b1fe2-2e36-4cef-a667-cd4237d08b89` has uploaded at least one video with the title 'DataStax Academy' and the description 'DataStax Academy is a free resource for learning Apache Cassandra.' The video_id for this video is `27066014-bad7-9f58-5a30-f63fe03718f6`. If there are more videos, the same query can be used to retrieve them, possibly with an increased limit if necessary.

> Finished chain.
To find all the videos that the user with the email address 'patrick@datastax.com' has uploaded to the `langchain_agent_test` keyspace, we can follow these steps:

1. Query the `user_credentials` table to find the `user_id` associated with the email 'patrick@datastax.com'.
2. Use the `user_id` obtained from the first step to query the `user_videos` table to retrieve all the videos uploaded by the user.

Here is the query path in JSON format:

\`\`\`json
{
  "query_paths": [
    {
      "description": "Find user_id from user_credentials and then query user_videos for all videos uploaded by the user",
      "steps": [
        {
          "table": "user_credentials",
          "query": "SELECT user_id FROM user_credentials WHERE user_email = 'patrick@datastax.com';"
        },
        {
          "table": "user_videos",
          "query": "SELECT * FROM user_videos WHERE user_id = 522b1fe2-2e36-4cef-a667-cd4237d08b89;"
        }
      ]
    }
  ]
}
\`\`\`

Following this query path, we found that the user with the user_id `522b1fe2-2e36-4cef-a667-cd4237d08b89` has uploaded at least one video with the title 'DataStax Academy' and the description 'DataStax Academy is a free resource for learning Apache Cassandra.' The video_id for this video is `27066014-bad7-9f58-5a30-f63fe03718f6`. If there are more videos, the same query can be used to retrieve them, possibly with an increased limit if necessary.