48 changes: 39 additions & 9 deletions src/openlayer/lib/integrations/langchain_callback.py
@@ -190,18 +190,22 @@ def _process_and_upload_trace(self, root_step: steps.Step) -> None:
and root_step.inputs
and "prompt" in root_step.inputs
):
config.update({"prompt": root_step.inputs["prompt"]})
config.update({"prompt": utils.json_serialize(root_step.inputs["prompt"])})

if tracer._publish:
try:
client = tracer._get_client()
if client:
# Apply final JSON serialization to ensure everything is serializable
serialized_trace_data = utils.json_serialize(trace_data)
serialized_config = utils.json_serialize(config)

client.inference_pipelines.data.stream(
inference_pipeline_id=utils.get_env_variable(
"OPENLAYER_INFERENCE_PIPELINE_ID"
),
rows=[trace_data],
config=config,
rows=[serialized_trace_data],
config=serialized_config,
)
except Exception as err: # pylint: disable=broad-except
tracer.logger.error("Could not stream data to Openlayer %s", err)
@@ -270,6 +274,17 @@ def _convert_langchain_objects(self, obj: Any) -> Any:
if hasattr(obj, "messages"):
return [self._convert_langchain_objects(m) for m in obj.messages]

# Handle Pydantic model instances
if hasattr(obj, "model_dump") and callable(getattr(obj, "model_dump")):
try:
return self._convert_langchain_objects(obj.model_dump())
except Exception:
pass

# Handle Pydantic model classes/metaclasses (type objects)
if isinstance(obj, type):
return str(obj.__name__ if hasattr(obj, "__name__") else obj)

# Handle other LangChain objects with common attributes
if hasattr(obj, "dict") and callable(getattr(obj, "dict")):
# Many LangChain objects have a dict() method
@@ -556,6 +571,7 @@ def _handle_chain_start(
metadata={
"tags": tags,
"serialized": serialized,
"is_chain": True,
**(metadata or {}),
**kwargs,
},
@@ -637,14 +653,16 @@ def _handle_tool_start(
run_id=run_id,
parent_run_id=parent_run_id,
name=tool_name,
step_type=enums.StepType.USER_CALL,
step_type=enums.StepType.TOOL,
inputs=tool_input,
metadata={
"tags": tags,
"serialized": serialized,
**(metadata or {}),
**kwargs,
},
function_name=tool_name,
arguments=tool_input,
)

def _handle_tool_end(
@@ -690,13 +708,16 @@ def _handle_agent_action(
run_id=run_id,
parent_run_id=parent_run_id,
name=f"Agent Tool: {action.tool}",
step_type=enums.StepType.USER_CALL,
step_type=enums.StepType.AGENT,
inputs={
"tool": action.tool,
"tool_input": action.tool_input,
"log": action.log,
},
metadata={"agent_action": True, **kwargs},
tool=action.tool,
action=action,
agent_type="langchain_agent",
)

def _handle_agent_finish(
@@ -740,7 +761,7 @@ def _handle_retriever_start(
run_id=run_id,
parent_run_id=parent_run_id,
name=retriever_name,
step_type=enums.StepType.USER_CALL,
step_type=enums.StepType.RETRIEVER,
inputs={"query": query},
metadata={
"tags": tags,
@@ -775,6 +796,11 @@ def _handle_retriever_end(
if current_trace:
current_trace.update_metadata(context=doc_contents)

# Update the step with RetrieverStep-specific attributes
step = self.steps[run_id]
if isinstance(step, steps.RetrieverStep):
step.documents = doc_contents

self._end_step(
run_id=run_id,
parent_run_id=parent_run_id,
@@ -1146,19 +1172,23 @@ def _process_and_upload_async_trace(self, trace: traces.Trace) -> None:
and root_step.inputs
and "prompt" in root_step.inputs
):
config.update({"prompt": root_step.inputs["prompt"]})
config.update({"prompt": utils.json_serialize(root_step.inputs["prompt"])})

# Upload to Openlayer
if tracer._publish:
try:
client = tracer._get_client()
if client:
# Apply final JSON serialization to ensure everything is serializable
serialized_trace_data = utils.json_serialize(trace_data)
serialized_config = utils.json_serialize(config)

client.inference_pipelines.data.stream(
inference_pipeline_id=utils.get_env_variable(
"OPENLAYER_INFERENCE_PIPELINE_ID"
),
rows=[trace_data],
config=config,
rows=[serialized_trace_data],
config=serialized_config,
)
except Exception as err:
tracer.logger.error("Could not stream data to Openlayer %s", err)
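For context, a minimal standalone sketch of the pattern both stream hunks apply: every payload gets a defensive serialization pass before it is handed to client.inference_pipelines.data.stream, so a single non-JSON-serializable field cannot abort the upload. This is illustrative only, not the SDK code; best_effort_serialize is a stand-in for utils.json_serialize.

import json
from typing import Any

def best_effort_serialize(data: Any) -> Any:
    """Illustrative stand-in for utils.json_serialize (not the SDK implementation)."""
    if isinstance(data, dict):
        return {key: best_effort_serialize(value) for key, value in data.items()}
    if isinstance(data, list):
        return [best_effort_serialize(item) for item in data]
    if isinstance(data, tuple):
        return tuple(best_effort_serialize(item) for item in data)
    if isinstance(data, type):
        return data.__name__  # model classes collapse to their name
    if hasattr(data, "model_dump") and callable(getattr(data, "model_dump")):
        return best_effort_serialize(data.model_dump())  # Pydantic v2 instances
    try:
        json.dumps(data)
        return data
    except (TypeError, ValueError):
        return str(data)  # last-resort fallback

trace_data = {"inputs": {"prompt": [{"role": "user", "content": "hi"}]}, "output": "hello"}
config = {"prompt": trace_data["inputs"]["prompt"]}
rows = [best_effort_serialize(trace_data)]    # what the diff now passes as rows=[...]
safe_config = best_effort_serialize(config)   # what the diff now passes as config=...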
4 changes: 4 additions & 0 deletions src/openlayer/lib/tracing/enums.py
@@ -6,3 +6,7 @@
class StepType(enum.Enum):
USER_CALL = "user_call"
CHAT_COMPLETION = "chat_completion"
AGENT = "agent"
RETRIEVER = "retriever"
TOOL = "tool"
HANDOFF = "handoff"
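A quick hedged check of the new members (assuming the module is importable as openlayer.lib.tracing.enums, mirroring the file path): each one serializes to the lowercase string that Step.to_dict() emits as the step's "type" field.

from openlayer.lib.tracing import enums

# New step types introduced by this PR, alongside the existing USER_CALL and CHAT_COMPLETION.
assert enums.StepType.AGENT.value == "agent"
assert enums.StepType.RETRIEVER.value == "retriever"
assert enums.StepType.TOOL.value == "tool"
assert enums.StepType.HANDOFF.value == "handoff"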
124 changes: 119 additions & 5 deletions src/openlayer/lib/tracing/steps.py
@@ -2,7 +2,7 @@

import time
import uuid
from typing import Any, Dict, Optional
from typing import Any, Dict, List, Optional

from .. import utils
from . import enums
@@ -54,10 +54,10 @@ def to_dict(self) -> Dict[str, Any]:
"name": self.name,
"id": str(self.id),
"type": self.step_type.value,
"inputs": self.inputs,
"output": self.output,
"groundTruth": self.ground_truth,
"metadata": self.metadata,
"inputs": utils.json_serialize(self.inputs),
"output": utils.json_serialize(self.output),
"groundTruth": utils.json_serialize(self.ground_truth),
"metadata": utils.json_serialize(self.metadata),
"steps": [nested_step.to_dict() for nested_step in self.steps],
"latency": self.latency,
"startTime": self.start_time,
@@ -119,6 +119,116 @@ def to_dict(self) -> Dict[str, Any]:
return step_dict


class AgentStep(Step):
"""Agent step represents an agent in the trace."""

def __init__(
self,
name: str,
inputs: Optional[Any] = None,
output: Optional[Any] = None,
metadata: Optional[Dict[str, Any]] = None,
) -> None:
super().__init__(name=name, inputs=inputs, output=output, metadata=metadata)
self.step_type = enums.StepType.AGENT
self.tool: Optional[str] = None
self.action: Any = None
self.agent_type: Optional[str] = None

def to_dict(self) -> Dict[str, Any]:
"""Dictionary representation of the AgentStep."""
step_dict = super().to_dict()
step_dict.update(
{
"tool": self.tool,
"action": self.action,
"agentType": self.agent_type,
}
)
return step_dict


class RetrieverStep(Step):
"""Retriever step represents a retriever in the trace."""

def __init__(
self,
name: str,
inputs: Optional[Any] = None,
output: Optional[Any] = None,
metadata: Optional[Dict[str, Any]] = None,
) -> None:
super().__init__(name=name, inputs=inputs, output=output, metadata=metadata)
self.step_type = enums.StepType.RETRIEVER
self.documents: Optional[List[Any]] = None

def to_dict(self) -> Dict[str, Any]:
"""Dictionary representation of the RetrieverStep."""
step_dict = super().to_dict()
step_dict.update(
{
"documents": self.documents,
}
)
return step_dict


class ToolStep(Step):
"""Tool step represents a tool in the trace."""

def __init__(
self,
name: str,
inputs: Optional[Any] = None,
output: Optional[Any] = None,
metadata: Optional[Dict[str, Any]] = None,
) -> None:
super().__init__(name=name, inputs=inputs, output=output, metadata=metadata)
self.step_type = enums.StepType.TOOL
self.function_name: Optional[str] = None
self.arguments: Any = None

def to_dict(self) -> Dict[str, Any]:
"""Dictionary representation of the ToolStep."""
step_dict = super().to_dict()
step_dict.update(
{
"functionName": self.function_name,
"arguments": self.arguments,
}
)
return step_dict


class HandoffStep(Step):
"""Handoff step represents a handoff in the trace."""

def __init__(
self,
name: str,
inputs: Optional[Any] = None,
output: Optional[Any] = None,
metadata: Optional[Dict[str, Any]] = None,
) -> None:
super().__init__(name=name, inputs=inputs, output=output, metadata=metadata)
self.step_type = enums.StepType.HANDOFF
self.from_component: Optional[str] = None
self.to_component: Optional[str] = None
self.handoff_data: Any = None

def to_dict(self) -> Dict[str, Any]:
"""Dictionary representation of the HandoffStep."""
step_dict = super().to_dict()
step_dict.update(
{
"fromComponent": self.from_component,
"toComponent": self.to_component,
"handoffData": self.handoff_data,
}
)
return step_dict


# ----------------------------- Factory function ----------------------------- #
def step_factory(step_type: enums.StepType, *args, **kwargs) -> Step:
"""Factory function to create a step based on the step_type."""
@@ -127,5 +237,9 @@ def step_factory(step_type: enums.StepType, *args, **kwargs) -> Step:
step_type_mapping = {
enums.StepType.USER_CALL: UserCallStep,
enums.StepType.CHAT_COMPLETION: ChatCompletionStep,
enums.StepType.AGENT: AgentStep,
enums.StepType.RETRIEVER: RetrieverStep,
enums.StepType.TOOL: ToolStep,
enums.StepType.HANDOFF: HandoffStep,
}
return step_type_mapping[step_type](*args, **kwargs)
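A hedged usage sketch of the extended factory. The attribute names come straight from the new subclasses above, but the import paths are assumed from the file locations, and it relies on the base Step initializer setting the id/timing fields that to_dict() reads; this is not code from the SDK's tests.

from openlayer.lib.tracing import enums, steps

# The factory now resolves every StepType to its concrete subclass.
tool_step = steps.step_factory(enums.StepType.TOOL, name="search_tool")
tool_step.function_name = "search_tool"
tool_step.arguments = {"query": "openlayer tracing"}

retriever_step = steps.step_factory(enums.StepType.RETRIEVER, name="vector_store")
retriever_step.documents = ["first chunk of context", "second chunk of context"]

print(type(tool_step).__name__)             # ToolStep
print(tool_step.to_dict()["functionName"])  # search_tool
print(retriever_step.to_dict()["type"])     # retriever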
9 changes: 9 additions & 0 deletions src/openlayer/lib/utils.py
@@ -48,6 +48,15 @@ def json_serialize(data):
return [json_serialize(item) for item in data]
elif isinstance(data, tuple):
return tuple(json_serialize(item) for item in data)
elif isinstance(data, type):
# Handle model classes/metaclasses
return str(data.__name__ if hasattr(data, "__name__") else data)
elif hasattr(data, "model_dump") and callable(getattr(data, "model_dump")):
# Handle Pydantic model instances
try:
return json_serialize(data.model_dump())
except Exception:
return str(data)
else:
# Fallback: Convert to string if not serializable
try:
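Finally, a hedged sketch of the behavior the utils.py hunk adds (assuming Pydantic v2 for model_dump(); the Message class below is a stand-in for the LangChain/OpenAI objects the tracer actually encounters, and the import path mirrors the file location):

from pydantic import BaseModel

from openlayer.lib import utils

class Message(BaseModel):
    role: str
    content: str

msg = Message(role="user", content="hello")

# Instances are unwrapped via model_dump(); classes collapse to their name;
# anything json.dumps() rejects falls back to str().
print(utils.json_serialize(msg))         # {'role': 'user', 'content': 'hello'}
print(utils.json_serialize(Message))     # Message
print(utils.json_serialize({"m": msg}))  # {'m': {'role': 'user', 'content': 'hello'}}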