Substrate Agent
The SubstrateAgent is the foundational agent in Daita that provides autonomous tool-calling capabilities. It handles LLM integration, automatic tracing, retry logic, and relay communication while giving you complete flexibility through custom tools.
Overview
SubstrateAgent uses autonomous tool calling - you give the agent tools and natural language instructions, and the LLM autonomously decides which tools to use and when to accomplish the task.
Key Features:
- Autonomous tool-calling with LLM-driven decision making
- Tool-based extension system (@tool decorator)
- Automatic LLM provider configuration from environment variables
- Zero-configuration automatic tracing of all operations
- Focus system for filtering tool results (token optimization)
- MCP server integration for external tools
- Relay system for inter-agent communication
- Simple API: run() and run_detailed()
Basic Usage
from daita import SubstrateAgent
# Create agent with identity
agent = SubstrateAgent(
name="Data Quality Validator",
prompt="You are a data quality expert. Check for completeness, accuracy, and consistency.",
llm_provider="openai",
model="gpt-4"
)
await agent.start()
# Simple execution - just get the answer
answer = await agent.run("Analyze this dataset and identify any quality issues")
print(answer)
# Detailed execution - get full metadata
result = await agent.run_detailed("What patterns do you see in the data?")
print(f"Answer: {result['result']}")
print(f"Time: {result['processing_time_ms']}ms")
print(f"Cost: ${result['cost']}")
print(f"Tools used: {[tc['tool'] for tc in result['tool_calls']]}")
await agent.stop()
Core Concepts
The run() Method
The run() method is the primary way to execute tasks with a SubstrateAgent:
answer = await agent.run(
prompt: str, # Natural language instruction
tools: Optional[List] = None, # Specific tools to use (optional)
max_iterations: int = 5, # Max tool calling iterations
on_event: Optional[Callable] = None, # Real-time streaming callback
**kwargs # LLM parameters (temperature, etc.)
)
Returns a string with the agent's final answer.
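For example, a minimal sketch of a constrained call (fetch_sales_data and calculate_average are the @tool functions defined later on this page; passing the decorated functions directly to tools is an assumption, since the exact accepted form is not documented here):
answer = await agent.run(
    "What is the average sales figure for the US region?",
    tools=[fetch_sales_data, calculate_average],  # restrict the agent to these tools (assumed form)
    max_iterations=3,                             # cap tool-calling rounds
    temperature=0.2,                              # extra kwargs are forwarded to the LLM
)
print(answer)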
Streaming: When on_event is provided, you receive real-time callbacks with execution events (thinking, tool calls, results, completion).
Execution Flow:
- LLM receives the prompt and available tools
- LLM autonomously decides which tools to call (if any)
- Tools are executed and results are fed back to the LLM
- Focus is applied to tool results if configured (token filtering)
- Process repeats until LLM has enough information
- LLM generates final natural language answer
- Result is published to relay channel if configured
The run_detailed() Method
Like run() but returns detailed execution metadata:
result = await agent.run_detailed(
prompt: str,
tools: Optional[List] = None,
max_iterations: int = 5,
on_event: Optional[Callable] = None, # Real-time streaming callback
**kwargs
)
Returns:
{
"result": str, # Final answer
"tool_calls": [...], # All tools called
"iterations": int, # Number of iterations
"tokens": {...}, # Token usage
"cost": float, # Estimated cost
"processing_time_ms": float, # Execution time
"agent_id": str, # Agent identifier
"agent_name": str # Agent name
}
Streaming: Works identically to run() - provide on_event callback for real-time updates while still receiving full metadata in the return value.
Tools
Tools are the primary way to extend agent capabilities. Create tools using the @tool decorator:
from daita.core.tools import tool
@tool
async def fetch_user_data(user_id: int) -> dict:
    """Fetch user data from database."""
    return await db.query("SELECT * FROM users WHERE id = $1", [user_id])
agent.register_tool(fetch_user_data)
The agent autonomously decides when and how to use registered tools based on the natural language prompt.
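For instance, once fetch_user_data is registered, a prompt that needs user information should cause the agent to call it on its own (illustrative prompt and user id):
answer = await agent.run("Look up user 42 and summarize their account status")
print(answer)  # the agent decides to call fetch_user_data(user_id=42) itself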
See the Tools documentation for more details on creating and using tools.
Streaming Execution
Monitor agent execution in real-time using the on_event callback parameter. Streaming provides transparency into the agent's thinking process, tool usage, and decision-making.
Basic Streaming
from daita import SubstrateAgent
from daita.core.streaming import AgentEvent, EventType
def handle_event(event: AgentEvent):
    if event.type == EventType.THINKING:
        # Real-time LLM text streaming
        print(event.content, end="", flush=True)
    elif event.type == EventType.TOOL_CALL:
        # Tool is being invoked
        print(f"\n🔧 {event.tool_name}({event.tool_args})")
    elif event.type == EventType.TOOL_RESULT:
        # Tool completed
        print(f" ✅ {event.result}")
agent = SubstrateAgent(name="Analyst", llm_provider="openai", model="gpt-4")
await agent.start()
# Enable streaming with on_event
answer = await agent.run(
"Analyze Q4 sales trends",
on_event=handle_event
)
Event Types
Each AgentEvent has a type field set to one of the following EventType values:
ITERATION
New iteration started. Useful for tracking multi-step reasoning.
Fields:
- iteration (int): Current iteration number
- max_iterations (int): Maximum allowed iterations
if event.type == EventType.ITERATION:
    print(f"Iteration {event.iteration}/{event.max_iterations}")
THINKING
LLM text chunks streaming in real-time as the model generates its response.
Fields:
content(str): Text chunk
if event.type == EventType.THINKING:
    print(event.content, end="", flush=True)
TOOL_CALL
Tool is being invoked by the agent.
Fields:
- tool_name (str): Name of the tool being called
- tool_args (dict): Arguments passed to the tool
if event.type == EventType.TOOL_CALL:
    print(f"Calling: {event.tool_name}")
    print(f"Args: {event.tool_args}")
TOOL_RESULT
Tool execution completed.
Fields:
- tool_name (str): Name of the tool that executed
- result (Any): Result returned by the tool
if event.type == EventType.TOOL_RESULT:
    print(f"Result from {event.tool_name}: {event.result}")
COMPLETE
Agent execution finished with final answer.
Fields:
- final_result (str): The agent's final answer
- iterations (int): Total iterations used
- token_usage (dict): Token usage statistics
- cost (float): Estimated cost in USD
if event.type == EventType.COMPLETE:
    print(f"\n✅ Complete!")
    print(f"Answer: {event.final_result}")
    print(f"Cost: ${event.cost:.4f}")
    print(f"Tokens: {event.token_usage}")
ERROR
An error occurred during execution.
Fields:
error(str): Error message
if event.type == EventType.ERROR:
    print(f"❌ Error: {event.error}")
Advanced Streaming Examples
Progress Tracking
import time
from daita.core.streaming import AgentEvent, EventType
class ProgressTracker:
    def __init__(self):
        self.start_time = time.time()
        self.tool_count = 0
        self.thinking_chars = 0

    def handle_event(self, event: AgentEvent):
        if event.type == EventType.ITERATION:
            elapsed = time.time() - self.start_time
            print(f"\n[{elapsed:.1f}s] Iteration {event.iteration}")
        elif event.type == EventType.THINKING:
            self.thinking_chars += len(event.content)
            print(event.content, end="", flush=True)
        elif event.type == EventType.TOOL_CALL:
            self.tool_count += 1
            print(f"\n[Tool #{self.tool_count}] {event.tool_name}")
        elif event.type == EventType.COMPLETE:
            elapsed = time.time() - self.start_time
            print(f"\n\nCompleted in {elapsed:.2f}s")
            print(f"Tools called: {self.tool_count}")
            print(f"Thinking output: {self.thinking_chars} chars")
tracker = ProgressTracker()
answer = await agent.run(
"Complex multi-step task",
on_event=tracker.handle_event
)
Custom UI Integration
class StreamingUI:
    """Example UI handler for streaming events."""
    def __init__(self):
        self.current_section = None

    def handle_event(self, event: AgentEvent):
        if event.type == EventType.ITERATION:
            self.current_section = "thinking"
            print(f"\n{'='*70}")
            print(f"Step {event.iteration}/{event.max_iterations}")
            print(f"{'='*70}\n")
        elif event.type == EventType.THINKING:
            if self.current_section != "thinking":
                print("\n💭 Thinking:")
                self.current_section = "thinking"
            print(event.content, end="", flush=True)
        elif event.type == EventType.TOOL_CALL:
            self.current_section = "tool"
            print(f"\n\n🛠️ Using tool: {event.tool_name}")
            # Pretty print args
            for key, value in event.tool_args.items():
                print(f" {key}: {value}")
        elif event.type == EventType.TOOL_RESULT:
            result_preview = str(event.result)[:100]
            print(f" ✅ Result: {result_preview}...")
        elif event.type == EventType.COMPLETE:
            print(f"\n\n{'='*70}")
            print("✅ COMPLETE")
            print(f"{'='*70}")
            print(f"\n{event.final_result}\n")
            print(f"Cost: ${event.cost:.4f} | Tokens: {event.token_usage.get('total_tokens', 0)}")
        elif event.type == EventType.ERROR:
            print(f"\n\n❌ ERROR: {event.error}")
ui = StreamingUI()
result = await agent.run_detailed(
"Analyze user data and generate insights",
on_event=ui.handle_event
)
Async Event Handling
import asyncio
async def async_event_handler(event: AgentEvent):
    """Event handler that can perform async operations."""
    if event.type == EventType.TOOL_CALL:
        # Log to async database
        await log_to_database(
            tool_name=event.tool_name,
            args=event.tool_args
        )
    elif event.type == EventType.COMPLETE:
        # Send async notification
        await send_notification(
            message=f"Agent completed: {event.final_result}",
            cost=event.cost
        )
# Use async handler
answer = await agent.run(
"Process user request",
on_event=async_event_handler
)
Streaming with run_detailed()
Combine streaming visibility with detailed metadata:
def monitor(event: AgentEvent):
    if event.type == EventType.THINKING:
        print(event.content, end="", flush=True)
    elif event.type == EventType.TOOL_CALL:
        print(f"\n[Tool: {event.tool_name}]")
result = await agent.run_detailed(
"Complex analysis task",
on_event=monitor # Get real-time updates
)
# Access detailed results after completion
print(f"\nFinal answer: {result['result']}")
print(f"Iterations: {result['iterations']}")
print(f"Time: {result['processing_time_ms']}ms")
print(f"Tools called: {[tc['tool'] for tc in result['tool_calls']]}")
Streaming Benefits
- Transparency: See exactly what the agent is thinking and doing
- User Experience: Provide immediate feedback instead of waiting for completion
- Debugging: Understand agent behavior and decision-making in real-time
- Monitoring: Track costs, token usage, and performance during execution
- Progress Indication: Show users that work is in progress for long-running tasks
Best Practices for Streaming
1. Keep Handlers Fast
# Good - fast, non-blocking
def handle_event(event: AgentEvent):
    if event.type == EventType.THINKING:
        print(event.content, end="", flush=True)

# Avoid - slow operations in handler
def slow_handler(event: AgentEvent):
    if event.type == EventType.THINKING:
        time.sleep(1)  # Blocks execution!
        print(event.content)
2. Handle Errors Gracefully
def safe_handler(event: AgentEvent):
    try:
        if event.type == EventType.THINKING:
            print(event.content, end="", flush=True)
    except Exception as e:
        # Don't let handler errors crash agent execution
        logger.error(f"Event handler error: {e}")
3. Use Async When Needed
async def async_handler(event: AgentEvent):
    if event.type == EventType.COMPLETE:
        # Async operations are fine
        await save_to_database(event.final_result)
4. Buffer Text for UI Updates
class BufferedUI:
    def __init__(self):
        self.buffer = []
        self.last_update = time.time()

    def handle_event(self, event: AgentEvent):
        if event.type == EventType.THINKING:
            self.buffer.append(event.content)
            # Update UI every 100ms instead of every chunk
            if time.time() - self.last_update > 0.1:
                self.flush_buffer()
        elif event.type == EventType.COMPLETE:
            # Flush any remaining buffered text at the end of the run
            self.flush_buffer()

    def flush_buffer(self):
        if self.buffer:
            print(''.join(self.buffer), end="", flush=True)
            self.buffer = []
        self.last_update = time.time()
Constructor Parameters
SubstrateAgent(
name: str,
llm_provider: Optional[Union[str, LLMProvider]] = None,
model: Optional[str] = None,
api_key: Optional[str] = None,
config: Optional[AgentConfig] = None,
agent_id: Optional[str] = None,
prompt: Optional[str] = None,
focus: Optional[Union[List[str], str, Dict[str, Any]]] = None,
relay: Optional[str] = None,
mcp: Optional[Union[Dict, List[Dict]]] = None,
display_reasoning: bool = False,
**kwargs # Can include 'tools' parameter
)
Core Parameters
- name (str): Agent name - used in logs, traces, and relay messages (required)
- config (AgentConfig): Complete agent configuration object (overrides other parameters if provided)
- agent_id (str): Unique identifier (auto-generated if not provided)
LLM Configuration
- llm_provider (str | LLMProvider): Provider name ("openai", "anthropic", "grok", "gemini") or provider instance. Auto-detects from environment if omitted.
- model (str): Model name (e.g., "gpt-4", "claude-3-sonnet-20240229"). Defaults to "gpt-4" if not specified.
- api_key (str): API key for the LLM provider. Auto-detected from environment variables (OPENAI_API_KEY, ANTHROPIC_API_KEY, etc.) if not provided.
- prompt (str): Agent identity and role description. Defines who the agent is and how it should behave.
Advanced Options
- focus (FocusConfig | Dict): Focus configuration for filtering tool results. Reduces token usage by filtering tool outputs before they are sent to the LLM. Supports JSONPath, column selection, XPath, CSS selectors, and regex patterns.
- relay (str): Relay channel name. When set, all execution results are automatically published to this channel for other agents to consume.
- mcp (Dict | List[Dict]): MCP server configuration(s) for integrating external tools. Tools from MCP servers are automatically discovered and registered.
- display_reasoning (bool): Enable console output showing agent decisions and execution flow (useful for debugging).
- tools (List): List of plugins or AgentTool instances to register (passed via kwargs).
- **kwargs: Additional configuration passed to AgentConfig (e.g., enable_retry, max_retries, retry_delay)
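As a sketch of the pass-through kwargs, the retry settings named above can be supplied alongside the documented parameters (the retry_delay unit and the chosen values are assumptions; defaults are not stated in this document):
agent = SubstrateAgent(
    name="Resilient Analyst",
    llm_provider="anthropic",
    model="claude-3-sonnet-20240229",
    relay="analysis_results",
    display_reasoning=True,
    enable_retry=True,    # forwarded to AgentConfig
    max_retries=3,        # forwarded to AgentConfig
    retry_delay=2.0,      # forwarded to AgentConfig; assumed to be seconds
)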
Smart Constructor Features
The SubstrateAgent constructor includes intelligent defaults:
- Auto-configuration: LLM provider and API keys automatically detected from environment
- Default model: Uses "gpt-4" if no model specified
- Auto-generated IDs: Creates unique agent_id if not provided
- Lazy tool setup: Tools are discovered and registered on first use
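A minimal sketch of the zero-configuration path, assuming OPENAI_API_KEY is already exported in the environment:
import os
assert "OPENAI_API_KEY" in os.environ   # provider and key are auto-detected from here

agent = SubstrateAgent(name="Quick Start")  # model defaults to "gpt-4"
print(agent.agent_id)                       # unique id auto-generated
await agent.start()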
Working with Tools
SubstrateAgent uses tools to extend its capabilities. Tools are Python functions decorated with @tool that the agent can autonomously call to accomplish tasks.
Creating Tools
Use the @tool decorator to create tools from Python functions:
from daita import SubstrateAgent
from daita.core.tools import tool
# Define custom tools
@tool
async def fetch_sales_data(region: str) -> dict:
    """Fetch sales data for a specific region."""
    return {"sales": [100, 200, 300], "region": region}

@tool
async def calculate_average(numbers: list) -> float:
    """Calculate the average of a list of numbers."""
    return sum(numbers) / len(numbers)
# Create agent and register tools
agent = SubstrateAgent(
name="Sales Analyst",
prompt="You are a sales data expert. Provide actionable insights.",
llm_provider="openai",
model="gpt-4"
)
agent.register_tool(fetch_sales_data)
agent.register_tool(calculate_average)
await agent.start()
# Agent autonomously uses tools
answer = await agent.run("What's the average sales for the US region?")
print(answer)
Plugin Integration
Plugins can expose their capabilities as tools that agents use autonomously:
from daita import SubstrateAgent
from daita.plugins import PostgreSQLPlugin
# Create plugin instance
db_plugin = PostgreSQLPlugin(host="localhost", database="mydb")
agent = SubstrateAgent(
name="DB Agent",
llm_provider="openai",
model="gpt-4",
tools=[db_plugin] # Plugin tools automatically registered
)
await agent.start()
# Agent autonomously uses database tools
answer = await agent.run("Get all users who signed up last week")
# Agent uses the query tool exposed by the PostgreSQLPlugin
Adding Plugins Dynamically
Add plugins after agent creation:
from daita.plugins import PostgreSQLPlugin
agent = SubstrateAgent(name="Dynamic Agent")
# Add plugin - its tools are automatically discovered
db_plugin = PostgreSQLPlugin(host="localhost", database="mydb")
agent.add_plugin(db_plugin)
await agent.start()
# Now agent can use database tools
answer = await agent.run("How many users do we have?")
Direct Plugin Access
Access plugins directly when you need more control via custom tools:
from daita.core.tools import tool
@tool
async def custom_db_operation(query: str) -> list:
    """Execute a custom database operation."""
    async with agent.plugins.postgresql(host="localhost", database="mydb") as db:
        results = await db.query(query)
        return results
agent.register_tool(custom_db_operation)
Available via agent.plugins:
- agent.plugins.postgresql() - PostgreSQL database
- agent.plugins.mysql() - MySQL database
- agent.plugins.mongodb() - MongoDB database
- agent.plugins.elasticsearch() - Elasticsearch
- agent.plugins.rest() - REST API client
- agent.plugins.s3() - AWS S3 storage
- agent.plugins.slack() - Slack messaging
- agent.plugins.redis() - Redis messaging/caching
Relay Communication
Configure agents to automatically publish results to relay channels:
agent = SubstrateAgent(
name="Publisher",
relay="data_channel"
)
# Results automatically published to relay channel
result = await agent.run("Analyze this data")
Focus System
The focus system filters tool results BEFORE they reach the LLM, reducing token usage and latency. Focus is applied automatically to tool outputs:
Focus Types
Column Selection (for DataFrames):
agent = SubstrateAgent(
name="Column Filter",
focus=["user_id", "email", "created_at"]
)
# Only these columns from tool results will be sent to the LLM
JSONPath (for JSON/dict data):
agent = SubstrateAgent(
name="JSON Filter",
focus={"type": "jsonpath", "path": "$.users[*].email"}
)
# Extracts all email fields from users array
XPath (for XML data):
agent = SubstrateAgent(
name="XML Filter",
focus={"type": "xpath", "path": "//user/email"}
)
CSS Selectors (for HTML data):
agent = SubstrateAgent(
name="HTML Filter",
focus={"type": "css", "selector": "div.content p"}
)
Focus errors are logged as warnings and tool results are passed through unchanged if filtering fails.
Agent Properties
Health Status
Get comprehensive agent health information:
health = agent.health
Returns:
{
    "status": "healthy",
    "uptime": 3600.0,  # Seconds since agent started
    "tools": {
        "count": 12,
        "setup": True,
        "names": ["query_database", "calculate_metrics", ...]
    },
    "relay": {
        "enabled": True,
        "channel": "data_results"
    },
    "llm": {
        "available": True,
        "provider": "openai"
    }
}
Token Usage
Track LLM token usage for cost monitoring:
usage = agent.get_token_usage()
Returns:
{
"total_tokens": 1500,
"prompt_tokens": 800,
"completion_tokens": 700,
"requests": 5
}
Token usage is tracked automatically by the tracing system across all LLM calls.
Agent Lifecycle
Start agents before use and stop them to release resources; manage the lifecycle manually or with a context manager.
# Manual start/stop
await agent.start()
await agent.stop()
# Context manager (automatic cleanup)
async with SubstrateAgent(name="Temp") as agent:
result = await agent.run("Quick task")
Error Handling
Handle exceptions with the framework's error hierarchy.
from daita.core.exceptions import AgentError, LLMError, ValidationError
agent = SubstrateAgent(name="My Agent")
await agent.start()
try:
    result = await agent.run("Analyze this data")
except ValidationError as e:
    # Invalid input data
    print(f"Validation error: {e}")
except LLMError as e:
    # LLM provider issues
    print(f"LLM error: {e}")
except AgentError as e:
    # Agent-specific errors
    print(f"Agent error: {e}")
finally:
    await agent.stop()
Advanced Examples
Multi-Step Analysis
from daita import SubstrateAgent
from daita.core.tools import tool
@tool
async def fetch_data(source: str) -> dict:
    """Fetch data from specified source."""
    # Data fetching implementation
    return {"records": [...]}

@tool
async def validate_data(data: dict) -> dict:
    """Validate data quality."""
    # Validation logic
    return {"valid": True, "errors": []}

@tool
async def transform_data(data: dict) -> dict:
    """Transform data for analysis."""
    # Transformation logic
    return {"transformed": [...]}
agent = SubstrateAgent(
name="Pipeline Agent",
prompt="You are a data pipeline agent. Fetch, validate, and transform data step by step."
)
agent.register_tools([fetch_data, validate_data, transform_data])
await agent.start()
# Agent autonomously orchestrates the pipeline
result = await agent.run(
"Fetch data from 'sales_db', validate it, and transform it for reporting"
)
Database Operations
from daita import SubstrateAgent
from daita.core.tools import tool
from daita.plugins import PostgreSQLPlugin
# Add database plugin for tool-based access
db_plugin = PostgreSQLPlugin(host="localhost", database="analytics")
agent = SubstrateAgent(
name="DB Agent",
tools=[db_plugin]
)
await agent.start()
# Agent autonomously uses database tools
result = await agent.run("Get all users who signed up in the last 7 days")
Automatic Tracing
Every SubstrateAgent operation is automatically traced without any configuration required. The tracing system captures comprehensive execution data for observability and debugging.
What Gets Traced
- Agent lifecycle: Start, stop, and initialization events
- Agent executions: Run invocations, data types, execution time
- Tool calls: Tool executions, parameters, results
- LLM calls: Model used, token counts, costs, response times
- Plugin operations: Database queries, API calls, performance metrics
- Focus application: Data filtering operations and results
- Relay publishing: Message delivery to channels
- Errors and retries: Exception details, retry attempts, recovery
Accessing Traces
Traces are captured automatically during agent execution. Access trace data programmatically in your code:
# Get all traces for an agent
from daita.core.tracing import get_agent_traces
traces = get_agent_traces(agent_id=agent.agent_id)
for trace in traces:
    print(f"Operation: {trace.operation_type}")
    print(f"Duration: {trace.duration_ms}ms")
    print(f"Metadata: {trace.metadata}")
Traces are automatically uploaded to the Daita platform when DAITA_API_KEY environment variable is set. For local development without the API key, traces are still captured in memory and accessible via the tracing API.
Decision Display
Enable real-time console output of agent decisions:
agent = SubstrateAgent(
name="Debug Agent",
display_reasoning=True
)
# Console shows:
# [Debug Agent] Running: "Analyze this data"
# [Debug Agent] Tool called: query_database
# [Debug Agent] LLM call: 450 tokens
# [Debug Agent] Result published to relay: data_channel
Useful for debugging and understanding agent behavior during development.
Configuration Class Methods
Global Defaults
Configure default settings for all SubstrateAgent instances:
SubstrateAgent.configure_defaults(
llm_provider="anthropic",
model="claude-3-sonnet-20240229"
)
# All agents created after this use these defaults
agent1 = SubstrateAgent(name="Agent 1") # Uses Anthropic Claude
agent2 = SubstrateAgent(name="Agent 2") # Also uses Anthropic Claude
Useful for setting organization-wide preferences or testing with specific models.
Best Practices
Tool Design
- Keep tools focused: Each tool should do one thing well (e.g., query_database, validate_schema)
- Use descriptive names: Tool names should clearly indicate purpose
- Document thoroughly: Use clear docstrings so the LLM understands when to use the tool
- Return structured data: Return dictionaries or typed objects for consistency
- Handle errors gracefully: Use try-except within tools to provide useful error messages
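A short sketch that follows these practices (focused scope, clear docstring, structured return, graceful error handling); fetch_table_columns is a hypothetical helper used only for illustration:
from daita.core.tools import tool

@tool
async def validate_schema(table: str, required_columns: list) -> dict:
    """Check that a table contains the required columns.

    Returns a dict with 'valid', 'missing_columns', and 'error' keys so the
    LLM always receives a structured, predictable result.
    """
    try:
        columns = await fetch_table_columns(table)  # hypothetical helper
        missing = [c for c in required_columns if c not in columns]
        return {"valid": not missing, "missing_columns": missing, "error": None}
    except Exception as e:
        # Surface a useful message instead of crashing the agent run
        return {"valid": False, "missing_columns": [], "error": str(e)}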
Performance
- Monitor token usage: Call agent.get_token_usage() regularly to track LLM costs
- Use focus wisely: Filter tool results early with focus to reduce token overhead
- Test tools independently: Test tool functions separately before registering with agents
- Leverage caching: Identical agent configurations automatically reuse cached instances
Multi-Agent Systems
- Use relay channels: Enable relay for agent-to-agent communication instead of direct calls
- Name agents clearly: Use descriptive names that indicate role (e.g., "DataValidator", "ReportGenerator")
- Share tools strategically: Register only the tools each agent needs for its role
- Implement cleanup: Use context managers or explicit stop() calls to clean up resources
Development Workflow
- Start with prompts: Define clear agent prompts that describe the agent's role
- Add tools incrementally: Start with a few core tools, add more as needed
- Enable display_reasoning: Turn on console output during development for visibility
- Test locally first: Develop with local LLM calls before deploying to production
- Check health regularly: Use agent.health to monitor agent status and tool availability
Next Steps
- Plugin System - Database and API integrations for data access
- Workflows - Multi-agent orchestration with relay channels