#Overview
Learn how to build custom agent handlers that orchestrate multiple database operations to solve complex business problems. This example shows how to combine PostgreSQL tools within custom handlers to create reusable, powerful agent capabilities.
#What You'll Learn
- Creating custom handlers that use database tools
- Orchestrating multiple database queries in sequence
- Building user activity analytics with handlers
- Combining data from multiple tables
- Creating reusable business logic patterns
- Error handling in custom handlers
#Prerequisites
- Understanding of PostgreSQL Tools basics
- PostgreSQL database with sample data
- Familiarity with Python async/await
- Basic understanding of agent handlers
#What Are Custom Handlers?
Handlers are custom functions that extend an agent's capabilities beyond basic tool calling. They can:
- Orchestrate multiple tool calls
- Implement complex business logic
- Process and transform data
- Handle multi-step workflows
- Return structured results
Think of handlers as custom methods for your agent that combine existing tools in powerful ways.
#Step 1: Basic Handler Structure
Here's the anatomy of a custom handler:

    async def my_handler(data, context, agent):
        """
        Custom handler function signature

        Args:
            data: Input data from the handler call
            context: Execution context (metadata, tracing info)
            agent: Reference to the agent instance

        Returns:
            dict: Handler result (can be any structure)
        """
        # Access input data
        user_input = data.get("user_id")

        # Call agent tools
        result = await agent.call_tool("query_database", {
            "sql": "SELECT * FROM users WHERE id = $1",
            "params": [user_input]
        })

        # Process and return results
        return {"user_found": result["success"]}

Key points:
- Handlers are async functions
- Receive data, context, and agent parameters
- Can call any tool available to the agent
- Return any data structure you need
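
Because a handler is an ordinary async function, it can be exercised without a database. The sketch below is a minimal illustration, assuming only the call_tool(name, args) shape and the "success" key shown above. StubAgent and its canned response are hypothetical stand-ins, not part of the framework.

```python
import asyncio


class StubAgent:
    """Hypothetical stand-in for an agent: records calls, returns a canned result."""

    def __init__(self, response):
        self.response = response
        self.calls = []

    async def call_tool(self, name, args):
        # Record the call so a test can inspect what the handler asked for.
        self.calls.append((name, args))
        return self.response


async def my_handler(data, context, agent):
    """Same shape as the handler above: look up a user and report the outcome."""
    result = await agent.call_tool("query_database", {
        "sql": "SELECT * FROM users WHERE id = $1",
        "params": [data.get("user_id")],
    })
    return {"user_found": result["success"]}


def run_handler_with_stub(user_id):
    """Run the handler against the stub; return (handler result, recorded calls)."""
    agent = StubAgent({"success": True, "rows": [{"id": user_id}], "row_count": 1})
    result = asyncio.run(my_handler({"user_id": user_id}, {}, agent))
    return result, agent.calls


if __name__ == "__main__":
    print(run_handler_with_stub(42))
```

Keeping the agent behind a single call_tool() entry point is what makes this kind of substitution possible: the handler never needs to know whether a real database is behind the tool.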
#Step 2: Create a User Activity Analyzer
Let's build a handler that analyzes user behavior across multiple tables:

    import asyncio

    from daita.agents.agent import Agent
    from daita.plugins.postgresql import postgresql

    async def analyze_user_activity(data, context, agent):
        """Analyze user activity using multiple database queries"""
        user_id = data.get("user_id")

        # Step 1: Get user details
        user_result = await agent.call_tool("query_database", {
            "sql": "SELECT username, email, created_at FROM users WHERE id = $1",
            "params": [str(user_id)]
        })

        if not user_result["success"] or user_result["row_count"] == 0:
            return {"error": "User not found"}

        user = user_result["rows"][0]

        # Step 2: Get user's recent orders
        orders_result = await agent.call_tool("query_database", {
            "sql": """
                SELECT COUNT(*) as order_count,
                       SUM(total_amount) as total_spent,
                       MAX(order_date) as last_order
                FROM orders
                WHERE user_id = $1
                  AND order_date > NOW() - INTERVAL '90 days'
            """,
            "params": [str(user_id)]
        })

        stats = orders_result["rows"][0] if orders_result["success"] else {}

        # Step 3: Get favorite categories
        categories_result = await agent.call_tool("query_database", {
            "sql": """
                SELECT p.category, COUNT(*) as purchase_count
                FROM orders o
                JOIN order_items oi ON o.id = oi.order_id
                JOIN products p ON oi.product_id = p.id
                WHERE o.user_id = $1
                GROUP BY p.category
                ORDER BY purchase_count DESC
                LIMIT 3
            """,
            "params": [str(user_id)]
        })

        favorite_categories = [
            row["category"] for row in categories_result["rows"]
        ] if categories_result["success"] else []

        # Compile comprehensive analysis
        return {
            "user_info": {
                "username": user["username"],
                "email": user["email"],
                "member_since": user["created_at"]
            },
            "activity_stats": {
                "orders_last_90_days": stats.get("order_count", 0),
                # SUM() is NULL (None) when no orders match, so fall back to 0
                "total_spent": float(stats.get("total_spent") or 0),
                "last_order_date": stats.get("last_order")
            },
            "preferences": {
                "favorite_categories": favorite_categories
            },
            "engagement_level": (
                "high" if stats.get("order_count", 0) > 5
                else "medium" if stats.get("order_count", 0) > 0
                else "low"
            )
        }

What this handler does:
- Fetches user profile information
- Calculates order statistics (count, spend, recency)
- Identifies favorite product categories
- Computes engagement level
- Returns structured analysis
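
The engagement rule and the spend total are easy to isolate and test on their own. The helpers below are a hedged sketch: the names classify_engagement and safe_total are hypothetical, the thresholds are the ones used in the handler above, and the None handling relies on the standard SQL behavior that SUM() yields NULL when no rows match.

```python
def classify_engagement(order_count):
    """Map an order count to the engagement labels used in the handler above."""
    count = int(order_count or 0)  # treat a missing or None count as zero
    if count > 5:
        return "high"
    if count > 0:
        return "medium"
    return "low"


def safe_total(value):
    """Convert a SUM() result to float, treating NULL (None) as 0.0."""
    return float(value) if value is not None else 0.0


if __name__ == "__main__":
    for count in (8, 3, 0, None):
        print(count, classify_engagement(count))
    print(safe_total(None), safe_total("12.50"))
```

Pulling the rule into a plain function keeps the thresholds in one place if several handlers need the same classification.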
#Step 3: Register the Handler with an Agent
Add your custom handler to an agent:

    async def main():
        # Create PostgreSQL plugin
        db = postgresql(
            host="localhost",
            database="analytics_db",
            user="analytics_user",
            password="analytics_pass"
        )

        # Create agent with custom handler
        agent = Agent(
            name="analytics_agent",
            tools=[db],
            handlers={
                "analyze_user": analyze_user_activity  # Register handler
            }
        )

        print(f"Created analytics agent with tools: {agent.tool_names}")
        print(f"Available handlers: {list(agent.handlers.keys())}")

Handler registration:
- Pass handlers dict to Agent constructor
- Key = handler name (used to call it)
- Value = handler function reference
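
Since handlers are async functions, a small check before constructing the agent can catch a mistyped or synchronous handler early. This is a sketch using only the standard library; validate_handlers is a hypothetical helper, not a framework API, and the two sample functions exist only to exercise it.

```python
import inspect


def validate_handlers(handlers):
    """Return the handlers dict unchanged, or raise if an entry is not usable."""
    for name, func in handlers.items():
        if not isinstance(name, str) or not name:
            raise ValueError(f"Handler name must be a non-empty string, got {name!r}")
        if not inspect.iscoroutinefunction(func):
            raise TypeError(f"Handler {name!r} must be defined with 'async def'")
    return handlers


async def analyze_user_activity(data, context, agent):
    """Placeholder with the same signature as the handler from Step 2."""
    return {}


def sync_handler(data, context, agent):
    """Deliberately synchronous, to show the check failing."""
    return {}


if __name__ == "__main__":
    print(list(validate_handlers({"analyze_user": analyze_user_activity})))
    try:
        validate_handlers({"broken": sync_handler})
    except TypeError as exc:
        print(exc)
```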
#Step 4: Use the Custom Handler
Call your handler using agent.process():

    async def main():
        db = postgresql(
            host="localhost",
            database="analytics_db",
            user="analytics_user",
            password="analytics_pass"
        )

        agent = Agent(
            name="analytics_agent",
            tools=[db],
            handlers={
                "analyze_user": analyze_user_activity
            }
        )

        # Use the custom handler
        print("\nAnalyzing user activity for user_id=42:")
        result = await agent.process("analyze_user", data={"user_id": 42})

        # Result is wrapped by BaseAgent.process()
        analysis = result["result"]

        # Display results
        print("\nUser Analysis:")
        print(f" Username: {analysis['user_info']['username']}")
        print(f" Email: {analysis['user_info']['email']}")
        print(f" Member Since: {analysis['user_info']['member_since']}")
        print(f"\nActivity (Last 90 Days):")
        print(f" Orders: {analysis['activity_stats']['orders_last_90_days']}")
        print(f" Total Spent: ${analysis['activity_stats']['total_spent']:.2f}")
        print(f" Last Order: {analysis['activity_stats']['last_order_date']}")
        print(f"\nPreferences:")
        print(f" Favorite Categories: {', '.join(analysis['preferences']['favorite_categories'])}")
        print(f"\nEngagement Level: {analysis['engagement_level'].upper()}")

        await agent.stop()

    if __name__ == "__main__":
        asyncio.run(main())

Output example:

    Analyzing user activity for user_id=42:

    User Analysis:
     Username: john_doe
     Email: john@example.com
     Member Since: 2025-06-15 10:30:00

    Activity (Last 90 Days):
     Orders: 8
     Total Spent: $1247.50
     Last Order: 2026-01-05

    Preferences:
     Favorite Categories: Electronics, Books, Home & Garden

    Engagement Level: HIGH

#Step 5: Add Error Handling and Validation
Robust handlers include proper error handling:

    async def analyze_user_activity_robust(data, context, agent):
        """Enhanced handler with error handling and validation"""
        # Validate input (an explicit None check, so an ID of 0 is not rejected)
        user_id = data.get("user_id")
        if user_id is None:
            return {"error": "user_id is required", "success": False}

        try:
            # Get user details with error handling
            user_result = await agent.call_tool("query_database", {
                "sql": "SELECT username, email, created_at FROM users WHERE id = $1",
                "params": [str(user_id)]
            })

            if not user_result["success"]:
                return {
                    "error": f"Database error: {user_result.get('error')}",
                    "success": False
                }

            if user_result["row_count"] == 0:
                return {
                    "error": f"User {user_id} not found",
                    "success": False
                }

            user = user_result["rows"][0]

            # Get order statistics (COALESCE keeps total_spent non-NULL)
            orders_result = await agent.call_tool("query_database", {
                "sql": """
                    SELECT COUNT(*) as order_count,
                           COALESCE(SUM(total_amount), 0) as total_spent,
                           MAX(order_date) as last_order
                    FROM orders
                    WHERE user_id = $1
                      AND order_date > NOW() - INTERVAL '90 days'
                """,
                "params": [str(user_id)]
            })

            # Handle query failure gracefully
            if orders_result["success"]:
                stats = orders_result["rows"][0]
            else:
                stats = {
                    "order_count": 0,
                    "total_spent": 0,
                    "last_order": None
                }

            # Get categories (optional, don't fail if it errors)
            favorite_categories = []
            try:
                categories_result = await agent.call_tool("query_database", {
                    "sql": """
                        SELECT p.category, COUNT(*) as purchase_count
                        FROM orders o
                        JOIN order_items oi ON o.id = oi.order_id
                        JOIN products p ON oi.product_id = p.id
                        WHERE o.user_id = $1
                        GROUP BY p.category
                        ORDER BY purchase_count DESC
                        LIMIT 3
                    """,
                    "params": [str(user_id)]
                })

                if categories_result["success"]:
                    favorite_categories = [
                        row["category"] for row in categories_result["rows"]
                    ]
            except Exception as e:
                print(f"Warning: Could not fetch categories: {e}")

            # Return successful analysis
            return {
                "success": True,
                "user_info": {
                    "username": user["username"],
                    "email": user["email"],
                    "member_since": str(user["created_at"])
                },
                "activity_stats": {
                    "orders_last_90_days": int(stats.get("order_count", 0)),
                    "total_spent": float(stats.get("total_spent") or 0),
                    "last_order_date": str(stats.get("last_order")) if stats.get("last_order") else None
                },
                "preferences": {
                    "favorite_categories": favorite_categories
                },
                "engagement_level": (
                    "high" if stats.get("order_count", 0) > 5
                    else "medium" if stats.get("order_count", 0) > 0
                    else "low"
                )
            }

        except Exception as e:
            return {
                "error": f"Unexpected error: {str(e)}",
                "success": False
            }

#Step 6: Multiple Handlers Pattern
Create agents with multiple specialized handlers:

    async def get_user_summary(data, context, agent):
        """Quick user summary"""
        user_id = data.get("user_id")

        result = await agent.call_tool("query_database", {
            "sql": "SELECT username, email, created_at FROM users WHERE id = $1",
            "params": [str(user_id)]
        })

        if result["success"] and result["row_count"] > 0:
            return result["rows"][0]
        return {"error": "User not found"}

    async def get_order_history(data, context, agent):
        """Get user's complete order history"""
        user_id = data.get("user_id")
        limit = data.get("limit", 10)

        result = await agent.call_tool("query_database", {
            "sql": """
                SELECT id, order_date, total_amount, status
                FROM orders
                WHERE user_id = $1
                ORDER BY order_date DESC
                LIMIT $2
            """,
            "params": [str(user_id), limit]
        })

        return {
            "success": result["success"],
            "orders": result["rows"] if result["success"] else []
        }

    async def main():
        db = postgresql(host="localhost", database="ecommerce_db")

        # Agent with multiple handlers
        agent = Agent(
            name="customer_service_agent",
            tools=[db],
            handlers={
                "analyze_user": analyze_user_activity,
                "user_summary": get_user_summary,
                "order_history": get_order_history
            }
        )

        # Use different handlers for different tasks
        summary = await agent.process("user_summary", data={"user_id": 42})
        print(f"User: {summary['result']['username']}")

        orders = await agent.process("order_history", data={"user_id": 42, "limit": 5})
        print(f"Recent orders: {len(orders['result']['orders'])}")

        analysis = await agent.process("analyze_user", data={"user_id": 42})
        print(f"Engagement: {analysis['result']['engagement_level']}")

        await agent.stop()

#Complete Example
Full working example with custom handlers:

    import asyncio

    from daita.agents.agent import Agent
    from daita.plugins.postgresql import postgresql

    async def analyze_user_activity(data, context, agent):
        """Comprehensive user activity analysis"""
        user_id = data.get("user_id")

        # Get user details
        user_result = await agent.call_tool("query_database", {
            "sql": "SELECT username, email, created_at FROM users WHERE id = $1",
            "params": [str(user_id)]
        })

        if not user_result["success"] or user_result["row_count"] == 0:
            return {"error": "User not found"}

        user = user_result["rows"][0]

        # Get order statistics
        orders_result = await agent.call_tool("query_database", {
            "sql": """
                SELECT COUNT(*) as order_count,
                       SUM(total_amount) as total_spent,
                       MAX(order_date) as last_order
                FROM orders
                WHERE user_id = $1
                  AND order_date > NOW() - INTERVAL '90 days'
            """,
            "params": [str(user_id)]
        })

        stats = orders_result["rows"][0] if orders_result["success"] else {}

        # Get favorite categories
        categories_result = await agent.call_tool("query_database", {
            "sql": """
                SELECT p.category, COUNT(*) as purchase_count
                FROM orders o
                JOIN order_items oi ON o.id = oi.order_id
                JOIN products p ON oi.product_id = p.id
                WHERE o.user_id = $1
                GROUP BY p.category
                ORDER BY purchase_count DESC
                LIMIT 3
            """,
            "params": [str(user_id)]
        })

        favorite_categories = [
            row["category"] for row in categories_result["rows"]
        ] if categories_result["success"] else []

        return {
            "user_info": {
                "username": user["username"],
                "email": user["email"],
                "member_since": user["created_at"]
            },
            "activity_stats": {
                "orders_last_90_days": stats.get("order_count", 0),
                # SUM() is NULL (None) when no orders match, so fall back to 0
                "total_spent": float(stats.get("total_spent") or 0),
                "last_order_date": stats.get("last_order")
            },
            "preferences": {
                "favorite_categories": favorite_categories
            },
            "engagement_level": (
                "high" if stats.get("order_count", 0) > 5
                else "medium" if stats.get("order_count", 0) > 0
                else "low"
            )
        }

    async def main():
        # Create database plugin
        db = postgresql(
            host="localhost",
            database="analytics_db",
            user="analytics_user",
            password="analytics_pass"
        )

        # Create agent with custom handler
        agent = Agent(
            name="analytics_agent",
            tools=[db],
            handlers={
                "analyze_user": analyze_user_activity
            }
        )

        print(f"Created analytics agent with tools: {agent.tool_names}")

        # Use the handler
        print("\nAnalyzing user activity for user_id=42:")
        result = await agent.process("analyze_user", data={"user_id": 42})
        analysis = result["result"]

        print("\nUser Analysis:")
        print(f" Username: {analysis['user_info']['username']}")
        print(f" Email: {analysis['user_info']['email']}")
        print(f" Member Since: {analysis['user_info']['member_since']}")
        print(f"\nActivity (Last 90 Days):")
        print(f" Orders: {analysis['activity_stats']['orders_last_90_days']}")
        print(f" Total Spent: ${analysis['activity_stats']['total_spent']:.2f}")
        print(f" Last Order: {analysis['activity_stats']['last_order_date']}")
        print(f"\nPreferences:")
        print(f" Favorite Categories: {', '.join(analysis['preferences']['favorite_categories'])}")
        print(f"\nEngagement Level: {analysis['engagement_level'].upper()}")

        await agent.stop()

    if __name__ == "__main__":
        asyncio.run(main())

#Framework Internals
How handlers work:
- Registration: Handlers stored in agent's handler registry
- Invocation: agent.process(handler_name, data) calls the handler
- Context: Agent passes execution context for tracing
- Tool Access: Handlers can call any registered tool via agent.call_tool()
- Result Wrapping: Results wrapped in standard format by framework
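
The flow above can be pictured with a toy dispatcher. This is an illustrative sketch only, not the framework's implementation: MiniAgent, fake_query, user_exists, and the fields of the context dict are all assumptions. The only detail taken from this guide is that process() returns the handler's output under a "result" key.

```python
import asyncio


class MiniAgent:
    """Toy agent showing registry lookup, context passing, and result wrapping."""

    def __init__(self, name, tools=None, handlers=None):
        self.name = name
        self.tools = dict(tools or {})        # tool name -> async callable
        self.handlers = dict(handlers or {})  # handler name -> async function

    async def call_tool(self, tool_name, args):
        # Tool access: handlers reach registered tools through the agent.
        return await self.tools[tool_name](args)

    async def process(self, handler_name, data=None):
        # Invocation: look the handler up by its registered name.
        if handler_name not in self.handlers:
            raise KeyError(f"No handler registered as {handler_name!r}")
        context = {"agent": self.name, "handler": handler_name}  # assumed fields
        output = await self.handlers[handler_name](data or {}, context, self)
        # Result wrapping: return the handler output under a "result" key.
        return {"result": output}


async def fake_query(args):
    """Hypothetical tool that echoes its parameter instead of hitting a database."""
    return {"success": True, "rows": [{"id": args["params"][0]}], "row_count": 1}


async def user_exists(data, context, agent):
    """Hypothetical handler that makes one tool call and reads the context."""
    result = await agent.call_tool("query_database", {
        "sql": "SELECT id FROM users WHERE id = $1",
        "params": [data.get("user_id")],
    })
    return {"user_found": result["row_count"] > 0, "handled_by": context["handler"]}


def demo():
    """Register one tool and one handler, then invoke the handler by name."""
    agent = MiniAgent(
        "demo_agent",
        tools={"query_database": fake_query},
        handlers={"user_exists": user_exists},
    )
    return asyncio.run(agent.process("user_exists", data={"user_id": 42}))


if __name__ == "__main__":
    print(demo())
```

Nothing in this path involves a model call: the handler name selects a function and the function decides which tools run, which is why the control flow stays explicit.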
Handler vs LLM Reasoning:
- Handlers: Deterministic, explicit control flow, fast
- LLM: Non-deterministic, autonomous decision-making, flexible
Use handlers when you know the exact steps needed!
#Key Takeaways
- Handlers orchestrate tools into complex workflows
- Multi-step analysis is easy with sequential tool calls
- Reusable business logic encapsulated in handler functions
- Error handling is critical for production handlers
- Multiple handlers create specialized agent capabilities
- Handlers are faster than LLM reasoning for known workflows
#Next Steps
- Tool Introspection to programmatically explore tools
- Multi-Plugin Architecture to combine PostgreSQL with other plugins
- Database Query Agent for LLM-powered autonomous queries