# Run UDFs in parallel
Run a UDF over multiple inputs in parallel.
## Signature
```python
fused.submit(
    udf,
    arg_list,
    engine='remote',
    instance_type='realtime',
    max_workers=32,
    collect=True,
    debug_mode=False,
    max_retry=2,
    ignore_exceptions=False,
    cache_max_age=None,
)
```
## Parameters
### arg_list

Input formats:
| Format | Example | Use case |
|---|---|---|
| List | `[0, 1, 2, 3]` | Single parameter |
| List of dicts | `[{'a': 1, 'b': 2}, {'a': 3, 'b': 4}]` | Multiple parameters |
| DataFrame | `pd.DataFrame({'a': [1, 3], 'b': [2, 4]})` | Multiple parameters |
Each item/row becomes a separate job.
```python
@fused.udf
def single_job_udf(a: int, b: int):
    import pandas as pd
    return pd.DataFrame({"result": [a + b]})

@fused.udf
def udf():
    # Two jobs: one with a=1, b=2 and one with a=3, b=4
    results = fused.submit(
        single_job_udf,
        [{'a': 1, 'b': 2}, {'a': 3, 'b': 4}],
    )
    return results
```
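The same two jobs can also be passed as a DataFrame, where each row maps to one call (a sketch reusing `single_job_udf` from above):

```python
import pandas as pd

# Equivalent to the list-of-dicts call: row 0 -> a=1, b=2; row 1 -> a=3, b=4
inputs = pd.DataFrame({'a': [1, 3], 'b': [2, 4]})
results = fused.submit(single_job_udf, inputs)
```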
### engine

Same as `fused.run`. Default: `remote`.
### instance_type

Same as `fused.run`. Default: `realtime`.
| Mode | Best for |
|---|---|
| `realtime` | Many quick jobs |
| `small`/`medium`/`large` | Long/heavy jobs |
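For example, to run heavy jobs on larger batch instances (a sketch; `inputs` stands in for your own argument list):

```python
# Long/heavy jobs: run each input on a 'large' batch instance
fused.submit(udf, inputs, instance_type='large')
```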
### max_workers

Number of realtime instances to spin up in parallel. Default: 32. Max: 1000.
```python
# Spin up 100 realtime instances in parallel
fused.submit(udf, inputs, max_workers=100)
```
### collect
| Value | Behavior | Returns |
|---|---|---|
| `True` (default) | Blocking; waits for all jobs | DataFrame |
| `False` | Non-blocking | `JobPool` |
### debug_mode

When `True`, runs only the first input via `fused.run()`. Use it to test before scaling up.
```python
# Test with the first input only
fused.submit(udf, inputs, debug_mode=True)
```
### max_retry

Maximum number of retries per failed job. Default: 2.
### ignore_exceptions

When `True`, failed runs are silently skipped in the results. Default: `False`.
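The two options combine naturally for flaky workloads; a sketch, where `inputs` is a placeholder for your own argument list:

```python
# Retry each failed job up to 4 times, then drop jobs that still fail
results = fused.submit(udf, inputs, max_retry=4, ignore_exceptions=True)
```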
### cache_max_age

Same as `fused.run`. Additionally, when `collect=True`, the collected results are cached locally for `cache_max_age`, or 12h by default.
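For example, to keep collected results for a day (assuming the same duration-string format `fused.run` accepts, e.g. `'24h'`):

```python
# Reuse collected results for 24 hours instead of the 12h default
results = fused.submit(udf, inputs, cache_max_age='24h')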
## JobPool methods

When using `collect=False`, you get a `JobPool` object:

```python
job = fused.submit(udf, inputs, collect=False)

job.wait()         # Wait for all jobs, showing a progress bar
job.total_time()   # Total wall time
job.times()        # Time per job
job.first_error()  # First error encountered
job.collect()      # Get results as a DataFrame
```
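A typical non-blocking pattern, using only the methods above (this sketch assumes `first_error()` returns `None` when no job failed):

```python
pool = fused.submit(udf, inputs, collect=False)

# ... do other work while the jobs run ...

pool.wait()  # Block until all jobs finish
if pool.first_error() is not None:
    print(pool.first_error())  # Inspect the first failure, if any
df = pool.collect()  # Gather results into a DataFrame
```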
## Tips

Test first:

```python
fused.submit(udf, inputs, debug_mode=True)
```

Start small:

```python
fused.submit(udf, inputs[:5])
```

Aim for 30-45s per job; this leaves a safety margin before the 120s timeout.
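To check whether your jobs land in that range, time a small batch first (a sketch using the `JobPool` methods above):

```python
job = fused.submit(udf, inputs[:5], collect=False)
job.wait()
print(job.times())  # Per-job durations; aim for roughly 30-45s each
```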
For batch jobs, save results to S3 and return the path:

```python
@fused.udf
def batch_udf(input_path: str):
    result = process(input_path)  # your own processing logic
    output_path = f"s3://bucket/results/{...}"
    result.to_parquet(output_path)
    return output_path  # Return the path, not the data
```
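Driving the batch UDF then looks like this (the bucket and file names are hypothetical):

```python
# Hypothetical input files; 'small' batch instances avoid the realtime timeout
paths = ['s3://bucket/raw/part-0.parquet', 's3://bucket/raw/part-1.parquet']
output_paths = fused.submit(batch_udf, paths, instance_type='small')
```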
## See also
- Run UDFs in Python — single UDF execution
- How to run in parallel — walkthrough with examples