
JobPool (class)

class JobPool

The JobPool class manages parallel job execution and result retrieval. It is returned by udf.map().

Basic usage

import fused

@fused.udf
def my_udf(x: int):
    return x ** 2

# Create a pool of jobs
pool = my_udf.map([1, 2, 3, 4, 5])

# Wait and collect results as DataFrame
df = pool.collect()

# Or get raw results as list
pool.wait()
results = pool.results() # [1, 4, 9, 16, 25]

Monitoring progress

pool = my_udf.map(range(100))

# Show live progress
pool.tail()

# Check status
pool.status() # Series of status counts
pool.all_succeeded() # True if all jobs succeeded
pool.any_failed() # True if any job failed

Parallel processing within workers

Use n_processes_per_worker to process multiple items in parallel within each UDF invocation, reducing the total number of invocations for large job counts:

# Process 4 items in parallel within each UDF instance
pool = my_udf.map(range(1000), n_processes_per_worker=4)
df = pool.collect()

Local engine execution

When using engine="local", the pool uses process-based execution. The pool automatically shuts down after collect():

pool = my_udf.map([1, 2, 3], engine="local")
df = pool.collect() # Pool automatically shuts down after collecting

Async methods

When using udf.map_async(), the returned pool has async variants of all key methods:

pool = await my_udf.map_async([1, 2, 3])

await pool.wait_async()
results = await pool.results_async()
results_now = await pool.results_now_async()
errors = await pool.errors_async()
first_error = await pool.first_error_async()
logs = await pool.logs_async()
first_log = await pool.first_log_async()
logs_df = await pool.logs_df_async()
success = await pool.success_async()
await pool.tail_async()
await pool.cancel_async()

all_succeeded

pool.all_succeeded() -> bool

True if all tasks finished with success.


any_failed

pool.any_failed() -> bool

True if any task finished with an error.


any_succeeded

pool.any_succeeded() -> bool

True if any task finished with success.
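
For example, a minimal sketch of combining these checks after waiting on a pool created as in the examples above:

pool = my_udf.map([1, 2, 3])
pool.wait()

if pool.all_succeeded():
    df = pool.collect()
elif pool.any_failed():
    print(pool.first_error()) # inspect the earliest failure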


arg_df

pool.arg_df() -> pd.DataFrame

The arguments passed to each run, as a DataFrame.
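
For example (a sketch; the exact column layout depends on the UDF's parameters):

pool = my_udf.map([1, 2, 3])
args = pool.arg_df()
print(args) # one row per job, one column per argument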


cancel

pool.cancel(
    wait=False, # if True, wait for running tasks to complete
)

Cancel any pending (not yet running) tasks. Note that it will not be possible to retry on the same JobPool later.


cancelled

pool.cancelled() -> dict[int, Any]

Retrieve the arguments that were cancelled and not run. Results are indexed by position in the args list.
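
A sketch of cancelling remaining work and inspecting what was skipped:

pool = my_udf.map(range(100))

# Cancel pending tasks; wait=True would block until running tasks finish
pool.cancel()

skipped = pool.cancelled() # {position in args: argument} for tasks that never ran
print(f"{len(skipped)} jobs were cancelled before running")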


collect

pool.collect(
    ignore_exceptions=False, # if True, skip failed jobs instead of raising
    flatten=True, # if True, flatten DataFrame results
    drop_index=False, # if True, use position index instead of args
) -> pd.DataFrame

Wait for all jobs and collect results into a DataFrame.

note

When using engine="local", collect() automatically shuts down the pool after collecting results.
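
For example, to tolerate partial failures when collecting:

pool = my_udf.map(range(10))

# Default behavior: raises if any job failed
# df = pool.collect()

# Skip failed jobs and keep only the successful results
df = pool.collect(ignore_exceptions=True)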


df

pool.df(
    ignore_exceptions=False, # if True, skip failed jobs instead of raising
    flatten=True, # if True, flatten DataFrame results
    drop_index=False, # if True, use position index instead of args
) -> pd.DataFrame

Alias for collect().


done

pool.done() -> bool

True if all tasks have finished, regardless of success or failure.


errors

pool.errors() -> dict[int, Exception]

Retrieve the results that are currently done and are errors. Results are indexed by position in the args list.


first_error

pool.first_error() -> Exception | None

Retrieve the first (by order of arguments) error result, or None.
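
For example, to inspect failures once the pool is done:

pool.wait()

if pool.any_failed():
    for position, exc in pool.errors().items():
        print(position, type(exc).__name__, exc)

    # Or just look at the earliest failure
    print(pool.first_error())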


first_log

pool.first_log() -> str | None

Retrieve the first (by order of arguments) log output, or None.


logs

pool.logs() -> list[str | None]

Logs for each task. Incomplete tasks will be reported as None.


logs_df

pool.logs_df(
    status_column="status", # column name for status (None to omit)
    result_column="result", # column name for result (None to omit)
    time_column="time", # column name for time (None to omit)
    logs_column="logs", # column name for logs (None to omit)
    exception_column=None, # column name for exceptions (None to omit)
    include_exceptions=True, # include exceptions in result column
) -> pd.DataFrame

Get a DataFrame of the results as they currently stand. Includes one column for each argument passed, plus status, result, time, and logs columns.
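
A sketch using the parameters described above:

snapshot = pool.logs_df()
print(snapshot.columns) # argument columns plus status, result, time, logs

# Omit the logs column and add a dedicated exception column instead
slim = pool.logs_df(logs_column=None, exception_column="exception", include_exceptions=False)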


n_jobs

The number of jobs in the pool.


pending

pool.pending() -> dict[int, Any]

Retrieve the arguments that are currently pending and not yet submitted.


results

pool.results(
    return_exceptions=False, # if True, return exceptions instead of raising
) -> list[Any]

Retrieve all results of the job. Results are ordered by the order of the args list.


results_now

pool.results_now(
    return_exceptions=False, # if True, return exceptions instead of raising
) -> dict[int, Any]

Retrieve the results that are currently done. Results are indexed by position in the args list.
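
A sketch contrasting the blocking and snapshot-style result accessors:

# Block until everything has finished; ordered like the args list
pool.wait()
all_results = pool.results()

# Take whatever has finished so far, without waiting for the rest
partial = pool.results_now() # {position in args: result}
partial_with_errors = pool.results_now(return_exceptions=True)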


retry

pool.retry()

Rerun any tasks in error or timeout states. Tasks are rerun in the same pool.
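
For example, a simple wait-and-retry pass for transient failures:

pool.wait()

if pool.any_failed():
    pool.retry() # reruns only tasks in error or timeout states, in the same pool
    pool.wait()

df = pool.collect()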


running

pool.running() -> dict[int, Any]

Retrieve the arguments for tasks that are currently running. Results are indexed by position in the args list.


shutdown

pool.shutdown(
    wait=True, # if True, wait for all workers to complete
)

Shutdown the pool. Only applicable when using engine="local". Called automatically by collect().


status

pool.status() -> pd.Series

Return a Series of task counts, indexed by status.
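
For example (the exact status labels depend on the state of the jobs):

counts = pool.status() # pandas Series: index = status, values = task counts
print(counts)
print(counts.sum()) # total number of tasks accounted for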


success

pool.success() -> dict[int, Any]

Retrieve the results that are currently done and are successful. Results are indexed by position in the args list.


tail

pool.tail(
    stop_on_exception=False, # if True, stop when first exception occurs
)

Wait until all jobs are finished, printing statuses as they become available. Useful for interactively watching the state of the pool.


times

pool.times() -> list[timedelta | None]

Time taken for each task. Incomplete tasks will be reported as None.


total_time

pool.total_time(
    since_retry=False, # if True, measure from last retry instead of start
) -> timedelta

Returns how long the entire job took. If only partial results are available, the duration is measured up to the most recently completed task.
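
A sketch of basic timing introspection:

pool.wait()

durations = pool.times() # one timedelta (or None) per task
print(max(t for t in durations if t is not None)) # slowest individual task

print(pool.total_time()) # wall-clock time for the whole pool
print(pool.total_time(since_retry=True)) # measured from the last retry instead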


wait

pool.wait()

Wait until all jobs are finished. Use fused.options.show.enable_tqdm to enable or disable the progress bar.
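
For example, to wait without the progress bar (assuming the option is toggled by simple assignment):

fused.options.show.enable_tqdm = False # disable the progress bar used by wait()
pool = my_udf.map(range(10))
pool.wait()
results = pool.results()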