Large UDF run

Some jobs need more than a few GB of RAM or take more than a few seconds to run. This section shows how to run such larger jobs.

Defining "Large" jobs

Large jobs are jobs that run UDFs which either:

  • Take longer than 120s to run
  • Require large resources (more than a few GB of RAM)

To run these we use instances with higher latency than those used for small jobs, but with the ability to specify RAM, CPU count & storage depending on your needs.

This is useful, for example, when running large ingestion jobs, which can require a lot of RAM & storage.
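
The definition above can be sketched as a small check to run before deciding how to submit a UDF. This is only an illustration: needs_large_job is a hypothetical helper, not part of Fused, and because "a few GB" is not a precise limit, the RAM threshold is left as a parameter you choose yourself.

```python
RUNTIME_LIMIT_S = 120  # runtime limit taken from the definition above


def needs_large_job(expected_runtime_s: float,
                    expected_ram_gb: float,
                    ram_limit_gb: float) -> bool:
    """Return True when a run exceeds either the runtime or the RAM limit.

    ram_limit_gb is an assumption supplied by the caller; the text only
    says "more than a few GB".
    """
    return expected_runtime_s > RUNTIME_LIMIT_S or expected_ram_gb > ram_limit_gb


# A 5-minute ingestion that needs little RAM still counts as a large job
print(needs_large_job(expected_runtime_s=300, expected_ram_gb=1, ram_limit_gb=4))
```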

A Simple UDF to demonstrate

We'll use the same UDF as in the running multiple small jobs section:

import fused

@fused.udf
def udf(val):
    import pandas as pd
    return pd.DataFrame({'val': [val]})

As mentioned in the Small UDF job section, to call it once we can use fused.run(udf, val=1).

Running a large job: job.run_remote()

Running a UDF on a large instance is done in 2 steps:

  1. Create a job with the UDF to run, passing the input parameters to run the UDF over
  2. Send the job to a remote instance and (optionally) define the instance arguments

# We'll run this UDF 5 times with 5 different inputs
job = udf(arg_list=[0,1,2,3,4])
# Send the job to a remote instance
job.run_remote()

Passing UDF parameters with arg_list

Single Parameter

As mentioned above, to pass UDF arguments to a remote job, use arg_list to specify a list of inputs to run your job over:

job = udf(arg_list=[0,1,2,3,4])
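
Before sending a job to a remote instance, it can help to dry-run the same inputs locally. The sketch below is plain Python and does not involve Fused at all: run_locally and double are hypothetical names, and the assumption is simply that each entry of arg_list becomes the single input of one run.

```python
def run_locally(func, arg_list):
    """Call func once per entry in arg_list and collect the results in order."""
    return [func(arg) for arg in arg_list]


def double(val):
    # Hypothetical stand-in for the body of a UDF
    return {"val": val * 2}


# Five inputs give five independent runs
results = run_locally(double, [0, 1, 2, 3, 4])
print(results)
```

If a single input fails here, it will most likely fail remotely too, so this is a cheap way to catch mistakes before waiting for an instance to start.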

Multiple Parameters

Currently arg_list only supports passing one input variable to each UDF. We can work around this by aggregating multiple variables into a dict and having the UDF take a dict as input:

@fused.udf
def udf(variables: dict = {'val1':1, 'val2':2}):
    import pandas as pd
    import fused

    # Retrieving each variable from the dictionary
    val1 = variables['val1']
    val2 = variables['val2']

    # Some simple boilerplate logic
    output_value = int(val1)*int(val2)
    df = pd.DataFrame(data={'output':[output_value]})

    # Saving output to shared file location to access results later
    # `/mnt/cache` is the shared file location for all small & large jobs

    return df

The input variable must be type-annotated for this to work. If we had defined variables without annotating it as a dict:

def udf(variables = {}):
    # Notice the lack of `variables: dict = {}`
    return df

our remote job run would have failed, as the Fused server has no way of knowing what to expect from variables.
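
To see why the annotation matters, the sketch below uses Python's standard typing.get_type_hints to compare what can be discovered about an annotated and an un-annotated parameter. It only illustrates the information each signature carries; it is not a description of how Fused inspects UDFs internally, and typed_udf / untyped_udf are hypothetical names.

```python
import typing


def typed_udf(variables: dict = {"val1": 1, "val2": 2}):
    return variables


def untyped_udf(variables={}):
    return variables


# The annotated version exposes the expected type of `variables`
typed_hints = typing.get_type_hints(typed_udf)
# The un-annotated version exposes nothing about `variables`
untyped_hints = typing.get_type_hints(untyped_udf)

print(typed_hints)
print(untyped_hints)
```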

We can then call this UDF as a remote job by passing a list of dictionaries to arg_list:

job = udf(arg_list=[{"val1": 5, "val2": 2}, {"val1": 3, "val2": 4}])

We can confirm this worked by browsing our results in File Explorer:

(Screenshot: output of the multiple-parameter arg_list remote run in File Explorer)
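
When there are many parameter combinations, writing the list of dictionaries by hand gets tedious. A minimal sketch, assuming you want every combination of the given values: build_arg_list is a hypothetical helper (not a Fused function) that produces a list of dicts in the shape passed to arg_list above.

```python
from itertools import product


def build_arg_list(**param_values):
    """Return one dict per combination of the given parameter values."""
    names = list(param_values)
    return [
        dict(zip(names, combo))
        for combo in product(*param_values.values())
    ]


# Every combination of val1 and val2: 2 x 2 = 4 runs
arg_list = build_arg_list(val1=[5, 3], val2=[2, 4])
print(arg_list)
```

The resulting list could then be passed as udf(arg_list=arg_list). If you only need specific pairs rather than every combination, writing the dicts out explicitly stays simpler.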

run_remote instance arguments

With job.run_remote() you also have access to a few other arguments to make your remote job fit your needs:

  • instance_type: Decide which type of machine to run your job on (see below for which ones we support)
  • disk_size_gb: The amount of disk space in GB allocated to your instance (between 16 and 999 GB)
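
Since disk_size_gb has documented bounds, it can be worth checking the value before submitting a job. The sketch below is a hypothetical helper, not part of Fused: it validates the range stated above and returns keyword arguments that could be unpacked into job.run_remote(). It does not validate instance_type.

```python
MIN_DISK_GB = 16   # lower bound stated above
MAX_DISK_GB = 999  # upper bound stated above


def remote_kwargs(instance_type: str, disk_size_gb: int) -> dict:
    """Validate disk_size_gb and return keyword arguments for run_remote."""
    if not MIN_DISK_GB <= disk_size_gb <= MAX_DISK_GB:
        raise ValueError(
            f"disk_size_gb must be between {MIN_DISK_GB} and {MAX_DISK_GB}, "
            f"got {disk_size_gb}"
        )
    return {"instance_type": instance_type, "disk_size_gb": disk_size_gb}


kwargs = remote_kwargs("m5.4xlarge", 100)
print(kwargs)
# With a real job object this would be used as: job.run_remote(**kwargs)
```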

For example, if you want a job with 16 vCPUs, 64 GB of RAM and 100 GB of storage you can call:

job.run_remote(instance_type="m5.4xlarge", disk_size_gb=100)

Currently supported instance_type

The instance_type values accepted by Fused run_remote() are based on AWS General Purpose instance types. We support the following:

supported_instance_types = [

Accessing job logs

While your job is running you can monitor & manage it with the following:

# View the job status

# Follow the job logs

# Get the job logs

# Cancel the job

These logs can also be accessed:

In a notebook

Running job.run_remote() in a notebook gives you a clickable link:

In Fused Workbench

Under the "Jobs" tab, on the bottom left of Workbench:

By email (you'll receive 1 email for each job)

Each job leads to an email summary with logs upon completion:


Getting results

Getting data back from your run_remote() jobs is a bit more complicated than for "real-time" runs. Our recommendation is to have your UDF write data directly to shared storage (/mount/) or cloud storage and access it afterwards.
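
The write-then-read pattern recommended above can be sketched with the standard library alone. Everything here is an assumption made for illustration: a temporary directory stands in for the shared storage location, and save_result, load_results and the result_<run_id>.csv naming are hypothetical choices, not Fused conventions.

```python
import csv
import tempfile
from pathlib import Path


def save_result(shared_dir: Path, run_id: str, rows: list) -> Path:
    """Write one run's rows to its own CSV file in the shared directory."""
    out_path = shared_dir / f"result_{run_id}.csv"
    with out_path.open("w", newline="") as f:
        writer = csv.DictWriter(f, fieldnames=list(rows[0]))
        writer.writeheader()
        writer.writerows(rows)
    return out_path


def load_results(shared_dir: Path) -> list:
    """Read back every result file, in file-name order."""
    rows = []
    for path in sorted(shared_dir.glob("result_*.csv")):
        with path.open(newline="") as f:
            rows.extend(csv.DictReader(f))
    return rows


# A temporary directory stands in for the shared storage location
with tempfile.TemporaryDirectory() as tmp:
    shared_dir = Path(tmp)
    save_result(shared_dir, "0", [{"output": 10}])
    save_result(shared_dir, "1", [{"output": 12}])
    collected = load_results(shared_dir)

# Note: CSV values come back as strings
print(collected)
```

Giving each run its own output file avoids several runs writing to the same path at once.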

Example job: saving to shared storage /mount/

A common use case for offline jobs is as a "pre-ingestion" process. You can find a real-life example of this in our dark vessel detection example

Here all we're returning is status information in a pandas DataFrame, but our data is unzipped, read and saved to S3:

import fused

@fused.udf
def read_ais_from_noaa_udf(datestr='2023_03_29'):
    import os
    import requests
    import io
    import zipfile
    import pandas as pd

    # This is our local mount file path,
    daily_ais_parquet = f'{path}/{datestr[-2:]}.parquet'

    # Download ZIP file to mounted disk
    if r.status_code == 200:
        with zipfile.ZipFile(io.BytesIO(r.content), 'r') as z:
            with z.open(f'AIS_{datestr}.csv') as f:
                df = pd.read_csv(f)

        return pd.DataFrame({'status':['Done']})
    return pd.DataFrame({'status':[f'read_error_{r.status_code}']})

Data written to /mount/ can be accessed by any other instance used by anyone on your team, so it can be used by any other UDF you run afterwards.
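
The core of the example above is reading a CSV out of a ZIP archive held in memory, without extracting it to disk first. The sketch below isolates that step using only the standard library, so it runs without network access. The archive is built in memory, and the file name AIS_sample.csv and its columns are made-up sample data, not the real NOAA layout.

```python
import csv
import io
import zipfile


def read_csv_from_zip_bytes(zip_bytes: bytes, member_name: str) -> list:
    """Open a ZIP archive from raw bytes and parse one CSV member."""
    with zipfile.ZipFile(io.BytesIO(zip_bytes), "r") as z:
        with z.open(member_name) as f:
            # z.open() yields bytes; wrap it so csv can read text
            text = io.TextIOWrapper(f, encoding="utf-8", newline="")
            return list(csv.DictReader(text))


# Build a small archive in memory to stand in for a downloaded response body
buffer = io.BytesIO()
with zipfile.ZipFile(buffer, "w") as z:
    z.writestr("AIS_sample.csv", "MMSI,LAT,LON\n123,1.5,2.5\n")

rows = read_csv_from_zip_bytes(buffer.getvalue(), "AIS_sample.csv")
print(rows)
```

In the UDF above, pandas.read_csv plays the role of csv.DictReader and the bytes come from the HTTP response rather than a locally built archive.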


You can use File Explorer to easily see your outputs! For the example above, typing efs://AIS_{datestr}.csv (replacing datestr with your date) will show the results directly in File Explorer!

Large jobs trade-offs

  • Take a few seconds to start up the machine
  • Can run as long as needed