
Large UDF run

Some jobs require more than a few GB of RAM or take more than a few seconds to run. This section shows how to run these larger jobs.

Defining "Large" jobs

Large jobs are jobs running UDFs that:

  • Take longer than 120s to run
  • Require large resources (more than a few GB of RAM)

To run these we use higher-latency instances than for small jobs, but with the ability to specify RAM, CPU count & storage depending on your needs.

This is useful, for example, when running large ingestion jobs, which can require a lot of RAM & storage.

A Simple UDF to demonstrate

We'll use the same UDF as in the running multiple small jobs section:

@fused.udf
def udf(val):
    import pandas as pd
    return pd.DataFrame({'val': [val]})

As mentioned in the Small UDF job section, to call it once we can use fused.run():

fused.run(udf, val=1)
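
This runs the UDF synchronously and returns its output, so you can use the resulting DataFrame directly. A minimal sketch, with output roughly like the comments show (assuming the UDF above):

import fused

# Run the UDF once and inspect the returned DataFrame
df = fused.run(udf, val=1)
print(df)
#    val
# 0    1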

Running a large job: job.run_remote()

Running a UDF on a large instance is done in 2 steps:

  1. Creating a job with the UDF to run and passing the input parameters to run the UDF over
  2. Sending the job to a remote instance and (optionally) defining the instance arguments

# We'll run this UDF 5 times with 5 different inputs
job = udf(arg_list=[0,1,2,3,4])
job.run_remote()

Passing UDF parameters with arg_list

Single Parameter

As mentioned above, to pass UDF arguments to a remote job, use arg_list to specify a list of inputs to run your job over:

job = udf(arg_list=[0,1,2,3,4])
job.run_remote()

Multiple Parameters

Currently arg_list only supports passing a single input variable to each UDF. We can work around this by aggregating multiple variables into a dict and having the UDF take a dict as input:

@fused.udf
def udf(variables: dict = {'val1': 1, 'val2': 2}):
    import os
    import pandas as pd
    import fused

    # Retrieving each variable from the dictionary
    val1 = variables['val1']
    val2 = variables['val2']

    # Some simple boilerplate logic
    output_value = int(val1) * int(val2)
    df = pd.DataFrame(data={'output': [output_value]})

    # Saving output to the shared file location to access results later
    # `/mnt/cache` is the shared file location for all small & large jobs
    os.makedirs("/mnt/cache/demo_multiple_arg_list_run", exist_ok=True)
    df.to_csv(f"/mnt/cache/demo_multiple_arg_list_run/output_{str(output_value)}.csv")

    return df
note

You do need to type-annotate the input variable for this to work. If we had defined variables without typing it as a dict:

@fused.udf
def udf(variables = {}):
    # Notice the lack of `variables: dict = {}`
    ...
    return df

our remote job run would have failed, as the Fused server has no way of knowing what to expect from variables.

We can then call this UDF as a remote job by passing a list of dictionaries to arg_list:

job = udf(arg_list=[{"val1": 5, "val2": 2}, {"val1": 3, "val2": 4}])
job.run_remote()

We can confirm this worked by browsing to our results in File Explorer:

Multiple arg list run remote output on File Explorer
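
Alternatively, here's a minimal sketch for checking the outputs from another UDF, assuming the /mnt/cache/demo_multiple_arg_list_run/ directory used above (check_outputs_udf is just an illustrative name):

@fused.udf
def check_outputs_udf():
    import glob
    import pandas as pd

    # `/mnt/cache` is shared, so another UDF can read the CSVs written by the remote job
    files = sorted(glob.glob("/mnt/cache/demo_multiple_arg_list_run/output_*.csv"))

    # Combine all per-run outputs into a single DataFrame
    return pd.concat([pd.read_csv(f, index_col=0) for f in files], ignore_index=True)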

run_remote instance arguments

With job.run_remote() you also have access to a few other arguments to make your remote job fit your needs:

  • instance_type: Decide which type of machine to run your job on (see below for which ones we support)
  • disk_size_gb: The amount of disk space in GB your instance requires (up to 999 GB)

For example, if you want a job with 16 vCPUs, 64 GB of RAM and 100 GB of storage you can call:

job.run_remote(instance_type="m5.4xlarge", disk_size_gb=100)

Currently supported instance_type

Fused run_remote() instance_type values are based on AWS instance types. We support the following general purpose (m5) and memory optimized (r5) instances:

supported_instance_types = [
    "m5.large",
    "m5.xlarge",
    "m5.2xlarge",
    "m5.4xlarge",
    "m5.8xlarge",
    "m5.12xlarge",
    "m5.16xlarge",
    "r5.large",
    "r5.xlarge",
    "r5.2xlarge",
    "r5.4xlarge",
    "r5.8xlarge",
    "r5.12xlarge",
    "r5.16xlarge",
]
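
For example, a memory-heavy job could pair one of the r5 instances with extra disk (the sizing below is illustrative):

# Illustrative sizing: a memory optimized instance with 500 GB of disk
job = udf(arg_list=[0, 1, 2, 3, 4])
job.run_remote(instance_type="r5.4xlarge", disk_size_gb=500)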

Accessing job logs

While your job is running you can monitor & manage it with the following:

# View the job status
job.status

# Follow the job logs
job.tail_logs()

# Get the job logs
job.print_logs()

# Cancel the job
job.cancel()
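
If you prefer to wait for a job from a script rather than follow the logs interactively, a rough sketch is to poll job.status (the exact status values are Fused-specific, so adapt the loop to what you observe):

import time

# Illustrative: check the status a few times while the job runs,
# then print the accumulated logs
for _ in range(5):
    print(job.status)
    time.sleep(30)

job.print_logs()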

You also have multiple options for viewing your job logs:

In a notebook

Running job.run_remote() in a notebook gives you a clickable link:

Dark Vessel Detection workflow

In Fused Workbench

Under the "Jobs" tab, on the bottom left of Workbench:

By email (you'll receive 1 email for each job)

Each job leads to an email summary with logs upon completion:

Dark Vessel Detection workflow

Getting results

Getting data back from your run_remote() jobs is a bit more complicated than for "real-time" runs. Our recommendation is to have your UDF write data directly to disk or cloud storage and access it afterwards.

Example job: saving to disk

A common use case for offline jobs is as a "pre-ingestion" process. You can find a real-life example of this in our dark vessel detection example.

Here all we're returning is status information in a pandas DataFrame, but our data is unzipped, read and saved to the shared mounted disk:

import fused

@fused.udf()
def read_ais_from_noaa_udf(datestr='2023_03_29'):
    import os
    import requests
    import io
    import zipfile
    import pandas as pd

    url = f'https://coast.noaa.gov/htdata/CMSP/AISDataHandler/{datestr[:4]}/AIS_{datestr}.zip'
    # This is our local mount file path
    path = f'/mnt/cache/AIS_demo/{datestr[:7]}/'
    os.makedirs(path, exist_ok=True)
    daily_ais_parquet = f'{path}/{datestr[-2:]}.parquet'

    # Download ZIP file to mounted disk
    r = requests.get(url)
    if r.status_code == 200:
        with zipfile.ZipFile(io.BytesIO(r.content), 'r') as z:
            with z.open(f'AIS_{datestr}.csv') as f:
                df = pd.read_csv(f)

        df.to_parquet(daily_ais_parquet)
        return pd.DataFrame({'status': ['Done']})
    else:
        return pd.DataFrame({'status': [f'read_error_{r.status_code}']})

Since our data is written to the shared mounted disk, it can now be accessed elsewhere, for example from another UDF or any other process with access to the same mount.
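
As a rough sketch, a follow-up UDF could read one of the daily Parquet files back from the same mount path (read_daily_ais_udf is just an illustrative name):

@fused.udf
def read_daily_ais_udf(datestr='2023_03_29'):
    import pandas as pd

    # Same layout the ingestion UDF above wrote to on the shared mount
    daily_ais_parquet = f'/mnt/cache/AIS_demo/{datestr[:7]}/{datestr[-2:]}.parquet'
    return pd.read_parquet(daily_ais_parquet)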

tip

You can use File Explorer to easily see your outputs! For the above example, browsing to efs://AIS_demo/ (the efs:// prefix corresponds to the /mnt/cache mount) will show the results directly in File Explorer!

Large jobs trade-offs

  • Takes a few seconds to start up the machine
  • Can run as long as needed