Parallel execution
Run a UDF over multiple inputs in parallel.
Using .map()
Call .map() on a UDF to run it over multiple inputs in parallel:
my_udf.map(
    arg_list,                 # list of arguments, each element becomes a job
    *,
    engine=None,              # "remote" (default), "local", or instance type
    cache_max_age=None,       # max cache age
    max_workers=None,         # max parallel workers (default 32)
    worker_concurrency=None,  # concurrency per worker (default 1)
    cache=True,               # set False to disable caching
    max_retry=2,              # max retries for failed jobs
) -> JobPool
See the Udf.map() API reference for full details.
Basic example
@fused.udf
def udf():
    # Create a pool of jobs
    pool = my_udf.map([1, 2, 3, 4, 5])
    # Wait and collect results as a DataFrame
    df = pool.df()
    return df

@fused.udf
def my_udf(x: int):
    return x ** 2
Input formats
| Format | Example | Use case |
|---|---|---|
| List | [0, 1, 2, 3] | Single parameter |
| List of dicts | [{'a': 1, 'b': 2}, {'a': 3, 'b': 4}] | Multiple parameters |
Each item becomes a separate job.
@fused.udf
def udf():
    # Create a pool of jobs, one per dict of keyword arguments
    pool = my_udf.map([{'a': 1, 'b': 2}, {'a': 3, 'b': 4}])
    df = pool.df()
    return df

@fused.udf
def my_udf(a: int, b: int):
    return a + b
Parameters
engine
Accepts the same values as when running UDFs. Default: "remote"
| Mode | Best for |
|---|---|
| "remote" (default) | Many quick jobs |
| "small"/"medium"/"large" | Long/heavy jobs |
max_workers
Number of instances to spin up in parallel. Default: 32, Max: 1000.
# Spin up 100 instances in parallel
pool = my_udf.map(inputs, max_workers=100)
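This combines with the engine choice above; a minimal sketch, where inputs stands in for your own argument list:

# Run long/heavy jobs on dedicated "large" instances, up to 100 at a time
pool = my_udf.map(inputs, engine="large", max_workers=100)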
max_retry
Max retries per failed job. Default: 2
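A minimal sketch for flaky workloads (the retry value is illustrative; inputs is a placeholder):

# Retry each failed job up to 5 times before it counts as failed
pool = my_udf.map(inputs, max_retry=5)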
cache_max_age
Maximum age of cached results to reuse. Same as when running UDFs.
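A sketch of the cache controls, assuming the same duration-string format used when running UDFs (e.g. "24h"); inputs is a placeholder:

# Only reuse cached results newer than 24 hours
pool = my_udf.map(inputs, cache_max_age="24h")

# Or bypass the cache entirely
pool = my_udf.map(inputs, cache=False)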
worker_concurrency
Number of jobs to run concurrently on each worker instance. Default: 1.
@fused.udf
def udf():
    # Run 10 jobs in parallel, with 4 jobs per UDF instance
    pool = my_udf.map(range(10), worker_concurrency=4)
    df = pool.df()
    return df

@fused.udf
def my_udf(x: int):
    return x ** 2
Working with results
.map() returns a JobPool object:
pool = my_udf.map(inputs)
pool.wait() # Show progress bar
pool.total_time() # Total wall time
pool.times() # Time per job
pool.first_error() # First error encountered
pool.collect() # Get results as DataFrame
pool.results() # Get results as list
See the JobPool API reference for full details.
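A short sketch combining the methods above (inputs is a placeholder for your argument list):

pool = my_udf.map(inputs)
pool.wait()                # Progress bar until all jobs finish
print(pool.total_time())   # Total wall time
print(pool.first_error())  # First error encountered, if any
df = pool.collect()        # Results as a DataFrame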
See also
- Running UDFs — single UDF execution
- Scaling out UDFs — best practices
- JobPool API reference — all JobPool methods
[Legacy]: fused.submit()
The fused.submit() function still works, but .map() is preferred:
# Legacy
results = fused.submit(my_udf, inputs)
# Preferred
pool = my_udf.map(inputs)
results = pool.collect()