Building & Running UDFs

An opinionated guide to making the most of Fused UDFs

Fused UDFs are Python functions that run on serverless compute and can be called from anywhere with fused.run(udf). This guide is a resource meant to help you on your way to making the most of UDFs.

A short reminder: The anatomy of a UDF

@fused.udf
def udf(my_awesome_input: int = 1):
    import pandas as pd

    return pd.DataFrame({"Look at my output: ": [my_awesome_input]})

Each UDF has a few specific elements: the @fused.udf decorator, imports made inside the function body, typed inputs with default values, and a return value.

note

All of this is explained in much more detail in the "Write UDF" section.

You can then run UDFs from anywhere with fused.run(udf). These are still Python functions, giving you a lot of flexibility in what you can do, but we have some recommendations for keeping them fast & efficient.
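For example, a minimal call against the UDF defined above, passing its input as a keyword argument:

import fused

result = fused.run(udf, my_awesome_input=42)
print(result)  # a one-row DataFrame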

Writing efficient UDFs

Keep things small

The main benefit of Fused UDFs is how responsive they are, which they achieve by running on serverless Python compute. They can time out, so the best way to keep workflows fast is to keep them small:

Example: Breaking down a complex pipeline into smaller UDFs

❌ Not recommended:

@fused.udf
def inefficient_pipeline_udf(data_path):
    import pandas as pd

    df = pd.read_csv(data_path)
    # Some complicated processing logic to create processed_df
    processed_df = ...

    return processed_df

✅ Instead, break it down:

@fused.udf
def load_data_udf(data_path):
    import pandas as pd

    return pd.read_csv(data_path)


@fused.udf
def process_data_udf(df):
    import pandas as pd

    # Some complicated processing logic to create processed_df
    processed_df = ...

    return processed_df


@fused.udf
def pipeline_udf(data_path):
    df = fused.run(load_data_udf, data_path=data_path)
    processed_df = fused.run(process_data_udf, df=df)

    return processed_df
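The top-level UDF is then called exactly as before (the path here is a hypothetical placeholder):

processed_df = fused.run(pipeline_udf, data_path="s3://my-bucket/data.csv")  # hypothetical path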

Run often, iterate quickly

Just like writing short cells when developing in a Jupyter Notebook, we recommend you keep your UDFs short & fast to execute.

⚡️ Aim for UDFs that take up to 1min to run

UDFs run with fused.run() time out after 120s, so we recommend keeping a buffer in case your UDF takes a bit longer to execute.

Visual: UDF timing guideline

This is a breakdown of what happens when you run a UDF with fused.run() and why we recommend you keep your UDFs at the 30s-1min mark:

[Image: UDF design guidelines]

Run tasks in parallel

Sometimes you need to run a UDF over a large number of inputs, or over a task that takes longer than 120s to run, for example loading a large dataset or running a more complex process.

In this case you can use fused.submit() to run a small UDF over a set of inputs in parallel.

Example: Parallelizing a UDF with fused.submit()

Let's imagine we have a fetch_single_data UDF that loads data from an API, and a large set of inputs input_data=[0,1,2,3,4,5,6,7,8,9]:

@fused.udf
def fetch_single_data(single_input: int):
    import time

    import pandas as pd

    # Consider this our API call; sleeping to simulate the time it takes to get the data
    time.sleep(3)

    return pd.DataFrame({"data": [f"processed_{single_input}"]})
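Running it once for a single input is straightforward:

single_df = fused.run(fetch_single_data, single_input=0)
# Takes ~3s and returns a one-row DataFrame containing "processed_0"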

If we were to run this UDF in Workbench, we would only be able to run it for one input at a time, so we could edit our UDF to loop over the inputs:

@fused.udf
def fetch_data(inputs: list):
    import time

    import pandas as pd

    fetched_data = []
    for i in inputs:
        # Consider this our API call
        time.sleep(3)
        fetched_data.append(f"processed_{i}")

    return pd.DataFrame({"data": fetched_data})

However, running this UDF with fused.run(fetch_data, inputs=input_data) takes longer as we add inputs, and we could quickly go over the 120s limit. We still want to fetch data across all our inputs, which is where fused.submit() comes in:

Going back to our original UDF, we can now run it with fused.submit() to run it in parallel:

@fused.udf
def load_data_udf(input_data: list):
    results = fused.submit(
        fetch_single_data,
        input_data,
        engine="local",  # Run on our local server rather than spinning up new instances
    )
    fetched_data = results.collect_df()

    return fetched_data

This is of course a simplified example, but it shows how you can use fused.submit() to run a UDF in parallel for each input.

This now runs a lot faster by running the fetch_single_data UDF in parallel for each input.
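Putting it together, a sketch of calling the parallel version (with input_data built to match the example above):

input_data = list(range(10))  # [0, 1, ..., 9]
df = fused.run(load_data_udf, input_data=input_data)
# All 10 fetch_single_data calls run concurrently, so this takes roughly
# the time of one call rather than the sum of all of them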

note

The example here blocks the main thread until all the fused.submit() calls have finished. This means you might have to wait longer in Workbench for the results to show up.

Comparison of both approaches in Workbench:

  • fused.run() over 10 inputs: 31.5s
  • fused.submit() over 10 inputs: 4.6s

Use offline instances for your large jobs

You might not be able to run your UDF within the 120s limit of the default fused.run(udf) call, or to break it down into smaller UDFs and parallelize it with fused.submit(). In that case you can use offline instances.

You can find an example of using offline instances in one of our end-to-end examples on ingesting data into partitioned, cloud-native formats.

Cache as much as you can

Fused relies heavily on caching repetitive tasks to make recurring calls much faster (and more compute-efficient).

✅ You want to use caching for functions whose inputs recur:

  • Loading a dataset
  • Computing a recurring operation with default variables
  • Intermediate results you'll reuse soon

❌ When not to use caching:

  • In most cases, for functions taking bounds as an argument -> the function + input cache gets regenerated for each new bounds value (which changes every time you pan around the Workbench Map view, for example)
  • Data you want others on your team, or people outside of Fused, to use. You're better off writing that data to cloud storage like S3 or GCS

Example: Caching a repetitive task

Re-using the example from keeping things small:

❌ Not recommended:

@fused.udf
def inefficient_pipeline_udf(data_path):
    import pandas as pd

    df = pd.read_csv(data_path)
    # Some complicated processing logic to create processed_df
    processed_df = ...

    return processed_df

✅ Instead, break it down AND cache the calls:

@fused.udf
def load_data_udf(data_path):
    import pandas as pd

    return pd.read_csv(data_path)


@fused.udf
def process_data_udf(df):
    import pandas as pd

    # Some complicated processing logic to create processed_df
    processed_df = ...

    return processed_df


@fused.udf
def pipeline_udf(data_path):
    @fused.cache
    def load_data(data_path):
        return fused.run(load_data_udf, data_path=data_path)

    @fused.cache
    def process_data(df):
        return fused.run(process_data_udf, df=df)

    df = load_data(data_path)
    processed_df = process_data(df)

    return processed_df
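To see caching pay off directly, here's a minimal, self-contained sketch: the second call with the same argument is served from the cache instead of re-running the function (the 3-second sleep is a hypothetical stand-in for real work):

@fused.udf
def udf():
    import time

    @fused.cache
    def slow_square(n):
        time.sleep(3)  # hypothetical stand-in for an expensive computation
        return n * n

    start = time.time()
    slow_square(4)  # first call: computes and caches
    print(f"first call: {round(time.time() - start, 2)}s")

    start = time.time()
    slow_square(4)  # same input: served from the cache
    print(f"second call: {round(time.time() - start, 2)}s")

    return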
tip

Read more about the details in the caching documentation.

Prepare your large datasets

Fused works best with data that is fast to read and can be read in tiles or chunks. We know that most data out there isn't in the most efficient file formats, which is why we provide tools to ingest your own data into cloud-optimized, partitioned formats.

We have a dedicated page on when you should consider ingesting your own data. As a rule of thumb (see the sketch after this list), you want to consider ingesting your data when:

  • Files are read multiple times and are >100MB
  • Files are slow to open or require some processing (a .zip archive, for example)
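As a sketch of what that looks like (the bucket paths are placeholders, and the exact fused.ingest parameters are described in the ingestion docs):

job = fused.ingest(
    input="s3://my-bucket/raw/buildings.parquet",  # hypothetical source
    output="s3://my-bucket/ingested/buildings/",   # hypothetical destination
)
job.run_remote()  # ingestion runs on an offline instance, not the 120s runtime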

Don't start from scratch: UDF Catalog

Just like using libraries in Python to leverage existing tools, you don't need to start from scratch in Fused. We have a Catalog of existing UDFs built & maintained by us and the community.

You can find a host of different UDFs that can serve as a starting point or as inspiration to create your own UDFs:

UDF Catalog

You can also contribute your own UDFs to the community!

Debugging UDFs

The reality of writing code is that stuff breaks, often and sometimes in mysterious ways. Here are some of our recommendations for debugging your UDFs.

Use print()

UDFs return stdout both in the Workbench Code Editor and locally when running fused.run(udf), so the easiest way to get info about your UDFs is to use good old print():

@fused.udf
def udf(n: int = 1):
    print(f"{n=}")
    return

Since Python 3.8 you can use f-string debugging (the = specifier), which is what we recommend you use:

print(f"{my_fancy_variable=}")

This allows you to print many variables without losing track of which is which.
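For example:

x, y = 3, "hello"
print(f"{x=}, {y=}")  # prints: x=3, y='hello'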

Type all your inputs

We strongly recommend you type all your inputs with the appropriate type:

@fused.udf
def udf(bounds: fused.types.Tile = None, n: int = 1):
    ...
    return

This has 2 effects:

  • It makes your code more readable to others
  • Fused only supports a few types at the moment; any non-typed or unsupported inputs will be passed as str (see the sketch below)
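For example, a minimal sketch of what the second point means in practice (assuming the UDF is called with a parameter coming from an HTTP endpoint or Workbench, where values arrive as text):

@fused.udf
def udf(n=1):  # no type annotation
    # Called over HTTP with ?n=1, n may arrive as the string "1"
    # rather than the integer 1
    print(f"{type(n)=}, {n=}")
    return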

Use time.time()

Sometimes you're not sure what's taking so long. The simplest way to figure this out is to use time.time():

Example: finding a slow process
@fused.udf
def udf():
    import time

    beginning_time = time.time()

    # Long processing step #1
    time.sleep(5)
    end_process_1 = time.time()
    process_time_1 = round(end_process_1 - beginning_time, 2)
    print(f"{process_time_1=}")

    # Short processing step
    time.sleep(0.2)
    process_time_2 = round(time.time() - end_process_1, 2)
    print(f"{process_time_2=}")

    return

Would give us:

>>> process_time_1=5.0
>>> process_time_2=0.2
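If you find yourself timing many steps, a small context manager keeps the bookkeeping out of the way. This is plain Python, not a Fused API:

import time
from contextlib import contextmanager

@contextmanager
def timer(label: str):
    start = time.time()
    yield
    print(f"{label}: {round(time.time() - start, 2)}s")

# Usage inside a UDF body:
with timer("processing step #1"):
    time.sleep(5)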

Join the Discord for support

We host a Discord server where you can ask any question, and we or the community will do our best to help you out!

Discord