Building & Running UDFs
An opinionated guide to making the most out of Fused UDFs
Fused UDFs are Python functions that run on serverless compute and can be called from anywhere with fused.run(udf). This guide is a resource meant to help you on your way to making the most out of UDFs.
A short reminder: The anatomy of a UDF
@fused.udf
def udf(my_awesome_input: int = 1):
    import pandas as pd

    return pd.DataFrame({"Look at my output: ": [my_awesome_input]})
Each UDF has a few specific elements:
- The @fused.udf decorator
- Arguments (ideally typed)
- Imports inside the function
- Some logic
- A supported return object
All of this is explained in much more detail in the "Write UDF" section.
You can then run UDFs from anywhere with fused.run(udf). These are still Python functions, giving you a lot of flexibility in what you can do, but we have some recommendations for keeping them fast & efficient.
Writing efficient UDFs
Keep things small
The main benefit of Fused UDFs is how responsive they are. To achieve this, they run on serverless Python compute, which times out quickly. The best way to keep things fast is to keep them small:
- Break pipelines into single-task UDFs
- Leverage fused.run() to chain UDFs together
Example: Breaking down a complex pipeline into smaller UDFs
❌ Not recommended:
@fused.udf
def inefficient_pipeline_udf(data_path):
    import pandas as pd

    df = pd.read_csv(data_path)
    # Some complicated processing logic to create df_processed
    processed_df = ...
    return processed_df
✅ Instead, break it down:
@fused.udf
def load_data_udf(data_path):
    import pandas as pd

    return pd.read_csv(data_path)

@fused.udf
def process_data_udf(df):
    import pandas as pd

    # Some complicated processing logic to create df_processed
    processed_df = ...
    return processed_df

@fused.udf
def pipeline_udf(data_path):
    import pandas as pd

    df = fused.run(load_data_udf, data_path=data_path)
    processed_df = fused.run(process_data_udf, df=df)
    return processed_df
Run often, Iterate quickly
Just like writing short cells when developing in a Jupyter Notebook, we recommend you keep your UDFs short & fast to execute.
⚡️ Aim for UDFs that take up to 1min to run.
UDFs run with fused.run() time out after 120s, so we recommend you keep a buffer in case your UDF takes a bit longer to execute.
Visual: UDF timing guideline
[Figure: breakdown of what happens when you run a UDF with fused.run(), and why we recommend you keep your UDFs at the 30s-1min mark]
But what if I want a longer UDF?
Processing large datasets sometimes doesn't fit in a 30s-1min job and needs to run for longer. You have a few options:
- Can you break your pipeline into smaller UDFs?
- You can run multiple small UDFs in parallel
- If you need a much longer run or simply need more RAM, you can run a large UDF with our offline instances
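As a rough illustration of the "multiple small UDFs in parallel" option, here is a minimal sketch using Python's standard concurrent.futures module. The process_chunk function is a hypothetical stand-in for one small UDF call (for example a fused.run(...) call on one chunk); it only squares its input so the sketch runs without Fused installed. Fused may offer its own utilities for parallel runs, which this sketch does not cover.

```python
from concurrent.futures import ThreadPoolExecutor


def process_chunk(chunk_id: int) -> int:
    # Hypothetical stand-in for one small UDF call on one chunk of the data.
    # It just squares the id so the example is self-contained.
    return chunk_id * chunk_id


def run_in_parallel(chunk_ids: list[int], max_workers: int = 4) -> list[int]:
    # Each call is independent, so the calls can overlap instead of running back to back.
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        # pool.map returns results in the same order as the inputs.
        return list(pool.map(process_chunk, chunk_ids))


results = run_in_parallel([0, 1, 2, 3])
print(results)  # [0, 1, 4, 9]
```

Threads suit this pattern when each call mostly waits on a remote run; the max_workers value here is an arbitrary choice.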
Cache as much as you can
Fused relies heavily on caching repetitive tasks to make recurring calls much faster (and more compute efficient).
✅ You want to use caching for functions with inputs that are recurring:
- Loading a dataset
- Computing a recurring operation with default variables
- Intermediate results you'll reuse soon
❌ When not to use caching:
- In most cases, for functions taking bbox as an argument -> your function + input cache would get re-generated for each new bbox (which changes each time you pan around in Workbench Map view for example)
- Data you want others in your team or external to Fused to use. You're better off writing your data to cloud storage like s3 or gcs
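To see why recurring inputs benefit from a cache while ever-changing inputs do not, here is a small sketch. It uses functools.lru_cache from the standard library as a stand-in, not fused.cache, and only assumes that the cache is keyed on the function plus its inputs, as described above. The expensive_step function and its inputs are hypothetical.

```python
from functools import lru_cache


@lru_cache(maxsize=None)
def expensive_step(key):
    # Stand-in for a slow operation such as loading a dataset.
    return f"result for {key}"


# Recurring input: only the first call does the work, the next two are cache hits.
for _ in range(3):
    expensive_step("same_dataset.csv")
recurring = expensive_step.cache_info()
print(recurring.hits, recurring.misses)  # 2 1

# Reset both the cache and its statistics before the second experiment.
expensive_step.cache_clear()

# Ever-changing input (like a new bbox on every pan): every call is a miss.
for bounds in [(0, 0, 1, 1), (1, 0, 2, 1), (2, 0, 3, 1)]:
    expensive_step(bounds)
changing = expensive_step.cache_info()
print(changing.hits, changing.misses)  # 0 3
```

In the second loop the cache only grows and never pays off, which is the situation the bbox recommendation is trying to avoid.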
Example: Caching a repetitive task
Re-using the example from keeping things small:
❌ Not recommended:
@fused.udf
def inefficient_pipeline_udf(data_path):
    import pandas as pd

    df = pd.read_csv(data_path)
    # Some complicated processing logic to create df_processed
    processed_df = ...
    return processed_df
✅ Instead, break it down AND cache the calls:
@fused.udf
def load_data_udf(data_path):
    import pandas as pd

    return pd.read_csv(data_path)

@fused.udf
def process_data_udf(df):
    import pandas as pd

    # Some complicated processing logic to create df_processed
    processed_df = ...
    return processed_df

@fused.udf
def pipeline_udf(data_path):
    import pandas as pd

    # Cache each step so repeated calls with the same input are not recomputed
    @fused.cache
    def load_data(data_path):
        return fused.run(load_data_udf, data_path=data_path)

    @fused.cache
    def process_data(df):
        return fused.run(process_data_udf, df=df)

    df = load_data(data_path)
    processed_df = process_data(df)
    return processed_df
Read more about the caching details:
- in the dedicated section
- How you can use cache to speed up exploration of slow to read datasets
Prepare your large datasets
Fused works at its best with data that is fast to read and can be read in tiles or chunks. We know that most of the data out there isn't in the most efficient file formats, which is why we provide tools to ingest your own data into cloud-optimized, partitioned formats.
We have a dedicated page for when you should consider ingesting your own data. As a rule of thumb you want to consider ingesting your data when:
- Files are read multiple times and are larger than 100MB
- Files are slow to open or require some processing to open (.zip for example)
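The rule of thumb above can be written down as a small check. This is only a sketch: the function name, the interpretation of 100MB as 100 * 1024 * 1024 bytes, and the set of "slow" suffixes are assumptions made for illustration, not part of Fused.

```python
from pathlib import Path

# Assumption: "100MB" is read as 100 * 1024 * 1024 bytes.
SIZE_THRESHOLD_BYTES = 100 * 1024 * 1024
# Assumption: extend this set with other formats you find slow to open.
SLOW_SUFFIXES = {".zip"}


def should_consider_ingesting(path: str, size_bytes: int, times_read: int) -> bool:
    # Rule 1: the file is read multiple times and is larger than the threshold.
    is_large_and_reused = times_read > 1 and size_bytes > SIZE_THRESHOLD_BYTES
    # Rule 2: the file is in a format that is slow or needs processing to open.
    is_slow_to_open = Path(path).suffix.lower() in SLOW_SUFFIXES
    return is_large_and_reused or is_slow_to_open


print(should_consider_ingesting("parcels.csv", 500 * 1024 * 1024, times_read=10))  # True
print(should_consider_ingesting("small.csv", 1024, times_read=10))  # False
print(should_consider_ingesting("archive.zip", 1024, times_read=1))  # True
```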
Don't start from scratch: UDF Catalog
Just like using libraries in Python to leverage existing tools, you don't need to start from scratch in Fused. We have a Catalog of existing UDFs built & maintained by us and the community.
You can find a host of different UDFs that can serve as a starting point or as inspiration to create your own UDFs:
- Open datasets from common open repositories like STAC catalogs
- Run on-the-fly ML predictions on satellite images
- Compute a spatial join between 2 datasets
You can also contribute your own UDFs to the community!
Debugging UDFs
The reality of writing code is that stuff breaks, often and sometimes in mysterious ways. Here are some of our recommendations for how to debug your UDFs.
Use print()
UDFs return stdout either in Workbench Code Editor or locally when running fused.run(udf), so the easiest way to get info about your UDFs is to use good old print:
@fused.udf
def udf(n: int = 1):
    print(f"{n=}")
    return
Since Python 3.8 you can use f-string debugging, which is what we recommend you use:
print(f"{my_fancy_variable=}")
This allows you to print many variables without losing track of which is which.
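As a quick illustration of what the "=" specifier prints, here is a plain Python snippet that runs outside of any UDF. The variable names are made up for the example.

```python
tile_count = 12
dataset_name = "buildings"

# The "=" specifier prints the expression text followed by the repr of its value.
message = f"{tile_count=} {dataset_name=}"
print(message)  # tile_count=12 dataset_name='buildings'

# A format spec can follow the "=", which is handy for floats.
elapsed = 1.23456
formatted = f"{elapsed=:.2f}"
print(formatted)  # elapsed=1.23
```

Note that strings show up with quotes because the value is rendered with repr, which makes it easy to tell "3" apart from 3.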
Type all your inputs
We strongly recommend you type all your inputs with the appropriate type:
@fused.udf
def udf(
    bbox: fused.types.TileGDF = None, n: int = 1
):
    ...
    return
This has 2 effects:
- It makes your code more readable to others
- Fused only supports a few types at the moment. Any input that is untyped or has an unsupported type will be passed as str
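To make the second point concrete, here is a sketch of how a caller could use type annotations to convert string inputs, and what happens when the annotation is missing. This is not Fused's implementation: coerce_by_annotation is a hypothetical helper written for this example, and it only handles int, float and str.

```python
import inspect


def coerce_by_annotation(func, raw_kwargs: dict) -> dict:
    """Cast string inputs to the annotated type of each parameter."""
    params = inspect.signature(func).parameters
    coerced = {}
    for name, raw in raw_kwargs.items():
        annotation = params[name].annotation
        if annotation in (int, float, str):
            coerced[name] = annotation(raw)
        else:
            # No annotation (or one this sketch does not handle): keep the string.
            coerced[name] = raw
    return coerced


def typed(n: int = 1):
    return n + 1


def untyped(n=1):
    return n + 1


typed_kwargs = coerce_by_annotation(typed, {"n": "3"})
untyped_kwargs = coerce_by_annotation(untyped, {"n": "3"})
print(typed_kwargs)    # {'n': 3}
print(untyped_kwargs)  # {'n': '3'}
print(typed(**typed_kwargs))  # 4
```

Calling untyped(**untyped_kwargs) would raise a TypeError, because "3" + 1 mixes a str with an int. That is the kind of surprise typed inputs help you avoid.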
Use time.time()
Sometimes you're not sure what's taking so long. The simplest way to figure this out is to use time.time():
Example: finding a slow process
@fused.udf
def udf():
    import time

    beginning_time = time.time()

    # long processing step #1
    time.sleep(5)
    end_process_1 = time.time()
    process_time_1 = round(
        end_process_1 - beginning_time, 2
    )
    print(f"{process_time_1=}")

    # short processing step
    time.sleep(0.2)
    process_time_2 = round(
        time.time() - end_process_1, 2
    )
    print(f"{process_time_2=}")

    return
Would give us:
>>> process_time_1=5.0
>>> process_time_2=0.2
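If you time many steps, the bookkeeping above gets repetitive. One possible way to tidy it up is a small context manager, sketched below. The timed helper and the timings dictionary are hypothetical names for this example, and the sketch swaps time.time() for time.perf_counter(), a standard-library clock intended for measuring short durations.

```python
import time
from contextlib import contextmanager

# Collected durations in seconds, keyed by step label.
timings = {}


@contextmanager
def timed(label: str):
    start = time.perf_counter()
    try:
        yield
    finally:
        # Record the duration even if the timed step raises.
        timings[label] = round(time.perf_counter() - start, 2)
        print(f"{label}: {timings[label]}s")


with timed("slow_step"):
    time.sleep(0.2)  # stand-in for a long processing step

with timed("fast_step"):
    time.sleep(0.01)  # stand-in for a short processing step
```

The same with timed(...) blocks can be placed inside a UDF body, and the printed lines will show up in stdout like any other print.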
Join the Discord for support
We host & run a Discord server where you can ask any questions! We or the community will do our best to help you out!