# Scheduling UDFs
Schedule UDFs to run automatically on a recurring cron schedule. Scheduled UDFs execute on batch compute instances with no time limit, making them suitable for data pipelines, periodic refreshes, and automated workflows.
- Cronjobs can be scheduled at any combination of minute, hour, day of month, month, or day of week.
- A cronjob runs at most once per hour; the `minute` field takes specific values, so sub-hourly intervals are not supported.
- Cronjobs run on batch compute instances with no execution time limit.
- There is a 1:1 relationship between a UDF and a cronjob: the same UDF cannot have multiple schedules.
- A Fused developer or enterprise account is required to schedule UDFs.
## Creating a schedule

### With `fused.api.schedule_udf()`

Use `fused.api.schedule_udf()` to schedule any UDF by name or object:

```python
@fused.udf
def udf():
    job = fused.api.schedule_udf(
        udf=fused.load("my_etl_pipeline"),
        minute=0,
        hour=8,
        udf_args={"source": "production"},
        enabled=True,
    )
    print(job)
    # <CronJob my_etl_pipeline (0 8 * * *)>
```
## Cron field reference

| Field | Parameter | Range | Default |
|---|---|---|---|
| Minute | `minute` | 0–59 | required |
| Hour | `hour` | 0–23 | required |
| Day of month | `day_of_month` | 1–31 | Every day |
| Month | `month` | 1–12 | Every month |
| Day of week | `day_of_week` | 0–6 (0 = Sunday) | Every day |
Pass a single `int` for a specific value, or a `list[int]` for multiple values:

```python
@fused.udf
def udf():
    # Run every day at 4:59 AM
    fused.api.schedule_udf(udf="my_udf", minute=59, hour=4)

    # Run at midnight on the 1st and 15th of every month
    fused.api.schedule_udf(udf="my_udf", minute=0, hour=0, day_of_month=[1, 15])

    # Run every Monday (1) and Friday (5) at noon
    fused.api.schedule_udf(udf="my_udf", minute=0, hour=12, day_of_week=[1, 5])
```
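These parameters map onto standard five-field cron syntax (minute, hour, day of month, month, day of week). The helper below is a hypothetical sketch, not part of the Fused SDK, for previewing the expression a given set of parameters produces:

```python
def cron_expression(minute, hour, day_of_month=None, month=None, day_of_week=None):
    """Render schedule parameters as a standard five-field cron string.

    Each field accepts an int, a list of ints, or None (None means "every").
    """
    def field(value):
        if value is None:
            return "*"
        if isinstance(value, list):
            return ",".join(str(v) for v in value)
        return str(value)

    return " ".join(field(v) for v in (minute, hour, day_of_month, month, day_of_week))

print(cron_expression(0, 8))                        # 0 8 * * *
print(cron_expression(0, 0, day_of_month=[1, 15]))  # 0 0 1,15 * *
print(cron_expression(0, 12, day_of_week=[1, 5]))   # 0 12 * * 1,5
```

The resulting strings match the form shown in the `CronJob` repr, e.g. `<CronJob my_etl_pipeline (0 8 * * *)>`.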
## Passing arguments

Use `udf_args` to pass parameters to the scheduled UDF. These are forwarded as keyword arguments each time the job runs:

```python
@fused.udf
def udf():
    fused.api.schedule_udf(
        udf=fused.load("job_to_run"),
        minute=59,
        hour=4,
        udf_args={"message": "cron"},
        enabled=True,
    )
```
The target UDF receives them as typed parameters:

```python
@fused.udf(cache_max_age="0s")
def udf(message: str = "hello from cron!"):
    import pandas as pd

    print(message)
    return pd.DataFrame({"status": [message]})
```
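Mechanically, this is plain keyword unpacking: the stored `udf_args` dict is expanded into keyword arguments on every run, so its keys must match the UDF's parameter names. A minimal plain-Python sketch of the call pattern (illustrative, not Fused internals):

```python
# The scheduler stores udf_args once and unpacks it on each run.
def target(message: str = "hello from cron!"):
    return f"status: {message}"

saved_udf_args = {"message": "cron"}  # what schedule_udf() recorded

result = target(**saved_udf_args)     # equivalent call on every scheduled run
print(result)                         # status: cron
```

A key that does not match a parameter name would raise a `TypeError` at run time, so keep `udf_args` in sync with the UDF's signature.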
## Listing schedules

Use `fused.api.schedule_list()` to retrieve all scheduled jobs for your team:

```python
@fused.udf
def udf():
    jobs = fused.api.schedule_list()
    print(f"Total jobs: {len(jobs)}")
    for job in jobs:
        print(f"{job.name} — {job._schedule} — enabled={job.enabled}")
```
To inspect schedules as a DataFrame:
```python
@fused.udf(cache_max_age="0s")
def udf():
    import pandas as pd

    jobs = fused.api.schedule_list()
    rows = []
    for job in jobs:
        row = job.model_dump()
        rows.append({
            "name": row.get("name"),
            "enabled": row.get("enabled"),
            "last_run": row.get("last_run"),
            "udf_id": row.get("udf_id"),
        })
    return pd.DataFrame(rows)
```
You can also check a specific UDF's schedule with `udf.get_schedule()`:

```python
@fused.udf
def udf():
    my_udf = fused.load("my_etl_pipeline")
    schedules = my_udf.get_schedule()
    print(schedules)
```
## Updating a schedule

Calling `fused.api.schedule_udf()` (or `udf.schedule()`) with a UDF that already has a schedule will update the existing schedule rather than creating a duplicate. This makes schedule creation idempotent:

```python
@fused.udf
def udf():
    # Initially schedule at 8 AM
    fused.api.schedule_udf(udf="my_udf", minute=0, hour=8)

    # Change to 6 AM: updates the same cronjob
    fused.api.schedule_udf(udf="my_udf", minute=0, hour=6)
```
To disable a schedule without deleting it, set `enabled=False`:

```python
@fused.udf
def udf():
    fused.api.schedule_udf(udf="my_udf", minute=0, hour=8, enabled=False)
```
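In effect, the API behaves like an upsert keyed on the UDF: writing a schedule for a UDF that already has one replaces the stored entry rather than adding a second. A plain-Python sketch of that semantics (illustrative only, not the Fused implementation):

```python
# One schedule per UDF: re-creating with the same key updates in place.
cronjobs = {}

def upsert_schedule(udf_name, minute, hour, enabled=True):
    cronjobs[udf_name] = {"minute": minute, "hour": hour, "enabled": enabled}
    return cronjobs[udf_name]

upsert_schedule("my_udf", 0, 8)                  # initial schedule at 8 AM
upsert_schedule("my_udf", 0, 6)                  # same key: updated, not duplicated
upsert_schedule("my_udf", 0, 6, enabled=False)   # disabled but still present

print(len(cronjobs))        # 1
print(cronjobs["my_udf"])   # {'minute': 0, 'hour': 6, 'enabled': False}
```

This is why schedule creation is safe to re-run in deployment scripts: repeated calls converge on the latest parameters.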
## Triggering a run manually

Use `run_now()` on a `CronJob` to trigger an immediate execution outside of the regular schedule:

```python
@fused.udf
def udf():
    jobs = fused.api.schedule_list()
    for job in jobs:
        if job.name == "my_etl_pipeline":
            result = job.run_now()
            print(result)
            break
```
## Deleting a schedule

Call `.delete()` on a `CronJob` to permanently remove it:

```python
@fused.udf
def udf(name: str = "my_etl_pipeline"):
    jobs = fused.api.schedule_list()
    for job in jobs:
        if job.name == name:
            job.delete()
            print(f"Deleted job: {job.name}")
            break
```
## Example: scheduled data pipeline
A complete example that fetches data and writes results to cloud storage, scheduled to run daily:
```python
@fused.udf(cache_max_age="0s")
def daily_weather_sync(city: str = "San Francisco"):
    from datetime import datetime

    import pandas as pd
    import requests

    # Coordinates are hardcoded for San Francisco; look them up per city
    # if you schedule this UDF for other locations.
    response = requests.get(
        "https://api.open-meteo.com/v1/forecast"
        "?latitude=37.77&longitude=-122.42&current_weather=true"
    )
    data = response.json()["current_weather"]
    df = pd.DataFrame([{
        "city": city,
        "temperature": data["temperature"],
        "windspeed": data["windspeed"],
        "timestamp": datetime.utcnow().isoformat(),
    }])
    path = f"fd://weather/{city}/{datetime.utcnow().strftime('%Y-%m-%d')}.parquet"
    df.to_parquet(path)
    print(f"Wrote {len(df)} rows to {path}")
    return df
```
Schedule it to run every day at 6:00 AM:
```python
@fused.udf
def udf():
    job = fused.api.schedule_udf(
        udf=fused.load("daily_weather_sync"),
        minute=0,
        hour=6,
        udf_args={"city": "San Francisco"},
    )
    print(job)
    # <CronJob daily_weather_sync (0 6 * * *)>
```
Monitor execution results in the Job logs page.
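Beyond the logs page, the `last_run` field returned by `schedule_list()` (see the listing example above) can drive a simple freshness check. The sketch below operates on plain dicts shaped like the rows built in that example; the field names come from this document, but the helper itself is hypothetical:

```python
from datetime import datetime, timedelta

def stale_jobs(rows, max_age_hours=24, now=None):
    """Return names of enabled jobs whose last_run is older than the cutoff."""
    now = now or datetime.utcnow()
    cutoff = now - timedelta(hours=max_age_hours)
    return [
        r["name"]
        for r in rows
        if r["enabled"] and datetime.fromisoformat(r["last_run"]) < cutoff
    ]

rows = [
    {"name": "daily_weather_sync", "enabled": True, "last_run": "2024-01-01T06:00:00"},
    {"name": "my_etl_pipeline", "enabled": True, "last_run": "2024-01-02T08:00:00"},
]
print(stale_jobs(rows, max_age_hours=24, now=datetime(2024, 1, 2, 12)))
# ['daily_weather_sync']
```

A check like this could itself run as a scheduled UDF that alerts when a pipeline has silently stopped producing output.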
## API reference

| Function | Description |
|---|---|
| `fused.api.schedule_udf()` | Schedule a UDF on a cron schedule |
| `fused.api.schedule_list()` | List all scheduled jobs |
| `udf.schedule()` | Schedule a UDF (instance method) |
| `udf.get_schedule()` | Get existing schedules for a UDF |