Udf (class)
class Udf
A user-defined function. The Udf class is the object returned when decorating a function with @fused.udf, or when loading a saved UDF with fused.load().
Calling a UDF
Call this UDF. UDFs are callable: pass arguments as you would to a regular function, plus the optional engine, cache_max_age, and cache keyword arguments:
udf(
*args, # positional arguments to pass to the UDF
engine=None, # "remote" (default), "local", or "small"/"medium"/"large"
cache_max_age=None, # max cache age, e.g. "48h", "10s"; None uses @fused.udf() default
cache=True, # set False to disable caching
**kwargs, # keyword arguments to pass to the UDF
)
- engine: "remote" (default), "local", or batch instance type ("small", "medium", "large"). Other values are interpreted as batch instance type.
- cache_max_age: Maximum age when returning a result from the cache (e.g. "48h", "10s"). Default None, so the UDF follows the cache_max_age from @fused.udf() unless overridden.
- cache: Set to False as a shortcut for cache_max_age="0s" to disable caching. (Default True.)
- Returns: The result of the UDF execution.
Learn more about engine options and caching.
@fused.udf
def my_udf(x: int, multiplier: int = 2):
    return x * multiplier

@fused.udf
def udf():
    # Call directly
    result = my_udf(x=5, multiplier=3)  # Returns 15

    # Run locally
    result = my_udf(x=5, engine="local")

    # Run on a dedicated instance
    result = my_udf(x=5, engine="small")

    # Cache results for 1 hour
    result = my_udf(x=5, cache_max_age="1h")

    # Disable caching
    result = my_udf(x=5, cache=False)

    return result
Parallel execution
Use .map() to run a UDF in parallel across multiple inputs:
@fused.udf
def my_udf(x: int):
    return x ** 2

@fused.udf
def udf():
    # Create a pool of jobs
    pool = my_udf.map([1, 2, 3])
    df = pool.df()
    return df
cache_max_age
The maximum age when returning a result from the cache.
catalog_url
Returns the link to open this UDF in the Workbench Catalog, or None if the UDF is not saved.
udf = fused.load("my_udf")
print(udf.catalog_url)
# https://www.fused.io/workbench/catalog/my_udf-abc123
code
The source code of the UDF as a string.
collection_id
The ID of the collection this UDF belongs to.
collection_name
The name of the collection this UDF belongs to.
create_access_token
udf.create_access_token(
*,
client_id=None, # override realtime environment
public_read=None, # allow public read access
access_scope=None, # access scope for the token
cache=True, # enable caching on the token
metadata_json=None, # additional metadata for tiles
enabled=True, # whether the token is active
) -> UdfAccessToken
Create an access token for sharing this UDF.
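A minimal sketch of creating and inspecting a token, assuming a Fused account and a saved UDF; the name "my_udf" and the argument values are placeholders:

```python
import fused

udf = fused.load("my_udf")  # "my_udf" is a placeholder name

# Create a publicly readable, cache-enabled token using the
# parameters from the signature above (values are illustrative).
token = udf.create_access_token(public_read=True, cache=True)
print(token)
```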
delete_cache
udf.delete_cache()
Delete cached results for this UDF.
delete_saved
udf.delete_saved(
inplace=True, # update this object to reflect deletion
)
Delete this UDF from the Fused service.
disk_size_gb
The size of the disk in GB to use for remote execution. Used in batch jobs.
entrypoint
Name of the function within the code to invoke.
from_gist
@classmethod
Udf.from_gist(
gist_id, # the GitHub gist ID
) -> Udf
Create a UDF from a GitHub gist.
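A sketch of loading a UDF from a gist, assuming the Udf class is accessible from the fused package; the gist ID shown is a placeholder, not a real gist:

```python
import fused

# "0123abcd4567ef89" is a placeholder gist ID pointing at UDF code.
udf = fused.Udf.from_gist("0123abcd4567ef89")
result = udf()
```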
get_access_token
udf.get_access_token() -> UdfAccessToken | None
Get the first access token, or None if none exists.
get_access_tokens
udf.get_access_tokens() -> UdfAccessTokenList
Get all access tokens for this UDF.
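The two token getters can be combined into a simple get-or-create pattern; this is a hedged sketch assuming a saved UDF named "my_udf":

```python
import fused

udf = fused.load("my_udf")  # "my_udf" is a placeholder name

token = udf.get_access_token()  # first token, or None
if token is None:
    token = udf.create_access_token()

for t in udf.get_access_tokens():  # all tokens for this UDF
    print(t)
```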
get_schedule
udf.get_schedule() -> CronJobSequence
Retrieve scheduled runs of this UDF.
engine
The engine to run this UDF on by default, if not specified in fused.run(), e.g., "small"/"medium"/"large".
instance_type [Deprecated]
[Deprecated] Use engine instead. This property is an alias for engine.
map
udf.map(
arg_list,
*,
engine=None,
cache_max_age=None,
max_workers=None,
worker_concurrency=None,
cache=True,
max_retry=2,
) -> JobPool
Submit a job for each element in arg_list.
- arg_list: A list of arguments to pass to the UDF. Each element in arg_list will become a job and run.
- engine: "remote" (default), "local", or "small"/"medium"/"large". Other values are interpreted as a batch instance type.
- max_workers: For realtime: number of instances (default 32). For local: number of threads (default 1). For batch: number of worker machines (default 1).
- worker_concurrency: For realtime: arguments per instance (default 1). For batch: processes per worker. Not settable for local.
- cache_max_age: Max age when returning a result from the cache (e.g. "48h", "10s"). Default None to follow @fused.udf().
- cache: Set to False to disable caching. (Default True.)
- max_retry: Max retries for failed jobs (default 2). Retries happen only while the pool is being waited on (e.g. pool.wait(), pool.tail(), pool.df()).
Returns: A JobPool object. Call .df() to get the results.
Example:
@fused.udf
def my_udf(x: int):
    return x * 2

pool = my_udf.map([1, 2, 3])
df = pool.df()  # Returns [2, 4, 6]
map_async
await udf.map_async(
arg_list,
*,
engine=None,
max_workers=None,
cache_max_age=None,
cache=True,
max_retry=2,
) -> AsyncJobPool
Submit a job for each element in arg_list. Asynchronous variant of map(); await it from async code.
- arg_list: A list of arguments to pass to the UDF. Each element will become a job and run.
- engine: "remote" (default) or "local". Note: batch instance types are not supported for async map.
- max_workers: For realtime: number of instances (default 32). For local: number of threads (default 1).
- cache_max_age: Max age when returning a result from the cache (e.g. "48h", "10s"). Default None to follow @fused.udf().
- cache: Set to False to disable caching. (Default True.)
- max_retry: Max retries for failed jobs (default 2). Retries only if the pool is waited on.
Note: worker_concurrency is not supported for async map.
Returns: An AsyncJobPool object. Call .df() to get the results.
Example:
@fused.udf
def my_udf(x: int):
    return x ** 2

# In an async context
pool = await my_udf.map_async([1, 2, 3, 4, 5])
await pool.wait_async()
results = await pool.results_async()
metadata
Optional dictionary of metadata associated with the UDF.
name
The name of the UDF. Defaults to the function name.
parameters
Parameters to pass into the entrypoint.
region
The region to use for remote execution. Used in batch jobs.
render
udf.render()
Render the UDF code in a Jupyter notebook cell. Only works in Jupyter environments.
schedule
udf.schedule(
minute, # minute(s) to run (0-59)
hour, # hour(s) to run (0-23)
day_of_month=None, # day(s) of month (1-31)
month=None, # month(s) (1-12)
day_of_week=None, # day(s) of week (0-6, 0=Sunday)
udf_args=None, # arguments to pass to the UDF
enabled=True, # whether the schedule is active
) -> CronJob
Schedule this UDF to run on a cron schedule.
Example:
# Run every day at 8:00 AM
udf.schedule(minute=0, hour=8)
# Run every Monday at noon
udf.schedule(minute=0, hour=12, day_of_week=1)
shared_url
udf.shared_url(
format=None, # result format (file type) for the URL
) -> str | None
Get the shared URL for this UDF.
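A sketch of retrieving a shareable URL, assuming a saved UDF named "my_udf"; the "csv" format value is an assumption, not a confirmed option:

```python
import fused

udf = fused.load("my_udf")  # "my_udf" is a placeholder name

# Ask for the URL serving this UDF's result as a file
url = udf.shared_url(format="csv")  # "csv" is an assumed format value
print(url)
```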
to_directory
udf.to_directory(
where=None, # directory path (defaults to UDF name)
*,
overwrite=False, # allow overwriting
)
Write the UDF to disk as a directory.
to_file
udf.to_file(
where, # file path or file-like object
*,
overwrite=False, # allow overwriting
)
Write the UDF to disk as a zip file.
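The two serialization methods above can be used together; a sketch assuming a saved UDF named "my_udf" (the output paths are placeholders):

```python
import fused

udf = fused.load("my_udf")  # "my_udf" is a placeholder name

# Write as a directory of source files (path defaults to the UDF name)
udf.to_directory("my_udf_dir", overwrite=True)

# Or write as a single zip file
udf.to_file("my_udf.zip", overwrite=True)
```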
to_fused
udf.to_fused(
*,
overwrite=None, # if True, overwrite existing remote UDF
collection_name=None, # collection to save to (default: "default")
)
Save this UDF to the Fused service.
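A sketch of saving a locally defined UDF to your account; requires authentication with the Fused service, and the UDF name is a placeholder:

```python
import fused

@fused.udf
def my_udf(x: int):
    return x * 2

# Save to the Fused service; collection_name defaults to "default"
my_udf.to_fused(overwrite=True, collection_name="default")
```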