Call UDFs asynchronously
A UDF can be called asynchronously using the async/await syntax. A common pattern is to call a UDF several times in parallel with different parameters, then combine the results.
Setting sync=False in fused.run is intended for asynchronous calls when running in the cloud with engine='remote'. The parameter has no effect if the UDF is run in the local environment with engine='local'.
To illustrate this concept, let's create a simple UDF and save it as udf_to_run_async in the workbench:
@fused.udf
def udf(date: str='2020-01-01'):
    import pandas as pd
    import time
    time.sleep(2)
    return pd.DataFrame({'selected_date': [date]})
A UDF object cannot be passed directly to fused.run for an asynchronous call. Asynchronous execution is only supported for saved UDFs specified by name or token.
We can now invoke the UDF asynchronously for each date in the dates list and concatenate the results:
async def parent_fn():
    import fused
    import pandas as pd
    import asyncio
    # Parameter to loop through
    dates = ['2020-01-01', '2021-01-01', '2022-01-01', '2023-01-01']
    # Invoke the UDF once per date; with sync=False each call returns an awaitable
    promises_dfs = []
    for date in dates:
        df = fused.run("udf_to_run_async", date=date, engine='remote', sync=False)
        promises_dfs.append(df)
    # Run concurrently and collect the results
    dfs = await asyncio.gather(*promises_dfs)
    return pd.concat(dfs)
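The fan-out and gather pattern above can be tried without a Fused account. The following is a minimal sketch that uses only the standard library: fake_udf_call is a hypothetical stand-in for a remote call (it is not part of the Fused API), and it returns a plain dict rather than a DataFrame. It shows that asyncio.gather returns results in the same order as its inputs and that the total wall time is close to a single call rather than the sum of all calls.

```python
import asyncio
import time


async def fake_udf_call(date: str, delay: float = 0.2) -> dict:
    """Hypothetical stand-in for one remote UDF call (not a Fused API)."""
    # Simulate network and compute latency without blocking the event loop
    await asyncio.sleep(delay)
    return {"selected_date": date}


async def run_all(dates: list) -> list:
    # Create one coroutine per parameter value; nothing runs yet
    pending = [fake_udf_call(date) for date in dates]
    # Run them concurrently; results come back in input order
    return await asyncio.gather(*pending)


dates = ["2020-01-01", "2021-01-01", "2022-01-01", "2023-01-01"]

start = time.perf_counter()
rows = asyncio.run(run_all(dates))
elapsed = time.perf_counter() - start

print(rows)
print(f"elapsed: {elapsed:.2f}s for {len(dates)} calls")
```

Because the simulated calls overlap, the elapsed time should stay well below the sequential total of 0.2 s per call.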
nest_asyncio might be required to run UDFs asynchronously from Jupyter Notebooks.
!pip install nest-asyncio -q
import nest_asyncio
nest_asyncio.apply()
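One likely reason for needing nest_asyncio is that a notebook kernel already runs an event loop, and asyncio refuses to start a second one from inside it. The sketch below reproduces that situation in a plain script using only the standard library; child and call_run_inside_running_loop are hypothetical names chosen for illustration.

```python
import asyncio


async def child() -> str:
    await asyncio.sleep(0)
    return "done"


async def call_run_inside_running_loop():
    # We are already inside a running loop here, as in a notebook cell
    coro = child()
    try:
        asyncio.run(coro)  # attempt to start a nested event loop
    except RuntimeError as exc:
        coro.close()  # avoid a "coroutine was never awaited" warning
        return str(exc)
    return None


message = asyncio.run(call_run_inside_running_loop())
print(message)
```

In a recent IPython or Jupyter session, awaiting the coroutine directly at the top level of a cell (for example `await parent_fn()`) is often enough; nest_asyncio matters mainly when code calls asyncio.run or loop.run_until_complete from within the notebook.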