Parallel execution

Run a UDF over multiple inputs in parallel.

Using .map()

Call .map() on a UDF to run it over multiple inputs in parallel:

my_udf.map(
    arg_list,                 # list of arguments; each element becomes one job
    *,
    engine=None,              # "remote" (default), "local", or instance type
    cache_max_age=None,       # max cache age
    max_workers=None,         # max parallel workers (default 32)
    worker_concurrency=None,  # concurrency per worker (default 1)
    cache=True,               # set False to disable caching
    max_retry=2,              # max retries for failed jobs
) -> JobPool

See the Udf.map() API reference for full details.

Basic example

@fused.udf
def udf():
    # Create a pool of jobs
    pool = my_udf.map([1, 2, 3, 4, 5])

    # Wait and collect results as a DataFrame
    df = pool.df()
    return df

@fused.udf
def my_udf(x: int):
    return x ** 2

Input formats

Format         Example                               Use case
List           [0, 1, 2, 3]                          Single parameter
List of dicts  [{'a': 1, 'b': 2}, {'a': 3, 'b': 4}]  Multiple parameters

Each item becomes a separate job.

@fused.udf
def udf():
    # Create a pool of jobs, one per dict
    pool = my_udf.map([{'a': 1, 'b': 2}, {'a': 3, 'b': 4}])
    df = pool.df()
    return df

@fused.udf
def my_udf(a: int, b: int):
    return a + b

Passing multiple arguments

You can also pass multiple arguments as a list of dictionaries:

@fused.udf
def udf():
    arg_list = [
        {"a": 1, "b": 2},
        {"a": 3, "b": 4},
        {"a": 5, "b": 6},
    ]
    pool = my_udf.map(arg_list)
    return pool.df()

@fused.udf
def my_udf(a: int, b: int):
    return a + b

Each dict is one job, so this runs my_udf(a=1, b=2), my_udf(a=3, b=4), and my_udf(a=5, b=6).

pool.df() returns:

a   b   result
1   2   3
3   4   7
5   6   11

If your values are already split across separate lists, zip is a compact alternative:

arg_list = [{"a": a, "b": b} for a, b in zip(a_values, b_values)]
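For instance, with hypothetical value lists (the names a_values and b_values are illustrative):

# Hypothetical input lists, one per parameter
a_values = [1, 3, 5]
b_values = [2, 4, 6]

# Pair them element-wise so each dict becomes one job
arg_list = [{"a": a, "b": b} for a, b in zip(a_values, b_values)]
pool = my_udf.map(arg_list)  # same three jobs as above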

You can also pass a pandas DataFrame where each row is one job:

@fused.udf
def udf():
    import pandas as pd

    arg_list = pd.DataFrame({
        "a": [1, 3, 5],
        "b": [2, 4, 6],
    })

    pool = my_udf.map(arg_list)
    return pool.df()

@fused.udf
def my_udf(a: int, b: int):
    return a + b

This returns the same pool.df() output shown above.

For single-parameter UDFs, this also works with an input column:

@fused.udf
def udf():
    import pandas as pd

    arg_list = pd.DataFrame({"input": [1, 2, 3]})
    pool = my_udf.map(arg_list)
    return pool.df()

@fused.udf
def my_udf(input: int):
    return input ** 2

Parameters

engine

Same as running UDFs. Default: "remote"

Mode                      Best for
"remote" (default)        Many quick jobs
"small"/"medium"/"large"  Long/heavy jobs

max_workers

Number of instances to spin up in parallel. Default: 32. Max: 1000.

# Spin up 100 instances in parallel
pool = my_udf.map(inputs, max_workers=100)

max_retry

Max retries per failed job. Default: 2
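For example, to give flaky jobs a larger retry budget (a sketch using the max_retry parameter from the signature above):

# Retry each failed job up to 5 times before reporting it as failed
pool = my_udf.map(inputs, max_retry=5)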

cache_max_age

Same as running UDFs.
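For example, a sketch assuming cache_max_age accepts the same duration strings as when running UDFs (e.g. "24h"):

# Reuse cached results up to 24 hours old (duration-string format assumed)
pool = my_udf.map(inputs, cache_max_age="24h")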

worker_concurrency

Number of jobs each worker instance runs concurrently. Default: 1.

@fused.udf
def udf():
    # Run 10 jobs in parallel, with each worker instance handling 4 jobs
    pool = my_udf.map(range(10), worker_concurrency=4)
    df = pool.df()

    return df

@fused.udf
def my_udf(x: int):
    return x ** 2

Working with results

.map() returns a JobPool object:

pool = my_udf.map(inputs)

pool.wait()          # Block until all jobs finish (shows a progress bar)
pool.total_time()    # Total wall time
pool.times()         # Time per job
pool.first_error()   # First error encountered, if any
pool.df()            # Get results as a DataFrame
pool.results()       # Get results as a list

See the JobPool API reference for full details.
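Putting it together, a typical flow (the error check is an illustrative pattern, not required):

pool = my_udf.map(inputs)
pool.wait()  # block until every job has finished

# Inspect the first failure, if any, before collecting results
err = pool.first_error()
if err is not None:
    print(f"A job failed: {err}")

df = pool.df()  # collect results as a DataFrame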


[Legacy]: fused.submit()

The fused.submit() function still works, but .map() is preferred:

# Legacy
results = fused.submit(my_udf, inputs)

# Preferred
pool = my_udf.map(inputs)
results = pool.df()