Call UDFs asynchronously
A UDF can be called asynchronously using the async/await syntax. A common pattern is to call a UDF several times in parallel with different parameters, then combine the results.
note
Setting sync=False in fused.run is intended for asynchronous calls when running in the cloud with engine='realtime'. The parameter has no effect if the UDF is run in the local environment with engine='local'.
This UDF illustrates the concept. It calls udf_to_run_async asynchronously for each date in the dates list and concatenates the results.
@fused.udf
async def udf():
    import pandas as pd
    import asyncio

    @fused.udf
    def udf_to_run_async(date: str='2020-01-01'):
        import pandas as pd
        import time
        time.sleep(2)
        return pd.DataFrame({'selected_date': [date]})

    # Parameter to loop through
    dates = ['2020-01-01', '2021-01-01', '2022-01-01', '2023-01-01']

    # Invoke the UDF as coroutines
    promises_dfs = []
    for date in dates:
        df = fused.run(udf_to_run_async, date=date, engine='realtime', sync=False)
        promises_dfs.append(df)

    # Run concurrently and collect the results
    dfs = await asyncio.gather(*promises_dfs)
    return pd.concat(dfs)
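The fan-out and gather pattern used above can also be tried without Fused, using only the Python standard library. In this minimal sketch, fake_udf is a hypothetical stand-in for a remote UDF call (it is not part of the Fused API), and asyncio.sleep simulates the call's latency. Because the four calls wait concurrently, the total time should be close to one call's latency rather than four.

```python
import asyncio
import time


async def fake_udf(date: str) -> dict:
    # Hypothetical stand-in for a remote UDF call; the sleep simulates latency.
    await asyncio.sleep(0.2)
    return {"selected_date": date}


async def main(dates: list) -> list:
    # Create one coroutine per parameter value without awaiting it yet.
    pending = [fake_udf(date) for date in dates]
    # Run them concurrently; gather returns results in the order they were passed in.
    return await asyncio.gather(*pending)


dates = ["2020-01-01", "2021-01-01", "2022-01-01", "2023-01-01"]

start = time.perf_counter()
results = asyncio.run(main(dates))
elapsed = time.perf_counter() - start

print(results)
print(f"elapsed: {elapsed:.2f}s")
```

asyncio.gather preserves input order, so the combined results line up with the dates list even if individual calls finish at different times.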
note
nest_asyncio might be required to run UDFs asynchronously from Jupyter notebooks.
!pip install nest-asyncio -q
import nest_asyncio
nest_asyncio.apply()
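A likely reason for this requirement is that a Jupyter kernel already runs an asyncio event loop, and by default asyncio refuses to start a second one from inside it. The sketch below reproduces that situation with the standard library only; fetch and inside_running_loop are hypothetical names used for illustration, and whether Fused hits this exact code path is an assumption.

```python
import asyncio


async def fetch() -> str:
    # Hypothetical stand-in for any awaitable piece of work.
    return "done"


async def inside_running_loop():
    # This coroutine runs inside an event loop, as notebook code does.
    coro = fetch()
    try:
        # Starting a nested loop is rejected by default.
        asyncio.run(coro)
    except RuntimeError as exc:
        coro.close()  # avoid a "never awaited" warning for the unused coroutine
        return str(exc)
    return None


message = asyncio.run(inside_running_loop())
print(message)
```

nest_asyncio.apply() patches asyncio so that nested calls like this are allowed, which is why it can help in notebooks.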