JobPool (class)

class JobPool

The JobPool class manages parallel job execution and result retrieval. It is returned by udf.map().

Basic usage

@fused.udf
def udf():
    # Create a pool of jobs
    pool = my_udf.map([1, 2, 3, 4, 5])

    # Wait and collect results as DataFrame
    df = pool.df()

    return df

@fused.udf
def my_udf(x: int):
    return x ** 2
note

my_udf is defined in the same file, so there is no need to load it.

Monitoring progress

pool = my_udf.map(range(100))

# Show live progress
pool.tail()

# Check status
pool.status() # Series of status counts
pool.all_succeeded() # True if all jobs succeeded
pool.any_failed() # True if any job failed

Parallel processing within workers

Use worker_concurrency to combine multiple items per UDF invocation, reducing total invocations for large job counts:

@fused.udf
def udf():
    # Running 10 jobs in parallel but with 4 jobs per UDF instance
    pool = my_udf.map(range(10), worker_concurrency=4)
    df = pool.df()

    return df

@fused.udf
def my_udf(x: int):
    return x ** 2

Local engine execution

When using engine="local", the pool uses process-based execution. The pool automatically shuts down after df():

@fused.udf
def udf():
    # All jobs are executed locally
    pool = my_udf.map(range(10), engine="local")
    df = pool.df()  # Pool automatically shuts down after collecting

    return df

@fused.udf
def my_udf(x: int):
    return x ** 2

Async methods (AsyncJobPool)

When using udf.map_async(), the returned object is an AsyncJobPool (subclass of JobPool) with async variants of the key methods. Use them in async code so they don't block the event loop (e.g. in Workbench):

@fused.udf
async def udf():
    pool = await my_udf.map_async([1, 2, 3])
    await pool.wait_async()
    results = await pool.results_async()
    return results

@fused.udf
def my_udf(x: int):
    return x ** 2

The async variants are documented below in alphabetical order with the sync methods.


all_succeeded

pool.all_succeeded() -> bool

True if all tasks finished with success.


any_failed

pool.any_failed() -> bool

True if any task finished with an error.


any_succeeded

pool.any_succeeded() -> bool

True if any task finished with success.


arg_df

pool.arg_df() -> pd.DataFrame

The arguments passed to each run, as a DataFrame.


cancel

pool.cancel(
    wait=False,  # if True, wait for running tasks to complete
)

Cancel any pending (not yet running) tasks. Note that after cancelling, it is not possible to retry on the same JobPool.


cancel_async

await pool.cancel_async(wait: bool = False)

Cancel any pending (not running) tasks. (AsyncJobPool only.)


cancelled

pool.cancelled() -> dict[int, Any]

Retrieve the arguments that were cancelled and not run.


collect

pool.collect(
    ignore_exceptions=False,  # if True, skip failed jobs instead of raising
    flatten=True,             # if True, flatten DataFrame results
    drop_index=False,         # if True, use position index instead of args
) -> pd.DataFrame

Alias for df().


df

pool.df(
    ignore_exceptions=False,  # if True, skip failed jobs instead of raising
    flatten=True,             # if True, flatten DataFrame results
    drop_index=False,         # if True, use position index instead of args
) -> pd.DataFrame

Collect all results into a DataFrame.

note

When using engine="local", df() automatically shuts down the pool after collecting results.


done

pool.done() -> bool

True if all tasks have finished, regardless of success or failure.


errors

pool.errors() -> dict[int, Exception]

Retrieve the results that are currently done and are errors. Results are indexed by position in the args list.
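Because the dict is keyed by position in the args list, each exception can be mapped back to the argument that produced it. A minimal sketch with stand-in data (the dict literal below is hypothetical, not real pool output):

```python
# Stand-in for pool.errors(): position in the args list -> exception
args = [1, 2, 3, 4, 5]
errors = {2: ValueError("bad input")}  # hypothetical failed job at position 2

# Pair each exception with the argument that caused it
failed_args = {args[i]: exc for i, exc in errors.items()}
```

Here `failed_args` would be `{3: ValueError("bad input")}`, since position 2 in the args list holds the value 3.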


errors_async

await pool.errors_async() -> dict[int, Exception]

Async version of errors() that doesn't block the event loop. (AsyncJobPool only.)


first_error

pool.first_error() -> Exception | None

Retrieve the first (by order of arguments) error result, or None.


first_error_async

await pool.first_error_async() -> Exception | None

Async version of first_error() that doesn't block the event loop. (AsyncJobPool only.)


first_log

pool.first_log() -> str | None

Retrieve the first (by order of arguments) logs, or None.


first_log_async

await pool.first_log_async() -> str | None

Async version of first_log() that doesn't block the event loop. (AsyncJobPool only.)


logs

pool.logs() -> list[str | None]

Logs for each task. Incomplete tasks will be reported as None.


logs_async

await pool.logs_async() -> list[str | None]

Async version of logs() that doesn't block the event loop. (AsyncJobPool only.)


logs_df

pool.logs_df(
    status_column="status",     # column name for status (None to omit)
    result_column="result",     # column name for result (None to omit)
    time_column="time",         # column name for time (None to omit)
    logs_column="logs",         # column name for logs (None to omit)
    exception_column=None,      # column name for exceptions (None to omit)
    include_exceptions=True,    # include exceptions in result column
) -> pd.DataFrame

Get a DataFrame of results as they are currently. The DataFrame will have columns for each argument passed, and columns for: status, result, time, logs and optionally exception.
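The shape of the returned frame can be pictured with stand-in data (the values below are illustrative, not real pool output; the `time` and exception columns are omitted for brevity):

```python
import pandas as pd

# Illustrative shape of pool.logs_df() for my_udf.map([2, 3]):
# one row per job, argument columns first, then status/result/logs
df = pd.DataFrame({
    "x": [2, 3],
    "status": ["success", "running"],
    "result": [4, None],
    "logs": ["...", None],
})

# Count finished jobs from the status column
n_done = int((df["status"] == "success").sum())
```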


logs_df_async

await pool.logs_df_async(
    status_column="status",
    result_column="result",
    time_column="time",
    logs_column="logs",
    exception_column=None,
    include_exceptions=True,
) -> pd.DataFrame

Async version of logs_df() that doesn't block the event loop. (AsyncJobPool only.)


n_jobs

The number of jobs in the pool.


pending

pool.pending() -> dict[int, Any]

Retrieve the arguments that are currently pending and not yet submitted.


results

pool.results(
    return_exceptions=False,  # if True, return exceptions instead of raising
) -> list[Any]

Retrieve all results of the job. Results are ordered by the order of the args list.
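Unlike results_now(), this returns a plain list in input order. For the squaring UDF from the usage examples above, the returned list would line up element-for-element with the args (a sketch of the expected ordering; no real pool involved):

```python
# What pool.results() would return for my_udf.map([1, 2, 3]),
# where my_udf squares its input (sketch, not real pool output)
args = [1, 2, 3]
results = [x ** 2 for x in args]  # one result per arg, in input order
```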


results_async

await pool.results_async(return_exceptions: bool = False) -> list[Any]

Async version of results(); assumes waiting has already been done. (AsyncJobPool only.)


results_now

pool.results_now(
    return_exceptions=False,  # if True, return exceptions instead of raising
) -> dict[int, Any]

Retrieve the results that are currently done. Results are indexed by position in the args list.
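Because the dict is keyed by position, partial results can be joined back to their arguments while other jobs are still running. A sketch with stand-in data (the dict literal is hypothetical, not real pool output):

```python
args = [2, 3, 5, 7]

# Stand-in for pool.results_now(): jobs 0 and 2 finished, 1 and 3 pending
partial = {0: 4, 2: 25}

# Join each available result back to its argument
done = {args[i]: r for i, r in partial.items()}
```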


retry

pool.retry()

Rerun any tasks in error or timeout states. Tasks are rerun in the same pool.


running

pool.running() -> dict[int, Any]

Retrieve the arguments of tasks that are currently running, indexed by position in the args list.


shutdown

pool.shutdown(
    wait=True,  # if True, wait for all workers to complete
)

Shutdown the process pool executor.

This should be called when all jobs are complete to ensure worker processes are properly terminated and don't hang. Only applicable when using engine="local". Called automatically by collect().


status

pool.status() -> pd.Series

Return a Series of task counts, indexed by status.
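The counts are convenient for quick progress checks. A sketch with a hand-built Series standing in for pool.status() (the status labels and counts below are assumed, not real output):

```python
import pandas as pd

# Hand-built stand-in for pool.status(): count of tasks per status label
status = pd.Series({"success": 97, "error": 2, "running": 1})

total_jobs = int(status.sum())          # should equal pool.n_jobs
n_failed = int(status.get("error", 0))  # nonzero iff pool.any_failed()
```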


success

pool.success() -> dict[int, Any]

Retrieve the results that are currently done and are successful. Results are indexed by position in the args list.


success_async

await pool.success_async() -> dict[int, Any]

Async version of success() that doesn't block the event loop. (AsyncJobPool only.)


tail

pool.tail(
    stop_on_exception=False,  # if True, stop when first exception occurs
)

Wait until all jobs are finished, printing statuses as they become available.

This is useful for interactively watching the state of the pool. Use pool._wait_sleep to control whether to sleep between polls while waiting.


tail_async

await pool.tail_async(stop_on_exception: bool = False)

Async version of tail() that doesn't block the event loop. (AsyncJobPool only.)


times

pool.times() -> list[timedelta | None]

Time taken for each task. Incomplete tasks will be reported as None.


total_time

pool.total_time(
    since_retry=False,  # if True, measure from last retry instead of start
) -> timedelta

Returns how long the entire job took. If only partial results are available, the time is measured up to the most recently completed task.


wait

pool.wait()

Wait until all jobs are finished.

Use fused.options.show.enable_tqdm to enable or disable the tqdm progress bar. Use pool._wait_sleep to control whether to sleep between polls while waiting.


wait_async

await pool.wait_async()

Wait until all jobs are finished. Does not block the event loop. (AsyncJobPool only.)