Running Jobs in Parallel: fused.submit()
Sometimes you want to run a UDF over a list of inputs (for example, a UDF that unzips a file, run once per file). If each individual run is small, you can batch many runs of the same UDF over the whole list.
Let's use a simple UDF to demonstrate:
@fused.udf
def udf(val):
    import pandas as pd
    return pd.DataFrame({'val': [val]})
Say we want to run this UDF over 10 inputs:
inputs = [0,1,2,3,4,5,6,7,8,9]
fused.submit()
Fused is built to help you scale your processing to huge datasets, and fused.submit() is at the core of that ability. You can run one UDF over a large number of arguments:
results = fused.submit(udf, inputs)
>>> 100%|██████████| 10/10
Note that you will only see the progress bar when running the code in a notebook; Workbench does not currently stream progress to the console.
fused.submit() runs all these jobs in parallel and by default returns the results directly to you as a single dataframe:
print(results)
>>>
     val
0 0    0
1 0    1
...
Tips for using fused.submit()
- Check that your parameters are correctly set up with debug_mode=True:
single_run = fused.submit(udf, inputs, debug_mode=True)
- Start with a small number of jobs, then scale up:
# Only running the first 5 inputs to make sure the UDF is working as expected
results = fused.submit(udf, inputs[:5])
- Check the runtime of each job:
# Run only 5 jobs and see how long each one took
results = fused.submit(udf, inputs[:5], collect=False)
print(results.times())
Aim for a single UDF run that takes 30-45s. This gives you a safety margin: UDFs time out after 120s, so individual runs can take somewhat longer than expected without failing.
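Putting these tips together, a cautious scale-up might look like this sketch (reusing the demo udf and inputs from above):

# 1. Sanity-check parameters with a single debug run
single_run = fused.submit(udf, inputs, debug_mode=True)

# 2. Trial a small batch and inspect per-job runtimes
trial = fused.submit(udf, inputs[:5], collect=False)
print(trial.times())

# 3. If runtimes look healthy, scale up to the full input list
results = fused.submit(udf, inputs)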
Advanced Options
Blocking vs non-blocking calls
By default we've set up fused.submit() to be blocking, meaning it will wait for all the jobs to finish before returning the results.
However, you can set collect=False and then track the progress of jobs as they run:
results = fused.submit(udf, inputs, collect=False)
Real-time logs
- Show a progress bar of number of jobs completed:
print(results.wait())
100%|██████████| 10/10 [00:01<00:00, 9.31it/s]
- Show total time it took to run all the jobs:
print(results.total_time())
>>> datetime.timedelta(seconds=1, microseconds=96764)
- Check the first error that occurred:
print(results.first_error())
- Get your data back as a dataframe:
print(results.collect())
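Putting these together, a typical non-blocking workflow might look like this sketch (using only the methods shown above):

runs = fused.submit(udf, inputs, collect=False)
# ... do other work while the jobs execute ...
runs.wait()               # block until every job finishes, with a progress bar
print(runs.total_time())  # wall time for the whole batch
df = runs.collect()       # gather all outputs into one dataframe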
Debug mode
Sometimes you might just want to make sure your code is running correctly before kicking off a large number of jobs. That's what Debug Mode allows you to do:
results = fused.submit(udf, inputs, debug_mode=True)
This will run the first item in inputs directly using fused.run() (equivalent to fused.run(udf, inputs[0])) and then return the result.
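In other words, these two calls perform the same single run:

result_a = fused.submit(udf, inputs, debug_mode=True)
result_b = fused.run(udf, inputs[0])  # the equivalent direct call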
Execution parameters
fused.submit() also has parameters giving you more control over the execution. See the Python SDK docs page for more details:
- max_workers: The number of workers to use for the job pool.
- engine: 'local' or 'remote' (default is 'remote'). Just like fused.run(), by default fused.submit() runs the UDF on the Fused server (engine='remote'). You can set engine='local' to run udf locally, either on your machine or inside a large machine that you spin up.
- max_retry: The maximum number of retries for a job.
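For example, this sketch runs the demo UDF locally with a capped worker pool and one retry per job (the values are illustrative, not recommendations):

results = fused.submit(udf, inputs, engine='local', max_workers=4, max_retry=1)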
Wall time vs CPU time
There are two different runtime measurements for fused.submit() jobs:
Wall time: Total actual time taken to run the jobs:
print(results.total_time())
CPU time: Sum of all the CPU time taken by the jobs. This is the time that counts towards your bill.
# timedelta.total_seconds() handles the seconds + microseconds conversion
round(sum(t.total_seconds() for t in results.times()), 2)
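Comparing the two gives a rough sense of the parallel speedup, as in this sketch:

wall = results.total_time().total_seconds()
cpu = sum(t.total_seconds() for t in results.times())
# If N jobs run fully in parallel, CPU time is roughly N times wall time
print(f"wall: {wall:.2f}s, cpu: {cpu:.2f}s, speedup ~{cpu / wall:.1f}x")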
Example use cases
fused.submit() is used in many places across our docs:
- 📈 Processing 20TB of data in minutes: Making a Climate Dashboard from 20 years of data
- ⛴️ Retrieving 30 days of AIS boat transponder data around the United States to detect illegal fishing
- 🛰️ Retrieving all of Maxar's Open Data STAC Catalogs across every event they have imagery for.