Top-Level Functions
@fused.udf
def udf(
    fn: Optional[Callable] = None,
    *,
    schema: Union[Schema, Dict, None] = None,
    name: Optional[str] = None,
    cache_max_age: Optional[str] = None,
    default_parameters: Optional[Dict[str, Any]] = None,
    headers: Optional[Sequence[Union[str, Header]]] = None,
) -> Callable[..., GeoPandasUdfV2]:
A decorator that transforms a function into a Fused UDF.
Args:
- schema: The schema for the DataFrame returned by the UDF. The schema may be a string (in the form "field_name:DataType field_name2:DataType", or as JSON), a Python dictionary representing the schema, or a Schema model object. Defaults to None, in which case a schema must be evaluated by calling run_local for a job to be able to write output. The return value of run_local will also indicate how to include the schema in the decorator so run_local does not need to be run again.
- name: The name of the UDF object. Defaults to the name of the function.
- cache_max_age: The maximum age when returning a result from the cache.
- default_parameters: Parameters to embed in the UDF object, separately from the arguments list of the function. Defaults to None for empty parameters.
- headers: A list of files to include as modules when running the UDF. For example, when specifying headers=['my_header.py'], inside the UDF function it may be referenced as:
import my_header
my_header.my_function()
Defaults to None for no headers.
Returns:
- A callable that represents the transformed UDF. This callable can be used within GeoPandas workflows to apply the defined operation on geospatial data.
Examples:
To create a simple UDF that calls a utility function to load a table for a given bounding box:
@fused.udf
def udf(bbox, table_path="s3://fused-asset/infra/building_msft_us"):
    ...
    gdf = table_to_tile(bbox, table=table_path)
    return gdf
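The string form of schema pairs each field name with a data type. As a rough illustration of that format only, the sketch below splits such a string into a dictionary. It is not Fused's own parser: parse_schema_string is a hypothetical helper, and the type names in the example are placeholders.

```python
def parse_schema_string(schema: str) -> dict:
    """Split "name:Type name2:Type" into {"name": "Type", ...}.

    Hypothetical helper for illustration; not part of the Fused API.
    """
    fields = {}
    for token in schema.split():
        # Each whitespace-separated token is expected to be "field_name:DataType".
        name, sep, dtype = token.partition(":")
        if not sep or not name or not dtype:
            raise ValueError(f"Malformed schema field: {token!r}")
        fields[name] = dtype
    return fields


# "Int64" and "String" are placeholder type names, not a list of supported types.
print(parse_schema_string("id:Int64 name:String"))
```

The resulting dictionary has the same shape as the dictionary form of schema described above, which is why the two forms are interchangeable in the decorator.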
@fused.cache
DEFAULT_CACHE_MAX_AGE = "12h"
def cache(
    func: Optional[Callable[..., Any]] = None,
    cache_max_age: str | int = DEFAULT_CACHE_MAX_AGE,
    path: str = "tmp",
    concurrent_lock_timeout: str | int = 120,
    reset: bool = False,
) -> Callable[..., Any]:
Decorator to cache the return value of a function.
It can be applied to any function, and the cache behavior can be customized through keyword arguments.
Args:
- func (Callable, optional): The function to be decorated. If None, this returns a partial decorator with the passed keyword arguments.
- cache_max_age: A string with a numbered component and units. Supported units are seconds (s), minutes (m), hours (h), and days (d) (e.g. "48h", "10s", etc.).
- path: Folder to append to the configured cache directory.
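- concurrent_lock_timeout: Maximum time, in seconds, that subsequent concurrent calls wait for a previous concurrent call to finish execution and write the cache file.
- reset: If True, reset the cached value when the function runs, for example after remote data is modified. Defaults to False.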
Returns:
- Callable: A decorator that, when applied to a function, caches its return values according to the specified keyword arguments.
Examples:
Use the @cache decorator to cache the return value of a function in a custom path.
@cache(path="/tmp/custom_path/")
def expensive_function():
    # Function implementation goes here
    return result
If the output of a cached function changes, for example if remote data is modified, it can be reset by running the function with the reset keyword argument. Afterward, the argument can be cleared.
@cache(path="/tmp/custom_path/", reset=True)
def expensive_function():
    # Function implementation goes here
    return result
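To make the decorator mechanics described above more concrete, here is a minimal in-memory sketch. It is not how fused.cache is implemented: memory_cache and parse_max_age are hypothetical names, results are held in a dictionary rather than in files under the cache directory, and treating an integer cache_max_age as a number of seconds is an assumption. It shows the "partial decorator" behavior when func is None, the unit suffixes of cache_max_age, and the effect of reset.

```python
import functools
import time

_UNITS = {"s": 1, "m": 60, "h": 3600, "d": 86400}


def parse_max_age(value):
    """Convert "48h", "10s", or an integer (assumed to be seconds) into seconds."""
    if isinstance(value, int):
        return value
    number, unit = value[:-1], value[-1]
    if unit not in _UNITS or not number.isdigit():
        raise ValueError(f"Unsupported cache_max_age: {value!r}")
    return int(number) * _UNITS[unit]


def memory_cache(func=None, *, cache_max_age="12h", reset=False):
    """In-memory stand-in for a caching decorator (hypothetical, not fused.cache)."""
    if func is None:
        # Called as @memory_cache(...): return a partial decorator that
        # remembers the keyword arguments and waits for the function.
        return functools.partial(memory_cache, cache_max_age=cache_max_age, reset=reset)

    max_age = parse_max_age(cache_max_age)
    store = {}  # maps call arguments to (timestamp, value)

    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        key = (args, tuple(sorted(kwargs.items())))
        hit = store.get(key)
        if hit is not None and not reset and time.monotonic() - hit[0] < max_age:
            return hit[1]  # fresh enough: reuse the stored value
        value = func(*args, **kwargs)
        store[key] = (time.monotonic(), value)
        return value

    return wrapper


calls = []


@memory_cache(cache_max_age="10s")
def slow_square(n):
    calls.append(n)  # record each real execution
    return n * n


slow_square(4)
slow_square(4)  # served from the in-memory cache
print(len(calls))  # 1
```

With reset=True the wrapper recomputes on every call, which mirrors the advice above to clear the argument once the cached value has been refreshed.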
load
def load(url_or_udf: Union[str, Path], /, *, cache_key: Any = None) -> AnyBaseUdf:
Loads a UDF from one of several sources, including a GitHub URL or a Fused platform-specific identifier.
This function supports loading UDFs from a GitHub repository URL or from a Fused platform-specific identifier composed of an email and a UDF name. It determines the source type from the format of the input and retrieves the UDF accordingly.
Args:
- url_or_udf: A string representing the location of the UDF, or the raw code of the UDF. The location can be a GitHub URL starting with "https://github.com", a Fused platform-specific identifier in the format "email/udf_name", or a local file path pointing to a Python file.
- cache_key: An optional key used for caching the loaded UDF. If provided, the function will attempt to load the UDF from cache using this key before attempting to load it from the specified source. Defaults to None, indicating no caching.
Returns:
- AnyBaseUdf: An instance of the loaded UDF.
Raises:
- ValueError: If the URL or Fused platform-specific identifier format is incorrect or cannot be parsed.
- Exception: For errors related to network issues, file access permissions, or other unforeseen errors during the loading process.
Examples:
Load a UDF from a GitHub URL:
udf = fused.load("https://github.com/fusedio/udfs/tree/main/public/REM_with_HyRiver/")
Load a UDF using a Fused platform-specific identifier:
udf = fused.load("username@fused.io/REM_with_HyRiver")
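The source-type detection described above can be pictured with a small classifier. This is only a sketch of the idea, not the logic fused.load uses: classify_udf_source is a hypothetical helper, the labels it returns and the order of its checks are assumptions, and it does not handle raw UDF code or local directories.

```python
from pathlib import Path


def classify_udf_source(url_or_udf) -> str:
    """Guess which kind of source a load() argument refers to.

    Hypothetical helper; the labels and the order of checks are assumptions.
    """
    text = str(url_or_udf)
    if text.startswith("https://github.com"):
        return "github"
    if isinstance(url_or_udf, Path) or text.endswith(".py"):
        return "local_file"
    owner, sep, udf_name = text.partition("/")
    # "email/udf_name": an email-like owner, then exactly one name segment.
    if sep and "@" in owner and udf_name and "/" not in udf_name:
        return "fused_identifier"
    raise ValueError(f"Unrecognized UDF location: {text!r}")


print(classify_udf_source("https://github.com/fusedio/udfs/tree/main/public/REM_with_HyRiver/"))
print(classify_udf_source("username@fused.io/REM_with_HyRiver"))
print(classify_udf_source("my_udf.py"))
```

Raising ValueError for an unrecognized format matches the Raises entry above.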
run
def run(
    udf: Union[str, None, UdfJobStepConfig, GeoPandasUdfV2, UdfAccessToken] = None,
    *args,
    x: Optional[int] = None,
    y: Optional[int] = None,
    z: Optional[int] = None,
    sync: bool = True,
    engine: Optional[Literal["remote", "local"]] = None,
    type: Optional[Literal["tile", "file"]] = None,
    max_retry: int = 0,
    cache_max_age: Optional[str] = None,
    parameters: Optional[Dict[str, Any]] = None,
    _include_log: Optional[bool] = False,
    _return_response: Optional[bool] = False,
    **kw_parameters,
):
Executes a user-defined function (UDF) with various execution and input options.
This function supports executing UDFs in different environments (local or remote), with different types of inputs (tile coordinates, geographical bounding boxes, etc.), and allows for both synchronous and asynchronous execution. It dynamically determines the execution path based on the provided parameters.
Args:
- udf (str, GeoPandasUdfV2 or UdfJobStepConfig): The UDF to execute. The UDF can be specified in several ways:
  - A string representing a UDF name or UDF shared token.
  - A UDF object.
  - A UdfJobStepConfig object for detailed execution configuration.
- x, y, z: Tile coordinates for tile-based UDF execution.
- sync: If True, execute the UDF synchronously. If False, execute asynchronously.
- engine: The execution engine to use ('remote' or 'local').
- type: The type of UDF execution ('tile' or 'file').
- max_retry: The maximum number of retries to attempt if the UDF fails. By default does not retry.
- cache_max_age: The maximum age when returning a result from the cache. Supported units are seconds (s), minutes (m), hours (h), and days (d) (e.g. "48h", "10s", etc.). Default is None, so a UDF run with fused.run() will follow the cache_max_age defined in @fused.udf() unless this value is changed.
- parameters: Additional parameters to pass to the UDF.
- **kw_parameters: Additional parameters to pass to the UDF.
Raises:
- ValueError: If the UDF is not specified or is specified in more than one way.
- TypeError: If the first parameter is not of an expected type.
- Warning: Various warnings are issued for ignored parameters based on the execution path chosen.
Returns:
- The result of the UDF execution, which varies based on the UDF and execution path.
Examples:
Run a UDF saved in the Fused system:
fused.run("username@fused.io/my_udf_name")
Run a UDF saved in GitHub:
loaded_udf = fused.load("https://github.com/fusedio/udfs/tree/main/public/Building_Tile_Example")
fused.run(loaded_udf, bbox=bbox)
Run a UDF saved in a local directory:
loaded_udf = fused.load("/Users/local/dir/Building_Tile_Example")
fused.run(loaded_udf, bbox=bbox)
Note:
This function dynamically determines the execution path and parameters based on the inputs. It is designed to be flexible and support various UDF execution scenarios.
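For tile-based execution, x, y and z identify a map tile. Assuming they follow the common XYZ ("slippy map") Web Mercator tiling scheme, which this page does not state explicitly, the geographic extent of a tile can be worked out as in the sketch below. tile_bounds is a hypothetical helper, not a Fused function.

```python
import math


def tile_bounds(x: int, y: int, z: int) -> tuple:
    """Return (west, south, east, north) in degrees for an XYZ tile."""
    n = 2 ** z  # number of tiles along each axis at zoom z

    def lon(col: int) -> float:
        return col / n * 360.0 - 180.0

    def lat(row: int) -> float:
        # Inverse Web Mercator: rows count downward from the north edge.
        return math.degrees(math.atan(math.sinh(math.pi * (1 - 2 * row / n))))

    return lon(x), lat(y + 1), lon(x + 1), lat(y)


# Zoom 0 has a single tile covering the whole Web Mercator extent.
print(tile_bounds(0, 0, 0))
```

Each increase in z doubles the number of tiles per axis, so the same x and y refer to a smaller area at a higher zoom.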
submit
def submit(
    udf,
    arg_list,
    /,
    *,
    engine: Optional[Literal["remote", "local"]] = "remote",
    max_workers: Optional[int] = None,
    max_retry: int = 2,
    debug_mode: bool = False,
    collect: bool = True,
    **kwargs,
) -> Union[JobPool, ResultType, "pd.DataFrame"]:
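Executes a user-defined function (UDF) multiple times for a list of input parameters. With collect=False, it immediately returns a "lazy" JobPool object that lets you inspect the jobs and wait on the results; with the default collect=True, it waits for all jobs to complete and returns the collected results.
See fused.run for more details on the UDF execution.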
Args:
- udf: The UDF to execute. See fused.run for more details on how to specify the UDF.
- arg_list: A list of input parameters for the UDF. Can be specified as:
  - A list of values for parametrizing over a single parameter, i.e. the first parameter of the UDF.
  - A list of dictionaries for parametrizing over multiple parameters.
  - A DataFrame for parametrizing over multiple parameters where each row is a set of parameters.
- engine: The execution engine to use. Defaults to 'remote'.
- max_workers: The maximum number of workers to use. Defaults to 32.
- max_retry: The maximum number of retries for failed jobs. Defaults to 2.
- debug_mode: If True, executes only the first item in arg_list directly using fused.run(), useful for debugging UDF execution. Default is False.
- collect: If True, waits for all jobs to complete and returns the collected DataFrame containing the results. If False, returns a JobPool object, which is non-blocking and allows you to inspect the individual results and logs. Default is True.
Returns:
- A JobPool if collect is False; otherwise the collected results as a DataFrame.
Examples:
Run a UDF multiple times for the values 0 to 9, passed as the first positional argument of the UDF:
pool = fused.submit("username@fused.io/my_udf_name", range(10))
Being explicit about the parameter name:
pool = fused.submit(udf, [dict(n=i) for i in range(10)])
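The accepted shapes of arg_list can be illustrated with plain Python. The sketch below is not how fused.submit works internally: normalize_args and run_many are hypothetical helpers that run an ordinary local function in a thread pool, and DataFrame-like inputs are detected by duck typing on a to_dict method. It shows how a list of bare values maps onto the first parameter, while dictionaries name their parameters explicitly.

```python
from concurrent.futures import ThreadPoolExecutor
import inspect


def normalize_args(func, arg_list):
    """Turn each entry of arg_list into a dict of keyword arguments."""
    first_param = next(iter(inspect.signature(func).parameters))
    if hasattr(arg_list, "to_dict"):
        # DataFrame-like input: one dict of parameters per row.
        return arg_list.to_dict("records")
    return [
        dict(item) if isinstance(item, dict) else {first_param: item}
        for item in arg_list
    ]


def run_many(func, arg_list, max_workers=4):
    """Call func once per entry, in parallel, keeping input order."""
    calls = normalize_args(func, arg_list)
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        return list(pool.map(lambda kwargs: func(**kwargs), calls))


def square(n):
    return n * n


# Both calls parametrize over n; the second names the parameter explicitly.
print(run_many(square, range(5)))
print(run_many(square, [dict(n=i) for i in range(5)]))
```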
download
def download(path: str, local_path: Union[str, Path]) -> None:
Download the contents at the path to disk.
Args:
- path: URL to a file, like fd://bucket-name/file.parquet
- local_path: Path to a local file.
ingest
def ingest(
    input: Union[str, Path, Sequence[Union[str, Path]], "gpd.GeoDataFrame"],
    output: Optional[str] = None,
    *,
    output_metadata: Optional[str] = None,
    schema: Optional[Schema] = None,
    file_suffix: Optional[str] = None,
    load_columns: Optional[Sequence[str]] = None,
    remove_cols: Optional[Sequence[str]] = None,
    explode_geometries: bool = False,
    drop_out_of_bounds: Optional[bool] = None,
    partitioning_method: Literal["area", "length", "coords", "rows"] = "rows",
    partitioning_maximum_per_file: Union[int, float, None] = None,
    partitioning_maximum_per_chunk: Union[int, float, None] = None,
    partitioning_max_width_ratio: Union[int, float] = 2,
    partitioning_max_height_ratio: Union[int, float] = 2,
    partitioning_force_utm: Literal["file", "chunk", None] = "chunk",
    partitioning_split_method: Literal["mean", "median"] = "mean",
    subdivide_method: Literal["area", None] = None,
    subdivide_start: Optional[float] = None,
    subdivide_stop: Optional[float] = None,
    split_identical_centroids: bool = True,
    target_num_chunks: int = 5000,
    lonlat_cols: Optional[Tuple[str, str]] = None,
    partitioning_schema_input: Optional[Union[str, "pd.DataFrame"]] = None,
    gdal_config: Union[GDALOpenConfig, Dict[str, Any], None] = None,
) -> GeospatialPartitionJobStepConfig:
Ingest a dataset into the Fused partitioned format.
Args:
- input: A GeoPandas GeoDataFrame or a path to file or files on S3 to ingest. Files may be Parquet or another geo data format.
- output: Location on S3 to write the main table to.
- output_metadata: Location on S3 to write the fused table to.
- schema: Schema of the data to be ingested. This is optional and will be inferred from the data if not provided.
- file_suffix: Filter which files are used for ingestion. If input is a directory on S3, all files under that directory will be listed and used for ingestion. If file_suffix is not None, it will be used to filter paths by checking the trailing characters of each filename. E.g. pass file_suffix=".geojson" to include only GeoJSON files inside the directory.
- load_columns: Read only this set of columns when ingesting geospatial datasets. Defaults to all columns.
- remove_cols: The named columns to drop when ingesting geospatial datasets. Defaults to not drop any columns.
- explode_geometries: Whether to unpack multipart geometries to single geometries when ingesting geospatial datasets, saving each part as its own row. Defaults to False.
- drop_out_of_bounds: Whether to drop geometries outside of the expected WGS84 bounds. Defaults to True.
- partitioning_method: The method to use for grouping rows into partitions. Defaults to "rows".
  - "area": Construct partitions where all contain a maximum total area among geometries.
  - "length": Construct partitions where all contain a maximum total length among geometries.
  - "coords": Construct partitions where all contain a maximum total number of coordinates among geometries.
  - "rows": Construct partitions where all contain a maximum number of rows.
- partitioning_maximum_per_file: Maximum value for partitioning_method to use per file. If None, defaults to 1/10th of the total value of partitioning_method. So if the value is None and partitioning_method is "area", then each file will have no more than 1/10th the total area of all geometries. Defaults to None.
- partitioning_maximum_per_chunk: Maximum value for partitioning_method to use per chunk. If None, defaults to 1/100th of the total value of partitioning_method. So if the value is None and partitioning_method is "area", then each chunk will have no more than 1/100th the total area of all geometries. Defaults to None.
- partitioning_max_width_ratio: The maximum ratio of width to height of each partition to use in the ingestion process. So for example, if the value is 2, then if the width divided by the height is greater than 2, the box will be split in half along the horizontal axis. Defaults to 2.
- partitioning_max_height_ratio: The maximum ratio of height to width of each partition to use in the ingestion process. So for example, if the value is 2, then if the height divided by the width is greater than 2, the box will be split in half along the vertical axis. Defaults to 2.
- partitioning_force_utm: Whether to force partitioning within UTM zones. If set to "file", this will ensure that the centroids of all geometries per file are contained in the same UTM zone. If set to "chunk", this will ensure that the centroids of all geometries per chunk are contained in the same UTM zone. If set to None, then no UTM-based partitioning will be done. Defaults to "chunk".
- partitioning_split_method: How to split one partition into children. Defaults to "mean" (this may change in the future).
  - "mean": Split each axis according to the mean of the centroid values.
  - "median": Split each axis according to the median of the centroid values.
- subdivide_method: The method to use for subdividing large geometries into multiple rows. Currently the only option is "area", where geometries will be subdivided based on their area (in WGS84 degrees).
- subdivide_start: The value above which geometries will be subdivided into smaller parts, according to subdivide_method.
- subdivide_stop: The value below which geometries will not be subdivided into smaller parts, according to subdivide_method. Recommended to be equal to subdivide_start. If None, geometries will be subdivided up to a recursion depth of 100 or until the subdivided geometry is rectangular.
- split_identical_centroids: If True, split a partition that has identical centroids (such as if all geometries in the partition are the same) if there are more such rows than defined in "partitioning_maximum_per_file" and "partitioning_maximum_per_chunk".
- target_num_chunks: The target for the number of files if partitioning_maximum_per_file is None. Note that this number is only a target and the actual number of files generated can be higher or lower than this number, depending on the spatial distribution of the data itself.
- lonlat_cols: Names of longitude, latitude columns to construct point geometries from. If your point columns are named "x" and "y", then pass:
fused.ingest(
    ...,
    lonlat_cols=("x", "y")
)
This only applies to reading from Parquet files. For reading from CSV files, pass options to gdal_config.
- gdal_config: Configuration options to pass to GDAL for how to read these files. For all files other than Parquet files, Fused uses GDAL as a step in the ingestion process. For some inputs, like CSV files or zipped shapefiles, you may need to pass some parameters to GDAL to tell it how to open your files.
This config is expected to be a dictionary with up to two keys:
  - layer: str. Define the layer of the input file you wish to read when the source contains multiple layers, as in GeoPackage.
  - open_options: Dict[str, str]. Pass in key-value pairs with GDAL open options. These are defined on each driver's page in the GDAL documentation. For example, the CSV driver defines its own set of open options that you can pass in.
For example, if you're ingesting a CSV file with two columns "longitude" and "latitude" denoting the coordinate information, pass:
fused.ingest(
    ...,
    gdal_config={
        "open_options": {
            "X_POSSIBLE_NAMES": "longitude",
            "Y_POSSIBLE_NAMES": "latitude",
        }
    }
)
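The width and height ratio rules in the argument list can be sketched as a small function. This is one reading of the description, not the ingestion code itself: split_by_ratio is a hypothetical helper, "split in half along the horizontal axis" is taken to mean cutting the box at its x midpoint, and boxes are assumed to have non-zero width and height.

```python
def split_by_ratio(box, max_width_ratio=2, max_height_ratio=2):
    """Return one or two (minx, miny, maxx, maxy) boxes.

    Hypothetical helper: it cuts an overly wide box at its x midpoint and
    an overly tall box at its y midpoint, which is one reading of the rule.
    """
    minx, miny, maxx, maxy = box
    width, height = maxx - minx, maxy - miny
    if width / height > max_width_ratio:
        midx = (minx + maxx) / 2
        return [(minx, miny, midx, maxy), (midx, miny, maxx, maxy)]
    if height / width > max_height_ratio:
        midy = (miny + maxy) / 2
        return [(minx, miny, maxx, midy), (minx, midy, maxx, maxy)]
    return [box]


# A 10 x 2 box has a width ratio of 5, so it is cut into two 5 x 2 boxes.
print(split_by_ratio((0, 0, 10, 2)))
# A 3 x 2 box is within both ratios and is left alone.
print(split_by_ratio((0, 0, 3, 2)))
```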
Returns:
- Configuration object describing the ingestion process. Call .execute on this object to start a job.
Examples:
To ingest the California Census dataset for the year 2022:
job = fused.ingest(
    input="https://www2.census.gov/geo/tiger/TIGER_RD18/STATE/06_CALIFORNIA/06/tl_rd22_06_bg.zip",
    output="s3://fused-sample/census/ca_bg_2022/main/",
    output_metadata="s3://fused-sample/census/ca_bg_2022/fused/",
    explode_geometries=True,
    partitioning_maximum_per_file=2000,
    partitioning_maximum_per_chunk=200,
).execute()
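The example above sets both partitioning maxima explicitly. When they are left as None, the argument descriptions give the defaults as 1/10th and 1/100th of the total value of partitioning_method. The arithmetic is sketched below with a hypothetical helper, default_limits, and a made-up total of 25,000 rows; it is not taken from the Fused source.

```python
def default_limits(total, per_file=None, per_chunk=None):
    """Fill in the documented defaults for the per-file and per-chunk maxima.

    total is the summed partitioning_method value over all rows, for
    example the row count when partitioning_method is "rows".
    """
    if per_file is None:
        per_file = total / 10  # 1/10th of the total
    if per_chunk is None:
        per_chunk = total / 100  # 1/100th of the total
    return per_file, per_chunk


# With 25,000 rows and no explicit limits.
print(default_limits(25_000))
# Explicit limits, as in the example above, are used unchanged.
print(default_limits(25_000, per_file=2000, per_chunk=200))
```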
job.run_remote
def run_remote(
    output_table: Optional[str] = ...,
    instance_type: Optional[WHITELISTED_INSTANCE_TYPES] = None,
    *,
    region: str | None = None,
    disk_size_gb: int | None = None,
    additional_env: List[str] | None = None,
    image_name: Optional[str] = None,
    ignore_no_udf: bool = False,
    ignore_no_output: bool = False,
    validate_imports: Optional[bool] = None,
    validate_inputs: bool = True,
    overwrite: Optional[bool] = None,
) -> RunResponse:
Begin execution of the ingestion job by calling run_remote on the job object.
Arguments:
- output_table: The name of the table to write to. Defaults to None.
- instance_type: The AWS EC2 instance type to use for the job. Acceptable strings are m5.large, m5.xlarge, m5.2xlarge, m5.4xlarge, m5.8xlarge, m5.12xlarge, m5.16xlarge, r5.large, r5.xlarge, r5.2xlarge, r5.4xlarge, r5.8xlarge, r5.12xlarge, or r5.16xlarge. Defaults to None.
- region: The AWS region in which to run. Defaults to None.
- disk_size_gb: The disk size to specify for the job. Defaults to None.
- additional_env: Any additional environment variables to be passed into the job. Defaults to None.
- image_name: Custom image name to run. Defaults to None for default image.
- ignore_no_udf: Ignore validation errors about not specifying a UDF. Defaults to False.
- ignore_no_output: Ignore validation errors about not specifying output location. Defaults to False.
Monitor and manage job
Calling run_remote returns a RunResponse object with helper methods.
# Declare ingest job
job = fused.ingest(
    input="https://www2.census.gov/geo/tiger/TIGER_RD18/STATE/06_CALIFORNIA/06/tl_rd22_06_bg.zip",
    output="s3://fused-sample/census/ca_bg_2022/main/"
)
# Start ingest job
job_id = job.run_remote()
Fetch the job status.
job_id.get_status()
Fetch and print the job's logs.
job_id.print_logs()
Determine the job's execution time.
job_id.get_exec_time()
Continuously print the job's logs.
job_id.tail_logs()
Cancel the job.
job_id.cancel()
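The helper methods above can be combined into a simple polling loop. The sketch below is an assumption-heavy illustration: wait_for_job is a hypothetical helper, it assumes get_status() returns a value whose string form names the state, and the terminal state names are placeholders rather than documented Fused statuses. FakeJob stands in for a RunResponse so the example runs on its own.

```python
import time


def wait_for_job(job, poll_seconds=30.0, timeout_seconds=3600.0,
                 terminal_states=("succeeded", "failed", "cancelled")):
    """Poll job.get_status() until it reports a terminal state.

    The terminal_states values are placeholders, not documented Fused
    statuses; replace them with the values your jobs actually report.
    """
    deadline = time.monotonic() + timeout_seconds
    while True:
        status = job.get_status()
        if str(status).lower() in terminal_states:
            return status
        if time.monotonic() >= deadline:
            raise TimeoutError(f"Job still {status!r} after {timeout_seconds}s")
        time.sleep(poll_seconds)


class FakeJob:
    """Stand-in for a RunResponse so the sketch runs without a real job."""

    def __init__(self, statuses):
        self._statuses = iter(statuses)

    def get_status(self):
        return next(self._statuses)


print(wait_for_job(FakeJob(["running", "running", "succeeded"]), poll_seconds=0))
```

For interactive use, tail_logs() already follows a job continuously; a loop like this is mainly useful in scripts that need to act on the final status.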
file_path
def file_path(file_path: str, mkdir: bool = True) -> str:
Creates a directory in a predefined temporary directory.
This gives users the ability to manage directories during the execution of a UDF. It takes a relative file_path, creates the corresponding directory structure, and returns its absolute path.
This is useful for UDFs that temporarily store intermediate results as files, such as when writing intermediary files to disk when processing large datasets. file_path ensures that necessary directories exist.
Args:
- file_path: The file path to locate.
- mkdir: If True, create the directory if it doesn't already exist. Defaults to True.
Returns:
- The located file path.
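The behavior described above (resolve a relative path under a temporary base directory, create the directories it needs, and return the absolute path) can be approximated with the standard library. This is a sketch under assumptions, not the Fused implementation: scratch_path is a hypothetical helper, its default base directory is a "scratch" folder inside the system temp directory, and it creates the parent directory of the path so that a file can be written there.

```python
import os
import tempfile
from typing import Optional


def scratch_path(relative_path: str, mkdir: bool = True,
                 base_dir: Optional[str] = None) -> str:
    """Resolve relative_path under a temporary base directory.

    Hypothetical stand-in: base_dir defaults to a "scratch" folder inside
    the system temp directory, and the parent directory of the path is
    what gets created.
    """
    base = base_dir or os.path.join(tempfile.gettempdir(), "scratch")
    full_path = os.path.abspath(os.path.join(base, relative_path))
    if mkdir:
        os.makedirs(os.path.dirname(full_path), exist_ok=True)
    return full_path


path = scratch_path("intermediate/step1/result.parquet")
print(os.path.isdir(os.path.dirname(path)))  # True
```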
get_chunks_metadata
def get_chunks_metadata(url: str) -> "gpd.GeoDataFrame":
Returns a GeoDataFrame with each chunk in the table as a row.
Args:
- url: URL of the table.
get_chunk_from_table
def get_chunk_from_table(
url: str,
file_id: Union[str, int, None],
chunk_id: Optional[int],
*,
columns: Optional[Iterable[str]] = None,
) -> "gpd.GeoDataFrame":
Returns a chunk from a table and chunk coordinates.
This can be called with file_id and chunk_id from get_chunks_metadata.
Args:
- url: URL of the table.
- file_id: File ID to read.
- chunk_id: Chunk ID to read.