Parallel execution

Run a UDF over multiple inputs in parallel.

Using .map()

Call .map() on a UDF to run it over multiple inputs in parallel:

my_udf.map(
    arg_list,                 # list of arguments, each element becomes a job
    *,
    engine=None,              # "remote" (default), "local", or instance type
    cache_max_age=None,       # max cache age
    max_workers=None,         # max parallel workers (default 32)
    worker_concurrency=None,  # concurrency per worker (default 1)
    cache=True,               # set False to disable caching
    max_retry=2,              # max retries for failed jobs
) -> JobPool

See the Udf.map() API reference for full details.

Basic example

@fused.udf
def udf():
    # Create a pool of jobs
    pool = my_udf.map([1, 2, 3, 4, 5])

    # Wait and collect results as DataFrame
    df = pool.df()
    return df

@fused.udf
def my_udf(x: int):
    return x ** 2

Input formats

Format        | Example                              | Use case
List          | [0, 1, 2, 3]                         | Single parameter
List of dicts | [{'a': 1, 'b': 2}, {'a': 3, 'b': 4}] | Multiple parameters

Each item becomes a separate job.

@fused.udf
def udf():
    # Create a pool of jobs
    pool = my_udf.map([{'a': 1, 'b': 2}, {'a': 3, 'b': 4}])
    df = pool.df()
    return df

@fused.udf
def my_udf(a: int, b: int):
    return a + b

Parameters

engine

Same as running UDFs. Default: "remote".

Mode                         | Best for
"remote" (default)           | Many quick jobs
"small" / "medium" / "large" | Long/heavy jobs

max_workers

Number of instances to spin up in parallel. Default: 32, Max: 1000.

# Spin up 100 instances in parallel
pool = my_udf.map(inputs, max_workers=100)

max_retry

Maximum number of retries per failed job. Default: 2.
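
For flaky jobs you can raise the retry limit; a minimal sketch (5 is an arbitrary value):

# Retry each failed job up to 5 times before giving up
pool = my_udf.map(inputs, max_retry=5)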

cache_max_age

Same as running UDFs.
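
A minimal sketch, assuming cache_max_age accepts the same duration-string format used when running UDFs (e.g. "24h"):

# Reuse cached job results up to 24 hours old (duration string assumed)
pool = my_udf.map(inputs, cache_max_age="24h")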

worker_concurrency

Number of jobs run concurrently on each worker instance. Default: 1.

@fused.udf
def udf():
    # Run 10 jobs in parallel, with up to 4 jobs per UDF instance
    pool = my_udf.map(range(10), worker_concurrency=4)
    df = pool.df()

    return df

@fused.udf
def my_udf(x: int):
    return x ** 2

Working with results

.map() returns a JobPool object:

pool = my_udf.map(inputs)

pool.wait() # Show progress bar
pool.total_time() # Total wall time
pool.times() # Time per job
pool.first_error() # First error encountered
pool.collect() # Get results as DataFrame
pool.results() # Get results as list
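
A typical flow waits for the pool, checks for failures, then collects the results (a minimal sketch; it assumes first_error() returns None when every job succeeded):

pool = my_udf.map(inputs)
pool.wait()                 # block until all jobs finish, showing a progress bar

error = pool.first_error()  # assumed to be None if every job succeeded
if error is not None:
    print(error)

df = pool.collect()         # combine results into a DataFrame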

See the JobPool API reference for full details.

[Legacy]: fused.submit()

The fused.submit() function still works but .map() is preferred:

# Legacy
results = fused.submit(my_udf, inputs)

# Preferred
pool = my_udf.map(inputs)
results = pool.collect()