
Multiple UDFs

UDFs work best when handling small amounts of data at a time, but datasets are often large. This is where running UDFs in bulk becomes necessary.

There are 2 main approaches to running multiple UDFs:

  • Using "offline" instances
    • For high latency, large volumes of data
  • Using "real-time" instances
    • For small requests to finish quick

A Simple UDF

Here's a simple UDF:

@fused.udf
def udf(val):
    import pandas as pd
    return pd.DataFrame({'val': [val]})

As mentioned in the Single UDF section, we can call it once with:

fused.run(udf, val=1)

Which returns:

  | val
------
0 | 1

We then have 2 options to run this multiple times:

Using "offline" instances (run_remote())

When to use: This is for high-latency parallel processing (anything that takes more than a few seconds) of longer or more complex jobs.

Running "offline" job

Using the same UDF as above we can run this UDF 10 times on an "offline" instance:

# We'll run this UDF 10 times
job = udf(arg_list=range(10))
job.run_remote()

This pushes a job onto a persistent machine on the Fused server.

run_remote additional arguments

With job.run_remote() you also have access to a few other arguments to tailor your remote job to your needs:

  • instance_type: The type of machine to run your job on (based on the AWS General Purpose instance types)
  • disk_size_gb: The amount of disk space, in GB, your instance requires

For example, if you want a job with 16 vCPUs, 64 GB of RAM, and 100 GB of storage, you can call:

job.run_remote(instance_type="m5.4xlarge", disk_size_gb=100)

Accessing "offline" job logs

You can view the logs of all your ongoing and past run_remote() jobs in any of the following ways:

In a notebook

Running job.run_remote() in a notebook gives you a clickable link:

[Image: Dark Vessel Detection workflow]
In Fused Workbench

Under the "Jobs" tab, on the bottom left of Workbench:

By email (you'll receive 1 email for each job)

Each job leads to an email summary with logs upon completion:

[Image: Dark Vessel Detection workflow]

Getting "offline" results

Getting data back from an "offline" run is a bit more involved than for "real-time" runs. Our recommendation is to have your UDF write data directly to disk or cloud storage, and access it from there afterwards.
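
As a minimal sketch of this pattern (the UDF name and output path here are hypothetical), the UDF does its work, persists the result to a shared path, and returns only a small status DataFrame; the next section walks through a full real-world example with the same shape:

import fused

@fused.udf
def write_results_udf(val=1):
    import pandas as pd

    # Hypothetical output location on the shared mounted disk
    path = fused.file_path('/my_outputs/')
    out_parquet = f'{path}/{val}.parquet'

    # Do the actual work, then persist it instead of returning it
    df = pd.DataFrame({'val': [val] * 1000})
    df.to_parquet(out_parquet)

    # Return only a small status DataFrame
    return pd.DataFrame({'status': ['Done'], 'path': [out_parquet]})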

Example job: saving to disk

A common use case for offline jobs is as a "pre-ingestion" process. You can find a real-life example of this in our dark vessel detection example.

Here all we return is status information in a pandas DataFrame; the data itself is unzipped, read, and saved to the mounted disk:

import fused

@fused.udf()
def read_ais_from_noaa_udf(datestr='2023_03_29'):
    import os
    import requests
    import io
    import zipfile
    import pandas as pd

    url = f'https://coast.noaa.gov/htdata/CMSP/AISDataHandler/{datestr[:4]}/AIS_{datestr}.zip'
    # This is our local mount file path
    path = fused.file_path(f'/AIS/{datestr[:7]}/')
    daily_ais_parquet = f'{path}/{datestr[-2:]}.parquet'

    # Download the ZIP file to the mounted disk
    r = requests.get(url)
    if r.status_code == 200:
        with zipfile.ZipFile(io.BytesIO(r.content), 'r') as z:
            with z.open(f'AIS_{datestr}.csv') as f:
                df = pd.read_csv(f)

        df.to_parquet(daily_ais_parquet)
        return pd.DataFrame({'status': ['Done']})
    else:
        return pd.DataFrame({'status': [f'read_error_{r.status_code}']})

Once the data is written to disk or cloud storage, it can be accessed from anywhere else: through another UDF, or any other application with access to that storage.
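
For instance, a follow-up UDF could read the daily Parquet file back from the same path. Here's a minimal sketch (the UDF name is hypothetical; the path layout mirrors read_ais_from_noaa_udf above):

import fused

@fused.udf
def read_daily_ais_udf(datestr='2023_03_29'):
    import pandas as pd

    # Same path layout used by read_ais_from_noaa_udf above
    path = fused.file_path(f'/AIS/{datestr[:7]}/')
    daily_ais_parquet = f'{path}/{datestr[-2:]}.parquet'

    # Read the Parquet file written by the offline job
    return pd.read_parquet(daily_ais_parquet)

This could then be called like any other UDF, e.g. fused.run(read_daily_ais_udf, datestr='2023_03_29').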

Tradeoffs:

  • Takes a few seconds to start up the machine
  • Can run for as long as needed

[Experimental] Using "real-time" instances (run_pool & PoolRunner)

When to use: This is for quick jobs that can finish in under 120s.

note

This is not a feature directly implemented in fused. Instead, we're going to use code from the fused.public.common utils module, which might change over time.

You can learn more about importing utils from UDFs here.

If you want to quickly run a UDF a few times over a dataset, you can use PoolRunner. Wrap the call in a lambda function and pass it to PoolRunner along with the values to map over:

runner = fused.utils.common.PoolRunner(lambda val: fused.run(udf, val=val), range(10))
runner.get_result_all()

In a notebook, runner.get_result_all() prints the progress status over time until the job is done:

[Image: Pool Runner]

Getting "real-time" results

You can then get all your results by concatenating them:

# In this example udf() returns a pandas DataFrame, so `.get_concat()` combines all results into one DataFrame
result = runner.get_concat()

with type(result) being pandas.core.frame.DataFrame
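
If you'd rather not depend on the fused.public.common utils, a conceptually similar fan-out can be sketched with Python's standard library, calling fused.run once per value in a thread pool (this is an illustration, not the PoolRunner implementation, and it omits PoolRunner's progress reporting):

from concurrent.futures import ThreadPoolExecutor

import fused
import pandas as pd

# Fan out 10 "real-time" calls in parallel; `udf` is the simple UDF defined above
with ThreadPoolExecutor(max_workers=10) as pool:
    dfs = list(pool.map(lambda val: fused.run(udf, val=val), range(10)))

# Combine the per-call DataFrames manually
result = pd.concat(dfs, ignore_index=True)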

Tradeoffs:

  • No startup time
  • Will time out after 120s