BigQuery 回调处理器
社区版Python预览版
Google BigQuery 是一个无服务器且经济高效的企业数据仓库,可跨云工作并随数据扩展。
BigQueryCallbackHandler 允许您将来自 LangChain 和 LangGraph 的事件记录到 Google BigQuery。这对于监控、审计和分析 LLM 应用程序的性能非常有用。
主要功能:
- LangGraph 支持:自动检测 LangGraph 节点,包含
AGENT_STARTING/AGENT_COMPLETED事件和顶层INVOCATION_STARTING/INVOCATION_COMPLETED边界(术语与 Google ADK 的BigQueryAgentAnalyticsPlugin保持一致) - 自动创建分析视图:为每种事件类型创建一个带类型化列的
CREATE OR REPLACE VIEW(例如v_llm_response.usage_total_tokens而非JSON_VALUE(...)) - 自动 Schema 升级:对现有表进行增量
ALTER TABLE ADD COLUMN,由 schema 版本标签控制,确保每个版本最多运行一次 - 子代理归因:当元数据中未显式设置
agent时,agentBigQuery 列会自动从langgraph_node派生,因此多代理图会按子代理标记,无需用户更改 - 丰富的 LLM 遥测数据:Token 使用情况(
prompt_tokens/completion_tokens/total_tokens/cached_content_token_count)、model_version、完整的usage_metadata、cache_metadata,以及LLM_REQUEST上的llm_config(temperature、top_p 等)和tools - 延迟跟踪:内置所有 LLM 和工具调用的延迟测量
- 事件过滤:可配置的允许列表/拒绝列表,加上可选的
skip_internal_chain_events启发式方法,可丢弃嘈杂的框架链(ChannelWrite、RunnableLambda等)而不破坏跟踪连续性 - 图上下文管理器:显式
INVOCATION_*边界,具有准确的计时 - 请求间
flush():清空队列而不关闭处理器 - 实时仪表板:FastAPI 监控 Web 应用,支持实时事件流
预览版发布BigQuery 回调处理器处于 预览版 状态。API 和功能可能会发生变化。
有关更多信息,请参阅
发布阶段说明。
安装
您需要安装带有bigquery 额外依赖项的 langchain-google-community。对于此示例,您还需要 langchain-google-genai 和 langgraph。
先决条件
- Google Cloud 项目,已启用 BigQuery API。
- BigQuery 数据集:在使用回调处理器之前,创建一个数据集来存储日志表。如果表不存在,回调处理器会自动在数据集中创建必要的事件表。
- Google Cloud Storage 存储桶(可选):如果您计划记录多模态内容(图像、音频等),建议创建一个 GCS 存储桶来卸载大文件。
- 身份验证:
- 本地:运行
gcloud auth application-default login。 - 云端:确保您的服务账号具有所需的权限。
- 本地:运行
IAM 权限
为使回调处理器正常工作,运行应用程序的主体(例如服务账号、用户账号)需要以下 Google Cloud 角色:- 项目级别的
roles/bigquery.jobUser,用于运行 BigQuery 查询。 - 表级别的
roles/bigquery.dataEditor,用于写入日志/事件数据。 - 如果使用 GCS 卸载:目标存储桶上的
roles/storage.objectCreator和roles/storage.objectViewer。
与 LangGraph 代理一起使用
要将BigQueryCallbackHandler 与 LangGraph 代理一起使用,请使用您的 Google Cloud 项目 ID 和数据集 ID 实例化它。处理器在首次运行时创建事件表(以及每种事件类型的分析视图)。使用 graph_context() 方法跟踪顶层调用边界——它在进入时发出 INVOCATION_STARTING,在退出时发出 INVOCATION_COMPLETED(或在异常时发出 INVOCATION_ERROR),并具有准确的延迟。
在调用代理时,通过 config 对象中的 metadata 字典传递 session_id、user_id 和(可选)agent。如果未设置 agent,处理器会自动从 metadata['langgraph_node'] 派生,以便每个子代理的事件都能正确归因。
配置选项
您可以使用BigQueryLoggerConfig 自定义回调处理器。
要禁用处理器将数据记录到 BigQuery 表,请将此参数设置为
False。自动创建 BigQuery 表时用于聚类的字段。
用于卸载大内容(图像、二进制对象、大文本)的 GCS 存储桶名称。如果未提供,大内容可能会被截断或替换为占位符。
用作 ObjectRef 列授权者的 BigQuery 连接 ID(例如
us.my-connection)。使用 ObjectRef 与 BigQuery ML 时必需。(500 KB)在卸载到 GCS(如果已配置)或截断之前,内联存储在 BigQuery 中的文本内容的最大长度(以字符为单位)。
写入 BigQuery 之前要批处理的事件数量。
刷新部分批次之前等待的最长时间(以秒为单位)。
关闭期间等待日志刷新的秒数。
要记录的事件类型列表。如果为
None,则记录除 event_denylist 中事件之外的所有事件。要跳过记录的事件类型列表。
是否记录详细的内容部分(包括 GCS 引用)。
如果未显式提供给回调处理器构造函数,则使用的默认表 ID。
写入 BigQuery 失败时的重试逻辑配置(最大重试次数、延迟、乘数)。
在丢弃新事件之前,内部缓冲队列中可容纳的最大事件数。
当为
True 时,丢弃由框架内部 Runnables(ChannelWrite、ChannelRead、Branch、RunnableLambda、RunnableSequence、RunnableParallel、RunnableAssign、RunnablePassthrough、RunnableBinding、Pregel、__start__、__end__)发出的 CHAIN_* 事件。跳过的运行仍会在跟踪注册表中注册,因此子 LLM/工具事件会将真实的图根作为其 trace_id(没有中断的跟踪)。每次抑制都会记录一个 DEBUG 行,以便该启发式方法可审计。写入每个事件行
attributes.custom_tags 的静态标签。对于按部署、队列或实验切片仪表板很有用(例如 {"env": "prod", "agent_role": "sales"})。当为
True 时,将用户提供的 RunnableConfig 元数据(减去我们已提升为一等列的键,如 session_id、user_id、agent、langgraph_node)转储到 attributes.session_metadata 下。可选的
(raw_content, event_type) -> formatted 钩子,在内容解析之前调用。对于 PII 编辑或强制转换自定义负载很有用。失败时回退到原始内容并发出警告——格式化器不能破坏代理。当为
True 时,对事件模式中此处理器未来版本添加的任何新字段进行增量 ALTER TABLE ADD COLUMN。由 langchain_bq_schema_version 表标签控制,因此差异运行每个模式版本最多一次。从不删除、重命名或重新键入列。当为
True 时,在事件表旁边自动 CREATE OR REPLACE 每种事件类型的分析视图。每个视图将 JSON 列解构为类型化的顶级列(参见下面的自动创建的分析视图)。自动创建的视图名称的前缀(
v_llm_request、v_tool_completed 等)。当多个处理器实例共享一个数据集时,按表设置以避免冲突。Schema 和生产设置
如果表不存在,插件会自动创建该表。但是,对于生产环境,我们建议使用以下 DDL 手动创建表,该 DDL 利用 JSON 类型以实现灵活性,并使用 REPEATED RECORD 来处理多模态内容。 推荐的 DDL:自动创建的分析视图
当处理器创建事件表时,它还会在旁边为每种事件类型创建一个CREATE OR REPLACE VIEW(由 create_views 控制,默认为 True)。每个视图将 JSON 列解构为类型化的顶级列,因此分析查询不必每次都拼写 JSON_VALUE(...):
view_prefix 配置)以及每个视图在始终包含的列之上添加的类型化列:
| 视图 | 额外的类型化列 |
|---|---|
v_invocation_starting | (无——仅始终包含的列) |
v_invocation_completed / v_invocation_error | total_ms |
v_agent_starting | node_name, step |
v_agent_completed | node_name, step, total_ms |
v_agent_error | node_name, total_ms |
v_llm_request | model, request_content, llm_config, tools |
v_llm_response | response, usage_prompt_tokens, usage_completion_tokens, usage_total_tokens, usage_cached_tokens, context_cache_hit_rate, total_ms, ttft_ms, model_version, usage_metadata, cache_metadata |
v_llm_error | total_ms |
v_tool_starting | tool_name, tool_args |
v_tool_completed | tool_name, tool_result, total_ms |
v_tool_error | tool_name, total_ms |
v_retriever_start | query |
v_retriever_end / v_retriever_error | total_ms |
timestamp、event_type、agent、session_id、invocation_id、user_id、trace_id、span_id、parent_span_id、status、error_message、is_truncated),以及从 attributes JSON 中提取的三列:root_agent_name、custom_tags、session_metadata。
自动 Schema 升级
当处理器的模式获得新列时,现有表会自动进行增量升级。处理器在启动时读取表,并为任何新字段运行ALTER TABLE ADD COLUMN,由 langchain_bq_schema_version 表标签控制,因此差异运行每个模式版本最多一次。从不删除、重命名或重新键入列。使用 auto_schema_upgrade=False 禁用。
子代理归因
对于多代理 LangGraph 部署,agent BigQuery 列会自动从以下回退链派生:
metadata["agent"]— 显式用户提供的值(最高优先级)metadata["langgraph_node"]— 活动的 LangGraph 节点,因此每个子代理的事件都用节点名称标记metadata["checkpoint_ns"]— LangGraph 检查点命名空间handler.graph_name— 顶层INVOCATION_*事件的回退
TheCritic、TheMeteo 等)会产生遥测数据,其中每个事件都归因于发起的子代理,无需用户更改。
事件类型和负载
content 列包含特定于 event_type 的 JSON 对象。
content_parts 列提供了内容的结构化视图,对于图像或卸载的数据特别有用。
内容截断
- 可变内容字段被截断为
max_content_length(在BigQueryLoggerConfig中配置,默认为 500KB)。 - 如果配置了
gcs_bucket_name,大内容会卸载到 GCS 而不是被截断,并且引用存储在content_parts.object_ref中。
content 始终包含 summary 键每个事件行的 content JSON 对象都包含一个 summary 字符串,其中包含负载的人类可读预览(上限为 max_content_length)。
下面的每种事件表中省略了 summary 以保持形状可读,但它始终存在于磁盘上。LLM 交互
这些事件跟踪发送到 LLM 的原始请求和从 LLM 接收的响应。| 事件类型 | 内容 (JSON) 结构 | 属性 (JSON) |
|---|---|---|
LLM_REQUEST | 聊天模型: {"messages": [<转储的消息>]}旧版模型: {"prompt": [<提示>]} | {"tags": ["..."], "model": "...", "llm_config": {"temperature": 0.2, ...}, "tools": ["get_weather", ...]} |
LLM_RESPONSE | {"response": "<生成的文本>"} | {"usage": {"prompt_tokens": 100, "completion_tokens": 25, "total_tokens": 125}, "model_version": "gemini-2.5-flash-001", "usage_metadata": {"cached_content_token_count": 30, ...}, "cache_metadata": {...}} |
LLM_ERROR | {"data": null}(实际的异常文本位于 error_message 列中) | {} |
子代理(LangGraph 节点)和调用生命周期
这些事件来自 LangGraph 的节点和图上下文生命周期。 当未显式设置agent 时,agent 会自动从 metadata['langgraph_node'] 派生,因此事件按子代理标记。
| 事件类型 | 描述 |
|---|---|
AGENT_STARTING / AGENT_COMPLETED / AGENT_ERROR | LangGraph 节点开始 / 结束 / 出错 |
INVOCATION_STARTING / INVOCATION_COMPLETED / INVOCATION_ERROR | 顶层图调用开始 / 结束 / 出错(由 handler.graph_context() 发出) |
工具使用
这些事件跟踪代理执行的工具。工具名称也显示在attributes.tool_name 中,并且(对于自动视图)作为类型化的 tool_name 列。
| 事件类型 | 内容 (JSON) 结构 |
|---|---|
TOOL_STARTING | {"tool": "<名称>", "input": "<输入字符串>"} — 例如 {"tool": "get_weather", "input": "city='Paris'"} |
TOOL_COMPLETED | {"tool": "<名称>", "result": "<输出字符串>"} — 例如 {"tool": "get_weather", "result": "25°C, Sunny"} |
TOOL_ERROR | {"data": null}(实际的异常文本位于 error_message 列中) |
链执行
这些事件针对非图 LangChainRunnable 生命周期触发(图调用和 LangGraph 节点使用上面列出的 INVOCATION_* / AGENT_* 事件)。
| 事件类型 | 内容 (JSON) 结构 |
|---|---|
CHAIN_START | {"data": "<JSON 字符串化的输入>"} |
CHAIN_END | {"data": "<JSON 字符串化的输出>"} |
CHAIN_ERROR | {"data": null}(参见 error_message 列) |
检索器使用
这些事件跟踪检索器的执行。| 事件类型 | 内容 (JSON) 结构 |
|---|---|
RETRIEVER_START | {"data": "<查询字符串>"} — 例如 {"data": "What is the capital of France?"} |
RETRIEVER_END | {"data": "<JSON 字符串化的文档列表>"} |
RETRIEVER_ERROR | {"data": null}(实际的异常文本位于 error_message 列中) |
代理操作
这些事件来自旧版 LangChainAgentExecutor 风格的代理(on_agent_action / on_agent_finish)。data 字段包含操作/完成负载的 JSON 序列化字符串。
| 事件类型 | 内容 (JSON) 结构 |
|---|---|
AGENT_ACTION | {"data": "{\"tool\": \"Calculator\", \"input\": \"2 + 2\"}"} |
AGENT_FINISH | {"data": "{\"output\": \"The answer is 4\"}"} |
其他事件
| 事件类型 | 内容 (JSON) 结构 |
|---|---|
TEXT | {"data": "<文本>"} — 例如 {"data": "Some logging text..."} |
高级分析查询
一旦您的代理开始运行并记录事件,您就可以对agent_events 表执行强大的分析。
1. 重建跟踪(对话轮次)
使用trace_id 将属于单个执行流的所有事件(Chain、LLM、Tool)分组。
2. 分析 LLM 延迟和 Token 使用情况
计算 LLM 调用的平均延迟和总 Token 使用量。3. 使用 BigQuery 远程模型(Gemini)分析多模态内容
如果您将图像卸载到 GCS,可以使用 BigQuery ML 直接分析它们。4. 分析 Span 层次结构和持续时间
使用 span ID 可视化代理操作(LLM 调用、工具使用)的执行流程和性能。5. 查询卸载的内容(获取签名 URL)
6. 高级 SQL 场景
这些高级模式展示了如何使用 BigQuery ML 对数据进行会话化、分析工具使用情况并执行根本原因分析。BigQuery 中的对话分析
Looker Studio 仪表板
您可以使用我们预构建的 Looker Studio 仪表板模板 来可视化代理的性能。 要将此仪表板连接到您自己的 BigQuery 表,请使用以下链接格式,将占位符替换为您特定的项目、数据集和表 ID:LangGraph 集成
BigQueryCallbackHandler 为 LangGraph 代理提供增强支持,包括自动节点检测、图级跟踪和延迟测量。
LangGraph 事件类型
除了标准的 LangChain 事件外,回调处理器还会自动检测和记录 LangGraph 特定事件:| 事件类型 | 描述 |
|---|---|
AGENT_STARTING | 当 LangGraph 节点开始执行时发出 |
AGENT_COMPLETED | 当 LangGraph 节点成功完成时发出 |
AGENT_ERROR | 当 LangGraph 节点失败时发出 |
INVOCATION_STARTING | 当图执行开始时发出(通过上下文管理器) |
INVOCATION_COMPLETED | 当图执行完成时发出 |
INVOCATION_ERROR | 当图执行失败时发出 |
图上下文管理器
使用graph_context() 方法显式标记图执行边界。这可以启用具有准确延迟测量的 INVOCATION_STARTING 和 INVOCATION_COMPLETED 事件:
延迟跟踪
回调处理器自动跟踪所有操作的延迟,并将测量值存储在latency_ms JSON 列中:
事件过滤
使用event_allowlist 和 event_denylist 控制记录哪些事件:
示例和资源
示例代码
以下示例展示了 BigQuery 回调处理器的各种功能:| 示例 | 描述 |
|---|---|
| 基本示例 | 带有 LLM 调用的基本回调用法 |
| LangGraph 代理 | 带有 6 个真实工具的完整 ReAct 代理 |
| 异步示例 | 带有并发查询的异步处理器 |
| 事件过滤 | 允许列表/拒绝列表配置 |
| 示例数据生成器 | 跨多个代理类型生成示例数据 |
分析笔记本
LangGraph 代理分析笔记本 提供了全面的 BigQuery 分析查询,用于:- 实时事件监控
- 工具使用分析
- 延迟分析和趋势
- 错误调试
- 用户参与度指标
- 时间序列可视化
实时监控仪表板
提供了一个 基于 FastAPI 的监控仪表板,用于实时代理监控: 功能:- 通过服务器发送事件(SSE)的实时事件流
- 用于事件分布和延迟趋势的交互式图表
- 带有详细时间线视图的会话跟踪
- 20 多个用于分析查询的 REST API 端点
- 每 5 秒自动刷新
反馈
我们欢迎您对 BigQuery 代理分析的反馈。如果您有任何问题、建议或遇到任何问题,请通过 bqaa-feedback@google.com 联系团队。附加资源
将这些文档连接到 Claude、VSCode 等,通过 MCP 获取实时答案。

