GCP Data Storage for Python Analytics: google-cloud-bigquery



Summary #

For your data analysis system requiring 100GB-10TB scale with streaming inserts and heavy pandas integration, BigQuery with the google-cloud-bigquery Python client library is the definitive choice. BigQuery is Google Cloud's serverless, highly scalable data warehouse designed specifically for analytical workloads (OLAP), offering columnar storage that excels at aggregations and joins across massive datasets[1].

The google-cloud-bigquery package has exceptional adoption metrics: 775+ GitHub stars, roughly 100 million monthly PyPI downloads, and classification as a "key ecosystem project"[2]. It is also actively maintained: version 3.38.0 shipped in September 2025[3]. Google officially recommends this library over the lighter-weight pandas-gbq wrapper for production applications that need more control and better performance[4].

For streaming data, BigQuery offers two APIs: the legacy tabledata.insertAll endpoint and the newer Storage Write API. The Storage Write API costs half as much as legacy streaming inserts, supports exactly-once delivery semantics, uses efficient gRPC/protobuf instead of REST/JSON, and includes 2 TB per month of free ingestion[5]. Since your project already uses google-cloud-pubsub, there is a natural integration path in which Pub/Sub messages trigger writes to BigQuery.
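
As a rough illustration of that integration path, a Pub/Sub subscriber callback can decode each message and stream it into a BigQuery table. This is only a sketch: the project ID, subscription name, table ID, and message format are placeholders, not part of your existing pipeline, and production code should batch rows rather than insert them one at a time (see the anti-patterns section below).

import json

from google.cloud import bigquery, pubsub_v1

# Placeholder identifiers -- substitute your real project, subscription, and table.
PROJECT_ID = "your-project-id"
SUBSCRIPTION_ID = "sec-filings-sub"
TABLE_ID = "your-project-id.quantificore_analytics.sec_filings"

bq_client = bigquery.Client(project=PROJECT_ID)
subscriber = pubsub_v1.SubscriberClient()
subscription_path = subscriber.subscription_path(PROJECT_ID, SUBSCRIPTION_ID)


def callback(message: pubsub_v1.subscriber.message.Message) -> None:
    """Decode one Pub/Sub message and stream it into BigQuery.

    For production volume, buffer messages and batch the inserts instead of
    writing one row per message.
    """
    row = json.loads(message.data.decode("utf-8"))
    errors = bq_client.insert_rows_json(TABLE_ID, [row])
    if errors:
        # Let Pub/Sub redeliver the message if the insert failed.
        message.nack()
    else:
        message.ack()


streaming_pull_future = subscriber.subscribe(subscription_path, callback=callback)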

Philosophy & Mental Model #

BigQuery operates on fundamentally different principles than transactional databases like PostgreSQL:

Columnar Storage: Data is stored by column rather than row. This means queries that select specific columns scan far less data, and aggregations (SUM, AVG, COUNT) are extremely fast. The mental model is: "I'm querying properties across millions of entities" rather than "I'm fetching individual records."

Immutable/Append-Only: BigQuery is optimized for append operations. Updates and deletes exist but trigger full partition rewrites. Design your schema assuming data is written once and queried many times.

Separation of Compute and Storage: You pay for storage (pennies per GB/month) and query compute (based on bytes scanned). Well-designed schemas with proper partitioning and clustering dramatically reduce query costs; the dry-run sketch after this list shows how to estimate bytes scanned before running a query.

Streaming Buffer: Recently streamed data sits in a temporary buffer before being consolidated into columnar storage. Data is queryable immediately but this buffer has different performance characteristics[6].

SQL-First, DataFrame-Second: While pandas integration exists, BigQuery's power comes from pushing computation to the server via SQL. Think of pandas as the final-mile transformation, not the primary compute engine.
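
To make the cost model concrete, a dry run reports how many bytes a query would scan without executing it or incurring charges. A minimal sketch (the table name is illustrative and assumes the sec_filings table defined later):

from google.cloud import bigquery

client = bigquery.Client()

# dry_run=True validates the query and estimates cost without executing it.
job_config = bigquery.QueryJobConfig(dry_run=True, use_query_cache=False)
job = client.query(
    """
    SELECT ticker, COUNT(*) AS filings
    FROM `quantificore_analytics.sec_filings`
    WHERE filed_at >= TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 30 DAY)
    GROUP BY ticker
    """,
    job_config=job_config,
)
print(f"Query would scan {job.total_bytes_processed / 1e9:.2f} GB")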

Setup #

Installation #

# Install the full BigQuery stack with pandas and Storage API support
pip install 'google-cloud-bigquery[bqstorage,pandas]' google-cloud-bigquery-storage pyarrow

# For Poetry (matching your project):
poetry add 'google-cloud-bigquery[bqstorage,pandas]' google-cloud-bigquery-storage pyarrow

Authentication #

BigQuery uses Application Default Credentials (ADC). Your project already uses google-cloud-pubsub, so authentication is likely configured:

# Option 1: Use environment variable (recommended for production)
# Set GOOGLE_APPLICATION_CREDENTIALS=/path/to/service-account.json

# Option 2: gcloud CLI for local development
# Run: gcloud auth application-default login

# Option 3: Explicit credentials in code
from google.cloud import bigquery
from google.oauth2 import service_account

credentials = service_account.Credentials.from_service_account_file(
    "path/to/service-account.json",
    scopes=["https://www.googleapis.com/auth/bigquery"],
)
client = bigquery.Client(credentials=credentials, project="your-project-id")

Project Configuration #

Add to your quantificore/config.py:

import os

class Config:
    # Existing config...

    # BigQuery settings
    GCP_PROJECT_ID: str = os.getenv("GCP_PROJECT_ID", "your-project-id")
    BIGQUERY_DATASET: str = os.getenv("BIGQUERY_DATASET", "quantificore_analytics")
    BIGQUERY_LOCATION: str = os.getenv("BIGQUERY_LOCATION", "US")  # or "EU", etc.

Create Dataset (one-time setup) #

from google.cloud import bigquery

def create_dataset_if_not_exists(client: bigquery.Client, dataset_id: str, location: str):
    """Create BigQuery dataset if it doesn't exist."""
    dataset_ref = f"{client.project}.{dataset_id}"
    dataset = bigquery.Dataset(dataset_ref)
    dataset.location = location

    try:
        client.create_dataset(dataset, exists_ok=True)
        print(f"Dataset {dataset_ref} created or already exists")
    except Exception as e:
        print(f"Error creating dataset: {e}")
        raise
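
Wired to the Config values above, the one-time setup call might look like this (assuming the Config class from the previous section):

from google.cloud import bigquery

from quantificore.config import Config

client = bigquery.Client(project=Config.GCP_PROJECT_ID)
create_dataset_if_not_exists(client, Config.BIGQUERY_DATASET, Config.BIGQUERY_LOCATION)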

Core Usage Patterns #

Pattern 1: Initialize Client as Singleton #

Create a module for BigQuery client management similar to your existing database.py:

# quantificore/bigquery_client.py
"""BigQuery client configuration."""

import logging
from google.cloud import bigquery
from quantificore.config import Config

logger = logging.getLogger(__name__)

_client = None


def get_bigquery_client() -> bigquery.Client:
    """Get or create BigQuery client singleton."""
    global _client
    if _client is None:
        _client = bigquery.Client(
            project=Config.GCP_PROJECT_ID,
            location=Config.BIGQUERY_LOCATION,
        )
        logger.info(f"BigQuery client initialized for project {Config.GCP_PROJECT_ID}")
    return _client

Pattern 2: Query to pandas DataFrame #

The most common analytical workflow—run SQL, get DataFrame:

from google.cloud import bigquery
import pandas as pd

from quantificore.bigquery_client import get_bigquery_client

def query_to_dataframe(sql: str, client: bigquery.Client | None = None) -> pd.DataFrame:
    """Execute SQL query and return results as pandas DataFrame.

    Uses BigQuery Storage API for faster downloads of large results.
    """
    if client is None:
        client = get_bigquery_client()

    # Create job config for query
    job_config = bigquery.QueryJobConfig(
        # Use standard SQL (not legacy)
        use_legacy_sql=False,
    )

    # Execute query and download results using Storage API (15-31x faster)
    df = client.query(sql, job_config=job_config).to_dataframe(
        # Use BigQuery Storage API for large results
        create_bqstorage_client=True,
    )

    return df


# Usage example
df = query_to_dataframe("""
    SELECT
        ticker,
        form_type,
        COUNT(*) as filing_count,
        MAX(filed_at) as latest_filing
    FROM `quantificore_analytics.sec_filings`
    WHERE filed_at >= TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 30 DAY)
    GROUP BY ticker, form_type
    ORDER BY filing_count DESC
    LIMIT 1000
""")

Pattern 3: Load DataFrame to BigQuery (Batch) #

For bulk loading pandas DataFrames—use when you have a complete dataset to upload:

from google.cloud import bigquery
import pandas as pd

from quantificore.bigquery_client import get_bigquery_client
from quantificore.config import Config

def load_dataframe_to_table(
    df: pd.DataFrame,
    table_id: str,  # Format: "project.dataset.table"
    write_disposition: str = "WRITE_APPEND",
    client: bigquery.Client | None = None,
) -> bigquery.LoadJob:
    """Load pandas DataFrame to BigQuery table.

    Args:
        df: DataFrame to load
        table_id: Fully qualified table ID (project.dataset.table)
        write_disposition: WRITE_APPEND, WRITE_TRUNCATE, or WRITE_EMPTY
        client: BigQuery client instance

    Returns:
        Completed LoadJob
    """
    if client is None:
        client = get_bigquery_client()

    job_config = bigquery.LoadJobConfig(
        write_disposition=write_disposition,
        # Automatically detect schema from DataFrame
        autodetect=True,
    )

    job = client.load_table_from_dataframe(df, table_id, job_config=job_config)
    job.result()  # Wait for completion

    return job


# Usage
df = pd.DataFrame({
    "ticker": ["AAPL", "GOOGL", "MSFT"],
    "price": [150.25, 2800.50, 300.75],
    "timestamp": pd.to_datetime(["2025-01-01", "2025-01-01", "2025-01-01"]),
})

table_id = f"{Config.GCP_PROJECT_ID}.{Config.BIGQUERY_DATASET}.stock_prices"
load_dataframe_to_table(df, table_id)
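
Schema autodetection is convenient, but for a table you load into repeatedly it can be safer to pin the schema so an odd column type in one DataFrame doesn't change the table. A sketch of the alternative job config, with columns matching the usage example above:

from google.cloud import bigquery

job_config = bigquery.LoadJobConfig(
    write_disposition="WRITE_APPEND",
    # Explicit schema instead of autodetect; fields must match the DataFrame columns.
    schema=[
        bigquery.SchemaField("ticker", "STRING", mode="REQUIRED"),
        bigquery.SchemaField("price", "FLOAT"),
        bigquery.SchemaField("timestamp", "TIMESTAMP", mode="REQUIRED"),
    ],
)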

Pattern 4: Streaming Inserts with insert_rows_json #

For real-time data that needs to be queryable within seconds. The example below uses the legacy insertAll endpoint through the client's insert_rows_json method, which is the simplest path from Python; for higher throughput and lower cost, prefer the Storage Write API, whose default stream is recommended for most streaming scenarios[7]:

from google.cloud import bigquery

from quantificore.bigquery_client import get_bigquery_client
from quantificore.config import Config

def stream_rows_to_table(
    rows: list[dict],
    table_id: str,
    client: bigquery.Client | None = None,
) -> list:
    """Stream rows to BigQuery using the insertAll API.

    Note: For higher throughput, consider the Storage Write API directly.
    This method is simpler but slightly more expensive.

    Args:
        rows: List of dictionaries matching table schema
        table_id: Fully qualified table ID
        client: BigQuery client

    Returns:
        List of errors (empty if successful)
    """
    if client is None:
        client = get_bigquery_client()

    errors = client.insert_rows_json(table_id, rows)

    if errors:
        raise RuntimeError(f"Streaming insert errors: {errors}")

    return errors


# Usage for SEC filings from your existing pipeline
rows = [
    {
        "accession_number": "0001193125-25-000001",
        "ticker": "AAPL",
        "company_name": "Apple Inc.",
        "form_type": "10-K",
        "filed_at": "2025-01-15T10:30:00Z",
        "ingested_at": "2025-01-15T10:30:05Z",
    },
]

stream_rows_to_table(
    rows,
    f"{Config.GCP_PROJECT_ID}.{Config.BIGQUERY_DATASET}.sec_filings"
)
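
insert_rows_json reports failures per row rather than raising, so a more forgiving variant can retry only the rejected rows. A sketch, assuming the errors come back in the client's documented shape (one mapping per failed row with an "index" key); rows that fail because the data itself is invalid will still fail on the retry and surface in the exception:

from google.cloud import bigquery


def stream_with_retry(rows: list[dict], table_id: str, client: bigquery.Client) -> None:
    """Stream rows and retry only those rejected on the first attempt."""
    errors = client.insert_rows_json(table_id, rows)
    if not errors:
        return

    # Each error entry carries the index of the offending row in the batch.
    failed_rows = [rows[e["index"]] for e in errors]
    retry_errors = client.insert_rows_json(table_id, failed_rows)
    if retry_errors:
        raise RuntimeError(f"Rows still failing after retry: {retry_errors}")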

Pattern 5: Define Schema with Partitioning and Clustering #

For analytical tables, always partition by time and cluster by frequently filtered columns:

from google.cloud import bigquery

from quantificore.bigquery_client import get_bigquery_client
from quantificore.config import Config

def create_sec_filings_table(client: bigquery.Client | None = None):
    """Create SEC filings table with optimal schema for analytics."""
    if client is None:
        client = get_bigquery_client()

    table_id = f"{Config.GCP_PROJECT_ID}.{Config.BIGQUERY_DATASET}.sec_filings"

    schema = [
        bigquery.SchemaField("accession_number", "STRING", mode="REQUIRED"),
        bigquery.SchemaField("ticker", "STRING", mode="REQUIRED"),
        bigquery.SchemaField("company_name", "STRING"),
        bigquery.SchemaField("form_type", "STRING", mode="REQUIRED"),
        bigquery.SchemaField("filed_at", "TIMESTAMP", mode="REQUIRED"),
        bigquery.SchemaField("ingested_at", "TIMESTAMP", mode="REQUIRED"),
        bigquery.SchemaField("filing_url", "STRING"),
        bigquery.SchemaField("raw_content", "STRING"),  # Or use JSON type
    ]

    table = bigquery.Table(table_id, schema=schema)

    # Partition by filing date for time-range queries
    table.time_partitioning = bigquery.TimePartitioning(
        type_=bigquery.TimePartitioningType.DAY,
        field="filed_at",
    )

    # Cluster by ticker and form_type for faster filtered queries
    table.clustering_fields = ["ticker", "form_type"]

    table = client.create_table(table, exists_ok=True)
    return table
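
If old filings don't need to stay queryable forever, a partition expiration bounds storage costs automatically. Optionally, inside create_sec_filings_table above, the partitioning assignment could become the following (the two-year retention window is purely illustrative):

    # Partition by filing date, and expire partitions older than ~2 years.
    table.time_partitioning = bigquery.TimePartitioning(
        type_=bigquery.TimePartitioningType.DAY,
        field="filed_at",
        expiration_ms=2 * 365 * 24 * 60 * 60 * 1000,  # value in milliseconds
    )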

Anti-Patterns & Pitfalls #

Don't: Use Legacy SQL Dialect #

# BAD: Legacy SQL bracket syntax (older pandas-gbq versions defaulted to legacy SQL)
import pandas as pd
df = pd.read_gbq("SELECT * FROM [project:dataset.table]")  # Legacy syntax

Why it's wrong: Legacy SQL has different syntax and fewer features, and Google steers all new work toward GoogleSQL (standard SQL). The bracket notation [project:dataset.table] and other legacy patterns cause confusion and incompatibility.

Instead: Always Use Standard SQL #

# GOOD: Explicitly use standard SQL with google-cloud-bigquery
from google.cloud import bigquery

client = bigquery.Client()
df = client.query(
    "SELECT * FROM `project.dataset.table`",  # Backtick notation
    job_config=bigquery.QueryJobConfig(use_legacy_sql=False)
).to_dataframe()

Don't: Stream One Row at a Time #

# BAD: Individual inserts are slow and expensive
for row in rows:
    client.insert_rows_json(table_id, [row])  # DON'T DO THIS

Why it's wrong: Each insert call has overhead. Streaming is billed with 1KB minimum per row, so small individual inserts waste money and quota[8].

Instead: Batch Streaming Inserts #

# GOOD: Batch rows together
BATCH_SIZE = 500

for i in range(0, len(rows), BATCH_SIZE):
    batch = rows[i:i + BATCH_SIZE]
    client.insert_rows_json(table_id, batch)

Don't: Query Without Partition Filters #

# BAD: Full table scan
df = client.query("""
    SELECT * FROM `project.dataset.sec_filings`
    WHERE ticker = 'AAPL'
""").to_dataframe()

Why it's wrong: Without a partition filter, BigQuery scans all partitions. On a 1TB table, you pay for scanning 1TB even if you only need 1GB of results.

Instead: Always Filter on Partition Column #

# GOOD: Partition pruning reduces scanned data
df = client.query("""
    SELECT * FROM `project.dataset.sec_filings`
    WHERE filed_at >= TIMESTAMP('2025-01-01')  -- Partition filter
      AND ticker = 'AAPL'
""").to_dataframe()

Don't: Use SELECT * in Production #

# BAD: Selects all columns, scans more data
df = client.query("SELECT * FROM `project.dataset.large_table`").to_dataframe()

Why it's wrong: BigQuery charges by bytes scanned. Selecting all columns scans all columnar data, even columns you don't need.

Instead: Select Only Needed Columns #

# GOOD: Only scan required columns
df = client.query("""
    SELECT ticker, form_type, filed_at
    FROM `project.dataset.sec_filings`
    WHERE filed_at >= TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 7 DAY)
""").to_dataframe()

Don't: Download Huge Results to Local pandas #

# BAD: Downloading 100GB to local machine
df = client.query("SELECT * FROM `project.dataset.huge_table`").to_dataframe()
df_filtered = df[df['ticker'] == 'AAPL']  # Local filtering after download

Why it's wrong: You're paying to download data that you then filter locally. Network transfer is slow and you may run out of local memory.

Instead: Push Computation to BigQuery #

# GOOD: Filter in BigQuery, download only results
df = client.query("""
    SELECT *
    FROM `project.dataset.huge_table`
    WHERE ticker = 'AAPL'
""").to_dataframe()

# Or use BigQuery DataFrames for large-scale pandas-like operations
import bigframes.pandas as bpd
bf = bpd.read_gbq("project.dataset.huge_table")
bf_filtered = bf[bf['ticker'] == 'AAPL']
local_df = bf_filtered.to_pandas()  # Download only when needed

Caveats #

References #

[1] BigQuery Documentation - Overview - Google's official documentation for BigQuery data warehouse capabilities and architecture

[2] google-cloud-bigquery on Snyk Advisor - Package health analysis showing 14.7M weekly downloads and healthy maintenance status

[3] google-cloud-bigquery on PyPI - Official package page showing version 3.38.0 and ~100M monthly downloads

[4] Choose a Python library | BigQuery - Google's comparison of google-cloud-bigquery vs pandas-gbq, recommending the former for production use

[5] Introduction to the BigQuery Storage Write API - Documentation for the modern streaming API with 50% cost reduction and exactly-once semantics

[6] Life of a BigQuery Streaming Insert - Detailed explanation of streaming buffer behavior and data availability

[7] BigQuery Storage Write API best practices - Official guidance recommending default stream for most streaming scenarios

[8] BigQuery Pricing - Streaming - Pricing details showing 1KB minimum per row for streaming inserts

[9] BigQuery DataFrames (bigframes) - pandas-compatible API that pushes computation to BigQuery for terabyte-scale analysis

[10] Xebia - BigQuery Storage Write API Hands-On Guide - Practical implementation guide for the Storage Write API with Python
