Skip to main content

Build & Running UDFs

An opinionated guide to making the most out of Fused UDFs

Fused UDFs are Python functions that run on serverless compute and can be called from anywhere with fused.run(udf). This guide is a resource meant to help you on your way to making the most out of UDFs.

A short reminder: The anatomy of a UDF

@fused.udf
def udf(my_awesome_input: int = 1):
import pandas as pd

return pd.DataFrame({"Look at my output: ": [my_awesome_input]})

Each UDF has a few specific elements:

note

All of this is explained in the "Write UDF" section in much more details.

You can then run UDFs from anywhere with fused.run(udf). These are still Python functions, giving you a lot of flexibility on what oyu can do, but we have some recommendations for keeping them fast & efficient.

Writing efficient UDFs

Keep things small

The main benefit of Fused UDFs is how responsive they are. To do so, they run on Python serverless compute, but quickly timeout. The best way to do that is to keep things fast is to keep them small:

  • Break pipelines into single tasks UDFs
  • Leverage fused.run() to chain together UDFs
Example: Breaking down a complex pipeline into smaller UDFs

❌ Not recommended:

@fused.udf
def inefficient_pipeline_udf(data_path):
import pandas as pd

df = pd.read_csv(data_path)
# Some complicated processing logic to create df_processed
processed_df = ...

return processed_df

✅ Instead, break it down:

@fused.udf
def load_data_udf(data_path):
import pandas as pd
return pd.read_csv(data_path)
@fused.udf
def process_data_udf(df):
import pandas as pd

# Some complicated processing logic to create df_processed
processed_df = ...

return processed_df
@fused.udf
def pipeline_udf(data_path):
import pandas as pd

df = fused.run(load_data_udf, data_path=data_path)
processed_df = fused.run(process_data_udf, df=df)

return processed_df

Run often, Iterate quickly

Just like writing short cells when developing in a Jupyter Notebook, we recommend you keep your UDFs short & fast to execute

⚡️ Aim for UDFs that take up to 1min to run

UDFs run with fused.run() time out after 120s so we recommend you keep a buffer in case your UDF takes a bit longer to execute

Visual: UDF timing guideline

This is a breakdown of what happens when you run a UDF with fused.run() and why we recommend you keep your UDFs at the 30s-1min mark:

UDF Design Guidelines

But what if I want a longer UDF?

A lot of processing of large datasets sometimes doesn't fit in a 30s-1min job and needs to run for longer. You have a few options:

Cache as much as you can

Fused relies heavily on caching repetitive tasks to make recurring calls much faster (and more compute efficient)

✅ You want to use caching for functions with inputs that are recurring:

  • Loading a dataset
  • Computing a recurring operation with default variables
  • Intermediate results you'll reuse soon

❌ When not to use caching:

  • In most cases, for functions taking bbox as an argument -> your function + input cache would get re-generated for each new bbox (which changes each time you pan around in Workbench Map view for example)
  • Data you want others in your team or external to Fused to use. You're better off writing your data to cloud storage like s3 or gcs
Example: Caching a repetitive task

Re-using the example from keeping things small:

❌ Not recommended:

@fused.udf
def inefficient_pipeline_udf(data_path):
import pandas as pd

df = pd.read_csv(data_path)
# Some complicated processing logic to create df_processed
processed_df = ...

return processed_df

✅ Instead, break it down AND cache the calls:

@fused.udf
def load_data_udf(data_path):
import pandas as pd
return pd.read_csv(data_path)
@fused.udf
def process_data_udf(df):
import pandas as pd
# Some complicated processing logic to create df_processed
# ...
return processed_df
@fused.udf
def pipeline_udf(data_path):
import pandas as pd

@fused.cache
def load_data(data_path):
return fused.run(load_data_udf, data_path=data_path)

@fused.cache
def process_data(df):
return fused.run(process_data_udf, df=df)

df = load_data(data_path)
processed_df = process_data(df)

return processed_df
tip

Read more about the caching details:

Prepare your large datasets

Fused works at its best with data that is fast to read and can be read in tiles or chunks. We know that most of the data out there isn't in the most efficient file formats which is why we provide tools to ingest your own data into cloud-optimized, partitioned formats.

We have a dedicated page for when you should consider ingesting your own data. As a rule of thumb you want to consider ingesting your data when:

  • Files are read multiple times and >100MB
  • Files that are slow or require some processing to open (.zip for example)

Don't start from scratch: UDF Catalog

Just like using libraries in Python to leverage existing tools, you don't need to start from scratch in Fused. We have a Catalog of existing UDFs built & maintained by us and the community.

You can find a host of different UDFs that can serve as a starting point or as inspiration to create your own UDFs:

UDF Catalog

You can also contribute your own UDFs to the community!

Debugging UDFs

The reality of writing code is that stuff breaks, often and sometimes in mysterious ways. Here's some of our recommendations for how to debug your UDFs

Use print()

UDFs return stdout either in Workbench Code Editor or locally when running fused.run(udf) so the easiest way to get info about your UDFs is to use good old print:

@fused.udf
def udf(n: int = 1):
print(f"{n=}")
return

Since Python 3.8 you can use f-string debugging which is what we recommend you use:

print(f"{my_fancy_variable=}")

This allows you to print many variables without getting lost with what is what

UDF Catalog

Type all your inputs

We strongly recommend you type all your inputs with the appropriate type:

@fused.udf
def udf(
bbox:fused.types.TileGDF=None, n:int=1
):
...
return

This has 2 effects:

  • It makes your code more readable to others
  • Fused only supports a few types at the moment. Any non-typed or unsupported types will be passed as str

Use time.time()

Sometimes you're not sure what's taking so long. The simplest way to figure this out is to use time.time():

Example: finding a slow process
@fused.udf
def udf():
import time
beginning_time = time.time()

# long processing step #1
time.sleep(5)
end_process_1 = time.time()
process_time_1 = round(
end_process_1 - beginning_time, 2
)
print(f"{process_time_1=}")


# short processing step
time.sleep(0.2)
process_time_2 = round(
time.time() - end_process_1, 2
)
print(f"{process_time_2=}")

return

Would give us:

>>> process_time_1=5.0
>>> process_time_2=0.2

Join the Discord for support

We host & run a Discord server where you can ask any questions! We or the community will do our best to help you out!

Discord