Udf (class)

class Udf

A user-defined function. The Udf class is the object returned when decorating a function with @fused.udf, or when loading a saved UDF with fused.load().

Calling a UDF

UDFs are callable: pass arguments as you would to a regular function, plus the optional engine, cache_max_age, and cache keyword arguments:

udf(
    *args,               # positional arguments to pass to the UDF
    engine=None,         # "remote" (default), "local", or "small"/"medium"/"large"
    cache_max_age=None,  # max cache age, e.g. "48h", "10s"; None uses @fused.udf() default
    cache=True,          # set False to disable caching
    **kwargs,            # keyword arguments to pass to the UDF
)
  • engine: "remote" (default), "local", or a batch instance type ("small", "medium", "large"). Any other value is interpreted as a batch instance type.
  • cache_max_age: Maximum age when returning a result from the cache (e.g. "48h", "10s"). Defaults to None, in which case the UDF follows the cache_max_age set in @fused.udf() unless overridden here.
  • cache: Set to False as a shortcut for cache_max_age='0s' to disable caching. (Default True.)
  • Returns: The result of the UDF execution.

Learn more about engine options and caching.

@fused.udf
def udf():
    # Call directly
    result = my_udf(x=5, multiplier=3)  # Returns 15

    # Run locally
    result = my_udf(x=5, engine="local")

    # Run on a dedicated instance
    result = my_udf(x=5, engine="small")

    # Cache results for 1 hour
    result = my_udf(x=5, cache_max_age="1h")

    # Disable caching
    result = my_udf(x=5, cache=False)

    return result

@fused.udf
def my_udf(x: int, multiplier: int = 2):
    return x * multiplier

Parallel execution

Use .map() to run a UDF in parallel across multiple inputs:

@fused.udf
def udf():
    # Create a pool of jobs
    pool = my_udf.map([1, 2, 3])
    df = pool.df()
    return df

@fused.udf
def my_udf(x: int):
    return x ** 2

cache_max_age

The maximum age when returning a result from the cache.


catalog_url

Returns the link to open this UDF in the Workbench Catalog, or None if the UDF is not saved.

udf = fused.load("my_udf")
print(udf.catalog_url)
# https://www.fused.io/workbench/catalog/my_udf-abc123

code

The source code of the UDF as a string.


collection_id

The ID of the collection this UDF belongs to.


collection_name

The name of the collection this UDF belongs to.


create_access_token

udf.create_access_token(
    *,
    client_id=None,      # override realtime environment
    public_read=None,    # allow public read access
    access_scope=None,   # access scope for the token
    cache=True,          # enable caching on the token
    metadata_json=None,  # additional metadata for tiles
    enabled=True,        # whether the token is active
) -> UdfAccessToken

Create an access token for sharing this UDF.
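A minimal sketch of sharing a saved UDF via a token ("my_udf" is a placeholder name; running this requires a Fused account):

```python
import fused

udf = fused.load("my_udf")  # load a previously saved UDF

# Create a token that allows public read access to the UDF's results
token = udf.create_access_token(public_read=True)
print(token)
```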


delete_cache

udf.delete_cache()

Delete cached results for this UDF.


delete_saved

udf.delete_saved(
    inplace=True,  # update this object to reflect deletion
)

Delete this UDF from the Fused service.


disk_size_gb

The size of the disk in GB to use for remote execution. Used in batch jobs.


entrypoint

Name of the function within the code to invoke.


from_gist

@classmethod
Udf.from_gist(
    gist_id,  # the GitHub gist ID
) -> Udf

Create a UDF from a GitHub gist.
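A sketch of loading a UDF from a gist (the gist ID below is a placeholder; the gist must contain valid UDF code):

```python
import fused

# "abc123def456" is a hypothetical gist ID
udf = fused.Udf.from_gist("abc123def456")

# The loaded UDF is callable like any other
result = udf()
```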


get_access_token

udf.get_access_token() -> UdfAccessToken | None

Get the first access token, or None if none exists.


get_access_tokens

udf.get_access_tokens() -> UdfAccessTokenList

Get all access tokens for this UDF.
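A sketch of listing tokens for a saved UDF ("my_udf" is a placeholder name):

```python
import fused

udf = fused.load("my_udf")

# Iterate over every token issued for this UDF
for token in udf.get_access_tokens():
    print(token)

# Or fetch only the first token; returns None if none exist
first = udf.get_access_token()
```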


get_schedule

udf.get_schedule() -> CronJobSequence

Retrieve scheduled runs of this UDF.


engine

The engine to run this UDF on by default, if not specified in fused.run(), e.g., "small"/"medium"/"large".


instance_type [Deprecated]

[Deprecated] Use engine instead. This property is an alias for engine.


map

udf.map(
    arg_list,
    *,
    engine=None,
    cache_max_age=None,
    max_workers=None,
    worker_concurrency=None,
    cache=True,
    max_retry=2,
) -> JobPool

Submit a job for each element in arg_list.

  • arg_list: A list of arguments to pass to the UDF. Each element of arg_list becomes its own job.
  • engine: "remote" (default), "local", or "small"/"medium"/"large". Other values are interpreted as a batch instance type.
  • max_workers: For realtime: number of instances (default 32). For local: number of threads (default 1). For batch: number of worker machines (default 1).
  • worker_concurrency: For realtime: arguments per instance (default 1). For batch: processes per worker. Not settable for local.
  • cache_max_age: Max age when returning a result from the cache (e.g. "48h", "10s"). Default None to follow @fused.udf().
  • cache: Set to False to disable caching. (Default True.)
  • max_retry: Max retries for failed jobs (default 2). Retries only if the pool is waited on (e.g. pool.wait(), pool.tail(), pool.df()).

Returns: A JobPool object. Call .df() to get the results.

Example:

@fused.udf
def my_udf(x: int):
    return x * 2

pool = my_udf.map([1, 2, 3])
df = pool.df()  # Results: [2, 4, 6]

map_async

await udf.map_async(
    arg_list,
    *,
    engine=None,
    max_workers=None,
    cache_max_age=None,
    cache=True,
    max_retry=2,
) -> AsyncJobPool

Async version of map(): submit a job for each element in arg_list. Use in async contexts.

  • arg_list: A list of arguments to pass to the UDF. Each element will become a job and run.
  • engine: "remote" (default) or "local". Note: batch instance types are not supported for async map.
  • max_workers: For realtime: number of instances (default 32). For local: number of threads (default 1).
  • cache_max_age: Max age when returning a result from the cache. Default None to follow @fused.udf().
  • cache: Set to False to disable caching. (Default True.)
  • max_retry: Max retries for failed jobs (default 2). Retries only if the pool is waited on.

Note: worker_concurrency is not supported for async map.

Returns: An AsyncJobPool object. Call .df() to get the results.

Example:

@fused.udf
def my_udf(x: int):
    return x ** 2

# In an async context
pool = await my_udf.map_async([1, 2, 3, 4, 5])
await pool.wait_async()
results = await pool.results_async()

metadata

Optional dictionary of metadata associated with the UDF.


name

The name of the UDF. Defaults to the function name.


parameters

Parameters to pass into the entrypoint.


region

The region to use for remote execution. Used in batch jobs.


render

udf.render()

Render the UDF code in a Jupyter notebook cell. Only works in Jupyter environments.


schedule

udf.schedule(
    minute,             # minute(s) to run (0-59)
    hour,               # hour(s) to run (0-23)
    day_of_month=None,  # day(s) of month (1-31)
    month=None,         # month(s) (1-12)
    day_of_week=None,   # day(s) of week (0-6, 0=Sunday)
    udf_args=None,      # arguments to pass to the UDF
    enabled=True,       # whether the schedule is active
) -> CronJob

Schedule this UDF to run on a cron schedule.

Example:

# Run every day at 8:00 AM
udf.schedule(minute=0, hour=8)

# Run every Monday at noon
udf.schedule(minute=0, hour=12, day_of_week=1)

shared_url

udf.shared_url(
    format=None,  # result format (file type) for the URL
) -> str | None

Get the shared URL for this UDF.
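A sketch of fetching the shared URL, assuming the UDF already has a share token (the "csv" format is illustrative):

```python
# Request the URL with a specific result format
url = udf.shared_url(format="csv")

# shared_url can return None, e.g. if the UDF is not shared
if url is not None:
    print(url)
```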


to_directory

udf.to_directory(
    where=None,  # directory path (defaults to UDF name)
    *,
    overwrite=False,  # allow overwriting
)

Write the UDF to disk as a directory.


to_file

udf.to_file(
    where,  # file path or file-like object
    *,
    overwrite=False,  # allow overwriting
)

Write the UDF to disk as a zip file.
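A sketch of exporting a UDF to local disk in both formats (the paths are placeholders):

```python
# Serialize the UDF as a zip archive
udf.to_file("my_udf.zip", overwrite=True)

# Or write it out as an editable directory of source files
udf.to_directory("my_udf_dir", overwrite=True)
```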


to_fused

udf.to_fused(
    *,
    overwrite=None,        # if True, overwrite existing remote UDF
    collection_name=None,  # collection to save to (default: "default")
)

Save this UDF to the Fused service.
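A sketch of saving a UDF and loading it back by name ("my_udf" is a placeholder; this requires a Fused account):

```python
import fused

@fused.udf
def my_udf(x: int):
    return x * 2

# Save to the service, replacing any existing UDF with the same name
my_udf.to_fused(overwrite=True)

# Later, load it back by name
reloaded = fused.load("my_udf")
```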