
Running Jobs in Parallel: fused.submit()

Sometimes you want to run a UDF over a list of inputs (for example running a UDF that unzips a file over a list of files). If each run itself is quite small, then you can run a batch of UDFs over a list of inputs.

Let's use a simple UDF to demonstrate:

@fused.udf
def udf(val):
    import pandas as pd
    return pd.DataFrame({'val': [val]})

Say we want to run this UDF 10 times, once per input:

inputs = [0,1,2,3,4,5,6,7,8,9]

fused.submit()

Fused is built to help you scale your processing to huge datasets and the core of this ability is fused.submit(). You can run 1 UDF over a large number of arguments:

results = fused.submit(udf, inputs)
>>> 100%|██████████| 10/10 

Note that you will only see the progress bar when running the code in a notebook; Workbench does not currently stream progress to the console.

fused.submit() runs all these jobs in parallel and defaults to directly returning the results back to you as a dataframe:

print(results)
>>>
     val
0 0    0
1 0    1
...
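Conceptually, fused.submit() fans each input out to its own UDF run and collects the returned dataframes into one. A minimal local sketch of that fan-out/collect pattern, using Python's concurrent.futures in place of the Fused runtime (this is an illustration, not how Fused executes jobs):

```python
from concurrent.futures import ThreadPoolExecutor

import pandas as pd

def udf(val):
    # Same toy UDF as above: wrap the input in a one-row dataframe
    return pd.DataFrame({'val': [val]})

inputs = [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]

# Fan the inputs out to a worker pool, then collect everything into a
# single dataframe keyed by job index (similar to the output above)
with ThreadPoolExecutor() as pool:
    frames = list(pool.map(udf, inputs))
results = pd.concat(frames, keys=range(len(inputs)))

print(results['val'].tolist())  # [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
```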

Tips for using fused.submit()

  • Check that your parameters are correctly set up with debug_mode=True:
single_run = fused.submit(udf, inputs, debug_mode=True)
  • Start with a small number of jobs, then scale up:
# Only running the first 5 inputs to make sure the UDF is working as expected
results = fused.submit(udf, inputs[:5])
  • Check the runtime of each job:
# Run only 5 jobs and check how long each one took
results = fused.submit(udf, inputs[:5], collect=False)
print(results.times())
Job length rule of thumb: 30-45s

Aim for a single UDF run that takes 30-45s. This gives you a safety margin, as UDFs time out after 120s, so a run can take somewhat longer and still not time out.
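Before scaling up, you can time a single run to check it lands in that window. A minimal sketch using time.perf_counter, with a stand-in function where you would call fused.run(udf, inputs[0]):

```python
import time

def run_single_job():
    # Stand-in for fused.run(udf, inputs[0]); replace with your actual call
    time.sleep(0.01)

start = time.perf_counter()
run_single_job()
elapsed = time.perf_counter() - start

# Aim for roughly 30-45s per job; 120s is the hard timeout
print(f"Single job took {elapsed:.1f}s")
```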

Advanced Options

Blocking vs non-blocking calls

By default we've set up fused.submit() to be blocking, meaning it will wait for all the jobs to finish before returning the results.

However, you can set collect=False and then track the progress of jobs as they run:

results = fused.submit(udf, inputs, collect=False)

Real time logs

  • Show a progress bar of number of jobs completed:
print(results.wait())
100%|██████████| 10/10 [00:01<00:00, 9.31it/s]
  • Show total time it took to run all the jobs:
print(results.total_time())
>>> datetime.timedelta(seconds=1, microseconds=96764)
  • Check the first error that occurred:
print(results.first_error())
  • Get your data back as a dataframe:
print(results.collect())

Debug mode

Sometimes you might just want to make sure your code is running correctly before kicking off a large number of jobs. That's what Debug Mode allows you to do:

results = fused.submit(udf, inputs, debug_mode=True)

This runs the UDF directly on the first item in inputs using fused.run() (equivalent to fused.run(udf, inputs[0])) and returns the result.

Execution parameters

fused.submit() also has parameters that give you more control over execution. See the Python SDK docs page for more details:

  • max_workers: The number of workers to use for the job pool.
  • engine: local or remote (default is remote). Just like fused.run(), fused.submit() runs the UDF on the Fused server by default (engine='remote'). Set engine='local' to run the UDF locally, either on your machine or inside a large machine that you spin up.
  • max_retry: The maximum number of retries for a job.
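The max_workers parameter behaves like the pool size of a standard worker pool: it caps how many jobs run at the same time. A stdlib sketch of that semantics (illustrating the concept, not the Fused implementation):

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor

max_workers = 3
active = 0
peak = 0
lock = threading.Lock()

def job(i):
    # Track how many jobs run concurrently
    global active, peak
    with lock:
        active += 1
        peak = max(peak, active)
    time.sleep(0.05)  # simulate work
    with lock:
        active -= 1
    return i

# The pool never runs more than max_workers jobs at once
with ThreadPoolExecutor(max_workers=max_workers) as pool:
    results = list(pool.map(job, range(10)))

print(peak)  # at most 3
```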

Wall time vs CPU time

There are two different runtime measures for fused.submit() jobs:

Wall time: Total actual time taken to run the jobs:

print(results.total_time())

CPU time: Sum of all the CPU time taken by the jobs. This is the time that counts towards your bill.

round(sum([t.seconds for t in results.times()]) + sum([t.microseconds for t in results.times()]) / 1e6, 2)
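The seconds/microseconds arithmetic above can also be written with timedelta.total_seconds(), which performs the same conversion. A small self-contained check with hypothetical per-job runtimes, assuming times() returns a list of datetime.timedelta objects:

```python
from datetime import timedelta

# Hypothetical per-job runtimes, standing in for results.times()
times = [timedelta(seconds=32, microseconds=500000),
         timedelta(seconds=41, microseconds=250000)]

manual = round(sum(t.seconds for t in times)
               + sum(t.microseconds for t in times) / 1e6, 2)
via_total_seconds = round(sum(t.total_seconds() for t in times), 2)

print(manual, via_total_seconds)  # both 73.75
```

total_seconds() also handles runtimes longer than a day correctly, where timedelta.seconds alone would wrap around.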

Example use cases

fused.submit() is used in many places across our docs: