Run UDFs in parallel

Run a UDF over multiple inputs in parallel.

Signature

fused.submit(
    udf,
    arg_list,
    engine='remote',
    instance_type='realtime',
    max_workers=32,
    collect=True,
    cache_max_age=None,
)

Parameters

arg_list — Input formats

| Format | Example | Use case |
| --- | --- | --- |
| List | [0, 1, 2, 3] | Single parameter |
| List of dicts | [{'a': 1, 'b': 2}, {'a': 3, 'b': 4}] | Multiple parameters |
| DataFrame | pd.DataFrame({'a': [1, 3], 'b': [2, 4]}) | Multiple parameters |

Each item/row becomes a separate job.

@fused.udf
def udf():
    # Fan out: each dict becomes one run of single_job_udf
    results = fused.submit(
        single_job_udf,
        [{'a': 1, 'b': 2}, {'a': 3, 'b': 4}]
    )
    return results

@fused.udf
def single_job_udf(a: int, b: int):
    import pandas as pd
    return pd.DataFrame({"result": [a + b]})
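
A DataFrame arg_list works the same way: each row becomes one job, with columns mapped to the UDF's parameters. A minimal sketch reusing single_job_udf from above:

import pandas as pd

# Equivalent to the list-of-dicts call above: rows become jobs,
# columns become keyword arguments of single_job_udf
arg_df = pd.DataFrame({'a': [1, 3], 'b': [2, 4]})
results = fused.submit(single_job_udf, arg_df)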

engine

Same as fused.run. Default: remote

instance_type

Same as fused.run. Default: realtime

| Mode | Best for |
| --- | --- |
| realtime | Many quick jobs |
| small/medium/large | Long/heavy jobs |
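
For long or heavy jobs, pick a batch instance size instead of realtime; a minimal sketch:

# Long/heavy jobs: run on larger batch instances instead of realtime
fused.submit(udf, inputs, instance_type='large')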

max_workers

Number of realtime instances to spin up in parallel. Default: 32, Max: 1000.

# Spin up 100 realtime instances in parallel
fused.submit(udf, inputs, max_workers=100)

collect

| Value | Behavior | Returns |
| --- | --- | --- |
| True (default) | Blocking, waits for all jobs | DataFrame |
| False | Non-blocking | JobPool |
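
With collect=False the call returns immediately and results are gathered later via the JobPool (see JobPool methods below). A minimal sketch:

# Non-blocking: submit now, come back for the results later
pool = fused.submit(udf, inputs, collect=False)
df = pool.collect()  # results as a DataFrame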

debug_mode

When True, runs only the first input via fused.run(). Use for testing before scaling.

# Test with first input only
fused.submit(udf, inputs, debug_mode=True)

max_retry

Max retries per failed job. Default: 2
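
For inputs that fail transiently, the retry budget per job can be raised:

# Retry each failed job up to 5 times
fused.submit(udf, inputs, max_retry=5)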

ignore_exceptions

When True, failed runs are silently skipped in results. Default: False
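
Useful when a few bad inputs shouldn't fail the whole run; a small sketch combining it with retries:

# Drop failed runs from the collected results instead of raising
fused.submit(udf, inputs, max_retry=3, ignore_exceptions=True)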

cache_max_age

Same as fused.run. Additionally, when collect=True, the collected results are cached locally for cache_max_age (12h by default).
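
A sketch, assuming cache_max_age accepts the same duration strings as fused.run (e.g. '24h'):

# Cache collected results locally for 24 hours (duration format assumed)
fused.submit(udf, inputs, cache_max_age='24h')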

JobPool methods

When using collect=False, you get a JobPool object:

job = fused.submit(udf, inputs, collect=False)

job.wait() # Show progress bar
job.total_time() # Total wall time
job.times() # Time per job
job.first_error() # First error encountered
job.collect() # Get results as DataFrame
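
A typical non-blocking flow using only the methods above (assuming first_error() returns None when no job failed):

pool = fused.submit(udf, inputs, collect=False)
pool.wait()                    # progress bar until all jobs finish
if pool.first_error() is not None:
    print(pool.first_error())  # inspect the first failure
df = pool.collect()            # results as a DataFrame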

Tips

Test first:

fused.submit(udf, inputs, debug_mode=True)

Start small:

fused.submit(udf, inputs[:5])

Aim for 30-45s per job; this leaves a safety margin before the 120s timeout.

For batch jobs, save to S3:

@fused.udf
def batch_udf(input_path: str):
    result = process(input_path)
    output_path = f"s3://bucket/results/{...}"
    result.to_parquet(output_path)
    return output_path  # Return path, not data
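
A sketch of fanning the batch UDF out over a list of input paths (bucket and file names are placeholders); the collected DataFrame then holds only the S3 output paths:

# Placeholder paths; each job writes its result to S3 and returns the path
input_paths = ['s3://bucket/raw/part_1.parquet', 's3://bucket/raw/part_2.parquet']
paths = fused.submit(batch_udf, input_paths, instance_type='small')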

See also