Geospatial Data Ingestion
This page shows you how to ingest your data so it is fast to read and your UDFs stay responsive.
Why Ingestion?
The whole purpose of Fused is to speed up data science pipelines. To make this happen we need data to be responsive, regardless of size. The ideal solution is to have all data sitting in RAM right next to compute, but in real-world applications:
- Datasets (especially geospatial data) can be in the TB or PB range, which rarely fits on local storage, let alone in RAM
- Compute needs to be scaled up and down depending on workloads
One solution is cloud-optimized formats: data lives in the cloud, in file formats that are fast to access. Simply putting a .zip file on an S3 bucket is still very slow, because it has to be decompressed on every read. Ingested data should be:
- On the cloud so dataset size doesn't matter (AWS S3, Google Cloud Storage, etc.)
- Partitioned (broken down into smaller pieces that are fast to retrieve so we can load only sections of the dataset we need)
This makes the data fast to read from any UDF, so developing in the Workbench UDF Builder and running UDFs becomes much faster and more responsive!
When is ingestion needed?
You don't always need to ingest your file into a cloud-optimized, geo-partitioned format:
- Small files (< 100MB) that are already fast to open (e.g. already in .parquet) and that you only read once
Example of data you should ingest: a 1GB .zip of a shapefile
- .zip means the file has to be unzipped every time you open it, which slows down every read
- a shapefile is spread across multiple files, so it isn't the fastest format to read
Example of data you don't need to ingest: a 50MB .parquet file
- Even if the data isn't geo-partitioned, loading a file this size is fast enough (see the sketch below)
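A file like that can be read directly inside a UDF without ingestion. Here is a minimal sketch, assuming a hypothetical small GeoParquet file on S3 (and that geopandas can reach it, e.g. via s3fs):
@fused.udf
def udf(path: str = "s3://my-bucket/small_file.parquet"):
    import geopandas as gpd
    # Read the whole file on every run - fine for a small file, too slow for a large one
    return gpd.read_parquet(path)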
Ingest Geospatial Table Data
To run an ingestion job on vector data we need:
- Input data - This could be CSV files, a .zip containing shapefiles, or any other non-partitioned data
- A cloud directory - Where we save the ingested data and later access it through UDFs
Our ingestion process:
- Uploads the input
- Creates geo-partitions of the input data
This is defined with fused.ingest():
# Run this locally - not in Workbench
job = fused.ingest(
input="https://www2.census.gov/geo/tiger/TIGER_RD18/LAYER/TRACT/tl_rd22_11_tract.zip",
output=f"fd://census/dc_tract/",
)
Ingestion jobs often take more than a few seconds and require a lot of RAM, which makes them too big for a regular UDF run. Use run_batch() so your ingestion job can take as long as it needs.
# Run this locally - not in Workbench
job_id = job.run_batch()
Refer to the dedicated fused.ingest() documentation page for details on all parameters.
Reading Ingested Data
Ingested tables can easily be read with the Fused utility function table_to_tile, which spatially filters the dataset and reads only the chunks within a specified polygon.
@fused.udf
def udf(
bounds: fused.types.Bounds = None,
table="s3://fused-asset/infra/building_msft_us/"
):
common = fused.load("https://github.com/fusedio/udfs/tree/fbf5682/public/common/")
return common.table_to_tile(bounds, table)
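You can then call this UDF directly, for example with fused.run. A minimal sketch, assuming fused.run forwards the bounds parameter to the UDF (the bounding box is an illustrative [west, south, east, north] over Washington, DC):
import fused

# Run the UDF above and get back a GeoDataFrame of buildings within the bounds
gdf = fused.run(udf, bounds=[-77.12, 38.79, -76.90, 38.996])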
Common Ingestion Patterns
Ingest a table from a URL
Ingests a table from a URL and writes it to an S3 bucket specified with fd://.
import fused
job_id = fused.ingest(
input="https://www2.census.gov/geo/tiger/TIGER_RD18/LAYER/TRACT/tl_rd22_11_tract.zip",
output=f"fd://census/dc_tract/",
).run_batch()
Ingest multiple files
import fused
job_id = fused.ingest(
input=["s3://my-bucket/file1.parquet", "s3://my-bucket/file2.parquet"],
output=f"fd://census/dc_tract",
).run_batch()
To ingest multiple local files, first upload them to S3 with fused.api.upload, then pass a list of their S3 paths as the input to ingest.
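A minimal sketch of that workflow, assuming fused.api.upload takes a local path and a destination path (the file names and fd:// paths are illustrative):
import fused

# Upload local files to your Fused-managed bucket (paths are illustrative)
fused.api.upload("file1.parquet", "fd://census/raw/file1.parquet")
fused.api.upload("file2.parquet", "fd://census/raw/file2.parquet")

# Ingest the uploaded files together
job_id = fused.ingest(
    input=["fd://census/raw/file1.parquet", "fd://census/raw/file2.parquet"],
    output="fd://census/dc_tract/",
).run_batch()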
Row-based ingestion
Standard ingestion is row-based, where the user sets the maximum number of rows per chunk and file.
job_id = fused.ingest(
input="https://www2.census.gov/geo/tiger/TIGER_RD18/LAYER/TRACT/tl_rd22_11_tract.zip",
explode_geometries=True,
partitioning_method="rows",
partitioning_maximum_per_file=100,
partitioning_maximum_per_chunk=10,
).run_batch()
Area-based ingestion
Fused also supports area-based ingestion, where the number of rows in each partition is determined by the sum of the geometries' areas.
job_id = fused.ingest(
input="https://www2.census.gov/geo/tiger/TIGER_RD18/LAYER/TRACT/tl_rd22_11_tract.zip",
output=f"fd://census/dc_tract_area",
explode_geometries=True,
partitioning_method="area",
partitioning_maximum_per_file=None,
partitioning_maximum_per_chunk=None,
).run_batch()
Ingest GeoDataFrame
Ingest a GeoDataFrame directly.
import geopandas as gpd

# gdf can be any in-memory GeoDataFrame (the file path below is illustrative)
gdf = gpd.read_file("my_file.geojson")
job_id = fused.ingest(
    input=gdf,
    output="s3://sample-bucket/file.parquet",
).run_batch()
Ingest Shapefile
We recommend zipping your shapefile and ingesting it as a single file:
job_id = fused.ingest(
input="s3://my_bucket/my_shapefile.zip",
output="s3://sample-bucket/file.parquet",
).run_batch()
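If your shapefile is still a set of loose local files (.shp, .dbf, .shx, .prj), here is a minimal sketch of zipping and uploading it first (all paths are illustrative; the upload step assumes fused.api.upload):
import shutil
import fused

# Zip the folder containing the .shp, .dbf, .shx and .prj files
shutil.make_archive("my_shapefile", "zip", "path/to/shapefile_folder/")

# Upload the zip to cloud storage, then ingest it as shown above
fused.api.upload("my_shapefile.zip", "fd://my_bucket/my_shapefile.zip")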
Ingest to your own S3 bucket
fused.ingest supports writing output to any S3 bucket as long as you have appropriate permissions:
- Open the S3 Policy tab in the Workbench profile page and enter your S3 bucket name. This returns a JSON IAM policy.
- Copy this IAM policy and paste it into your AWS S3 bucket policy.
- Write the output to your S3 bucket using the output parameter:
# Assuming s3://my-bucket/ is your own managed S3 bucket
job_id = fused.ingest(
input="s3://my-bucket/file.parquet",
output="s3://my-bucket/ingested/",
).run_batch()
Troubleshooting
If you encounter the following error message, please contact the Fused team to request an increase:
Error: `Quota limit: Number of running instances`