Pydantic AI Implementation Guide
Table of Contents
1. Introduction
2. Quick Start Guide
3. Setting Up Your Environment
4. Defining Agents
5. Creating and Registering Tools
6. Dependency Injection
7. MCP Server Setup
8. Agent Types and Use Cases
9. End-to-End Example
10. Testing and Debugging
11. Deployment Considerations
12. Appendix: Advanced Patterns
Introduction
This implementation guide provides practical instructions and code examples for
building an AI agent platform with Pydantic AI. It covers everything from basic agent
setup to complex workflows and deployment strategies. It is designed to help
developers implement the architecture described in the platform architecture
document while following the recommendations in the companion best practices guide.
# Install Pydantic AI
# pip install pydantic-ai
Project Structure
my_ai_platform/
├── agents/
│ ├── __init__.py
│ ├── base.py
│ ├── retrieval.py
│ ├── planning.py
│ └── conversation.py
├── tools/
│ ├── __init__.py
│ ├── registry.py
│ ├── search.py
│ ├── database.py
│ └── external_apis.py
├── services/
│ ├── __init__.py
│ ├── database.py
│ ├── vector_store.py
│ └── authentication.py
├── models/
│ ├── __init__.py
│ ├── input_types.py
│ └── output_types.py
├── mcp/
│ ├── __init__.py
│ ├── server.py
│ └── client.py
├── config/
│ ├── __init__.py
│ ├── settings.py
│ └── logging.py
├── tests/
│ ├── __init__.py
│ ├── test_agents.py
│ └── test_tools.py
├── main.py
├── Dockerfile
└── docker-compose.yml
Defining Agents
Agents are the core building blocks of your AI platform. Here’s how to define different
types of agents:
Basic Agent
from pydantic_ai import Agent
class Sentiment(Enum):
    """Closed set of sentiment labels; each value is the lowercase member name."""

    POSITIVE = "positive"
    NEUTRAL = "neutral"
    NEGATIVE = "negative"
class SentimentAnalysis(BaseModel):
sentiment: Sentiment
confidence: float = Field(ge=0.0, le=1.0)
explanation: str
@dataclass
class UserContext:
    """Per-user data handed to an agent run as its dependencies."""

    # Name of the user the assistant is working for.
    user_name: str
    # Free-form preference mapping; keys and value types are not constrained here.
    user_preferences: dict
    # Preferred language of the user; falls back to "en".
    language: str = "en"
@personalized_agent.system_prompt
def personalized_prompt(ctx: RunContext[UserContext]) -> str:
return f"""
You are a personal assistant for {ctx.deps.user_name}.
Preferred language: {ctx.deps.language}
User preferences: {ctx.deps.user_preferences}
response = personalized_agent.run_sync(
"What restaurants would you recommend?",
deps=user_context
)
Async Agent
from pydantic_ai import Agent
import asyncio
async_agent = Agent(
'openai:gpt-4',
system_prompt="You are a helpful assistant."
)
# Example usage
queries = ["What is AI?", "Explain quantum computing", "How does blockchain work?"]
results = asyncio.run(process_multiple_queries(queries))
Stateful Agent
from pydantic_ai import Agent, RunContext
from dataclasses import dataclass
from typing import List, Optional
@dataclass
class ConversationState:
    """Conversation data supplied to the stateful agent as its dependencies."""

    # Prior exchanges; the prompt builder reads the "user" and "assistant"
    # keys of each entry.
    history: List[dict]
    # Identifier of the user taking part in the conversation.
    user_id: str
    # Most recent topic, when one is known.
    last_topic: Optional[str] = None
@stateful_agent.system_prompt
def conversation_prompt(ctx: RunContext[ConversationState]) -> str:
history_str = "\n".join([
f"User: {msg['user']}\nAssistant: {msg['assistant']}"
for msg in ctx.deps.history[-5:] # Last 5 messages
])
return f"""
You are a conversational assistant for user {ctx.deps.user_id}.
# Example usage
state = ConversationState(
history=[
{"user": "Hello", "assistant": "Hi there! How can I help you today?"},
{"user": "Tell me about machine learning", "assistant": "Machine learning is..."}
],
user_id="user123",
last_topic="machine learning"
)
weather_agent = Agent(
'openai:gpt-4',
system_prompt="You provide weather information."
)
@weather_agent.tool_plain
def get_current_temperature(city: str) -> float:
    """
    Get the current temperature in Celsius for the specified city.

    This is a mock implementation backed by a small in-memory table; a real
    implementation would call a weather API instead.

    Args:
        city: The name of the city to get the temperature for. Matching
            ignores letter case and surrounding whitespace.

    Returns:
        The current temperature in Celsius, or 20.0 when the city is not in
        the mock table.
    """
    mock_temperatures = {
        "new york": 22.5,
        "london": 18.0,
        "tokyo": 26.3,
        "sydney": 20.1,
    }
    # Normalise before the lookup: stray whitespace in the argument must not
    # silently fall through to the default.
    normalized_city = city.strip().lower()
    return mock_temperatures.get(normalized_city, 20.0)  # Default to 20°C if city not found
