ingest
ingest(
input: str | Path | Sequence[str | Path] | gpd.GeoDataFrame,
output: str | None = None,
*,
output_metadata: str | None = None,
schema: Schema | None = None,
file_suffix: str | None = None,
load_columns: Sequence[str] | None = None,
remove_cols: Sequence[str] | None = None,
explode_geometries: bool = False,
drop_out_of_bounds: bool | None = None,
partitioning_method: Literal["area", "length", "coords", "rows"] = "rows",
partitioning_maximum_per_file: int | float | None = None,
partitioning_maximum_per_chunk: int | float | None = None,
partitioning_max_width_ratio: int | float = 2,
partitioning_max_height_ratio: int | float = 2,
partitioning_force_utm: Literal["file", "chunk", None] = "chunk",
partitioning_split_method: Literal["mean", "median"] = "mean",
subdivide_method: Literal["area", None] = None,
subdivide_start: float | None = None,
subdivide_stop: float | None = None,
split_identical_centroids: bool = True,
target_num_chunks: int = 500,
lonlat_cols: tuple[str, str] | None = None,
partitioning_schema_input: str | pd.DataFrame | None = None,
gdal_config: GDALOpenConfig | dict[str, Any] | None = None,
overwrite: bool = False,
as_udf: bool = False
) -> GeospatialPartitionJobStepConfig
Ingest a dataset into the Fused partitioned format.
Parameters
- input (str | Path | Sequence[str | Path] | gpd.GeoDataFrame) – A GeoPandas GeoDataFrame, or a path to a file or files on S3 to ingest. Files may be Parquet or another geospatial data format.
- output (str | None) – Location on S3 to write the main table to.
- output_metadata (str | None) – Location on S3 to write the fused table to.
- schema (Schema | None) – Schema of the data to be ingested. Optional; if not provided, it is inferred from the data.
- file_suffix (str | None) – Filter for which files are used for ingestion. If input is a directory on S3, all files under that directory are listed and used for ingestion. If file_suffix is not None, only paths whose filename ends with this suffix are kept. For example, pass file_suffix=".geojson" to include only the GeoJSON files in the directory.
- load_columns (Sequence[str] | None) – Read only this set of columns when ingesting geospatial datasets. Defaults to all columns.
- remove_cols (Sequence[str] | None) – Names of columns to drop when ingesting geospatial datasets. Defaults to dropping no columns.
- explode_geometries (bool) – Whether to unpack multipart geometries into single-part geometries when ingesting geospatial datasets, saving each part as its own row. Defaults to False.
- drop_out_of_bounds (bool | None) – Whether to drop geometries outside the expected WGS84 bounds. Defaults to True.
- partitioning_method (Literal["area", "length", "coords", "rows"]) – The method used to group rows into partitions. Defaults to "rows".
  - "area": each partition contains at most a maximum total area of geometries.
  - "length": each partition contains at most a maximum total length of geometries.
  - "coords": each partition contains at most a maximum total number of coordinates.
  - "rows": each partition contains at most a maximum number of rows.
- partitioning_maximum_per_file (int | float | None) – Maximum value of the partitioning_method measure per file. If None, defaults to 1/10th of the dataset's total for that measure. For example, with partitioning_method="area", each file holds no more than 1/10th of the total area of all geometries. Defaults to None.
- partitioning_maximum_per_chunk (int | float | None) – Maximum value of the partitioning_method measure per chunk. If None, defaults to 1/100th of the dataset's total for that measure. For example, with partitioning_method="area", each chunk holds no more than 1/100th of the total area of all geometries. Defaults to None.
- partitioning_max_width_ratio (int | float) – Maximum ratio of width to height for each partition. For example, with a value of 2, a box whose width divided by its height exceeds 2 is split in half along the horizontal axis. Defaults to 2.
- partitioning_max_height_ratio (int | float) – Maximum ratio of height to width for each partition. For example, with a value of 2, a box whose height divided by its width exceeds 2 is split in half along the vertical axis. Defaults to 2.
- partitioning_force_utm (Literal["file", "chunk", None]) – Whether to force partitions to stay within UTM zones. With "file", the centroids of all geometries in each file fall in the same UTM zone. With "chunk", the centroids of all geometries in each chunk fall in the same UTM zone. With None, no UTM-based partitioning is done. Defaults to "chunk".
- partitioning_split_method (Literal["mean", "median"]) – How to split one partition into children. Defaults to "mean" (this may change in the future).
  - "mean": split each axis at the mean of the centroid values.
  - "median": split each axis at the median of the centroid values.
- subdivide_method (Literal["area", None]) – The method used to subdivide large geometries into multiple rows. Currently the only option is "area", which subdivides geometries based on their area (in WGS84 degrees).
- subdivide_start (float | None) – The value above which geometries are subdivided into smaller parts, according to subdivide_method.
- subdivide_stop (float | None) – The value below which geometries are not subdivided further, according to subdivide_method. Recommended to be equal to subdivide_start. If None, geometries are subdivided up to a recursion depth of 100 or until the subdivided geometry is rectangular.
- split_identical_centroids (bool) – If True, split a partition that has identical centroids (such as when all geometries in the partition are the same) if it contains more such rows than partitioning_maximum_per_file and partitioning_maximum_per_chunk allow. Defaults to True.
- target_num_chunks (int) – The target number of files when partitioning_maximum_per_file is None. This is only a target: the actual number of files generated can be higher or lower, depending on the spatial distribution of the data. Defaults to 500, a reasonable default in most cases.
- lonlat_cols (tuple[str, str] | None) – Names of the longitude and latitude columns from which to construct point geometries. For example, if your point columns are named "x" and "y", pass lonlat_cols=("x", "y"). This applies only to reading Parquet files; for CSV files, pass options through gdal_config.
- gdal_config (GDALOpenConfig | dict[str, Any] | None) – Configuration options passed to GDAL to control how these files are read. For all files other than Parquet, Fused uses GDAL as a step in the ingestion process. For some inputs, such as CSV files or zipped shapefiles, you may need to tell GDAL how to open your files. The config is expected to be a dictionary with up to two keys:
  - layer (str): the layer of the input file to read when the source contains multiple layers, as in GeoPackage.
  - open_options (Dict[str, str]): key-value pairs of GDAL open options. These are defined on each driver's page in the GDAL documentation.
  For example, to ingest a CSV file whose "longitude" and "latitude" columns hold the coordinates, pass gdal_config={"open_options": {"X_POSSIBLE_NAMES": "longitude", "Y_POSSIBLE_NAMES": "latitude"}}.
- overwrite (bool) – If True, overwrite the output directory if it already exists. This first removes all existing content of the directory; it does not only overwrite conflicting files. Defaults to False.
- as_udf (bool) – Return the ingestion workflow as a UDF that can be executed with fused.run(). Local files or Python objects passed to input or partitioning_schema_input are still uploaded to S3 first, so that they are available when the UDF executes remotely. Defaults to False.
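The partitioning parameters above combine a budget (the per-file and per-chunk maxima), a shape rule (the width and height ratios), and a split rule (mean or median). The following is a minimal sketch of that arithmetic in plain Python, assuming each geometry's measure and centroid have already been computed. default_budgets, needs_split, and split_value are hypothetical helpers written for illustration, not functions exported by Fused, and Fused's actual partitioner may differ in its details.

```python
from __future__ import annotations

from statistics import mean, median
from typing import Literal, Sequence

# Hypothetical helpers that mirror the documented rules; not part of the Fused API.


def default_budgets(measures: Sequence[float]) -> tuple[float, float]:
    """Per-file and per-chunk maxima used when both parameters are None.

    `measures` holds one value per geometry for the chosen partitioning_method:
    its area, its length, its coordinate count, or 1 for "rows".
    """
    total = sum(measures)
    return total / 10, total / 100  # 1/10th per file, 1/100th per chunk


def needs_split(
    width: float,
    height: float,
    max_width_ratio: float = 2,
    max_height_ratio: float = 2,
) -> str | None:
    """Report whether a partition's bounding box breaks a ratio limit."""
    if height > 0 and width / height > max_width_ratio:
        return "too wide"
    if width > 0 and height / width > max_height_ratio:
        return "too tall"
    return None  # within both limits (degenerate boxes are not handled here)


def split_value(
    centroid_coords: Sequence[float],
    method: Literal["mean", "median"] = "mean",
) -> float:
    """Coordinate along one axis at which a partition is split into children."""
    if method == "mean":
        return mean(centroid_coords)
    return median(centroid_coords)
```

With skewed data the split method matters: for centroid coordinates [0, 1, 2, 9], the outlier pulls the mean to 3 (children of 3 and 1 rows) while the median stays at 1.5 (children of 2 and 2 rows).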
Returns
Configuration object describing the ingestion process. Call .run_batch() on this object to start a job.
Example
For example, to ingest the California Census dataset for the year 2022:
```python
import fused

# Declare the ingestion job, then start it on a batch instance
job = fused.ingest(
    input="https://www2.census.gov/geo/tiger/TIGER_RD18/STATE/06_CALIFORNIA/06/tl_rd22_06_bg.zip",
    output="s3://fused-sample/census/ca_bg_2022/main/",
    output_metadata="s3://fused-sample/census/ca_bg_2022/fused/",
    explode_geometries=True,
    partitioning_maximum_per_file=2000,
    partitioning_maximum_per_chunk=200,
)
job.run_batch()
```
job.run_batch
def run_batch(
output_table: Optional[str] = ...,
instance_type: Optional[WHITELISTED_INSTANCE_TYPES] = None,
*,
region: str | None = None,
disk_size_gb: int | None = None,
additional_env: List[str] | None = None,
image_name: Optional[str] = None,
ignore_no_udf: bool = False,
ignore_no_output: bool = False,
validate_imports: Optional[bool] = None,
validate_inputs: bool = True,
overwrite: Optional[bool] = None
) -> RunResponse
Begin execution of the ingestion job by calling run_batch on the job object.
Parameters
- output_table – The name of the table to write to. Defaults to None.
- instance_type – The AWS EC2 instance type to use for the job. Acceptable strings are m5.large, m5.xlarge, m5.2xlarge, m5.4xlarge, m5.8xlarge, m5.12xlarge, m5.16xlarge, r5.large, r5.xlarge, r5.2xlarge, r5.4xlarge, r5.8xlarge, r5.12xlarge, or r5.16xlarge. Defaults to None.
- region – The AWS region in which to run. Defaults to None.
- disk_size_gb – The disk size, in GB, to allocate for the job. Defaults to None.
- additional_env – Any additional environment variables to pass into the job. Defaults to None.
- image_name – Custom image name to run. Defaults to None, which uses the default image.
- ignore_no_udf – Ignore validation errors about not specifying a UDF. Defaults to False.
- ignore_no_output – Ignore validation errors about not specifying an output location. Defaults to False.
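Because instance_type accepts only the strings listed above, a script that builds job settings from user input may want to reject unsupported values before submitting. A minimal sketch, assuming the list above is complete for your Fused version; ACCEPTED_INSTANCE_TYPES and check_instance_type are hypothetical names, not part of the Fused API (the signature types this parameter as WHITELISTED_INSTANCE_TYPES).

```python
from __future__ import annotations

# Hypothetical copy of the instance types listed above; not imported from Fused.
ACCEPTED_INSTANCE_TYPES = frozenset(
    f"{family}.{size}"
    for family in ("m5", "r5")
    for size in ("large", "xlarge", "2xlarge", "4xlarge", "8xlarge", "12xlarge", "16xlarge")
)


def check_instance_type(instance_type: str | None) -> str | None:
    """Hypothetical pre-flight check; None means 'use the default instance'."""
    if instance_type is not None and instance_type not in ACCEPTED_INSTANCE_TYPES:
        raise ValueError(f"unsupported instance_type: {instance_type!r}")
    return instance_type
```

For example, check_instance_type("r5.4xlarge") returns the string unchanged, while check_instance_type("t3.micro") raises ValueError before any job is submitted.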
Monitoring jobs
Calling run_batch returns a RunResponse object with helper methods.
```python
import fused

# Declare ingest job
job = fused.ingest(
    input="https://www2.census.gov/geo/tiger/TIGER_RD18/STATE/06_CALIFORNIA/06/tl_rd22_06_bg.zip",
    output="s3://fused-sample/census/ca_bg_2022/main/",
)

# Start ingest job; run_batch returns a RunResponse
job_id = job.run_batch()

job_id.get_status()      # Fetch the job status
job_id.print_logs()      # Fetch and print the job's logs
job_id.get_exec_time()   # Determine the job's execution time
job_id.tail_logs()       # Continuously print the job's logs
job_id.cancel()          # Cancel the job
```
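For scripts that must block until an ingestion finishes, get_status() can be polled in a loop. A minimal sketch, assuming only that the object returned by run_batch exposes get_status() as listed above. wait_for_job is a hypothetical helper, and because the status values are not documented here, the caller supplies the is_finished predicate that decides when the job is done.

```python
from __future__ import annotations

import time
from typing import Any, Callable


def wait_for_job(
    run: Any,
    is_finished: Callable[[Any], bool],
    poll_seconds: float = 30.0,
    timeout_seconds: float | None = None,
) -> Any:
    """Hypothetical helper: poll run.get_status() until is_finished(status) is true.

    `run` is the object returned by job.run_batch(). The caller decides what
    'finished' means, since the status format is not specified here.
    """
    start = time.monotonic()
    while True:
        status = run.get_status()
        if is_finished(status):
            return status
        if timeout_seconds is not None and time.monotonic() - start > timeout_seconds:
            raise TimeoutError(f"job still running after {timeout_seconds} s")
        time.sleep(poll_seconds)  # wait before asking again
```

Interactively, tail_logs() already streams the logs continuously; a polling helper like this is mainly useful in unattended scripts that need the final status.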
job.run_remote
Alias of job.run_batch for backwards compatibility. See job.run_batch above for details.