ingest
ingest(
input: str | Path | Sequence[str | Path] | gpd.GeoDataFrame,
output: str | None = None,
*,
output_metadata: str | None = None,
schema: Schema | None = None,
file_suffix: str | None = None,
load_columns: Sequence[str] | None = None,
remove_cols: Sequence[str] | None = None,
explode_geometries: bool = False,
drop_out_of_bounds: bool | None = None,
partitioning_method: Literal["area", "length", "coords", "rows"] = "rows",
partitioning_maximum_per_file: int | float | None = None,
partitioning_maximum_per_chunk: int | float | None = None,
partitioning_max_width_ratio: int | float = 2,
partitioning_max_height_ratio: int | float = 2,
partitioning_force_utm: Literal["file", "chunk", None] = "chunk",
partitioning_split_method: Literal["mean", "median"] = "mean",
subdivide_method: Literal["area", None] = None,
subdivide_start: float | None = None,
subdivide_stop: float | None = None,
split_identical_centroids: bool = True,
target_num_chunks: int = 500,
lonlat_cols: tuple[str, str] | None = None,
partitioning_schema_input: str | pd.DataFrame | None = None,
gdal_config: GDALOpenConfig | dict[str, Any] | None = None,
overwrite: bool = False,
as_udf: bool = False
) -> GeospatialPartitionJobStepConfig
Ingest a dataset into the Fused partitioned format.
Parameters
- input (str | Path | Sequence[str | Path] | gpd.GeoDataFrame) – A GeoPandas `GeoDataFrame`, or a path to a file or files on S3 to ingest. Files may be Parquet or another geo data format.
- output (str | None) – Location on S3 to write the `main` table to.
- output_metadata (str | None) – Location on S3 to write the `fused` table to.
- schema (Schema | None) – Schema of the data to be ingested. Optional; will be inferred if not provided.
- file_suffix (str | None) – Filter which files are used for ingestion (e.g. `.geojson`).
- load_columns (Sequence[str] | None) – Read only this set of columns. Defaults to all columns.
- remove_cols (Sequence[str] | None) – Columns to drop when ingesting.
- explode_geometries (bool) – Whether to unpack multipart geometries to single geometries. Defaults to `False`.
- drop_out_of_bounds (bool | None) – Whether to drop geometries outside of the expected WGS84 bounds. Defaults to `True`.
- partitioning_method (Literal["area", "length", "coords", "rows"]) – The method to use for grouping rows into partitions. Defaults to `"rows"`.
- partitioning_maximum_per_file (int | float | None) – Maximum value for `partitioning_method` to use per file.
- partitioning_maximum_per_chunk (int | float | None) – Maximum value for `partitioning_method` to use per chunk.
- target_num_chunks (int) – The target for the number of files. Defaults to 500.
- lonlat_cols (tuple[str, str] | None) – Names of the longitude and latitude columns to construct point geometries from.
- gdal_config (GDALOpenConfig | dict[str, Any] | None) – Configuration options to pass to GDAL for reading files.
- overwrite (bool) – If `True`, overwrite the output directory if it already exists. Defaults to `False`.
- as_udf (bool) – Return the ingestion workflow as a UDF. Defaults to `False`.
Returns
GeospatialPartitionJobStepConfig – Configuration object describing the ingestion process. Call run_batch on this object to start a job.
Example
job = fused.ingest(
input="https://www2.census.gov/geo/tiger/TIGER_RD18/STATE/06_CALIFORNIA/06/tl_rd22_06_bg.zip",
output="s3://fused-sample/census/ca_bg_2022/main/",
output_metadata="s3://fused-sample/census/ca_bg_2022/fused/",
explode_geometries=True,
partitioning_maximum_per_file=2000,
partitioning_maximum_per_chunk=200,
).run_batch()
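ingest also accepts a GeoDataFrame directly, or a flat file whose longitude/latitude columns are converted to point geometries via lonlat_cols. A minimal sketch of the latter; the S3 paths and column names here are hypothetical placeholders:
# Ingest a Parquet file of plain lon/lat columns as point geometries.
# Paths and column names are illustrative, not real datasets.
job = fused.ingest(
    input="s3://my-bucket/sensor_readings.parquet",
    output="s3://my-bucket/sensor_readings/main/",
    lonlat_cols=("longitude", "latitude"),  # build points from these columns
    partitioning_maximum_per_file=5000,
    partitioning_maximum_per_chunk=500,
).run_batch()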
job.run_batch
def run_batch(
output_table: Optional[str] = ...,
instance_type: Optional[WHITELISTED_INSTANCE_TYPES] = None,
*,
region: str | None = None,
disk_size_gb: int | None = None,
additional_env: List[str] | None = None,
image_name: Optional[str] = None,
ignore_no_udf: bool = False,
ignore_no_output: bool = False,
validate_imports: Optional[bool] = None,
validate_inputs: bool = True,
overwrite: Optional[bool] = None
) -> RunResponse
Begin execution of the ingestion job by calling run_batch on the job object.
Parameters
- output_table – The name of the table to write to. Defaults to None.
- instance_type – The AWS EC2 instance type to use for the job. Acceptable strings are `m5.large`, `m5.xlarge`, `m5.2xlarge`, `m5.4xlarge`, `m5.8xlarge`, `m5.12xlarge`, `m5.16xlarge`, `r5.large`, `r5.xlarge`, `r5.2xlarge`, `r5.4xlarge`, `r5.8xlarge`, `r5.12xlarge`, or `r5.16xlarge`. Defaults to None.
- region – The AWS region in which to run. Defaults to None.
- disk_size_gb – The disk size to specify for the job. Defaults to None.
- additional_env – Any additional environment variables to be passed into the job. Defaults to None.
- image_name – Custom image name to run. Defaults to None for the default image.
- ignore_no_udf – Ignore validation errors about not specifying a UDF. Defaults to False.
- ignore_no_output – Ignore validation errors about not specifying an output location. Defaults to False.
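For larger inputs, the instance and disk can be sized explicitly when starting the job. A short sketch using parameters documented above; the specific values are illustrative, not recommendations:
# Start the job on a larger whitelisted instance with extra disk
job_run = job.run_batch(
    instance_type="r5.2xlarge",
    disk_size_gb=100,
)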
Monitoring jobs
Calling run_batch returns a RunResponse object with helper methods.
# Declare ingest job
job = fused.ingest(
input="https://www2.census.gov/geo/tiger/TIGER_RD18/STATE/06_CALIFORNIA/06/tl_rd22_06_bg.zip",
output="s3://fused-sample/census/ca_bg_2022/main/"
)
# Start ingest job
job_id = job.run_batch()
job_id.get_status() # Fetch the job status
job_id.print_logs() # Fetch and print the job's logs
job_id.get_exec_time() # Determine the job's execution time
job_id.tail_logs() # Continuously print the job's logs
job_id.cancel() # Cancel the job
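For unattended runs, the status can be polled until the job leaves a running state. A minimal sketch; the status string values below are assumptions, so inspect what get_status() actually returns before relying on them:
import time

# Poll every 30 seconds until the status no longer reads as running
# (the "running" string is an assumed status value, not documented above)
while "running" in str(job_id.get_status()).lower():
    time.sleep(30)

job_id.print_logs()  # print the final logs once the job settles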
job.run_remote
Alias of job.run_batch for backwards compatibility. See job.run_batch above for details.