Skip to main content

Scheduling UDFs

Schedule UDFs to run automatically on a recurring cron schedule. Scheduled UDFs execute on batch compute instances with no time limit, making them suitable for data pipelines, periodic refreshes, and automated workflows.

Key constraints
  • Cronjobs can be scheduled at any combination of minute, hour, day of month, month, or day of week.
  • Cronjobs can only be scheduled once per hour.
  • Cronjobs run on batch compute instances with no execution time limit.
  • There is a 1:1 relationship between a UDF and a cronjob — the same UDF cannot have multiple schedules.
  • A Fused developer or enterprise account is required to schedule UDFs.

Creating a schedule

With fused.api.schedule_udf()

Use fused.api.schedule_udf() to schedule any UDF by name or object:

@fused.udf
def udf():
job = fused.api.schedule_udf(
udf=fused.load("my_etl_pipeline"),
minute=0,
hour=8,
udf_args={"source": "production"},
enabled=True,
)
print(job)
# <CronJob my_etl_pipeline (0 8 * * *)>

Cron field reference

FieldParameterRangeDefault
Minuteminute0–59required
Hourhour0–23required
Day of monthday_of_month1–31Every day
Monthmonth1–12Every month
Day of weekday_of_week0–6 (0 = Sunday)Every day

Pass a single int for a specific value, or a list[int] for multiple values:

@fused.udf
def udf():
# Run every day at 4:59 AM
fused.api.schedule_udf(udf="my_udf", minute=59, hour=4)

# Run at midnight on the 1st and 15th of every month
fused.api.schedule_udf(udf="my_udf", minute=0, hour=0, day_of_month=[1, 15])

# Run every Monday (1) and Friday (5) at noon
fused.api.schedule_udf(udf="my_udf", minute=0, hour=12, day_of_week=[1, 5])

Passing arguments

Use udf_args to pass parameters to the scheduled UDF. These are forwarded as keyword arguments each time the job runs:

@fused.udf
def udf():
fused.api.schedule_udf(
udf=fused.load("job_to_run"),
minute=59,
hour=4,
udf_args={"message": "cron"},
enabled=True,
)

The target UDF receives them as typed parameters:

@fused.udf(cache_max_age="0s")
def udf(message: str = "hello from cron!"):
import pandas as pd
print(message)
return pd.DataFrame({"status": [message]})

Listing schedules

Use fused.api.schedule_list() to retrieve all scheduled jobs for your team:

@fused.udf
def udf():
jobs = fused.api.schedule_list()
print(f"Total jobs: {len(jobs)}")

for job in jobs:
print(f"{job.name}{job._schedule} — enabled={job.enabled}")

To inspect schedules as a DataFrame:

@fused.udf(cache_max_age="0s")
def udf():
import pandas as pd

jobs = fused.api.schedule_list()
rows = []
for job in jobs:
row = job.model_dump()
rows.append({
"name": row.get("name"),
"enabled": row.get("enabled"),
"last_run": row.get("last_run"),
"udf_id": row.get("udf_id"),
})

return pd.DataFrame(rows)

You can also check a specific UDF's schedule with udf.get_schedule():

@fused.udf
def udf():
my_udf = fused.load("my_etl_pipeline")
schedules = my_udf.get_schedule()
print(schedules)

Updating a schedule

Calling fused.api.schedule_udf() (or udf.schedule()) with a UDF that already has a schedule will update the existing schedule rather than creating a duplicate. This makes schedule creation idempotent:

@fused.udf
def udf():
# Initially schedule at 8 AM
fused.api.schedule_udf(udf="my_udf", minute=0, hour=8)

# Change to 6 AM — updates the same cronjob
fused.api.schedule_udf(udf="my_udf", minute=0, hour=6)

To disable a schedule without deleting it, set enabled=False:

@fused.udf
def udf():
fused.api.schedule_udf(udf="my_udf", minute=0, hour=8, enabled=False)

Triggering a run manually

Use run_now() on a CronJob to trigger an immediate execution outside of the regular schedule:

@fused.udf
def udf():
jobs = fused.api.schedule_list()
for job in jobs:
if job.name == "my_etl_pipeline":
result = job.run_now()
print(result)
break

Deleting a schedule

Call .delete() on a CronJob to permanently remove it:

@fused.udf
def udf(name: str = "my_etl_pipeline"):
jobs = fused.api.schedule_list()
for job in jobs:
if job.name == name:
job.delete()
print(f"Deleted job: {job.name}")
break

Example: scheduled data pipeline

A complete example that fetches data and writes results to cloud storage, scheduled to run daily:

@fused.udf(cache_max_age="0s")
def daily_weather_sync(city: str = "San Francisco"):
import pandas as pd
import requests
from datetime import datetime

response = requests.get(
f"https://api.open-meteo.com/v1/forecast?latitude=37.77&longitude=-122.42&current_weather=true"
)
data = response.json()["current_weather"]

df = pd.DataFrame([{
"city": city,
"temperature": data["temperature"],
"windspeed": data["windspeed"],
"timestamp": datetime.utcnow().isoformat(),
}])

path = f"fd://weather/{city}/{datetime.utcnow().strftime('%Y-%m-%d')}.parquet"
df.to_parquet(path)
print(f"Wrote {len(df)} rows to {path}")
return df

Schedule it to run every day at 6:00 AM:

@fused.udf
def udf():
job = fused.api.schedule_udf(
udf=fused.load("daily_weather_sync"),
minute=0,
hour=6,
udf_args={"city": "San Francisco"},
)
print(job)
# <CronJob daily_weather_sync (0 6 * * *)>

Monitor execution results in the Job logs page.

API reference

FunctionDescription
fused.api.schedule_udf()Schedule a UDF on a cron schedule
fused.api.schedule_list()List all scheduled jobs
udf.schedule()Schedule a UDF (instance method)
udf.get_schedule()Get existing schedules for a UDF