BigQuery Callback Handler
CommunityPythonPreview
Google BigQuery is a serverless and cost-effective enterprise data warehouse that works across clouds and scales with your data.
The BigQueryCallbackHandler allows you to log events from LangChain to Google BigQuery. This is useful for monitoring, auditing, and analyzing the performance of your LLM applications.
Preview releaseThe BigQuery Callback Handler is in Preview. APIs and functionality are subject to change.
For more information, see the
launch stage descriptions.
BigQuery Storage Write APIThis feature uses the BigQuery Storage Write API, which is a paid service.
For information on costs, see the
BigQuery documentation.
Installation
You need to install langchain-google-community with bigquery extra dependencies. For this example, you will also need langchain-google-genai and langgraph.
pip install "langchain-google-community[bigquery]" langchain langchain-google-genai langgraph
Prerequisites
- Google Cloud Project with the BigQuery API enabled.
- BigQuery Dataset: Create a dataset to store logging tables before using the callback handler. The callback handler automatically creates the necessary events table within the dataset if the table does not exist.
- Google Cloud Storage Bucket (Optional): If you plan to log multimodal content (images, audio, etc.), creating a GCS bucket is recommended for offloading large files.
- Authentication:
- Local: Run
gcloud auth application-default login.
- Cloud: Ensure your service account has the required permissions.
IAM Permissions
For the callback handler to work properly, the principal (e.g., service account, user account) under which the application is running needs these Google Cloud roles:
roles/bigquery.jobUser at Project Level to run BigQuery queries.
roles/bigquery.dataEditor at Table Level to write log/event data.
- If using GCS offloading:
roles/storage.objectCreator and roles/storage.objectViewer on the target bucket.
Use with LangChain Agent
To use the AsyncBigQueryCallbackHandler, you need to instantiate it with your Google Cloud project ID, dataset ID, and table ID.
If you want to log session_id, user_id, and agent fields to BigQuery, you must pass them via the metadata dictionary in the config object when invoking the chain or agent. If you do not have an agent field, we recommend creating and using different tables for different agents.
import asyncio
import os
from datetime import datetime
from langchain.tools import tool
from langchain_google_community.callbacks.bigquery_callback import AsyncBigQueryCallbackHandler
from langchain_google_genai import ChatGoogleGenerativeAI
from langgraph.prebuilt import create_react_agent
# 1. Define some tools for the agent
@tool
def get_current_time():
"""Returns the current local time."""
return datetime.now().strftime("%H:%M:%S")
@tool
def get_weather(city: str):
"""Returns the current weather for a specific city."""
return f"The weather in {city} is sunny and 25°C."
async def run_example_async_agent(bq_project_id: str):
"""Runs an asynchronous Agent with BigQuery logging."""
# Setup BigQuery logging details
dataset_id = "your_dataset_id"
table_id = "your_table_id"
print(f"--- Starting Async Agent Example ---")
print(f"Logging to: {bq_project_id}.{dataset_id}.{table_id}")
# Initialize the async callback handler
bigquery_handler = AsyncBigQueryCallbackHandler(
project_id=bq_project_id, dataset_id=dataset_id, table_id=table_id
)
try:
# Setup LLM and Tools
llm = ChatGoogleGenerativeAI(model="gemini-2.5-pro")
tools = [get_current_time, get_weather]
# Create the Agent (LangGraph ReAct implementation)
agent_executor = create_react_agent(llm, tools)
query = "What time is it now, and what is the weather like in New York?"
print(f"User Query: {query}")
# Run the agent asynchronously
# We use astream to process chunks, but you can also use ainvoke
async for chunk in agent_executor.astream(
{"messages": [("user", query)]},
config={
"callbacks": [bigquery_handler], # Pass the handler here
"metadata": {
"session_id": "agent-session-001",
"user_id": "user-123",
"agent": "weather-agent",
},
},
):
# Print agent thoughts/actions as they happen
if "agent" in chunk:
print(f"🤖 Agent: {chunk['agent']['messages'][0].content}")
elif "tools" in chunk:
print(f"🛠️ Tool Output: {chunk['tools']['messages'][0].content}")
print("✅ Agent finished. Logs are being written in the background...")
finally:
# Ensure resources are cleaned up
await bigquery_handler.close()
if __name__ == "__main__":
# Ensure GOOGLE_API_KEY is set in your environment
if "GOOGLE_API_KEY" not in os.environ:
raise ValueError("Please set the GOOGLE_API_KEY environment variable.")
project_id = "your-project-id"
asyncio.run(run_example_async_agent(project_id))
Configuration options
You can customize the callback handler using BigQueryLoggerConfig.
To disable the handler from logging data to the BigQuery table, set this parameter to False.
clustering_fields
List[str]
default:"[\"event_type\", \"agent\", \"user_id\"]"
The fields used to cluster the BigQuery table when it is automatically created.
The name of the GCS bucket to offload large content (images, blobs, large text) to. If not provided, large content may be truncated or replaced with placeholders.
The BigQuery connection ID (e.g., us.my-connection) to use as the authorizer for ObjectRef columns. Required for using ObjectRef with BigQuery ML.
(500 KB) The maximum length (in characters) of text content to store inline in BigQuery before offloading to GCS (if configured) or truncating.
The number of events to batch before writing to BigQuery.
The maximum time (in seconds) to wait before flushing a partial batch.
Seconds to wait for logs to flush during shutdown.
A list of event types to log. If None, all events are logged except those in event_denylist.
A list of event types to skip logging.
Whether to log detailed content parts (including GCS references).
table_id
str
default:"agent_events_v2"
The default table ID to use if not explicitly provided to the callback handler constructor.
retry_config
RetryConfig
default:"RetryConfig()"
Configuration for retry logic (max retries, delay, multiplier) when writing to BigQuery fails.
The maximum number of events to hold in the internal buffer queue before dropping new events.
The following code sample shows how to define a configuration for the BigQuery callback handler, including a custom content formatter:
import json
import re
from typing import Any
from langchain_google_community.callbacks.bigquery_callback import (
BigQueryCallbackHandler,
BigQueryLoggerConfig
)
def redact_dollar_amounts(event_content: Any) -> str:
"""
Custom formatter to redact dollar amounts (e.g., $600, $12.50)
and ensure JSON output if the input is a dict.
"""
text_content = ""
# If the content is a dictionary (e.g., a list of messages), convert it to a JSON string first.
if isinstance(event_content, dict):
text_content = json.dumps(event_content)
else:
text_content = str(event_content)
# Regex to find dollar amounts: $ followed by digits, optionally with commas or decimals.
# Examples: $600, $1,200.50, $0.99
redacted_content = re.sub(r'\$\d+(?:,\d{3})*(?:\.\d+)?', 'xxx', text_content)
return redacted_content
# 1. Configure BigQueryLoggerConfig
config = BigQueryLoggerConfig(
enabled=True,
event_allowlist=["LLM_REQUEST", "LLM_RESPONSE"], # Only log these specific events
shutdown_timeout=10.0, # Wait up to 10s for logs to flush on exit
max_content_length=500, # Truncate content to 500 characters
content_formatter=redact_dollar_amounts, # Set the custom formatting function
)
# 2. Initialize the Callback Handler
handler = BigQueryCallbackHandler(
project_id="your-project-id",
dataset_id="your_dataset",
table_id="your_table",
config=config
)
Schema and production setup
The plugin automatically creates the table if it does not exist. However, for production, we recommend creating the table manually using the following DDL, which utilizes the JSON type for flexibility and REPEATED RECORDs for multimodal content.
Recommended DDL:
CREATE TABLE `your-gcp-project-id.adk_agent_logs.agent_events_v2`
(
timestamp TIMESTAMP NOT NULL OPTIONS(description="The UTC timestamp when the event occurred."),
event_type STRING OPTIONS(description="The category of the event."),
agent STRING OPTIONS(description="The name of the agent."),
session_id STRING OPTIONS(description="A unique identifier for the conversation session."),
invocation_id STRING OPTIONS(description="A unique identifier for a single turn."),
user_id STRING OPTIONS(description="The identifier of the end-user."),
trace_id STRING OPTIONS(description="OpenTelemetry trace ID."),
span_id STRING OPTIONS(description="OpenTelemetry span ID."),
parent_span_id STRING OPTIONS(description="OpenTelemetry parent span ID."),
content JSON OPTIONS(description="The primary payload of the event."),
content_parts ARRAY<STRUCT<
mime_type STRING,
uri STRING,
object_ref STRUCT<
uri STRING,
version STRING,
authorizer STRING,
details JSON
>,
text STRING,
part_index INT64,
part_attributes STRING,
storage_mode STRING
>> OPTIONS(description="For multi-modal events, contains a list of content parts."),
attributes JSON OPTIONS(description="Arbitrary key-value pairs."),
latency_ms JSON OPTIONS(description="Latency measurements."),
status STRING OPTIONS(description="The outcome of the event."),
error_message STRING OPTIONS(description="Detailed error message."),
is_truncated BOOLEAN OPTIONS(description="Flag indicating if content was truncated.")
)
PARTITION BY DATE(timestamp)
CLUSTER BY event_type, agent, user_id;
Event types and payloads
The content column contains a JSON object specific to the event_type.
The content_parts column provides a structured view of the content, especially useful for images or offloaded data.
Content Truncation
- Variable content fields are truncated to
max_content_length (configured in BigQueryLoggerConfig, default 500KB).
- If
gcs_bucket_name is configured, large content is offloaded to GCS instead of being truncated, and a reference is stored in content_parts.object_ref.
LLM interactions
These events track the raw requests sent to and responses received from the LLM.
| Event Type | Content (JSON) Structure | Attributes (JSON) | Example Content (Simplified) |
|---|
LLM_REQUEST |
{
"messages": [
{"content": "..."}
]
}
| {
"tags": ["tag1"],
"model": "gemini-1.5-pro"
}
| {
"messages": [
{"content": "What is the weather?"}
]
}
|
LLM_RESPONSE | (Stored as JSON string) | {
"usage": {
"total_tokens": 20
}
}
| |
LLM_ERROR | null | | null |
These events track the execution of tools by the agent.
| Event Type | Content (JSON) Structure |
|---|
TOOL_STARTING |
“city=‘Paris‘“ |
TOOL_COMPLETED |
“25°C, Sunny” |
TOOL_ERROR | ”Error: Connection timeout” |
Chain Execution
These events track the start and end of high-level chains/graphs.
| Event Type | Content (JSON) Structure |
|---|
CHAIN_START | {
“messages”: […]
} |
CHAIN_END | {
“output”: ”…”
} |
CHAIN_ERROR | null (See error_message column) |
Retriever usage
These events track the execution of retrievers.
| Event Type | Content (JSON) Structure |
|---|
RETRIEVER_START |
“What is the capital of France?” |
RETRIEVER_END |
[
{
“page_content”: “Paris is the capital…”,
“metadata”: {“source”: “wiki”}
}
] |
RETRIEVER_ERROR | null (See error_message column) |
Agent Actions
These events track specific actions taken by the agent.
| Event Type | Content (JSON) Structure |
|---|
AGENT_ACTION | {
“tool”: “Calculator”,
“input”: “2 + 2”
} |
AGENT_FINISH | {
“output”: “The answer is 4”
} |
Other Events
| Event Type | Content (JSON) Structure |
|---|
TEXT |
“Some logging text…” |
Advanced analysis queries
Once your agent is running and logging events, you can perform power analysis on the agent_events_v2 table.
1. Reconstruct a Trace (Conversation Turn)
Use the trace_id to group all events (Chain, LLM, Tool) belonging to a single execution flow.
SELECT
timestamp,
event_type,
span_id,
parent_span_id,
-- Extract summary or specific content based on event type
COALESCE(
JSON_VALUE(content, '$.messages[0].content'),
JSON_VALUE(content, '$.summary'),
JSON_VALUE(content)
) AS summary,
JSON_VALUE(latency_ms, '$.total_ms') AS duration_ms
FROM
`your-gcp-project-id.adk_agent_logs.agent_events_v2`
WHERE
-- Replace with a specific trace_id from your logs
trace_id = '019bb986-a0db-7da1-802d-2725795ab340'
ORDER BY
timestamp ASC;
2. Analyze LLM Latency & Token Usage
Calculate the average latency and total token usage for your LLM calls.
SELECT
JSON_VALUE(attributes, '$.model') AS model,
COUNT(*) AS total_calls,
AVG(CAST(JSON_VALUE(latency_ms, '$.total_ms') AS FLOAT64)) AS avg_latency_ms,
SUM(CAST(JSON_VALUE(attributes, '$.usage.total_tokens') AS INT64)) AS total_tokens
FROM
`your-gcp-project-id.adk_agent_logs.agent_events_v2`
WHERE
event_type = 'LLM_RESPONSE'
GROUP BY
1;
3. Analyze Multimodal Content with BigQuery Remote Model (Gemini)
If you are offloading images to GCS, you can use BigQuery ML to analyze them directly.
SELECT
logs.session_id,
-- Get a signed URL for the image (optional, for viewing)
STRING(OBJ.GET_ACCESS_URL(parts.object_ref, "r").access_urls.read_url) as signed_url,
-- Analyze the image using a remote model (e.g., gemini-1.5-pro)
AI.GENERATE(
('Describe this image briefly. What company logo?', parts.object_ref)
) AS generated_result
FROM
`your-gcp-project-id.adk_agent_logs.agent_events_v2` logs,
UNNEST(logs.content_parts) AS parts
WHERE
parts.mime_type LIKE 'image/%'
ORDER BY logs.timestamp DESC
LIMIT 1;
4. Analyze Span Hierarchy & Duration
Visualize the execution flow and performance of your agent’s operations (LLM calls, Tool usage) using span IDs.
SELECT
span_id,
parent_span_id,
event_type,
timestamp,
-- Extract duration from latency_ms for completed operations
CAST(JSON_VALUE(latency_ms, '$.total_ms') AS INT64) as duration_ms,
-- Identify the specific tool or operation
COALESCE(
JSON_VALUE(content, '$.tool'),
'LLM_CALL'
) as operation
FROM `your-gcp-project-id.adk_agent_logs.agent_events_v2`
WHERE trace_id = 'your-trace-id'
AND event_type IN ('LLM_RESPONSE', 'TOOL_COMPLETED')
ORDER BY timestamp ASC;
5. Querying Offloaded Content (Get Signed URLs)
SELECT
timestamp,
event_type,
part.mime_type,
part.storage_mode,
part.object_ref.uri AS gcs_uri,
-- Generate a signed URL to read the content directly (requires connection_id configuration)
STRING(OBJ.GET_ACCESS_URL(part.object_ref, 'r').access_urls.read_url) AS signed_url
FROM `your-gcp-project-id.adk_agent_logs.agent_events_v2`,
UNNEST(content_parts) AS part
WHERE part.storage_mode = 'GCS_REFERENCE'
ORDER BY timestamp DESC
LIMIT 10;
6. Advanced SQL Scenarios
These advanced patterns demonstrate how to sessionize data, analyze tool usage, and perform root cause analysis using BigQuery ML.
-- 1. Sessionize Conversation History (Create View)
-- Consolidates all events into a single row per session with a formatted history.
CREATE OR REPLACE VIEW `your-project.your-dataset.agent_sessions` AS
SELECT
session_id,
user_id,
MIN(timestamp) AS session_start,
MAX(timestamp) AS session_end,
ARRAY_AGG(
STRUCT(timestamp, event_type, TO_JSON_STRING(content) as content, error_message)
ORDER BY timestamp ASC
) AS events,
STRING_AGG(
CASE
WHEN event_type = 'USER_MESSAGE_RECEIVED' THEN CONCAT('User: ', JSON_VALUE(content, '$.input'))
WHEN event_type = 'LLM_RESPONSE' THEN CONCAT('Agent: ', JSON_VALUE(content, '$.text'))
WHEN event_type = 'TOOL_STARTING' THEN CONCAT('SYS: Calling ', JSON_VALUE(content, '$.tool_name'))
WHEN event_type = 'TOOL_COMPLETED' THEN CONCAT('SYS: Result from ', JSON_VALUE(content, '$.tool_name'))
WHEN event_type = 'TOOL_ERROR' THEN CONCAT('SYS: ERROR in ', JSON_VALUE(content, '$.tool_name'))
ELSE NULL
END,
'\n' ORDER BY timestamp ASC
) AS full_conversation
FROM
`your-project.your-dataset.agent_events_v2`
GROUP BY
session_id, user_id;
-- 2. Tool Usage Analysis
-- Extract tool names and count execution status
SELECT
JSON_VALUE(content, '$.tool_name') AS tool_name,
event_type,
COUNT(*) as count
FROM `your-project.your-dataset.agent_events_v2`
WHERE event_type IN ('TOOL_STARTING', 'TOOL_COMPLETED', 'TOOL_ERROR')
GROUP BY 1, 2
ORDER BY tool_name, event_type;
-- 3. Granular Cost & Token Estimation
-- Estimate tokens based on content character length (approx 4 chars/token)
SELECT
session_id,
COUNT(*) as interaction_count,
SUM(LENGTH(TO_JSON_STRING(content))) / 4 AS estimated_tokens,
-- Example cost: $0.0001 per 1k tokens
ROUND((SUM(LENGTH(TO_JSON_STRING(content))) / 4) / 1000 * 0.0001, 6) AS estimated_cost_usd
FROM `your-project.your-dataset.agent_events_v2`
GROUP BY session_id
ORDER BY estimated_cost_usd DESC
LIMIT 5;
-- 4. AI-Powered Root Cause Analysis (Requires BigQuery ML)
-- Use Gemini to analyze failed sessions
SELECT
session_id,
AI.GENERATE(
('Analyze this conversation and explain the failure root cause. Log: ', full_conversation),
connection_id => 'your-project.us.bqml_connection',
endpoint => 'gemini-2.5-flash'
).result AS root_cause_explanation
FROM `your-project.your-dataset.agent_sessions`
WHERE error_message IS NOT NULL
LIMIT 5;
Conversational Analytics in BigQuery
Conversational AnalyticsYou can also use BigQuery Conversational Analytics to analyze your agent logs using natural language.
Just ask questions like:
- “Show me the error rate over time”
- “What are the most common tool calls?”
- “Identify sessions with high token usage”
Looker Studio Dashboard
You can visualize your agent’s performance using our pre-built Looker Studio Dashboard template.
To connect this dashboard to your own BigQuery table, use the following link format, replacing the placeholders with your specific project, dataset, and table IDs:
https://lookerstudio.google.com/reporting/create?c.reportId=f1c5b513-3095-44f8-90a2-54953d41b125&ds.ds3.connector=bigQuery&ds.ds3.type=TABLE&ds.ds3.projectId=<your-project-id>&ds.ds3.datasetId=<your-dataset-id>&ds.ds3.tableId=<your-table-id>
Additional resources