Large UDF run
Some jobs require more than a few GB of RAM or take more than a few seconds to run. This section shows how to run these larger jobs.
Defining "Large" jobs
Large jobs are jobs running UDFs that:
- Take longer than 120s to run
- Require large resources (more than a few GB of RAM)
To run these we use higher-latency instances than for small jobs, but with the ability to specify RAM, CPU count & storage depending on your needs.
This is useful for example when running large ingestion jobs, which can require a lot of RAM & storage.
A Simple UDF to demonstrate
We'll use the same UDF as in the running multiple small jobs section:
@fused.udf
def udf(val):
    import pandas as pd
    return pd.DataFrame({'val': [val]})
As mentioned in the Small UDF job section, to call it 1 time we can use `fused.run()`:
fused.run(udf, val=1)
Running a large job: `job.run_remote()`
Running a UDF over a large instance is done in 2 steps:
- Creating a job with the UDF to run and passing the input parameters to run the UDF over
- Sending the job to a remote instance and (optionally) defining the instance arguments
# We'll run this UDF 5 times with 5 different inputs
job = udf(arg_list=[0,1,2,3,4])
job.run_remote()
Passing UDF parameters with arg_list
Single Parameter
As mentioned above, to pass UDF arguments to a remote job, use `arg_list` to specify a list of inputs to run your job over:
job = udf(arg_list=[0,1,2,3,4])
job.run_remote()
Multiple Parameters
Currently `arg_list` only supports passing 1 input variable to each UDF. We can work around this by aggregating multiple variables into a `dict` and having the UDF take a `dict` as input:
@fused.udf
def udf(variables: dict = {'val1': 1, 'val2': 2}):
    import os
    import pandas as pd
    import fused

    # Retrieve each variable from the dictionary
    val1 = variables['val1']
    val2 = variables['val2']

    # Some simple boilerplate logic
    output_value = int(val1) * int(val2)
    df = pd.DataFrame(data={'output': [output_value]})

    # Save output to the shared file location to access results later
    # `/mnt/cache` is the shared file location for all small & large jobs
    os.makedirs("/mnt/cache/demo_multiple_arg_list_run", exist_ok=True)
    df.to_csv(f"/mnt/cache/demo_multiple_arg_list_run/output_{str(output_value)}.csv")
    return df
You do need to type the input variable for this to work. If we had defined `variables` without typing it as a `dict`:
@fused.udf
def udf(variables = {}):
    # Notice the lack of `variables: dict = {}`
    ...
    return df
our remote job run would have failed, as the Fused server has no way of knowing what to expect from `variables`.
We can then call this UDF as a remote job by passing a list of dictionaries to `arg_list`:
job = udf(arg_list=[{"val1": 5, "val2": 2}, {"val1": 3, "val2": 4}])
job.run_remote()
We can confirm this worked by browsing to our results in File Explorer:
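We can also read the outputs back programmatically from another UDF. Below is a minimal sketch (an illustration, not part of the example above) that assumes the remote job has finished and the files follow the `output_*.csv` naming used by the UDF:
import fused

@fused.udf
def read_outputs_udf():
    import glob
    import pandas as pd

    # `/mnt/cache` is shared between small & large jobs, so a regular
    # real-time run can read what the remote job wrote
    files = sorted(glob.glob("/mnt/cache/demo_multiple_arg_list_run/output_*.csv"))
    return pd.concat([pd.read_csv(f) for f in files], ignore_index=True)

# Run it as a regular small job once the remote job is done
fused.run(read_outputs_udf)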
`run_remote` instance arguments
With `job.run_remote()` you also have access to a few other arguments to make your remote job fit your needs:
- `instance_type`: Decide which type of machine to run your job on (see below for which ones we support)
- `disk_size_gb`: The amount of disk space in GB your instance requires (up to 999 GB)
For example, if you want a job with 16 vCPUs, 64 GB of RAM and 100 GB of storage, you can call:
job.run_remote(instance_type="m5.4xlarge", disk_size_gb=100)
Currently supported `instance_type`
Fused `run_remote()` instance types are based on AWS general purpose (m5) and memory optimized (r5) instance types. We support the following:
supported_instance_types = [
"m5.large",
"m5.xlarge",
"m5.2xlarge",
"m5.4xlarge",
"m5.8xlarge",
"m5.12xlarge",
"m5.16xlarge",
"r5.large",
"r5.xlarge",
"r5.2xlarge",
"r5.4xlarge",
"r5.8xlarge",
"r5.12xlarge",
"r5.16xlarge",
]
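If you launch jobs from scripts, a small guard can catch an unsupported instance type before the job is submitted. This helper is not part of the Fused API, just a sketch built on the list above:
# Hypothetical convenience wrapper (not part of the Fused API): fail fast with
# a clear error if the requested instance type isn't in the supported list.
def run_remote_checked(job, instance_type="m5.large", **kwargs):
    if instance_type not in supported_instance_types:
        raise ValueError(
            f"{instance_type!r} is not supported, choose one of: {supported_instance_types}"
        )
    return job.run_remote(instance_type=instance_type, **kwargs)

# For example, for a memory-heavy job:
# run_remote_checked(job, instance_type="r5.8xlarge", disk_size_gb=250)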
Accessing job logs
While your job is running you can monitor & manage it with the following:
# View the job status
job.status
# Follow the job logs
job.tail_logs()
# Get the job logs
job.print_logs()
# Cancel the job
job.cancel()
You also have multiple options to view your job logs:
In a notebook
Running `job.run_remote()` in a notebook gives you a clickable link:
In Fused Workbench
Under the "Jobs" tab, on the bottom left of Workbench:
By email (you'll receive 1 email for each job)
Each job leads to an email summary with logs upon completion:
Getting results
Getting data back from your `run_remote()` jobs is a bit more complicated than for "real-time" jobs. Our recommendation is to have your UDF write data directly to disk or cloud storage and access it afterwards.
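As a minimal sketch of this pattern: the UDF below does its work, persists the result to object storage, and only returns a small status DataFrame. The bucket name is a placeholder (an assumption, point it at storage your environment can write to), and writing to S3 from pandas assumes `s3fs` is available in the job environment:
import fused

@fused.udf
def ingest_udf(partition: int = 0):
    import pandas as pd

    # ... heavy processing would happen here ...
    df = pd.DataFrame({'partition': [partition], 'rows_processed': [0]})

    # Persist the real output; the bucket below is a placeholder
    df.to_parquet(f"s3://my-bucket/large_job_demo/out_{partition}.parquet")

    # Only return lightweight status information
    return pd.DataFrame({'status': ['Done']})

job = ingest_udf(arg_list=[0, 1, 2])
job.run_remote()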
Example job: saving to disk
A common use case for offline jobs is as a "pre-ingestion" process. You can find a real-life example of this in our dark vessel detection example.
Here all we return is status information in a pandas DataFrame; the actual data is unzipped, read and saved to the shared mounted disk:
import fused

@fused.udf()
def read_ais_from_noaa_udf(datestr='2023_03_29'):
    import os
    import requests
    import io
    import zipfile
    import pandas as pd

    url = f'https://coast.noaa.gov/htdata/CMSP/AISDataHandler/{datestr[:4]}/AIS_{datestr}.zip'

    # This is our local mount file path
    path = f'/mnt/cache/AIS_demo/{datestr[:7]}/'
    daily_ais_parquet = f'{path}/{datestr[-2:]}.parquet'

    # Download ZIP file to mounted disk
    r = requests.get(url)
    if r.status_code == 200:
        # Make sure the output directory exists before writing
        os.makedirs(path, exist_ok=True)
        with zipfile.ZipFile(io.BytesIO(r.content), 'r') as z:
            with z.open(f'AIS_{datestr}.csv') as f:
                df = pd.read_csv(f)
                df.to_parquet(daily_ais_parquet)
        return pd.DataFrame({'status': ['Done']})
    else:
        return pd.DataFrame({'status': [f'read_error_{r.status_code}']})
Since our data is written to shared storage, it can now be accessed anywhere else, through another UDF or any other application with access to that storage.
You can use File Explorer to easily see your outputs! In the case of the above example, typing `efs://AIS_{datestr}.csv` (and replacing `datestr` with your date) will show the results directly in File Explorer!
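Reading the output back from another UDF works the same way. A minimal sketch, assuming the ingestion job above finished and wrote its parquet file under `/mnt/cache/AIS_demo/`:
import fused

@fused.udf
def read_daily_ais_udf(datestr='2023_03_29'):
    import pandas as pd

    # Same path convention the ingestion UDF above used when writing
    daily_ais_parquet = f'/mnt/cache/AIS_demo/{datestr[:7]}/{datestr[-2:]}.parquet'
    return pd.read_parquet(daily_ais_parquet)

df = fused.run(read_daily_ais_udf, datestr='2023_03_29')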
Large jobs trade-offs
- Take a few seconds to start up the machine
- Can run as long as needed