Skip to main content

BigQuery 回调处理器

社区版Python预览版
Google BigQuery 是一个无服务器且经济高效的企业数据仓库,可跨云工作并随数据扩展。
BigQueryCallbackHandler 允许您将来自 LangChain 和 LangGraph 的事件记录到 Google BigQuery。这对于监控、审计和分析 LLM 应用程序的性能非常有用。 主要功能:
  • LangGraph 支持:自动检测 LangGraph 节点,包含 AGENT_STARTING / AGENT_COMPLETED 事件和顶层 INVOCATION_STARTING / INVOCATION_COMPLETED 边界(术语与 Google ADK 的 BigQueryAgentAnalyticsPlugin 保持一致)
  • 自动创建分析视图:为每种事件类型创建一个带类型化列的 CREATE OR REPLACE VIEW(例如 v_llm_response.usage_total_tokens 而非 JSON_VALUE(...)
  • 自动 Schema 升级:对现有表进行增量 ALTER TABLE ADD COLUMN,由 schema 版本标签控制,确保每个版本最多运行一次
  • 子代理归因:当元数据中未显式设置 agent 时,agent BigQuery 列会自动从 langgraph_node 派生,因此多代理图会按子代理标记,无需用户更改
  • 丰富的 LLM 遥测数据:Token 使用情况(prompt_tokens / completion_tokens / total_tokens / cached_content_token_count)、model_version、完整的 usage_metadatacache_metadata,以及 LLM_REQUEST 上的 llm_config(temperature、top_p 等)和 tools
  • 延迟跟踪:内置所有 LLM 和工具调用的延迟测量
  • 事件过滤:可配置的允许列表/拒绝列表,加上可选的 skip_internal_chain_events 启发式方法,可丢弃嘈杂的框架链(ChannelWriteRunnableLambda 等)而不破坏跟踪连续性
  • 图上下文管理器:显式 INVOCATION_* 边界,具有准确的计时
  • 请求间 flush():清空队列而不关闭处理器
  • 实时仪表板:FastAPI 监控 Web 应用,支持实时事件流
预览版发布BigQuery 回调处理器处于 预览版 状态。API 和功能可能会发生变化。 有关更多信息,请参阅 发布阶段说明
BigQuery Storage Write API此功能使用 BigQuery Storage Write API,这是一项付费服务。 有关费用信息,请参阅 BigQuery 文档

安装

您需要安装带有 bigquery 额外依赖项的 langchain-google-community。对于此示例,您还需要 langchain-google-genailanggraph
pip install "langchain-google-community[bigquery]" langchain langchain-google-genai langgraph

先决条件

  1. Google Cloud 项目,已启用 BigQuery API
  2. BigQuery 数据集:在使用回调处理器之前,创建一个数据集来存储日志表。如果表不存在,回调处理器会自动在数据集中创建必要的事件表。
  3. Google Cloud Storage 存储桶(可选):如果您计划记录多模态内容(图像、音频等),建议创建一个 GCS 存储桶来卸载大文件。
  4. 身份验证
    • 本地:运行 gcloud auth application-default login
    • 云端:确保您的服务账号具有所需的权限。

IAM 权限

为使回调处理器正常工作,运行应用程序的主体(例如服务账号、用户账号)需要以下 Google Cloud 角色:
  • 项目级别的 roles/bigquery.jobUser,用于运行 BigQuery 查询。
  • 表级别的 roles/bigquery.dataEditor,用于写入日志/事件数据。
  • 如果使用 GCS 卸载:目标存储桶上的 roles/storage.objectCreatorroles/storage.objectViewer

与 LangGraph 代理一起使用

要将 BigQueryCallbackHandler 与 LangGraph 代理一起使用,请使用您的 Google Cloud 项目 ID 和数据集 ID 实例化它。处理器在首次运行时创建事件表(以及每种事件类型的分析视图)。使用 graph_context() 方法跟踪顶层调用边界——它在进入时发出 INVOCATION_STARTING,在退出时发出 INVOCATION_COMPLETED(或在异常时发出 INVOCATION_ERROR),并具有准确的延迟。 在调用代理时,通过 config 对象中的 metadata 字典传递 session_iduser_id 和(可选)agent。如果未设置 agent,处理器会自动从 metadata['langgraph_node'] 派生,以便每个子代理的事件都能正确归因。
import os
from datetime import datetime

from langchain.agents import create_agent
from langchain.messages import HumanMessage
from langchain.tools import tool
from langchain_google_community.callbacks.bigquery_callback import (
    BigQueryCallbackHandler,
    BigQueryLoggerConfig,
)
from langchain_google_genai import ChatGoogleGenerativeAI

# 1. 为代理定义工具
@tool
def get_current_time() -> str:
    """返回当前本地时间。"""
    now = datetime.now()
    return f"Current time: {now.strftime('%I:%M:%S %p')} on {now.strftime('%B %d, %Y')}"

@tool
def get_weather(city: str) -> str:
    """返回特定城市的当前天气。"""
    # 模拟的天气数据(生产环境中使用真实 API)
    weather_data = {
        "new york": {"temp": 22, "condition": "Clear"},
        "tokyo": {"temp": 24, "condition": "Sunny"},
        "london": {"temp": 14, "condition": "Overcast"},
    }
    city_lower = city.lower()
    if city_lower in weather_data:
        data = weather_data[city_lower]
        return f"Weather in {city.title()}: {data['temp']}°C, {data['condition']}"
    return f"Weather data for '{city}' not available."

@tool
def convert_currency(amount: float, from_currency: str, to_currency: str) -> str:
    """在货币之间转换金额。"""
    rates = {"USD": 1.0, "EUR": 1.08, "GBP": 1.27, "JPY": 0.0067}
    from_curr, to_curr = from_currency.upper(), to_currency.upper()
    if from_curr not in rates or to_curr not in rates:
        return f"Unknown currency"
    result = amount * rates[from_curr] / rates[to_curr]
    return f"{amount} {from_curr} = {result:.2f} {to_curr}"

def run_agent_example(project_id: str):
    """运行带有 BigQuery 日志记录的 LangGraph 代理。"""

    dataset_id = "agent_analytics"

    # 2. 配置回调处理器。
    # `table_id` 默认为 "agent_events";仅当您想要不同的表时才显式传递它。
    # 处理器在首次运行时创建表和每种事件类型的 `v_*` 分析视图。
    config = BigQueryLoggerConfig(
        batch_size=1,
        batch_flush_interval=0.5,
        custom_tags={"env": "prod", "service": "travel"},  # 每个事件上的静态标签
    )

    handler = BigQueryCallbackHandler(
        project_id=project_id,
        dataset_id=dataset_id,
        config=config,
        graph_name="travel_assistant",  # 用于 INVOCATION_* 事件 + root_agent_name
    )

    # 3. 创建 LLM 和代理
    # 对于 Vertex AI 使用 project 参数,对于 Gemini Developer API 使用 api_key
    llm = ChatGoogleGenerativeAI(
        model="gemini-2.5-flash",
        project=project_id,  # 用于 Vertex AI
    )
    tools = [get_current_time, get_weather, convert_currency]
    agent = create_agent(llm, tools)

    # 4. 使用 graph_context 运行以获取 INVOCATION_STARTING / INVOCATION_COMPLETED
    query = "What time is it? What's the weather in Tokyo? How much is 100 USD in EUR?"

    run_metadata = {
        "session_id": "session-001",
        "user_id": "user-123",
        # `agent` 是可选的——省略时,处理器会回退到
        # metadata['langgraph_node'] 进行按子代理归因。
    }

    with handler.graph_context("travel_assistant", metadata=run_metadata):
        result = agent.invoke(
            {"messages": [HumanMessage(content=query)]},
            config={
                "callbacks": [handler],
                "metadata": run_metadata,
            },
        )

    print(f"Response: {result['messages'][-1].content}")

    # 5. 阻塞直到待处理的行被写入(在请求边界之间)。
    # 不会关闭处理器——后续调用仍然有效。
    handler.flush(timeout=5.0)

    # 6. 进程退出时的最终清理。
    handler.shutdown()

if __name__ == "__main__":
    project_id = os.environ.get("GCP_PROJECT_ID", "your-project-id")
    run_agent_example(project_id)

配置选项

您可以使用 BigQueryLoggerConfig 自定义回调处理器。
enabled
bool
default:"True"
要禁用处理器将数据记录到 BigQuery 表,请将此参数设置为 False
clustering_fields
List[str]
default:"['event_type', 'agent', 'user_id']"
自动创建 BigQuery 表时用于聚类的字段。
gcs_bucket_name
str
default:"None"
用于卸载大内容(图像、二进制对象、大文本)的 GCS 存储桶名称。如果未提供,大内容可能会被截断或替换为占位符。
connection_id
str
default:"None"
用作 ObjectRef 列授权者的 BigQuery 连接 ID(例如 us.my-connection)。使用 ObjectRef 与 BigQuery ML 时必需。
max_content_length
int
default:"512000"
(500 KB)在卸载到 GCS(如果已配置)或截断之前,内联存储在 BigQuery 中的文本内容的最大长度(以字符为单位)。
batch_size
int
default:"1"
写入 BigQuery 之前要批处理的事件数量。
batch_flush_interval
float
default:"1.0"
刷新部分批次之前等待的最长时间(以秒为单位)。
shutdown_timeout
float
default:"10.0"
关闭期间等待日志刷新的秒数。
event_allowlist
List[str]
default:"None"
要记录的事件类型列表。如果为 None,则记录除 event_denylist 中事件之外的所有事件。
event_denylist
List[str]
default:"None"
要跳过记录的事件类型列表。
log_multi_modal_content
bool
default:"True"
是否记录详细的内容部分(包括 GCS 引用)。
table_id
str
default:"agent_events"
如果未显式提供给回调处理器构造函数,则使用的默认表 ID。
retry_config
RetryConfig
default:"RetryConfig()"
写入 BigQuery 失败时的重试逻辑配置(最大重试次数、延迟、乘数)。
queue_max_size
int
default:"10000"
在丢弃新事件之前,内部缓冲队列中可容纳的最大事件数。
skip_internal_chain_events
bool
default:"False"
当为 True 时,丢弃由框架内部 Runnables(ChannelWriteChannelReadBranchRunnableLambdaRunnableSequenceRunnableParallelRunnableAssignRunnablePassthroughRunnableBindingPregel__start____end__)发出的 CHAIN_* 事件。跳过的运行仍会在跟踪注册表中注册,因此子 LLM/工具事件会将真实的图根作为其 trace_id(没有中断的跟踪)。每次抑制都会记录一个 DEBUG 行,以便该启发式方法可审计。
custom_tags
dict[str, Any]
default:"{}"
写入每个事件行 attributes.custom_tags 的静态标签。对于按部署、队列或实验切片仪表板很有用(例如 {"env": "prod", "agent_role": "sales"})。
log_session_metadata
bool
default:"True"
当为 True 时,将用户提供的 RunnableConfig 元数据(减去我们已提升为一等列的键,如 session_iduser_idagentlanggraph_node)转储到 attributes.session_metadata 下。
content_formatter
Callable[[Any, str], Any]
default:"None"
可选的 (raw_content, event_type) -> formatted 钩子,在内容解析之前调用。对于 PII 编辑或强制转换自定义负载很有用。失败时回退到原始内容并发出警告——格式化器不能破坏代理。
auto_schema_upgrade
bool
default:"True"
当为 True 时,对事件模式中此处理器未来版本添加的任何新字段进行增量 ALTER TABLE ADD COLUMN。由 langchain_bq_schema_version 表标签控制,因此差异运行每个模式版本最多一次。从不删除、重命名或重新键入列。
create_views
bool
default:"True"
当为 True 时,在事件表旁边自动 CREATE OR REPLACE 每种事件类型的分析视图。每个视图将 JSON 列解构为类型化的顶级列(参见下面的自动创建的分析视图)。
view_prefix
str
default:"v"
自动创建的视图名称的前缀(v_llm_requestv_tool_completed 等)。当多个处理器实例共享一个数据集时,按表设置以避免冲突。
以下代码示例展示了如何为带有事件过滤的 BigQuery 回调处理器定义配置:
from langchain_google_community.callbacks.bigquery_callback import (
    BigQueryCallbackHandler,
    BigQueryLoggerConfig,
)

# 1. 配置 BigQueryLoggerConfig
config = BigQueryLoggerConfig(
    enabled=True,
    event_allowlist=["LLM_REQUEST", "LLM_RESPONSE"],  # 仅记录这些特定事件
    shutdown_timeout=10.0,  # 退出时等待日志刷新最多 10 秒
    max_content_length=500,  # 将内容截断为 500 个字符
)

# 2. 初始化回调处理器
handler = BigQueryCallbackHandler(
    project_id="your-project-id",
    dataset_id="your_dataset",
    table_id="your_table",
    config=config,
)

Schema 和生产设置

如果表不存在,插件会自动创建该表。但是,对于生产环境,我们建议使用以下 DDL 手动创建表,该 DDL 利用 JSON 类型以实现灵活性,并使用 REPEATED RECORD 来处理多模态内容。 推荐的 DDL:
CREATE TABLE `your-gcp-project-id.adk_agent_logs.agent_events`
(
  timestamp TIMESTAMP NOT NULL OPTIONS(description="事件发生时的 UTC 时间戳。"),
  event_type STRING OPTIONS(description="事件的类别。"),
  agent STRING OPTIONS(description="代理的名称。"),
  session_id STRING OPTIONS(description="对话会话的唯一标识符。"),
  invocation_id STRING OPTIONS(description="单次轮次的唯一标识符。"),
  user_id STRING OPTIONS(description="最终用户的标识符。"),
  trace_id STRING OPTIONS(description="OpenTelemetry 跟踪 ID。"),
  span_id STRING OPTIONS(description="OpenTelemetry span ID。"),
  parent_span_id STRING OPTIONS(description="OpenTelemetry 父 span ID。"),
  content JSON OPTIONS(description="事件的主要负载。"),
  content_parts ARRAY<STRUCT<
    mime_type STRING,
    uri STRING,
    object_ref STRUCT<
      uri STRING,
      version STRING,
      authorizer STRING,
      details JSON
    >,
    text STRING,
    part_index INT64,
    part_attributes STRING,
    storage_mode STRING
  >> OPTIONS(description="对于多模态事件,包含内容部分列表。"),
  attributes JSON OPTIONS(description="任意键值对。"),
  latency_ms JSON OPTIONS(description="延迟测量。"),
  status STRING OPTIONS(description="事件的结果。"),
  error_message STRING OPTIONS(description="详细的错误消息。"),
  is_truncated BOOLEAN OPTIONS(description="指示内容是否被截断的标志。")
)
PARTITION BY DATE(timestamp)
CLUSTER BY event_type, agent, user_id;

自动创建的分析视图

当处理器创建事件表时,它还会在旁边为每种事件类型创建一个 CREATE OR REPLACE VIEW(由 create_views 控制,默认为 True)。每个视图将 JSON 列解构为类型化的顶级列,因此分析查询不必每次都拼写 JSON_VALUE(...)
-- 使用自动视图
SELECT agent, SUM(usage_total_tokens) AS tokens
FROM `PROJECT.DATASET.v_llm_response`
GROUP BY agent;

-- 等效于对原始表的查询
SELECT agent,
       SUM(CAST(JSON_VALUE(attributes, '$.usage.total_tokens') AS INT64)) AS tokens
FROM `PROJECT.DATASET.agent_events`
WHERE event_type = 'LLM_RESPONSE'
GROUP BY agent;
默认视图名称(可通过 view_prefix 配置)以及每个视图在始终包含的列之上添加的类型化列:
视图额外的类型化列
v_invocation_starting(无——仅始终包含的列)
v_invocation_completed / v_invocation_errortotal_ms
v_agent_startingnode_name, step
v_agent_completednode_name, step, total_ms
v_agent_errornode_name, total_ms
v_llm_requestmodel, request_content, llm_config, tools
v_llm_responseresponse, usage_prompt_tokens, usage_completion_tokens, usage_total_tokens, usage_cached_tokens, context_cache_hit_rate, total_ms, ttft_ms, model_version, usage_metadata, cache_metadata
v_llm_errortotal_ms
v_tool_startingtool_name, tool_args
v_tool_completedtool_name, tool_result, total_ms
v_tool_errortool_name, total_ms
v_retriever_startquery
v_retriever_end / v_retriever_errortotal_ms
每个视图还公开原始表中始终包含的列(timestampevent_typeagentsession_idinvocation_iduser_idtrace_idspan_idparent_span_idstatuserror_messageis_truncated),以及从 attributes JSON 中提取的三列:root_agent_namecustom_tagssession_metadata

自动 Schema 升级

当处理器的模式获得新列时,现有表会自动进行增量升级。处理器在启动时读取表,并为任何新字段运行 ALTER TABLE ADD COLUMN,由 langchain_bq_schema_version 表标签控制,因此差异运行每个模式版本最多一次。从不删除、重命名或重新键入列。使用 auto_schema_upgrade=False 禁用。

子代理归因

对于多代理 LangGraph 部署,agent BigQuery 列会自动从以下回退链派生:
  1. metadata["agent"] — 显式用户提供的值(最高优先级)
  2. metadata["langgraph_node"] — 活动的 LangGraph 节点,因此每个子代理的事件都用节点名称标记
  3. metadata["checkpoint_ns"] — LangGraph 检查点命名空间
  4. handler.graph_name — 顶层 INVOCATION_* 事件的回退
因此,多代理图(例如 supervisor → TheCriticTheMeteo 等)会产生遥测数据,其中每个事件都归因于发起的子代理,无需用户更改。

事件类型和负载

content 列包含特定于 event_typeJSON 对象。 content_parts 列提供了内容的结构化视图,对于图像或卸载的数据特别有用。
内容截断
  • 可变内容字段被截断为 max_content_length(在 BigQueryLoggerConfig 中配置,默认为 500KB)。
  • 如果配置了 gcs_bucket_name,大内容会卸载到 GCS 而不是被截断,并且引用存储在 content_parts.object_ref 中。
content 始终包含 summary每个事件行的 content JSON 对象都包含一个 summary 字符串,其中包含负载的人类可读预览(上限为 max_content_length)。 下面的每种事件表中省略了 summary 以保持形状可读,但它始终存在于磁盘上。

LLM 交互

这些事件跟踪发送到 LLM 的原始请求和从 LLM 接收的响应。
事件类型内容 (JSON) 结构属性 (JSON)
LLM_REQUEST聊天模型: {"messages": [<转储的消息>]}
旧版模型: {"prompt": [<提示>]}
{"tags": ["..."], "model": "...", "llm_config": {"temperature": 0.2, ...}, "tools": ["get_weather", ...]}
LLM_RESPONSE{"response": "<生成的文本>"}{"usage": {"prompt_tokens": 100, "completion_tokens": 25, "total_tokens": 125}, "model_version": "gemini-2.5-flash-001", "usage_metadata": {"cached_content_token_count": 30, ...}, "cache_metadata": {...}}
LLM_ERROR{"data": null}(实际的异常文本位于 error_message 列中){}

子代理(LangGraph 节点)和调用生命周期

这些事件来自 LangGraph 的节点和图上下文生命周期。 当未显式设置 agent 时,agent 会自动从 metadata['langgraph_node'] 派生,因此事件按子代理标记。
事件类型描述
AGENT_STARTING / AGENT_COMPLETED / AGENT_ERRORLangGraph 节点开始 / 结束 / 出错
INVOCATION_STARTING / INVOCATION_COMPLETED / INVOCATION_ERROR顶层图调用开始 / 结束 / 出错(由 handler.graph_context() 发出)

工具使用

这些事件跟踪代理执行的工具。工具名称也显示在 attributes.tool_name 中,并且(对于自动视图)作为类型化的 tool_name 列。
事件类型内容 (JSON) 结构
TOOL_STARTING{"tool": "<名称>", "input": "<输入字符串>"} — 例如 {"tool": "get_weather", "input": "city='Paris'"}
TOOL_COMPLETED{"tool": "<名称>", "result": "<输出字符串>"} — 例如 {"tool": "get_weather", "result": "25°C, Sunny"}
TOOL_ERROR{"data": null}(实际的异常文本位于 error_message 列中)

链执行

这些事件针对非图 LangChain Runnable 生命周期触发(图调用和 LangGraph 节点使用上面列出的 INVOCATION_* / AGENT_* 事件)。
事件类型内容 (JSON) 结构
CHAIN_START{"data": "<JSON 字符串化的输入>"}
CHAIN_END{"data": "<JSON 字符串化的输出>"}
CHAIN_ERROR{"data": null}(参见 error_message 列)

检索器使用

这些事件跟踪检索器的执行。
事件类型内容 (JSON) 结构
RETRIEVER_START{"data": "<查询字符串>"} — 例如 {"data": "What is the capital of France?"}
RETRIEVER_END{"data": "<JSON 字符串化的文档列表>"}
RETRIEVER_ERROR{"data": null}(实际的异常文本位于 error_message 列中)

代理操作

这些事件来自旧版 LangChain AgentExecutor 风格的代理(on_agent_action / on_agent_finish)。data 字段包含操作/完成负载的 JSON 序列化字符串。
事件类型内容 (JSON) 结构
AGENT_ACTION{"data": "{\"tool\": \"Calculator\", \"input\": \"2 + 2\"}"}
AGENT_FINISH{"data": "{\"output\": \"The answer is 4\"}"}

其他事件

事件类型内容 (JSON) 结构
TEXT{"data": "<文本>"} — 例如 {"data": "Some logging text..."}

高级分析查询

一旦您的代理开始运行并记录事件,您就可以对 agent_events 表执行强大的分析。

1. 重建跟踪(对话轮次)

使用 trace_id 将属于单个执行流的所有事件(Chain、LLM、Tool)分组。
SELECT
  timestamp,
  event_type,
  span_id,
  parent_span_id,
  -- 根据事件类型提取摘要或特定内容
  COALESCE(
    JSON_VALUE(content, '$.messages[0].content'),
    JSON_VALUE(content, '$.summary'),
    JSON_VALUE(content)
  ) AS summary,
  JSON_VALUE(latency_ms, '$.total_ms') AS duration_ms
FROM
  `your-gcp-project-id.adk_agent_logs.agent_events`
WHERE
    -- 替换为日志中的特定 trace_id
  trace_id = '019bb986-a0db-7da1-802d-2725795ab340'
ORDER BY
  timestamp ASC;

2. 分析 LLM 延迟和 Token 使用情况

计算 LLM 调用的平均延迟和总 Token 使用量。
SELECT
  JSON_VALUE(attributes, '$.model') AS model,
  COUNT(*) AS total_calls,
  AVG(CAST(JSON_VALUE(latency_ms, '$.total_ms') AS FLOAT64)) AS avg_latency_ms,
  SUM(CAST(JSON_VALUE(attributes, '$.usage.total_tokens') AS INT64)) AS total_tokens
FROM
  `your-gcp-project-id.adk_agent_logs.agent_events`
WHERE
  event_type = 'LLM_RESPONSE'
GROUP BY
  1;

3. 使用 BigQuery 远程模型(Gemini)分析多模态内容

如果您将图像卸载到 GCS,可以使用 BigQuery ML 直接分析它们。
SELECT
  logs.session_id,
  -- 获取图像的签名 URL(可选,用于查看)
  STRING(OBJ.GET_ACCESS_URL(parts.object_ref, "r").access_urls.read_url) as signed_url,
  -- 使用远程模型(例如 gemini-2.5-flash)分析图像
  AI.GENERATE(
    ('Describe this image briefly. What company logo?', parts.object_ref)
  ) AS generated_result
FROM
  `your-gcp-project-id.adk_agent_logs.agent_events` logs,
  UNNEST(logs.content_parts) AS parts
WHERE
  parts.mime_type LIKE 'image/%'
ORDER BY logs.timestamp DESC
LIMIT 1;

4. 分析 Span 层次结构和持续时间

使用 span ID 可视化代理操作(LLM 调用、工具使用)的执行流程和性能。
SELECT
  span_id,
  parent_span_id,
  event_type,
  timestamp,
  -- 从已完成操作的 latency_ms 中提取持续时间
  CAST(JSON_VALUE(latency_ms, '$.total_ms') AS INT64) as duration_ms,
  -- 识别特定的工具或操作
  COALESCE(
    JSON_VALUE(content, '$.tool'),
    'LLM_CALL'
  ) as operation
FROM `your-gcp-project-id.adk_agent_logs.agent_events`
WHERE trace_id = 'your-trace-id'
  AND event_type IN ('LLM_RESPONSE', 'TOOL_COMPLETED')
ORDER BY timestamp ASC;

5. 查询卸载的内容(获取签名 URL)

SELECT
  timestamp,
  event_type,
  part.mime_type,
  part.storage_mode,
  part.object_ref.uri AS gcs_uri,
  -- 生成签名 URL 以直接读取内容(需要 connection_id 配置)
  STRING(OBJ.GET_ACCESS_URL(part.object_ref, 'r').access_urls.read_url) AS signed_url
FROM `your-gcp-project-id.adk_agent_logs.agent_events`,
UNNEST(content_parts) AS part
WHERE part.storage_mode = 'GCS_REFERENCE'
ORDER BY timestamp DESC
LIMIT 10;

6. 高级 SQL 场景

这些高级模式展示了如何使用 BigQuery ML 对数据进行会话化、分析工具使用情况并执行根本原因分析。
-- 1. 会话化对话历史记录(创建视图)
-- 将所有事件合并为每个会话一行,并带有格式化的历史记录。
CREATE OR REPLACE VIEW `your-project.your-dataset.agent_sessions` AS
SELECT
  session_id,
  user_id,
  MIN(timestamp) AS session_start,
  MAX(timestamp) AS session_end,
  ARRAY_AGG(
    STRUCT(timestamp, event_type, TO_JSON_STRING(content) as content, error_message)
    ORDER BY timestamp ASC
  ) AS events,
  STRING_AGG(
      CASE
          -- LLM_REQUEST 在 content.messages 下携带用户的提示
          WHEN event_type = 'LLM_REQUEST' THEN CONCAT('User: ', JSON_VALUE(content, '$.summary'))
          WHEN event_type = 'LLM_RESPONSE' THEN CONCAT('Agent: ', JSON_VALUE(content, '$.summary'))
          WHEN event_type = 'TOOL_STARTING' THEN CONCAT('SYS: Calling ', JSON_VALUE(content, '$.tool'))
          WHEN event_type = 'TOOL_COMPLETED' THEN CONCAT('SYS: Result from ', JSON_VALUE(content, '$.tool'))
          WHEN event_type = 'TOOL_ERROR' THEN CONCAT('SYS: ERROR in ', JSON_VALUE(content, '$.tool'))
          ELSE NULL
      END,
      '\n' ORDER BY timestamp ASC
  ) AS full_conversation
FROM
  `your-project.your-dataset.agent_events`
GROUP BY
  session_id, user_id;

-- 2. 工具使用分析
-- 从内容中提取工具名称(自动视图 `v_tool_completed`
-- 直接公开 `tool_name`,如果您想跳过 JSON_VALUE)。
SELECT
  JSON_VALUE(content, '$.tool') AS tool_name,
  event_type,
  COUNT(*) as count
FROM `your-project.your-dataset.agent_events`
WHERE event_type IN ('TOOL_STARTING', 'TOOL_COMPLETED', 'TOOL_ERROR')
GROUP BY 1, 2
ORDER BY tool_name, event_type;

-- 3. 细粒度成本和 Token 估算
-- 根据内容字符长度估算 Token(大约 4 个字符/Token)
SELECT
  session_id,
  COUNT(*) as interaction_count,
  SUM(LENGTH(TO_JSON_STRING(content))) / 4 AS estimated_tokens,
  -- 示例成本:每 1k Token $0.0001
  ROUND((SUM(LENGTH(TO_JSON_STRING(content))) / 4) / 1000 * 0.0001, 6) AS estimated_cost_usd
FROM `your-project.your-dataset.agent_events`
GROUP BY session_id
ORDER BY estimated_cost_usd DESC
LIMIT 5;

-- 4. AI 驱动的根本原因分析(需要 BigQuery ML)
-- 使用 Gemini 分析失败的会话
SELECT
  session_id,
  AI.GENERATE(
    ('Analyze this conversation and explain the failure root cause. Log: ', full_conversation),
    connection_id => 'your-project.us.bqml_connection',
    endpoint => 'gemini-2.5-flash'
  ).result AS root_cause_explanation
FROM `your-project.your-dataset.agent_sessions`
WHERE error_message IS NOT NULL
LIMIT 5;

BigQuery 中的对话分析

对话分析您还可以使用 BigQuery 对话分析 使用自然语言分析您的代理日志。 只需提出这样的问题:
  • “显示随时间变化的错误率”
  • “最常见的工具调用是什么?”
  • “识别 Token 使用量高的会话”

Looker Studio 仪表板

您可以使用我们预构建的 Looker Studio 仪表板模板 来可视化代理的性能。 要将此仪表板连接到您自己的 BigQuery 表,请使用以下链接格式,将占位符替换为您特定的项目、数据集和表 ID:
https://lookerstudio.google.com/reporting/create?c.reportId=f1c5b513-3095-44f8-90a2-54953d41b125&ds.ds3.connector=bigQuery&ds.ds3.type=TABLE&ds.ds3.projectId=<your-project-id>&ds.ds3.datasetId=<your-dataset-id>&ds.ds3.tableId=<your-table-id>

LangGraph 集成

BigQueryCallbackHandler 为 LangGraph 代理提供增强支持,包括自动节点检测、图级跟踪和延迟测量。

LangGraph 事件类型

除了标准的 LangChain 事件外,回调处理器还会自动检测和记录 LangGraph 特定事件:
事件类型描述
AGENT_STARTING当 LangGraph 节点开始执行时发出
AGENT_COMPLETED当 LangGraph 节点成功完成时发出
AGENT_ERROR当 LangGraph 节点失败时发出
INVOCATION_STARTING当图执行开始时发出(通过上下文管理器)
INVOCATION_COMPLETED当图执行完成时发出
INVOCATION_ERROR当图执行失败时发出

图上下文管理器

使用 graph_context() 方法显式标记图执行边界。这可以启用具有准确延迟测量的 INVOCATION_STARTINGINVOCATION_COMPLETED 事件:
from langchain.agents import create_agent
from langchain.messages import HumanMessage
from langchain_google_community.callbacks.bigquery_callback import (
    BigQueryCallbackHandler,
    BigQueryLoggerConfig,
)

# 使用图名称初始化处理器
handler = BigQueryCallbackHandler(
    project_id="your-project-id",
    dataset_id="agent_analytics",
    table_id="agent_events",
    graph_name="my_agent",
)

# 创建您的代理
agent = create_agent(llm, tools)

# 使用图上下文管理器获取正确的 INVOCATION_STARTING/INVOCATION_COMPLETED 事件
run_metadata = {
    "session_id": "session-123",
    "user_id": "user-456",
    "agent": "my_agent",
}

with handler.graph_context("my_agent", metadata=run_metadata):
    result = agent.invoke(
        {"messages": [HumanMessage(content="What is the weather in Tokyo?")]},
        config={
            "callbacks": [handler],
            "metadata": run_metadata,
        },
    )

延迟跟踪

回调处理器自动跟踪所有操作的延迟,并将测量值存储在 latency_ms JSON 列中:
-- 按事件类型查询延迟
SELECT
    event_type,
    agent,
    COUNT(*) as count,
    ROUND(AVG(CAST(JSON_EXTRACT_SCALAR(latency_ms, '$.total_ms') AS FLOAT64)), 2) as avg_latency_ms,
    ROUND(APPROX_QUANTILES(CAST(JSON_EXTRACT_SCALAR(latency_ms, '$.total_ms') AS FLOAT64), 100)[OFFSET(95)], 2) as p95_latency_ms
FROM `your-project.your-dataset.agent_events`
WHERE DATE(timestamp) = CURRENT_DATE()
  AND event_type IN ('LLM_RESPONSE', 'TOOL_COMPLETED', 'INVOCATION_COMPLETED')
GROUP BY event_type, agent
ORDER BY avg_latency_ms DESC;

事件过滤

使用 event_allowlistevent_denylist 控制记录哪些事件:
from langchain_google_community.callbacks.bigquery_callback import (
    BigQueryCallbackHandler,
    BigQueryLoggerConfig,
)

# 生产配置:仅记录重要事件
config = BigQueryLoggerConfig(
    event_allowlist=[
        "LLM_RESPONSE",
        "LLM_ERROR",
        "TOOL_COMPLETED",
        "TOOL_ERROR",
        "INVOCATION_COMPLETED",
        "INVOCATION_ERROR",
    ],
)

handler = BigQueryCallbackHandler(
    project_id="your-project-id",
    dataset_id="agent_analytics",
    config=config,
)
或排除嘈杂的事件:
# 排除链事件但记录其他所有内容
config = BigQueryLoggerConfig(
    event_denylist=["CHAIN_START", "CHAIN_END"],
)

示例和资源

示例代码

以下示例展示了 BigQuery 回调处理器的各种功能:
示例描述
基本示例带有 LLM 调用的基本回调用法
LangGraph 代理带有 6 个真实工具的完整 ReAct 代理
异步示例带有并发查询的异步处理器
事件过滤允许列表/拒绝列表配置
示例数据生成器跨多个代理类型生成示例数据

分析笔记本

LangGraph 代理分析笔记本 提供了全面的 BigQuery 分析查询,用于:
  • 实时事件监控
  • 工具使用分析
  • 延迟分析和趋势
  • 错误调试
  • 用户参与度指标
  • 时间序列可视化

实时监控仪表板

提供了一个 基于 FastAPI 的监控仪表板,用于实时代理监控: 功能:
  • 通过服务器发送事件(SSE)的实时事件流
  • 用于事件分布和延迟趋势的交互式图表
  • 带有详细时间线视图的会话跟踪
  • 20 多个用于分析查询的 REST API 端点
  • 每 5 秒自动刷新
# 运行仪表板
cd libs/community/examples/bigquery_callback/webapp
pip install -r requirements.txt
uvicorn main:app --port 8001
# 打开 http://localhost:8001

反馈

我们欢迎您对 BigQuery 代理分析的反馈。如果您有任何问题、建议或遇到任何问题,请通过 bqaa-feedback@google.com 联系团队。

附加资源