Agents

Agents are the core building blocks of the Daita framework. SubstrateAgent is the primary agent class with autonomous tool-calling, automatic tracing, and flexible configuration.

Basic Agent

from daita import SubstrateAgent

# Create agent with identity
agent = SubstrateAgent(
    name="Data Analyst",
    prompt="You are a data analyst expert. Provide detailed insights with specific metrics.",
    llm_provider="openai",
    model="gpt-4"
)

await agent.start()

# Simple execution - just get the answer
answer = await agent.run("What were total sales last month?")
print(answer) # "Based on the data, total sales last month were..."

# Detailed execution - get full metadata
result = await agent.run_detailed("Show me top 10 customers")
print(f"Answer: {result['result']}")
print(f"Time: {result['processing_time_ms']}ms")
print(f"Cost: ${result['cost']}")

Real-Time Streaming

Get real-time updates during agent execution using the on_event callback. This enables you to see the agent's thinking process, tool calls, and results as they happen:

from daita import SubstrateAgent
from daita.core.streaming import AgentEvent, EventType

# Define event handler
def handle_event(event: AgentEvent):
    if event.type == EventType.THINKING:
        # Real-time text streaming from LLM
        print(event.content, end="", flush=True)

    elif event.type == EventType.TOOL_CALL:
        # Tool is being called
        print(f"\nšŸ”§ Calling {event.tool_name}...")
        print(f"   Args: {event.tool_args}")

    elif event.type == EventType.TOOL_RESULT:
        # Tool result received
        print(f"   āœ… Result: {event.result}")

    elif event.type == EventType.COMPLETE:
        # Execution finished
        print(f"\nāœ… Done! Cost: ${event.cost:.4f}")

agent = SubstrateAgent(
    name="Data Analyst",
    llm_provider="openai",
    model="gpt-4"
)

await agent.start()

# Use streaming with run()
answer = await agent.run(
    "Analyze sales data and calculate trends",
    on_event=handle_event  # Enable streaming
)

# Or with run_detailed()
result = await agent.run_detailed(
    "Generate monthly report",
    on_event=handle_event
)

Event Types

The on_event callback receives AgentEvent objects with the following types:

  • ITERATION: New iteration started (includes iteration number and max_iterations)
  • THINKING: LLM text chunks streaming in real-time (content field contains the text)
  • TOOL_CALL: Tool is being invoked (includes tool_name and tool_args)
  • TOOL_RESULT: Tool execution completed (includes tool_name and result)
  • COMPLETE: Final answer ready (includes final_result, cost, tokens, iterations)
  • ERROR: Something went wrong (includes error message)
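
A handler only has to branch on the event types it cares about and can ignore the rest. A minimal progress logger as a sketch (the error attribute name on ERROR events is assumed from the description above):

from daita.core.streaming import AgentEvent, EventType

def log_progress(event: AgentEvent):
    # React only to iteration boundaries and failures; ignore other events
    if event.type == EventType.ITERATION:
        print(f"[iteration {event.iteration}/{event.max_iterations}]")
    elif event.type == EventType.ERROR:
        print(f"[error] {event.error}")  # attribute name assumed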

Streaming Benefits

Real-time streaming provides several advantages:

  • Transparency: See exactly what the agent is thinking and doing
  • Progress tracking: Monitor long-running operations in real-time
  • User feedback: Provide immediate visual feedback in UIs
  • Debugging: Understand agent behavior during development
  • Cost monitoring: Track token usage and costs as they happen

# Example: Custom UI with streaming
import time

class StreamingUI:
    def __init__(self):
        self.start_time = None

    def handle_event(self, event: AgentEvent):
        if event.type == EventType.ITERATION:
            self.start_time = time.time()
            print(f"\n{'='*60}")
            print(f"Iteration {event.iteration}/{event.max_iterations}")
            print(f"{'='*60}")

        elif event.type == EventType.THINKING:
            # Stream text in real-time
            print(event.content, end="", flush=True)

        elif event.type == EventType.TOOL_CALL:
            print(f"\n\nšŸ› ļø  {event.tool_name}({event.tool_args})")

        elif event.type == EventType.TOOL_RESULT:
            print(f"   → {event.result}")

        elif event.type == EventType.COMPLETE:
            elapsed = time.time() - self.start_time
            print(f"\n\nāœ… Complete in {elapsed:.2f}s")
            print(f"   Tokens: {event.token_usage.get('total_tokens', 0)}")
            print(f"   Cost: ${event.cost:.4f}")

ui = StreamingUI()
answer = await agent.run(
    "Complex multi-step analysis task",
    on_event=ui.handle_event
)

Tool-Based Execution

SubstrateAgent uses autonomous tool calling: you give the agent tools and a natural language instruction, and the LLM decides which tools to use and when.

Architecture

  1. You give the agent tools and a natural language instruction
  2. The LLM autonomously decides which tools to use and when
  3. Tools are executed and results fed back to the LLM
  4. The LLM produces a final answer

This is the modern agent paradigm: autonomous, tool-driven execution.
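
Conceptually, the execution loop looks like the sketch below. This is simplified pseudocode to illustrate the paradigm, not Daita's actual implementation; llm.complete, response.tool_calls, and execute are hypothetical names:

# Simplified sketch of an autonomous tool-calling loop (illustrative, not Daita internals)
async def tool_loop(llm, tools, instruction, max_iterations=10):
    messages = [{"role": "user", "content": instruction}]
    for _ in range(max_iterations):
        response = await llm.complete(messages, tools=tools)  # hypothetical LLM call
        if not response.tool_calls:
            return response.text  # no tool requested: this is the final answer
        for call in response.tool_calls:
            result = await tools[call.name].execute(call.args)  # run the chosen tool
            messages.append({"role": "tool", "content": str(result)})  # feed result back
    raise RuntimeError("max iterations reached without a final answer")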

Extending with Tools

Tools are the primary way to extend agent capabilities. Use the @tool decorator to convert Python functions into agent tools:

from daita import SubstrateAgent
from daita.core.tools import tool

# Define tools for your agent
@tool
async def query_database(sql: str) -> list:
    """Execute SQL query and return results."""
    # Database query implementation
    return await db.execute(sql)

@tool
async def calculate_metrics(data: list) -> dict:
    """Calculate statistical metrics for data."""
    return {
        'mean': sum(data) / len(data),
        'max': max(data),
        'min': min(data)
    }

# Create agent with tools
agent = SubstrateAgent(
    name="Data Analyst",
    model="gpt-4o-mini",
    prompt="You are a data analyst. Help users query and analyze data."
)

# Register tools
agent.register_tool(query_database)
agent.register_tool(calculate_metrics)

await agent.start()

# Agent autonomously uses tools to answer questions
answer = await agent.run("What were total sales last month?")
print(answer)
# The agent will autonomously:
# 1. Call query_database with appropriate SQL
# 2. Call calculate_metrics on the results
# 3. Provide a natural language answer

Tool Execution Flow

Each iteration of a run follows the same cycle: the LLM selects a tool, the framework executes it, the result (optionally filtered by the focus system described below) is fed back to the LLM, and the loop repeats until the LLM produces a final answer instead of a tool call.

Focus System (Data Filtering)

Daita's focus system filters tool results before they reach the LLM, reducing token usage and latency:

from daita.config.base import FocusConfig

agent = SubstrateAgent(
    name="Sales Analyzer",
    focus=FocusConfig(
        type="jsonpath",
        path="$.sales[*].amount"  # Only extract amounts
    )
)
)

# When tools return large data, focus filters it before sending to LLM
# Example: Tool returns 10KB of data -> Focus filters to 1KB
# -> LLM only processes 1KB (90% token reduction!)
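
To see what a jsonpath focus does conceptually, here is the equivalent filtering step written with the jsonpath-ng library (an illustration of the selector semantics only, not necessarily what Daita uses internally):

from jsonpath_ng import parse  # pip install jsonpath-ng

tool_output = {
    "sales": [
        {"amount": 100, "notes": "long free-text field..."},
        {"amount": 250, "notes": "more text the LLM never needs..."},
    ]
}

# "$.sales[*].amount" keeps only the amounts; the rest never reaches the LLM
amounts = [match.value for match in parse("$.sales[*].amount").find(tool_output)]
print(amounts)  # [100, 250]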

Focus types:

  • jsonpath - JSONPath selectors for JSON data
  • column - Column names for tabular data
  • xpath - XPath selectors for XML
  • css - CSS selectors for HTML
  • regex - Regular expression patterns

# Column focus (for DataFrames/tabular data)
agent = SubstrateAgent(
    name="DB Analyst",
    focus=FocusConfig(
        type="column",
        columns=["user_id", "name", "email"]
    )
)

# JSONPath focus (for JSON responses)
agent = SubstrateAgent(
    name="API Analyst",
    focus=FocusConfig(
        type="jsonpath",
        path="$.results[*].id"
    )
)

Relay Channels (Agent Communication)

Agents can publish results to channels for multi-agent workflows:

# Agent publishes to relay channel
agent = SubstrateAgent(
    name="Publisher",
    llm_provider="openai",
    model="gpt-4",
    relay="output_channel"
)

# Results automatically published to "output_channel"
result = await agent.run("Analyze this data")

See the Workflows documentation for multi-agent communication patterns.

Retry Configuration

Configure retry behavior with exponential backoff:

from daita import SubstrateAgent
from daita.config import AgentConfig, RetryPolicy, RetryStrategy

config = AgentConfig(
    name="Robust Agent",
    enable_retry=True,
    retry_policy=RetryPolicy(
        max_retries=5,
        strategy=RetryStrategy.EXPONENTIAL,
        base_delay=1.0,
        max_delay=60.0,
        jitter=True
    )
)

agent = SubstrateAgent(
    config=config,
    llm_provider="openai",
    model="gpt-4"
)
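
Assuming standard exponential-backoff semantics (delay = base_delay Ɨ 2^attempt, capped at max_delay, randomized when jitter is enabled; Daita's exact formula may differ), the delays produced by the policy above would look roughly like this:

import random

base_delay, max_delay, max_retries = 1.0, 60.0, 5

for attempt in range(max_retries):
    delay = min(base_delay * (2 ** attempt), max_delay)  # 1, 2, 4, 8, 16 seconds
    delay *= random.uniform(0.5, 1.5)                    # jitter spreads retries apart
    print(f"retry {attempt + 1}: ~{delay:.1f}s")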

Automatic Tracing

All operations are automatically traced:

agent = SubstrateAgent(name="My Agent", llm_provider="openai", model="gpt-4")

await agent.start()

# Execute tasks (automatically traced)
await agent.run("Analyze this data")

# Get statistics
stats = agent.get_trace_stats()
# Returns: total_operations, success_rate, avg_latency, total_tokens, etc.

# Get recent operations
operations = agent.get_recent_operations(limit=10)

# Get decision history
decisions = agent.get_recent_decisions(limit=10)

Token Usage Tracking

Token usage is automatically tracked:

agent = SubstrateAgent(name="My Agent", llm_provider="openai", model="gpt-4")

await agent.start()

await agent.run("Analyze the first dataset")
await agent.run("Analyze the second dataset")

# Get usage stats
usage = agent.get_token_usage()
print(f"Total tokens: {usage['total_tokens']}")
print(f"Estimated cost: ${usage['estimated_cost']:.4f}")

Working with Tools

Creating Tools

Use the @tool decorator to create tools from Python functions:

from daita.core.tools import tool

@tool
async def calculate_sum(a: int, b: int) -> int:
    """Add two numbers together."""
    return a + b

@tool
def get_weather(city: str, units: str = "celsius") -> dict:
    """Get current weather for a city."""
    # Weather API call
    return {"temp": 22, "condition": "sunny"}

The @tool decorator automatically:

  • Extracts parameter schemas from type hints and docstrings
  • Handles both sync and async functions
  • Converts functions to AgentTool instances
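
As a rough illustration, the schema extracted from get_weather above would look something like the following (the exact format is internal to Daita; this mirrors the JSON-Schema style commonly used for LLM function calling):

# Approximate schema derived from get_weather's signature and docstring (illustrative)
{
    "name": "get_weather",
    "description": "Get current weather for a city.",
    "parameters": {
        "type": "object",
        "properties": {
            "city": {"type": "string"},
            "units": {"type": "string", "default": "celsius"},
        },
        "required": ["city"],
    },
}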

Registering Tools

Register tools with your agent:

from daita import SubstrateAgent
from daita.core.tools import tool

@tool
async def fetch_data(source: str) -> dict:
    """Fetch data from a source."""
    return await api.get(source)

agent = SubstrateAgent(name="Data Agent")

# Register single tool
agent.register_tool(fetch_data)

# Register multiple tools
agent.register_tools([tool1, tool2, tool3])

# List available tools
print(agent.tool_names) # ['fetch_data', 'tool1', 'tool2', 'tool3']
print(agent.available_tools) # List of AgentTool instances

Manual Tool Execution

Execute tools manually for testing:

# Execute tool directly
result = await agent.call_tool("fetch_data", {"source": "api/users"})
print(result)

MCP Tools (Model Context Protocol)

Connect agents to external tools and services using MCP servers. MCP tools are automatically discovered and made available to the agent:

from daita import SubstrateAgent

# Configure with MCP server
agent = SubstrateAgent(
    name="File Agent",
    llm_provider="openai",
    model="gpt-4",
    mcp={
        "command": "npx",
        "args": ["-y", "@modelcontextprotocol/server-filesystem", "/tmp"],
        "name": "filesystem"
    }
)

await agent.start()

# MCP tools are automatically available - agent uses them autonomously
answer = await agent.run("Read the contents of config.json and summarize it")
# Agent autonomously uses the read_file tool from the MCP server

Multiple MCP Servers

Connect to multiple MCP servers for broader capabilities:

agent = SubstrateAgent(
    name="Multi-Tool Agent",
    llm_provider="openai",
    model="gpt-4",
    mcp=[
        {
            "command": "npx",
            "args": ["-y", "@modelcontextprotocol/server-filesystem", "/tmp"],
            "name": "filesystem"
        },
        {
            "command": "uvx",
            "args": ["mcp-server-git", "--repository", "/path/to/repo"],
            "name": "git"
        }
    ]
)

await agent.start()

# Agent can autonomously use tools from both MCP servers
answer = await agent.run("Read the README file and show me the latest git commits")

# List all available tools (from MCP servers and registered tools)
print(agent.tool_names)

Plugin Integration

Plugins can expose their capabilities as tools that agents can use autonomously:

from daita import SubstrateAgent
from daita.plugins import PostgreSQLPlugin

# Create plugin instance
db_plugin = PostgreSQLPlugin(host="localhost", database="mydb")

agent = SubstrateAgent(
    name="DB Agent",
    llm_provider="openai",
    model="gpt-4",
    tools=[db_plugin]  # Plugin tools automatically registered
)

await agent.start()

# Agent autonomously uses database tools
answer = await agent.run("Get all users who signed up last week")
# Agent uses the query tool exposed by the PostgreSQLPlugin

Adding Plugins Dynamically

Add plugins after agent creation:

from daita.plugins import PostgreSQLPlugin

agent = SubstrateAgent(name="Dynamic Agent")

# Add plugin - its tools are automatically discovered
db_plugin = PostgreSQLPlugin(host="localhost", database="mydb")
agent.add_plugin(db_plugin)

await agent.start()

# Now agent can use database tools
answer = await agent.run("How many users do we have?")

Direct Plugin Access

Access plugins directly when you need more control:

from daita.core.tools import tool

@tool
async def custom_db_operation(query: str) -> list:
    """Execute a custom database operation."""
    async with agent.plugins.postgresql(host="localhost", database="mydb") as db:
        results = await db.query(query)
        return results

agent.register_tool(custom_db_operation)

Complete Example

from daita import SubstrateAgent
from daita.core.tools import tool
from daita.config.base import FocusConfig
import asyncio

# Define custom tools
@tool
async def fetch_sales_data(region: str, period: str = "month") -> dict:
    """Fetch sales data for a specific region and time period."""
    # Database query implementation
    return {
        "sales": [100, 200, 150, 300],
        "region": region,
        "period": period,
        "total": 750
    }

@tool
async def calculate_growth(current: float, previous: float) -> dict:
    """Calculate growth rate and trend."""
    growth_rate = ((current - previous) / previous) * 100
    return {
        "growth_rate": growth_rate,
        "trend": "up" if growth_rate > 0 else "down"
    }

# Create agent with tools
agent = SubstrateAgent(
    name="Sales Data Analyzer",
    prompt="You are a sales analytics expert. Provide actionable insights with specific metrics.",
    llm_provider="openai",
    model="gpt-4",
    focus=FocusConfig(
        type="column",
        columns=["sales", "total", "region"]
    ),
    relay="analysis_output"
)

# Register tools
agent.register_tool(fetch_sales_data)
agent.register_tool(calculate_growth)

# Factory function for CLI deployment
def create_agent():
    return agent

# Run agent
async def main():
    await agent.start()

    # Agent autonomously uses tools to answer questions
    result = await agent.run_detailed(
        "Analyze sales performance for the US region last month and compare to previous month"
    )

    # Access detailed results
    print(f"Answer: {result['result']}")
    print(f"Tools used: {[tc['tool'] for tc in result['tool_calls']]}")
    print(f"Time: {result['processing_time_ms']}ms")
    print(f"Cost: ${result['cost']}")

    # Simple execution
    answer = await agent.run("What's the sales trend?")
    print(answer)

    await agent.stop()

asyncio.run(main())

Configuration Parameters

| Parameter         | Type                | Description                                                           |
| ----------------- | ------------------- | --------------------------------------------------------------------- |
| name              | str                 | Agent name (required)                                                 |
| llm_provider      | str or LLMProvider  | LLM provider: "openai", "anthropic", "gemini", "grok", or an instance |
| model             | str                 | Model name: "gpt-4", "claude-3-sonnet-20240229", etc.                 |
| api_key           | str                 | API key (auto-detected from environment if not provided)              |
| config            | AgentConfig         | AgentConfig object for advanced configuration                         |
| agent_id          | str                 | Unique identifier for the agent (auto-generated if not provided)      |
| prompt            | str                 | Agent identity and role description                                   |
| focus             | FocusConfig or Dict | Focus configuration for filtering tool results (token optimization)   |
| relay             | str                 | Channel name to publish results to                                    |
| mcp               | Dict or List[Dict]  | MCP server configuration(s) for tool integration                      |
| display_reasoning | bool                | Show decision-making in console (default: False)                      |
| tools             | List                | List of plugins or AgentTool instances (via kwargs)                   |

CLI Usage

Create agents using the CLI:

# Create agent from template
daita create agent my_agent

# Test agent locally
daita test my_agent

# Deploy to cloud
daita push

Error Handling

from daita import SubstrateAgent
from daita.core.exceptions import AgentError, LLMError, ValidationError

agent = SubstrateAgent(name="My Agent", llm_provider="openai", model="gpt-4")

await agent.start()

try:
    result = await agent.run("Analyze this data")
except ValidationError as e:
    # Invalid input data
    print(f"Validation error: {e}")
except LLMError as e:
    # LLM provider issues (rate limits, API errors)
    print(f"LLM error: {e}")
except AgentError as e:
    # Agent-specific errors
    print(f"Agent error: {e}")
finally:
    await agent.stop()

Resource Cleanup

Always stop agents when done to clean up resources (especially MCP connections):

agent = SubstrateAgent(name="My Agent", llm_provider="openai", model="gpt-4")

await agent.start()

try:
    result = await agent.run("Analyze this data")
finally:
    await agent.stop()  # Cleanup MCP connections, etc.

# Or use context manager (automatic cleanup)
async with SubstrateAgent(name="Temp Agent") as agent:
    result = await agent.run("Quick task")