Intermediate

Building Custom Handlers with Database Tools

Create custom business logic that orchestrates multiple database operations using agent handlers

Plugins, PostgreSQL, Custom Handlers

#Overview

This example shows how to build custom agent handlers that run several PostgreSQL tool calls in sequence and combine the results into a single, reusable agent capability.

#What You'll Learn

  • Creating custom handlers that use database tools
  • Orchestrating multiple database queries in sequence
  • Building user activity analytics with handlers
  • Combining data from multiple tables
  • Creating reusable business logic patterns
  • Error handling in custom handlers

#Prerequisites

  • Understanding of PostgreSQL Tools basics
  • PostgreSQL database with sample data
  • Familiarity with Python async/await
  • Basic understanding of agent handlers

#What Are Custom Handlers?

Handlers are custom functions that extend an agent's capabilities beyond basic tool calling. They can:

  • Orchestrate multiple tool calls
  • Implement complex business logic
  • Process and transform data
  • Handle multi-step workflows
  • Return structured results

Think of a handler as a custom method on your agent that combines existing tools in a fixed, explicit sequence.
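As a small illustration of the "process and transform data" and "return structured results" points, the sketch below normalizes a result row before a handler returns it. It assumes the database tool hands back `Decimal` and `datetime`/`date` objects for NUMERIC and timestamp columns, which is common for PostgreSQL drivers but not confirmed here; `to_json_safe` is a hypothetical helper name, not part of the framework.

```python
from datetime import date, datetime
from decimal import Decimal


def to_json_safe(row):
    """Return a copy of a result row with JSON-friendly values."""
    safe = {}
    for key, value in row.items():
        if isinstance(value, Decimal):
            safe[key] = float(value)        # NUMERIC -> float
        elif isinstance(value, (datetime, date)):
            safe[key] = value.isoformat()   # TIMESTAMP / DATE -> ISO 8601 string
        else:
            safe[key] = value               # leave everything else untouched
    return safe


# Example row shaped like an aggregate query result (values are illustrative)
row = {
    "total_spent": Decimal("1247.50"),
    "last_order": date(2026, 1, 5),
    "order_count": 8,
}
print(to_json_safe(row))
```

Converting at the edge of the handler keeps the returned dict easy to log or serialize, whatever types the driver produces.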

#Step 1: Basic Handler Structure

Here's the anatomy of a custom handler:

python
async def my_handler(data, context, agent):
    """
    Custom handler function signature
 
    Args:
        data: Input data from the handler call
        context: Execution context (metadata, tracing info)
        agent: Reference to the agent instance
 
    Returns:
        dict: Handler result (can be any structure)
    """
    # Access input data
    user_input = data.get("user_id")
 
    # Call agent tools
    result = await agent.call_tool("query_database", {
        "sql": "SELECT * FROM users WHERE id = $1",
        "params": [user_input]
    })
 
    # Process and return results: the query must succeed and match a row
    return {"user_found": result["success"] and result["row_count"] > 0}

Key points:

  • Handlers are async functions
  • Receive data, context, and agent parameters
  • Can call any tool available to the agent
  • Return any data structure you need
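Because a handler is just an async function of `(data, context, agent)`, it can be exercised without a database by passing in a stand-in agent. The sketch below is one way to do that; `StubAgent` and `find_user` are hypothetical names, and the result shape (`success`, `rows`, `row_count`) is assumed to match what the database tool returns elsewhere in this example.

```python
import asyncio


class StubAgent:
    """Hypothetical stand-in for a real agent; returns canned tool results."""

    def __init__(self, rows):
        self.rows = rows
        self.calls = []  # records (tool_name, arguments) for later inspection

    async def call_tool(self, name, arguments):
        self.calls.append((name, arguments))
        return {"success": True, "rows": self.rows, "row_count": len(self.rows)}


async def find_user(data, context, agent):
    """Handler under test: reports whether a matching user row exists."""
    result = await agent.call_tool("query_database", {
        "sql": "SELECT * FROM users WHERE id = $1",
        "params": [data.get("user_id")],
    })
    return {"user_found": result["success"] and result["row_count"] > 0}


# Run the handler against one populated and one empty result set
found = asyncio.run(find_user({"user_id": 42}, {}, StubAgent([{"id": 42}])))
missing = asyncio.run(find_user({"user_id": 7}, {}, StubAgent([])))
print(found, missing)
```

The recorded `calls` list also makes it possible to assert which SQL and parameters the handler sent.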

#Step 2: Create a User Activity Analyzer

Let's build a handler that analyzes user behavior across multiple tables:

python
from daita.agents.agent import Agent
from daita.plugins.postgresql import postgresql
import asyncio
 
async def analyze_user_activity(data, context, agent):
    """Analyze user activity using multiple database queries"""
    user_id = data.get("user_id")
 
    # Step 1: Get user details
    user_result = await agent.call_tool("query_database", {
        "sql": "SELECT username, email, created_at FROM users WHERE id = $1",
        "params": [str(user_id)]
    })
 
    if not user_result["success"] or user_result["row_count"] == 0:
        return {"error": "User not found"}
 
    user = user_result["rows"][0]
 
    # Step 2: Get user's recent orders
    orders_result = await agent.call_tool("query_database", {
        "sql": """
            SELECT COUNT(*) as order_count,
                   SUM(total_amount) as total_spent,
                   MAX(order_date) as last_order
            FROM orders
            WHERE user_id = $1
            AND order_date > NOW() - INTERVAL '90 days'
        """,
        "params": [str(user_id)]
    })
 
    stats = orders_result["rows"][0] if orders_result["success"] else {}
 
    # Step 3: Get favorite categories
    categories_result = await agent.call_tool("query_database", {
        "sql": """
            SELECT p.category, COUNT(*) as purchase_count
            FROM orders o
            JOIN order_items oi ON o.id = oi.order_id
            JOIN products p ON oi.product_id = p.id
            WHERE o.user_id = $1
            GROUP BY p.category
            ORDER BY purchase_count DESC
            LIMIT 3
        """,
        "params": [str(user_id)]
    })
 
    favorite_categories = [
        row["category"] for row in categories_result["rows"]
    ] if categories_result["success"] else []
 
    # Compile comprehensive analysis
    return {
        "user_info": {
            "username": user["username"],
            "email": user["email"],
            "member_since": user["created_at"]
        },
        "activity_stats": {
            "orders_last_90_days": stats.get("order_count", 0),
            # SUM() returns NULL (None) when no orders match, so fall back to 0
            "total_spent": float(stats.get("total_spent") or 0),
            "last_order_date": stats.get("last_order")
        },
        "preferences": {
            "favorite_categories": favorite_categories
        },
        "engagement_level": (
            "high" if stats.get("order_count", 0) > 5
            else "medium" if stats.get("order_count", 0) > 0
            else "low"
        )
    }

What this handler does:

  1. Fetches user profile information
  2. Calculates order statistics (count, spend, recency)
  3. Identifies favorite product categories
  4. Computes engagement level
  5. Returns structured analysis
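The engagement rule in step 4 can also be pulled out into a small pure function, which makes the thresholds easy to test and adjust. This is a sketch only: `engagement_level` and `high_threshold` are hypothetical names, and the cut-offs simply mirror the ones used in the handler above.

```python
def engagement_level(order_count, high_threshold=5):
    """Classify a user by the number of orders in the analysis window."""
    count = int(order_count or 0)  # treat None (no matching rows) as zero
    if count > high_threshold:
        return "high"
    if count > 0:
        return "medium"
    return "low"


for n in (8, 5, 1, 0, None):
    print(n, engagement_level(n))
```

Note that exactly five orders is still "medium", because the handler uses a strict `> 5` comparison.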

#Step 3: Register the Handler with an Agent

Add your custom handler to an agent:

python
async def main():
    # Create PostgreSQL plugin
    db = postgresql(
        host="localhost",
        database="analytics_db",
        user="analytics_user",
        password="analytics_pass"
    )
 
    # Create agent with custom handler
    agent = Agent(
        name="analytics_agent",
        tools=[db],
        handlers={
            "analyze_user": analyze_user_activity  # Register handler
        }
    )
 
    print(f"Created analytics agent with tools: {agent.tool_names}")
    print(f"Available handlers: {list(agent.handlers.keys())}")

Handler registration:

  • Pass handlers dict to Agent constructor
  • Key = handler name (used to call it)
  • Value = handler function reference
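Before passing the dict to the agent, it can help to check that every entry follows the convention used in this example. The following is a minimal sketch of such a check, not something the framework is known to require or provide; `validate_handlers` is a hypothetical helper.

```python
import inspect


def validate_handlers(handlers):
    """Raise early if a handler is not an async function of three arguments."""
    for name, func in handlers.items():
        if not inspect.iscoroutinefunction(func):
            raise TypeError(f"handler {name!r} must be defined with 'async def'")
        if len(inspect.signature(func).parameters) != 3:
            raise TypeError(f"handler {name!r} must accept (data, context, agent)")
    return handlers


async def good_handler(data, context, agent):
    return {"ok": True}


def sync_handler(data, context, agent):  # missing 'async', rejected by the check
    return {"ok": False}


handlers = validate_handlers({"good": good_handler})
print(list(handlers))
```

A failed check surfaces at startup rather than on the first `agent.process()` call.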

#Step 4: Use the Custom Handler

Call your handler using agent.process():

python
async def main():
    db = postgresql(
        host="localhost",
        database="analytics_db",
        user="analytics_user",
        password="analytics_pass"
    )
 
    agent = Agent(
        name="analytics_agent",
        tools=[db],
        handlers={
            "analyze_user": analyze_user_activity
        }
    )
 
    # Use the custom handler
    print("\nAnalyzing user activity for user_id=42:")
    result = await agent.process("analyze_user", data={"user_id": 42})
 
    # Result is wrapped by BaseAgent.process()
    analysis = result["result"]
 
    # Display results
    print("\nUser Analysis:")
    print(f"  Username: {analysis['user_info']['username']}")
    print(f"  Email: {analysis['user_info']['email']}")
    print(f"  Member Since: {analysis['user_info']['member_since']}")
    print(f"\nActivity (Last 90 Days):")
    print(f"  Orders: {analysis['activity_stats']['orders_last_90_days']}")
    print(f"  Total Spent: ${analysis['activity_stats']['total_spent']:.2f}")
    print(f"  Last Order: {analysis['activity_stats']['last_order_date']}")
    print(f"\nPreferences:")
    print(f"  Favorite Categories: {', '.join(analysis['preferences']['favorite_categories'])}")
    print(f"\nEngagement Level: {analysis['engagement_level'].upper()}")
 
    await agent.stop()
 
if __name__ == "__main__":
    asyncio.run(main())

Output example:

text
Analyzing user activity for user_id=42:
 
User Analysis:
  Username: john_doe
  Email: john@example.com
  Member Since: 2025-06-15 10:30:00
 
Activity (Last 90 Days):
  Orders: 8
  Total Spent: $1247.50
  Last Order: 2026-01-05
 
Preferences:
  Favorite Categories: Electronics, Books, Home & Garden
 
Engagement Level: HIGH
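The display code in this step assumes the handler succeeded. Since the handler returns `{"error": "User not found"}` for an unknown user, indexing `analysis['user_info']` would then raise `KeyError`. One possible way to guard against that is sketched below; `render_analysis` is a hypothetical helper, and it uses the `:,.2f` format so that amounts are printed with thousands separators.

```python
def render_analysis(analysis):
    """Format a handler result for display, tolerating the error shape."""
    if "error" in analysis:
        return f"Analysis failed: {analysis['error']}"
    stats = analysis["activity_stats"]
    categories = ", ".join(analysis["preferences"]["favorite_categories"]) or "none"
    return "\n".join([
        f"Username: {analysis['user_info']['username']}",
        f"Orders: {stats['orders_last_90_days']}",
        f"Total Spent: ${stats['total_spent']:,.2f}",
        f"Favorite Categories: {categories}",
        f"Engagement Level: {analysis['engagement_level'].upper()}",
    ])


# Illustrative result shaped like the handler's return value
sample = {
    "user_info": {"username": "john_doe"},
    "activity_stats": {"orders_last_90_days": 8, "total_spent": 1247.5},
    "preferences": {"favorite_categories": []},
    "engagement_level": "high",
}
print(render_analysis(sample))
print(render_analysis({"error": "User not found"}))
```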

#Step 5: Add Error Handling and Validation

Robust handlers include proper error handling:

python
async def analyze_user_activity_robust(data, context, agent):
    """Enhanced handler with error handling and validation"""
 
    # Validate input
    user_id = data.get("user_id")
    if not user_id:
        return {"error": "user_id is required", "success": False}
 
    try:
        # Get user details with error handling
        user_result = await agent.call_tool("query_database", {
            "sql": "SELECT username, email, created_at FROM users WHERE id = $1",
            "params": [str(user_id)]
        })
 
        if not user_result["success"]:
            return {
                "error": f"Database error: {user_result.get('error')}",
                "success": False
            }
 
        if user_result["row_count"] == 0:
            return {
                "error": f"User {user_id} not found",
                "success": False
            }
 
        user = user_result["rows"][0]
 
        # Get order statistics; COALESCE keeps total_spent at 0 when no orders match
        orders_result = await agent.call_tool("query_database", {
            "sql": """
                SELECT COUNT(*) as order_count,
                       COALESCE(SUM(total_amount), 0) as total_spent,
                       MAX(order_date) as last_order
                FROM orders
                WHERE user_id = $1
                AND order_date > NOW() - INTERVAL '90 days'
            """,
            "params": [str(user_id)]
        })
 
        # Handle query failure gracefully
        if orders_result["success"]:
            stats = orders_result["rows"][0]
        else:
            stats = {
                "order_count": 0,
                "total_spent": 0,
                "last_order": None
            }
 
        # Get categories (optional, don't fail if it errors)
        favorite_categories = []
        try:
            categories_result = await agent.call_tool("query_database", {
                "sql": """
                    SELECT p.category, COUNT(*) as purchase_count
                    FROM orders o
                    JOIN order_items oi ON o.id = oi.order_id
                    JOIN products p ON oi.product_id = p.id
                    WHERE o.user_id = $1
                    GROUP BY p.category
                    ORDER BY purchase_count DESC
                    LIMIT 3
                """,
                "params": [str(user_id)]
            })
 
            if categories_result["success"]:
                favorite_categories = [
                    row["category"] for row in categories_result["rows"]
                ]
        except Exception as e:
            print(f"Warning: Could not fetch categories: {e}")
 
        # Return successful analysis
        return {
            "success": True,
            "user_info": {
                "username": user["username"],
                "email": user["email"],
                "member_since": str(user["created_at"])
            },
            "activity_stats": {
                "orders_last_90_days": int(stats.get("order_count", 0)),
                "total_spent": float(stats.get("total_spent", 0)),
                "last_order_date": str(stats.get("last_order")) if stats.get("last_order") else None
            },
            "preferences": {
                "favorite_categories": favorite_categories
            },
            "engagement_level": (
                "high" if stats.get("order_count", 0) > 5
                else "medium" if stats.get("order_count", 0) > 0
                else "low"
            )
        }
 
    except Exception as e:
        return {
            "error": f"Unexpected error: {str(e)}",
            "success": False
        }
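The handler above awaits each tool call without a time limit. If a slow query should not stall the whole handler, one option is to wrap the call in `asyncio.wait_for`. The sketch below assumes `agent.call_tool` is an ordinary coroutine; `call_tool_with_timeout` and `SlowAgent` are hypothetical names used only for illustration.

```python
import asyncio


async def call_tool_with_timeout(agent, name, arguments, timeout=5.0):
    """Call a tool, returning a failure dict instead of waiting past `timeout` seconds."""
    try:
        return await asyncio.wait_for(agent.call_tool(name, arguments), timeout)
    except asyncio.TimeoutError:
        # Same shape as a failed tool result, so callers need no special case
        return {
            "success": False,
            "error": f"{name} timed out after {timeout}s",
            "rows": [],
            "row_count": 0,
        }


class SlowAgent:
    """Hypothetical stub whose tool call takes longer than the allowed time."""

    async def call_tool(self, name, arguments):
        await asyncio.sleep(1)
        return {"success": True, "rows": [], "row_count": 0}


result = asyncio.run(
    call_tool_with_timeout(SlowAgent(), "query_database", {"sql": "SELECT 1"}, timeout=0.05)
)
print(result["success"], result["error"])
```

`wait_for` cancels the awaiting coroutine on the client side; whether the query is also cancelled on the database server depends on the driver, so a server-side statement timeout may still be worth setting.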

#Step 6: Multiple Handlers Pattern

Create agents with multiple specialized handlers:

python
async def get_user_summary(data, context, agent):
    """Quick user summary"""
    user_id = data.get("user_id")
 
    result = await agent.call_tool("query_database", {
        "sql": "SELECT username, email, created_at FROM users WHERE id = $1",
        "params": [str(user_id)]
    })
 
    if result["success"] and result["row_count"] > 0:
        return result["rows"][0]
    return {"error": "User not found"}
 
 
async def get_order_history(data, context, agent):
    """Get user's complete order history"""
    user_id = data.get("user_id")
    limit = data.get("limit", 10)
 
    result = await agent.call_tool("query_database", {
        "sql": """
            SELECT id, order_date, total_amount, status
            FROM orders
            WHERE user_id = $1
            ORDER BY order_date DESC
            LIMIT $2
        """,
        "params": [str(user_id), limit]
    })
 
    return {
        "success": result["success"],
        "orders": result["rows"] if result["success"] else []
    }
 
 
async def main():
    db = postgresql(host="localhost", database="ecommerce_db")
 
    # Agent with multiple handlers
    agent = Agent(
        name="customer_service_agent",
        tools=[db],
        handlers={
            "analyze_user": analyze_user_activity,
            "user_summary": get_user_summary,
            "order_history": get_order_history
        }
    )
 
    # Use different handlers for different tasks
    summary = await agent.process("user_summary", data={"user_id": 42})
    print(f"User: {summary['result']['username']}")
 
    orders = await agent.process("order_history", data={"user_id": 42, "limit": 5})
    print(f"Recent orders: {len(orders['result']['orders'])}")
 
    analysis = await agent.process("analyze_user", data={"user_id": 42})
    print(f"Engagement: {analysis['result']['engagement_level']}")
 
    await agent.stop()
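Since handlers are plain async functions, one handler can also call another directly and reuse its logic instead of repeating the SQL. The sketch below shows the idea with simplified versions of the two handlers and a stand-in agent; `StubAgent` and `customer_overview` are hypothetical, and the canned rows are illustrative only.

```python
import asyncio


class StubAgent:
    """Hypothetical stand-in that picks canned rows from the table named in the SQL."""

    async def call_tool(self, name, arguments):
        if "FROM users" in arguments["sql"]:
            rows = [{"username": "john_doe"}]
        else:
            rows = [{"id": 1, "total_amount": 20.0}, {"id": 2, "total_amount": 35.5}]
        return {"success": True, "rows": rows, "row_count": len(rows)}


async def get_user_summary(data, context, agent):
    result = await agent.call_tool("query_database", {
        "sql": "SELECT username FROM users WHERE id = $1",
        "params": [str(data["user_id"])],
    })
    return result["rows"][0] if result["row_count"] else {"error": "User not found"}


async def get_order_history(data, context, agent):
    result = await agent.call_tool("query_database", {
        "sql": "SELECT id, total_amount FROM orders WHERE user_id = $1",
        "params": [str(data["user_id"])],
    })
    return {"orders": result["rows"]}


async def customer_overview(data, context, agent):
    """Composite handler: reuses the two handlers above rather than new SQL."""
    summary = await get_user_summary(data, context, agent)
    if "error" in summary:
        return summary  # stop early, the user does not exist
    history = await get_order_history(data, context, agent)
    return {"username": summary["username"], "order_count": len(history["orders"])}


overview = asyncio.run(customer_overview({"user_id": 42}, {}, StubAgent()))
print(overview)
```

Registering `customer_overview` under its own key would expose the combined view while the smaller handlers stay available on their own.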

#Complete Example

Full working example with custom handlers:

python
from daita.agents.agent import Agent
from daita.plugins.postgresql import postgresql
import asyncio
 
 
async def analyze_user_activity(data, context, agent):
    """Comprehensive user activity analysis"""
    user_id = data.get("user_id")
 
    # Get user details
    user_result = await agent.call_tool("query_database", {
        "sql": "SELECT username, email, created_at FROM users WHERE id = $1",
        "params": [str(user_id)]
    })
 
    if not user_result["success"] or user_result["row_count"] == 0:
        return {"error": "User not found"}
 
    user = user_result["rows"][0]
 
    # Get order statistics
    orders_result = await agent.call_tool("query_database", {
        "sql": """
            SELECT COUNT(*) as order_count,
                   SUM(total_amount) as total_spent,
                   MAX(order_date) as last_order
            FROM orders
            WHERE user_id = $1
            AND order_date > NOW() - INTERVAL '90 days'
        """,
        "params": [str(user_id)]
    })
 
    stats = orders_result["rows"][0] if orders_result["success"] else {}
 
    # Get favorite categories
    categories_result = await agent.call_tool("query_database", {
        "sql": """
            SELECT p.category, COUNT(*) as purchase_count
            FROM orders o
            JOIN order_items oi ON o.id = oi.order_id
            JOIN products p ON oi.product_id = p.id
            WHERE o.user_id = $1
            GROUP BY p.category
            ORDER BY purchase_count DESC
            LIMIT 3
        """,
        "params": [str(user_id)]
    })
 
    favorite_categories = [
        row["category"] for row in categories_result["rows"]
    ] if categories_result["success"] else []
 
    return {
        "user_info": {
            "username": user["username"],
            "email": user["email"],
            "member_since": user["created_at"]
        },
        "activity_stats": {
            "orders_last_90_days": stats.get("order_count", 0),
            # SUM() returns NULL (None) when no orders match, so fall back to 0
            "total_spent": float(stats.get("total_spent") or 0),
            "last_order_date": stats.get("last_order")
        },
        "preferences": {
            "favorite_categories": favorite_categories
        },
        "engagement_level": (
            "high" if stats.get("order_count", 0) > 5
            else "medium" if stats.get("order_count", 0) > 0
            else "low"
        )
    }
 
 
async def main():
    # Create database plugin
    db = postgresql(
        host="localhost",
        database="analytics_db",
        user="analytics_user",
        password="analytics_pass"
    )
 
    # Create agent with custom handler
    agent = Agent(
        name="analytics_agent",
        tools=[db],
        handlers={
            "analyze_user": analyze_user_activity
        }
    )
 
    print(f"Created analytics agent with tools: {agent.tool_names}")
 
    # Use the handler
    print("\nAnalyzing user activity for user_id=42:")
    result = await agent.process("analyze_user", data={"user_id": 42})
 
    analysis = result["result"]
 
    print("\nUser Analysis:")
    print(f"  Username: {analysis['user_info']['username']}")
    print(f"  Email: {analysis['user_info']['email']}")
    print(f"  Member Since: {analysis['user_info']['member_since']}")
    print(f"\nActivity (Last 90 Days):")
    print(f"  Orders: {analysis['activity_stats']['orders_last_90_days']}")
    print(f"  Total Spent: ${analysis['activity_stats']['total_spent']:.2f}")
    print(f"  Last Order: {analysis['activity_stats']['last_order_date']}")
    print(f"\nPreferences:")
    print(f"  Favorite Categories: {', '.join(analysis['preferences']['favorite_categories'])}")
    print(f"\nEngagement Level: {analysis['engagement_level'].upper()}")
 
    await agent.stop()
 
 
if __name__ == "__main__":
    asyncio.run(main())

#Framework Internals

How handlers work:

  1. Registration: Handlers are stored in the agent's handler registry
  2. Invocation: agent.process(handler_name, data=...) looks up and calls the handler
  3. Context: The agent passes an execution context for tracing
  4. Tool Access: Handlers can call any registered tool via agent.call_tool()
  5. Result Wrapping: The framework wraps the return value in a standard format, with the handler's output under the "result" key
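To make these steps concrete, here is a toy model of a handler registry. It is a sketch for intuition only and not the framework's actual implementation: `MiniAgent`, the `status` field, and the contents of the context dict are all assumptions; only the `result` key is taken from the examples above.

```python
import asyncio


class MiniAgent:
    """Toy model of a handler registry; not the framework's real implementation."""

    def __init__(self, handlers):
        self.handlers = dict(handlers)  # registration: name -> async function

    async def process(self, task, data=None):
        handler = self.handlers.get(task)
        if handler is None:
            return {"status": "error", "error": f"unknown handler: {task}"}
        context = {"task": task}  # context: metadata passed to the handler
        result = await handler(data or {}, context, self)  # invocation
        return {"status": "success", "result": result}  # result wrapping


async def echo(data, context, agent):
    """Trivial handler that reports what it received."""
    return {"task": context["task"], "received": data}


agent = MiniAgent({"echo": echo})
wrapped = asyncio.run(agent.process("echo", data={"user_id": 42}))
print(wrapped["result"])
```

Tool access (step 4) is omitted here; in the real agent the third argument is what gives the handler `call_tool()`.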

Handler vs LLM Reasoning:

  • Handlers: Deterministic, explicit control flow, fast
  • LLM: Non-deterministic, autonomous decision-making, flexible

Use handlers when you know the exact steps needed!

#Key Takeaways

  1. Handlers orchestrate tools into multi-step workflows
  2. Sequential tool calls make multi-step analysis straightforward
  3. Handler functions encapsulate reusable business logic
  4. Error handling is critical for production handlers
  5. Multiple handlers give one agent several specialized capabilities
  6. Handlers run faster than LLM reasoning for known workflows

#Next Steps