@dataclass
class APICredentials:
    """Connection details for an external API, passed to the agent as dependencies."""

    # Secret that the search tool sends as a bearer token.
    api_key: str
    # NOTE(review): presumably the root URL of the API; it is not read by the
    # code visible here - confirm against the caller.
    base_url: str
search_agent = Agent(
'anthropic:claude-3-haiku-20240307',
deps_type=APICredentials,
system_prompt="You help users find information online."
)
@search_agent.tool
async def search_web(ctx: RunContext[APICredentials], query: str, max_results: int = 5) ->
list[dict]:
"""
Search the web for information related to the query.
Args:
query: The search query
max_results: Maximum number of results to return (default: 5)
Returns:
A list of search results, each containing a title, URL, and snippet
"""
headers = {"Authorization": f"Bearer {ctx.deps.api_key}"}
params = {"q": query, "limit": max_results}
Tool Registry
# tools/registry.py
from typing import Dict, Callable, Any, List
from pydantic_ai import Agent
class ToolRegistry:
    """Registry of tool callables grouped by category name."""

    def __init__(self):
        # Maps category -> {tool name -> callable}.
        self._tools: Dict[str, Dict[str, Callable]] = {}

    def register_tool(self, category: str, name: str, tool_func: Callable) -> None:
        """Store ``tool_func`` as ``name`` in ``category``.

        The category is created on first use; registering an existing name
        again replaces the previous callable.
        """
        bucket = self._tools.setdefault(category, {})
        bucket[name] = tool_func
# Example usage
registry = ToolRegistry()
# Register tools
def search_wikipedia(query: str) -> List[dict]:
    """Search Wikipedia for ``query`` (placeholder: always yields no results)."""
    results: List[dict] = []  # stub - no lookup is performed yet
    return results
Dependency Injection
Dependency injection allows you to provide external services and data to your agents
and tools:
@dataclass
class DatabaseService:
    """Value object holding the database connection settings."""

    # Connection string for the database, e.g. a ``postgresql://`` URL.
    connection_string: str
@dataclass
class AppDependencies:
db: DatabaseService
user_id: int
api_key: str
@data_agent.tool
async def query_database(ctx: RunContext[AppDependencies], sql: str) -> List[Dict[str, Any]]:
    """
    Execute an SQL query against the database.

    Args:
        sql: The SQL query to execute

    Returns:
        The query results as a list of dictionaries

    Raises:
        ValueError: If the statement contains a destructive keyword
            (DROP, DELETE or TRUNCATE) as a standalone word.
    """
    import re  # local import keeps this block self-contained

    # Coarse guard rail against destructive statements. Whole-word matching
    # avoids rejecting harmless identifiers such as "deleted_at" or "dropdown",
    # which a plain substring test would refuse.
    # A keyword denylist is NOT a defence against SQL injection: run
    # model-generated SQL through a read-only database role as well.
    if re.search(r"\b(?:DROP|DELETE|TRUNCATE)\b", sql, flags=re.IGNORECASE):
        raise ValueError("Destructive SQL operations are not allowed")
    # NOTE(review): the statement is only validated here; the execution step
    # is not part of this block as given.
# Example usage
db_service = DatabaseService(connection_string="postgresql://user:pass@localhost/db")
dependencies = AppDependencies(
db=db_service,
user_id=123,
api_key="sk-api-key"
)
result = data_agent.run_sync(
"Find all users in the database",
deps=dependencies
)
Service Layer
For more complex applications, you might want to create a service layer:
# services/database.py
import asyncpg
from typing import List, Dict, Any, Optional
class DatabaseService:
    """Thin async wrapper around an ``asyncpg`` connection pool.

    The pool is created lazily on first use, or explicitly via ``initialize``.
    """

    def __init__(self, connection_string: str):
        self.connection_string = connection_string
        self.pool = None  # created by initialize()

    async def initialize(self) -> None:
        """Create the connection pool if it does not exist yet."""
        if self.pool is None:
            self.pool = await asyncpg.create_pool(self.connection_string)

    async def close(self) -> None:
        """Close the pool, if one was created, and forget it."""
        if self.pool is not None:
            await self.pool.close()
            self.pool = None

    async def query(self, sql: str, params: Optional[List[Any]] = None) -> List[Dict[str, Any]]:
        """Execute a query and return results as dictionaries.

        Args:
            sql: Statement using ``$1``, ``$2``, ... placeholders.
            params: Positional values bound to the placeholders.

        Returns:
            One dictionary per result row, keyed by column name.
        """
        if not self.pool:
            await self.initialize()
        async with self.pool.acquire() as conn:
            rows = await conn.fetch(sql, *(params or ()))
        return [dict(row) for row in rows]

    async def execute(self, sql: str, params: Optional[List[Any]] = None) -> str:
        """Execute a command and return status.

        Args:
            sql: Statement using ``$1``, ``$2``, ... placeholders.
            params: Positional values bound to the placeholders.

        Returns:
            The status string reported by the server for the command.
        """
        if not self.pool:
            await self.initialize()
        async with self.pool.acquire() as conn:
            return await conn.execute(sql, *(params or ()))
