Small UDF run
Fused UDFs really shine once you start calling them from anywhere. You can call small jobs in 2 main ways:
- fused.run() in Python. All you need is the fused Python package installed.
  - Useful when you want to run a UDF as part of another pipeline, inside another UDF, or anywhere in Python code.
- HTTP call from anywhere
  - Useful when you want to call a UDF outside of Python, for example receiving a dataframe into Google Sheets or plotting points and images in a Felt map.
Defining "Small" job
"Small" jobs are defined as any job being:
- Less than 120s to execute
- Using less than a few GB of RAM to run
These jobs run in "real-time" with no start-up time, so they are quick to run, but they have limited resources and time out if they take too long.
fused.run()
fused.run() is the simplest and most common way to execute a UDF from any Python script or notebook.
The simplest way to call a public UDF is by its public name, prefixed with UDF_. Let's take this UDF that returns the location of the Eiffel Tower in a GeoDataFrame as an example:
import fused
fused.run("UDF_Single_point_Eiffel_Tower")
There are a few other ways to run a UDF:
- By name from your account
- By public UDF name
- Using a token
- Using a udf object
- From Github URL
- From git commit hash (most recommended for teams)
Name (from your account)
When to use: When calling a UDF you made, from your own account.
You can call any UDF you have made by the name you gave it when saving it.
(Note: This requires authentication)
Once you have authenticated, the UDF can be run locally in a notebook:
fused.run("Hello_World_bbox")
Public UDF Name
When to run: Whenever you want to run a public UDF for free from anywhere
Any UDF saved in the public UDF repo can be run for free.
Reference them by prefixing their name with UDF_. For example, the public UDF Get_Isochrone is run with UDF_Get_Isochrone:
fused.run('UDF_Get_Isochrone')
Token
When to use: Whenever you want someone to be able to execute a UDF but might not want to share the code with them.
You can get a token for a UDF either in Workbench (save your UDF, then click "Share") or by creating it in Python.
Here's a toy UDF that we want others to be able to run, but we don't want them to see the code:
import fused

@fused.udf()
def my_super_duper_private_udf(my_udf_input):
    import pandas as pd
    # This code is so private I don't want anyone to be able to read it
    return pd.DataFrame({"input": [my_udf_input]})
We then need to save this UDF to the Fused server to make it accessible from anywhere.
my_super_duper_private_udf.to_fused()
my_udf.to_fused() saves your UDF to your personal user UDFs. These are private to you and your team by default, but you can create a token that anyone (even outside your team) can use to run your UDF.
We can create a token for my_super_duper_private_udf and share it:
from fused.api import FusedAPI
api = FusedAPI()
token = api.create_udf_access_token("my_super_duper_private_udf").token
print(token)
This would print something like: 'fsh_**********q6X'
(You can recognise this as a shared token because it starts with fsh_)
Anyone with the token can then run the UDF, either by passing the token variable:
fused.run(token, my_udf_input="I'm directly using the token object")
or directly:
fused.run('fsh_**********q6X', my_udf_input="I can't see your private UDF but can still run it")
UDF object
When to run: When you're writing your UDF in the same Python file / Jupyter notebook and want to refer to the Python object directly. You might want to do this to test that your UDF works locally, for example.
You may also pass a UDF Python object to fused.run:
import fused

# Running a local UDF
@fused.udf
def local_udf():
    import pandas as pd
    return pd.DataFrame({})

# By default fused.run() runs your UDF on the Fused serverless engine,
# so we pass engine='local' to run it as a normal Python function
fused.run(local_udf, engine='local')
Github URL
When to use: [Not recommended] This is useful if you're working on a branch that you control. This method always points to the last commit on a branch, so your UDF can break without you knowing if someone else pushes a new commit or merges and deletes your branch.
gh_udf = fused.load("https://github.com/fusedio/udfs/tree/main/public/REM_with_HyRiver/")
fused.run(gh_udf)
We do NOT recommend this approach, as your UDF might break if changes are made to it. In particular, a URL pointing to a main branch means your UDF will change whenever someone else pushes to that branch, in a way that isn't visible to you.
For that reason we recommend using a git commit hash instead.
Git commit hash (recommended for most stable use cases)
When to use: Whenever you want to rely on a UDF such as in production or when using a UDF as a building block for another UDF.
This is the safest, and thus recommended, way to call UDFs from Github. Since you're pointing to a specific git commit hash, later changes cannot break your UDF.
This does mean you need to update the commit hash wherever your UDFs are called if you want to propagate updates, but it gives you the most control.
Let's again take the example of the Eiffel Tower UDF:
commit_hash = "bdfb4d0"
commit_udf = fused.load(f"https://github.com/fusedio/udfs/tree/{commit_hash}/public/Single_point_Eiffel_Tower/")
fused.run(commit_udf)
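To keep pinned URLs consistent across a codebase, the URL pattern used above can be wrapped in a small helper. This is only a sketch: pinned_udf_url is a hypothetical helper, not part of the Fused SDK, and it assumes the UDF lives under the public/ folder of the fusedio/udfs repository as in the example above.

```python
def pinned_udf_url(commit_hash: str, udf_name: str, repo: str = "fusedio/udfs") -> str:
    """Build a GitHub tree URL pinned to one commit (hypothetical helper)."""
    if not commit_hash or "/" in commit_hash:
        raise ValueError("commit_hash must be a non-empty git commit hash")
    # Same layout as the URL shown above: <repo>/tree/<commit>/public/<udf_name>/
    return f"https://github.com/{repo}/tree/{commit_hash}/public/{udf_name}/"


url = pinned_udf_url("bdfb4d0", "Single_point_Eiffel_Tower")
print(url)
```

The resulting string can then be passed to fused.load() exactly as in the example above.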
Team UDF Names
Team UDFs can be loaded or run by prefixing the UDF name with "team/", as in:
fused.load("team/udf_name")
This can be helpful when collaborating with team members, as it does not require creating a shared token.
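As a recap of the string forms described in this section, the sketch below labels a reference by its prefix. It is purely illustrative: classify_udf_reference is a hypothetical helper and does not describe how fused.run() resolves its first argument internally. UDF objects are not strings, so they are not covered here.

```python
def classify_udf_reference(ref: str) -> str:
    """Label a UDF reference string by the conventions described above."""
    if ref.startswith("fsh_"):
        return "shared token"
    if ref.startswith("https://github.com/"):
        return "github url"
    if ref.startswith("team/"):
        return "team udf"
    if ref.startswith("UDF_"):
        return "public udf"
    # Anything else is treated as a UDF saved under your own account
    return "account udf"


for ref in ["UDF_Get_Isochrone", "Hello_World_bbox", "team/udf_name"]:
    print(ref, "->", classify_udf_reference(ref))
```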
Execution engines
fused.run can run the UDF in various execution modes, as specified by the engine parameter, either local or remote:
- local: Run in the current process.
- remote: Run in the serverless Fused cloud engine (this is the default).
# By default, fused.run will use the remote engine
fused.run(my_udf)
# To run locally, explicitly specify engine="local"
fused.run(my_udf, engine="local")
⚠️ Important change: fused.run() now defaults to engine="remote" in all cases, even when users are not authenticated. Previously, it would default to engine="local" for unauthenticated users. If you are not authenticated, you must explicitly specify engine="local" to run UDFs locally.
Set sync=False to run a UDF asynchronously.
Passing arguments in fused.run()
A typical fused.run() call of a UDF looks like this:
@fused.udf
def my_udf(inputs: str):
    import pandas as pd
    return pd.DataFrame({"output": [inputs]})

fused.run(my_udf, inputs="hello world")
A fused.run() call takes the following arguments:
- [Mandatory] The first argument needs to be the UDF to run (name, object, token, etc. as seen above)
- [Optional] Any arguments of the UDF itself (if it has any). In the example above that's inputs, because my_udf takes inputs as an argument.
- [Optional] Any protected arguments as seen in the dedicated API docs page (if applicable). These include for example:
  - bounds -> A geographical bounding box (as a list of 4 values: [min_x, min_y, max_x, max_y]) defining the area of interest.
  - cache_max_age -> The maximum age of the UDF's cache.
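Since bounds is a plain list in [min_x, min_y, max_x, max_y] order, it can be worth checking the ordering before passing it on. A minimal sketch, assuming nothing about how Fused itself validates bounds; validate_bounds is a hypothetical helper and the Paris coordinates are only illustrative values.

```python
from typing import List, Sequence


def validate_bounds(bounds: Sequence[float]) -> List[float]:
    """Check a [min_x, min_y, max_x, max_y] bounding box (hypothetical helper)."""
    if len(bounds) != 4:
        raise ValueError("bounds must contain exactly 4 values")
    min_x, min_y, max_x, max_y = (float(v) for v in bounds)
    if min_x > max_x or min_y > max_y:
        raise ValueError("bounds must be ordered as [min_x, min_y, max_x, max_y]")
    return [min_x, min_y, max_x, max_y]


# Illustrative bounding box roughly around Paris
paris_bounds = validate_bounds([2.25, 48.81, 2.42, 48.90])
print(paris_bounds)
```

The validated list could then be passed as the bounds argument of a fused.run() call.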
Running multiple jobs in parallel
Sometimes you want to run a UDF over a list of inputs (for example, running a UDF that unzips a file over a list of files). If each run is itself quite small, then you can run a batch of UDF calls over the whole list of inputs.
Let's use a simple UDF to demonstrate:
@fused.udf
def udf(val):
    import pandas as pd
    return pd.DataFrame({'val':[val]})
Say we wanted to run this udf 10 times over:
inputs = [0,1,2,3,4,5,6,7,8,9]
fused.submit()
Fused is built to help you scale your processing to huge datasets, and the core of this ability is fused.submit(). You can run 1 UDF over a large number of arguments:
job_pool = fused.submit(udf, inputs)
>>> 100% | 10/10
fused.submit() runs all these jobs in parallel and defaults to directly returning the results back to you as a dataframe:
job_pool
>>>
val
val
0 0 0
1 0 1
...
Advanced fused.submit() options
Blocking vs non-blocking calls
By default, fused.submit() is blocking, meaning it waits for all the jobs to finish before returning the results.
However, you might want to just kick off these jobs and continue executing the rest of your Python code:
job_pool = fused.submit(udf, inputs, collect=False)
job_pool then becomes an object you can query to get the status of your jobs:
job_pool.get_status_df() -> Returns a dataframe of the status of each job in the pool:
job_pool.get_status_df()
>>>
val status result
0 0 success val 0 0
1 1 success val 0 1
2 2 success val 0 2
...
job_pool.collect() -> Returns a dataframe of the results of all the jobs once they're all finished (equivalent to the default fused.submit() behaviour):
job_pool.collect()
>>>
val
val
0 0 0
1 0 1
...
Debug mode
Sometimes you might just want to make sure your code is running correctly before kicking off a large number of jobs. That's what Debug Mode allows you to do:
job = fused.submit(udf, inputs, debug_mode=True)
This will run the first item in inputs directly using fused.run() (equivalent to fused.run(udf, inputs[0])) and then return the results:
job
>>>
val
0 0
You can then set debug_mode back to False and be more confident that your UDF is working as expected!
Execution parameters
fused.submit() also has parameters giving you more control over the execution. See the Python SDK docs page for more details:
- max_workers: The number of workers to use for the job pool.
- engine: local or remote (default is remote). Just like fused.run(), by default fused.submit() will run the UDF on the Fused server (engine='remote'). You can set engine='local' to run udf locally, either on your machine or inside a large machine that you spin up.
- max_retry: The maximum number of retries for a job.
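To build intuition for what a worker limit and a retry limit mean, here is a local analogy using only Python's standard library. It does not call Fused and makes no claim about how fused.submit() implements max_workers or max_retry; run_with_retry and flaky_square are hypothetical names made up for this sketch.

```python
from concurrent.futures import ThreadPoolExecutor


def run_with_retry(func, arg, max_retry=2):
    """Call func(arg), retrying up to max_retry extra times if it raises."""
    for attempt in range(max_retry + 1):
        try:
            return func(arg)
        except Exception:
            if attempt == max_retry:
                raise  # out of retries: surface the last error


attempts = {}


def flaky_square(val):
    """Fail on the first attempt for even inputs, then succeed."""
    attempts[val] = attempts.get(val, 0) + 1
    if attempts[val] == 1 and val % 2 == 0:
        raise RuntimeError("transient failure")
    return val * val


inputs = list(range(10))
# At most 4 calls run at the same time, similar in spirit to a worker limit
with ThreadPoolExecutor(max_workers=4) as pool:
    results = list(pool.map(lambda v: run_with_retry(flaky_square, v), inputs))
print(results)
```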
Benchmarking
Simple fused.submit() Benchmark
fused.submit(udf) runs all the UDF calls in parallel, making it a helpful tool to run multiple UDFs all at once.
We can demonstrate this by adding a simple time.sleep(1) in our original UDF:
@fused.udf
def udf(val):
    import pandas as pd
    import time
    time.sleep(1)
    return pd.DataFrame({'val':[val]})
In a notebook, we can time how long each cell takes to execute with the %%time magic command:
%%time
# In a Jupyter notebook (%%time must be the first line of the cell)
fused.run(udf, val=1)
This takes about 2s: 1s of time.sleep(1), plus the overhead of sending the UDF to the Fused server and running it.
Now using fused.submit() to run this over 50 inputs takes a few more seconds, but not the 100s that 50 sequential runs of about 2s each would need. fused.submit() is a helpful way to scale a single UDF to many inputs in a timely manner.
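The same effect can be reproduced locally without Fused. The sketch below is only an analogy built on Python's standard library: slow_task stands in for the sleeping UDF (with a much shorter sleep so it finishes quickly), and a thread pool stands in for the parallel job pool. It says nothing about actual Fused timings.

```python
import time
from concurrent.futures import ThreadPoolExecutor

SLEEP_SECONDS = 0.05  # stand-in for the 1s sleep in the UDF above


def slow_task(val):
    """Pretend to do some work, then return the input."""
    time.sleep(SLEEP_SECONDS)
    return val


inputs = list(range(50))
# Lower bound on how long running the 50 calls one after another would take
sequential_estimate = len(inputs) * SLEEP_SECONDS

start = time.perf_counter()
with ThreadPoolExecutor(max_workers=len(inputs)) as pool:
    parallel_results = list(pool.map(slow_task, inputs))
parallel_seconds = time.perf_counter() - start

print(f"sequential would take at least {sequential_estimate:.2f}s")
print(f"parallel took {parallel_seconds:.2f}s")
```

Because the calls overlap, the total time stays close to the duration of a single call plus overhead instead of growing with the number of inputs.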
Example use cases
fused.submit() is used in many places across our docs. Here are some examples:
- ⛴️ In the Dark Vessel Detection example, to scale retrieving daily AIS .zip files from NOAA over 30 days.
- 🛰️ Retrieving all of Maxar's Open Data STAC Catalogs across every event they have imagery for.
- 💡 Check the Best Practices for more on when to use submit() and when to use other methods.
HTTP requests
In the UDF Builder, you can create an HTTP endpoint for a UDF in the "Snippets" section. This generates a unique URL to call the UDF via HTTP requests. The URL is scoped to that UDF only and it can be revoked to disable access. The same can be done with the Fused Python SDK.
Shared token
To run a UDF via HTTP request, generate a shared token and use the provided URL. Manage your account's shared tokens in fused.io/profile#tokens.
Structure the URL with the file path parameter to run as a single batch operation.
https://www.fused.io/server/v1/realtime-shared/******/run/file?dtype_out_raster=png
To integrate with a tiling service, structure the URL with the tiles path parameter, followed by templated /{z}/{x}/{y} path parameters. See Lonboard for an example.
https://www.fused.io/server/v1/realtime-shared/******/run/tiles/{z}/{x}/{y}?dtype_out_raster=png
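Both URL shapes above can be assembled with a small helper. This is a sketch that only mirrors the two example URLs: shared_udf_url is a hypothetical function, not part of the Fused SDK, and the masked token "******" is kept as a placeholder.

```python
from urllib.parse import urlencode

BASE_URL = "https://www.fused.io/server/v1/realtime-shared"


def shared_udf_url(token: str, mode: str = "file", **query) -> str:
    """Build a shared-token URL in 'file' or 'tiles' form (hypothetical helper)."""
    if mode == "file":
        path = "run/file"
    elif mode == "tiles":
        # The tile placeholders stay literal so a map client can fill them in
        path = "run/tiles/{z}/{x}/{y}"
    else:
        raise ValueError("mode must be 'file' or 'tiles'")
    url = f"{BASE_URL}/{token}/{path}"
    if query:
        url += "?" + urlencode(query)
    return url


file_url = shared_udf_url("******", "file", dtype_out_raster="png")
tiles_url = shared_udf_url("******", "tiles", dtype_out_raster="png")
print(file_url)
print(tiles_url)
```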
Private token
Calling UDFs with Bearer authentication requires an account's private token. The URL structure to run UDFs with the private token varies slightly, as the URL specifies the UDF's name and the owner's user account.
curl -XGET "https://app.fused.io/server/v1/realtime/fused/api/v1/run/udf/saved/user@fused.io/caltrain_live_location?dtype_out_raster=png" -H "Authorization: Bearer $ACCESS_TOKEN"
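The same request can be prepared from Python with the standard library. A minimal sketch, assuming the token is available in an ACCESS_TOKEN environment variable as in the curl command; the placeholder fallback is made up for illustration, and the request is only built here, not sent.

```python
import os
import urllib.request

url = (
    "https://app.fused.io/server/v1/realtime/fused/api/v1/run/udf/saved/"
    "user@fused.io/caltrain_live_location?dtype_out_raster=png"
)
# Placeholder fallback so the sketch runs without a real token
access_token = os.environ.get("ACCESS_TOKEN", "<your-private-token>")

request = urllib.request.Request(
    url,
    headers={"Authorization": f"Bearer {access_token}"},
    method="GET",
)
print(request.get_method(), request.full_url)
# To actually send it: urllib.request.urlopen(request)
```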
Specify parameters
When UDF endpoints are called via HTTP requests, argument values are specified with query parameters, which requires input parameters to be serializable. As such, the UDF should specify the types to cast them to. Read more about supported types for UDF parameters.
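The reason type annotations matter is that every query parameter arrives as a string. The sketch below illustrates the idea with a plain Python function; call_with_query is a hypothetical helper and is not how Fused performs the casting, it only shows why annotated types are needed to recover numbers from a query string.

```python
import inspect
from urllib.parse import parse_qs


def my_udf(count: int = 1, scale: float = 1.0, label: str = ""):
    """Plain function standing in for a UDF with typed parameters."""
    return {"count": count, "scale": scale, "label": label}


def call_with_query(func, query_string):
    """Cast each query-string value using the function's annotations."""
    raw = {key: values[0] for key, values in parse_qs(query_string).items()}
    signature = inspect.signature(func)
    kwargs = {}
    for name, value in raw.items():
        annotation = signature.parameters[name].annotation
        # Without an annotation the value stays a string
        if annotation is inspect.Parameter.empty:
            kwargs[name] = value
        else:
            kwargs[name] = annotation(value)
    return func(**kwargs)


result = call_with_query(my_udf, "count=3&scale=0.5&label=hello")
print(result)
```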
Response data types
The dtype_out_vector and dtype_out_raster parameters define the output data type for vector tables and raster arrays, respectively.
- The supported types for vector tables are parquet, geojson, json, feather, csv, mvt, html, excel, and xml.
- For raster arrays: png, gif, jpg, jpeg, webp, tif, and tiff.
https://www.fused.io/server/v1/realtime-shared/****/run/file?dtype_out_raster=png
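A client can guard against typos in these parameters before making a request. A minimal sketch using the type names listed above; output_query is a hypothetical helper, and the server remains the source of truth for what is actually supported.

```python
from urllib.parse import urlencode

VECTOR_TYPES = {"parquet", "geojson", "json", "feather", "csv", "mvt", "html", "excel", "xml"}
RASTER_TYPES = {"png", "gif", "jpg", "jpeg", "webp", "tif", "tiff"}


def output_query(dtype_out_vector=None, dtype_out_raster=None) -> str:
    """Build the output-type part of a query string (hypothetical helper)."""
    params = {}
    if dtype_out_vector is not None:
        if dtype_out_vector not in VECTOR_TYPES:
            raise ValueError(f"unsupported vector type: {dtype_out_vector}")
        params["dtype_out_vector"] = dtype_out_vector
    if dtype_out_raster is not None:
        if dtype_out_raster not in RASTER_TYPES:
            raise ValueError(f"unsupported raster type: {dtype_out_raster}")
        params["dtype_out_raster"] = dtype_out_raster
    return urlencode(params)


raster_query = output_query(dtype_out_raster="png")
print(raster_query)
```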
Read how to structure HTTP endpoints to call the UDF as a Map Tile & File.
Caching responses
If a UDF's cache is enabled, its endpoints cache outputs for each combination of code and parameters. The first call runs and caches the UDF, and subsequent calls return cached data.
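Conceptually, caching "for each combination of code and parameters" means the cache key depends on both. The sketch below illustrates that idea with a hash of the code string and the parameters; it is not Fused's implementation, and cache_key and run_cached are hypothetical names.

```python
import hashlib
import json

_cache = {}
executions = []  # records every real (non-cached) run


def cache_key(code: str, params: dict) -> str:
    """Derive a key that changes when either the code or the parameters change."""
    payload = json.dumps({"code": code, "params": params}, sort_keys=True)
    return hashlib.sha256(payload.encode("utf-8")).hexdigest()


def run_cached(code: str, params: dict, runner):
    """Run once per (code, params) combination, then serve the stored result."""
    key = cache_key(code, params)
    if key not in _cache:
        executions.append(key)
        _cache[key] = runner(**params)
    return _cache[key]


def double(val):
    return val * 2


first = run_cached("def double(val): return val * 2", {"val": 21}, double)
second = run_cached("def double(val): return val * 2", {"val": 21}, double)  # cache hit
third = run_cached("def double(val): return val * 2", {"val": 5}, double)  # new parameters
print(first, second, third, len(executions))
```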