Skip to main content

context

Access runtime context information from within a UDF. These functions return information about the current execution environment.

import fused

@fused.udf
def udf():
    """Show how to query the runtime context from inside a UDF."""
    # Check if running in realtime or batch
    if fused.context.in_realtime():
        print("Running in realtime")

    # Get request headers (a dict of header names to values)
    headers = fused.context.get_headers()

Functions

get_headers

get_headers() -> dict[str, str]

Get the request headers that were passed to the UDF.

Returns a dictionary mapping header names to their values, covering every header passed in the HTTP request.

get_header

get_header(name: str) -> str | None

Get a specific request header by name (case-insensitive).

Parameters:

  • name (str) – The header name to retrieve.

Returns: The header value, or None if not found.

get_recursion_factor

get_recursion_factor() -> int | None

Get the recursion factor from the current context.

Returns the recursion factor when running in a tiled context, or None otherwise.

get_realtime_client_id

get_realtime_client_id() -> str | None

Get the realtime client ID from the current context.

Returns the client ID when running in realtime mode, or None otherwise.

get_user_email

get_user_email() -> str | None

Get the user email from the current context.

Returns the email of the authenticated user running the UDF, or None if not available.

in_realtime

in_realtime() -> bool

Return True if the context is in a realtime job.

Use this to conditionally execute code based on execution mode.

in_batch

in_batch() -> bool

Return True if the context is in a batch job.

Use this to conditionally execute code based on execution mode.

get_global_context

get_global_context() -> ExecutionContextProtocol | None

Get the global execution context object.

Returns the full context object for advanced use cases.

Usage Example

import fused

@fused.udf
def udf():
    """Pick a chunk size from the execution mode, then read request metadata."""
    # Conditional behavior based on execution mode
    if fused.context.in_batch():
        # Use larger chunk sizes for batch
        chunk_size = 10000
    else:
        # Smaller chunks for realtime
        chunk_size = 1000

    # Access custom headers passed via API (None if the header is absent)
    api_key = fused.context.get_header("X-API-Key")

    # Log who is running this UDF (None if no user email is available)
    user = fused.context.get_user_email()
    print(f"Running for user: {user}")

    # NOTE(review): process_data is not defined in this snippet; presumably a
    # placeholder for the caller's own processing function.
    return process_data(chunk_size)