From 2d028d3228103b504ea9866988b6f6693ed9004f Mon Sep 17 00:00:00 2001 From: Gustavo Cid Date: Thu, 4 Sep 2025 12:09:39 -0300 Subject: [PATCH] feat: introduce new step types --- .../lib/integrations/langchain_callback.py | 48 +++++-- src/openlayer/lib/tracing/enums.py | 4 + src/openlayer/lib/tracing/steps.py | 124 +++++++++++++++++- src/openlayer/lib/utils.py | 9 ++ 4 files changed, 171 insertions(+), 14 deletions(-) diff --git a/src/openlayer/lib/integrations/langchain_callback.py b/src/openlayer/lib/integrations/langchain_callback.py index c48bfff8..3d66e742 100644 --- a/src/openlayer/lib/integrations/langchain_callback.py +++ b/src/openlayer/lib/integrations/langchain_callback.py @@ -190,18 +190,22 @@ def _process_and_upload_trace(self, root_step: steps.Step) -> None: and root_step.inputs and "prompt" in root_step.inputs ): - config.update({"prompt": root_step.inputs["prompt"]}) + config.update({"prompt": utils.json_serialize(root_step.inputs["prompt"])}) if tracer._publish: try: client = tracer._get_client() if client: + # Apply final JSON serialization to ensure everything is serializable + serialized_trace_data = utils.json_serialize(trace_data) + serialized_config = utils.json_serialize(config) + client.inference_pipelines.data.stream( inference_pipeline_id=utils.get_env_variable( "OPENLAYER_INFERENCE_PIPELINE_ID" ), - rows=[trace_data], - config=config, + rows=[serialized_trace_data], + config=serialized_config, ) except Exception as err: # pylint: disable=broad-except tracer.logger.error("Could not stream data to Openlayer %s", err) @@ -270,6 +274,17 @@ def _convert_langchain_objects(self, obj: Any) -> Any: if hasattr(obj, "messages"): return [self._convert_langchain_objects(m) for m in obj.messages] + # Handle Pydantic model instances + if hasattr(obj, "model_dump") and callable(getattr(obj, "model_dump")): + try: + return self._convert_langchain_objects(obj.model_dump()) + except Exception: + pass + + # Handle Pydantic model classes/metaclasses (type objects) + if isinstance(obj, type): + return str(obj.__name__ if hasattr(obj, "__name__") else obj) + # Handle other LangChain objects with common attributes if hasattr(obj, "dict") and callable(getattr(obj, "dict")): # Many LangChain objects have a dict() method @@ -556,6 +571,7 @@ def _handle_chain_start( metadata={ "tags": tags, "serialized": serialized, + "is_chain": True, **(metadata or {}), **kwargs, }, @@ -637,7 +653,7 @@ def _handle_tool_start( run_id=run_id, parent_run_id=parent_run_id, name=tool_name, - step_type=enums.StepType.USER_CALL, + step_type=enums.StepType.TOOL, inputs=tool_input, metadata={ "tags": tags, @@ -645,6 +661,8 @@ def _handle_tool_start( **(metadata or {}), **kwargs, }, + function_name=tool_name, + arguments=tool_input, ) def _handle_tool_end( @@ -690,13 +708,16 @@ def _handle_agent_action( run_id=run_id, parent_run_id=parent_run_id, name=f"Agent Tool: {action.tool}", - step_type=enums.StepType.USER_CALL, + step_type=enums.StepType.AGENT, inputs={ "tool": action.tool, "tool_input": action.tool_input, "log": action.log, }, metadata={"agent_action": True, **kwargs}, + tool=action.tool, + action=action, + agent_type="langchain_agent", ) def _handle_agent_finish( @@ -740,7 +761,7 @@ def _handle_retriever_start( run_id=run_id, parent_run_id=parent_run_id, name=retriever_name, - step_type=enums.StepType.USER_CALL, + step_type=enums.StepType.RETRIEVER, inputs={"query": query}, metadata={ "tags": tags, @@ -775,6 +796,11 @@ def _handle_retriever_end( if current_trace: current_trace.update_metadata(context=doc_contents) + # Update the step with RetrieverStep-specific attributes + step = self.steps[run_id] + if isinstance(step, steps.RetrieverStep): + step.documents = doc_contents + self._end_step( run_id=run_id, parent_run_id=parent_run_id, @@ -1146,19 +1172,23 @@ def _process_and_upload_async_trace(self, trace: traces.Trace) -> None: and root_step.inputs and "prompt" in root_step.inputs ): - config.update({"prompt": root_step.inputs["prompt"]}) + config.update({"prompt": utils.json_serialize(root_step.inputs["prompt"])}) # Upload to Openlayer if tracer._publish: try: client = tracer._get_client() if client: + # Apply final JSON serialization to ensure everything is serializable + serialized_trace_data = utils.json_serialize(trace_data) + serialized_config = utils.json_serialize(config) + client.inference_pipelines.data.stream( inference_pipeline_id=utils.get_env_variable( "OPENLAYER_INFERENCE_PIPELINE_ID" ), - rows=[trace_data], - config=config, + rows=[serialized_trace_data], + config=serialized_config, ) except Exception as err: tracer.logger.error("Could not stream data to Openlayer %s", err) diff --git a/src/openlayer/lib/tracing/enums.py b/src/openlayer/lib/tracing/enums.py index dbb5f132..9b467a96 100644 --- a/src/openlayer/lib/tracing/enums.py +++ b/src/openlayer/lib/tracing/enums.py @@ -6,3 +6,7 @@ class StepType(enum.Enum): USER_CALL = "user_call" CHAT_COMPLETION = "chat_completion" + AGENT = "agent" + RETRIEVER = "retriever" + TOOL = "tool" + HANDOFF = "handoff" diff --git a/src/openlayer/lib/tracing/steps.py b/src/openlayer/lib/tracing/steps.py index 4fcc9d55..122890f2 100644 --- a/src/openlayer/lib/tracing/steps.py +++ b/src/openlayer/lib/tracing/steps.py @@ -2,7 +2,7 @@ import time import uuid -from typing import Any, Dict, Optional +from typing import Any, Dict, List, Optional from .. import utils from . import enums @@ -54,10 +54,10 @@ def to_dict(self) -> Dict[str, Any]: "name": self.name, "id": str(self.id), "type": self.step_type.value, - "inputs": self.inputs, - "output": self.output, - "groundTruth": self.ground_truth, - "metadata": self.metadata, + "inputs": utils.json_serialize(self.inputs), + "output": utils.json_serialize(self.output), + "groundTruth": utils.json_serialize(self.ground_truth), + "metadata": utils.json_serialize(self.metadata), "steps": [nested_step.to_dict() for nested_step in self.steps], "latency": self.latency, "startTime": self.start_time, @@ -119,6 +119,116 @@ def to_dict(self) -> Dict[str, Any]: return step_dict +class AgentStep(Step): + """Agent step represents an agent in the trace.""" + + def __init__( + self, + name: str, + inputs: Optional[Any] = None, + output: Optional[Any] = None, + metadata: Optional[Dict[str, any]] = None, + ) -> None: + super().__init__(name=name, inputs=inputs, output=output, metadata=metadata) + self.step_type = enums.StepType.AGENT + self.tool: str = None + self.action: Any = None + self.agent_type: str = None + + def to_dict(self) -> Dict[str, Any]: + """Dictionary representation of the AgentStep.""" + step_dict = super().to_dict() + step_dict.update( + { + "tool": self.tool, + "action": self.action, + "agentType": self.agent_type, + } + ) + return step_dict + + +class RetrieverStep(Step): + """Retriever step represents a retriever in the trace.""" + + def __init__( + self, + name: str, + inputs: Optional[Any] = None, + output: Optional[Any] = None, + metadata: Optional[Dict[str, any]] = None, + ) -> None: + super().__init__(name=name, inputs=inputs, output=output, metadata=metadata) + self.step_type = enums.StepType.RETRIEVER + self.documents: List[Any] = None + + def to_dict(self) -> Dict[str, Any]: + """Dictionary representation of the RetrieverStep.""" + step_dict = super().to_dict() + step_dict.update( + { + "documents": self.documents, + } + ) + return step_dict + + +class ToolStep(Step): + """Tool step represents a tool in the trace.""" + + def __init__( + self, + name: str, + inputs: Optional[Any] = None, + output: Optional[Any] = None, + metadata: Optional[Dict[str, any]] = None, + ) -> None: + super().__init__(name=name, inputs=inputs, output=output, metadata=metadata) + self.step_type = enums.StepType.TOOL + self.function_name: str = None + self.arguments: Any = None + + def to_dict(self) -> Dict[str, Any]: + """Dictionary representation of the ToolStep.""" + step_dict = super().to_dict() + step_dict.update( + { + "functionName": self.function_name, + "arguments": self.arguments, + } + ) + return step_dict + + +class HandoffStep(Step): + """Handoff step represents a handoff in the trace.""" + + def __init__( + self, + name: str, + inputs: Optional[Any] = None, + output: Optional[Any] = None, + metadata: Optional[Dict[str, any]] = None, + ) -> None: + super().__init__(name=name, inputs=inputs, output=output, metadata=metadata) + self.step_type = enums.StepType.HANDOFF + self.from_component: str = None + self.to_component: str = None + self.handoff_data: Any = None + + def to_dict(self) -> Dict[str, Any]: + """Dictionary representation of the HandoffStep.""" + step_dict = super().to_dict() + step_dict.update( + { + "fromComponent": self.from_component, + "toComponent": self.to_component, + "handoffData": self.handoff_data, + } + ) + return step_dict + + # ----------------------------- Factory function ----------------------------- # def step_factory(step_type: enums.StepType, *args, **kwargs) -> Step: """Factory function to create a step based on the step_type.""" @@ -127,5 +237,9 @@ def step_factory(step_type: enums.StepType, *args, **kwargs) -> Step: step_type_mapping = { enums.StepType.USER_CALL: UserCallStep, enums.StepType.CHAT_COMPLETION: ChatCompletionStep, + enums.StepType.AGENT: AgentStep, + enums.StepType.RETRIEVER: RetrieverStep, + enums.StepType.TOOL: ToolStep, + enums.StepType.HANDOFF: HandoffStep, } return step_type_mapping[step_type](*args, **kwargs) diff --git a/src/openlayer/lib/utils.py b/src/openlayer/lib/utils.py index 2732ca0c..b5366c35 100644 --- a/src/openlayer/lib/utils.py +++ b/src/openlayer/lib/utils.py @@ -48,6 +48,15 @@ def json_serialize(data): return [json_serialize(item) for item in data] elif isinstance(data, tuple): return tuple(json_serialize(item) for item in data) + elif isinstance(data, type): + # Handle model classes/metaclasses + return str(data.__name__ if hasattr(data, "__name__") else data) + elif hasattr(data, "model_dump") and callable(getattr(data, "model_dump")): + # Handle Pydantic model instances + try: + return json_serialize(data.model_dump()) + except Exception: + return str(data) else: # Fallback: Convert to string if not serializable try: