Framework Overview
The Daita agent framework is a Python-based system for building intelligent, observable, and reliable AI agents. This page provides a comprehensive overview of the framework's architecture, data flow, and key components.
#Core Architecture
The framework is built on four foundational layers that work together to provide a complete agent system: Agent Layer, Communication Layer, Observability Layer, and Integration Layer.
#Agent Layer
The Agent Layer is where your core business logic lives. The Agent class provides a flexible foundation that you customize with tools to accomplish different types of tasks. Think of agents as intelligent workers that can:
- Execute custom business logic through tools
- Process data with focus filters for token efficiency
- Leverage LLM capabilities for autonomous decision-making
- Automatically handle errors with intelligent retry logic
- Work independently or as part of larger workflows
Each agent is autonomous and self-contained, so it can be developed, tested, and deployed independently.
#Communication Layer
The Communication Layer enables agents to work together in multi-agent systems without tight coupling. This layer provides two key components:
Relay Channels act as message buses where agents publish results and subscribe to data from other agents. This publish/subscribe pattern means agents don't need to know about each other directly—they just publish to channels and subscribe to the channels they care about.
Workflow Orchestration provides structured coordination when you need explicit control over agent connections and lifecycle. Workflows manage multiple agents, define their connections, and handle startup/shutdown as a unit. You can also use Agent Pools for horizontal scaling, running multiple instances of the same agent to handle high volumes.
This layer is what transforms individual agents into coordinated systems that can handle complex, multi-step processes.
#Observability Layer
The Observability Layer provides production-grade monitoring and debugging capabilities with zero configuration required. Every operation is automatically traced:
Automatic Tracing captures every agent operation with timestamps, durations, inputs, and outputs. You get complete visibility into what your agents are doing without writing any instrumentation code.
Decision Tracking records every decision your agents make (like whether to retry an error) along with confidence scores and reasoning. This helps you understand why agents behave the way they do.
Token Usage Tracking monitors all LLM calls, counting tokens and estimating costs in real time. You always know exactly how much your AI operations are costing.
#Integration Layer
The Integration Layer connects your agents to external systems and AI providers. This layer provides three types of integrations:
LLM Providers give your agents access to leading AI models from OpenAI, Anthropic, Google, and xAI. The framework handles authentication, rate limiting, and token tracking automatically—you just specify which model you want to use.
Database Plugins provide pre-built integrations for PostgreSQL, MySQL, MongoDB, Redis, and Elasticsearch. These plugins handle connection management, query execution, and error handling so you can focus on your application logic.
MCP Tools enable your agents to use Model Context Protocol tools, giving LLMs the ability to interact with filesystems, APIs, and other external systems. This extends your agents' capabilities beyond just text generation.
All integrations are automatically traced, so you get visibility into every database query, API call, and LLM interaction.
#Agent: The Foundation
The Agent class provides a flexible foundation for building custom agents. It's designed as a blank slate that you customize with tools and integrations while benefiting from automatic tracing and error handling.
#Key Features
- Autonomous Tool Calling: LLM autonomously decides which tools to use and when
- Focus System: Filter tool results before sending to LLM to reduce token usage
- LLM Integration: Built-in support for OpenAI, Anthropic, Google Gemini, and xAI Grok
- Tool Registry: Unified system for custom tools, plugins, and MCP tools
- Relay Publishing: Automatically publish results to communication channels
- Automatic Tracing: Zero-config observability for all operations
#Creating Your First Agent
```python
from daita import Agent

agent = Agent(
    name="Data Processor",
    prompt="You are a sales data analyst. Help users analyze sales data.",
    llm_provider="openai",
    model="gpt-4",
)

await agent.start()
result = await agent.run("What were sales in the west region?")
```
#Creating Custom Tools
Tools are Python functions that agents can autonomously call. Use the @tool decorator to convert any function into an agent tool:
```python
from daita.core.tools import tool

@tool
async def validate_user(email: str, name: str, age: int) -> dict:
    required_fields = {"email": email, "name": name, "age": age}
    # Treat None/empty values as missing (a bare falsiness check would flag age=0)
    missing = [k for k, v in required_fields.items() if v in (None, "")]
    if missing:
        return {"valid": False, "missing_fields": missing}
    if age < 0 or age > 150:
        return {"valid": False, "error": "Invalid age"}
    return {"valid": True, "user": required_fields}

# Register tool with agent
agent.register_tool(validate_user)

# Agent autonomously uses tool when needed
result = await agent.run("Validate this user: Alice, alice@example.com, age 30")
# Agent will call validate_user tool and provide natural language response
```
#Tool Execution Flow
When you call agent.run(prompt), the framework follows this execution path (a runnable sketch follows the list):
1. LLM Receives Prompt - LLM gets the user prompt and available tool descriptions
2. Tool Calling Decision - LLM autonomously decides which tools to call (if any)
3. Execute Tools - Framework executes the requested tools
4. Apply Focus (if configured) - Filter tool results to reduce tokens
5. Feed Back to LLM - Tool results are sent back to the LLM for processing
6. Generate Answer - LLM produces the final natural language answer
7. Publish to Relay (if configured) - Send results to other agents
8. Return Result - Provide the answer with metadata
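The loop below is a toy sketch of steps 1-6, not the framework's actual implementation; fake_llm, get_sales, and apply_focus are hypothetical stand-ins so the flow can run end to end.
```python
# Toy sketch of the run() loop above -- NOT the framework's internals.
# fake_llm stands in for a real LLM provider so the flow is runnable.
import asyncio

def fake_llm(prompt, tool_results=None):
    # First pass: request a tool call; second pass: produce the final answer.
    if tool_results is None:
        return {"tool_calls": [{"name": "get_sales", "args": {"region": "west"}}]}
    return {"tool_calls": [], "answer": f"West region sales were {tool_results[0]}."}

async def get_sales(region):
    # Pretend tool returning a large payload.
    return {"region": region, "total": 42_000, "rows": ["...large payload..."]}

def apply_focus(result):
    return result["total"]  # step 4: keep only the field the LLM needs

async def run_sketch(prompt):
    tools = {"get_sales": get_sales}
    response = fake_llm(prompt)                              # steps 1-2
    while response["tool_calls"]:
        results = []
        for call in response["tool_calls"]:
            raw = await tools[call["name"]](**call["args"])  # step 3
            results.append(apply_focus(raw))                 # step 4
        response = fake_llm(prompt, tool_results=results)    # steps 5-6
    return response["answer"]

print(asyncio.run(run_sketch("What were sales in the west region?")))
```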
#Communication: Relay Channels
Relay channels enable agents to communicate and build multi-agent systems. Think of relay channels as message buses that allow agents to publish and subscribe to data without direct coupling.
#Relay Features
- Fire-and-forget: Default mode for simple communication
- Reliable messaging: Optional acknowledgments and retry logic
- Result extraction: Automatically extracts the result field from agent responses
- Channel isolation: Each channel maintains independent message queues
#Basic Relay Communication
```python
from daita import Agent
from daita.core.relay import RelayManager

# Initialize relay manager
relay = RelayManager()
await relay.start()

# Create subscriber callback
async def process_data(data):
    print(f"Received data: {data}")
    # Process the data here

# Subscribe to channel
await relay.subscribe("data_channel", process_data)

# Publish data to channel
await relay.publish("data_channel", {"processed": True, "count": 42})
# Subscriber receives: {"processed": True, "count": 42}
```
#Using Global Relay Functions
For simpler usage, use the convenience functions that manage a global relay instance:
```python
from daita.core.relay import publish, subscribe

# Subscribe to channel
async def handle_results(data):
    print(f"Got: {data}")

await subscribe("results", handle_results)

# Publish data
await publish("results", {"status": "complete"})
```
#Agent Auto-Publishing
Configure agents to automatically publish their results to a relay channel:
```python
# Agent publishes results to "raw_data" channel automatically
fetcher = Agent(
    name="Data Fetcher",
    prompt="Fetch and return data from sources.",
    relay="raw_data"  # Auto-publish to this channel
)

# Another agent subscribes to process the data
async def process_fetched_data(data):
    analyzer = Agent(name="Analyzer")
    await analyzer.start()
    result = await analyzer.run(f"Analyze: {data}")

await subscribe("raw_data", process_fetched_data)

# When fetcher runs, result automatically goes to raw_data channel
await fetcher.run("Fetch sales data from database")
```
#Workflow Orchestration
Workflows connect multiple agents into coordinated systems with explicit connections and lifecycle management.
```python
from daita.core.workflow import Workflow

# Create workflow
workflow = Workflow("Data Pipeline")
workflow.add_agent("fetcher", fetcher)
workflow.add_agent("processor", processor)
workflow.add_agent("analyzer", analyzer)

# Connect agents via relay channels
workflow.connect("fetcher", "raw_data", "processor")
workflow.connect("processor", "processed_data", "analyzer")

await workflow.start()
```
#Agent Pools for Horizontal Scaling
When you need to process high volumes, create agent pools with multiple instances running in parallel:
```python
# Create pool of 5 processor agents
workflow.add_agent_pool("processors", create_processor_func, instances=5)
workflow.connect("fetcher", "raw_data", "processors")
# Messages are distributed across all 5 instances
```
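The example above assumes a create_processor_func factory that the workflow calls to build each pool member. A minimal sketch, assuming a zero-argument factory that returns a fresh Agent (the exact signature is not confirmed by this page):
```python
# Hypothetical factory for the pool above: one fresh Agent per pool instance.
# The zero-argument signature is an assumption for illustration.
def create_processor_func():
    return Agent(
        name="Processor",
        prompt="Clean and normalize incoming records.",
        llm_provider="openai",
    )
```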
#Error Handling & Reliability
The framework includes sophisticated error handling with automatic classification and retry logic. All agents analyze errors and make intelligent retry decisions with confidence scores.
#Error Classification
| Error Class | Examples | Default Action | Confidence |
|---|---|---|---|
| Transient | Network timeouts, rate limits, service unavailable | Retry with backoff | 90% |
| Retryable | Temporary failures, resource busy | Retry with caution | 70% |
| Permanent | Invalid data, authentication errors, logic errors | Fail immediately | 95% |
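To make the table concrete, here is an illustrative three-way classifier; the exception-to-class mappings and the framework's real classification logic are assumptions for the sketch:
```python
# Illustrative classification matching the table above; the concrete
# exception mappings are assumptions, not the framework's internals.
TRANSIENT = (TimeoutError, ConnectionError)  # retry with backoff (~90%)
PERMANENT = (ValueError, PermissionError)    # fail immediately (~95%)

def classify(exc: Exception) -> str:
    if isinstance(exc, TRANSIENT):
        return "transient"
    if isinstance(exc, PERMANENT):
        return "permanent"
    return "retryable"  # unknown failures: retry with caution (~70%)

print(classify(TimeoutError("request timed out")))  # -> transient
```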
#Configuring Retry Behavior
```python
from daita import Agent
from daita.config.base import RetryPolicy, RetryStrategy

# Simple: Enable retry with defaults
agent = Agent(
    name="API Client",
    prompt="Fetch data from external APIs.",
    enable_retry=True  # Uses defaults: 3 retries, exponential backoff
)

# Advanced: Custom retry policy
agent = Agent(
    name="API Client",
    prompt="Fetch data from external APIs.",
    enable_retry=True,
    retry_policy=RetryPolicy(
        max_retries=5,
        strategy=RetryStrategy.EXPONENTIAL,
        base_delay=1.0,  # Start with 1 second
        max_delay=60.0,  # Cap at 60 seconds
        jitter=True      # Add randomness to prevent thundering herd
    )
)

await agent.start()

# Automatic retry on transient errors with exponential backoff
result = await agent.run("Fetch data from https://api.example.com")
```
Every retry decision is automatically traced with confidence scores and reasoning, viewable in your dashboard or locally via agent.get_decision_stats().
#Focus System
Focus filters tool results before sending them to the LLM, reducing token usage and costs. When tools return large datasets, focus extracts only the relevant portions.
#Focus Examples
```python
from daita import Agent
from daita.config.base import FocusConfig

# JSONPath focus - extract specific fields from tool results
agent = Agent(
    name="Analyzer",
    focus=FocusConfig(
        type="jsonpath",
        path="$.users[*].email"  # Only extract emails
    )
)

# Column focus - for tabular data
agent = Agent(
    name="Analyzer",
    focus=FocusConfig(
        type="column",
        columns=["name", "email", "age"]  # Only these columns
    )
)

# Tool returns large dataset -> Focus filters -> LLM gets small subset
# This can reduce token usage by 90%+
```
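To make the JSONPath example concrete, this is roughly what a "$.users[*].email" focus does to a tool result (a hand-rolled illustration; the framework evaluates real JSONPath expressions):
```python
# What "$.users[*].email" conceptually extracts from a large tool result.
# Hand-rolled for illustration -- the framework uses a real JSONPath engine.
tool_result = {
    "users": [
        {"name": "Alice", "email": "alice@example.com", "age": 30, "notes": "..."},
        {"name": "Bob", "email": "bob@example.com", "age": 41, "notes": "..."},
    ]
}
focused = [user["email"] for user in tool_result["users"]]
print(focused)  # ['alice@example.com', 'bob@example.com'] -- only this reaches the LLM
```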
#Automatic Tracing & Observability
Every operation is automatically traced without configuration. All traces are queryable, so you can monitor performance, debug issues, and track costs in real time.
#Trace Types
| Trace Type | Captures | Use Case |
|---|---|---|
| Agent Execution | Task processing, tool calls, results | Monitor agent behavior |
| LLM Calls | Prompts, responses, tokens, costs | Track AI usage and costs |
| Tool Execution | Tool calls, parameters, results | Debug tool behavior |
| Plugin Operations | Database queries, API calls | Debug integrations |
| Decision Points | Retry decisions, confidence scores | Understand agent reasoning |
| Relay Messages | Inter-agent communication | Debug workflows |
#Zero-Config Tracing
Tracing is automatic—no configuration needed.
```python
agent = Agent(
    name="My Agent",
    prompt="You are a data analyst.",
    llm_provider="openai"
)

await agent.start()
result = await agent.run("Analyze this data")

# Access trace data locally
stats = agent.get_trace_stats()
recent = agent.get_recent_operations(limit=10)
```
#Token Usage and Cost Tracking
```python
# Automatic token tracking for all LLM calls
usage = agent.get_token_usage()
print(f"Total tokens: {usage['total_tokens']}")
print(f"Estimated cost: ${usage['estimated_cost']:.4f}")
```
For local debugging, enable console decision display with display_reasoning=True to see real-time decision logs in your terminal.
#Integration Architecture
The framework supports multiple LLM providers, databases, and tools through a unified plugin system.
#LLM Providers
```python
# Supports OpenAI, Anthropic, Google Gemini, and xAI Grok
agent = Agent(llm_provider="openai", model="gpt-4")
agent = Agent(llm_provider="anthropic", model="claude-3-sonnet-20240229")
agent = Agent(llm_provider="gemini", model="gemini-pro")
```
#Database Plugins
```python
from daita.plugins import PostgreSQLPlugin

# Add database plugin to agent
db_plugin = PostgreSQLPlugin(host="localhost", database="mydb")
agent = Agent(
    name="DB Agent",
    prompt="You are a database analyst. Help users query and analyze data.",
    tools=[db_plugin]
)

await agent.start()

# Agent autonomously uses database tools from plugin
result = await agent.run("Show me all users over age 25")
# Agent will use the PostgreSQL query tool from the plugin
```
#MCP Tool Integration
Model Context Protocol (MCP) enables LLMs to use external tools like filesystems and APIs:
```python
agent = Agent(
    name="File Analyst",
    prompt="You are a file analyst. Help users read and analyze files.",
    llm_provider="openai",
    mcp={
        "command": "npx",
        "args": ["-y", "@modelcontextprotocol/server-filesystem", "/data"]
    }
)

await agent.start()

# Agent autonomously uses filesystem tools from MCP server
result = await agent.run("Read /data/report.txt and summarize it")
# Agent will use MCP filesystem tools to read and analyze the file
```
#Key Concepts Summary
| Concept | Description | Key Benefit |
|---|---|---|
| Agent | Foundational agent class with tool system | Flexible, extensible agent behavior |
| Tools | Python functions agents can autonomously call | Easy capability extension |
| Focus | Filter tool results before sending to LLM | Token usage reduction |
| Relay Channels | Message-based agent communication | Decoupled multi-agent systems |
| Workflows | Orchestration of multiple agents | Complex system coordination |
| Agent Pools | Horizontal scaling with multiple instances | High throughput processing |
| Automatic Tracing | Zero-config observability | Production monitoring |
| Retry Logic | Intelligent error handling | Reliability and resilience |
| Tool Registry | Unified plugin and tool management | Extensible capabilities |
| MCP Integration | Model Context Protocol support | External tool integration |
#Next Steps
- Getting Started - Build your first agent
- Agent Guide - Deep dive into the core agent
- Workflows - Build multi-agent systems
- Tracing - Understand observability
- Error Handling - Master reliability features
- Plugins - Integrate databases and APIs