Skip to main content

context

Access runtime context information from within a UDF. These functions return information about the current execution environment.

import fused

@fused.udf
def udf():
# Check if running in realtime or batch
if fused.context.in_realtime():
print("Running in realtime")
# Current UDF (raises if not in a UDF)
udf_obj = fused.context.this_udf()
headers = fused.context.get_headers()

Functions

get_header

get_header(name: str) -> str | None

Get a specific request header by name (case-insensitive).

get_headers

get_headers() -> dict[str, str]

Get the request headers that were passed to the UDF.

get_recursion_factor

get_recursion_factor() -> int | None

Get the recursion factor from the current context.

get_realtime_client_id

get_realtime_client_id() -> str | None

Get the realtime client ID from the current context.

get_user_email

get_user_email() -> str | None

Get the user email from the current context.

in_batch

in_batch() -> bool

Return True if the context is in a batch job.

in_realtime

in_realtime() -> bool

Return True if the context is in a realtime job.

this_udf

this_udf() -> Udf

The UDF that is currently being run. Raises ValueError("No UDF is currently being run.") when not running inside a UDF.

get_global_context

get_global_context() -> ExecutionContextProtocol | None

Get the global execution context object.

Usage Example

import fused

@fused.udf
def udf():
# Conditional behavior based on execution mode
if fused.context.in_batch():
# Use larger chunk sizes for batch
chunk_size = 10000
else:
# Smaller chunks for realtime
chunk_size = 1000

# Access custom headers passed via API
api_key = fused.context.get_header("X-API-Key")

# Log who is running this UDF
user = fused.context.get_user_email()
print(f"Running for user: {user}")
# Access the running UDF object
current = fused.context.this_udf()
return process_data(chunk_size)