JobPool (class)
class JobPool
The JobPool class manages parallel job execution and result retrieval. It is returned by udf.map().
Basic usage
import fused

@fused.udf
def my_udf(x: int):
    return x ** 2
# Create a pool of jobs
pool = my_udf.map([1, 2, 3, 4, 5])
# Wait and collect results as DataFrame
df = pool.collect()
# Or get raw results as list
pool.wait()
results = pool.results() # [1, 4, 9, 16, 25]
Monitoring progress
pool = my_udf.map(range(100))
# Show live progress
pool.tail()
# Check status
pool.status() # Series of status counts
pool.all_succeeded() # True if all jobs succeeded
pool.any_failed() # True if any job failed
Parallel processing within workers
Use n_processes_per_worker to process multiple items in parallel within each UDF invocation, reducing the total number of invocations for large job counts:
# Process 4 items in parallel within each UDF instance
pool = my_udf.map(range(1000), n_processes_per_worker=4)
df = pool.collect()
Local engine execution
When using engine="local", the pool uses process-based execution. The pool automatically shuts down after collect():
pool = my_udf.map([1, 2, 3], engine="local")
df = pool.collect() # Pool automatically shuts down after collecting
Async methods
When using udf.map_async(), the returned pool has async variants of all key methods:
pool = await my_udf.map_async([1, 2, 3])
await pool.wait_async()
results = await pool.results_async()
results_now = await pool.results_now_async()
errors = await pool.errors_async()
first_error = await pool.first_error_async()
logs = await pool.logs_async()
first_log = await pool.first_log_async()
logs_df = await pool.logs_df_async()
success = await pool.success_async()
await pool.tail_async()
await pool.cancel_async()
all_succeeded
pool.all_succeeded() -> bool
True if all tasks finished with success.
any_failed
pool.any_failed() -> bool
True if any task finished with an error.
any_succeeded
pool.any_succeeded() -> bool
True if any task finished with success.
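For example, these checks can drive simple post-run handling (illustrative sketch; my_udf and its arguments are placeholders):
pool = my_udf.map(range(10))
pool.wait()
if pool.all_succeeded():
    df = pool.collect()
elif pool.any_failed():
    print(pool.status())       # task counts per status
    print(pool.first_error())  # first failure, in argument order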
arg_df
pool.arg_df() -> pd.DataFrame
The arguments passed to each run, as a DataFrame.
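A small sketch lining up inputs and outputs (assumes one row per job with a column per UDF argument, and that all jobs succeeded):
pool.wait()
args = pool.arg_df()
args["result"] = pool.results()  # results() is ordered to match the args list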
cancel
pool.cancel(
    wait=False,  # if True, wait for running tasks to complete
)
Cancel any pending (not running) tasks. Note that it will not be possible to retry on the same JobPool later.
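A brief sketch of cancelling pending work and seeing what was skipped (placeholder UDF and arguments):
pool = my_udf.map(range(1000))
pool.cancel()               # pending tasks are not submitted; running tasks keep going
skipped = pool.cancelled()  # {position: argument} for tasks that never ran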
cancelled
pool.cancelled() -> dict[int, Any]
Retrieve the arguments that were cancelled and not run. Results are indexed by position in the args list.
collect
pool.collect(
    ignore_exceptions=False,  # if True, skip failed jobs instead of raising
    flatten=True,  # if True, flatten DataFrame results
    drop_index=False,  # if True, use position index instead of args
) -> pd.DataFrame
Wait for all jobs and collect results into a DataFrame.
When using engine="local", collect() automatically shuts down the pool after collecting results.
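For example, to collect whatever succeeded and skip failed jobs (illustrative; assumes dropping failures is acceptable):
pool = my_udf.map(range(100))
df = pool.collect(ignore_exceptions=True)  # failed jobs are skipped instead of raising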
df
pool.df(
    ignore_exceptions=False,  # if True, skip failed jobs instead of raising
    flatten=True,  # if True, flatten DataFrame results
    drop_index=False,  # if True, use position index instead of args
) -> pd.DataFrame
Alias for collect().
done
pool.done() -> bool
True if all tasks have finished, regardless of success or failure.
errors
pool.errors() -> dict[int, Exception]
Retrieve the results that are currently done and are errors. Results are indexed by position in the args list.
first_error
pool.first_error() -> Exception | None
Retrieve the first (by order of arguments) error result, or None.
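A minimal sketch of inspecting failures once the pool is done (placeholder handling):
pool.wait()
if pool.any_failed():
    for position, exc in pool.errors().items():
        print(position, type(exc).__name__, exc)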
first_log
pool.first_log() -> str | None
Retrieve the logs of the first task (by order of arguments), or None.
logs
pool.logs() -> list[str | None]
Logs for each task. Incomplete tasks will be reported as None.
logs_df
pool.logs_df(
    status_column="status",  # column name for status (None to omit)
    result_column="result",  # column name for result (None to omit)
    time_column="time",  # column name for time (None to omit)
    logs_column="logs",  # column name for logs (None to omit)
    exception_column=None,  # column name for exceptions (None to omit)
    include_exceptions=True,  # include exceptions in result column
) -> pd.DataFrame
Get a DataFrame of the results as they currently stand. Includes a column for each argument passed, plus status, result, time, and logs columns.
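For example, to put exceptions in their own column rather than in the result column (the column name here is illustrative):
df = pool.logs_df(
    exception_column="exception",  # add a dedicated column for exceptions
    include_exceptions=False,      # keep exceptions out of the result column
)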
n_jobs
The number of jobs in the pool.
pending
pool.pending() -> dict[int, Any]
Retrieve the arguments that are currently pending and not yet submitted.
results
pool.results(
    return_exceptions=False,  # if True, return exceptions instead of raising
) -> list[Any]
Retrieve all results of the job. Results are returned in the same order as the args list.
results_now
pool.results_now(
    return_exceptions=False,  # if True, return exceptions instead of raising
) -> dict[int, Any]
Retrieve the results that are currently done. Results are indexed by position in the args list.
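A sketch of polling partial results without blocking (placeholder reporting):
partial = pool.results_now(return_exceptions=True)  # {position: result or exception}
print(f"{len(partial)} of {pool.n_jobs} jobs done")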
retry
pool.retry()
Rerun any tasks in error or timeout states. Tasks are rerun in the same pool.
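For example, to retry failures with a bounded number of attempts (the retry budget here is arbitrary):
pool.wait()
for _ in range(3):  # arbitrary retry budget
    if pool.all_succeeded():
        break
    pool.retry()
    pool.wait()
df = pool.collect()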
running
pool.running() -> dict[int, Any]
Retrieve the arguments for tasks that are currently running. Results are indexed by position in the args list.
shutdown
pool.shutdown(
    wait=True,  # if True, wait for all workers to complete
)
Shutdown the pool. Only applicable when using engine="local". Called automatically by collect().
status
pool.status() -> pd.Series
Return a Series of task counts, indexed by status.
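For example (the exact status labels in the index are determined by the job runner and are not listed here):
counts = pool.status()  # index = status label, value = number of tasks in that status
print(counts)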
success
pool.success() -> dict[int, Any]
Retrieve the results that are currently done and are successful. Results are indexed by position in the args list.
tail
pool.tail(
    stop_on_exception=False,  # if True, stop when first exception occurs
)
Wait until all jobs are finished, printing statuses as they become available. Useful for interactively watching the state of the pool.
times
pool.times() -> list[timedelta | None]
Time taken for each task. Incomplete tasks will be reported as None.
total_time
pool.total_time(
    since_retry=False,  # if True, measure from last retry instead of start
) -> timedelta
Returns how long the entire job took. If only partial results are available, the duration is measured up to the most recently completed task.
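For example, to compare the mean task duration with the overall wall-clock time (illustrative):
from datetime import timedelta

durations = [t for t in pool.times() if t is not None]  # incomplete tasks are reported as None
if durations:
    print(sum(durations, timedelta()) / len(durations))  # mean task duration
    print(pool.total_time())                             # duration of the entire job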
wait
pool.wait()
Wait until all jobs are finished. Use fused.options.show.enable_tqdm to enable or disable the progress bar.
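For example, to wait without a progress bar in a script (assuming the option can be set by attribute assignment, which is an assumption here):
import fused

fused.options.show.enable_tqdm = False  # assumed setting; disables the progress bar
pool.wait()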