context
Access runtime context information from within a UDF. These functions return information about the current execution environment.
import fused
@fused.udf
def udf():
# Check if running in realtime or batch
if fused.context.in_realtime():
print("Running in realtime")
# Get request headers
headers = fused.context.get_headers()
Functions
get_headers
get_headers() -> dict[str, str]
Get the request headers that were passed to the UDF.
Returns a dictionary of header names to values for all headers passed in the HTTP request.
get_header
get_header(name: str) -> str | None
Get a specific request header by name (case-insensitive).
Parameters:
- name (
str) – The header name to retrieve.
Returns: The header value, or None if not found.
get_recursion_factor
get_recursion_factor() -> int | None
Get the recursion factor from the current context.
Returns the recursion factor when running in a tiled context, or None otherwise.
get_realtime_client_id
get_realtime_client_id() -> str | None
Get the realtime client ID from the current context.
Returns the client ID when running in realtime mode, or None otherwise.
get_user_email
get_user_email() -> str | None
Get the user email from the current context.
Returns the email of the authenticated user running the UDF, or None if not available.
in_realtime
in_realtime() -> bool
Return True if the context is in a realtime job.
Use this to conditionally execute code based on execution mode.
in_batch
in_batch() -> bool
Return True if the context is in a batch job.
Use this to conditionally execute code based on execution mode.
get_global_context
get_global_context() -> ExecutionContextProtocol | None
Get the global execution context object.
Returns the full context object for advanced use cases.
Usage Example
import fused
@fused.udf
def udf():
# Conditional behavior based on execution mode
if fused.context.in_batch():
# Use larger chunk sizes for batch
chunk_size = 10000
else:
# Smaller chunks for realtime
chunk_size = 1000
# Access custom headers passed via API
api_key = fused.context.get_header("X-API-Key")
# Log who is running this UDF
user = fused.context.get_user_email()
print(f"Running for user: {user}")
return process_data(chunk_size)