Agent

The foundational agent class that provides autonomous tool-calling, LLM integration, streaming execution, and automatic tracing. Learn how to create agents, execute tasks, handle streaming events, and manage agent lifecycle.

#Overview

Agent is the core agent class in Daita. It is built around autonomous tool calling - you provide tools and natural language instructions, and the LLM decides which tools to use and when.

Core Capabilities:

  • Autonomous execution - LLM decides when and how to use tools
  • Streaming - Real-time visibility into agent thinking and actions
  • Simple API - Just run() and run_detailed()
  • Automatic tracing - Zero-config observability for all operations
  • Extensible - Add tools, plugins, focus filters, and relay channels

#Basic Usage

python
from daita import Agent
 
# Create agent with identity
agent = Agent(
    name="Data Quality Validator",
    prompt="You are a data quality expert. Check for completeness, accuracy, and consistency.",
    llm_provider="openai",
    model="gpt-4"
)
 
await agent.start()
 
# Simple execution - just get the answer
answer = await agent.run("Analyze this dataset and identify any quality issues")
print(answer)
 
# Detailed execution - get full metadata
result = await agent.run_detailed("What patterns do you see in the data?")
print(f"Answer: {result['result']}")
print(f"Time: {result['processing_time_ms']}ms")
print(f"Cost: ${result['cost']}")
print(f"Tools used: {[tc['tool'] for tc in result['tool_calls']]}")
 
await agent.stop()

#Execution Methods

#run() - Simple Execution

Execute a task and get the final answer as a string:

python
answer = await agent.run("What were sales in Q4?")
print(answer)  # "Q4 sales totaled $1.2M, up 15% from Q3..."

Parameters:

  • prompt (str) - Natural language instruction
  • tools (List, optional) - Override available tools for this execution
  • max_iterations (int) - Maximum tool-calling loops (default: 5)
  • on_event (Callable, optional) - Streaming callback for real-time events
  • **kwargs - LLM parameters (temperature, top_p, etc.)

Returns: String with the agent's final answer
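
Optional parameters can be supplied per call; a brief sketch (max_iterations and LLM kwargs such as temperature come from the parameter list above; the task text is illustrative):

python
# Allow more tool-calling loops and pass an LLM sampling parameter via **kwargs
answer = await agent.run(
    "Reconcile Q4 revenue against the ledger and flag discrepancies",
    max_iterations=8,   # default is 5
    temperature=0.2     # forwarded to the LLM provider
)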

#run_detailed() - Execution with Metadata

Get the answer plus detailed execution information:

python
result = await agent.run_detailed("Analyze user signups")
 
print(result["result"])           # Final answer
print(result["processing_time_ms"])  # 1250
print(result["cost"])              # 0.0042
print(result["iterations"])        # 2
print(result["tool_calls"])        # [{"tool": "query_db", "args": {...}}]

Returns: Dictionary with:

  • result - Final answer (string)
  • tool_calls - List of all tool executions
  • iterations - Number of reasoning loops
  • tokens - Token usage breakdown
  • cost - Estimated cost in USD
  • processing_time_ms - Execution duration
  • agent_id, agent_name - Agent identifiers

Both methods support the same parameters, including streaming via on_event.

#Streaming Execution

Monitor agent execution in real-time using the on_event callback parameter. Streaming provides transparency into the agent's thinking process, tool usage, and decision-making.

#Basic Streaming

python
from daita import Agent
from daita.core.streaming import AgentEvent, EventType
 
def handle_event(event: AgentEvent):
    if event.type == EventType.THINKING:
        # Real-time LLM text streaming
        print(event.content, end="", flush=True)
 
    elif event.type == EventType.TOOL_CALL:
        # Tool is being invoked
        print(f"\n{event.tool_name}({event.tool_args})")
 
    elif event.type == EventType.TOOL_RESULT:
        # Tool completed
        print(f"{event.result}")
 
agent = Agent(name="Analyst", llm_provider="openai", model="gpt-4")
await agent.start()
 
# Enable streaming with on_event
answer = await agent.run(
    "Analyze Q4 sales trends",
    on_event=handle_event
)

#Event Types

Every AgentEvent carries a type field plus type-specific data. The following event types are emitted:

#ITERATION

New iteration started. Useful for tracking multi-step reasoning.

Fields:

  • iteration (int): Current iteration number
  • max_iterations (int): Maximum allowed iterations
python
if event.type == EventType.ITERATION:
    print(f"Iteration {event.iteration}/{event.max_iterations}")

#THINKING

LLM text chunks streaming in real-time as the model generates its response.

Fields:

  • content (str): Text chunk
python
if event.type == EventType.THINKING:
    print(event.content, end="", flush=True)

#TOOL_CALL

Tool is being invoked by the agent.

Fields:

  • tool_name (str): Name of the tool being called
  • tool_args (dict): Arguments passed to the tool
python
if event.type == EventType.TOOL_CALL:
    print(f"Calling: {event.tool_name}")
    print(f"Args: {event.tool_args}")

#TOOL_RESULT

Tool execution completed.

Fields:

  • tool_name (str): Name of the tool that executed
  • result (Any): Result returned by the tool
python
if event.type == EventType.TOOL_RESULT:
    print(f"Result from {event.tool_name}: {event.result}")

#COMPLETE

Agent execution finished with final answer.

Fields:

  • final_result (str): The agent's final answer
  • iterations (int): Total iterations used
  • token_usage (dict): Token usage statistics
  • cost (float): Estimated cost in USD
python
if event.type == EventType.COMPLETE:
    print(f"\nComplete!")
    print(f"Answer: {event.final_result}")
    print(f"Cost: ${event.cost:.4f}")
    print(f"Tokens: {event.token_usage}")

#ERROR

An error occurred during execution.

Fields:

  • error (str): Error message
python
if event.type == EventType.ERROR:
    print(f"Error: {event.error}")

#Advanced Streaming Examples

#Progress Tracking

python
import time
from daita.core.streaming import AgentEvent, EventType
 
class ProgressTracker:
    def __init__(self):
        self.start_time = time.time()
        self.tool_count = 0
        self.thinking_chars = 0
 
    def handle_event(self, event: AgentEvent):
        if event.type == EventType.ITERATION:
            elapsed = time.time() - self.start_time
            print(f"\n[{elapsed:.1f}s] Iteration {event.iteration}")
 
        elif event.type == EventType.THINKING:
            self.thinking_chars += len(event.content)
            print(event.content, end="", flush=True)
 
        elif event.type == EventType.TOOL_CALL:
            self.tool_count += 1
            print(f"\n[Tool #{self.tool_count}] {event.tool_name}")
 
        elif event.type == EventType.COMPLETE:
            elapsed = time.time() - self.start_time
            print(f"\n\nCompleted in {elapsed:.2f}s")
            print(f"Tools called: {self.tool_count}")
            print(f"Thinking output: {self.thinking_chars} chars")
 
tracker = ProgressTracker()
answer = await agent.run(
    "Complex multi-step task",
    on_event=tracker.handle_event
)

#Custom UI Integration

python
class StreamingUI:
    """Example UI handler for streaming events."""
 
    def __init__(self):
        self.current_section = None
 
    def handle_event(self, event: AgentEvent):
        if event.type == EventType.ITERATION:
            self.current_section = "thinking"
            print(f"\n{'='*70}")
            print(f"Step {event.iteration}/{event.max_iterations}")
            print(f"{'='*70}\n")
 
        elif event.type == EventType.THINKING:
            if self.current_section != "thinking":
                print("\nThinking:")
                self.current_section = "thinking"
            print(event.content, end="", flush=True)
 
        elif event.type == EventType.TOOL_CALL:
            self.current_section = "tool"
            print(f"\n\nUsing tool: {event.tool_name}")
            # Pretty print args
            for key, value in event.tool_args.items():
                print(f"    {key}: {value}")
 
        elif event.type == EventType.TOOL_RESULT:
            result_preview = str(event.result)[:100]
            print(f"    Result: {result_preview}...")
 
        elif event.type == EventType.COMPLETE:
            print(f"\n\n{'='*70}")
            print("COMPLETE")
            print(f"{'='*70}")
            print(f"\n{event.final_result}\n")
            print(f"Cost: ${event.cost:.4f} | Tokens: {event.token_usage.get('total_tokens', 0)}")
 
        elif event.type == EventType.ERROR:
            print(f"\n\nERROR: {event.error}")
 
ui = StreamingUI()
result = await agent.run_detailed(
    "Analyze user data and generate insights",
    on_event=ui.handle_event
)

#Async Event Handling

python
import asyncio
 
async def async_event_handler(event: AgentEvent):
    """Event handler that can perform async operations."""
 
    if event.type == EventType.TOOL_CALL:
        # Log to an async store (log_to_database is an application-specific helper)
        await log_to_database(
            tool_name=event.tool_name,
            args=event.tool_args
        )
 
    elif event.type == EventType.COMPLETE:
        # Send a notification (send_notification is an application-specific helper)
        await send_notification(
            message=f"Agent completed: {event.final_result}",
            cost=event.cost
        )
 
# Use async handler
answer = await agent.run(
    "Process user request",
    on_event=async_event_handler
)

#Streaming with run_detailed()

Combine streaming visibility with detailed metadata:

python
def monitor(event: AgentEvent):
    if event.type == EventType.THINKING:
        print(event.content, end="", flush=True)
    elif event.type == EventType.TOOL_CALL:
        print(f"\n[Tool: {event.tool_name}]")
 
result = await agent.run_detailed(
    "Complex analysis task",
    on_event=monitor  # Get real-time updates
)
 
# Access detailed results after completion
print(f"\nFinal answer: {result['result']}")
print(f"Iterations: {result['iterations']}")
print(f"Time: {result['processing_time_ms']}ms")
print(f"Tools called: {[tc['tool'] for tc in result['tool_calls']]}")

#Streaming Benefits

  1. Transparency: See exactly what the agent is thinking and doing
  2. User Experience: Provide immediate feedback instead of waiting for completion
  3. Debugging: Understand agent behavior and decision-making in real-time
  4. Monitoring: Track costs, token usage, and performance during execution
  5. Progress Indication: Show users that work is in progress for long-running tasks

#Best Practices for Streaming

1. Keep Handlers Fast

python
# Good - fast, non-blocking
def handle_event(event: AgentEvent):
    if event.type == EventType.THINKING:
        print(event.content, end="", flush=True)
 
# Avoid - slow operations in handler
def slow_handler(event: AgentEvent):
    if event.type == EventType.THINKING:
        time.sleep(1)  # Blocks execution!
        print(event.content)

2. Handle Errors Gracefully

python
def safe_handler(event: AgentEvent):
    try:
        if event.type == EventType.THINKING:
            print(event.content, end="", flush=True)
    except Exception as e:
        # Don't let handler errors crash agent execution
        logger.error(f"Event handler error: {e}")

3. Use Async When Needed

python
async def async_handler(event: AgentEvent):
    if event.type == EventType.COMPLETE:
        # Async operations are fine
        await save_to_database(event.final_result)

4. Buffer Text for UI Updates

python
import time

class BufferedUI:
    def __init__(self):
        self.buffer = []
        self.last_update = time.time()
 
    def handle_event(self, event: AgentEvent):
        if event.type == EventType.THINKING:
            self.buffer.append(event.content)
            # Update UI every 100ms instead of every chunk
            if time.time() - self.last_update > 0.1:
                self.flush_buffer()

        elif event.type == EventType.COMPLETE:
            # Flush whatever is left so the final chunk isn't dropped
            self.flush_buffer()
 
    def flush_buffer(self):
        if self.buffer:
            print(''.join(self.buffer), end="", flush=True)
            self.buffer = []
            self.last_update = time.time()

#Constructor Parameters

python
Agent(
    name: str,
    llm_provider: Optional[Union[str, LLMProvider]] = None,
    model: Optional[str] = None,
    api_key: Optional[str] = None,
    config: Optional[AgentConfig] = None,
    agent_id: Optional[str] = None,
    prompt: Optional[str] = None,
    focus: Optional[Union[List[str], str, Dict[str, Any]]] = None,
    relay: Optional[str] = None,
    mcp: Optional[Union[Dict, List[Dict]]] = None,
    display_reasoning: bool = False,
    **kwargs  # Can include 'tools' parameter
)

#Core Parameters

  • name (str): Agent name - used in logs, traces, and relay messages (required)
  • config (AgentConfig): Complete agent configuration object (overrides other parameters if provided)
  • agent_id (str): Unique identifier (auto-generated if not provided)

#LLM Configuration

  • llm_provider (str | LLMProvider): Provider name ("openai", "anthropic", "grok", "gemini") or provider instance. Auto-detects from environment if omitted.
  • model (str): Model name (e.g., "gpt-4", "claude-3-sonnet-20240229"). Defaults to "gpt-4" if not specified.
  • api_key (str): API key for LLM provider. Auto-detected from environment variables (OPENAI_API_KEY, ANTHROPIC_API_KEY, etc.) if not provided.
  • prompt (str): Agent identity and role description. Defines who the agent is and how it should behave.
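
For example, to pin the provider and model explicitly instead of relying on auto-detection (the key shown is a placeholder):

python
agent = Agent(
    name="Report Writer",
    prompt="You summarize weekly metrics for the leadership team.",
    llm_provider="anthropic",
    model="claude-3-sonnet-20240229",
    api_key="sk-ant-..."  # or omit and set ANTHROPIC_API_KEY in the environment
)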

#Advanced Options

  • focus (FocusConfig | Dict): Focus configuration for filtering tool results. Reduces token usage by filtering tool outputs before sending to LLM. Supports JSONPath, column selection, XPath, CSS selectors, and regex patterns.
  • relay (str): Relay channel name. When set, all execution results are automatically published to this channel for other agents to consume.
  • mcp (Dict | List[Dict]): MCP server configuration(s) for integrating external tools. Tools from MCP servers are automatically discovered and registered.
  • display_reasoning (bool): Enable console output showing agent decisions and execution flow (useful for debugging).
  • tools (List): List of plugins or AgentTool instances to register (passed via kwargs).
  • **kwargs: Additional configuration passed to AgentConfig (e.g., enable_retry, max_retries, retry_delay)
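
A sketch combining several of these options (channel and column names are illustrative; the retry settings are examples of kwargs forwarded to AgentConfig):

python
agent = Agent(
    name="Pipeline Monitor",
    prompt="You watch ingestion jobs and flag anomalies.",
    focus=["job_id", "status", "error_count"],  # filter tool results to these columns
    relay="pipeline_alerts",                    # auto-publish results to this channel
    display_reasoning=True,                     # console output while debugging
    enable_retry=True,
    max_retries=3
)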

#Smart Constructor Features

The Agent constructor includes intelligent defaults:

  • Auto-configuration: LLM provider and API keys automatically detected from environment
  • Default model: Uses "gpt-4" if no model specified
  • Auto-generated IDs: Creates unique agent_id if not provided
  • Lazy tool setup: Tools are discovered and registered on first use
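
In practice this means a minimal agent needs only a name, provided an API key such as OPENAI_API_KEY is set in the environment:

python
# Provider, model ("gpt-4"), API key, and agent_id all fall back to defaults
agent = Agent(name="Quick Analyst")
await agent.start()
answer = await agent.run("Summarize yesterday's error logs")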

#Extending Agents

#Tools

Extend agent capabilities by registering tools. The agent autonomously decides when to use them:

python
from daita.core.tools import tool
 
@tool
async def get_weather(city: str) -> dict:
    """Get current weather for a city."""
    return {"temp": 72, "conditions": "sunny"}
 
agent.register_tool(get_weather)
 
# Agent uses tool when needed
answer = await agent.run("What's the weather in San Francisco?")

See Tools documentation for complete guide on creating and using tools.

#Plugins

Add pre-built integrations for databases, APIs, and services:

python
from daita.plugins import PostgreSQLPlugin
 
db_plugin = PostgreSQLPlugin(host="localhost", database="mydb")
agent = Agent(name="DB Agent", tools=[db_plugin])
 
# Agent can now query the database
answer = await agent.run("How many users signed up last week?")

See Plugins documentation for available plugins.

#Relay Channels

Automatically publish results to channels for multi-agent communication:

python
agent = Agent(name="Publisher", relay="results_channel")
 
# Results auto-published to channel after each run
await agent.run("Process this data")

See Workflows documentation for multi-agent patterns.

#Focus Filters

Reduce token usage by filtering tool results before sending to LLM:

python
# Only include specific columns
agent = Agent(name="Focused Agent", focus=["user_id", "email", "created_at"])
 
# Or use JSONPath for complex filtering
agent = Agent(name="Focused Agent", focus={"type": "jsonpath", "path": "$.users[*].email"})

Focus is applied automatically to all tool outputs. See Configuration for all focus types.

#Agent Properties

#Health Status

Get comprehensive agent health information:

python
health = agent.health

Returns:

python
{
    "status": "healthy",
    "uptime": 3600.0,  # Seconds since agent started
    "tools": {
        "count": 12,
        "setup": True,
        "names": ["query_database", "calculate_metrics", ...]
    },
    "relay": {
        "enabled": True,
        "channel": "data_results"
    },
    "llm": {
        "available": True,
        "provider": "openai"
    }
}

#Token Usage

Track LLM token usage for cost monitoring:

python
usage = agent.get_token_usage()

Returns:

python
{
    "total_tokens": 1500,
    "prompt_tokens": 800,
    "completion_tokens": 700,
    "requests": 5
}

Token usage is tracked automatically by the tracing system across all LLM calls.
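
Because the counters cover all LLM calls the agent has made, you can read them after a batch of work; a small sketch using the fields shown above:

python
await agent.run("Summarize January signups")
await agent.run("Summarize February signups")

usage = agent.get_token_usage()
print(f"{usage['requests']} LLM requests, {usage['total_tokens']} tokens total")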

#Agent Lifecycle

Start an agent before running tasks and stop it when finished to release resources; the context manager form handles both automatically.

python
# Manual start/stop
await agent.start()
await agent.stop()
 
# Context manager (automatic cleanup)
async with Agent(name="Temp") as agent:
    result = await agent.run("Quick task")

#Error Handling

Handle exceptions with the framework's error hierarchy.

python
from daita.core.exceptions import AgentError, LLMError, ValidationError
 
agent = Agent(name="My Agent")
 
await agent.start()
 
try:
    result = await agent.run("Analyze this data")
except ValidationError as e:
    # Invalid input data
    print(f"Validation error: {e}")
except LLMError as e:
    # LLM provider issues
    print(f"LLM error: {e}")
except AgentError as e:
    # Agent-specific errors
    print(f"Agent error: {e}")
finally:
    await agent.stop()

#Observability

#Automatic Tracing

Every operation is automatically traced with zero configuration:

python
from daita.core.tracing import get_agent_traces
 
# All operations are traced automatically
await agent.run("Analyze data")
 
# Access trace data
traces = get_agent_traces(agent_id=agent.agent_id)
for trace in traces:
    print(f"{trace.operation_type}: {trace.duration_ms}ms")

Traces include agent executions, tool calls, LLM calls, plugin operations, and errors. See Tracing documentation for complete details.

#Debug Display

Enable console output for development:

python
agent = Agent(name="Debug Agent", display_reasoning=True)
 
# Console shows real-time execution details:
# [Debug Agent] Running: "Analyze this data"
# [Debug Agent] Tool called: query_database
# [Debug Agent] Result: 42 records found

#Configuration Class Methods

#Global Defaults

Configure default settings for all agents:

python
from daita import Agent
 
Agent.configure_defaults(
    llm_provider="anthropic",
    model="claude-3-sonnet-20240229"
)
 
# All agents created after this use these defaults
agent1 = Agent(name="Agent 1")  # Uses Anthropic Claude
agent2 = Agent(name="Agent 2")  # Also uses Anthropic Claude

Useful for setting organization-wide preferences or testing with specific models.

#Best Practices

Agent Design:

  • Define clear prompts that describe the agent's role and expertise
  • Use descriptive agent names (e.g., "DataValidator", "ReportGenerator")
  • Start simple and add complexity incrementally

Tool Management:

  • Keep tools focused - each should do one thing well
  • Write clear docstrings so the LLM knows when to use each tool
  • Test tools independently before registering them
  • Return structured data (dicts/objects) for consistency

Performance:

  • Monitor token usage with agent.get_token_usage()
  • Use focus filters to reduce tokens for large tool results
  • Set appropriate max_iterations based on task complexity
  • Enable display_reasoning=True during development

Production:

  • Use context managers or explicit stop() for cleanup
  • Handle errors gracefully with try-except blocks
  • Check agent.health to monitor status
  • Review traces to debug issues
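
A compact sketch tying these practices together, using only the APIs shown above:

python
import logging

from daita import Agent
from daita.core.exceptions import AgentError

logger = logging.getLogger(__name__)

async with Agent(name="Nightly Reporter", prompt="You produce the nightly KPI summary.") as agent:
    if agent.health["status"] != "healthy":
        logger.warning("Agent reported unhealthy status")
    try:
        report = await agent.run("Generate the nightly KPI summary", max_iterations=6)
    except AgentError as e:
        logger.error(f"Nightly report failed: {e}")
    else:
        usage = agent.get_token_usage()
        logger.info(f"Report generated with {usage['total_tokens']} tokens")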

#Next Steps

  • Tools - Deep dive into creating custom tools
  • Plugins - Pre-built integrations for databases and APIs
  • Workflows - Multi-agent orchestration and relay channels
  • Tracing - Complete observability and monitoring guide
  • Error Handling - Retry logic and reliability patterns