# services/vector_store.py
import numpy as np
from typing import List, Dict, Any, Optional
class VectorStoreService:
    """Handle for a vector database; storage calls are still placeholders."""

    def __init__(self, connection_string: str):
        # Only the connection string is kept; creating the actual vector
        # database client is left to the implementer.
        self.connection_string = connection_string

    async def insert(self, id: str, vector: List[float], metadata: Dict[str, Any]) -> bool:
        """Insert a vector with metadata.

        Placeholder: nothing is stored and ``True`` is always returned.
        """
        inserted = True
        return inserted
# services/service_container.py
from dataclasses import dataclass
from .database import DatabaseService
from .vector_store import VectorStoreService
@dataclass
class ServiceContainer:
    """Bundle of the service objects shared across the application."""

    db: DatabaseService
    vector_store: VectorStoreService

    @classmethod
    def create_from_config(cls, config: dict):
        """Build a container from a configuration mapping.

        Reads the ``"database_url"`` and ``"vector_store_url"`` keys, in that
        order; a missing key raises ``KeyError``.
        """
        return cls(
            db=DatabaseService(config["database_url"]),
            vector_store=VectorStoreService(config["vector_store_url"]),
        )
# Security
API_KEY = "your-secret-api-key" # In production, use environment variables
api_key_header = APIKeyHeader(name="X-API-Key")
# Models
class MCPRequest(BaseModel):
agent_id: str
function_name: str
parameters: Dict[str, Any]
context: Optional[Dict[str, Any]] = None
class MCPResponse(BaseModel):
status: str
data: Any
error: Optional[str] = None
# Routes
@app.post("/invoke", response_model=MCPResponse, dependencies=[Depends(verify_api_key)])
async def invoke_function(request: MCPRequest):
    """Dispatch an MCP invocation to a (mock) function and wrap the outcome.

    Known function names produce a ``"success"`` response carrying canned
    data; any other name, or any exception, produces an ``"error"`` response.
    """
    # In a real implementation, this would dispatch to the appropriate function.
    # These canned payloads are a mock implementation for demonstration.
    mock_payloads = {
        "get_weather": {"temperature": 22.5, "condition": "sunny"},
        "search_database": [{"id": 1, "name": "Example result"}],
    }
    try:
        requested = request.function_name
        if requested in mock_payloads:
            return MCPResponse(status="success", data=mock_payloads[requested])
        return MCPResponse(
            status="error",
            data=None,
            error=f"Unknown function: {requested}",
        )
    except Exception as e:
        return MCPResponse(status="error", data=None, error=str(e))
@app.get("/health")
async def health_check():
    """Report service liveness with a fixed ``{"status": "healthy"}`` payload."""
    payload = {"status": "healthy"}
    return payload
services:
mcp-server:
build:
context: .
dockerfile: Dockerfile
ports:
- "8000:8000"
environment:
- API_KEY=${API_KEY}
- DATABASE_URL=${DATABASE_URL}
- VECTOR_STORE_URL=${VECTOR_STORE_URL}
volumes:
- ./logs:/app/logs
restart: unless-stopped
depends_on:
- postgres
- redis
postgres:
image: postgres:14
environment:
- POSTGRES_USER=${POSTGRES_USER}
- POSTGRES_PASSWORD=${POSTGRES_PASSWORD}
- POSTGRES_DB=${POSTGRES_DB}
volumes:
- postgres-data:/var/lib/postgresql/data
ports:
- "5432:5432"
redis:
image: redis:7
ports:
- "6379:6379"
volumes:
- redis-data:/data
volumes:
postgres-data:
redis-data:
# Dockerfile
FROM python:3.10-slim
WORKDIR /app
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
COPY . .
Retrieval Agent
from pydantic_ai import Agent, RunContext
from dataclasses import dataclass
from typing import List, Dict, Any
@dataclass
class VectorDBDeps:
    """Dependencies handed to the retrieval agent on each run."""

    # NOTE(review): presumably the vector collection to search; the mock tool
    # visible here does not read it - confirm against the real implementation.
    collection_name: str
    # Credential for the vector database service (also unused by the mock tool).
    api_key: str
