Ingest your own data
This guide explains how to use fused.ingest to geopartition and load vector tables into an S3 bucket so they can quickly be queried with Fused.
Ingest vector data
To run an ingestion job on vector data we need:
- Input data - This could be CSV files, a .zip containing shapefiles, or any other kind of non-partitioned data
- A cloud directory - This is where we will save our ingested data and later access it through UDFs
We've built our own ingestion pipeline at Fused that partitions data based on dataset size & location. Our ingestion process:
- Uploads the input
- Creates geo-partitions of the input data
This is defined with fused.ingest():
import fused

# Define the ingestion job: read the zipped input and write partitions to fd://
job = fused.ingest(
    input="https://www2.census.gov/geo/tiger/TIGER_RD18/LAYER/TRACT/tl_rd22_11_tract.zip",
    output="fd://census/dc_tract",
)
Ingestion jobs often take more than a few seconds and require a lot of RAM, because the whole dataset is opened and re-partitioned. That makes this a large run, so we use run_remote() to let the ingestion job take as long as it needs.
To start an ingestion job, call run_remote on the job object returned by fused.ingest.
job_id = job.run_remote()
Refer to the dedicated documentation page for fused.ingest() for more details on all the parameters.
Ingested tables can easily be read with the Fused utility function table_to_tile, which spatially filters the dataset and reads only the chunks within a specified polygon.
@fused.udf
def udf(bbox, table="s3://fused-asset/infra/building_msft_us/"):
    # Load the shared utilities, then read only the chunks that overlap bbox
    utils = fused.load("https://github.com/fusedio/udfs/tree/eda5aec/public/common/").utils
    return utils.table_to_tile(bbox, table)
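To make the idea of reading only the relevant chunks concrete, here is a minimal sketch of bounding-box pruning in plain Python. It is not the implementation of table_to_tile. The chunk names, the coordinates, and the helpers bbox_intersects and select_chunks are hypothetical and only illustrate why a spatially partitioned table can skip most of its data for a small query area.

```python
def bbox_intersects(a, b):
    """Return True if two (minx, miny, maxx, maxy) boxes overlap or touch."""
    return a[0] <= b[2] and b[0] <= a[2] and a[1] <= b[3] and b[1] <= a[3]


def select_chunks(chunk_bounds, query_bbox):
    """Keep only the chunk ids whose bounds intersect the query box."""
    return [cid for cid, bounds in chunk_bounds.items() if bbox_intersects(bounds, query_bbox)]


# Hypothetical chunk metadata: one bounding box per chunk.
chunk_bounds = {
    "file0/chunk0": (-77.12, 38.79, -77.04, 38.90),
    "file0/chunk1": (-77.04, 38.79, -76.91, 38.90),
    "file1/chunk0": (-77.12, 38.90, -77.04, 39.00),
    "file1/chunk1": (-77.04, 38.90, -76.91, 39.00),
}

# A small query box that falls entirely inside the first chunk.
query = (-77.10, 38.80, -77.06, 38.85)
selected = select_chunks(chunk_bounds, query)
print(selected)
```

Only one of the four hypothetical chunks needs to be read for this query, which is the benefit that geo-partitioning is meant to provide.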
The following sections cover common ingestion implementations. It's recommended to run ingestion jobs from a Python Notebook or this web app.
Ingest a table from a URL
Ingests a table from a URL and writes it to an S3 bucket specified with fd://.
import fused
job_id = fused.ingest(
    input="https://www2.census.gov/geo/tiger/TIGER_RD18/LAYER/TRACT/tl_rd22_11_tract.zip",
    output="fd://census/dc_tract",
).run_remote()
If you encounter the message HTTPError: {'detail': 'Quota limit: Number of running instances'}, please contact Fused to increase the number of workers in your account.
Ingest multiple files
import fused
job_id = fused.ingest(
    input=["s3://my-bucket/file1.parquet", "s3://my-bucket/file2.parquet"],
    output="fd://census/dc_tract",
).run_remote()
To ingest multiple local files, first upload them to S3 with fused.upload, then pass a list of their S3 paths as the input to ingest.
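As a small illustration of the second step, the sketch below builds the list of S3 paths from local file names. It does not call fused.upload, whose exact signature is not covered here. The helper s3_paths_for, the local paths, and the s3://my-bucket/uploads/ prefix are hypothetical.

```python
from pathlib import PurePosixPath


def s3_paths_for(local_files, s3_prefix):
    """Map local file paths to S3 paths under a common prefix (hypothetical layout)."""
    prefix = s3_prefix.rstrip("/")
    return [f"{prefix}/{PurePosixPath(path).name}" for path in local_files]


# Hypothetical local files that were already uploaded under the prefix below.
local_files = ["data/file1.parquet", "data/file2.parquet"]
paths = s3_paths_for(local_files, "s3://my-bucket/uploads/")
print(paths)
```

The resulting list can then be passed as the input argument, in the same way as the list of S3 paths in the example above.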
Row-based ingestion
Standard ingestion is row-based: the user sets the maximum number of rows per chunk and per file.
Each resulting table has one or more files and each file has one or more chunks, which are spatially partitioned. By default, ingestion makes a best effort to create the number of files specified by target_num_files (default 20), and the number of rows per file and chunk can be adjusted to meet this number.
job_id = fused.ingest(
    input="https://www2.census.gov/geo/tiger/TIGER_RD18/LAYER/TRACT/tl_rd22_11_tract.zip",
    explode_geometries=True,
    partitioning_method="rows",
    partitioning_maximum_per_file=100,
    partitioning_maximum_per_chunk=10,
).run_remote()
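The table, file, and chunk hierarchy can be pictured with a short plain-Python sketch. This is an illustration under simplifying assumptions, not Fused's partitioning algorithm: it ignores spatial ordering and target_num_files, and the function partition_rows is hypothetical. It only shows how a per-file and a per-chunk row limit combine.

```python
def partition_rows(rows, max_per_file, max_per_chunk):
    """Split rows into files, then split each file into chunks."""
    files = []
    for start in range(0, len(rows), max_per_file):
        file_rows = rows[start:start + max_per_file]
        chunks = [
            file_rows[i:i + max_per_chunk]
            for i in range(0, len(file_rows), max_per_chunk)
        ]
        files.append(chunks)
    return files


# 250 rows with the same limits as the example above (100 per file, 10 per chunk).
layout = partition_rows(list(range(250)), max_per_file=100, max_per_chunk=10)
chunks_per_file = [len(chunks) for chunks in layout]
print(len(layout), chunks_per_file)
```

With these limits, 250 rows end up in three files: two full files of ten chunks each and a final file holding the remaining five chunks.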
Area-based ingestion
Fused also supports area-based ingestion, where the number of rows in each partition is determined by the sum of the rows' areas.
job_id = fused.ingest(
    input="https://www2.census.gov/geo/tiger/TIGER_RD18/LAYER/TRACT/tl_rd22_11_tract.zip",
    output="fd://census/dc_tract_area",
    explode_geometries=True,
    partitioning_method="area",
    partitioning_maximum_per_file=None,
    partitioning_maximum_per_chunk=None,
).run_remote()
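One way to picture area-based grouping is a greedy pass that closes a partition once its summed area would exceed a budget. The sketch below is an assumption for illustration only, since the exact rule Fused applies is not described here. The function partition_by_area and the max_area_per_partition budget are hypothetical.

```python
def partition_by_area(areas, max_area_per_partition):
    """Greedily group row indices so each group's total area stays within the budget.

    A single row larger than the budget still gets a partition of its own.
    """
    partitions = []
    current, current_area = [], 0.0
    for idx, area in enumerate(areas):
        # Close the current partition if adding this row would exceed the budget.
        if current and current_area + area > max_area_per_partition:
            partitions.append(current)
            current, current_area = [], 0.0
        current.append(idx)
        current_area += area
    if current:
        partitions.append(current)
    return partitions


# Hypothetical per-row geometry areas.
areas = [4.0, 1.0, 1.0, 3.0, 0.5, 0.5, 6.0]
groups = partition_by_area(areas, max_area_per_partition=5.0)
print(groups)
```

Partitions holding small geometries contain many rows, while a single large geometry can fill a partition by itself, so the row count per partition varies with the area.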
Geometry subdivision
Subdivide geometries during ingestion. This keeps operations efficient when geometries have many vertices or span large areas.
job_id = fused.ingest(
    input="https://www2.census.gov/geo/tiger/TIGER_RD18/LAYER/TRACT/tl_rd22_11_tract.zip",
    output="fd://census/dc_tract_geometry",
    explode_geometries=True,
    partitioning_method="area",
    partitioning_maximum_per_file=None,
    partitioning_maximum_per_chunk=None,
    subdivide_start=0.001,
    subdivide_stop=0.0001,
    subdivide_method="area",
).run_remote()
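The general idea of area-driven subdivision can be sketched with axis-aligned rectangles: keep splitting a shape into quadrants until every piece is below an area threshold. This is a rough illustration, not Fused's subdivision routine. The exact semantics of subdivide_start and subdivide_stop are not defined here, so the sketch uses a single hypothetical max_area threshold and a hypothetical subdivide function.

```python
def subdivide(box, max_area):
    """Recursively split a (minx, miny, maxx, maxy) box into quadrants
    until each piece has an area of at most max_area."""
    minx, miny, maxx, maxy = box
    area = (maxx - minx) * (maxy - miny)
    if area <= max_area:
        return [box]
    midx, midy = (minx + maxx) / 2, (miny + maxy) / 2
    quadrants = [
        (minx, miny, midx, midy),
        (midx, miny, maxx, midy),
        (minx, midy, midx, maxy),
        (midx, midy, maxx, maxy),
    ]
    pieces = []
    for quadrant in quadrants:
        pieces.extend(subdivide(quadrant, max_area))
    return pieces


# A unit square split until every piece has an area of at most 0.1.
pieces = subdivide((0.0, 0.0, 1.0, 1.0), max_area=0.1)
print(len(pieces))
```

Smaller pieces have tighter bounding boxes, which is why subdividing large or complex geometries tends to make spatial filtering more selective.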
Ingest GeoDataFrame
Ingest a GeoDataFrame directly.
job_id = fused.ingest(
    input=gdf,
    output="s3://sample-bucket/file.parquet",
).run_remote()
Ingest non-geospatial
Ingest a table that doesn't have a spatial component.
job_id = fused.ingest_nongeospatial(
    input=df,
    output="s3://sample-bucket/file.parquet",
).run_remote()
Troubleshooting
As with other Fused batch jobs, ingestion jobs require server allocation for the account that initiates them. If you encounter the following error message, please contact the Fused team to request an increase.
Error: `Quota limit: Number of running instances`