JobPool (class)
class JobPool
The JobPool class manages parallel job execution and result retrieval. It is returned by udf.map().
Basic usage
@fused.udf
def udf():
    # Create a pool of jobs
    pool = my_udf.map([1, 2, 3, 4, 5])
    # Wait and collect results as DataFrame
    df = pool.df()
    return df

@fused.udf
def my_udf(x: int):
    return x ** 2
Because my_udf is defined in the same file, there is no need to load it separately.
Monitoring progress
pool = my_udf.map(range(100))
# Show live progress
pool.tail()
# Check status
pool.status() # Series of status counts
pool.all_succeeded() # True if all jobs succeeded
pool.any_failed() # True if any job failed
Parallel processing within workers
Use worker_concurrency to combine multiple items per UDF invocation, reducing total invocations for large job counts:
@fused.udf
def udf():
    # Run 10 jobs in parallel, with 4 jobs per UDF instance
    pool = my_udf.map(range(10), worker_concurrency=4)
    df = pool.df()
    return df

@fused.udf
def my_udf(x: int):
    return x ** 2
Local engine execution
When using engine="local", the pool uses process-based execution. The pool automatically shuts down after df():
@fused.udf
def udf():
    # All jobs are executed locally
    pool = my_udf.map(range(10), engine="local")
    df = pool.df()  # Pool automatically shuts down after collecting
    return df

@fused.udf
def my_udf(x: int):
    return x ** 2
Async methods (AsyncJobPool)
When using udf.map_async(), the returned object is an AsyncJobPool (subclass of JobPool) with async variants of the key methods. Use them in async code so they don't block the event loop (e.g. in Workbench):
@fused.udf
async def udf():
    pool = await my_udf.map_async([1, 2, 3])
    await pool.wait_async()
    results = await pool.results_async()
    return results

@fused.udf
def my_udf(x: int):
    return x ** 2
The async variants are documented below in alphabetical order with the sync methods.
all_succeeded
pool.all_succeeded() -> bool
True if all tasks finished with success.
any_failed
pool.any_failed() -> bool
True if any task finished with an error.
any_succeeded
pool.any_succeeded() -> bool
True if any task finished with success.
arg_df
pool.arg_df() -> pd.DataFrame
The arguments passed to each run, as a DataFrame.
cancel
pool.cancel(
    wait=False,  # if True, wait for running tasks to complete
)
Cancel any pending (not running) tasks. Note that it will not be possible to retry on the same JobPool afterwards.
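For example, a minimal sketch of cancelling mid-run, assuming pool was created with my_udf.map() as in the examples above:
pool = my_udf.map(range(100))
# Cancel everything that hasn't started; wait for running tasks to finish
pool.cancel(wait=True)
# Inspect which arguments never ran
skipped = pool.cancelled()  # dict of position -> argument
print(f"{len(skipped)} tasks were cancelled")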
cancel_async
await pool.cancel_async(wait: bool = False)
Cancel any pending (not running) tasks. (AsyncJobPool only.)
cancelled
pool.cancelled() -> dict[int, Any]
Retrieve the arguments that were cancelled and not run.
collect
pool.collect(
    ignore_exceptions=False,  # if True, skip failed jobs instead of raising
    flatten=True,             # if True, flatten DataFrame results
    drop_index=False,         # if True, use position index instead of args
) -> pd.DataFrame
Alias for df().
df
pool.df(
    ignore_exceptions=False,  # if True, skip failed jobs instead of raising
    flatten=True,             # if True, flatten DataFrame results
    drop_index=False,         # if True, use position index instead of args
) -> pd.DataFrame
Collect all results into a DataFrame.
When using engine="local", df() automatically shuts down the pool after collecting results.
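For example, a sketch of collecting only the successful results using the parameters above:
pool = my_udf.map(range(10))
# Skip failed jobs instead of raising on the first error
df = pool.df(ignore_exceptions=True)
# Index by position instead of by the arguments
df_by_position = pool.df(ignore_exceptions=True, drop_index=True)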
done
pool.done() -> bool
True if all tasks have finished, regardless of success or failure.
errors
pool.errors() -> dict[int, Exception]
Retrieve the results that are currently done and are errors. Results are indexed by position in the args list.
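A sketch of inspecting failures once the pool has finished:
pool.wait()
if pool.any_failed():
    for position, exc in pool.errors().items():
        print(f"task {position} failed: {exc!r}")
    print(pool.first_error())  # or just the first failure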
errors_async
await pool.errors_async() -> dict[int, Exception]
Async version of errors() that doesn't block the event loop. (AsyncJobPool only.)
first_error
pool.first_error() -> Exception | None
Retrieve the first (by order of arguments) error result, or None.
first_error_async
await pool.first_error_async() -> Exception | None
Async version of first_error() that doesn't block the event loop. (AsyncJobPool only.)
first_log
pool.first_log() -> str | None
Retrieve the logs of the first task (by order of arguments), or None.
first_log_async
await pool.first_log_async() -> str | None
Async version of first_log() that doesn't block the event loop. (AsyncJobPool only.)
logs
pool.logs() -> list[str | None]
Logs for each task. Incomplete tasks will be reported as None.
logs_async
await pool.logs_async() -> list[str | None]
Async version of logs() that doesn't block the event loop. (AsyncJobPool only.)
logs_df
pool.logs_df(
    status_column="status",    # column name for status (None to omit)
    result_column="result",    # column name for result (None to omit)
    time_column="time",        # column name for time (None to omit)
    logs_column="logs",        # column name for logs (None to omit)
    exception_column=None,     # column name for exceptions (None to omit)
    include_exceptions=True,   # include exceptions in result column
) -> pd.DataFrame
Get a DataFrame of the results in their current state. The DataFrame has a column for each argument passed, plus columns for status, result, time, logs, and optionally exception.
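For example, to put exceptions in their own column instead of mixing them into the result column (a sketch):
snapshot = pool.logs_df(
    exception_column="exception",  # add a dedicated exception column
    include_exceptions=False,      # keep the result column clean
)
print(snapshot[["status", "time", "exception"]])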
logs_df_async
await pool.logs_df_async(
    status_column="status",
    result_column="result",
    time_column="time",
    logs_column="logs",
    exception_column=None,
    include_exceptions=True,
) -> pd.DataFrame
Async version of logs_df() that doesn't block the event loop. (AsyncJobPool only.)
n_jobs
The number of jobs in the pool.
pending
pool.pending() -> dict[int, Any]
Retrieve the arguments that are currently pending and not yet submitted.
results
pool.results(
    return_exceptions=False,  # if True, return exceptions instead of raising
) -> list[Any]
Retrieve all results of the job. Results are ordered by the order of the args list.
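A sketch of collecting raw results with exceptions returned in place rather than raised:
pool.wait()
results = pool.results(return_exceptions=True)
values = [r for r in results if not isinstance(r, Exception)]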
results_async
await pool.results_async(return_exceptions: bool = False) -> list[Any]
Async version of results(); assumes waiting has already been done. (AsyncJobPool only.)
results_now
pool.results_now(
    return_exceptions=False,  # if True, return exceptions instead of raising
) -> dict[int, Any]
Retrieve the results that are currently done. Results are indexed by position in the args list.
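Unlike results(), this can be polled while the pool is still running. A sketch:
import time

while not pool.done():
    partial = pool.results_now(return_exceptions=True)
    print(f"{len(partial)}/{pool.n_jobs} tasks finished")
    time.sleep(5)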
retry
pool.retry()
Rerun any tasks in error or timeout states. Tasks are rerun in the same pool.
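A common pattern is a bounded retry loop for transient failures (a sketch):
pool.wait()
for attempt in range(3):
    if not pool.any_failed():
        break
    pool.retry()  # reruns only failed/timed-out tasks
    pool.wait()
df = pool.df(ignore_exceptions=True)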
running
pool.running() -> dict[int, Any]
Retrieve the arguments that are currently running.
shutdown
pool.shutdown(
    wait=True,  # if True, wait for all workers to complete
)
Shut down the process pool executor.
This should be called when all jobs are complete to ensure worker processes are properly terminated and don't hang. Only applicable when using engine="local". Called automatically by collect().
status
pool.status() -> pd.Series
Return a Series of task counts, indexed by status.
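For example (the exact status labels shown are illustrative):
counts = pool.status()
print(counts)
# success    42
# running     6
# pending     2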
success
pool.success() -> dict[int, Any]
Retrieve the results that are currently done and are successful. Results are indexed by position in the args list.
success_async
await pool.success_async() -> dict[int, Any]
Async version of success() that doesn't block the event loop. (AsyncJobPool only.)
tail
pool.tail(
    stop_on_exception=False,  # if True, stop when first exception occurs
)
Wait until all jobs are finished, printing statuses as they become available.
This is useful for interactively watching the state of the pool. Use pool._wait_sleep to control whether sleep occurs while waiting.
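For example, to watch a run live and bail out on the first failure (a sketch):
pool = my_udf.map(range(50))
pool.tail(stop_on_exception=True)
if pool.any_failed():
    print(pool.first_error())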
tail_async
await pool.tail_async(stop_on_exception: bool = False)
Async version of tail() that doesn't block the event loop. (AsyncJobPool only.)
times
pool.times() -> list[timedelta | None]
Time taken for each task. Incomplete tasks will be reported as None.
total_time
pool.total_time(
    since_retry=False,  # if True, measure from last retry instead of start
) -> timedelta
Returns how long the entire job took. If only partial results are available, the duration is measured up to the most recently completed task.
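A sketch of timing a run end to end, and again after a retry:
pool.wait()
print(f"total: {pool.total_time()}")
if pool.any_failed():
    pool.retry()
    pool.wait()
    print(f"since retry: {pool.total_time(since_retry=True)}")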
wait
pool.wait()
Wait until all jobs are finished.
Use fused.options.show.enable_tqdm to enable or disable the tqdm progress bar. Use pool._wait_sleep to control whether sleep occurs while waiting.
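The typical blocking pattern is wait-then-collect:
pool = my_udf.map(range(10))
pool.wait()     # block until every task finishes
df = pool.df()  # then collect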
wait_async
await pool.wait_async()
Wait until all jobs are finished. Does not block the event loop. (AsyncJobPool only.)