ingest

ingest(
    input: str | Path | Sequence[str | Path] | gpd.GeoDataFrame,
    output: str | None = None,
    *,
    output_metadata: str | None = None,
    schema: Schema | None = None,
    file_suffix: str | None = None,
    load_columns: Sequence[str] | None = None,
    remove_cols: Sequence[str] | None = None,
    explode_geometries: bool = False,
    drop_out_of_bounds: bool | None = None,
    partitioning_method: Literal["area", "length", "coords", "rows"] = "rows",
    partitioning_maximum_per_file: int | float | None = None,
    partitioning_maximum_per_chunk: int | float | None = None,
    partitioning_max_width_ratio: int | float = 2,
    partitioning_max_height_ratio: int | float = 2,
    partitioning_force_utm: Literal["file", "chunk", None] = "chunk",
    partitioning_split_method: Literal["mean", "median"] = "mean",
    subdivide_method: Literal["area", None] = None,
    subdivide_start: float | None = None,
    subdivide_stop: float | None = None,
    split_identical_centroids: bool = True,
    target_num_chunks: int = 500,
    lonlat_cols: tuple[str, str] | None = None,
    partitioning_schema_input: str | pd.DataFrame | None = None,
    gdal_config: GDALOpenConfig | dict[str, Any] | None = None,
    overwrite: bool = False,
    as_udf: bool = False
) -> GeospatialPartitionJobStepConfig

Ingest a dataset into the Fused partitioned format.

Parameters

  • input (str | Path | Sequence[str | Path] | gpd.GeoDataFrame) – A GeoPandas GeoDataFrame, or a path (or sequence of paths) to files on S3 to ingest. Files may be Parquet or another geo data format (see the sketch after this list for passing an in-memory GeoDataFrame).
  • output (str | None) – Location on S3 to write the main table to.
  • output_metadata (str | None) – Location on S3 to write the fused table to.
  • schema (Schema | None) – Schema of the data to be ingested. Optional, will be inferred if not provided.
  • file_suffix (str | None) – Filter which files are used for ingestion (e.g. .geojson).
  • load_columns (Sequence[str] | None) – Read only this set of columns. Defaults to all columns.
  • remove_cols (Sequence[str] | None) – Columns to drop when ingesting.
  • explode_geometries (bool) – Whether to unpack multipart geometries to single geometries. Defaults to False.
  • drop_out_of_bounds (bool | None) – Whether to drop geometries outside of the expected WGS84 bounds. Defaults to True.
  • partitioning_method (Literal['area', 'length', 'coords', 'rows']) – The method to use for grouping rows into partitions. Defaults to "rows".
  • partitioning_maximum_per_file (int | float | None) – Maximum value for partitioning_method to use per file.
  • partitioning_maximum_per_chunk (int | float | None) – Maximum value for partitioning_method to use per chunk.
  • target_num_chunks (int) – The target for the number of files. Defaults to 500.
  • lonlat_cols (tuple[str, str] | None) – Names of longitude, latitude columns to construct point geometries from.
  • gdal_config (GDALOpenConfig | dict[str, Any] | None) – Configuration options to pass to GDAL for reading files.
  • overwrite (bool) – If True, overwrite the output directory if it already exists. Defaults to False.
  • as_udf (bool) – Return the ingestion workflow as a UDF. Defaults to False.

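Because input also accepts an in-memory GeoDataFrame, a table built in Python can be ingested directly. The following is a minimal sketch, not taken from this page's example: the CSV file name, bucket paths, and column names are hypothetical placeholders, and the partitioning values are arbitrary.

import fused
import geopandas as gpd
import pandas as pd

# Hypothetical input: a CSV of point records with longitude/latitude columns.
df = pd.read_csv("stations.csv")
gdf = gpd.GeoDataFrame(
    df,
    geometry=gpd.points_from_xy(df["lon"], df["lat"]),
    crs="EPSG:4326",
)

# Ingest the in-memory GeoDataFrame; the S3 paths are placeholders.
job = fused.ingest(
    gdf,
    output="s3://my-bucket/stations/main/",
    output_metadata="s3://my-bucket/stations/fused/",
    partitioning_maximum_per_file=5000,
    partitioning_maximum_per_chunk=500,
)
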
Returns

GeospatialPartitionJobStepConfig – Configuration object describing the ingestion process. Call .run_batch on this object to start a job.

Example

job = fused.ingest(
    input="https://www2.census.gov/geo/tiger/TIGER_RD18/STATE/06_CALIFORNIA/06/tl_rd22_06_bg.zip",
    output="s3://fused-sample/census/ca_bg_2022/main/",
    output_metadata="s3://fused-sample/census/ca_bg_2022/fused/",
    explode_geometries=True,
    partitioning_maximum_per_file=2000,
    partitioning_maximum_per_chunk=200,
).run_batch()
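
This example ingests the TIGER/Line census block groups for California, unpacking multipart geometries into single geometries and, with the default partitioning_method of "rows", capping each output file at 2,000 rows and each chunk at 200 rows.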

job.run_batch

def run_batch(
    output_table: Optional[str] = ...,
    instance_type: Optional[WHITELISTED_INSTANCE_TYPES] = None,
    *,
    region: str | None = None,
    disk_size_gb: int | None = None,
    additional_env: List[str] | None = None,
    image_name: Optional[str] = None,
    ignore_no_udf: bool = False,
    ignore_no_output: bool = False,
    validate_imports: Optional[bool] = None,
    validate_inputs: bool = True,
    overwrite: Optional[bool] = None
) -> RunResponse

Begin execution of the ingestion job by calling run_batch on the job object.

Parameters

  • output_table – The name of the table to write to. Defaults to None.
  • instance_type – The AWS EC2 instance type to use for the job. Acceptable strings are m5.large, m5.xlarge, m5.2xlarge, m5.4xlarge, m5.8xlarge, m5.12xlarge, m5.16xlarge, r5.large, r5.xlarge, r5.2xlarge, r5.4xlarge, r5.8xlarge, r5.12xlarge, or r5.16xlarge. Defaults to None (see the sketch after this list).
  • region – The AWS region in which to run. Defaults to None.
  • disk_size_gb – The disk size to specify for the job. Defaults to None.
  • additional_env – Any additional environment variables to be passed into the job. Defaults to None.
  • image_name – Custom image name to run. Defaults to None for default image.
  • ignore_no_udf – Ignore validation errors about not specifying a UDF. Defaults to False.
  • ignore_no_output – Ignore validation errors about not specifying output location. Defaults to False.
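
For larger ingest jobs, compute resources can be set explicitly when starting the job. The snippet below is a sketch; the instance type and disk size are illustrative choices, not recommendations from this page.

# Start the ingest job on a larger instance with extra disk.
# The instance type must be one of the whitelisted strings listed above.
job_id = job.run_batch(
    instance_type="m5.4xlarge",
    disk_size_gb=100,
)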

Monitoring jobs

Calling run_batch returns a RunResponse object with helper methods.

# Declare ingest job
job = fused.ingest(
    input="https://www2.census.gov/geo/tiger/TIGER_RD18/STATE/06_CALIFORNIA/06/tl_rd22_06_bg.zip",
    output="s3://fused-sample/census/ca_bg_2022/main/",
)

# Start ingest job
job_id = job.run_batch()
job_id.get_status()      # Fetch the job status
job_id.print_logs()      # Fetch and print the job's logs
job_id.get_exec_time()   # Determine the job's execution time
job_id.tail_logs()       # Continuously print the job's logs
job_id.cancel()          # Cancel the job

job.run_remote

Alias of job.run_batch for backwards compatibility. See job.run_batch above for details.
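
For example, existing code written against the older name keeps working:

# Backwards-compatible alias: starts the same batch job as job.run_batch().
job_id = job.run_remote()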