from langchain_google_community.callbacks.bigquery_callback import (
    BigQueryCallbackHandler,
    BigQueryLoggerConfig,
)

# 1. Configure BigQueryLoggerConfig
config = BigQueryLoggerConfig(
    enabled=True,
    # Only log these specific events
    event_allowlist=["LLM_REQUEST", "LLM_RESPONSE"],
    # Wait up to 10s for logs to flush on exit
    shutdown_timeout=10.0,
    # Truncate content to 500 characters
    max_content_length=500,
)

# 2. Initialize the Callback Handler
handler = BigQueryCallbackHandler(
    project_id="your-project-id",
    dataset_id="your_dataset",
    table_id="your_table",
    config=config,
)
该插件会在表不存在时自动创建表。但对于生产环境,我们建议使用以下 DDL 手动创建表,该表使用 JSON 类型以保证灵活性,并使用 REPEATED RECORD 支持多模态内容。推荐 DDL:
Copy
-- Event log table for agent telemetry. Partitioned by event day and
-- clustered on the most common filter columns to keep query costs low.
CREATE TABLE `your-gcp-project-id.adk_agent_logs.agent_events_v2` (
    timestamp TIMESTAMP NOT NULL OPTIONS(description="The UTC timestamp when the event occurred."),
    event_type STRING OPTIONS(description="The category of the event."),
    agent STRING OPTIONS(description="The name of the agent."),
    session_id STRING OPTIONS(description="A unique identifier for the conversation session."),
    invocation_id STRING OPTIONS(description="A unique identifier for a single turn."),
    user_id STRING OPTIONS(description="The identifier of the end-user."),
    trace_id STRING OPTIONS(description="OpenTelemetry trace ID."),
    span_id STRING OPTIONS(description="OpenTelemetry span ID."),
    parent_span_id STRING OPTIONS(description="OpenTelemetry parent span ID."),
    content JSON OPTIONS(description="The primary payload of the event."),
    content_parts ARRAY<STRUCT<
        mime_type STRING,
        uri STRING,
        object_ref STRUCT<
            uri STRING,
            version STRING,
            authorizer STRING,
            details JSON
        >,
        text STRING,
        part_index INT64,
        part_attributes STRING,
        storage_mode STRING
    >> OPTIONS(description="For multi-modal events, contains a list of content parts."),
    attributes JSON OPTIONS(description="Arbitrary key-value pairs."),
    latency_ms JSON OPTIONS(description="Latency measurements."),
    status STRING OPTIONS(description="The outcome of the event."),
    error_message STRING OPTIONS(description="Detailed error message."),
    -- FIX: BOOL is the GoogleSQL type name; `BOOLEAN` is rejected by
    -- BigQuery DDL ("Type not found: BOOLEAN").
    is_truncated BOOL OPTIONS(description="Flag indicating if content was truncated.")
)
PARTITION BY DATE(timestamp)
CLUSTER BY event_type, agent, user_id;
-- Reconstruct a single trace: one row per event, in chronological order,
-- with a best-effort human-readable summary of each event's payload.
SELECT
    timestamp,
    event_type,
    span_id,
    parent_span_id,
    -- Extract summary or specific content based on event type
    COALESCE(
        JSON_VALUE(content, '$.messages[0].content'),
        JSON_VALUE(content, '$.summary'),
        JSON_VALUE(content)
    ) AS summary,
    JSON_VALUE(latency_ms, '$.total_ms') AS duration_ms
FROM `your-gcp-project-id.adk_agent_logs.agent_events_v2`
WHERE
    -- Replace with a specific trace_id from your logs
    trace_id = '019bb986-a0db-7da1-802d-2725795ab340'
ORDER BY timestamp ASC;
-- Per-model usage summary: call volume, average latency, and total token spend
-- across all LLM responses.
SELECT
    JSON_VALUE(attributes, '$.model') AS model,
    COUNT(*) AS total_calls,
    AVG(CAST(JSON_VALUE(latency_ms, '$.total_ms') AS FLOAT64)) AS avg_latency_ms,
    SUM(CAST(JSON_VALUE(attributes, '$.usage.total_tokens') AS INT64)) AS total_tokens
