Databases
Connect Fused to external databases like Snowflake and BigQuery.
Snowflake
1. Allowlist Fused IPs in Snowflake
If your Snowflake account has a network policy, you must allowlist the Fused IPs before connections will work:
44.230.70.117/32
52.42.164.181/32
To add them, run the following SQL in Snowflake (or ask your Snowflake admin):
```sql
CREATE NETWORK RULE fused_ips
  TYPE = IPV4
  VALUE_LIST = ('44.230.70.117', '52.42.164.181')
  MODE = INGRESS;

ALTER NETWORK POLICY your_network_policy
  ADD ALLOWED_NETWORK_RULE_LIST = ('fused_ips');
```
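As a quick local sanity check, you can verify whether an address falls inside these ranges with Python's standard `ipaddress` module (a minimal sketch; the CIDR list simply mirrors the Fused IPs above):

```python
import ipaddress

# Fused egress IPs from the allowlist above
FUSED_EGRESS_CIDRS = ["44.230.70.117/32", "52.42.164.181/32"]

def is_allowlisted(ip: str, cidrs=FUSED_EGRESS_CIDRS) -> bool:
    """Return True if `ip` falls inside any of the allowlisted CIDR ranges."""
    addr = ipaddress.ip_address(ip)
    return any(addr in ipaddress.ip_network(cidr) for cidr in cidrs)
```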
2. Set up a Snowflake user for Fused
You can use an existing Snowflake user or create a dedicated one. Either way, make sure the user has a role and warehouse assigned.
To create a new user:
```sql
CREATE USER fused_user
  PASSWORD = 'a_strong_password'
  DEFAULT_ROLE = your_role
  DEFAULT_WAREHOUSE = your_warehouse;

GRANT ROLE your_role TO USER fused_user;
```
If using an existing user, verify they have the necessary role and warehouse:
```sql
ALTER USER existing_user SET DEFAULT_ROLE = your_role DEFAULT_WAREHOUSE = your_warehouse;
GRANT ROLE your_role TO USER existing_user;
```
Your account identifier looks like orgname-accountname or xy12345.us-east-1. You can find it in Snowflake > Admin > Accounts, or by running SELECT CURRENT_ACCOUNT().
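If you copy the identifier from your browser's address bar, remember to strip the scheme and the `snowflakecomputing.com` suffix. A small hypothetical helper (not part of any Snowflake library) illustrates the expected shape:

```python
def normalize_account_identifier(raw: str) -> str:
    """Reduce a pasted Snowflake URL to the bare account identifier.

    Hypothetical convenience helper: the connector itself just expects
    the identifier string, e.g. 'orgname-accountname' or 'xy12345.us-east-1'.
    """
    s = raw.strip()
    for prefix in ("https://", "http://"):
        if s.startswith(prefix):
            s = s[len(prefix):]
    s = s.split("/")[0]  # drop any path after the host
    suffix = ".snowflakecomputing.com"
    if s.endswith(suffix):
        s = s[:-len(suffix)]
    return s
```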
3. Store credentials in Fused secrets
Add SNOWFLAKE_USER and SNOWFLAKE_PASSWORD in the Fused secrets management UI. Use the user’s password or a programmatic access token (PAT) for SNOWFLAKE_PASSWORD—the Python connector accepts either.
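If you also run the same code outside the Fused runtime, a small fallback wrapper can keep it portable. This is a hypothetical convenience helper, not part of the Fused API; it assumes `fused.secrets` behaves like a mapping and falls back to environment variables:

```python
import os

def get_secret(name: str) -> str:
    """Fetch a credential from Fused secrets, falling back to an
    environment variable of the same name when the Fused runtime
    (or the secret) is unavailable. Hypothetical helper."""
    try:
        import fused
        value = fused.secrets[name]
        if value is not None:
            return value
    except Exception:
        pass
    return os.environ[name]
```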
4. Connect from a UDF
```python
@fused.udf
def udf():
    import snowflake.connector

    conn = snowflake.connector.connect(
        user=fused.secrets['SNOWFLAKE_USER'],
        password=fused.secrets['SNOWFLAKE_PASSWORD'],
        account='your_account_identifier',
        warehouse='your_warehouse',
        database='your_database',
        schema='your_schema'
    )
    cursor = conn.cursor()
    cursor.execute('SELECT CURRENT_VERSION()')
    df = cursor.fetch_pandas_all()
    cursor.close()
    conn.close()
    return df
```
Read more about Snowflake's Python connector.
BigQuery - Option 1: Credentials file
Fused integrates with Google BigQuery via the `google-cloud-bigquery` Python library.
1. Authenticate with a Google Service Account
Create a UDF that writes your Google Service Account credentials to a file in the `/mnt/cache` directory of the Fused runtime disk.
```python
@fused.udf
def udf():
    import json

    # Google Service Account key as JSON
    data = {
        'type': 'service_account',
        'project_id': 'MYPROJECT',
        'private_key_id': '1234',
        'private_key': '-----BEGIN PRIVATE KEY-----...\n-----END PRIVATE KEY-----\n',
        'client_email': 'fused-account@MYPROJECT.iam.gserviceaccount.com',
        'client_id': '1234567',
        'auth_uri': 'https://accounts.google.com/o/oauth2/auth',
        'token_uri': 'https://oauth2.googleapis.com/token',
        'auth_provider_x509_cert_url': 'https://www.googleapis.com/oauth2/v1/certs',
        'client_x509_cert_url': 'https://www.googleapis.com/robot/v1/metadata/x509/fused-pg%40MYPROJECT.iam.gserviceaccount.com',
        'universe_domain': 'googleapis.com'
    }
    # Target path for the BigQuery credentials file
    key_path = '/mnt/cache/bq_creds.json'
    # Write the key data to the file
    with open(key_path, 'w') as file:
        json.dump(data, file)
```
2. Load data from BigQuery
Create a UDF to run a query against a BigQuery dataset and return the results as a DataFrame or GeoDataFrame. Authenticate by passing the key file path to `service_account.Credentials.from_service_account_file`.
```python
@fused.udf
def udf(bounds: fused.types.TileGDF = None, geography_column=None):
    from google.cloud import bigquery
    from google.oauth2 import service_account

    # This UDF will only work on runtimes with a mounted EFS disk
    key_path = '/mnt/cache/bq_creds.json'

    # Authenticate with BigQuery
    credentials = service_account.Credentials.from_service_account_file(
        key_path, scopes=['https://www.googleapis.com/auth/cloud-platform']
    )
    # Create a BigQuery client
    client = bigquery.Client(credentials=credentials, project=credentials.project_id)

    # Structure the query
    query = """
        SELECT * FROM `bigquery-public-data.new_york.tlc_yellow_trips_2015`
        LIMIT 10
    """
    if geography_column:
        return client.query(query).to_geodataframe(geography_column=geography_column)
    return client.query(query).to_dataframe()
```
BigQuery - Option 2: Secrets
If you already have a `gcs_secret` in Fused secrets, you can use it to access GCP. Otherwise, create new secrets in the Fused secrets manager named:
GS_ACCESS_KEY_ID
GS_SECRET_ACCESS_KEY
For example, you can use this to query the GitHub Activity public dataset:
```python
@fused.udf
def udf(repo_name: str = "athasdev/athas"):
    import os

    # Not required if your account already has `gcs_secret` in Fused secrets
    os.environ['GS_ACCESS_KEY_ID'] = fused.secrets["GS_ACCESS_KEY_ID"]
    os.environ['GS_SECRET_ACCESS_KEY'] = fused.secrets["GS_SECRET_ACCESS_KEY"]

    from google.cloud import bigquery

    # Initialize BigQuery client
    client = bigquery.Client()

    # Count stars (WatchEvents) for the repo across August 2025 daily tables
    total_query = f"""
        SELECT
            repo.name AS repository,
            COUNT(*) AS total_stars
        FROM `githubarchive.day.202508*`
        WHERE type = 'WatchEvent'
            AND repo.name = '{repo_name}'
        GROUP BY repository
    """

    # Run the query and convert the result to a pandas DataFrame
    query_job = client.query(total_query)
    total_df = query_job.to_dataframe()
    return total_df
```
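The hard-coded `202508*` suffix above selects every daily table for August 2025. A small helper (hypothetical, not part of any library) makes the month explicit and avoids typos in the wildcard:

```python
def day_table_pattern(year: int, month: int) -> str:
    """Wildcard table spec covering every githubarchive daily table in a month."""
    return f"`githubarchive.day.{year:04d}{month:02d}*`"
```

For instance, `FROM {day_table_pattern(2025, 8)}` expands to the same table spec used in the query above.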