
Databases

Connect Fused to external databases like Snowflake and BigQuery.

Snowflake

1. Allowlist Fused IPs in Snowflake

If your Snowflake account has a network policy, you must allowlist the Fused IPs before connections will work:

  • 44.230.70.117/32
  • 52.42.164.181/32

To add them, run the following SQL in Snowflake (or ask your Snowflake admin):

CREATE NETWORK RULE fused_ips
TYPE = IPV4
VALUE_LIST = ('44.230.70.117', '52.42.164.181')
MODE = INGRESS;

ALTER NETWORK POLICY your_network_policy
ADD ALLOWED_NETWORK_RULE_LIST = (fused_ips);

2. Set up a Snowflake user for Fused

You can use an existing Snowflake user or create a dedicated one. Either way, make sure the user has a role and warehouse assigned.

To create a new user:

CREATE USER fused_user
PASSWORD = 'a_strong_password'
DEFAULT_ROLE = your_role
DEFAULT_WAREHOUSE = your_warehouse;

GRANT ROLE your_role TO USER fused_user;

If using an existing user, verify they have the necessary role and warehouse:

ALTER USER existing_user SET DEFAULT_ROLE = your_role DEFAULT_WAREHOUSE = your_warehouse;
GRANT ROLE your_role TO USER existing_user;

Finding your account identifier

Your account identifier looks like orgname-accountname or xy12345.us-east-1. You can find it in Snowflake > Admin > Accounts, or by running SELECT CURRENT_ACCOUNT() (which returns the account locator portion).
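If you have your account URL from the browser, the identifier is simply the host with the snowflakecomputing.com suffix removed. A minimal sketch (account_identifier_from_url is a hypothetical convenience helper, not part of any SDK):

```python
from urllib.parse import urlparse

def account_identifier_from_url(url):
    """Extract the Snowflake account identifier from an account URL,
    e.g. 'https://xy12345.us-east-1.snowflakecomputing.com' -> 'xy12345.us-east-1'."""
    parsed = urlparse(url)
    host = parsed.netloc or parsed.path  # tolerate a missing scheme
    suffix = '.snowflakecomputing.com'
    return host[: -len(suffix)] if host.endswith(suffix) else host

print(account_identifier_from_url('https://xy12345.us-east-1.snowflakecomputing.com'))
# xy12345.us-east-1
```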

3. Store credentials in Fused secrets

Add SNOWFLAKE_USER and SNOWFLAKE_PASSWORD in the Fused secrets management UI. Use the user's password or a programmatic access token (PAT) for SNOWFLAKE_PASSWORD; the Python connector accepts either.
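For local testing outside a Fused runtime, a small wrapper can fall back to environment variables when fused.secrets is unavailable. This is a hypothetical convenience pattern, not a Fused API:

```python
import os

def get_secret(name, default=None):
    """Read a secret from fused.secrets when running in a Fused runtime,
    falling back to environment variables for local development."""
    try:
        import fused  # only resolves secrets inside a Fused runtime
        return fused.secrets[name]
    except Exception:
        return os.environ.get(name, default)

os.environ['SNOWFLAKE_USER'] = 'fused_user'  # simulate local config
user = get_secret('SNOWFLAKE_USER')
```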

4. Connect from a UDF

@fused.udf
def udf():
    import snowflake.connector

    query = 'SELECT CURRENT_VERSION()'

    conn = snowflake.connector.connect(
        user=fused.secrets['SNOWFLAKE_USER'],
        password=fused.secrets['SNOWFLAKE_PASSWORD'],
        account='your_account_identifier',
        warehouse='your_warehouse',
        database='your_database',
        schema='your_schema',
    )

    cursor = conn.cursor()
    cursor.execute(query)
    df = cursor.fetch_pandas_all()
    cursor.close()
    conn.close()

    return df

Read more about Snowflake's Python connector.
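If a query depends on UDF parameters, it is safer to pass values as bind parameters than to interpolate them into the SQL string. A minimal sketch using the connector's default pyformat binding style (the table and column names are placeholders, and the execute call assumes a live cursor like the one above):

```python
# Bind parameters keep user input out of the SQL text itself.
query = "SELECT * FROM your_table WHERE region = %(region)s LIMIT %(n)s"
params = {"region": "us-west-2", "n": 10}

# With a live cursor from snowflake.connector.connect(...):
# cursor.execute(query, params)
```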


BigQuery - Option 1: Credentials file

Fused integrates with Google BigQuery via the google-cloud-bigquery Python library.

1. Authenticate with a Google Service Account

Create a UDF that writes your Google Service Account credentials to a file in the /mnt/cache directory of your Fused runtime disk.

@fused.udf
def udf():
    import json

    # Google Service Account key as JSON
    data = {
        'type': 'service_account',
        'project_id': 'MYPROJECT',
        'private_key_id': '1234',
        'private_key': '-----BEGIN PRIVATE KEY-----...\n-----END PRIVATE KEY-----\n',
        'client_email': 'fused-account@MYPROJECT.iam.gserviceaccount.com',
        'client_id': '1234567',
        'auth_uri': 'https://accounts.google.com/o/oauth2/auth',
        'token_uri': 'https://oauth2.googleapis.com/token',
        'auth_provider_x509_cert_url': 'https://www.googleapis.com/oauth2/v1/certs',
        'client_x509_cert_url': 'https://www.googleapis.com/robot/v1/metadata/x509/fused-pg%40MYPROJECT.iam.gserviceaccount.com',
        'universe_domain': 'googleapis.com'
    }

    # Target path for the BigQuery credentials file
    key_path = '/mnt/cache/bq_creds.json'

    # Write the credentials JSON to the file
    with open(key_path, 'w') as file:
        json.dump(data, file)
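Because /mnt/cache persists between runs, the file only needs to be written once, and later UDFs can check that it exists and parses before use. The same write-then-verify round-trip, sketched against a temporary path (swap in /mnt/cache/bq_creds.json on a Fused runtime; the key dict here is truncated):

```python
import json
import os
import tempfile

data = {'type': 'service_account', 'project_id': 'MYPROJECT'}  # truncated example key

key_path = os.path.join(tempfile.mkdtemp(), 'bq_creds.json')

# Write the credentials only if the file is not already present.
if not os.path.exists(key_path):
    with open(key_path, 'w') as f:
        json.dump(data, f)

# Verify the file round-trips before handing the path to BigQuery.
with open(key_path) as f:
    loaded = json.load(f)
assert loaded['type'] == 'service_account'
```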

2. Load data from BigQuery

Create a UDF that queries a BigQuery dataset and returns the results as a DataFrame or GeoDataFrame. Authenticate by passing the key file path to service_account.Credentials.from_service_account_file.

@fused.udf
def udf(bounds: fused.types.TileGDF = None, geography_column=None):
    from google.cloud import bigquery
    from google.oauth2 import service_account

    # This UDF will only work on a runtime with mounted EFS
    key_path = "/mnt/cache/bq_creds.json"

    # Authenticate with BigQuery
    credentials = service_account.Credentials.from_service_account_file(
        key_path, scopes=["https://www.googleapis.com/auth/cloud-platform"]
    )

    # Create a BigQuery client
    client = bigquery.Client(credentials=credentials, project=credentials.project_id)

    # Structure the query
    query = """
        SELECT * FROM `bigquery-public-data.new_york.tlc_yellow_trips_2015`
        LIMIT 10
    """

    if geography_column:
        return client.query(query).to_geodataframe(geography_column=geography_column)
    else:
        return client.query(query).to_dataframe()

BigQuery - Option 2: Secrets

If you already have a gcs_secret in Fused secrets, you can use it to authenticate with GCP. Otherwise, create new secrets in the Fused secrets manager with:

  • GS_ACCESS_KEY_ID
  • GS_SECRET_ACCESS_KEY

For example, you can use this to query the GitHub Activity Data public dataset:

@fused.udf
def udf(repo_name: str = "athasdev/athas"):
    import os

    # This is not required if your account already has the `gcs_secret` in Fused secrets
    os.environ['GS_ACCESS_KEY_ID'] = fused.secrets["GS_ACCESS_KEY_ID"]
    os.environ['GS_SECRET_ACCESS_KEY'] = fused.secrets["GS_SECRET_ACCESS_KEY"]

    from google.cloud import bigquery

    # Initialize BigQuery client
    client = bigquery.Client()

    # Count stars (WatchEvents) for the repo across the August 2025 daily tables
    total_query = f"""
        SELECT
            repo.name AS repository,
            COUNT(*) AS total_stars
        FROM `githubarchive.day.202508*`
        WHERE type = 'WatchEvent'
            AND repo.name = '{repo_name}'
        GROUP BY repository
    """

    # Run the query and convert the result to a pandas DataFrame
    total_df = client.query(total_query).to_dataframe()

    return total_df
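The `githubarchive.day.202508*` wildcard scans one month of daily tables. A hypothetical helper can build that wildcard for any month, keeping the scanned range explicit and easy to change:

```python
def day_table_wildcard(year, month):
    """Return the githubarchive daily-table wildcard for one month,
    e.g. (2025, 8) -> 'githubarchive.day.202508*'."""
    return f"githubarchive.day.{year:04d}{month:02d}*"

table = day_table_wildcard(2025, 8)
query = f"SELECT COUNT(*) AS events FROM `{table}` WHERE type = 'WatchEvent'"
```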