Skip to main content

BigQuery Integration

Fused integrates with Google BigQuery via the `google-cloud-bigquery` Python client library.

Option 1: Credentials file

1. Authenticate with a Google Service Account

Create a UDF to write your Google Service Account credentials to the Fused runtime disk at /mnt/cache.

@fused.udf
def udf():
    """Write Google Service Account credentials to the Fused runtime disk.

    Serializes the service-account key below as JSON to
    ``/mnt/cache/bq_creds.json``. Replace the placeholder values with the
    fields from your own service-account key before running.
    """
    import json

    # Google Key as JSON
    data = {
        'type': 'service_account',
        'project_id': 'MYPROJECT',
        'private_key_id': '1234',
        'private_key': '-----BEGIN PRIVATE KEY-----...\n-----END PRIVATE KEY-----\n',
        'client_email': 'fused-account@MYPROJECT.iam.gserviceaccount.com',
        'client_id': '1234567',
        'auth_uri': 'https://accounts.google.com/o/oauth2/auth',
        'token_uri': 'https://oauth2.googleapis.com/token',
        'auth_provider_x509_cert_url': 'https://www.googleapis.com/oauth2/v1/certs',
        # The certificate URL embeds the URL-encoded `client_email`, so the
        # account name here must match the one above.
        'client_x509_cert_url': 'https://www.googleapis.com/robot/v1/metadata/x509/fused-account%40MYPROJECT.iam.gserviceaccount.com',
        'universe_domain': 'googleapis.com',
    }

    key_path = '/mnt/cache/bq_creds.json'

    # Explicit encoding keeps the written file independent of the runtime locale.
    with open(key_path, 'w', encoding='utf-8') as file:
        json.dump(data, file)

2. Load data from BigQuery

Create a UDF that queries a BigQuery dataset and returns the results as a DataFrame or GeoDataFrame. Authenticate by passing the key file path to service_account.Credentials.from_service_account_file.

@fused.udf
def udf(bounds: fused.types.TileGDF = None, geography_column=None):
    """Query a public BigQuery table and return the rows as a table.

    Authenticates with the service-account key stored at
    ``/mnt/cache/bq_creds.json``.

    Args:
        bounds: Accepted for the UDF signature; this example query does not
            use it.
        geography_column: Optional name of the column to pass to
            ``to_geodataframe``. When it is falsy the result is returned via
            ``to_dataframe`` instead.

    Returns:
        The query result converted with ``to_geodataframe`` when
        ``geography_column`` is given, otherwise with ``to_dataframe``.
    """
    from google.cloud import bigquery
    from google.oauth2 import service_account

    key_path = "/mnt/cache/bq_creds.json"

    credentials = service_account.Credentials.from_service_account_file(
        key_path, scopes=["https://www.googleapis.com/auth/cloud-platform"]
    )

    client = bigquery.Client(credentials=credentials, project=credentials.project_id)

    # Plain string: the query has no interpolated values, so no f-string.
    query = """
        SELECT * FROM `bigquery-public-data.new_york.tlc_yellow_trips_2015`
        LIMIT 10
    """

    # Submit the query once, then pick the output conversion.
    query_job = client.query(query)
    if geography_column:
        return query_job.to_geodataframe(geography_column=geography_column)
    return query_job.to_dataframe()

Option 2: Secrets

If you already have a gcs_secret in Fused secrets, you can use it to access GCP. Otherwise, create the following secrets in the Fused secrets manager:

  • GS_ACCESS_KEY_ID
  • GS_SECRET_ACCESS_KEY
@fused.udf
def udf(repo_name: str = "athasdev/athas"):
    """Return up to 10 GitHub Archive rows for one repository.

    Queries the day tables matching ``githubarchive.day.202508*`` and keeps
    only rows whose repository name equals ``repo_name``.

    Args:
        repo_name: Repository name to filter on, e.g. ``"athasdev/athas"``.

    Returns:
        The query result converted with ``to_dataframe``.
    """
    import os

    # Not required if your account already has the `gcs_secret` in Fused secrets
    os.environ['GS_ACCESS_KEY_ID'] = fused.secrets["GS_ACCESS_KEY_ID"]
    os.environ['GS_SECRET_ACCESS_KEY'] = fused.secrets["GS_SECRET_ACCESS_KEY"]

    # Import and build the client only after the environment is populated.
    from google.cloud import bigquery

    client = bigquery.Client()

    # NOTE(review): assumes the githubarchive day tables expose `repo.name`
    # - confirm against the table schema.
    query = """
        SELECT *
        FROM `githubarchive.day.202508*`
        WHERE repo.name = @repo_name
        LIMIT 10
    """

    # Bind `repo_name` as a query parameter instead of formatting it into the
    # SQL text, so arbitrary input cannot alter the statement.
    job_config = bigquery.QueryJobConfig(
        query_parameters=[
            bigquery.ScalarQueryParameter("repo_name", "STRING", repo_name),
        ]
    )

    query_job = client.query(query, job_config=job_config)
    return query_job.to_dataframe()