retrieval_agent = Agent(
'openai:gpt-4',
deps_type=VectorDBDeps,
system_prompt="""
You are a retrieval agent that helps users find relevant information.
Use the search_documents tool to find information related to the user's query.
Always cite your sources by including the document ID.
"""
)
@retrieval_agent.tool
async def search_documents(ctx: RunContext[VectorDBDeps], query: str, top_k: int = 3) -> List[Dict[str, Any]]:
    """
    Search for documents related to the query.

    Args:
        query: The search query
        top_k: Maximum number of results to return (default: 3)

    Returns:
        A list of at most ``top_k`` documents, each containing an ID, content,
        and relevance score, ordered from most to least relevant
    """
    # In a real implementation, this would query a vector database
    # This is a mock implementation for demonstration
    mock_results = [
        {"id": "doc1", "content": "Information about topic A", "score": 0.92},
        {"id": "doc2", "content": "Information about topic B", "score": 0.85},
        {"id": "doc3", "content": "Information about topic C", "score": 0.78},
    ]
    # Honour the caller's limit; a non-positive top_k yields no results.
    return mock_results[:max(top_k, 0)]
# Example usage
deps = VectorDBDeps(collection_name="knowledge_base", api_key="api-key")
result = retrieval_agent.run_sync(
"What information do we have about topic A?",
deps=deps
)
Planning Agent
from pydantic_ai import Agent
from pydantic import BaseModel, Field
from typing import List
from enum import Enum
class TaskStatus(Enum):
    """Progress states a planned task can be in."""

    TODO = "todo"
    IN_PROGRESS = "in_progress"
    DONE = "done"
