Udf (class)
class Udf
A Fused UDF (User-Defined Function) is a callable Python function that runs on Fused infrastructure.
The Udf class is the object returned when decorating a function with @fused.udf, or when loading a saved UDF with fused.load().
Calling a UDF
UDFs are callable objects. Call them directly like regular Python functions:
udf(
    *args,               # positional arguments to pass to the UDF
    engine=None,         # "remote" (default), "local", or instance type
    cache_max_age=None,  # max cache age, e.g. "48h", "10s"
    cache=True,          # set False to disable caching
    **kwargs,            # keyword arguments to pass to the UDF
)
Learn more about engine options and caching.
@fused.udf
def my_udf(x: int, multiplier: int = 2):
    return x * multiplier
# Call directly
result = my_udf(x=5, multiplier=3) # Returns 15
# Run locally
result = my_udf(x=5, engine="local")
# Run on a dedicated instance
result = my_udf(x=5, engine="medium")
# Cache results for 1 hour
result = my_udf(x=5, cache_max_age="1h")
# Disable caching
result = my_udf(x=5, cache=False)
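The examples above pass cache_max_age as a duration string. A plain-Python sketch of how such strings could translate to seconds, assuming the unit suffixes s/m/h/d (the docs above only show "h" and "s"; parse_max_age is illustrative, not part of the Fused API):

```python
import re

# Seconds per unit, under the assumed s/m/h/d suffix convention.
UNITS = {"s": 1, "m": 60, "h": 3600, "d": 86400}

def parse_max_age(value: str) -> int:
    """Convert a duration string like "48h" or "10s" to seconds."""
    m = re.fullmatch(r"(\d+)([smhd])", value)
    if not m:
        raise ValueError(f"unrecognized duration: {value!r}")
    return int(m.group(1)) * UNITS[m.group(2)]

print(parse_max_age("48h"))  # 172800
print(parse_max_age("10s"))  # 10
```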
Parallel execution
Use .map() to run a UDF in parallel across multiple inputs:
@fused.udf
def my_udf(x: int):
    return x * 2

pool = my_udf.map([1, 2, 3])
results = pool.collect() # Returns [2, 4, 6]
cache_max_age
Maximum cache age in seconds. None for default caching behavior.
catalog_url
Returns the URL to open this UDF in Workbench, or None if not saved.
udf = fused.load("my_udf")
print(udf.catalog_url)
# https://www.fused.io/workbench/catalog/my_udf-abc123
code
The source code of the UDF as a string.
collection_id
The ID of the collection this UDF belongs to.
collection_name
The name of the collection this UDF belongs to.
create_access_token
udf.create_access_token(
    *,
    client_id=None,      # override realtime environment
    public_read=None,    # allow public read access
    access_scope=None,   # access scope for the token
    cache=True,          # enable caching on the token
    metadata_json=None,  # additional metadata for tiles
    enabled=True,        # whether the token is active
) -> UdfAccessToken
Create an access token for sharing this UDF.
delete_cache
udf.delete_cache()
Delete cached results for this UDF.
delete_saved
udf.delete_saved(
    inplace=True,  # update this object to reflect deletion
)
Delete this UDF from the Fused service.
disk_size_gb
Disk size in GB for batch execution.
entrypoint
The name of the entrypoint function within the UDF code.
from_gist
@classmethod
Udf.from_gist(
    gist_id,  # the GitHub gist ID
) -> Udf
Create a UDF from a GitHub gist.
get_access_token
udf.get_access_token() -> UdfAccessToken | None
Get the first access token, or None if none exists.
get_access_tokens
udf.get_access_tokens() -> UdfAccessTokenList
Get all access tokens for this UDF.
get_schedule
udf.get_schedule() -> CronJobSequence
Retrieve scheduled runs of this UDF.
engine
Default engine for execution ("remote", "local", "realtime", or batch instance types like "small", "medium", "large").
instance_type [Deprecated]
[Deprecated] Use engine instead. This property is an alias for engine.
map
udf.map(
    arg_list,                 # list of arguments, each element becomes a job
    *,
    engine=None,              # "remote" (default), "local", or instance type
    cache_max_age=None,       # max cache age
    max_workers=None,         # max parallel workers (default 32 remote, 1 local/batch)
    worker_concurrency=None,  # concurrency per worker
    cache=True,               # set False to disable caching
    max_retry=2,              # max retries for failed jobs
) -> JobPool
Parallel map over a set of inputs. Returns a JobPool object.
Learn more about engine options and caching.
Each element in arg_list becomes a separate job. Pass a list of values for single-parameter UDFs, or a list of dicts for multi-parameter UDFs.
Example:
@fused.udf
def my_udf(x: int):
    return x * 2

pool = my_udf.map([1, 2, 3])
results = pool.collect() # Returns [2, 4, 6]
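A plain-Python sketch of how a list of dicts maps onto per-job keyword arguments for a multi-parameter UDF (scale here is an ordinary function standing in for a decorated UDF, so the snippet has no Fused dependency):

```python
# Hypothetical two-parameter UDF body; an ordinary function stands in
# so the snippet runs without the Fused service.
def scale(x, multiplier=2):
    return x * multiplier

# For multi-parameter UDFs, each dict in arg_list supplies the keyword
# arguments for one job: udf.map(arg_list) launches one job per element.
arg_list = [
    {"x": 1, "multiplier": 10},
    {"x": 2, "multiplier": 10},
    {"x": 3, "multiplier": 10},
]

# Locally, each job is equivalent to calling the function with **kwargs.
results = [scale(**kwargs) for kwargs in arg_list]
print(results)  # [10, 20, 30]
```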
map_async
async udf.map_async(
    arg_list,            # list of arguments, each element becomes a job
    *,
    engine=None,         # "remote" (default) or "local" only
    max_workers=None,    # max parallel workers
    cache_max_age=None,  # max cache age
    cache=True,          # set False to disable caching
    max_retry=2,         # max retries for failed jobs
) -> JobPool
Async version of map() for use in async contexts. Does not support batch instance types or worker_concurrency.
Learn more about caching.
@fused.udf
def my_udf(x: int):
    return x ** 2
# In an async context
pool = await my_udf.map_async([1, 2, 3, 4, 5])
await pool.wait_async()
results = await pool.results_async()
metadata
Optional dictionary of metadata associated with the UDF.
name
The name of the UDF. Defaults to the function name.
parameters
Dictionary of default parameters for the UDF.
region
AWS region for execution.
render
udf.render()
Render the UDF code in a Jupyter notebook cell. Only works in Jupyter environments.
schedule
udf.schedule(
    minute,             # minute(s) to run (0-59)
    hour,               # hour(s) to run (0-23)
    day_of_month=None,  # day(s) of month (1-31)
    month=None,         # month(s) (1-12)
    day_of_week=None,   # day(s) of week (0-6, 0=Sunday)
    udf_args=None,      # arguments to pass to the UDF
    enabled=True,       # whether the schedule is active
) -> CronJob
Schedule this UDF to run on a cron schedule.
Example:
# Run every day at 8:00 AM
udf.schedule(minute=0, hour=8)
# Run every Monday at noon
udf.schedule(minute=0, hour=12, day_of_week=1)
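The five time parameters correspond to the standard cron fields (minute, hour, day of month, month, day of week), with None meaning "any". A plain-Python sketch of how the examples above read as cron expressions (to_cron is illustrative, not part of the Fused API):

```python
def to_cron(minute, hour, day_of_month=None, month=None, day_of_week=None):
    """Combine schedule() parameters into a 5-field cron string.

    None becomes "*" ("any"), matching cron conventions.
    """
    fields = [minute, hour, day_of_month, month, day_of_week]
    return " ".join("*" if f is None else str(f) for f in fields)

# Every day at 8:00 AM
print(to_cron(minute=0, hour=8))  # 0 8 * * *

# Every Monday at noon (0=Sunday, so Monday is 1)
print(to_cron(minute=0, hour=12, day_of_week=1))  # 0 12 * * 1
```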
shared_url
udf.shared_url(
    format=None,  # result format (file type) for the URL
) -> str | None
Get the shared URL for this UDF.
to_directory
udf.to_directory(
    where=None,  # directory path (defaults to UDF name)
    *,
    overwrite=False,  # allow overwriting
)
Write the UDF to disk as a directory.
to_file
udf.to_file(
    where,  # file path or file-like object
    *,
    overwrite=False,  # allow overwriting
)
Write the UDF to disk as a zip file.
to_fused
udf.to_fused(
    *,
    overwrite=None,        # if True, overwrite existing remote UDF
    collection_name=None,  # collection to save to (default: "default")
)
Save this UDF to the Fused service.