Top-Level Functions
def udf(
fn: Optional[Callable] = None,
schema: Union[Schema, Dict, None] = None,
name: Optional[str] = None,
cache_max_age: Optional[str] = None,
default_parameters: Optional[Dict[str, Any]] = None,
headers: Optional[Sequence[Union[str, Header]]] = None
) -> Callable[..., GeoPandasUdfV2Callable]
A decorator that transforms a function into a Fused UDF.
Schema | Dict | None - The schema for the DataFrame returned by the UDF. The schema may be either a string (in the form "field_name:DataType field_name2:DataType", or as JSON), as a Python dictionary representing the schema, or a Schema model object. Defaults to None, in which case a schema must be evaluated by calling run_local for a job to be able to write output. The return value of run_local will also indicate how to include the schema in the decorator so run_local does not need to be run
str | None - The name of the UDF object. Defaults to the name of the function.cache_max_age
str | None - The maximum age when returning a result from the cache. Supported units are seconds (s), minutes (m), hours (h), and days (d) (e.g. “48h”, “10s”, etc.). Defaults to"90d"
Dict[str, Any] | None - Parameters to embed in the UDF object, separately from the arguments list of the function. Defaults to None for empty parameters.headers
Sequence[str | Header] | None - A list of files to include as modules when running the UDF. For example, when specifying headers=[''], inside the UDF function it may be referenced as:Defaults to None for no headers.import my_header
Callable | None - The function to decorate.
- A callable that represents the transformed UDF. This callable can be used within GeoPandas workflows to apply the defined operation on geospatial data.
To create a simple UDF that calls a utility function to calculate the area of geometries in a GeoDataFrame:
def udf(bounds, table_path="s3://fused-asset/infra/building_msft_us"):
gdf = table_to_tile(bounds, table=table_path)
return gdf
def cache(
func: Optional[Callable[..., Any]] = None,
cache_max_age: Union[str, int] = '12h',
path: str = 'tmp',
concurrent_lock_timeout: int = 120,
**kwargs: Any
) -> Callable[..., Any]
Decorator to cache the return value of a function.
This function serves as a decorator that can be applied to any function to cache its return values. The cache behavior can be customized through keyword arguments.
Callable, optional - The function to be decorated. If None, this returns a partial decorator with the passed keyword arguments.cache_max_age
str | int - A string with a numbered component and units. Supported units are seconds (s), minutes (m), hours (h), and days (d) (e.g. “48h”, “10s”, etc.). Defaults to"12h"
str - Folder to append to the configured cache directory.concurrent_lock_timeout
int - Max amount of time in seconds for subsequent concurrent calls to wait for a previous concurrent call to finish execution and to write the cache file.**kwargs
- Arbitrary keyword arguments that are passed to the internal caching mechanism. These could specify cache size, expiration time, and other cache-related settings.
- A decorator that, when applied to a function, caches its return values according to the specified keyword arguments.
Use the @cache
decorator to cache the return value of a function in a custom path.
def expensive_function():
# Function implementation goes here
return result
If the output of a cached function changes, for example, if remote data is modified, it can be reset by running the function with the reset
keyword argument. Afterward,
the argument can be cleared.
@cache(path="/mount/custom_path/", reset=True)
def expensive_function():
# Function implementation goes here
return result
def load(url_or_udf: Union[str, Path], *, cache_key: Any = None) -> AnyBaseUdf
Loads a UDF from various sources including GitHub URLs, local files, or directories, and a Fused platform-specific identifier.
This function supports loading UDFs from a GitHub repository URL, a local file path, a directory containing UDF definitions, or a Fused platform-specific identifier composed of an email and UDF name. It intelligently determines the source type based on the format of the input and retrieves the UDF accordingly.
- A string or Path object representing the location of the UDF. This can be a GitHub URL starting with "", a local file path, a directory containing one or more UDFs, or a Fused platform-specific identifier in the format "email/udf_name".cache_key
- An optional key used for caching the loaded UDF. If provided, the function will attempt to load the UDF from cache using this key before attempting to load it from the specified source. Defaults to None, indicating no caching.
- An instance of the loaded UDF.
- If a local file or directory path is provided but does not exist.ValueError
- If the URL or Fused platform-specific identifier format is incorrect or cannot be parsed.Exception
- For errors related to network issues, file access permissions, or other unforeseen errors during the loading process.
Load a UDF from a GitHub URL:
udf = fused.load("")
Load a UDF from a local file:
udf = fused.load("/localpath/REM_with_HyRiver/")
Load a UDF using a Fused platform-specific identifier:
udf = fused.load("")
def run(
udf: Optional[Union[str, GeoPandasUdfV2, UdfJobStepConfig]] = None,
*args: Any,
x: Optional[int] = None,
y: Optional[int] = None,
z: Optional[int] = None,
sync: bool = True,
engine: Optional[Literal["remote", "local"]] = None,
type: Optional[Literal["tile", "file"]] = None,
max_retry: int = 0,
cache_max_age: Optional[str] = None,
parameters: Optional[Dict[str, Any]] = None,
_include_log: bool = False,
_return_response: bool = False,
**kw_parameters: Any
) -> Any
Executes a user-defined function (UDF) with various execution and input options.
This function supports executing UDFs in different environments (local or remote), with different types of inputs (tile coordinates, geographical bounding boxes, etc.), and allows for both synchronous and asynchronous execution. It dynamically determines the execution path based on the provided parameters.
- The UDF to execute. The UDF can be specified in several ways:- A string representing a UDF name or UDF shared token.
- A UDF object.
- A UdfJobStepConfig object for detailed execution configuration.
int | None - Tile coordinates for tile-based UDF execution.y
int | None - Tile coordinates for tile-based UDF execution.z
int | None - Tile coordinates for tile-based UDF execution.sync
bool - If True, execute the UDF synchronously. If False, execute asynchronously.engine
Literal["remote", "local"] | None - The execution engine to use.type
Literal["tile", "file"] | None - The type of UDF execution ('tile' or 'file').max_retry
int - The maximum number of retries to attempt if the UDF fails. By default does not retry.cache_max_age
str | None - The maximum age when returning a result from the cache. Supported units are seconds (s), minutes (m), hours (h), and days (d) (e.g. “48h”, “10s”, etc.). Default isNone
so a UDF run
will followcache_max_age
defined in@fused.udf()
unless this value is changed.parameters
Dict[str, Any] | None - Additional parameters to pass to the UDF._include_log
bool | None_return_response
bool | None**kw_parameters
- Additional parameters to pass to the UDF.
- If the UDF is not specified or is specified in more than one way.TypeError
- If the first parameter is not of an expected type.Warning
- Various warnings are issued for ignored parameters based on the execution path chosen.
The result of the UDF execution, which varies based on the UDF and execution path.
Run a UDF saved in the Fused system:"")
Run a UDF saved in GitHub:
loaded_udf = fused.load(""), bounds=bounds)
Run a UDF saved in a local directory:
loaded_udf = fused.load("/Users/local/dir/Building_Tile_Example"), bounds=bounds)
This function dynamically determines the execution path and parameters based on the inputs. It is designed to be flexible and support various UDF execution scenarios.
Because the output must be serializable and returned via an HTTP response, Fused validates the output of UDFs that execute remotely with the realtime
engine and will hold back invalid responses. This might result in perceived inconsistencies because running locally with the local
engine does not validate and will instead return any output. See the set of supported return object types here.
import fused
def udf(x=1):
return x, engine='local') # Returns 1, engine='remote') # Returns None because output is not a valid response object
def submit(
udf: Callable[..., Any],
arg_list: Union[List[Any], List[Dict[str, Any]], pd.DataFrame],
engine: Optional[Literal["remote", "local"]] = 'remote',
max_workers: Optional[int] = None,
max_retry: int = 2,
debug_mode: bool = False,
wait_on_results: bool = False,
**kwargs: Any
) -> JobPool
Executes a user-defined function (UDF) multiple times for a list of input parameters, and returns immediately a “lazy” JobPool object allowing to inspect the jobs and wait on the results.
for more details on the UDF execution.
- The UDF to execute.
for more details on how to specify the UDF.arg_list
- A list of input parameters for the UDF. Can be specified as:- A list of values for parametrizing over a single parameter, i.e., the first parameter of the UDF.
- A list of dictionaries for parametrizing over multiple parameters.
- A DataFrame for parametrizing over multiple parameters where each row is a set of parameters.
str - The execution engine to use. Defaults to 'remote'.max_workers
int | None - The maximum number of workers to use. Defaults to 32.max_retry
int - The maximum number of retries for failed jobs. Defaults to 2.debug_mode
bool - If True, executes only the first item in arg_list directly
. Useful for debugging UDF execution. Default is False.wait_on_results
bool - If True, waits for all jobs to complete and returns a DataFrame containing the results. If False, returns a JobPool object. Default is False.**kwargs
- Additional keyword arguments to pass to the UDF.
- A JobPool object that allows inspecting the jobs and waiting on the results.
Running a UDF with values passed as first positional argument:
pool = fused.submit(udf, [dict(n=i) for i in range(10)])
Being explicit about parameter names:
pool = fused.submit(udf, [dict(n=i) for i in range(10)])
def download(url: str, file_path: str) -> str
Download a file.
May be called from multiple processes with the same inputs to get the same result.
Fused runs UDFs from top to bottom each time code changes. This means objects in the UDF are recreated each time, which can slow down a UDF that downloads files from a remote server.
💡 Downloaded files are written to a mounted volume shared across all UDFs in an organization. This means that a file downloaded by one UDF can be read by other UDFs.
Fused addresses the latency of downloading files with the download utility function. It stores files in the mounted filesystem so they only download the first time.
💡 Because a Tile UDF runs multiple chunks in parallel, the download function sets a signal lock during the first download attempt, to ensure the download happens only once.
str - The URL to download.file_path
str - The local path where to save the file.
The function downloads the file only on the first execution and returns the file path.
def geodataframe_from_geojson():
import geopandas as gpd
url = "s3://sample_bucket/"
path =, "tmp/")
gdf = gpd.read_file(path)
return gdf
def ingest(
input: Union[str, Sequence[str], Path, gpd.GeoDataFrame],
output: Optional[str] = None,
output_metadata: Optional[str] = None,
schema: Optional[Schema] = None,
file_suffix: Optional[str] = None,
load_columns: Optional[Sequence[str]] = None,
remove_cols: Optional[Sequence[str]] = None,
explode_geometries: bool = False,
drop_out_of_bounds: Optional[bool] = None,
partitioning_method: Literal["area", "length", "coords", "rows"] = "rows",
partitioning_maximum_per_file: Union[int, float, None] = None,
partitioning_maximum_per_chunk: Union[int, float, None] = None,
partitioning_max_width_ratio: Union[int, float] = 2,
partitioning_max_height_ratio: Union[int, float] = 2,
partitioning_force_utm: Literal["file", "chunk", None] = "chunk",
partitioning_split_method: Literal["mean", "median"] = "mean",
subdivide_method: Literal["area", None] = None,
subdivide_start: Optional[float] = None,
subdivide_stop: Optional[float] = None,
split_identical_centroids: bool = True,
target_num_chunks: int = 5000,
lonlat_cols: Optional[Tuple[str, str]] = None,
partitioning_schema_input: Optional[str] = None,
gdal_config: Union[GDALOpenConfig, Dict[str, Any], None] = None
) -> GeospatialPartitionJobStepConfig
Ingest a dataset into the Fused partitioned format.
- A GeoPandasGeoDataFrame
or a path to file or files on S3 to ingest. Files may be Parquet or another geo data format. -
- Location on S3 to write themain
table to. -
- Location on S3 to write thefused
table to. -
- Schema of the data to be ingested. This is optional and will be inferred from the data if not provided. -
- filter which files are used for ingestion. Ifinput
is a directory on S3, all files under that directory will be listed and used for ingestion. Iffile_suffix
is not None, it will be used to filter paths by checking the trailing characters of each filename. E.g. passfile_suffix=".geojson"
to include only GeoJSON files inside the directory. -
- Read only this set of columns when ingesting geospatial datasets. Defaults to all columns. -
- The named columns to drop when ingesting geospatial datasets. Defaults to not drop any columns. -
- Whether to unpack multipart geometries to single geometries when ingesting geospatial datasets, saving each part as its own row. Defaults toFalse
. -
- Whether to drop geometries outside of the expected WGS84 bounds. Defaults to True. -
- The method to use for grouping rows into partitions. Defaults to"rows"
: Construct partitions where all contain a maximum total area among geometries."length"
: Construct partitions where all contain a maximum total length among geometries."coords"
: Construct partitions where all contain a maximum total number of coordinates among geometries."rows"
: Construct partitions where all contain a maximum number of rows.
- Maximum value forpartitioning_method
to use per file. IfNone
, defaults to 1/10th of the total value ofpartitioning_method
. So if the value isNone
, then each file will be have no more than 1/10th the total area of all geometries. Defaults toNone
. -
- Maximum value forpartitioning_method
to use per chunk. IfNone
, defaults to 1/100th of the total value ofpartitioning_method
. So if the value isNone
, then each file will have no more than 1/100th the total area of all geometries. Defaults toNone
. -
- The maximum ratio of width to height of each partition to use in the ingestion process. So for example, if the value is2
, then if the width divided by the height is greater than2
, the box will be split in half along the horizontal axis. Defaults to2
. -
- The maximum ratio of height to width of each partition to use in the ingestion process. So for example, if the value is2
, then if the height divided by the width is greater than2
, the box will be split in half along the vertical axis. Defaults to2
. -
- Whether to force partitioning within UTM zones. If set to"file"
, this will ensure that the centroid of all geometries per file are contained in the same UTM zone. If set to"chunk"
, this will ensure that the centroid of all geometries per chunk are contained in the same UTM zone. If set toNone
, then no UTM-based partitioning will be done. Defaults to "chunk". -
- How to split one partition into children. Defaults to"mean"
(this may change in the future)."mean"
: Split each axis according to the mean of the centroid values."median"
: Split each axis according to the median of the centroid values.
- The method to use for subdividing large geometries into multiple rows. Currently the only option is"area"
, where geometries will be subdivided based on their area (in WGS84 degrees). -
- The value above which geometries will be subdivided into smaller parts, according tosubdivide_method
. -
- The value below which geometries will never be subdivided into smaller parts, according tosubdivide_method
. -
- IfTrue
, should split a partition that has identical centroids (such as if all geometries in the partition are the same) if there are more such rows than defined in "partitioning_maximum_per_file" and "partitioning_maximum_per_chunk". -
- The target for the number of chunks ifpartitioning_maximum_per_file
is None. Note that this number is only a target and the actual number of files and chunks generated can be higher or lower than this number, depending on the spatial distribution of the data itself. -
- Names of longitude, latitude columns to construct point geometries from. -
- Path to Parquet file with pre-defined partitioning schemaIf your point columns are named
, then pass:fused.ingest(
lonlat_cols=("x", "y")
)This only applies to reading from Parquet files. For reading from CSV files, pass options to
. -
- Configuration options to pass to GDAL for how to read these files. For all files other than Parquet files, Fused uses GDAL as a step in the ingestion process. For some inputs, like CSV files or zipped shapefiles, you may need to pass some parameters to GDAL to tell it how to open your files.This config is expected to be a dictionary with up to two keys:
. Define the layer of the input file you wish to read when the source contains multiple layers, as in GeoPackage.open_options
:Dict[str, str]
. Pass in key-value pairs with GDAL open options. These are defined on each driver's page in the GDAL documentation. For example, the CSV driver defines these open options you can pass in.
For example, if you're ingesting a CSV file with two columns
denoting the coordinate information, passfused.ingest(
"open_options": {
"X_POSSIBLE_NAMES": "longitude",
"Y_POSSIBLE_NAMES": "latitude",
Configuration object describing the ingestion process. Call .run_remote
on this object to start a job.
job = fused.ingest(
def run_remote(output_table: Optional[str] = ...,
instance_type: Optional[WHITELISTED_INSTANCE_TYPES] = None,
region: str | None = None,
disk_size_gb: int | None = None,
additional_env: List[str] | None = None,
image_name: Optional[str] = None,
ignore_no_udf: bool = False,
ignore_no_output: bool = False,
validate_imports: Optional[bool] = None,
validate_inputs: bool = True,
overwrite: Optional[bool] = None) -> RunResponse
Begin execution of the ingestion job by calling run_remote
on the job object.
- The name of the table to write to. Defaults to None.instance_type
- The AWS EC2 instance type to use for the job. Acceptable strings arem5.large
, orr5.16xlarge
. Defaults to None.region
- The AWS region in which to run. Defaults to None.disk_size_gb
- The disk size to specify for the job. Defaults to None.additional_env
- Any additional environment variables to be passed into the job. Defaults to None.image_name
- Custom image name to run. Defaults to None for default image.ignore_no_udf
- Ignore validation errors about not specifying a UDF. Defaults to False.ignore_no_output
- Ignore validation errors about not specifying output location. Defaults to False.
Monitor and manage job
Calling run_remote
returns a RunResponse
object with helper methods.
# Declare ingest job
job = fused.ingest(
# Start ingest job
job_id = job.run_remote()
Fetch the job status.
Fetch and print the job's logs.
Determine the job's execution time.
Continuously print the job's logs.
Cancel the job.
def file_path(file_path: str, mkdir: bool = True) -> str
Creates a directory in a predefined temporary directory.
This gives users the ability to manage directories during the execution of a UDF. It takes a relative file_path
creates the corresponding directory structure, and returns its absolute path.
This is useful for UDFs that temporarily store intermediate results as files,
such as when writing intermediary files to disk when processing large datasets.
ensures that necessary directories exist.
: The file path to locate.mkdir
: If True, create the directory if it doesn't already exist. Defaults to True.
The located file path.
When run locally, file_path
returns the local /tmp/fused/
directory on your machine. If called in a UDF is run on Fused servers, the returned path will be of the shared mounted storage /mount
Because file_path
on a batch job using job.run_remote()
returns the directory of the remote instance, it recommended to write data directly to /mount
- Local
- In a UDF
- In a UDF run locally
- In a UDF run in a batch
# in a Jupyter notebook
import fused
Directory located (and created if not already there):
>>> /tmp/fused/hello
def see_file_path():
You can find the new directory in the File Explorer in Workbench after running this code.
>>> /mount/testing
def see_file_path():
path=fused.file_path('do_not_create', false)
return, engine='local')
Directory returned (but not created):
>>> /tmp/fused/do_not_create
def udf():
job = udf()
Directory of remote instance returned (that we can no longer access):
>>> 2025-01-27 17:40:06.429 | INFO | job2.log:write:160 partition=None - fused.file_path("where_am_i") = '/data/where_am_i'
def get_chunks_metadata(url: str) -> "gpd.GeoDataFrame"
Returns a GeoDataFrame with each chunk in the table as a row.
- URL of the table.
def get_chunk_from_table(
url: str,
file_id: Union[str, int, None],
chunk_id: Optional[int],
columns: Optional[Iterable[str]] = None) -> "gpd.GeoDataFrame"
Returns a chunk from a table and chunk coordinates.
This can be called with file_id and chunk_id from get_chunks_metadata
- URL of the table.file_id
- File ID to read.chunk_id
- Chunk ID to read.columns
- Read only the specified columns.