class Task(BaseModel):
description: str
status: TaskStatus = TaskStatus.TODO
dependencies: List[int] = Field(default_factory=list)
class Plan(BaseModel):
goal: str
tasks: List[Task]
estimated_completion_time: str
planning_agent = Agent(
'anthropic:claude-3-opus-20240229',
output_type=Plan,
system_prompt="""
You are a planning agent that helps users break down complex goals into actionable
tasks.
For each goal, create a detailed plan with:
1. A clear list of tasks
2. Dependencies between tasks (which tasks must be completed before others)
3. An estimated completion time
# Example usage
result = planning_agent.run_sync(
    "I want to build a personal website to showcase my portfolio."
)
# The agent is built with ``output_type=Plan``, so the validated Plan
# instance is read from ``result.output``.
print(f"Goal: {result.output.goal}")
print(f"Estimated completion time: {result.output.estimated_completion_time}")
print("Tasks:")
for i, task in enumerate(result.output.tasks):
    deps = f"Dependencies: {task.dependencies}" if task.dependencies else "No dependencies"
    print(f"{i+1}. {task.description} ({task.status.value}) - {deps}")
Conversational Agent
from pydantic_ai import Agent, RunContext
from dataclasses import dataclass
from typing import List, Dict, Optional
@dataclass
class ConversationHistory:
    """Conversation data handed to the conversational agent as dependencies."""

    # Prior exchanges; the prompt builder reads the "user" and "assistant"
    # keys of each entry.
    messages: List[Dict[str, str]]
    # Name of the person the agent is chatting with.
    user_name: str
    # Preference mapping that is interpolated into the system prompt.
    preferences: Dict[str, str]
conversation_agent = Agent(
'openai:gpt-4',
deps_type=ConversationHistory,
system_prompt="""
You are a conversational agent that maintains context throughout a conversation.
Be friendly, helpful, and personalized to the user's preferences.
"""
)
@conversation_agent.system_prompt
def dynamic_prompt(ctx: RunContext[ConversationHistory]) -> str:
    """Build the per-run system prompt from the conversation dependencies.

    The prompt names the user, lists their preferences and embeds the last
    five exchanges so the model can actually see the history it is told to
    maintain.

    Args:
        ctx: Run context whose ``deps`` provide ``messages``, ``user_name``
            and ``preferences``.

    Returns:
        The prompt text.

    Raises:
        KeyError: If a message lacks the ``"user"`` or ``"assistant"`` key.
    """
    history = "\n".join([
        f"User: {msg['user']}\nAssistant: {msg['assistant']}"
        for msg in ctx.deps.messages[-5:]  # Last 5 messages
    ])
    return (
        "\n"
        f"You are chatting with {ctx.deps.user_name}.\n"
        "User preferences:\n"
        f"{ctx.deps.preferences}\n"
        "Recent conversation history:\n"
        f"{history}\n"
        "Maintain context from the conversation history and respond in a way that aligns with\n"
        "the user's preferences.\n"
    )
# Example usage
history = ConversationHistory(
messages=[
{"user": "Hi there!", "assistant": "Hello! How can I help you today?"},
{"user": "I'm looking for a good book to read.", "assistant": "What genres do you
enjoy?"},
{"user": "I like science fiction and fantasy.", "assistant": "Great choices! Some
recommendations..."}
],
user_name="Alex",
preferences={"tone": "casual", "response_length": "detailed"}
)
response = conversation_agent.run_sync(
"Any recommendations for books like Dune?",
deps=history
)
@dataclass
class RAGDependencies:
    """Dependencies handed to the RAG agent on each run."""

    # NOTE(review): none of these fields is read by the mock tool visible
    # here; the descriptions below follow the field names - confirm on use.
    # Endpoint of the vector database.
    vector_db_url: str
    # Credential for the vector database.
    api_key: str
    # Identifier of the requesting user.
    user_id: str
rag_agent = Agent(
'openai:gpt-4',
deps_type=RAGDependencies,
system_prompt="""
You are a RAG (Retrieval-Augmented Generation) agent.
Process:
1. When a user asks a question, use the search_knowledge_base tool to retrieve
relevant information
2. Synthesize the retrieved information to provide a comprehensive answer
3. Always cite your sources by including document IDs
4. If the retrieved information is insufficient, acknowledge the limitations
@rag_agent.tool
async def search_knowledge_base(ctx: RunContext[RAGDependencies], query: str, top_k: int = 5) -> List[Dict[str, Any]]:
    """
    Search the knowledge base for information related to the query.

    Args:
        query: The search query
        top_k: Maximum number of results to return (default: 5)

    Returns:
        A list of at most ``top_k`` documents, each containing an ID, content,
        and relevance score, ordered from most to least relevant
    """
    # In a real implementation, this would query a vector database
    # This is a mock implementation for demonstration
    mock_results = [
        {"id": "doc1", "content": "Detailed information about topic X...", "score": 0.95},
        {"id": "doc2", "content": "Additional context about topic X...", "score": 0.87},
        {"id": "doc3", "content": "Related information about topic Y...", "score": 0.82},
        {"id": "doc4", "content": "Historical background on topic X...", "score": 0.78},
        {"id": "doc5", "content": "Recent developments in topic X...", "score": 0.75},
    ]
    # Honour the caller's limit; a non-positive top_k yields no results.
    return mock_results[:max(top_k, 0)]
# Example usage
deps = RAGDependencies(
vector_db_url="https://api.vectordb.example.com",
api_key="api-key",
user_id="user123"
)
result = rag_agent.run_sync(
"What is the latest information about topic X?",
deps=deps
)
class WorkflowStatus(Enum):
    """Lifecycle states shared by a workflow and its individual steps."""

    PENDING = "pending"
    IN_PROGRESS = "in_progress"
    COMPLETED = "completed"
    FAILED = "failed"
class WorkflowStep(BaseModel):
id: str
name: str
status: WorkflowStatus = WorkflowStatus.PENDING
result: Optional[Dict[str, Any]] = None
error: Optional[str] = None
class Workflow(BaseModel):
id: str
name: str
steps: List[WorkflowStep]
current_step_index: int = 0
status: WorkflowStatus = WorkflowStatus.PENDING
context: Dict[str, Any] = Field(default_factory=dict)
@dataclass
class WorkflowDependencies:
workflow: Workflow
services: Dict[str, Any]
workflow_agent = Agent(
'openai:gpt-4',
deps_type=WorkflowDependencies,
system_prompt="""
You are a workflow orchestration agent responsible for managing multi-step processes.
Your responsibilities:
1. Execute the current step in the workflow
2. Update the workflow state based on the results
3. Determine the next step to execute
4. Handle errors and provide recovery options
Follow the workflow definition precisely and maintain the workflow context.
"""
)
@workflow_agent.tool
async def execute_step(ctx: RunContext[WorkflowDependencies]) -> Dict[str, Any]:
"""
Execute the current step in the workflow.
Returns:
The result of the step execution
"""
workflow = ctx.deps.workflow
current_step = workflow.steps[workflow.current_step_index]
try:
# Update step status
current_step.status = WorkflowStatus.IN_PROGRESS
except Exception as e:
current_step.status = WorkflowStatus.FAILED
current_step.error = str(e)
return {"success": False, "step_id": current_step.id, "error": str(e)}
@workflow_agent.tool
async def advance_workflow(ctx: RunContext[WorkflowDependencies]) -> Dict[str, Any]:
    """
    Advance the workflow to the next step.

    Moves ``current_step_index`` forward by one. When the current step was
    the last one, the workflow is marked as completed instead.

    Returns:
        Information about the next step (``next_step_id`` and
        ``next_step_name``), or ``{"success": True, "workflow_completed": True}``
        once no steps remain
    """
    workflow = ctx.deps.workflow
    next_index = workflow.current_step_index + 1

    # No step left: finish the workflow rather than indexing past the end.
    if next_index >= len(workflow.steps):
        workflow.status = WorkflowStatus.COMPLETED
        return {"success": True, "workflow_completed": True}

    workflow.current_step_index = next_index
    next_step = workflow.steps[next_index]
    return {
        "success": True,
        "next_step_id": next_step.id,
        "next_step_name": next_step.name
    }
# Example usage
workflow = Workflow(
id="data_pipeline_1",
name="Data Processing Pipeline",
steps=[
WorkflowStep(id="data_extraction", name="Extract Data from Source"),
WorkflowStep(id="data_transformation", name="Transform Data"),
WorkflowStep(id="data_loading", name="Load Data to Destination")
]
)
deps = WorkflowDependencies(
workflow=workflow,
services={} # In a real implementation, this would contain service instances
)
if "workflow_completed" in advance_result.output:
print("Workflow completed successfully!")
break
return workflow
End-to-End Example
Here’s a complete end-to-end example of a simple customer support agent platform:
# main.py
import asyncio
import uvicorn
from fastapi import FastAPI, HTTPException, Depends, Header
from pydantic import BaseModel, Field
from typing import List, Dict, Any, Optional
from enum import Enum
from dataclasses import dataclass
# Models
class SupportCategory(Enum):
    """Categories a support inquiry can be filed under."""

    BILLING = "billing"
    TECHNICAL = "technical"
    ACCOUNT = "account"
    OTHER = "other"
class SupportRequest(BaseModel):
customer_id: str
message: str
class SupportResponse(BaseModel):
response: str
category: SupportCategory
escalate: bool = False
follow_up_actions: List[str] = Field(default_factory=list)
# Services
@dataclass
class CustomerDatabase:
    """Mock customer data source used by the support agent's tools."""

    async def get_customer(self, customer_id: str) -> Dict[str, Any]:
        """Return the (mock) profile for ``customer_id``."""
        # Mock implementation
        return {
            "id": customer_id,
            "name": "John Doe",
            "email": "john@example.com",
            "account_type": "premium",
            "billing_status": "active"
        }

    async def get_customer_tickets(self, customer_id: str) -> List[Dict[str, Any]]:
        """Return the (mock) support ticket history for ``customer_id``.

        The ``get_customer_history`` tool calls this method, so it has to
        exist on the database object.
        """
        # Mock implementation
        return [
            {
                "id": "T-1001",
                "customer_id": customer_id,
                "subject": "Billing question",
                "status": "resolved"
            }
        ]
@dataclass
class KnowledgeBase:
    """Mock knowledge base queried by the support agent's search tool."""

    async def search(self, query: str, top_k: int = 3) -> List[Dict[str, Any]]:
        """Return at most ``top_k`` (mock) articles for ``query``.

        Args:
            query: The search query (ignored by this mock).
            top_k: Maximum number of articles to return; non-positive values
                yield an empty list.

        Returns:
            Articles as dictionaries with ``id``, ``title`` and ``content``.
        """
        # Mock implementation
        articles = [
            {"id": "kb1", "title": "Billing FAQ", "content": "Information about billing..."},
            {"id": "kb2", "title": "Technical Support Guide", "content": "Troubleshooting steps..."}
        ]
        return articles[:max(top_k, 0)]
@dataclass
class SupportDependencies:
customer_db: CustomerDatabase
knowledge_base: KnowledgeBase
customer_id: str
# Agent
support_agent = Agent(
'openai:gpt-4',
deps_type=SupportDependencies,
output_type=SupportResponse,
system_prompt="""
You are a customer support agent for our company.
Your responsibilities:
1. Respond to customer inquiries in a helpful and professional manner
2. Categorize the inquiry appropriately
3. Determine if the issue needs to be escalated to a human agent
4. Suggest follow-up actions if applicable
Use the available tools to gather information about the customer and relevant
knowledge base articles.
"""
)
@support_agent.tool
async def get_customer_info(ctx: RunContext[SupportDependencies]) -> Dict[str, Any]:
    """
    Get information about the customer.

    Looks up the customer named by the run's ``customer_id`` dependency in
    the customer database.

    Returns:
        Customer information including name, email, account type, and billing status
    """
    deps = ctx.deps
    profile = await deps.customer_db.get_customer(deps.customer_id)
    return profile
@support_agent.tool
async def get_customer_history(ctx: RunContext[SupportDependencies]) -> List[Dict[str, Any]]:
    """
    Get the customer's support ticket history.

    Fetches the tickets of the customer named by the run's ``customer_id``
    dependency from the customer database.

    Returns:
        List of previous support tickets
    """
    deps = ctx.deps
    tickets = await deps.customer_db.get_customer_tickets(deps.customer_id)
    return tickets
@support_agent.tool
async def search_knowledge_base(ctx: RunContext[SupportDependencies], query: str) -> List[Dict[str, Any]]:
    """
    Search the knowledge base for relevant articles.

    Args:
        query: The search query

    Returns:
        List of relevant knowledge base articles
    """
    knowledge_base = ctx.deps.knowledge_base
    articles = await knowledge_base.search(query)
    return articles
# FastAPI app
app = FastAPI(title="Customer Support Agent API")
@app.post("/support", response_model=SupportResponse)
async def handle_support_request(
request: SupportRequest,
authorization: Optional[str] = Header(None)
):
# In a real implementation, validate the authorization token
if not authorization:
raise HTTPException(status_code=401, detail="Unauthorized")
# Initialize services
customer_db = CustomerDatabase()
knowledge_base = KnowledgeBase()
# Create dependencies
deps = SupportDependencies(
customer_db=customer_db,
knowledge_base=knowledge_base,
customer_id=request.customer_id
)
return result.data
if __name__ == "__main__":
uvicorn.run(app, host="0.0.0.0", port=8000)
@dataclass
class TestDeps:
    """Minimal dependency object used by the agent tests."""

    # The "Test" prefix matches pytest's default class-collection pattern.
    # Opt out explicitly so pytest does not try to collect this dataclass as
    # a test class. The attribute is unannotated, so it is a plain class
    # attribute and not a dataclass field.
    __test__ = False

    value: str
# Test agent
@pytest.fixture
def test_agent():
agent = Agent(
'openai:gpt-4',
deps_type=TestDeps,
output_type=TestOutput,
system_prompt="You are a test agent."
)
@agent.tool
async def test_tool(ctx: RunContext[TestDeps], input_value: str) -> str:
return f"Processed: {input_value} with {ctx.deps.value}"
return agent
# Tests
@pytest.mark.asyncio
async def test_agent_run(test_agent):
# Mock the LLM call to avoid actual API calls during testing
with patch('pydantic_ai.agent.Agent._call_llm', new_callable=AsyncMock) as mock_call:
# Set up the mock to return a valid response
mock_call.return_value = {
"output": "This is a test response",
"data": {"result": "Success", "score": 0.95}
}
# Assertions
assert result.output == "This is a test response"
assert result.data.result == "Success"
assert result.data.score == 0.95
@pytest.mark.asyncio
async def test_tool_execution(test_agent):
# Test that the tool works correctly
ctx = RunContext(deps=TestDeps(value="test_context"))
result = await test_agent.tools["test_tool"](ctx, "test_input")
assert result == "Processed: test_input with test_context"
Integration Testing
# tests/test_integration.py
import pytest
import asyncio
from fastapi.testclient import TestClient
from unittest.mock import patch, AsyncMock
# Test client
@pytest.fixture
def client():
return TestClient(app)
# Tests
def test_support_endpoint(client):
# Mock the agent run method to avoid actual LLM calls
with patch('pydantic_ai.agent.Agent.run', new_callable=AsyncMock) as mock_run:
# Set up the mock to return a valid response
mock_run.return_value.data = {
"response": "I can help with your billing issue.",
"category": "billing",
"escalate": False,
"follow_up_actions": ["Check billing status"]
}
# Assertions
assert response.status_code == 200
data = response.json()
assert data["response"] == "I can help with your billing issue."
assert data["category"] == "billing"
assert data["escalate"] is False
assert "Check billing status" in data["follow_up_actions"]
Debugging Tips
import logging
logging.basicConfig(level=logging.DEBUG)
# The execution graph will be saved to a file that you can visualize
print(f"Execution graph saved to: {agent.last_execution_graph_path}")
Deployment Considerations
Environment Variables
# config/settings.py
import os

from pydantic_settings import BaseSettings, SettingsConfigDict
class Settings(BaseSettings):
    """Application settings resolved from the environment and a ``.env`` file.

    ``BaseSettings`` looks each field up by name (case-insensitively) in the
    process environment and then in the dotenv file when the class is
    instantiated. The values below are therefore plain fallbacks; wrapping
    them in ``os.getenv`` would only freeze the environment at import time.
    """

    model_config = SettingsConfigDict(env_file=".env")

    # API keys
    openai_api_key: str = ""
    anthropic_api_key: str = ""
    google_api_key: str = ""
    # Database
    database_url: str = "postgresql://user:pass@localhost/db"
    # MCP Server
    mcp_server_host: str = "0.0.0.0"
    mcp_server_port: int = 8000
    # NOTE(review): a well-known fallback key is unsafe outside local
    # development - set MCP_API_KEY explicitly in every real deployment.
    mcp_api_key: str = "default-key"
    # Logging
    log_level: str = "INFO"
Containerization
# Dockerfile
FROM python:3.10-slim
WORKDIR /app
# Install dependencies
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
Kubernetes Deployment
# kubernetes/deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
name: pydantic-ai-platform
labels:
app: pydantic-ai-platform
spec:
replicas: 3
selector:
matchLabels:
app: pydantic-ai-platform
template:
metadata:
labels:
app: pydantic-ai-platform
spec:
containers:
- name: pydantic-ai-platform
image: your-registry/pydantic-ai-platform:latest
ports:
- containerPort: 8000
env:
- name: OPENAI_API_KEY
valueFrom:
secretKeyRef:
name: api-keys
key: openai-api-key
- name: DATABASE_URL
valueFrom:
secretKeyRef:
name: database-credentials
key: url
resources:
limits:
cpu: "1"
memory: "1Gi"
requests:
cpu: "500m"
memory: "512Mi"
livenessProbe:
httpGet:
path: /health
port: 8000
initialDelaySeconds: 30
periodSeconds: 10
readinessProbe:
httpGet:
path: /health
port: 8000
initialDelaySeconds: 5
periodSeconds: 5
---
apiVersion: v1
kind: Service
metadata:
name: pydantic-ai-platform
spec:
selector:
app: pydantic-ai-platform
ports:
- port: 80
targetPort: 8000
type: ClusterIP
# Models
class ResearchTopic(BaseModel):
title: str
keywords: List[str]
description: str
class ResearchOutline(BaseModel):
sections: List[Dict[str, Any]]
references: List[str]
class ResearchReport(BaseModel):
title: str
introduction: str
sections: List[Dict[str, str]]
conclusion: str
references: List[str]
# Dependencies
@dataclass
class ResearchDeps:
    """Dependencies shared by the topic, outline and report agents."""

    # NOTE(review): neither field is read by the code visible here; the
    # descriptions follow the field names - confirm on use.
    # Credential for external services.
    api_key: str
    # Identifier of the requesting user.
    user_id: str
# Agents
topic_agent = Agent(
'openai:gpt-4',
deps_type=ResearchDeps,
output_type=ResearchTopic,
system_prompt="You help users define research topics."
)
outline_agent = Agent(
'anthropic:claude-3-opus-20240229',
deps_type=ResearchDeps,
output_type=ResearchOutline,
system_prompt="You create detailed outlines for research reports."
)
report_agent = Agent(
'openai:gpt-4',
deps_type=ResearchDeps,
output_type=ResearchReport,
system_prompt="You write comprehensive research reports based on outlines."
)
# Workflow function
async def generate_research_report(query: str, deps: ResearchDeps) -> ResearchReport:
# Step 1: Generate research topic
topic_result = await topic_agent.run(
f"Generate a research topic based on: {query}",
deps=deps
)
topic = topic_result.data
Sections:
{outline.sections}
References to include:
{outline.references}
"""
Streaming Responses
from pydantic_ai import Agent
import asyncio
# Streaming is selected per run through ``run_stream``; the Agent constructor
# takes no streaming switch.
streaming_agent = Agent(
    'openai:gpt-4',
    system_prompt="You are a storytelling agent that creates engaging narratives.",
)


# Asynchronous streaming
async def stream_story():
    """Stream a short story to stdout as the model produces it."""
    prompt = "Tell me a short story about a robot learning to paint."
    # run_stream() is an async context manager; stream_text(delta=True)
    # yields only the newly generated text on each iteration.
    async with streaming_agent.run_stream(prompt) as result:
        async for chunk in result.stream_text(delta=True):
            print(chunk, end="", flush=True)
            await asyncio.sleep(0.01)  # Small delay for demonstration


# Synchronous entry point: drive the async stream to completion.
asyncio.run(stream_story())
class CustomLLMModel(BaseModel):
"""Custom LLM model implementation."""
payload = {
"messages": messages,
"temperature": temperature,
"max_tokens": max_tokens,
"stop": stop,
**kwargs
}
return ModelResponse(
content=data["choices"][0]["message"]["content"],
model=self.model_name,
usage={
"prompt_tokens": data["usage"]["prompt_tokens"],
"completion_tokens": data["usage"]["completion_tokens"],
"total_tokens": data["usage"]["total_tokens"]
},
raw_response=data
)
agent = Agent(
custom_model,
system_prompt="You are a helpful assistant."
)