Top-Level Functions

@fused.udf

def udf(
    fn: Optional[Callable] = None,
    *,
    schema: Union[Schema, Dict, None] = None,
    name: Optional[str] = None,
    cache_max_age: Optional[str] = None,
    default_parameters: Optional[Dict[str, Any]] = None,
    headers: Optional[Sequence[Union[str, Header]]] = None
) -> Callable[..., GeoPandasUdfV2Callable]

A decorator that transforms a function into a Fused UDF.

Arguments:

  • schema Schema | Dict | None - The schema for the DataFrame returned by the UDF. The schema may be a string (either in the form "field_name:DataType field_name2:DataType" or as JSON), a Python dictionary representing the schema, or a Schema model object. Defaults to None, in which case a schema must be evaluated by calling run_local for a job to be able to write output. The return value of run_local will also indicate how to include the schema in the decorator so run_local does not need to be run again.
  • name str | None - The name of the UDF object. Defaults to the name of the function.
  • cache_max_age str | None - The maximum age when returning a result from the cache. Defaults to "90d".
  • default_parameters Dict[str, Any] | None - Parameters to embed in the UDF object, separately from the arguments list of the function. Defaults to None for empty parameters.
  • headers Sequence[str | Header] | None - A list of files to include as modules when running the UDF. For example, when specifying headers=['my_header.py'], inside the UDF function it may be referenced as:

    import my_header
    my_header.my_function()

    Defaults to None for no headers.
  • fn Callable | None - The function to decorate.

Returns:

  • Callable - A callable that represents the transformed UDF. This callable can be used within GeoPandas workflows to apply the defined operation on geospatial data.

Examples:

To create a simple UDF that calls a utility function to calculate the area of geometries in a GeoDataFrame:

@fused.udf
def udf(bbox, table_path="s3://fused-asset/infra/building_msft_us"):
    ...
    gdf = table_to_tile(bbox, table=table_path)
    return gdf
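
The schema can also be declared directly in the decorator. A minimal sketch, assuming a UDF that returns a DataFrame with a single integer column (the field name and type string are illustrative):

@fused.udf(schema="count:Int64")
def udf_with_schema():
    import pandas as pd
    # The declared schema matches the returned DataFrame
    return pd.DataFrame({"count": [1, 2, 3]})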

@fused.cache

def cache(
    func: Optional[Callable[..., Any]] = None,
    cache_max_age: Union[str, int] = '12h',
    path: str = 'tmp',
    concurrent_lock_timeout: int = 120,
    **kwargs: Any
) -> Callable[..., Any]

Decorator to cache the return value of a function.

This function serves as a decorator that can be applied to any function to cache its return values. The cache behavior can be customized through keyword arguments.

Arguments:

  • func Callable, optional - The function to be decorated. If None, this returns a partial decorator with the passed keyword arguments.
  • cache_max_age str | int - A string consisting of a number and a unit. Supported units are seconds (s), minutes (m), hours (h), and days (d), e.g. "48h" or "10s". Defaults to "12h".
  • path str - Folder to append to the configured cache directory.
  • concurrent_lock_timeout int - Max amount of time in seconds for subsequent concurrent calls to wait for a previous concurrent call to finish execution and to write the cache file.
  • **kwargs - Arbitrary keyword arguments that are passed to the internal caching mechanism. These could specify cache size, expiration time, and other cache-related settings.

Returns:

  • Callable - A decorator that, when applied to a function, caches its return values according to the specified keyword arguments.

Examples:

Use the @cache decorator to cache the return value of a function in a custom path.

@cache(path="/mount/custom_path/")
def expensive_function():
    # Function implementation goes here
    return result

If the output of a cached function changes, for example because remote data was modified, the cache can be reset by passing the reset keyword argument to the decorator. Afterward, remove the argument so subsequent calls read from the refreshed cache.

@cache(path="/mount/custom_path/", reset=True)
def expensive_function():
    # Function implementation goes here
    return result
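
Set a custom expiry with cache_max_age; cached values older than this are recomputed (a minimal sketch):

@cache(cache_max_age="48h")
def expensive_function():
    # Recomputed whenever the cached value is older than 48 hours
    return result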

load

def load(url_or_udf: Union[str, Path], *, cache_key: Any = None) -> AnyBaseUdf

Loads a UDF from various sources, including a GitHub URL, a local file or directory, or a Fused platform-specific identifier.

This function supports loading UDFs from a GitHub repository URL, a local file path, a directory containing UDF definitions, or a Fused platform-specific identifier composed of an email and UDF name. It intelligently determines the source type based on the format of the input and retrieves the UDF accordingly.

Arguments:

  • url_or_udf - A string or Path object representing the location of the UDF. This can be a GitHub URL starting with "https://github.com", a local file path, a directory containing one or more UDFs, or a Fused platform-specific identifier in the format "email/udf_name".
  • cache_key - An optional key used for caching the loaded UDF. If provided, the function will attempt to load the UDF from cache using this key before attempting to load it from the specified source. Defaults to None, indicating no caching.

Returns:

  • AnyBaseUdf - An instance of the loaded UDF.

Raises:

  • FileNotFoundError - If a local file or directory path is provided but does not exist.
  • ValueError - If the URL or Fused platform-specific identifier format is incorrect or cannot be parsed.
  • Exception - For errors related to network issues, file access permissions, or other unforeseen errors during the loading process.

Examples:

Load a UDF from a GitHub URL:

udf = fused.load("https://github.com/fusedio/udfs/tree/main/public/REM_with_HyRiver/")

Load a UDF from a local file:

udf = fused.load("/localpath/REM_with_HyRiver/")

Load a UDF using a Fused platform-specific identifier:

udf = fused.load("username@fused.io/REM_with_HyRiver")

run

def run(
    udf: Optional[Union[str, GeoPandasUdfV2, UdfJobStepConfig]] = None,
    *args: Any,
    x: Optional[int] = None,
    y: Optional[int] = None,
    z: Optional[int] = None,
    sync: bool = True,
    engine: Optional[Literal["remote", "local"]] = None,
    type: Optional[Literal["tile", "file"]] = None,
    max_retry: int = 0,
    cache_max_age: Optional[str] = None,
    parameters: Optional[Dict[str, Any]] = None,
    _include_log: bool = False,
    _return_response: bool = False,
    **kw_parameters: Any
) -> Any

Executes a user-defined function (UDF) with various execution and input options.

This function supports executing UDFs in different environments (local or remote), with different types of inputs (tile coordinates, geographical bounding boxes, etc.), and allows for both synchronous and asynchronous execution. It dynamically determines the execution path based on the provided parameters.

Arguments:

  • udf - The UDF to execute. The UDF can be specified in several ways:
    • A string representing a UDF name or UDF shared token.
    • A UDF object.
    • A UdfJobStepConfig object for detailed execution configuration.
  • x int | None - Tile coordinates for tile-based UDF execution.
  • y int | None - Tile coordinates for tile-based UDF execution.
  • z int | None - Tile coordinates for tile-based UDF execution.
  • sync bool - If True, execute the UDF synchronously. If False, execute asynchronously.
  • engine Literal["remote", "local"] | None - The execution engine to use ('remote' or 'local').
  • type Literal["tile", "file"] | None - The type of UDF execution ('tile' or 'file').
  • max_retry int - The maximum number of retries to attempt if the UDF fails. By default does not retry.
  • cache_max_age str | None - The maximum age when returning a result from the cache. Defaults to None, in which case a UDF run with fused.run() follows the cache_max_age defined in @fused.udf() unless this value is set.
  • parameters Dict[str, Any] | None - Additional parameters to pass to the UDF.
  • _include_log bool | None
  • _return_response bool | None
  • **kw_parameters - Additional parameters to pass to the UDF.

Raises:

  • ValueError - If the UDF is not specified or is specified in more than one way.
  • TypeError - If the first parameter is not of an expected type.
  • Warning - Various warnings are issued for ignored parameters based on the execution path chosen.

Returns:

The result of the UDF execution, which varies based on the UDF and execution path.

Examples:

Run a UDF saved in the Fused system:

    fused.run("username@fused.io/my_udf_name")

Run a UDF saved in GitHub:

    loaded_udf = fused.load("https://github.com/fusedio/udfs/tree/main/public/Building_Tile_Example")
    fused.run(loaded_udf, bbox=bbox)

Run a UDF saved in a local directory:

    loaded_udf = fused.load("/Users/local/dir/Building_Tile_Example")
    fused.run(loaded_udf, bbox=bbox)
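
Run a Tile UDF for a specific tile by passing x, y, and z (a minimal sketch; the coordinate values are illustrative):

    fused.run(loaded_udf, x=5241, y=12662, z=15)
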
Note:

This function dynamically determines the execution path and parameters based on the inputs. It is designed to be flexible and support various UDF execution scenarios.

Because the output must be serializable and returned via an HTTP response, Fused validates the output of UDFs that execute remotely with the realtime engine and will hold back invalid responses. This might result in perceived inconsistencies, because running with the local engine does not validate and will instead return any output. See the documentation for the set of supported return object types.

import fused

@fused.udf
def udf(x=1):
    return x

fused.run(udf, engine='local')   # Returns 1
fused.run(udf, engine='remote')  # Returns None because the output is not a valid response object

submit

def submit(
    udf: Callable[..., Any],
    arg_list: Union[List[Any], List[Dict[str, Any]], pd.DataFrame],
    *,
    engine: str = 'remote',
    max_workers: Optional[int] = None,
    max_retry: int = 2,
    debug_mode: bool = False,
    wait_on_results: bool = False,
    **kwargs: Any
) -> JobPool

Executes a user-defined function (UDF) multiple times over a list of input parameters and immediately returns a "lazy" JobPool object that allows inspecting the jobs and waiting on the results.

See fused.run for more details on the UDF execution.

Arguments:

  • udf - The UDF to execute. See fused.run for more details on how to specify the UDF.
  • arg_list - A list of input parameters for the UDF. Can be specified as:
    • A list of values for parametrizing over a single parameter, i.e., the first parameter of the UDF.
    • A list of dictionaries for parametrizing over multiple parameters.
    • A DataFrame for parametrizing over multiple parameters where each row is a set of parameters.
  • engine str - The execution engine to use. Defaults to 'remote'.
  • max_workers int | None - The maximum number of workers to use. Defaults to None.
  • max_retry int - The maximum number of retries for failed jobs. Defaults to 2.
  • debug_mode bool - If True, enables debug mode. Defaults to False.
  • wait_on_results bool - If True, waits for all results before returning. Defaults to False.
  • **kwargs - Additional keyword arguments to pass to the UDF.

Returns:

  • JobPool - A JobPool object that allows inspecting the jobs and waiting on the results.

Examples:

Running a UDF with values passed as the first positional argument:

pool = fused.submit(udf, list(range(10)))

Being explicit about parameter names:

pool = fused.submit(udf, [dict(n=i) for i in range(10)])
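
Parametrizing over multiple parameters with a DataFrame, where each row is one set of arguments (a minimal sketch; the column name must match a UDF parameter):

import pandas as pd

pool = fused.submit(udf, pd.DataFrame({"n": range(10)}))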

download

def download(url: str, file_path: str) -> str

Download a file.

May be called from multiple processes with the same inputs to get the same result.

Fused runs UDFs from top to bottom each time code changes. This means objects in the UDF are recreated each time, which can slow down a UDF that downloads files from a remote server.

💡 Downloaded files are written to a mounted volume shared across all UDFs in an organization. This means that a file downloaded by one UDF can be read by other UDFs.

Fused addresses the latency of downloading files with the download utility function. It stores files in the mounted filesystem so they only download the first time.

💡 Because a Tile UDF runs multiple chunks in parallel, the download function sets a signal lock during the first download attempt, to ensure the download happens only once.

Arguments:

  • url str - The URL to download.
  • file_path str - The local path where to save the file.

Returns:

The file path of the downloaded file. The download itself happens only on the first execution; subsequent calls return the path to the already-downloaded file.

Examples:

@fused.udf
def geodataframe_from_geojson():
    import geopandas as gpd
    url = "s3://sample_bucket/my_geojson.zip"
    path = fused.core.download(url, "tmp/my_geojson.zip")
    gdf = gpd.read_file(path)
    return gdf

utils

A module to access utility functions located in the UDF called "common".

They can be imported by other UDFs with utils = fused.utils.common. They contain common operations such as:

  • read_shape_zip
  • url_to_arr
  • get_collection_bbox
  • read_tiff_pc
  • table_to_tile
  • rasterize_geometry

Examples:

This example shows how to access the geo_buffer function from the common UDF.

import fused
import geopandas as gpd

gdf = gpd.read_file('https://www2.census.gov/geo/tiger/TIGER_RD18/STATE/11_DISTRICT_OF_COLUMBIA/11/tl_rd22_11_bg.zip')
gdf_buffered = fused.utils.common.geo_buffer(gdf, 10)

This example shows how to load a table with table_to_tile, which efficiently loads a table by filtering and adjusting based on the provided bounding box (bbox) and zoom level.

table_path = "s3://fused-asset/infra/census_bg_us"
gdf = fused.utils.common.table_to_tile(
    bbox, table_path, use_columns=["GEOID", "geometry"], min_zoom=12
)

This example shows how to use rasterize_geometry to place an input geometry within an image array.

geom_masks = [fused.utils.common.rasterize_geometry(geom, arr.shape[-2:], transform) for geom in gdf.geometry]

Public UDFs are listed at https://github.com/fusedio/udfs/tree/main/public


ingest

def ingest(
    input: Union[str, Sequence[str], Path, gpd.GeoDataFrame],
    output: Optional[str] = None,
    *,
    output_metadata: Optional[str] = None,
    schema: Optional[Schema] = None,
    file_suffix: Optional[str] = None,
    load_columns: Optional[Sequence[str]] = None,
    remove_cols: Optional[Sequence[str]] = None,
    explode_geometries: bool = False,
    drop_out_of_bounds: Optional[bool] = None,
    partitioning_method: Literal["area", "length", "coords", "rows"] = "rows",
    partitioning_maximum_per_file: Union[int, float, None] = None,
    partitioning_maximum_per_chunk: Union[int, float, None] = None,
    partitioning_max_width_ratio: Union[int, float] = 2,
    partitioning_max_height_ratio: Union[int, float] = 2,
    partitioning_force_utm: Literal["file", "chunk", None] = "chunk",
    partitioning_split_method: Literal["mean", "median"] = "mean",
    subdivide_method: Literal["area", None] = None,
    subdivide_start: Optional[float] = None,
    subdivide_stop: Optional[float] = None,
    split_identical_centroids: bool = True,
    target_num_chunks: int = 5000,
    lonlat_cols: Optional[Tuple[str, str]] = None,
    partitioning_schema_input: Optional[str] = None,
    gdal_config: Union[GDALOpenConfig, Dict[str, Any], None] = None
) -> GeospatialPartitionJobStepConfig

Ingest a dataset into the Fused partitioned format.

Arguments:

  • input - A GeoPandas GeoDataFrame or a path to file or files on S3 to ingest. Files may be Parquet or another geo data format.

  • output - Location on S3 to write the main table to.

  • output_metadata - Location on S3 to write the fused table to.

  • schema - Schema of the data to be ingested. This is optional and will be inferred from the data if not provided.

  • file_suffix - Filter which files are used for ingestion. If input is a directory on S3, all files under that directory will be listed and used for ingestion. If file_suffix is not None, it will be used to filter paths by checking the trailing characters of each filename. E.g. pass file_suffix=".geojson" to include only GeoJSON files inside the directory.

  • load_columns - Read only this set of columns when ingesting geospatial datasets. Defaults to all columns.

  • remove_cols - The named columns to drop when ingesting geospatial datasets. Defaults to not drop any columns.

  • explode_geometries - Whether to unpack multipart geometries to single geometries when ingesting geospatial datasets, saving each part as its own row. Defaults to False.

  • drop_out_of_bounds - Whether to drop geometries outside of the expected WGS84 bounds. Defaults to True.

  • partitioning_method - The method to use for grouping rows into partitions. Defaults to "rows".

    • "area": Construct partitions where all contain a maximum total area among geometries.
    • "length": Construct partitions where all contain a maximum total length among geometries.
    • "coords": Construct partitions where all contain a maximum total number of coordinates among geometries.
    • "rows": Construct partitions where all contain a maximum number of rows.
  • partitioning_maximum_per_file - Maximum value for partitioning_method to use per file. If None, defaults to 1/10th of the total value of partitioning_method. So if the value is None and partitioning_method is "area", then each file will have no more than 1/10th the total area of all geometries. Defaults to None.

  • partitioning_maximum_per_chunk - Maximum value for partitioning_method to use per chunk. If None, defaults to 1/100th of the total value of partitioning_method. So if the value is None and partitioning_method is "area", then each chunk will have no more than 1/100th the total area of all geometries. Defaults to None.

  • partitioning_max_width_ratio - The maximum ratio of width to height of each partition to use in the ingestion process. So for example, if the value is 2, then if the width divided by the height is greater than 2, the box will be split in half along the horizontal axis. Defaults to 2.

  • partitioning_max_height_ratio - The maximum ratio of height to width of each partition to use in the ingestion process. So for example, if the value is 2, then if the height divided by the width is greater than 2, the box will be split in half along the vertical axis. Defaults to 2.

  • partitioning_force_utm - Whether to force partitioning within UTM zones. If set to "file", this will ensure that the centroid of all geometries per file are contained in the same UTM zone. If set to "chunk", this will ensure that the centroid of all geometries per chunk are contained in the same UTM zone. If set to None, then no UTM-based partitioning will be done. Defaults to "chunk".

  • partitioning_split_method - How to split one partition into children. Defaults to "mean" (this may change in the future).

    • "mean": Split each axis according to the mean of the centroid values.
    • "median": Split each axis according to the median of the centroid values.
  • subdivide_method - The method to use for subdividing large geometries into multiple rows. Currently the only option is "area", where geometries will be subdivided based on their area (in WGS84 degrees).

  • subdivide_start - The value above which geometries will be subdivided into smaller parts, according to subdivide_method.

  • subdivide_stop - The value below which geometries will never be subdivided into smaller parts, according to subdivide_method.

  • split_identical_centroids - If True, split a partition that has identical centroids (such as if all geometries in the partition are the same) if there are more such rows than defined in "partitioning_maximum_per_file" and "partitioning_maximum_per_chunk". Defaults to True.

  • target_num_chunks - The target for the number of chunks if partitioning_maximum_per_file is None. Note that this number is only a target and the actual number of files and chunks generated can be higher or lower than this number, depending on the spatial distribution of the data itself.

  • lonlat_cols - Names of the longitude and latitude columns to construct point geometries from. If your point columns are named "x" and "y", then pass:

    fused.ingest(
        ...,
        lonlat_cols=("x", "y")
    )

    This only applies to reading from Parquet files. For reading from CSV files, pass options to gdal_config.

  • partitioning_schema_input - Path to a Parquet file with a pre-defined partitioning schema.

  • gdal_config - Configuration options to pass to GDAL for how to read these files. For all files other than Parquet files, Fused uses GDAL as a step in the ingestion process. For some inputs, like CSV files or zipped shapefiles, you may need to pass some parameters to GDAL to tell it how to open your files.

    This config is expected to be a dictionary with up to two keys:

    • layer: str. Define the layer of the input file you wish to read when the source contains multiple layers, as in GeoPackage.
    • open_options: Dict[str, str]. Pass in key-value pairs with GDAL open options. These are defined on each driver's page in the GDAL documentation; for example, the CSV driver documents the open options you can pass in.

    For example, if you're ingesting a CSV file with two columns "longitude" and "latitude" denoting the coordinate information, pass:

    fused.ingest(
        ...,
        gdal_config={
            "open_options": {
                "X_POSSIBLE_NAMES": "longitude",
                "Y_POSSIBLE_NAMES": "latitude",
            }
        }
    )

Returns:

Configuration object describing the ingestion process. Call .run_remote on this object to start a job.

Examples:

    job = fused.ingest(
        input="https://www2.census.gov/geo/tiger/TIGER_RD18/STATE/06_CALIFORNIA/06/tl_rd22_06_bg.zip",
        output="s3://fused-sample/census/ca_bg_2022/main/",
        output_metadata="s3://fused-sample/census/ca_bg_2022/fused/",
        explode_geometries=True,
        partitioning_maximum_per_file=2000,
        partitioning_maximum_per_chunk=200,
    )
    job.run_remote()
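
input may also be an in-memory GeoDataFrame (a minimal sketch; the file name is illustrative):

    import geopandas as gpd

    gdf = gpd.read_file("tl_rd22_06_bg.zip")
    job = fused.ingest(gdf, output="s3://fused-sample/census/ca_bg_2022/main/")
    job.run_remote()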

job.run_remote

def run_remote(
    output_table: Optional[str] = ...,
    instance_type: Optional[WHITELISTED_INSTANCE_TYPES] = None,
    *,
    region: str | None = None,
    disk_size_gb: int | None = None,
    additional_env: List[str] | None = None,
    image_name: Optional[str] = None,
    ignore_no_udf: bool = False,
    ignore_no_output: bool = False,
    validate_imports: Optional[bool] = None,
    validate_inputs: bool = True,
    overwrite: Optional[bool] = None
) -> RunResponse

Begin execution of the ingestion job by calling run_remote on the job object.

Arguments:

  • output_table - The name of the table to write to. Defaults to None.
  • instance_type - The AWS EC2 instance type to use for the job. Acceptable strings are m5.large, m5.xlarge, m5.2xlarge, m5.4xlarge, m5.8xlarge, m5.12xlarge, m5.16xlarge, r5.large, r5.xlarge, r5.2xlarge, r5.4xlarge, r5.8xlarge, r5.12xlarge, or r5.16xlarge. Defaults to None.
  • region - The AWS region in which to run. Defaults to None.
  • disk_size_gb - The disk size to specify for the job. Defaults to None.
  • additional_env - Any additional environment variables to be passed into the job. Defaults to None.
  • image_name - Custom image name to run. Defaults to None for default image.
  • ignore_no_udf - Ignore validation errors about not specifying a UDF. Defaults to False.
  • ignore_no_output - Ignore validation errors about not specifying output location. Defaults to False.
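
To run the job on a larger machine or with more disk, pass instance_type and disk_size_gb (a minimal sketch; the values are illustrative):

job.run_remote(instance_type="m5.4xlarge", disk_size_gb=100)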

Monitor and manage job

Calling run_remote returns a RunResponse object with helper methods.

# Declare ingest job
job = fused.ingest(
    input="https://www2.census.gov/geo/tiger/TIGER_RD18/STATE/06_CALIFORNIA/06/tl_rd22_06_bg.zip",
    output="s3://fused-sample/census/ca_bg_2022/main/"
)

# Start ingest job
job_id = job.run_remote()

Fetch the job status.

job_id.get_status()

Fetch and print the job's logs.

job_id.print_logs()

Determine the job's execution time.

job_id.get_exec_time()

Continuously print the job's logs.

job_id.tail_logs()

Cancel the job.

job_id.cancel()

file_path

def file_path(file_path: str, mkdir: bool = True) -> str

Creates a directory in a predefined temporary directory.

This gives users the ability to manage directories during the execution of a UDF. It takes a relative file_path, creates the corresponding directory structure, and returns its absolute path.

This is useful for UDFs that temporarily store intermediate results as files, such as when writing intermediary files to disk when processing large datasets. file_path ensures that necessary directories exist.

Arguments:

  • file_path: The file path to locate.
  • mkdir: If True, create the directory if it doesn't already exist. Defaults to True.

Returns:

The located file path.

Examples:

When run locally, file_path returns the local /tmp/fused/ directory on your machine. If called in a UDF running on Fused servers, the returned path will be under the shared mounted storage /mount. Because file_path in a batch job started with job.run_remote() returns a directory on the remote instance, it is recommended to write data directly to /mount.

# in a Jupyter notebook
import fused

path = fused.file_path('hello')
print(path)

Directory located (and created if not already there):

>>> /tmp/fused/hello
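
Inside a UDF, file_path is useful for staging intermediate files. A minimal sketch, assuming the parent directories of the given relative path are created automatically (names are illustrative):

@fused.udf
def udf():
    import pandas as pd
    # Resolves under /mount on Fused servers, /tmp/fused locally
    path = fused.file_path("intermediate/results.parquet")
    df = pd.DataFrame({"value": [1, 2, 3]})
    df.to_parquet(path)
    return df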

get_chunks_metadata

def get_chunks_metadata(url: str) -> "gpd.GeoDataFrame"

Returns a GeoDataFrame with each chunk in the table as a row.

Arguments:

  • url - URL of the table.

get_chunk_from_table

def get_chunk_from_table(
    url: str,
    file_id: Union[str, int, None],
    chunk_id: Optional[int],
    *,
    columns: Optional[Iterable[str]] = None
) -> "gpd.GeoDataFrame"

Returns a chunk from a table and chunk coordinates.

This can be called with file_id and chunk_id from get_chunks_metadata.

Arguments:

  • url - URL of the table.
  • file_id - File ID to read.
  • chunk_id - Chunk ID to read.
  • columns - Read only the specified columns.
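
Examples:

List a table's chunks, then read one back by its file and chunk IDs (a minimal sketch, assuming the metadata GeoDataFrame exposes file_id and chunk_id columns; the table URL is illustrative):

url = "s3://fused-asset/infra/building_msft_us"
meta = fused.get_chunks_metadata(url)
first = meta.iloc[0]
gdf = fused.get_chunk_from_table(url, first.file_id, first.chunk_id, columns=["geometry"])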