Agents
Agents are the core building blocks of the Daita framework. SubstrateAgent is the primary agent class with autonomous tool-calling, automatic tracing, and flexible configuration.
Basic Agent

```python
from daita import SubstrateAgent

# Create agent with identity
agent = SubstrateAgent(
    name="Data Analyst",
    prompt="You are a data analyst expert. Provide detailed insights with specific metrics.",
    llm_provider="openai",
    model="gpt-4"
)

await agent.start()

# Simple execution - just get the answer
answer = await agent.run("What were total sales last month?")
print(answer)  # "Based on the data, total sales last month were..."

# Detailed execution - get full metadata
result = await agent.run_detailed("Show me top 10 customers")
print(f"Answer: {result['result']}")
print(f"Time: {result['processing_time_ms']}ms")
print(f"Cost: ${result['cost']}")
```
Real-Time Streaming

Get real-time updates during agent execution using the on_event callback. This enables you to see the agent's thinking process, tool calls, and results as they happen:

```python
from daita import SubstrateAgent
from daita.core.streaming import AgentEvent, EventType

# Define event handler
def handle_event(event: AgentEvent):
    if event.type == EventType.THINKING:
        # Real-time text streaming from the LLM
        print(event.content, end="", flush=True)
    elif event.type == EventType.TOOL_CALL:
        # A tool is being called
        print(f"\n🔧 Calling {event.tool_name}...")
        print(f"   Args: {event.tool_args}")
    elif event.type == EventType.TOOL_RESULT:
        # Tool result received
        print(f"   ✅ Result: {event.result}")
    elif event.type == EventType.COMPLETE:
        # Execution finished
        print(f"\n✅ Done! Cost: ${event.cost:.4f}")

agent = SubstrateAgent(
    name="Data Analyst",
    llm_provider="openai",
    model="gpt-4"
)
await agent.start()

# Use streaming with run()
answer = await agent.run(
    "Analyze sales data and calculate trends",
    on_event=handle_event  # Enable streaming
)

# Or with run_detailed()
result = await agent.run_detailed(
    "Generate monthly report",
    on_event=handle_event
)
```
Event Types

The on_event callback receives AgentEvent objects with the following types:

- ITERATION: new iteration started (includes iteration number and max_iterations)
- THINKING: LLM text chunks streaming in real time (the content field contains the text)
- TOOL_CALL: a tool is being invoked (includes tool_name and tool_args)
- TOOL_RESULT: tool execution completed (includes tool_name and result)
- COMPLETE: final answer ready (includes final_result, cost, tokens, iterations)
- ERROR: something went wrong (includes the error message)
Streaming Benefits
Real-time streaming provides several advantages:
- Transparency: See exactly what the agent is thinking and doing
- Progress tracking: Monitor long-running operations in real-time
- User feedback: Provide immediate visual feedback in UIs
- Debugging: Understand agent behavior during development
- Cost monitoring: Track token usage and costs as they happen
```python
import time

from daita.core.streaming import AgentEvent, EventType

# Example: custom UI with streaming
class StreamingUI:
    def __init__(self):
        self.start_time = None

    def handle_event(self, event: AgentEvent):
        if event.type == EventType.ITERATION:
            self.start_time = time.time()
            print(f"\n{'=' * 60}")
            print(f"Iteration {event.iteration}/{event.max_iterations}")
            print(f"{'=' * 60}")
        elif event.type == EventType.THINKING:
            # Stream text in real time
            print(event.content, end="", flush=True)
        elif event.type == EventType.TOOL_CALL:
            print(f"\n\n🛠️  {event.tool_name}({event.tool_args})")
        elif event.type == EventType.TOOL_RESULT:
            print(f"   → {event.result}")
        elif event.type == EventType.COMPLETE:
            elapsed = time.time() - self.start_time
            print(f"\n\n✅ Complete in {elapsed:.2f}s")
            print(f"   Tokens: {event.token_usage.get('total_tokens', 0)}")
            print(f"   Cost: ${event.cost:.4f}")

ui = StreamingUI()
answer = await agent.run(
    "Complex multi-step analysis task",
    on_event=ui.handle_event
)
```
Tool-Based Execution

SubstrateAgent uses autonomous tool calling: you give the agent tools and a natural language instruction, and the LLM autonomously decides which tools to use and when.
Architecture
- You give the agent tools and a natural language instruction
- The LLM autonomously decides which tools to use and when
- Tools are executed and results fed back to the LLM
- The LLM produces a final answer
This is the modern agent paradigm - autonomous, tool-driven execution.
Extending with Tools

Tools are the primary way to extend agent capabilities. Use the @tool decorator to convert Python functions into agent tools:

```python
from daita import SubstrateAgent
from daita.core.tools import tool

# Define tools for your agent
@tool
async def query_database(sql: str) -> list:
    """Execute a SQL query and return results."""
    # Database query implementation
    return await db.execute(sql)

@tool
async def calculate_metrics(data: list) -> dict:
    """Calculate statistical metrics for data."""
    return {
        'mean': sum(data) / len(data),
        'max': max(data),
        'min': min(data)
    }

# Create agent with tools
agent = SubstrateAgent(
    name="Data Analyst",
    model="gpt-4o-mini",
    prompt="You are a data analyst. Help users query and analyze data."
)

# Register tools
agent.register_tool(query_database)
agent.register_tool(calculate_metrics)

await agent.start()

# The agent autonomously uses tools to answer questions
answer = await agent.run("What were total sales last month?")
print(answer)

# The agent will autonomously:
# 1. Call query_database with appropriate SQL
# 2. Call calculate_metrics on the results
# 3. Provide a natural language answer
```
Tool Execution Flow
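A minimal sketch of the execution loop described above. Everything here (agent_loop, llm.complete, the message and response shapes) is hypothetical pseudocode for illustration, not SubstrateAgent's actual internals:

```python
# Illustrative pseudocode for the autonomous tool-calling loop; the names
# `agent_loop`, `llm.complete`, and the response/message shapes are hypothetical.
async def agent_loop(instruction: str, llm, tools: dict, max_iterations: int = 10):
    messages = [{"role": "user", "content": instruction}]
    for _ in range(max_iterations):
        response = await llm.complete(messages, tools=tools)
        if not response.tool_calls:
            # No tool requested: the LLM has produced its final answer
            return response.text
        for call in response.tool_calls:
            # Execute the requested tool and feed the result back to the LLM
            result = await tools[call.name](**call.args)
            messages.append(
                {"role": "tool", "name": call.name, "content": str(result)}
            )
    raise RuntimeError("Reached max iterations without a final answer")
```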
Focus System (Data Filtering)

DAITA's unique focus system filters tool results before they reach the LLM, reducing token usage and latency:

```python
from daita.config.base import FocusConfig

agent = SubstrateAgent(
    name="Sales Analyzer",
    focus=FocusConfig(
        type="jsonpath",
        path="$.sales[*].amount"  # Only extract amounts
    )
)

# When tools return large data, focus filters it before sending to the LLM.
# Example: a tool returns 10KB of data -> focus filters it to 1KB
# -> the LLM only processes 1KB (a 90% token reduction)
```
Focus types:

- jsonpath - JSONPath selectors for JSON data
- column - Column names for tabular data
- xpath - XPath selectors for XML
- css - CSS selectors for HTML
- regex - Regular expression patterns
```python
# Column focus (for DataFrames/tabular data)
agent = SubstrateAgent(
    name="DB Analyst",
    focus=FocusConfig(
        type="column",
        columns=["user_id", "name", "email"]
    )
)

# JSONPath focus (for JSON responses)
agent = SubstrateAgent(
    name="API Analyst",
    focus=FocusConfig(
        type="jsonpath",
        path="$.results[*].id"
    )
)
```
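The xpath, css, and regex types follow the same pattern. A regex sketch; the `pattern` field name is an assumption here, so check FocusConfig's definition for the exact parameter:

```python
# Regex focus (for free-text tool output). The `pattern` field name is an
# assumption; consult FocusConfig for the exact parameter name.
agent = SubstrateAgent(
    name="Log Analyst",
    focus=FocusConfig(
        type="regex",
        pattern=r"ERROR .*"  # Keep only error lines from verbose log output
    )
)
```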
Relay Channels (Agent Communication)

Agents can publish results to channels for multi-agent workflows:

```python
# Agent publishes to a relay channel
agent = SubstrateAgent(
    name="Publisher",
    llm_provider="openai",
    model="gpt-4",
    relay="output_channel"
)

# Results are automatically published to "output_channel"
result = await agent.run("Analyze this data")
```
See the Workflows documentation for multi-agent communication patterns.
Retry Configuration

Configure retry behavior with exponential backoff:

```python
from daita import SubstrateAgent
from daita.config import AgentConfig, RetryPolicy, RetryStrategy

config = AgentConfig(
    name="Robust Agent",
    enable_retry=True,
    retry_policy=RetryPolicy(
        max_retries=5,
        strategy=RetryStrategy.EXPONENTIAL,
        base_delay=1.0,
        max_delay=60.0,
        jitter=True
    )
)

agent = SubstrateAgent(
    config=config,
    llm_provider="openai",
    model="gpt-4"
)
```
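With this policy the delay roughly doubles on each attempt until it reaches max_delay, and jitter adds randomness so retries don't synchronize. A quick worked example, assuming the standard doubling formula:

```python
# Illustrative delay schedule for the policy above, assuming
# delay = min(base_delay * 2**attempt, max_delay); jitter then perturbs each value.
base_delay, max_delay = 1.0, 60.0
for attempt in range(5):
    delay = min(base_delay * 2 ** attempt, max_delay)
    print(f"retry {attempt + 1}: ~{delay:.0f}s")
# retry 1: ~1s, retry 2: ~2s, retry 3: ~4s, retry 4: ~8s, retry 5: ~16s
```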
Automatic Tracing

All operations are automatically traced:

```python
agent = SubstrateAgent(name="My Agent", llm_provider="openai", model="gpt-4")
await agent.start()

# Execute tasks (automatically traced)
await agent.run("Analyze this data")

# Get statistics
stats = agent.get_trace_stats()
# Returns: total_operations, success_rate, avg_latency, total_tokens, etc.

# Get recent operations
operations = agent.get_recent_operations(limit=10)

# Get decision history
decisions = agent.get_recent_decisions(limit=10)
```
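These statistics make lightweight health checks easy. A minimal sketch built on the documented get_trace_stats() fields; it assumes success_rate is a 0-1 fraction, and the 0.95 threshold is an arbitrary example:

```python
# Minimal health check on top of get_trace_stats(). Assumes success_rate is a
# 0-1 fraction; the 0.95 threshold is an arbitrary example value.
def check_agent_health(agent) -> bool:
    stats = agent.get_trace_stats()
    if stats["total_operations"] == 0:
        return True  # Nothing executed yet, nothing to flag
    healthy = stats["success_rate"] >= 0.95
    if not healthy:
        print(f"Warning: success rate {stats['success_rate']:.1%}, "
              f"avg latency {stats['avg_latency']}")
    return healthy
```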
Token Usage Tracking

Token usage is automatically tracked:

```python
agent = SubstrateAgent(name="My Agent", llm_provider="openai", model="gpt-4")
await agent.start()

await agent.run("Analyze the first dataset")
await agent.run("Analyze the second dataset")

# Get usage stats
usage = agent.get_token_usage()
print(f"Total tokens: {usage['total_tokens']}")
print(f"Estimated cost: ${usage['estimated_cost']:.4f}")
```
Working with Tools

Creating Tools

Use the @tool decorator to create tools from Python functions:

```python
from daita.core.tools import tool

@tool
async def calculate_sum(a: int, b: int) -> int:
    """Add two numbers together."""
    return a + b

@tool
def get_weather(city: str, units: str = "celsius") -> dict:
    """Get current weather for a city."""
    # Weather API call
    return {"temp": 22, "condition": "sunny"}
```
The @tool decorator automatically (see the sketch after this list):
- Extracts parameter schemas from type hints and docstrings
- Handles both sync and async functions
- Converts functions to AgentTool instances
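For instance, optional parameters and defaults declared in the signature carry through to the schema the LLM sees. A sketch with a stub body:

```python
from typing import Optional

from daita.core.tools import tool

@tool
async def search_orders(
    customer_id: str,
    status: Optional[str] = None,  # Optional filter; None means "any status"
    limit: int = 20,               # Default is carried into the schema
) -> list:
    """Search orders for a customer, optionally filtered by status."""
    return []  # Stub implementation for illustration

# The decorator reads the type hints (str, Optional[str], int), the defaults,
# and the docstring to build the parameter schema exposed to the LLM.
```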
Registering Tools

Register tools with your agent:

```python
from daita import SubstrateAgent
from daita.core.tools import tool

@tool
async def fetch_data(source: str) -> dict:
    """Fetch data from a source."""
    return await api.get(source)

agent = SubstrateAgent(name="Data Agent")

# Register a single tool
agent.register_tool(fetch_data)

# Register multiple tools
agent.register_tools([tool1, tool2, tool3])

# List available tools
print(agent.tool_names)       # ['fetch_data', 'tool1', 'tool2', 'tool3']
print(agent.available_tools)  # List of AgentTool instances
```
Manual Tool Execution

Execute tools manually for testing:

```python
# Execute a tool directly
result = await agent.call_tool("fetch_data", {"source": "api/users"})
print(result)
```
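This makes tools straightforward to exercise in unit tests. A minimal sketch, assuming pytest with pytest-asyncio and the fetch_data tool from above:

```python
# Test sketch; assumes pytest and pytest-asyncio are installed and reuses
# the fetch_data tool defined earlier.
import pytest

from daita import SubstrateAgent

@pytest.mark.asyncio
async def test_fetch_data_tool():
    agent = SubstrateAgent(name="Test Agent")
    agent.register_tool(fetch_data)
    await agent.start()
    try:
        result = await agent.call_tool("fetch_data", {"source": "api/users"})
        assert isinstance(result, dict)
    finally:
        await agent.stop()
```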
MCP Tools (Model Context Protocol)

Connect agents to external tools and services using MCP servers. MCP tools are automatically discovered and made available to the agent:

```python
from daita import SubstrateAgent

# Configure with an MCP server
agent = SubstrateAgent(
    name="File Agent",
    llm_provider="openai",
    model="gpt-4",
    mcp={
        "command": "npx",
        "args": ["-y", "@modelcontextprotocol/server-filesystem", "/tmp"],
        "name": "filesystem"
    }
)
await agent.start()

# MCP tools are automatically available - the agent uses them autonomously
answer = await agent.run("Read the contents of config.json and summarize it")
# The agent autonomously uses the read_file tool from the MCP server
```
Multiple MCP Servers

Connect to multiple MCP servers for broader capabilities:

```python
agent = SubstrateAgent(
    name="Multi-Tool Agent",
    llm_provider="openai",
    model="gpt-4",
    mcp=[
        {
            "command": "npx",
            "args": ["-y", "@modelcontextprotocol/server-filesystem", "/tmp"],
            "name": "filesystem"
        },
        {
            "command": "uvx",
            "args": ["mcp-server-git", "--repository", "/path/to/repo"],
            "name": "git"
        }
    ]
)
await agent.start()

# The agent can autonomously use tools from both MCP servers
answer = await agent.run("Read the README file and show me the latest git commits")

# List all available tools (from MCP servers and registered tools)
print(agent.tool_names)
```
Plugin Integration

Plugins can expose their capabilities as tools that agents can use autonomously:

```python
from daita import SubstrateAgent
from daita.plugins import PostgreSQLPlugin

# Create a plugin instance
db_plugin = PostgreSQLPlugin(host="localhost", database="mydb")

agent = SubstrateAgent(
    name="DB Agent",
    llm_provider="openai",
    model="gpt-4",
    tools=[db_plugin]  # Plugin tools are automatically registered
)
await agent.start()

# The agent autonomously uses database tools
answer = await agent.run("Get all users who signed up last week")
# The agent uses the query tool exposed by the PostgreSQLPlugin
```
Adding Plugins Dynamically

Add plugins after agent creation:

```python
from daita.plugins import PostgreSQLPlugin

agent = SubstrateAgent(name="Dynamic Agent")

# Add a plugin - its tools are automatically discovered
db_plugin = PostgreSQLPlugin(host="localhost", database="mydb")
agent.add_plugin(db_plugin)

await agent.start()

# The agent can now use database tools
answer = await agent.run("How many users do we have?")
```
Direct Plugin Access

Access plugins directly when you need more control:

```python
from daita.core.tools import tool

@tool
async def custom_db_operation(query: str) -> list:
    """Execute a custom database operation."""
    async with agent.plugins.postgresql(host="localhost", database="mydb") as db:
        results = await db.query(query)
        return results

agent.register_tool(custom_db_operation)
```
Complete Example

```python
import asyncio

from daita import SubstrateAgent
from daita.core.tools import tool
from daita.config.base import FocusConfig

# Define custom tools
@tool
async def fetch_sales_data(region: str, period: str = "month") -> dict:
    """Fetch sales data for a specific region and time period."""
    # Database query implementation
    return {
        "sales": [100, 200, 150, 300],
        "region": region,
        "period": period,
        "total": 750
    }

@tool
async def calculate_growth(current: float, previous: float) -> dict:
    """Calculate growth rate and trend."""
    growth_rate = ((current - previous) / previous) * 100
    return {
        "growth_rate": growth_rate,
        "trend": "up" if growth_rate > 0 else "down"
    }

# Create agent with tools
agent = SubstrateAgent(
    name="Sales Data Analyzer",
    prompt="You are a sales analytics expert. Provide actionable insights with specific metrics.",
    llm_provider="openai",
    model="gpt-4",
    focus=FocusConfig(
        type="column",
        columns=["sales", "total", "region"]
    ),
    relay="analysis_output"
)

# Register tools
agent.register_tool(fetch_sales_data)
agent.register_tool(calculate_growth)

# Factory function for CLI deployment
def create_agent():
    return agent

# Run the agent
async def main():
    await agent.start()

    # The agent autonomously uses tools to answer questions
    result = await agent.run_detailed(
        "Analyze sales performance for the US region last month and compare to previous month"
    )

    # Access detailed results
    print(f"Answer: {result['result']}")
    print(f"Tools used: {[tc['tool'] for tc in result['tool_calls']]}")
    print(f"Time: {result['processing_time_ms']}ms")
    print(f"Cost: ${result['cost']}")

    # Simple execution
    answer = await agent.run("What's the sales trend?")
    print(answer)

    await agent.stop()

asyncio.run(main())
```
Configuration Parameters

| Parameter | Type | Description |
|---|---|---|
| name | str | Agent name (required) |
| llm_provider | str or LLMProvider | LLM provider: "openai", "anthropic", "gemini", "grok", or an instance |
| model | str | Model name: "gpt-4", "claude-3-sonnet-20240229", etc. |
| api_key | str | API key (auto-detected from the environment if not provided) |
| config | AgentConfig | AgentConfig object for advanced configuration |
| agent_id | str | Unique identifier for the agent (auto-generated if not provided) |
| prompt | str | Agent identity and role description |
| focus | FocusConfig or Dict | Focus configuration for filtering tool results (token optimization) |
| relay | str | Channel name to publish results to |
| mcp | Dict or List[Dict] | MCP server configuration(s) for tool integration |
| display_reasoning | bool | Show decision-making in the console (default: False) |
| tools | List | List of plugins or AgentTool instances (via kwargs) |
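A sketch combining several of these parameters in one constructor call; all values are illustrative placeholders:

```python
import os

from daita import SubstrateAgent

# Illustrative parameter combination; the values are placeholders.
agent = SubstrateAgent(
    name="Configured Agent",
    agent_id="sales-analyst-001",
    llm_provider="anthropic",
    model="claude-3-sonnet-20240229",
    api_key=os.environ.get("ANTHROPIC_API_KEY"),  # or omit for auto-detection
    prompt="You are a concise sales analyst.",
    display_reasoning=True,  # Print decision-making to the console
)
```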
CLI Usage

Create agents using the CLI:

```bash
# Create an agent from a template
daita create agent my_agent

# Test the agent locally
daita test my_agent

# Deploy to the cloud
daita push
```
Error Handling

```python
from daita import SubstrateAgent
from daita.core.exceptions import AgentError, LLMError, ValidationError

agent = SubstrateAgent(name="My Agent", llm_provider="openai", model="gpt-4")
await agent.start()

try:
    result = await agent.run("Analyze this data")
except ValidationError as e:
    # Invalid input data
    print(f"Validation error: {e}")
except LLMError as e:
    # LLM provider issues (rate limits, API errors)
    print(f"LLM error: {e}")
except AgentError as e:
    # Agent-specific errors
    print(f"Agent error: {e}")
finally:
    await agent.stop()
```
Resource Cleanup

Always stop agents when done to clean up resources (especially MCP connections):

```python
agent = SubstrateAgent(name="My Agent", llm_provider="openai", model="gpt-4")
await agent.start()
try:
    result = await agent.run("Analyze this data")
finally:
    await agent.stop()  # Cleans up MCP connections, etc.

# Or use the context manager (automatic cleanup)
async with SubstrateAgent(name="Temp Agent") as agent:
    result = await agent.run("Quick task")
```
Related Documentation
- Getting Started - Quick start tutorial
- Workflows - Multi-agent orchestration
- Tracing - Observability and monitoring
- LLM Providers - Configure different LLM providers
- Plugins - Database and API integrations