From e2a6d223f5f4258c8dadf00d4abf9067655d9900 Mon Sep 17 00:00:00 2001 From: Siddhant Shah Date: Mon, 8 Sep 2025 16:48:37 +0530 Subject: [PATCH 1/3] feat: Add user_id and session_id in traces to track users and sessions --- src/openlayer/lib/__init__.py | 17 +- src/openlayer/lib/tracing/__init__.py | 44 +++++ src/openlayer/lib/tracing/context.py | 167 ++++++++++++++++++ src/openlayer/lib/tracing/tracer.py | 18 +- .../inference_pipelines/data_stream_params.py | 6 + 5 files changed, 249 insertions(+), 3 deletions(-) create mode 100644 src/openlayer/lib/tracing/context.py diff --git a/src/openlayer/lib/__init__.py b/src/openlayer/lib/__init__.py index 00075bf2..abfab729 100644 --- a/src/openlayer/lib/__init__.py +++ b/src/openlayer/lib/__init__.py @@ -3,7 +3,7 @@ __all__ = [ "configure", "trace", - "trace_anthropic", + "trace_anthropic", "trace_openai", "trace_openai_assistant_thread_run", "trace_mistral", @@ -14,11 +14,24 @@ "trace_oci_genai", "trace_oci", # Alias for backward compatibility "update_current_trace", - "update_current_step" + "update_current_step", + # User and session context functions + "set_user_session_context", + "update_trace_user_session", + "get_current_user_id", + "get_current_session_id", + "clear_user_session_context", ] # ---------------------------------- Tracing --------------------------------- # from .tracing import tracer +from .tracing.context import ( + set_user_session_context, + update_trace_user_session, + get_current_user_id, + get_current_session_id, + clear_user_session_context, +) configure = tracer.configure trace = tracer.trace diff --git a/src/openlayer/lib/tracing/__init__.py b/src/openlayer/lib/tracing/__init__.py index e69de29b..db928534 100644 --- a/src/openlayer/lib/tracing/__init__.py +++ b/src/openlayer/lib/tracing/__init__.py @@ -0,0 +1,44 @@ +"""OpenLayer tracing module.""" + +from .tracer import ( + trace, + trace_async, + update_current_trace, + update_current_step, + log_context, + log_output, + configure, + get_current_trace, + get_current_step, + create_step, +) + +from .context import ( + set_user_session_context, + update_trace_user_session, + get_current_user_id, + get_current_session_id, + clear_user_session_context, +) + +__all__ = [ + # Core tracing functions + "trace", + "trace_async", + "update_current_trace", + "update_current_step", + "log_context", + "log_output", + "configure", + "get_current_trace", + "get_current_step", + "create_step", + + # User and session context functions + "set_user_session_context", + "update_trace_user_session", + "get_current_user_id", + "get_current_session_id", + "clear_user_session_context", +] + diff --git a/src/openlayer/lib/tracing/context.py b/src/openlayer/lib/tracing/context.py new file mode 100644 index 00000000..d481a6e2 --- /dev/null +++ b/src/openlayer/lib/tracing/context.py @@ -0,0 +1,167 @@ +""" +Streamlined user and session context management for OpenLayer tracing. + +This module provides simple functions to set user_id and session_id in middleware +and override them anywhere in your traced code. +""" + +import contextvars +import threading +from typing import Optional, Union + +# Sentinel object to distinguish between "not provided" and "explicitly None" +_NOT_PROVIDED = object() + +# Context variables for user and session tracking +_user_id_context = contextvars.ContextVar("openlayer_user_id", default=None) +_session_id_context = contextvars.ContextVar("openlayer_session_id", default=None) + +# Thread-local fallback for environments where contextvars don't work well +_thread_local = threading.local() + + +class UserSessionContext: + """Internal class to manage user and session context.""" + + @staticmethod + def set_user_id(user_id: Union[str, int, None]) -> None: + """Set the user ID for the current context.""" + user_id_str = str(user_id) if user_id is not None else None + _user_id_context.set(user_id_str) + + # Thread-local fallback + _thread_local.user_id = user_id_str + + @staticmethod + def get_user_id() -> Optional[str]: + """Get the current user ID.""" + try: + return _user_id_context.get(None) + except LookupError: + # Fallback to thread-local + return getattr(_thread_local, 'user_id', None) + + @staticmethod + def set_session_id(session_id: Union[str, None]) -> None: + """Set the session ID for the current context.""" + _session_id_context.set(session_id) + + # Thread-local fallback + _thread_local.session_id = session_id + + @staticmethod + def get_session_id() -> Optional[str]: + """Get the current session ID.""" + try: + return _session_id_context.get(None) + except LookupError: + # Fallback to thread-local + return getattr(_thread_local, 'session_id', None) + + @staticmethod + def clear_context() -> None: + """Clear all user and session context.""" + _user_id_context.set(None) + _session_id_context.set(None) + + # Clear thread-local + for attr in ['user_id', 'session_id']: + if hasattr(_thread_local, attr): + delattr(_thread_local, attr) + + +# ----------------------------- Public API Functions ----------------------------- # + +def set_user_session_context( + user_id: Union[str, int, None] = None, + session_id: Union[str, None] = None, +) -> None: + """Set user and session context for tracing (typically called in middleware). + + This function should be called once per request in your middleware to establish + default user_id and session_id values that will be automatically included in all traces. + + Args: + user_id: The user identifier + session_id: The session identifier + + Example: + >>> from openlayer.lib.tracing import set_user_session_context + >>> + >>> # In your middleware or request handler + >>> def middleware(request): + ... set_user_session_context( + ... user_id=request.user.id, + ... session_id=request.session.session_key + ... ) + ... # Now all traced functions will automatically include these values + """ + if user_id is not None: + UserSessionContext.set_user_id(user_id) + if session_id is not None: + UserSessionContext.set_session_id(session_id) + + +def update_trace_user_session( + user_id: Union[str, int, None] = _NOT_PROVIDED, + session_id: Union[str, None] = _NOT_PROVIDED, +) -> None: + """Update user_id and/or session_id for the current trace context. + + This can be called anywhere in your traced code to override the user_id + and/or session_id set in middleware. Inspired by Langfuse's updateActiveTrace pattern. + + Args: + user_id: The user identifier to set (optional). Pass None to clear. + session_id: The session identifier to set (optional). Pass None to clear. + + Example: + >>> from openlayer.lib.tracing import update_trace_user_session + >>> + >>> @trace() + >>> def process_request(): + ... # Override user_id for this specific trace + ... update_trace_user_session(user_id="different_user_123") + ... return "result" + >>> + >>> @trace() + >>> def start_new_session(): + ... # Start a new session for this trace + ... update_trace_user_session(session_id="new_session_456") + ... return "result" + >>> + >>> @trace() + >>> def switch_user_and_session(): + ... # Update both at once + ... update_trace_user_session( + ... user_id="admin_user_789", + ... session_id="admin_session_abc" + ... ) + ... return "result" + >>> + >>> @trace() + >>> def clear_user(): + ... # Clear user_id (set to None) + ... update_trace_user_session(user_id=None) + ... return "result" + """ + # Use sentinel object to distinguish between "not provided" and "explicitly None" + if user_id is not _NOT_PROVIDED: + UserSessionContext.set_user_id(user_id) + if session_id is not _NOT_PROVIDED: + UserSessionContext.set_session_id(session_id) + + +def get_current_user_id() -> Optional[str]: + """Get the current user ID from context.""" + return UserSessionContext.get_user_id() + + +def get_current_session_id() -> Optional[str]: + """Get the current session ID from context.""" + return UserSessionContext.get_session_id() + + +def clear_user_session_context() -> None: + """Clear all user and session context.""" + UserSessionContext.clear_context() diff --git a/src/openlayer/lib/tracing/tracer.py b/src/openlayer/lib/tracing/tracer.py index 5a15a243..1471c758 100644 --- a/src/openlayer/lib/tracing/tracer.py +++ b/src/openlayer/lib/tracing/tracer.py @@ -16,6 +16,7 @@ from .. import utils from . import enums, steps, traces from ..guardrails.base import GuardrailResult, GuardrailAction +from .context import UserSessionContext logger = logging.getLogger(__name__) @@ -923,6 +924,12 @@ def _handle_trace_completion( num_of_token_column_name="tokens", ) ) + + # Add reserved column configurations for user context + if "user_id" in trace_data: + config.update({"user_id_column_name": "user_id"}) + if "session_id" in trace_data: + config.update({"session_id_column_name": "session_id"}) if "groundTruth" in trace_data: config.update({"ground_truth_column_name": "groundTruth"}) if "context" in trace_data: @@ -1184,7 +1191,16 @@ def post_process_trace( if trace_obj.metadata is not None: # Add each trace metadata key directly to the row/record level trace_data.update(trace_obj.metadata) - + + # Add reserved columns for user and session context + user_id = UserSessionContext.get_user_id() + if user_id is not None: + trace_data["user_id"] = user_id + + session_id = UserSessionContext.get_session_id() + if session_id is not None: + trace_data["session_id"] = session_id + if root_step.ground_truth: trace_data["groundTruth"] = root_step.ground_truth if input_variables: diff --git a/src/openlayer/types/inference_pipelines/data_stream_params.py b/src/openlayer/types/inference_pipelines/data_stream_params.py index a897b34a..554fb6bf 100644 --- a/src/openlayer/types/inference_pipelines/data_stream_params.py +++ b/src/openlayer/types/inference_pipelines/data_stream_params.py @@ -89,6 +89,12 @@ class ConfigLlmData(TypedDict, total=False): used. """ + user_id_column_name: Annotated[str, PropertyInfo(alias="userIdColumnName")] + """Name of the column with the user IDs.""" + + session_id_column_name: Annotated[str, PropertyInfo(alias="sessionIdColumnName")] + """Name of the column with the session IDs.""" + class ConfigTabularClassificationData(TypedDict, total=False): class_names: Required[Annotated[SequenceNotStr[str], PropertyInfo(alias="classNames")]] From 03ed5a2edd41c3ad517e9c6b9132b588bcee26bf Mon Sep 17 00:00:00 2001 From: Siddhant Shah Date: Tue, 9 Sep 2025 13:52:46 +0530 Subject: [PATCH 2/3] refactor: import fixes --- src/openlayer/lib/tracing/__init__.py | 16 +--------------- src/openlayer/lib/tracing/context.py | 2 +- .../inference_pipelines/data_stream_params.py | 8 +------- 3 files changed, 3 insertions(+), 23 deletions(-) diff --git a/src/openlayer/lib/tracing/__init__.py b/src/openlayer/lib/tracing/__init__.py index db928534..ffa19284 100644 --- a/src/openlayer/lib/tracing/__init__.py +++ b/src/openlayer/lib/tracing/__init__.py @@ -1,4 +1,4 @@ -"""OpenLayer tracing module.""" +"""Openlayer tracing module.""" from .tracer import ( trace, @@ -13,13 +13,6 @@ create_step, ) -from .context import ( - set_user_session_context, - update_trace_user_session, - get_current_user_id, - get_current_session_id, - clear_user_session_context, -) __all__ = [ # Core tracing functions @@ -33,12 +26,5 @@ "get_current_trace", "get_current_step", "create_step", - - # User and session context functions - "set_user_session_context", - "update_trace_user_session", - "get_current_user_id", - "get_current_session_id", - "clear_user_session_context", ] diff --git a/src/openlayer/lib/tracing/context.py b/src/openlayer/lib/tracing/context.py index d481a6e2..0f011735 100644 --- a/src/openlayer/lib/tracing/context.py +++ b/src/openlayer/lib/tracing/context.py @@ -1,5 +1,5 @@ """ -Streamlined user and session context management for OpenLayer tracing. +Streamlined user and session context management for Openlayer tracing. This module provides simple functions to set user_id and session_id in middleware and override them anywhere in your traced code. diff --git a/src/openlayer/types/inference_pipelines/data_stream_params.py b/src/openlayer/types/inference_pipelines/data_stream_params.py index 554fb6bf..9c0f9e35 100644 --- a/src/openlayer/types/inference_pipelines/data_stream_params.py +++ b/src/openlayer/types/inference_pipelines/data_stream_params.py @@ -89,12 +89,6 @@ class ConfigLlmData(TypedDict, total=False): used. """ - user_id_column_name: Annotated[str, PropertyInfo(alias="userIdColumnName")] - """Name of the column with the user IDs.""" - - session_id_column_name: Annotated[str, PropertyInfo(alias="sessionIdColumnName")] - """Name of the column with the session IDs.""" - class ConfigTabularClassificationData(TypedDict, total=False): class_names: Required[Annotated[SequenceNotStr[str], PropertyInfo(alias="classNames")]] @@ -235,4 +229,4 @@ class ConfigTextClassificationData(TypedDict, total=False): Config: TypeAlias = Union[ ConfigLlmData, ConfigTabularClassificationData, ConfigTabularRegressionData, ConfigTextClassificationData -] +] \ No newline at end of file From 635bbce8418818dfbfdf27a39f68c70a7b71dae7 Mon Sep 17 00:00:00 2001 From: Siddhant Shah Date: Wed, 10 Sep 2025 15:39:00 +0530 Subject: [PATCH 3/3] style: reverted back to previous version for data_stream_params.py --- src/openlayer/types/inference_pipelines/data_stream_params.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/openlayer/types/inference_pipelines/data_stream_params.py b/src/openlayer/types/inference_pipelines/data_stream_params.py index 9c0f9e35..a897b34a 100644 --- a/src/openlayer/types/inference_pipelines/data_stream_params.py +++ b/src/openlayer/types/inference_pipelines/data_stream_params.py @@ -229,4 +229,4 @@ class ConfigTextClassificationData(TypedDict, total=False): Config: TypeAlias = Union[ ConfigLlmData, ConfigTabularClassificationData, ConfigTabularRegressionData, ConfigTextClassificationData -] \ No newline at end of file +]