
Geospatial Data Ingestion

This page shows you how to make your data fast to read so your UDFs are more responsive.

Why Ingestion?

The whole purpose of Fused is to speed up data science pipelines. To make this happen we need data to be responsive, regardless of size. The ideal solution is to have all data sitting in RAM right next to compute, but in real-world applications:

  • Datasets (especially geospatial data) can be in the TB or PB range, which rarely fits in local storage, let alone RAM
  • Compute needs to be scaled up and down depending on workloads

One solution is cloud-optimized formats: data lives in the cloud, but in file formats that are fast to access. Simply putting a .zip file on an S3 bucket, where it needs to be uncompressed at every read, is still very slow. Ingested data should be:

  • On the cloud so dataset size doesn't matter (AWS S3, Google Cloud Storage, etc.)
  • Partitioned (broken down into smaller pieces that are fast to retrieve so we can load only sections of the dataset we need)

This makes the data fast to read from any UDF, so developing in the Workbench UDF Builder and running UDFs becomes much faster and more responsive!

When is ingestion needed?

You don't always need to ingest your file into a cloud-hosted, geo-partitioned format:

  • Small files (< 100 MB) that are already fast to open (e.g. already in .parquet) and that you only read once

Example of data you should ingest: a 1 GB .zip of a shapefile

  • A .zip means you need to unzip the file each time you open it, which slows down working with the data every time.
  • A shapefile is split across multiple files, so it isn't the fastest format to read

Example of data you don't need to ingest: a 50 MB .parquet file

  • Even if the data isn't geo-partitioned, loading it should be fast enough (see the sketch below)
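
For data this small, a UDF can read the file directly without any ingestion step. Here is a minimal sketch; the bucket path is a placeholder, and reading .parquet from S3 assumes the usual geopandas/fsspec dependencies are available:

@fused.udf
def udf(path: str = "s3://my-bucket/small_file.parquet"):  # placeholder path
    import geopandas as gpd

    # Small files can be read directly inside the UDF - no ingestion needed
    gdf = gpd.read_parquet(path)
    return gdf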

Ingest Geospatial Table Data

To run an ingestion job on vector data we need:

  1. Input data - This could be CSV files, a .zip containing shapefiles or any other non-partitioned data
  2. A cloud directory - Where we save ingested data and later access it through UDFs

Our ingestion process:

  1. Uploads the input
  2. Creates geo-partitions of the input data

This is defined with fused.ingest():

# Run this locally - not in Workbench
import fused

job = fused.ingest(
    input="https://www2.census.gov/geo/tiger/TIGER_RD18/LAYER/TRACT/tl_rd22_11_tract.zip",
    output="fd://census/dc_tract/",
)

Ingestion jobs often take more than a few seconds and require a lot of RAM, making them too large to run in real time. Use run_batch() so your ingestion job can take as long as it needs.

# Run this locally - not in Workbench
job_id = job.run_batch()
Tip: Refer to the dedicated documentation page for fused.ingest() for more details on all parameters.

Reading Ingested Data

Ingested tables can easily be read with the Fused utility function table_to_tile, which spatially filters the dataset and reads only the chunks within a specified polygon.

@fused.udf
def udf(
    bounds: fused.types.Bounds = None,
    table: str = "s3://fused-asset/infra/building_msft_us/",
):
    common = fused.load("https://github.com/fusedio/udfs/tree/fbf5682/public/common/")
    return common.table_to_tile(bounds, table)
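
As a usage sketch, you can call this UDF with fused.run, passing bounds as a UDF parameter; the bounding box below is an arbitrary example around Washington, DC:

# Arbitrary example bounds [xmin, ymin, xmax, ymax] around Washington, DC
gdf = fused.run(udf, bounds=[-77.12, 38.80, -76.90, 39.00])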

Common Ingestion Patterns

Ingest a table from a URL

Ingests a table from a URL and writes it to an S3 bucket specified with fd://.

import fused

job_id = fused.ingest(
    input="https://www2.census.gov/geo/tiger/TIGER_RD18/LAYER/TRACT/tl_rd22_11_tract.zip",
    output="fd://census/dc_tract/",
).run_batch()

Ingest multiple files

import fused

job_id = fused.ingest(
    input=["s3://my-bucket/file1.parquet", "s3://my-bucket/file2.parquet"],
    output="fd://census/dc_tract",
).run_batch()
Warning: To ingest multiple local files, first upload them to S3 with fused.api.upload, then specify an array of their S3 paths as the input to ingest.
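
For example, a minimal sketch of that flow (the local and remote paths below are placeholders):

import fused

# Upload local files to a Fused-managed S3 path first (paths are placeholders)
fused.api.upload("file1.parquet", "fd://my-staging/file1.parquet")
fused.api.upload("file2.parquet", "fd://my-staging/file2.parquet")

# Then pass the uploaded S3 paths as a list to fused.ingest
job_id = fused.ingest(
    input=["fd://my-staging/file1.parquet", "fd://my-staging/file2.parquet"],
    output="fd://my-staging/ingested/",
).run_batch()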

Row-based ingestion

Standard ingestion is row-based, where the user sets the maximum number of rows per chunk and file.

job_id = fused.ingest(
    input="https://www2.census.gov/geo/tiger/TIGER_RD18/LAYER/TRACT/tl_rd22_11_tract.zip",
    explode_geometries=True,
    partitioning_method="rows",
    partitioning_maximum_per_file=100,
    partitioning_maximum_per_chunk=10,
).run_batch()

Area-based ingestion

Fused also supports area-based ingestion, where the number of rows in each partition is determined by the sum of their area.

job_id = fused.ingest(
    input="https://www2.census.gov/geo/tiger/TIGER_RD18/LAYER/TRACT/tl_rd22_11_tract.zip",
    output="fd://census/dc_tract_area",
    explode_geometries=True,
    partitioning_method="area",
    partitioning_maximum_per_file=None,
    partitioning_maximum_per_chunk=None,
).run_batch()

Ingest GeoDataFrame

Ingest a GeoDataFrame directly.

# gdf is an existing geopandas.GeoDataFrame
job_id = fused.ingest(
    input=gdf,
    output="s3://sample-bucket/file.parquet",
).run_batch()

Ingest Shapefile

We recommend you .zip your shapefile and ingest it as a single file:

job_id = fused.ingest(
    input="s3://my_bucket/my_shapefile.zip",
    output="s3://sample-bucket/file.parquet",
).run_batch()

Ingest to your own S3 bucket

fused.ingest supports writing output to any S3 bucket as long as you have appropriate permissions:

  1. Open the S3 Policy tab on your profile page in Workbench and enter your S3 bucket name. This generates an IAM policy as a JSON object.
  2. Copy this IAM policy and attach it to your bucket's permissions (Bucket policy) in the AWS console.
  3. Write the output to your S3 bucket using the output parameter:
# Assuming s3://my-bucket/ is your own managed S3 bucket
job_id = fused.ingest(
    input="s3://my-bucket/file.parquet",
    output="s3://my-bucket/ingested/",
).run_batch()

Troubleshooting

If you encounter the following error message, please contact the Fused team to request an increase:

Error: `Quota limit: Number of running instances`