Skip to main content

Scaling Out UDFs

When realtime isn't enough: run many jobs in parallel, or run on a dedicated machine with more resources.

Choosing your approach

Your situationUse
Many inputs (files, dates, regions)Parallel execution
Single job needs >120s or >4GB RAMDedicated instance
Many heavy jobsCombining both

Parallel execution

fused.submit() runs a UDF over multiple inputs in parallel, spinning up separate instances for each job.

inputs = [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
results = fused.submit(udf, inputs)

For the full API reference, see fused.submit().

Start small, then scale

Don't immediately spin up 1000 jobs. Test progressively:

# First test with 5 inputs
results = fused.submit(udf, inputs[:5])

# Then 10, then 50, then scale up
results = fused.submit(udf, inputs[:50])

Target 30-45s per job

Each parallel job has a 120s timeout. Aim for 30-45s per job to leave safety margin for slower runs.

If your jobs consistently hit the timeout, either:

  • Break them into smaller chunks
  • Use instance_type for dedicated machines

Test first with debug mode

Run only the first input to verify your setup works:

result = fused.submit(udf, inputs, debug_mode=True)

Check timing

Monitor how long each job takes:

job = fused.submit(udf, inputs[:5], collect=False)
job.wait()
print(job.times()) # Time per job

Error handling

By default, errors aren't cached. If a job fails (e.g., API timeout), it will retry fresh on the next run. See Caching for more on how caching works.

However, if you wrap errors in try/except and return a result, that result gets cached:

@fused.udf
def udf(url: str):
try:
return fetch_data(url)
except Exception as e:
return {"error": str(e)} # This gets cached!
tip

Use ignore_exceptions=True to skip failed jobs when collecting results:

results = fused.submit(udf, inputs, ignore_exceptions=True)

Dedicated instances

Batch jobs run on dedicated machines with more resources and no time limits. Use them when your UDF needs more than realtime can offer.

When to use

Use a dedicated instance when your UDF exceeds realtime limits:

  • Takes longer than 120s to run
  • Needs more than ~4GB RAM

Tradeoffs:

  • ~30s startup time (machine needs to spin up)
  • Higher resource availability

Starting a batch job

Add instance_type to run on a dedicated machine:

@fused.udf(instance_type='small')
def udf():
# This UDF runs on a batch instance
...

Running a job from workbench

In Workbench, batch UDFs require manual execution (Shift+Enter) and show a confirmation modal.

note

Batch and realtime jobs have separate caches. Running the same UDF with and without instance_type will cache results independently.

Write to disk, don't return data

Batch jobs should write results to cloud storage or mount, not return them. Data returned from batch jobs can be lost if the connection times out.

@fused.udf(instance_type='small')
def batch_job(input_path: str):
import pandas as pd

# Process data
df = pd.read_parquet(input_path)
result = heavy_processing(df)

# Write to S3, not return
output_path = f"s3://my-bucket/results/{...}"
result.to_parquet(output_path)

return output_path # Return the path, not the data

Monitor your jobs

Batch jobs take time to start and run. Use the Jobs page in Workbench to monitor:

  • Status: Running, Completed, Failed
  • CPU & Memory: Resource usage over time
  • Disk: Storage consumption
  • Runtime: How long the job has been running
  • Logs: Real-time output from your UDF
# Programmatic monitoring
job.status # Check status
job.tail_logs() # Stream logs
job.cancel() # Stop a job

Expect startup delay

Batch machines take ~30s to spin up. Plan accordingly—batch jobs aren't for quick iterations.

Iterate on realtime first

Test your UDF on a small data sample using realtime execution. Once you're confident it works, switch to instance_type to scale up.

Instance types

AliasvCPUsRAM
small22 GB
medium1664 GB
large64512 GB

See the full list supported of AWS / GCP instance types


Combining both

If your jobs need more than 120s or ~4GB RAM each, combine parallel execution with dedicated instances:

results = fused.submit(
udf,
inputs,
instance_type="large",
collect=False
)

Example use cases

See also