context
Access runtime context information from within a UDF. These functions return information about the current execution environment.
import fused
@fused.udf
def udf():
# Check if running in realtime or batch
if fused.context.in_realtime():
print("Running in realtime")
# Current UDF (raises if not in a UDF)
udf_obj = fused.context.this_udf()
headers = fused.context.get_headers()
Functions
get_header
get_header(name: str) -> str | None
Get a specific request header by name (case-insensitive).
get_headers
get_headers() -> dict[str, str]
Get the request headers that were passed to the UDF.
get_recursion_factor
get_recursion_factor() -> int | None
Get the recursion factor from the current context.
get_realtime_client_id
get_realtime_client_id() -> str | None
Get the realtime client ID from the current context.
get_user_email
get_user_email() -> str | None
Get the user email from the current context.
in_batch
in_batch() -> bool
Return True if the context is in a batch job.
in_realtime
in_realtime() -> bool
Return True if the context is in a realtime job.
this_udf
this_udf() -> Udf
The UDF that is currently being run. Raises ValueError("No UDF is currently being run.") when not running inside a UDF.
get_global_context
get_global_context() -> ExecutionContextProtocol | None
Get the global execution context object.
Usage Example
import fused
@fused.udf
def udf():
# Conditional behavior based on execution mode
if fused.context.in_batch():
# Use larger chunk sizes for batch
chunk_size = 10000
else:
# Smaller chunks for realtime
chunk_size = 1000
# Access custom headers passed via API
api_key = fused.context.get_header("X-API-Key")
# Log who is running this UDF
user = fused.context.get_user_email()
print(f"Running for user: {user}")
# Access the running UDF object
current = fused.context.this_udf()
return process_data(chunk_size)