Ingest your own data

This guide explains how to use fused.ingest to geopartition and load vector tables into an S3 bucket so they can quickly be queried with Fused.

Ingest data

To start an ingestion job, call run_remote on the job object returned by fused.ingest.

job = fused.ingest(
    input="https://www2.census.gov/geo/tiger/TIGER_RD18/LAYER/TRACT/tl_rd22_11_tract.zip",
    output="fd://census/dc_tract",
)
job_id = job.run_remote()

The ingestion job first uploads the table specified by the input parameter using fused.upload. It then geopartitions the table, creating spatially partitioned chunks. Finally, it writes a Parquet table to the S3 path specified by the output parameter. Along with the table records, the resulting Parquet table includes two metadata files: main and fused.

View the ingestion job status and logs in fused.io/workbench/jobs. Monitor and manage the job with the following methods:

# View the job status
job_id.status

# Follow the job logs
job_id.tail_logs()

# Get the job logs
job_id.print_logs()

# Cancel the job
job_id.cancel()
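
Putting these together, a typical workflow starts the job and then follows its logs until it finishes:

import fused

# Start the ingestion job
job = fused.ingest(
    input="https://www2.census.gov/geo/tiger/TIGER_RD18/LAYER/TRACT/tl_rd22_11_tract.zip",
    output="fd://census/dc_tract",
)
job_id = job.run_remote()

# Stream the job's logs while it runs
job_id.tail_logs()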

Ingested tables can easily be read with the Fused utility function table_to_tile, which spatially filters the dataset and reads only the chunks within a specified polygon.

@fused.udf
def udf(bbox, table="s3://fused-asset/infra/building_msft_us/"):
    utils = fused.load("https://github.com/fusedio/udfs/tree/eda5aec/public/common/").utils
    return utils.table_to_tile(bbox, table)
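
To sanity-check an ingested table, you can call such a UDF for a single map tile with fused.run (a minimal sketch; the x/y/z tile coordinates are illustrative):

import fused

# Run the UDF for one XYZ map tile; coordinates are illustrative
gdf = fused.run(udf, x=1169, y=1530, z=12)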

The following sections cover common ingestion implementations. It's recommended to run ingestion jobs from a Python notebook or the Fused web app.

Ingest a table from a URL

Ingest a table from a URL and write it to an S3 bucket specified with the fd:// prefix.

import fused

job_id = fused.ingest(
    input="https://www2.census.gov/geo/tiger/TIGER_RD18/LAYER/TRACT/tl_rd22_11_tract.zip",
    output="fd://census/dc_tract",
).run_remote()

If you encounter the message HTTPError: {'detail': 'Quota limit: Number of running instances'}, please contact Fused to increase the number of workers in your account.

Ingest multiple files

import fused

job_id = fused.ingest(
    input=["s3://my-bucket/file1.parquet", "s3://my-bucket/file2.parquet"],
    output="fd://census/dc_tract",
).run_remote()

To ingest multiple local files, first upload them to S3 with fused.upload, then pass a list of their S3 paths as the input to fused.ingest, as shown below.
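
For example (a sketch, assuming hypothetical local file names and an existing my-bucket path):

import fused

# Upload each local file to S3 first
fused.upload("file1.parquet", "fd://my-bucket/file1.parquet")
fused.upload("file2.parquet", "fd://my-bucket/file2.parquet")

# Then ingest the uploaded files together
job_id = fused.ingest(
    input=["fd://my-bucket/file1.parquet", "fd://my-bucket/file2.parquet"],
    output="fd://census/dc_tract",
).run_remote()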

Row-based ingestion

Standard ingestion is row-based, where the user sets the maximum number of rows per chunk and file.

Each resulting table has one or more files, and each file has one or more chunks, which are spatially partitioned. By default, ingestion makes a best effort to create the number of files specified by target_num_files (default 20); the number of rows per file and per chunk is adjusted to meet this target.

job_id = fused.ingest(
    input="https://www2.census.gov/geo/tiger/TIGER_RD18/LAYER/TRACT/tl_rd22_11_tract.zip",
    output="fd://census/dc_tract_rows",  # illustrative output path
    explode_geometries=True,
    partitioning_method="rows",
    partitioning_maximum_per_file=100,
    partitioning_maximum_per_chunk=10,
).run_remote()
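
If you'd rather not set per-file and per-chunk limits explicitly, you can rely on target_num_files alone (a sketch using the default described above; the output path is illustrative):

job_id = fused.ingest(
    input="https://www2.census.gov/geo/tiger/TIGER_RD18/LAYER/TRACT/tl_rd22_11_tract.zip",
    output="fd://census/dc_tract_rows",  # illustrative output path
    target_num_files=20,
).run_remote()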

Area-based ingestion

Fused also supports area-based ingestion, where the number of rows in each partition is determined by the sum of the areas of their geometries.

job_id = fused.ingest(
    input="https://www2.census.gov/geo/tiger/TIGER_RD18/LAYER/TRACT/tl_rd22_11_tract.zip",
    output="fd://census/dc_tract_area",
    explode_geometries=True,
    partitioning_method="area",
    partitioning_maximum_per_file=None,
    partitioning_maximum_per_chunk=None,
).run_remote()

Geometry subdivision

Subdivide geometries during ingestion. This keeps operations efficient when geometries have many vertices or span large areas.

job_id = fused.ingest(
    input="https://www2.census.gov/geo/tiger/TIGER_RD18/LAYER/TRACT/tl_rd22_11_tract.zip",
    output="fd://census/dc_tract_geometry",
    explode_geometries=True,
    partitioning_method="area",
    partitioning_maximum_per_file=None,
    partitioning_maximum_per_chunk=None,
    subdivide_start=0.001,
    subdivide_stop=0.0001,
    subdivide_method="area",
).run_remote()

Ingest GeoDataFrame

Ingest a GeoDataFrame directly.
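
For example, first load a vector file into memory (a sketch; the file name is illustrative):

import geopandas as gpd

# Read a local vector file into a GeoDataFrame; the file name is illustrative
gdf = gpd.read_file("dc_tracts.shp")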

job_id = fused.ingest(
    input=gdf,
    output="s3://sample-bucket/file.parquet",
).run_remote()

Ingest non-geospatial

Ingest a table that doesn't have a spatial component.
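
For example (a sketch with an illustrative DataFrame):

import pandas as pd

# Any in-memory, non-spatial table works as input
df = pd.DataFrame({"id": [1, 2, 3], "value": [10.0, 20.0, 30.0]})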

job_id = fused.ingest_nongeospatial(
    input=df,
    output="s3://sample-bucket/file.parquet",
).run_remote()

Troubleshooting

As with other Fused batch jobs, ingestion jobs require server allocation for the account that initiates them. If you encounter the following error message, please contact the Fused team to request an increase.

Error: `Quota limit: Number of running instances`