FROM `your-gcp-project-id.adk_agent_logs.agent_events_v2`
WHERE event_type = 'LLM_RESPONSE'
-- Group by the named alias rather than an ordinal position (GROUP BY 1)
-- so the query stays correct if the select list is reordered.
GROUP BY model;
-- Analyze the most recently logged image part with a multimodal Gemini model.
SELECT
    logs.session_id,
    -- Get a signed URL for the image (optional, for viewing)
    STRING(OBJ.GET_ACCESS_URL(parts.object_ref, "r").access_urls.read_url) AS signed_url,
    -- Analyze the image using a remote model (e.g., gemini-2.5-flash).
    -- FIX: AI.GENERATE needs connection_id/endpoint unless a default
    -- connection is configured for the project; the original call omitted them.
    AI.GENERATE(
        ('Describe this image briefly. What company logo?', parts.object_ref),
        connection_id => 'your-project.us.bqml_connection',
        endpoint => 'gemini-2.5-flash'
    ) AS generated_result
FROM `your-gcp-project-id.adk_agent_logs.agent_events_v2` AS logs,
    UNNEST(logs.content_parts) AS parts
WHERE parts.mime_type LIKE 'image/%'
ORDER BY logs.timestamp DESC
LIMIT 1;
-- List completed operations (LLM calls and tool runs) for one trace,
-- with per-span durations, ordered chronologically.
SELECT
    span_id,
    parent_span_id,
    event_type,
    timestamp,
    -- Extract duration from latency_ms for completed operations
    CAST(JSON_VALUE(latency_ms, '$.total_ms') AS INT64) AS duration_ms,
    -- Identify the specific tool or operation
    COALESCE(
        JSON_VALUE(content, '$.tool'),
        'LLM_CALL'
    ) AS operation
FROM `your-gcp-project-id.adk_agent_logs.agent_events_v2`
WHERE
    trace_id = 'your-trace-id'
    AND event_type IN ('LLM_RESPONSE', 'TOOL_COMPLETED')
ORDER BY timestamp ASC;
-- Inspect the 10 most recent content parts stored as GCS references,
-- including a short-lived signed URL for direct viewing.
SELECT
    timestamp,
    event_type,
    part.mime_type,
    part.storage_mode,
    part.object_ref.uri AS gcs_uri,
    -- Generate a signed URL to read the content directly (requires connection_id configuration)
    STRING(OBJ.GET_ACCESS_URL(part.object_ref, 'r').access_urls.read_url) AS signed_url
FROM `your-gcp-project-id.adk_agent_logs.agent_events_v2`,
    UNNEST(content_parts) AS part
WHERE part.storage_mode = 'GCS_REFERENCE'
ORDER BY timestamp DESC
LIMIT 10;
以下高级模式展示了如何使用 BigQuery ML 对数据进行会话化、分析工具使用情况以及执行根因分析。
Copy
-- 1. Sessionize Conversation History (Create View)
-- Consolidates all events into a single row per session with a formatted history.
CREATE OR REPLACE VIEW `your-project.your-dataset.agent_sessions` AS
SELECT
    session_id,
    user_id,
    MIN(timestamp) AS session_start,
    MAX(timestamp) AS session_end,
    ARRAY_AGG(
        STRUCT(timestamp, event_type, TO_JSON_STRING(content) AS content, error_message)
        ORDER BY timestamp ASC
    ) AS events,
    STRING_AGG(
        CASE
            WHEN event_type = 'USER_MESSAGE_RECEIVED' THEN CONCAT('User: ', JSON_VALUE(content, '$.input'))
            WHEN event_type = 'LLM_RESPONSE' THEN CONCAT('Agent: ', JSON_VALUE(content, '$.text'))
            WHEN event_type = 'TOOL_STARTING' THEN CONCAT('SYS: Calling ', JSON_VALUE(content, '$.tool_name'))
            WHEN event_type = 'TOOL_COMPLETED' THEN CONCAT('SYS: Result from ', JSON_VALUE(content, '$.tool_name'))
            WHEN event_type = 'TOOL_ERROR' THEN CONCAT('SYS: ERROR in ', JSON_VALUE(content, '$.tool_name'))
            ELSE NULL
        END,
        '\n'
        ORDER BY timestamp ASC
    ) AS full_conversation
