Agent
The foundational agent class that provides autonomous tool-calling, LLM integration, streaming execution, and automatic tracing. Learn how to create agents, execute tasks, handle streaming events, and manage agent lifecycle.
#Overview
The Agent class is the foundation of every agent in Daita. It uses autonomous tool calling - you provide tools and natural language instructions, and the LLM decides which tools to use and when.
Core Capabilities:
- Autonomous execution - LLM decides when and how to use tools
- Streaming - Real-time visibility into agent thinking and actions
- Simple API - Just run() and run_detailed()
- Automatic tracing - Zero-config observability for all operations
- Extensible - Add tools, plugins, focus filters, and relay channels
#Basic Usage
from daita import Agent
# Create agent with identity
agent = Agent(
name="Data Quality Validator",
prompt="You are a data quality expert. Check for completeness, accuracy, and consistency.",
llm_provider="openai",
model="gpt-4"
)
await agent.start()
# Simple execution - just get the answer
answer = await agent.run("Analyze this dataset and identify any quality issues")
print(answer)
# Detailed execution - get full metadata
result = await agent.run_detailed("What patterns do you see in the data?")
print(f"Answer: {result['result']}")
print(f"Time: {result['processing_time_ms']}ms")
print(f"Cost: ${result['cost']}")
print(f"Tools used: {[tc['tool'] for tc in result['tool_calls']]}")
await agent.stop()
#Execution Methods
#run() - Simple Execution
Execute a task and get the final answer as a string:
answer = await agent.run("What were sales in Q4?")
print(answer)  # "Q4 sales totaled $1.2M, up 15% from Q3..."
Parameters:
- prompt (str) - Natural language instruction
- tools (List, optional) - Override available tools for this execution
- max_iterations (int) - Maximum tool-calling loops (default: 5)
- on_event (Callable, optional) - Streaming callback for real-time events
- **kwargs - LLM parameters (temperature, top_p, etc.)
Returns: String with the agent's final answer
#run_detailed() - Execution with Metadata
Get the answer plus detailed execution information:
result = await agent.run_detailed("Analyze user signups")
print(result["result"]) # Final answer
print(result["processing_time_ms"]) # 1250
print(result["cost"]) # 0.0042
print(result["iterations"]) # 2
print(result["tool_calls"]) # [{"tool": "query_db", "args": {...}}]Returns: Dictionary with:
result- Final answer (string)tool_calls- List of all tool executionsiterations- Number of reasoning loopstokens- Token usage breakdowncost- Estimated cost in USDprocessing_time_ms- Execution durationagent_id,agent_name- Agent identifiers
Both methods support the same parameters, including streaming via on_event.
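For example (a sketch; the task text is illustrative, and which extra keyword arguments are accepted depends on your LLM provider):
# max_iterations caps the tool-calling loop; temperature is forwarded via **kwargs
answer = await agent.run(
    "Summarize yesterday's error logs",
    max_iterations=3,
    temperature=0.2
)
result = await agent.run_detailed(
    "Summarize yesterday's error logs",
    max_iterations=3,
    temperature=0.2
)
print(result["result"], result["cost"])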
#Streaming Execution
Monitor agent execution in real-time using the on_event callback parameter. Streaming provides transparency into the agent's thinking process, tool usage, and decision-making.
#Basic Streaming
from daita import Agent
from daita.core.streaming import AgentEvent, EventType
def handle_event(event: AgentEvent):
if event.type == EventType.THINKING:
# Real-time LLM text streaming
print(event.content, end="", flush=True)
elif event.type == EventType.TOOL_CALL:
# Tool is being invoked
print(f"\n{event.tool_name}({event.tool_args})")
elif event.type == EventType.TOOL_RESULT:
# Tool completed
print(f"{event.result}")
agent = Agent(name="Analyst", llm_provider="openai", model="gpt-4")
await agent.start()
# Enable streaming with on_event
answer = await agent.run(
"Analyze Q4 sales trends",
on_event=handle_event
)
#Event Types
Each AgentEvent carries one of the following event types:
#ITERATION
New iteration started. Useful for tracking multi-step reasoning.
Fields:
- iteration (int): Current iteration number
- max_iterations (int): Maximum allowed iterations
if event.type == EventType.ITERATION:
print(f"Iteration {event.iteration}/{event.max_iterations}")#THINKING
LLM text chunks streaming in real-time as the model generates its response.
Fields:
- content (str): Text chunk
if event.type == EventType.THINKING:
print(event.content, end="", flush=True)
#TOOL_CALL
Tool is being invoked by the agent.
Fields:
- tool_name (str): Name of the tool being called
- tool_args (dict): Arguments passed to the tool
if event.type == EventType.TOOL_CALL:
print(f"Calling: {event.tool_name}")
print(f"Args: {event.tool_args}")#TOOL_RESULT
Tool execution completed.
Fields:
- tool_name (str): Name of the tool that executed
- result (Any): Result returned by the tool
if event.type == EventType.TOOL_RESULT:
print(f"Result from {event.tool_name}: {event.result}")#COMPLETE
Agent execution finished with final answer.
Fields:
- final_result (str): The agent's final answer
- iterations (int): Total iterations used
- token_usage (dict): Token usage statistics
- cost (float): Estimated cost in USD
if event.type == EventType.COMPLETE:
print(f"\nComplete!")
print(f"Answer: {event.final_result}")
print(f"Cost: ${event.cost:.4f}")
print(f"Tokens: {event.token_usage}")#ERROR
An error occurred during execution.
Fields:
- error (str): Error message
if event.type == EventType.ERROR:
print(f"Error: {event.error}")#Advanced Streaming Examples
#Progress Tracking
import time
from daita.core.streaming import AgentEvent, EventType
class ProgressTracker:
def __init__(self):
self.start_time = time.time()
self.tool_count = 0
self.thinking_chars = 0
def handle_event(self, event: AgentEvent):
if event.type == EventType.ITERATION:
elapsed = time.time() - self.start_time
print(f"\n[{elapsed:.1f}s] Iteration {event.iteration}")
elif event.type == EventType.THINKING:
self.thinking_chars += len(event.content)
print(event.content, end="", flush=True)
elif event.type == EventType.TOOL_CALL:
self.tool_count += 1
print(f"\n[Tool #{self.tool_count}] {event.tool_name}")
elif event.type == EventType.COMPLETE:
elapsed = time.time() - self.start_time
print(f"\n\nCompleted in {elapsed:.2f}s")
print(f"Tools called: {self.tool_count}")
print(f"Thinking output: {self.thinking_chars} chars")
tracker = ProgressTracker()
answer = await agent.run(
"Complex multi-step task",
on_event=tracker.handle_event
)
#Custom UI Integration
class StreamingUI:
"""Example UI handler for streaming events."""
def __init__(self):
self.current_section = None
def handle_event(self, event: AgentEvent):
if event.type == EventType.ITERATION:
self.current_section = "thinking"
print(f"\n{'='*70}")
print(f"Step {event.iteration}/{event.max_iterations}")
print(f"{'='*70}\n")
elif event.type == EventType.THINKING:
if self.current_section != "thinking":
print("\nThinking:")
self.current_section = "thinking"
print(event.content, end="", flush=True)
elif event.type == EventType.TOOL_CALL:
self.current_section = "tool"
print(f"\n\nUsing tool: {event.tool_name}")
# Pretty print args
for key, value in event.tool_args.items():
print(f" {key}: {value}")
elif event.type == EventType.TOOL_RESULT:
result_preview = str(event.result)[:100]
print(f" Result: {result_preview}...")
elif event.type == EventType.COMPLETE:
print(f"\n\n{'='*70}")
print("COMPLETE")
print(f"{'='*70}")
print(f"\n{event.final_result}\n")
print(f"Cost: ${event.cost:.4f} | Tokens: {event.token_usage.get('total_tokens', 0)}")
elif event.type == EventType.ERROR:
print(f"\n\nERROR: {event.error}")
ui = StreamingUI()
result = await agent.run_detailed(
"Analyze user data and generate insights",
on_event=ui.handle_event
)
#Async Event Handling
import asyncio
async def async_event_handler(event: AgentEvent):
"""Event handler that can perform async operations."""
if event.type == EventType.TOOL_CALL:
# Log to async database
await log_to_database(
tool_name=event.tool_name,
args=event.tool_args
)
elif event.type == EventType.COMPLETE:
# Send async notification
await send_notification(
message=f"Agent completed: {event.final_result}",
cost=event.cost
)
# Use async handler
answer = await agent.run(
"Process user request",
on_event=async_event_handler
)
#Streaming with run_detailed()
Combine streaming visibility with detailed metadata:
def monitor(event: AgentEvent):
if event.type == EventType.THINKING:
print(event.content, end="", flush=True)
elif event.type == EventType.TOOL_CALL:
print(f"\n[Tool: {event.tool_name}]")
result = await agent.run_detailed(
"Complex analysis task",
on_event=monitor # Get real-time updates
)
# Access detailed results after completion
print(f"\nFinal answer: {result['result']}")
print(f"Iterations: {result['iterations']}")
print(f"Time: {result['processing_time_ms']}ms")
print(f"Tools called: {[tc['tool'] for tc in result['tool_calls']]}")#Streaming Benefits
- Transparency: See exactly what the agent is thinking and doing
- User Experience: Provide immediate feedback instead of waiting for completion
- Debugging: Understand agent behavior and decision-making in real-time
- Monitoring: Track costs, token usage, and performance during execution
- Progress Indication: Show users that work is in progress for long-running tasks
#Best Practices for Streaming
1. Keep Handlers Fast
# Good - fast, non-blocking
def handle_event(event: AgentEvent):
if event.type == EventType.THINKING:
print(event.content, end="", flush=True)
# Avoid - slow operations in handler
def slow_handler(event: AgentEvent):
if event.type == EventType.THINKING:
time.sleep(1) # Blocks execution!
print(event.content)
2. Handle Errors Gracefully
def safe_handler(event: AgentEvent):
try:
if event.type == EventType.THINKING:
print(event.content, end="", flush=True)
except Exception as e:
# Don't let handler errors crash agent execution
logger.error(f"Event handler error: {e}")
3. Use Async When Needed
async def async_handler(event: AgentEvent):
if event.type == EventType.COMPLETE:
# Async operations are fine
await save_to_database(event.final_result)
4. Buffer Text for UI Updates
class BufferedUI:
def __init__(self):
self.buffer = []
self.last_update = time.time()
def handle_event(self, event: AgentEvent):
if event.type == EventType.THINKING:
self.buffer.append(event.content)
# Update UI every 100ms instead of every chunk
if time.time() - self.last_update > 0.1:
self.flush_buffer()
def flush_buffer(self):
if self.buffer:
print(''.join(self.buffer), end="", flush=True)
self.buffer = []
self.last_update = time.time()
#Constructor Parameters
Agent(
name: str,
llm_provider: Optional[Union[str, LLMProvider]] = None,
model: Optional[str] = None,
api_key: Optional[str] = None,
config: Optional[AgentConfig] = None,
agent_id: Optional[str] = None,
prompt: Optional[str] = None,
focus: Optional[Union[List[str], str, Dict[str, Any]]] = None,
relay: Optional[str] = None,
mcp: Optional[Union[Dict, List[Dict]]] = None,
display_reasoning: bool = False,
**kwargs # Can include 'tools' parameter
)
#Core Parameters
- name (str): Agent name - used in logs, traces, and relay messages (required)
- config (AgentConfig): Complete agent configuration object (overrides other parameters if provided)
- agent_id (str): Unique identifier (auto-generated if not provided)
#LLM Configuration
- llm_provider (str | LLMProvider): Provider name ("openai", "anthropic", "grok", "gemini") or provider instance. Auto-detects from environment if omitted.
- model (str): Model name (e.g., "gpt-4", "claude-3-sonnet-20240229"). Defaults to "gpt-4" if not specified.
- api_key (str): API key for the LLM provider. Auto-detected from environment variables (OPENAI_API_KEY, ANTHROPIC_API_KEY, etc.) if not provided.
- prompt (str): Agent identity and role description. Defines who the agent is and how it should behave.
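A minimal sketch of explicit LLM configuration (the prompt and model string are only examples):
agent = Agent(
    name="Report Writer",
    prompt="You write concise executive summaries.",
    llm_provider="anthropic",
    model="claude-3-sonnet-20240229",
    api_key="sk-..."  # or omit and set ANTHROPIC_API_KEY in the environment
)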
#Advanced Options
- focus (FocusConfig | Dict): Focus configuration for filtering tool results. Reduces token usage by filtering tool outputs before sending them to the LLM. Supports JSONPath, column selection, XPath, CSS selectors, and regex patterns.
- relay (str): Relay channel name. When set, all execution results are automatically published to this channel for other agents to consume.
- mcp (Dict | List[Dict]): MCP server configuration(s) for integrating external tools. Tools from MCP servers are automatically discovered and registered.
- display_reasoning (bool): Enable console output showing agent decisions and execution flow (useful for debugging).
- tools (List): List of plugins or AgentTool instances to register (passed via kwargs).
- **kwargs: Additional configuration passed to AgentConfig (e.g., enable_retry, max_retries, retry_delay)
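A hedged sketch combining several of these options (the channel name, focus columns, and retry values are invented for illustration; see the dedicated sections below for details on each):
agent = Agent(
    name="Signup Analyst",
    prompt="You analyze signup data.",
    focus=["user_id", "email", "created_at"],   # filter tool outputs to these columns
    relay="signup_results",                     # publish each result to this channel
    display_reasoning=True,                     # print decisions to the console
    enable_retry=True,                          # extra kwargs forwarded to AgentConfig
    max_retries=3
)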
#Smart Constructor Features
The Agent constructor includes intelligent defaults:
- Auto-configuration: LLM provider and API keys automatically detected from environment
- Default model: Uses "gpt-4" if no model specified
- Auto-generated IDs: Creates unique agent_id if not provided
- Lazy tool setup: Tools are discovered and registered on first use
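In practice, the minimal constructor call below is enough when the relevant environment variables are set (a sketch, assuming OPENAI_API_KEY is exported):
agent = Agent(name="Quick Agent")   # provider and key auto-detected, model defaults to "gpt-4"
print(agent.agent_id)               # auto-generated unique ID
answer = await agent.run("Ping")    # tools are discovered and registered on first use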
#Extending Agents
#Tools
Extend agent capabilities by registering tools. The agent autonomously decides when to use them:
from daita.core.tools import tool
@tool
async def get_weather(city: str) -> dict:
"""Get current weather for a city."""
return {"temp": 72, "conditions": "sunny"}
agent.register_tool(get_weather)
# Agent uses tool when needed
answer = await agent.run("What's the weather in San Francisco?")See Tools documentation for complete guide on creating and using tools.
#Plugins
Add pre-built integrations for databases, APIs, and services:
from daita.plugins import PostgreSQLPlugin
db_plugin = PostgreSQLPlugin(host="localhost", database="mydb")
agent = Agent(name="DB Agent", tools=[db_plugin])
# Agent can now query the database
answer = await agent.run("How many users signed up last week?")See Plugins documentation for available plugins.
#Relay Channels
Automatically publish results to channels for multi-agent communication:
agent = Agent(name="Publisher", relay="results_channel")
# Results auto-published to channel after each run
await agent.run("Process this data")See Workflows documentation for multi-agent patterns.
#Focus Filters
Reduce token usage by filtering tool results before sending to LLM:
# Only include specific columns
agent = Agent(name="Filtered Agent", focus=["user_id", "email", "created_at"])
# Or use JSONPath for complex filtering
agent = Agent(name="Filtered Agent", focus={"type": "jsonpath", "path": "$.users[*].email"})
Focus is applied automatically to all tool outputs. See Configuration for all focus types.
#Agent Properties
#Health Status
Get comprehensive agent health information:
health = agent.health
Returns:
{
"status": "healthy",
"uptime": 3600.0, # Seconds since agent started
"tools": {
"count": 12,
"setup": True,
"names": ["query_database", "calculate_metrics", ...]
},
"relay": {
"enabled": True,
"channel": "data_results"
},
"llm": {
"available": True,
"provider": "openai"
}
}
#Token Usage
Track LLM token usage for cost monitoring:
usage = agent.get_token_usage()
Returns:
{
"total_tokens": 1500,
"prompt_tokens": 800,
"completion_tokens": 700,
"requests": 5
}
Token usage is tracked automatically by the tracing system across all LLM calls.
#Agent Lifecycle
Agents automatically start when created and clean up when stopped.
# Manual start/stop
await agent.start()
await agent.stop()
# Context manager (automatic cleanup)
async with Agent(name="Temp") as agent:
result = await agent.run("Quick task")#Error Handling
Handle exceptions with the framework's error hierarchy.
from daita.core.exceptions import AgentError, LLMError, ValidationError
agent = Agent(name="My Agent")
await agent.start()
try:
result = await agent.run("Analyze this data")
except ValidationError as e:
# Invalid input data
print(f"Validation error: {e}")
except LLMError as e:
# LLM provider issues
print(f"LLM error: {e}")
except AgentError as e:
# Agent-specific errors
print(f"Agent error: {e}")
finally:
await agent.stop()
#Observability
#Automatic Tracing
Every operation is automatically traced with zero configuration:
from daita.core.tracing import get_agent_traces
# All operations are traced automatically
await agent.run("Analyze data")
# Access trace data
traces = get_agent_traces(agent_id=agent.agent_id)
for trace in traces:
print(f"{trace.operation_type}: {trace.duration_ms}ms")Traces include agent executions, tool calls, LLM calls, plugin operations, and errors. See Tracing documentation for complete details.
#Debug Display
Enable console output for development:
agent = Agent(name="Debug Agent", display_reasoning=True)
# Console shows real-time execution details:
# [Debug Agent] Running: "Analyze this data"
# [Debug Agent] Tool called: query_database
# [Debug Agent] Result: 42 records found
#Configuration Class Methods
#Global Defaults
Configure default settings for all agents:
from daita import Agent
Agent.configure_defaults(
llm_provider="anthropic",
model="claude-3-sonnet-20240229"
)
# All agents created after this use these defaults
agent1 = Agent(name="Agent 1") # Uses Anthropic Claude
agent2 = Agent(name="Agent 2")  # Also uses Anthropic Claude
Useful for setting organization-wide preferences or testing with specific models.
#Best Practices
Agent Design:
- Define clear prompts that describe the agent's role and expertise
- Use descriptive agent names (e.g., "DataValidator", "ReportGenerator")
- Start simple and add complexity incrementally
Tool Management:
- Keep tools focused - each should do one thing well
- Write clear docstrings so the LLM knows when to use each tool
- Test tools independently before registering them
- Return structured data (dicts/objects) for consistency
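A short sketch of a tool that follows these guidelines: one focused purpose, a docstring the LLM can act on, and a structured dict return (the order-lookup logic is invented for illustration):
from daita.core.tools import tool

@tool
async def get_order_status(order_id: str) -> dict:
    """Look up the current status of a single order by its ID.

    Use this when the user asks about a specific order.
    """
    # Illustrative only - replace with a real lookup
    return {"order_id": order_id, "status": "shipped", "eta_days": 2}

agent.register_tool(get_order_status)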
Performance:
- Monitor token usage with agent.get_token_usage()
- Use focus filters to reduce tokens for large tool results
- Set appropriate max_iterations based on task complexity
- Enable display_reasoning=True during development
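A development-time sketch that puts these practices together (the task text and focus columns are illustrative):
agent = Agent(
    name="Perf-Tuned Agent",
    focus=["id", "total"],        # trim large tool results before they reach the LLM
    display_reasoning=True        # watch decisions while iterating on the prompt
)
await agent.start()

answer = await agent.run("Compute monthly totals", max_iterations=3)

usage = agent.get_token_usage()
print(usage["total_tokens"], usage["requests"])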
Production:
- Use context managers or explicit stop() for cleanup
- Handle errors gracefully with try-except blocks
- Check agent.health to monitor status
- Review traces to debug issues
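A sketch of how these pieces can combine in a production path: a context manager for cleanup, the documented exception types, and a health check (not a prescribed pattern; the agent name and task are illustrative):
from daita import Agent
from daita.core.exceptions import AgentError

async with Agent(name="Nightly Reporter") as agent:
    if agent.health["status"] != "healthy":
        raise RuntimeError("agent failed health check")
    try:
        report = await agent.run("Generate the nightly report")
    except AgentError as e:
        # Log and fall back rather than crashing the job
        print(f"Agent error: {e}")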
#Next Steps
- Tools - Deep dive into creating custom tools
- Plugins - Pre-built integrations for databases and APIs
- Workflows - Multi-agent orchestration and relay channels
- Tracing - Complete observability and monitoring guide
- Error Handling - Retry logic and reliability patterns