Skip to main content

BigQuery

Fused integrates with Google BigQuery through the google-cloud-bigquery Python library.

1. Authenticate with a Google Service Account

Create a UDF that writes your Google Service Account credentials to a file in the /mnt/cache directory on your Fused runtime disk.

@fused.udf
def udf():
    """Write Google Service Account credentials to the Fused runtime disk.

    Serializes the service-account key defined below as JSON to
    ``/mnt/cache/bq_creds.json`` for later use when authenticating with
    BigQuery. Replace the placeholder values with the fields from your own
    service-account key file before running.
    """
    import json

    # Google Service Account key as a dict (placeholder values).
    data = {
        'type': 'service_account',
        'project_id': 'MYPROJECT',
        'private_key_id': '1234',
        'private_key': '-----BEGIN PRIVATE KEY-----...\n-----END PRIVATE KEY-----\n',
        'client_email': 'fused-account@MYPROJECT.iam.gserviceaccount.com',
        'client_id': '1234567',
        'auth_uri': 'https://accounts.google.com/o/oauth2/auth',
        'token_uri': 'https://oauth2.googleapis.com/token',
        'auth_provider_x509_cert_url': 'https://www.googleapis.com/oauth2/v1/certs',
        'client_x509_cert_url': 'https://www.googleapis.com/robot/v1/metadata/x509/fused-pg%40MYPROJECT.iam.gserviceaccount.com',
        'universe_domain': 'googleapis.com'
    }

    # Target path for the BigQuery credentials file.
    key_path = '/mnt/cache/bq_creds.json'

    # Serialize the key to disk as JSON.
    with open(key_path, 'w') as file:
        json.dump(data, file)

2. Load data from BigQuery

Create a UDF to perform a query on a BigQuery dataset and return the results as a DataFrame or GeoDataFrame. Authenticate by passing the key file path to service_account.Credentials.from_service_account_file.

@fused.udf
def udf(bbox: fused.types.TileGDF = None, geography_column=None):
    """Run a sample BigQuery query and return the result as a (Geo)DataFrame.

    Args:
        bbox: Not used by this sample query.
        geography_column: Name of a geography column in the query result.
            When truthy, the result is returned via ``to_geodataframe``
            using this column; otherwise it is returned via ``to_dataframe``.

    Returns:
        The query result produced by ``to_geodataframe`` or ``to_dataframe``.
    """
    from google.cloud import bigquery
    from google.oauth2 import service_account

    # This UDF will only work on a runtime with mounted EFS, since the
    # credentials are read from a file under /mnt/cache.
    key_path = "/mnt/cache/bq_creds.json"

    # Authenticate BigQuery with the service-account key file.
    credentials = service_account.Credentials.from_service_account_file(
        key_path, scopes=["https://www.googleapis.com/auth/cloud-platform"]
    )

    # Create a BigQuery client bound to the key's project.
    client = bigquery.Client(credentials=credentials, project=credentials.project_id)

    # Sample query. A plain string (not an f-string) because nothing is
    # interpolated; this avoids accidental brace interpolation later.
    query = """
        SELECT * FROM `bigquery-public-data.new_york.tlc_yellow_trips_2015`
        LIMIT 10
    """

    # Submit the query once, then pick the output format.
    job = client.query(query)
    if geography_column:
        return job.to_geodataframe(geography_column=geography_column)
    return job.to_dataframe()