FROM `your-project.your-dataset.agent_events_v2`
GROUP BY session_id, user_id;

-- 2. Tool Usage Analysis
-- Extract tool names and count execution status
SELECT
    JSON_VALUE(content, '$.tool_name') AS tool_name,
    event_type,
    COUNT(*) AS count
FROM `your-project.your-dataset.agent_events_v2`
WHERE event_type IN ('TOOL_STARTING', 'TOOL_COMPLETED', 'TOOL_ERROR')
-- Group by named aliases rather than ordinal positions (GROUP BY 1, 2).
GROUP BY tool_name, event_type
ORDER BY tool_name, event_type;

-- 3. Granular Cost & Token Estimation
-- Estimate tokens based on content character length (approx 4 chars/token)
SELECT
    session_id,
    COUNT(*) AS interaction_count,
    SUM(LENGTH(TO_JSON_STRING(content))) / 4 AS estimated_tokens,
    -- Example cost: $0.0001 per 1k tokens
    ROUND((SUM(LENGTH(TO_JSON_STRING(content))) / 4) / 1000 * 0.0001, 6) AS estimated_cost_usd
FROM `your-project.your-dataset.agent_events_v2`
GROUP BY session_id
ORDER BY estimated_cost_usd DESC
LIMIT 5;

-- 4. AI-Powered Root Cause Analysis (Requires BigQuery ML)
-- Use Gemini to analyze failed sessions
SELECT
    session_id,
    AI.GENERATE(
        ('Analyze this conversation and explain the failure root cause. Log: ', full_conversation),
        connection_id => 'your-project.us.bqml_connection',
        endpoint => 'gemini-2.5-flash'
    ).result AS root_cause_explanation
FROM `your-project.your-dataset.agent_sessions`
WHERE
    -- FIX: the view exposes error_message only inside the `events` array,
    -- not as a top-level column, so `WHERE error_message IS NOT NULL`
    -- fails. Probe the array for any errored event instead.
    EXISTS (
        SELECT 1
        FROM UNNEST(events) AS e
        WHERE e.error_message IS NOT NULL
    )
LIMIT 5;
-- Query latency by event type: today's average and p95 latency per
-- (event_type, agent), slowest first.
SELECT
    event_type,
    agent,
    COUNT(*) AS count,
    -- Use JSON_VALUE for consistency with the rest of this guide;
    -- it is the documented equivalent of the legacy-named JSON_EXTRACT_SCALAR.
    ROUND(AVG(CAST(JSON_VALUE(latency_ms, '$.total_ms') AS FLOAT64)), 2) AS avg_latency_ms,
    ROUND(APPROX_QUANTILES(CAST(JSON_VALUE(latency_ms, '$.total_ms') AS FLOAT64), 100)[OFFSET(95)], 2) AS p95_latency_ms
FROM `your-project.your-dataset.agent_events_v2`
WHERE
    DATE(timestamp) = CURRENT_DATE()
    AND event_type IN ('LLM_RESPONSE', 'TOOL_COMPLETED', 'GRAPH_END')
GROUP BY event_type, agent
ORDER BY avg_latency_ms DESC;
# Run the dashboard
cd libs/community/examples/bigquery_callback/webapp
pip install -r requirements.txt
uvicorn main:app --port 8001
# Open http://localhost:8001