Ingest your own data
This guide explains how to use `fused.ingest` to geopartition vector tables and load them into an S3 bucket so they can be queried quickly with Fused.
Ingest data
To start an ingestion job, call `run_remote` on the job object returned by `fused.ingest`.
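```python
import fused

# Read a zipped shapefile from a URL; write the partitioned table under fd://
job = fused.ingest(
    input="https://www2.census.gov/geo/tiger/TIGER_RD18/LAYER/TRACT/tl_rd22_11_tract.zip",
    output="fd://census/dc_tract",
)
# Start the job on Fused's remote infrastructure
job_id = job.run_remote()
```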
The ingestion job first uploads the table specified by the `input` parameter using `fused.upload`. It then geopartitions the table, splitting it into chunks that each cover a spatial partition. Finally, it writes a Parquet table to the S3 path specified by the `output` parameter. Along with the table records, the output contains two metadata files: `main` and `fused`.
View the ingestion job's status and logs at fused.io/profile#jobs, or monitor and manage the job with the following methods:
```python
# View the job status
job_id.status
# Follow the job logs
job_id.tail_logs()
# Get the job logs
job_id.print_logs()
# Cancel the job
job_id.cancel()
```
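To block a script until the job finishes instead of following the logs, you can poll the status in a loop. The following is a minimal sketch: `wait_for_job` is a hypothetical helper, not part of Fused, and the terminal status names (`succeeded`, `failed`, `cancelled`) are assumptions, so check which values `job_id.status` actually reports before relying on them.

```python
import time
from typing import Callable, Iterable


def wait_for_job(
    get_status: Callable[[], object],
    # ASSUMPTION: these status names are guesses; adjust them to the values Fused reports
    terminal_statuses: Iterable[str] = ("succeeded", "failed", "cancelled"),
    poll_seconds: float = 10.0,
    timeout_seconds: float = 3600.0,
) -> str:
    """Call get_status() until it returns a terminal status or the timeout expires."""
    terminal = {s.lower() for s in terminal_statuses}
    deadline = time.monotonic() + timeout_seconds
    while True:
        # Normalize to a lowercase string so enum-like values also compare cleanly
        status = str(get_status()).lower()
        if status in terminal:
            return status
        if time.monotonic() >= deadline:
            raise TimeoutError(f"job still {status!r} after {timeout_seconds} s")
        time.sleep(poll_seconds)
```

For example, `wait_for_job(lambda: job_id.status)`, assuming `status` reflects the job's current state each time it is read.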
Ingested tables can easily be read with the Fused utility function `table_to_tile`, which spatially filters the dataset and reads only the chunks within a specified polygon.
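```python
import fused

@fused.udf
def udf(bbox, table="s3://fused-asset/infra/building_msft_us/"):
    # Load the shared utils module from the public UDFs repo, pinned to commit eda5aec
    utils = fused.load("https://github.com/fusedio/udfs/tree/eda5aec/public/common/").utils
    # Read only the chunks of `table` that fall within `bbox`
    return utils.table_to_tile(bbox, table)
```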
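`table_to_tile` can read a small part of a large table because each chunk covers a compact area, so chunks that cannot overlap the query area can be skipped. The sketch below illustrates that pruning step with axis-aligned bounding boxes in plain Python. The `BBox` type, the chunk ids, and the assumption that per-chunk bounds are known up front are illustrative; this is not how `table_to_tile` is implemented.

```python
from typing import NamedTuple


class BBox(NamedTuple):
    minx: float
    miny: float
    maxx: float
    maxy: float


def intersects(a: BBox, b: BBox) -> bool:
    """True if two axis-aligned boxes overlap; boxes that only touch also count."""
    return a.minx <= b.maxx and b.minx <= a.maxx and a.miny <= b.maxy and b.miny <= a.maxy


def chunks_to_read(chunk_bounds: dict[str, BBox], query: BBox) -> list[str]:
    """Return the ids of chunks whose bounds overlap the query box, sorted."""
    return sorted(cid for cid, bounds in chunk_bounds.items() if intersects(bounds, query))


# Hypothetical per-chunk bounds
chunk_bounds = {
    "chunk_0": BBox(0.0, 0.0, 1.0, 1.0),
    "chunk_1": BBox(1.0, 0.0, 2.0, 1.0),
    "chunk_2": BBox(0.0, 1.0, 1.0, 2.0),
}
print(chunks_to_read(chunk_bounds, BBox(0.2, 0.2, 0.8, 0.8)))  # ['chunk_0']
```

Counting boxes that merely touch as overlapping errs on the side of reading an extra chunk rather than missing rows; exact geometry filtering would then run only on the chunks that survive.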
The following sections cover common ingestion patterns. We recommend running ingestion jobs from a Python notebook or this web app.
Ingest a table from a URL
Ingest a table from a URL and write it to an S3 bucket specified with `fd://`.
```python
import fused

job_id = fused.ingest(
    input="https://www2.census.gov/geo/tiger/TIGER_RD18/LAYER/TRACT/tl_rd22_11_tract.zip",
    output="fd://census/dc_tract",
).run_remote()
```
If you encounter the message `HTTPError: {'detail': 'Quota limit: Number of running instances'}`, contact Fused to increase the number of workers in your account.
Ingest multiple files
```python
import fused

job_id = fused.ingest(
    input=["s3://my-bucket/file1.parquet", "s3://my-bucket/file2.parquet"],
    output="fd://census/dc_tract",
).run_remote()
```
To ingest multiple local files, first upload them to S3 with `fused.upload`, then pass a list of their S3 paths as the `input` to `fused.ingest`.
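Building the remote paths programmatically keeps the upload step and the `input` list in sync. A minimal sketch, assuming a hypothetical helper `remote_paths` and a placeholder bucket prefix; it only computes paths and makes no Fused calls.

```python
from pathlib import PurePath


def remote_paths(local_paths: list[str], s3_prefix: str) -> list[str]:
    """Map each local file to a path under s3_prefix, keeping only its file name."""
    prefix = s3_prefix.rstrip("/")
    names = [PurePath(p).name for p in local_paths]
    # Two local files with the same name would overwrite each other remotely
    if len(set(names)) != len(names):
        raise ValueError("local files share a file name")
    return [f"{prefix}/{name}" for name in names]


# Hypothetical local files and bucket prefix
local_files = ["data/file1.parquet", "data/file2.parquet"]
print(remote_paths(local_files, "s3://my-bucket/"))
# ['s3://my-bucket/file1.parquet', 's3://my-bucket/file2.parquet']
```

Each local file and its remote path could then be passed to `fused.upload`, and the list of remote paths used as `input` for `fused.ingest`; check the `fused.upload` signature in your installed version before wiring this up.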
Row-based ingestion
Standard ingestion is row-based: you set the maximum number of rows per file and per chunk.
Each resulting table has one or more files, and each file has one or more spatially partitioned chunks. By default, ingestion makes a best effort to create the number of files specified by `target_num_files` (default `20`), and the number of rows per file and chunk may be adjusted to meet this target.
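```python
import fused

job_id = fused.ingest(
    input="https://www2.census.gov/geo/tiger/TIGER_RD18/LAYER/TRACT/tl_rd22_11_tract.zip",
    explode_geometries=True,  # split multi-part geometries into separate rows
    partitioning_method="rows",
    partitioning_maximum_per_file=100,  # at most 100 rows per file
    partitioning_maximum_per_chunk=10,  # at most 10 rows per chunk
).run_remote()
```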
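Row-based partitioning can be pictured as a recursive spatial split: keep halving the rows along the wider axis until each group fits under the row limit, first for files and then for chunks within each file. The sketch below is a kd-tree style median split over point coordinates (for example, geometry centroids). It is a swapped-in illustration of the idea, not Fused's partitioning code, and it ignores `target_num_files` and the other `fused.ingest` options.

```python
Point = tuple[float, float]


def kd_partition(points: list[Point], max_rows: int) -> list[list[Point]]:
    """Split points at the median of their wider axis until each group has <= max_rows points."""
    if max_rows < 1:
        raise ValueError("max_rows must be at least 1")
    if len(points) <= max_rows:
        return [points]
    xs = [p[0] for p in points]
    ys = [p[1] for p in points]
    # Split along the axis with the larger extent to keep groups roughly square
    axis = 0 if max(xs) - min(xs) >= max(ys) - min(ys) else 1
    ordered = sorted(points, key=lambda p: p[axis])
    mid = len(ordered) // 2
    return kd_partition(ordered[:mid], max_rows) + kd_partition(ordered[mid:], max_rows)


def partition_rows(points: list[Point], max_per_file: int, max_per_chunk: int) -> list[list[list[Point]]]:
    """Files hold <= max_per_file points; each file is split again into chunks of <= max_per_chunk."""
    return [kd_partition(rows, max_per_chunk) for rows in kd_partition(points, max_per_file)]


# A hypothetical 40 x 25 grid of centroids, split with the limits from the example above
grid = [(float(x), float(y)) for x in range(40) for y in range(25)]
files = partition_rows(grid, max_per_file=100, max_per_chunk=10)
print(len(files), max(len(c) for f in files for c in f))
```

Halving at the median keeps neighbouring points together, so each chunk covers a compact area, which is what makes bounding-box pruning at read time effective.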
Area-based ingestion
Fused also supports area-based ingestion, where the number of rows in each partition is determined by the combined area of their geometries.
```python
import fused

job_id = fused.ingest(
    input="https://www2.census.gov/geo/tiger/TIGER_RD18/LAYER/TRACT/tl_rd22_11_tract.zip",
    output="fd://census/dc_tract_area",
    explode_geometries=True,
    partitioning_method="area",
    partitioning_maximum_per_file=None,
    partitioning_maximum_per_chunk=None,
).run_remote()
```
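Area-based partitioning bounds the total geometry area per partition rather than the row count, so many small features can share one partition while a large feature ends up in a partition of its own. The greedy sketch below shows that rule, assuming rows are already in spatial order and their areas are precomputed; `partition_by_area` and `max_area` are hypothetical and do not correspond to Fused parameters.

```python
def partition_by_area(rows: list[tuple[str, float]], max_area: float) -> list[list[str]]:
    """Greedily group (row_id, area) pairs so each group's summed area stays <= max_area.

    Rows are assumed to be in spatial order already. A single row larger than
    max_area is not split; it gets a group of its own.
    """
    groups: list[list[str]] = []
    current: list[str] = []
    current_area = 0.0
    for row_id, area in rows:
        # Close the current group if adding this row would exceed the area budget
        if current and current_area + area > max_area:
            groups.append(current)
            current, current_area = [], 0.0
        current.append(row_id)
        current_area += area
    if current:
        groups.append(current)
    return groups


# Hypothetical (row_id, area) pairs
rows = [("a", 1.0), ("b", 2.0), ("c", 4.0), ("d", 0.5), ("e", 10.0)]
print(partition_by_area(rows, max_area=4.0))
# [['a', 'b'], ['c'], ['d'], ['e']]
```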
Geometry subdivision
Subdivide geometries during ingestion. This keeps operations efficient when geometries have many vertices or span large areas.
```python
import fused

job_id = fused.ingest(
    input="https://www2.census.gov/geo/tiger/TIGER_RD18/LAYER/TRACT/tl_rd22_11_tract.zip",
    output="fd://census/dc_tract_geometry",
    explode_geometries=True,
    partitioning_method="area",
    partitioning_maximum_per_file=None,
    partitioning_maximum_per_chunk=None,
    subdivide_start=0.001,
    subdivide_stop=0.0001,
    subdivide_method="area",
).run_remote()
```
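Subdivision replaces one large geometry with several smaller pieces, each with fewer vertices and a tighter bounding box, which makes spatial filtering more selective. The sketch below shows only the grid side of that idea: a quadtree split of a geometry's bounding box into cells no larger than a hypothetical `max_cell_area`. A real implementation would also clip the geometry to each cell (for example with shapely's `intersection`), and this sketch does not describe how Fused's `subdivide_start`, `subdivide_stop`, and `subdivide_method` parameters behave.

```python
Box = tuple[float, float, float, float]  # (minx, miny, maxx, maxy)


def subdivide_box(box: Box, max_cell_area: float) -> list[Box]:
    """Recursively split a box into quadrants until every piece has area <= max_cell_area."""
    if max_cell_area <= 0:
        raise ValueError("max_cell_area must be positive")
    minx, miny, maxx, maxy = box
    if (maxx - minx) * (maxy - miny) <= max_cell_area:
        return [box]
    midx, midy = (minx + maxx) / 2, (miny + maxy) / 2
    quadrants = [
        (minx, miny, midx, midy),
        (midx, miny, maxx, midy),
        (minx, midy, midx, maxy),
        (midx, midy, maxx, maxy),
    ]
    return [piece for q in quadrants for piece in subdivide_box(q, max_cell_area)]


cells = subdivide_box((0.0, 0.0, 1.0, 1.0), max_cell_area=0.1)
print(len(cells))  # 16
```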
Ingest GeoDataFrame
Ingest a GeoDataFrame directly.
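```python
import fused
import geopandas as gpd
# Any GeoDataFrame works; here, the DC census tracts used above
gdf = gpd.read_file("https://www2.census.gov/geo/tiger/TIGER_RD18/LAYER/TRACT/tl_rd22_11_tract.zip")
job_id = fused.ingest(
    input=gdf,
    output="s3://sample-bucket/file.parquet",
).run_remote()
```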
Ingest non-geospatial data
Ingest a table that doesn't have a spatial component.
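```python
import fused
import pandas as pd

df = pd.DataFrame({"id": [1, 2, 3], "value": [0.5, 1.5, 2.5]})  # hypothetical example table
job_id = fused.ingest_nongeospatial(
    input=df,
    output="s3://sample-bucket/file.parquet",
).run_remote()
```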