Error Handling - Resilient AI Agent Operations
The Daita error handling system provides intelligent, context-aware error management with automatic retry logic and comprehensive exception classification. It's designed to maximize system reliability while providing clear debugging information and graceful failure modes.
Error Handling Philosophy
The Daita framework uses a sophisticated error classification system where every exception carries "retry hints" that guide automatic recovery behavior. This eliminates guesswork about whether operations should be retried and enables intelligent, context-aware error recovery without manual intervention.
from daita.core.exceptions import TransientError, PermanentError, RetryableError
# Transient errors - retry immediately with minimal delay
raise TransientError("Rate limit exceeded, retry after 1 second")
# Retryable errors - retry with exponential backoff
raise RetryableError("Database connection temporarily unavailable")
# Permanent errors - don't retry, fix the underlying issue
raise PermanentError("Invalid API key format")
Exception Hierarchy
The Daita exception system is built around a clear hierarchy with automatic retry behavior classification and rich contextual information for debugging and monitoring.
DaitaError - Base Exception Class
from daita.core.exceptions import DaitaError
class DaitaError(Exception):
"""Base exception for all Daita errors with built-in retry hints."""
def __init__(self, message: str, retry_hint: str = "unknown", context: dict = None):
super().__init__(message)
self.retry_hint = retry_hint # "transient", "retryable", "permanent", "unknown"
self.context = context or {} # Additional error context
def is_transient(self) -> bool:
"""Check if this error is likely transient."""
return self.retry_hint == "transient"
def is_retryable(self) -> bool:
"""Check if this error might be retryable."""
return self.retry_hint in ["transient", "retryable"]
def is_permanent(self) -> bool:
"""Check if this error is permanent."""
return self.retry_hint == "permanent"
# Usage examples
try:
result = await some_operation()
except DaitaError as e:
if e.is_transient():
print("Will retry immediately")
elif e.is_retryable():
print("Will retry with backoff")
elif e.is_permanent():
print("Manual intervention required")
# Access error context
print(f"Error context: {e.context}")
All Daita exceptions inherit from DaitaError and automatically include retry hints and contextual information, enabling intelligent error handling throughout the system.
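Because the retry hint lives on the base class, a custom exception only needs to pass the right hint and context to the DaitaError constructor shown above. The sketch below is illustrative; the subclass name and service details are hypothetical.
from daita.core.exceptions import DaitaError
class EmbeddingServiceError(DaitaError):
    """Hypothetical custom error that maps an HTTP status onto a retry hint."""
    def __init__(self, service_url: str, status_code: int):
        # Server-side failures are usually worth retrying; client-side failures are not
        hint = "retryable" if status_code >= 500 else "permanent"
        super().__init__(
            f"Embedding service at {service_url} returned {status_code}",
            retry_hint=hint,
            context={"service_url": service_url, "status_code": status_code},
        )
# The helpers inherited from DaitaError work on the subclass without extra code
err = EmbeddingServiceError("https://embeddings.internal.example", 503)
assert err.is_retryable() and not err.is_permanent()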
Component-Specific Exception Classes
from daita.core.exceptions import AgentError, LLMError, PluginError, ConfigError
# Agent-specific errors with operation context
try:
result = await agent.process("analyze", data)
except AgentError as e:
print(f"Agent {e.agent_id} failed on task: {e.task}")
print(f"Retry hint: {e.retry_hint}")
# LLM provider errors with model information
try:
response = await llm_provider.generate("What is AI?")
except LLMError as e:
print(f"LLM error from {e.provider} using {e.model}")
if e.retry_hint == "transient":
print("Likely rate limit or temporary service issue")
# Plugin errors with plugin identification
try:
results = await db_plugin.query("SELECT * FROM users")
except PluginError as e:
print(f"Plugin error: {e.plugin_name}")
if e.retry_hint == "retryable":
print("Database might be temporarily overloaded")
# Configuration errors (always permanent)
try:
config = load_agent_config("invalid-config.yaml")
except ConfigError as e:
print(f"Configuration error in section: {e.config_section}")
print("Fix configuration - retrying won't help")
Component-specific exceptions provide detailed context about where and why errors occurred, making debugging and monitoring significantly easier.
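The same context is just as useful when raising these exceptions. The sketch below assumes AgentError accepts keyword arguments matching the attributes read above (agent_id, task) in addition to the retry_hint and context inherited from DaitaError; perform_analysis is a placeholder for your own coroutine.
from daita.core.exceptions import AgentError
async def run_analysis(agent_id: str, task: str, data: dict):
    """Illustrative wrapper that converts low-level failures into an AgentError with context."""
    try:
        return await perform_analysis(data)  # placeholder for the real work
    except Exception as exc:
        # Record where the failure happened so the except blocks above have something to read
        raise AgentError(
            f"Task '{task}' failed: {exc}",
            agent_id=agent_id,
            task=task,
            retry_hint="retryable",
            context={"input_keys": list(data.keys())},
        ) from exc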
Retry-Specific Exception Classes
The framework includes three primary retry categories, each with specific handling characteristics and typical use cases.
TransientError - Immediate Retry
from daita.core.exceptions import (
TransientError, RateLimitError, TimeoutError,
ConnectionError, ServiceUnavailableError
)
# Generic transient error
class TemporaryServiceError(TransientError):
def __init__(self, service_name):
super().__init__(f"Service {service_name} temporarily unavailable")
# Rate limiting with retry-after information
try:
response = await api_client.get("/data")
except RateLimitError as e:
if e.retry_after:
print(f"Rate limited, retry after {e.retry_after} seconds")
else:
print("Rate limited, retry with minimal delay")
# Network timeouts with duration context
try:
result = await slow_operation()
except TimeoutError as e:
print(f"Operation timed out after {e.timeout_duration} seconds")
# Will be retried with exponential backoff
# Connection failures with target information
try:
await database.connect()
except ConnectionError as e:
print(f"Failed to connect to {e.host}:{e.port}")
# Automatic retry with connection pooling
Transient errors are expected to resolve quickly and are retried with minimal delay, making them ideal for temporary service disruptions and rate limiting scenarios.
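When you handle transient errors manually rather than through the agent retry layer described later, a short fixed pause is usually enough. A minimal sketch, assuming operation is a zero-argument coroutine function; the attempt count and pause length are illustrative.
import asyncio
from daita.core.exceptions import DaitaError
async def retry_transient(operation, attempts: int = 3, pause: float = 0.5):
    """Illustrative helper: retry only while the error is marked transient."""
    for attempt in range(1, attempts + 1):
        try:
            return await operation()
        except DaitaError as e:
            if not e.is_transient() or attempt == attempts:
                raise  # retryable/permanent errors and exhausted attempts propagate unchanged
            await asyncio.sleep(pause)  # minimal fixed delay before the next try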
RetryableError - Exponential Backoff
from daita.core.exceptions import (
RetryableError, ResourceBusyError,
DataInconsistencyError, ProcessingQueueFullError
)
# Resource contention that may resolve with time
class DatabaseLockError(RetryableError):
def __init__(self, table_name):
super().__init__(f"Table {table_name} is locked", context={"table": table_name})
# Temporary data inconsistency
try:
data = await get_consistent_data()
except DataInconsistencyError as e:
print(f"Data inconsistency in source: {e.data_source}")
# Will retry with exponential backoff to allow consistency
# Processing queue overload
try:
await submit_processing_job(data)
except ProcessingQueueFullError as e:
print(f"Queue {e.queue_name} is full")
# Exponential backoff gives queue time to drain
# Resource temporarily busy
try:
await acquire_exclusive_resource()
except ResourceBusyError as e:
print(f"Resource {e.resource_name} is busy")
# Backoff prevents resource contention storms
Retryable errors indicate issues that may resolve with time or different timing, warranting exponential backoff to avoid overwhelming stressed systems.
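Handled manually, the same pattern applies but with a growing delay between attempts; the framework's own backoff implementation is shown later on this page. A minimal sketch with illustrative parameters:
import asyncio
from daita.core.exceptions import DaitaError
async def retry_with_backoff(operation, attempts: int = 4, base_delay: float = 1.0):
    """Illustrative helper: back off exponentially while the error is marked retryable."""
    for attempt in range(attempts):
        try:
            return await operation()
        except DaitaError as e:
            if not e.is_retryable() or attempt == attempts - 1:
                raise
            # 1s, 2s, 4s, ... gives the stressed resource time to recover
            await asyncio.sleep(base_delay * (2 ** attempt))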
PermanentError - No Retry
from daita.core.exceptions import (
PermanentError, AuthenticationError, PermissionError,
ValidationError, InvalidDataError, NotFoundError
)
# Authentication failures require credential fixes
try:
await authenticate_user(api_key)
except AuthenticationError as e:
print(f"Authentication failed for provider: {e.provider}")
# No retry - fix credentials first
# Permission errors need access control changes
try:
await access_protected_resource()
except PermissionError as e:
print(f"Access denied to resource: {e.resource}")
# No retry - update permissions first
# Data validation failures need data correction
try:
result = await process_user_input(malformed_data)
except ValidationError as e:
print(f"Invalid data: {e.validation_errors}")
# No retry - fix input data first
# Resource not found errors need resource creation
try:
user = await get_user_by_id(nonexistent_id)
except NotFoundError as e:
print(f"Resource not found: {e.resource_type}")
# No retry - create resource or update reference
Permanent errors indicate fundamental issues that won't be resolved by retrying, requiring manual intervention or code changes.
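A practical consequence is to classify bad input as permanent up front, so the retry machinery never spends its budget on it. A small illustrative guard (the field names are hypothetical); it relies on the context keyword from the DaitaError base class:
from daita.core.exceptions import PermanentError
def require_fields(payload: dict, required=("user_id", "email")) -> dict:
    """Illustrative guard: fail fast on bad input instead of letting retries burn time."""
    missing = [field for field in required if field not in payload]
    if missing:
        # Retrying the same payload can never succeed, so mark it permanent up front
        raise PermanentError(
            f"Payload missing required fields: {missing}",
            context={"missing_fields": missing},
        )
    return payload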
Automatic Retry Logic
The Daita agent system includes sophisticated automatic retry logic with intelligent error classification, exponential backoff, and configurable retry policies.
Agent Retry Configuration
from daita import DaitaSDK
from daita.config import RetryPolicy, RetryStrategy
# Create SDK with retry-enabled agent
sdk = DaitaSDK(api_key="your-api-key")
# Agent with default retry policy
agent = sdk.substrate_agent(
name="Resilient Agent",
enable_retry=True, # Enable retry behavior
max_retries=5, # Up to 5 retry attempts
retry_delay=1.0, # Start with 1 second delay
retry_strategy="exponential_backoff" # Use exponential backoff
)
# Agent with custom retry policy
custom_retry_policy = RetryPolicy(
max_retries=3,
initial_delay=2.0,
strategy=RetryStrategy.EXPONENTIAL_BACKOFF,
permanent_errors=[
"AuthenticationError",
"ValidationError",
"CustomBusinessLogicError"
]
)
robust_agent = sdk.substrate_agent(
name="Custom Retry Agent",
enable_retry=True,
retry_policy=custom_retry_policy
)
# Process with automatic retry handling
try:
result = await agent.process("analyze", potentially_failing_data)
print(f"Success after {result['retry_info']['attempt']} attempts")
except Exception as e:
print(f"Failed after all retry attempts: {e}")
Retry Execution Flow
# The agent retry system follows this flow:
import asyncio
async def process_with_retry(task, data):
"""Simplified view of agent retry logic."""
max_attempts = retry_policy.max_retries + 1
for attempt in range(1, max_attempts + 1):
try:
# Add attempt context
context = {
'attempt_number': attempt,
'max_attempts': max_attempts,
'is_retry': attempt > 1
}
# Execute the task
result = await execute_task(task, data, context)
# Success - return with attempt info
return {
'status': 'success',
'result': result,
'retry_info': {
'attempt': attempt,
'max_attempts': max_attempts
}
}
except Exception as e:
# Classify error for retry decision
should_retry = classify_error_for_retry(e, attempt, max_attempts)
if should_retry and attempt < max_attempts:
# Calculate backoff delay
delay = calculate_retry_delay(attempt - 1, retry_policy)
await asyncio.sleep(delay)
continue
else:
# No more retries - return error with context
return {
'status': 'error',
'error': str(e),
'error_type': e.__class__.__name__,
'retry_info': {
'attempt': attempt,
'max_attempts': max_attempts,
'retry_exhausted': True
}
}
# Defensive guard: the loop above always returns, so this point should be unreachable
raise RuntimeError("Retry loop exited unexpectedly")
Exponential Backoff Implementation
import random
from daita.config import RetryStrategy
def calculate_retry_delay(attempt: int, retry_policy) -> float:
"""Calculate delay before next retry attempt."""
if retry_policy.strategy == RetryStrategy.IMMEDIATE:
delay = 0.0
elif retry_policy.strategy == RetryStrategy.FIXED_DELAY:
delay = retry_policy.initial_delay
else: # EXPONENTIAL_BACKOFF (default)
delay = retry_policy.initial_delay * (2 ** attempt)
# Add random jitter to prevent thundering herd
jitter = delay * 0.1 * random.random() # Up to 10% jitter
delay += jitter
return delay
# Example delay progression with initial_delay=1.0:
# Attempt 1: 1.0s + jitter
# Attempt 2: 2.0s + jitter
# Attempt 3: 4.0s + jitter
# Attempt 4: 8.0s + jitter
# Attempt 5: 16.0s + jitter
# Jitter prevents multiple agents from retrying simultaneously
Error Classification System
The framework includes intelligent error classification that examines exception types, messages, and chains to determine appropriate retry behavior.
Automatic Error Classification
from daita.core.exceptions import classify_exception, create_contextual_error
def classify_error_for_retry(error: Exception) -> str:
"""Classify any exception to determine retry behavior."""
# Check if it's already a Daita exception
if hasattr(error, 'retry_hint'):
return error.retry_hint
# Check exception class name
error_class = error.__class__.__name__
# Transient errors (network, temporary service issues)
transient_exceptions = {
'TimeoutError', 'ConnectionError', 'ConnectionResetError',
'ConnectionAbortedError', 'ConnectionRefusedError',
'OSError', 'IOError'
}
# Permanent errors (programming, configuration, data issues)
permanent_exceptions = {
'ValueError', 'TypeError', 'AttributeError', 'KeyError',
'IndexError', 'NameError', 'SyntaxError', 'ImportError',
'FileNotFoundError', 'PermissionError'
}
if error_class in transient_exceptions:
return "transient"
elif error_class in permanent_exceptions:
return "permanent"
# Check error message for patterns
error_message = str(error).lower()
permanent_patterns = [
'authentication', 'permission', 'unauthorized', 'forbidden',
'not found', 'invalid', 'malformed', 'bad request'
]
for pattern in permanent_patterns:
if pattern in error_message:
return "permanent"
# Default to retryable for unknown errors
return "retryable"
# Wrap standard exceptions with retry hints
try:
result = risky_operation()
except ValueError as e:
# Convert to Daita exception with proper classification
daita_error = create_contextual_error(
e,
context={"operation": "risky_operation"},
retry_hint="permanent"
)
raise daita_error
Exception Chain Analysis
def analyze_exception_chain(error: Exception) -> dict:
"""Analyze exception chain for comprehensive error context."""
chain_info = {
'primary_error': str(error),
'error_type': error.__class__.__name__,
'retry_hint': 'unknown',
'chain': [],
'context': {}
}
current_exc = error
while current_exc is not None:
exc_info = {
'type': current_exc.__class__.__name__,
'message': str(current_exc),
'retry_hint': getattr(current_exc, 'retry_hint', None)
}
# Add specific error context
if hasattr(current_exc, 'context'):
exc_info['context'] = current_exc.context
chain_info['chain'].append(exc_info)
# Use the first retry hint found in the chain
if exc_info['retry_hint'] and chain_info['retry_hint'] == 'unknown':
chain_info['retry_hint'] = exc_info['retry_hint']
# Move to the next exception in the chain
current_exc = getattr(current_exc, '__cause__', None)
return chain_info
# Usage in error handling
try:
result = await complex_operation()
except Exception as e:
analysis = analyze_exception_chain(e)
print(f"Primary error: {analysis['primary_error']}")
print(f"Retry recommendation: {analysis['retry_hint']}")
print(f"Exception chain length: {len(analysis['chain'])}")
# Log full chain for debugging
for i, exc in enumerate(analysis['chain']):
print(f" {i+1}. {exc['type']}: {exc['message']}")
Error Handling Patterns
Implement consistent error handling patterns across different components and use cases for maintainable, reliable code.
Agent Error Handling Patterns
from daita import DaitaSDK
from daita.core.exceptions import AgentError, LLMError, ValidationError, create_contextual_error
async def robust_agent_processing():
"""Comprehensive agent error handling example."""
sdk = DaitaSDK(api_key="your-api-key")
agent = sdk.substrate_agent(name="Robust Agent", enable_retry=True)
try:
# Process with comprehensive error handling
result = await agent.process("analyze", complex_data)
# Check if operation used retries
if result.get('retry_info', {}).get('attempt', 1) > 1:
print(f"Success after {result['retry_info']['attempt']} attempts")
return result['result']
except AgentError as e:
# Agent-specific error handling
if e.is_permanent():
print(f"Agent configuration issue: {e}")
# Log for developer attention
logger.error(f"Permanent agent error: {e}", extra={
'agent_id': e.agent_id,
'task': e.task,
'context': e.context
})
else:
print(f"Agent operation failed: {e}")
# Could implement fallback logic here
except LLMError as e:
# LLM provider error handling
if "rate limit" in str(e).lower():
print("LLM rate limit - implement request queuing")
elif "authentication" in str(e).lower():
print("LLM authentication issue - check API key")
else:
print(f"LLM provider error: {e}")
except ValidationError as e:
# Data validation error - fix input
print(f"Input data validation failed: {e}")
# Don't retry - fix the data first
except Exception as e:
# Unexpected error - wrap and re-raise
wrapped_error = create_contextual_error(
e,
context={"operation": "agent_processing", "data_type": type(complex_data).__name__}
)
raise wrapped_error
# Use the robust processing
try:
result = await robust_agent_processing()
print(f"Processing completed: {result}")
except Exception as e:
print(f"Processing failed permanently: {e}")
Plugin Error Handling Patterns
from daita.core.exceptions import PluginError, ConnectionError, PermanentError, RetryableError
async def robust_database_operation():
"""Database operation with comprehensive error handling."""
sdk = DaitaSDK()
try:
async with sdk.plugins.postgresql(
host="localhost",
database="production"
) as db:
return await db.query("SELECT * FROM users WHERE active = true")
except ConnectionError as e:
# Network connectivity issue
if e.host and e.port:
print(f"Cannot connect to database at {e.host}:{e.port}")
# Implement fallback or circuit breaker
print("Falling back to cached data")
return get_cached_user_data()
except PluginError as e:
# Database-specific error
if "permission" in str(e).lower():
print("Database permission error - check credentials")
raise PermanentError("Database access denied")
elif "timeout" in str(e).lower():
print("Database timeout - query too complex or DB overloaded")
# Could implement query simplification here
# Re-raise with additional context
raise PluginError(
f"Database operation failed: {e}",
plugin_name="postgresql",
context={"operation": "user_query", "database": "production"}
)
except Exception as e:
# Wrap unexpected database errors
print(f"Unexpected database error: {e}")
raise PluginError(
f"Database operation failed unexpectedly: {e}",
plugin_name="postgresql",
retry_hint="retryable"
)
# Circuit breaker pattern for database operations
import time
class CircuitBreakerOpenError(RetryableError):
"""Raised when the breaker is open and database calls are short-circuited."""
class DatabaseCircuitBreaker:
def __init__(self, failure_threshold=5, recovery_timeout=60):
self.failure_threshold = failure_threshold
self.recovery_timeout = recovery_timeout
self.failure_count = 0
self.last_failure_time = None
self.state = "closed" # closed, open, half-open
async def call_with_circuit_breaker(self, operation):
if self.state == "open":
if time.time() - self.last_failure_time > self.recovery_timeout:
self.state = "half-open"
else:
raise CircuitBreakerOpenError("Database circuit breaker is open")
try:
result = await operation()
if self.state == "half-open":
self.state = "closed"
self.failure_count = 0
return result
except Exception as e:
self.failure_count += 1
self.last_failure_time = time.time()
if self.failure_count >= self.failure_threshold:
self.state = "open"
raise
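A brief usage sketch for the breaker above: share one instance across calls and fall back to cached data while the breaker is open (get_cached_user_data is the illustrative fallback from the earlier example).
# Route all database calls through one shared breaker instance
db_breaker = DatabaseCircuitBreaker(failure_threshold=5, recovery_timeout=60)
async def get_active_users():
    try:
        return await db_breaker.call_with_circuit_breaker(robust_database_operation)
    except CircuitBreakerOpenError:
        # Too many recent failures - skip the database entirely for now
        return get_cached_user_data()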
HTTP Error Handling Patterns
import asyncio
from daita.sdk.api import APIClient, APIError
from daita.core.exceptions import (
AuthenticationError, PermissionError, NotFoundError, RateLimitError,
ServiceUnavailableError, TimeoutError, PermanentError, create_contextual_error
)
async def robust_api_client_usage():
"""Comprehensive HTTP error handling with API client."""
async with APIClient(
base_url="https://api.external-service.com",
max_retries=3,
retry_delay=1.0
) as client:
try:
response = await client.post("/process", json_data={
"text": "analyze this content",
"options": {"detailed": True}
})
return response
except APIError as e:
# Handle specific HTTP status codes
if e.status_code == 401:
raise AuthenticationError(
"API authentication failed - check API key",
provider="external-service"
)
elif e.status_code == 403:
raise PermissionError(
"API access forbidden - check permissions",
resource="process_endpoint"
)
elif e.status_code == 404:
raise NotFoundError(
"API endpoint not found",
resource_type="endpoint",
context={"url": e.url}
)
elif e.status_code == 429:
# Extract retry-after header if available
retry_after = e.response_data.get('retry_after', 60)
raise RateLimitError(
"API rate limit exceeded",
retry_after=retry_after
)
elif e.status_code >= 500:
# Server errors are usually transient
raise ServiceUnavailableError(
f"API server error: {e.status_code}",
service_name="external-service"
)
else:
# Client errors (4xx) are usually permanent
raise PermanentError(
f"API client error: {e}",
context={"status_code": e.status_code, "response": e.response_data}
)
except asyncio.TimeoutError:
# Network timeout
raise TimeoutError(
"API request timed out",
timeout_duration=client.timeout
)
except Exception as e:
# Wrap unexpected errors
raise create_contextual_error(
e,
context={"api_endpoint": "/process", "service": "external-service"},
retry_hint="retryable"
)
Error Monitoring & Debugging
Implement comprehensive error monitoring and debugging capabilities for production systems with detailed logging and metrics collection.
Error Logging Strategies
import logging
import json
from datetime import datetime
from daita.core.exceptions import DaitaError
# Configure structured logging for error tracking
class ErrorLogger:
def __init__(self, logger_name="daita.errors"):
self.logger = logging.getLogger(logger_name)
# Configure structured logging
handler = logging.StreamHandler()
formatter = logging.Formatter(
'%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
handler.setFormatter(formatter)
self.logger.addHandler(handler)
self.logger.setLevel(logging.INFO)
def log_error(self, error: Exception, context: dict = None):
"""Log error with structured information."""
error_info = {
'timestamp': datetime.utcnow().isoformat(),
'error_type': error.__class__.__name__,
'error_message': str(error),
'context': context or {}
}
# Add Daita-specific information
if isinstance(error, DaitaError):
error_info.update({
'retry_hint': error.retry_hint,
'is_transient': error.is_transient(),
'is_retryable': error.is_retryable(),
'is_permanent': error.is_permanent(),
'daita_context': error.context
})
# Add component-specific information
if hasattr(error, 'agent_id'):
error_info['agent_id'] = error.agent_id
if hasattr(error, 'provider'):
error_info['provider'] = error.provider
if hasattr(error, 'plugin_name'):
error_info['plugin_name'] = error.plugin_name
# Log with appropriate level
if isinstance(error, DaitaError) and error.is_permanent():
self.logger.error(json.dumps(error_info))
else:
self.logger.warning(json.dumps(error_info))
def log_retry_attempt(self, error: Exception, attempt: int, max_attempts: int, delay: float):
"""Log retry attempt information."""
retry_info = {
'timestamp': datetime.utcnow().isoformat(),
'event_type': 'retry_attempt',
'error_type': error.__class__.__name__,
'error_message': str(error),
'attempt': attempt,
'max_attempts': max_attempts,
'retry_delay': delay
}
self.logger.info(json.dumps(retry_info))
# Global error logger instance
error_logger = ErrorLogger()
# Use in error handling
try:
result = await risky_operation()
except Exception as e:
error_logger.log_error(e, context={
'operation': 'risky_operation',
'user_id': 'user123',
'request_id': 'req456'
})
raise
Error Metrics & Monitoring
from collections import defaultdict, Counter
from datetime import datetime, timedelta
from daita.core.exceptions import DaitaError
class ErrorMetrics:
"""Collect and analyze error metrics for monitoring."""
def __init__(self):
self.error_counts = Counter()
self.error_history = []
self.retry_stats = defaultdict(list)
def record_error(self, error: Exception, context: dict = None):
"""Record error occurrence for metrics."""
error_record = {
'timestamp': datetime.utcnow(),
'error_type': error.__class__.__name__,
'error_message': str(error),
'context': context or {}
}
# Add Daita-specific metrics
if isinstance(error, DaitaError):
error_record.update({
'retry_hint': error.retry_hint,
'component': self._get_component_from_error(error)
})
self.error_history.append(error_record)
self.error_counts[error.__class__.__name__] += 1
# Keep only recent history (last 24 hours)
cutoff_time = datetime.utcnow() - timedelta(hours=24)
self.error_history = [
record for record in self.error_history
if record['timestamp'] > cutoff_time
]
def record_retry_success(self, attempts: int, total_delay: float):
"""Record successful retry for analysis."""
self.retry_stats['successful_retries'].append({
'attempts': attempts,
'total_delay': total_delay,
'timestamp': datetime.utcnow()
})
def get_error_summary(self) -> dict:
"""Get comprehensive error summary."""
recent_errors = [
record for record in self.error_history
if record['timestamp'] > datetime.utcnow() - timedelta(hours=1)
]
return {
'total_errors_24h': len(self.error_history),
'recent_errors_1h': len(recent_errors),
'error_types': dict(self.error_counts),
'top_errors': self.error_counts.most_common(5),
'retry_success_rate': self._calculate_retry_success_rate(),
'error_rate_trend': self._calculate_error_trend()
}
def _get_component_from_error(self, error: DaitaError) -> str:
"""Identify component from error type."""
if hasattr(error, 'agent_id'):
return 'agent'
elif hasattr(error, 'provider'):
return 'llm'
elif hasattr(error, 'plugin_name'):
return 'plugin'
else:
return 'framework'
def _calculate_retry_success_rate(self) -> float:
"""Share of recorded attempts that ended in success (only eventually-successful retries are recorded here)."""
successful_retries = len(self.retry_stats['successful_retries'])
total_retry_attempts = sum(
record['attempts'] for record in self.retry_stats['successful_retries']
)
if total_retry_attempts == 0:
return 0.0
return (successful_retries / total_retry_attempts) * 100
def _calculate_error_trend(self) -> str:
"""Calculate whether error rate is increasing or decreasing."""
if len(self.error_history) < 10:
return "insufficient_data"
# Compare last hour to previous hour
now = datetime.utcnow()
last_hour = [
record for record in self.error_history
if now - timedelta(hours=1) <= record['timestamp'] <= now
]
prev_hour = [
record for record in self.error_history
if now - timedelta(hours=2) <= record['timestamp'] <= now - timedelta(hours=1)
]
if len(prev_hour) == 0:
return "no_baseline"
rate_change = (len(last_hour) - len(prev_hour)) / len(prev_hour)
if rate_change > 0.2:
return "increasing"
elif rate_change < -0.2:
return "decreasing"
else:
return "stable"
# Global metrics instance
error_metrics = ErrorMetrics()
# Integration with error handling
def handle_error_with_metrics(error: Exception, context: dict = None):
"""Handle error with metrics collection."""
# Log error
error_logger.log_error(error, context)
# Record metrics
error_metrics.record_error(error, context)
# Check if immediate attention needed
summary = error_metrics.get_error_summary()
if summary['recent_errors_1h'] > 50: # Alert threshold
print("HIGH ERROR RATE ALERT: Consider checking system health")
Best Practices
Follow these practices for robust, maintainable error handling that scales with your application complexity.
1. Use Appropriate Exception Types
# Good: Use specific exception types with proper retry hints
import requests
from daita.core.exceptions import ValidationError, TransientError, PermanentError
def validate_user_input(data):
if not data.get('email'):
raise ValidationError("Email is required") # Permanent - don't retry
if '@' not in data['email']:
raise ValidationError("Invalid email format") # Permanent - don't retry
def call_external_api():
try:
response = requests.get("https://api.example.com/data", timeout=10)
response.raise_for_status()
return response.json()
except requests.exceptions.Timeout:
raise TransientError("API request timed out") # Transient - retry quickly
except requests.exceptions.ConnectionError:
raise TransientError("Failed to connect to API") # Transient - retry quickly
except requests.exceptions.HTTPError as e:
if e.response.status_code >= 500:
raise TransientError(f"Server error: {e.response.status_code}")
else:
raise PermanentError(f"Client error: {e.response.status_code}")
# Avoid: Generic exceptions without context
def bad_validation(data):
if not data.get('email'):
raise Exception("Something went wrong") # No context, unknown retry behavior
2. Provide Rich Error Context
# Good: Rich context for debugging and monitoring
def process_user_data(user_id, operation_type):
try:
user_data = get_user_data(user_id)
result = perform_operation(user_data, operation_type)
return result
except Exception as e:
# Wrap with comprehensive context
raise create_contextual_error(
e,
context={
'user_id': user_id,
'operation_type': operation_type,
'timestamp': datetime.utcnow().isoformat(),
'system_state': get_system_state(),
'user_tier': get_user_tier(user_id)
}
)
# Avoid: Errors without context
def bad_process_user_data(user_id, operation_type):
try:
result = perform_operation(get_user_data(user_id), operation_type)
return result
except Exception as e:
raise e # No additional context
3. Implement Graceful Degradation
# Good: Graceful degradation with fallback options
async def get_user_recommendations(user_id):
"""Get recommendations with multiple fallback strategies."""
try:
# Primary: AI-powered recommendations
return await get_ai_recommendations(user_id)
except LLMError as e:
if e.is_transient():
# Temporary LLM issue - retry once quickly
await asyncio.sleep(1)
try:
return await get_ai_recommendations(user_id)
except LLMError:
pass # Fall through to next strategy
try:
# Fallback 1: Collaborative filtering
return await get_collaborative_recommendations(user_id)
except Exception as e:
logger.warning(f"Collaborative filtering failed: {e}")
try:
# Fallback 2: Popular items
return await get_popular_recommendations()
except Exception as e:
logger.warning(f"Popular recommendations failed: {e}")
# Final fallback: Static recommendations
return get_default_recommendations()
# Avoid: All-or-nothing error handling
async def bad_get_recommendations(user_id):
return await get_ai_recommendations(user_id) # Fails completely if AI is down
4. Monitor and Alert on Error Patterns
# Good: Proactive error monitoring
class ErrorMonitor:
def __init__(self):
self.error_patterns = {
'auth_failures': 0,
'rate_limits': 0,
'timeouts': 0,
'permanent_errors': 0
}
def check_error_health(self, error: Exception):
"""Check if error indicates system health issues."""
if isinstance(error, AuthenticationError):
self.error_patterns['auth_failures'] += 1
if self.error_patterns['auth_failures'] > 10:
self.alert("High authentication failure rate")
elif isinstance(error, RateLimitError):
self.error_patterns['rate_limits'] += 1
if self.error_patterns['rate_limits'] > 5:
self.alert("Frequent rate limiting - consider request throttling")
elif isinstance(error, TimeoutError):
self.error_patterns['timeouts'] += 1
if self.error_patterns['timeouts'] > 20:
self.alert("High timeout rate - check network/service performance")
def alert(self, message: str):
"""Send alert to monitoring system."""
logger.critical(f"SYSTEM ALERT: {message}")
# Integration with monitoring systems (PagerDuty, Slack, etc.)
# Integrate with error handling
monitor = ErrorMonitor()
try:
result = await operation()
except Exception as e:
monitor.check_error_health(e)
raise
5. Test Error Handling Paths
import pytest
from unittest.mock import patch
from daita import DaitaSDK
from daita.core.exceptions import TransientError, PermanentError
@pytest.mark.asyncio
async def test_agent_retry_behavior():
"""Test that agents properly retry transient errors."""
sdk = DaitaSDK(api_key="test-key")
agent = sdk.substrate_agent(name="Test Agent", enable_retry=True, max_retries=2)
# Mock a function that fails twice then succeeds
call_count = 0
async def failing_operation():
nonlocal call_count
call_count += 1
if call_count <= 2:
raise TransientError("Temporary failure")
return {"status": "success", "data": "test"}
# Patch the agent's process method
with patch.object(agent, '_process_once', side_effect=failing_operation):
result = await agent.process("test_task", {})
# Verify retry behavior
assert result['status'] == 'success'
assert result['retry_info']['attempt'] == 3 # Succeeded on 3rd attempt
assert call_count == 3
@pytest.mark.asyncio
async def test_permanent_error_no_retry():
"""Test that permanent errors are not retried."""
sdk = DaitaSDK(api_key="test-key")
agent = sdk.substrate_agent(name="Test Agent", enable_retry=True, max_retries=3)
call_count = 0
async def permanent_failure():
nonlocal call_count
call_count += 1
raise PermanentError("Configuration error")
with patch.object(agent, '_process_once', side_effect=permanent_failure):
result = await agent.process("test_task", {})
# Verify no retries for permanent errors
assert result['status'] == 'error'
assert result['retry_info']['attempt'] == 1 # Only one attempt
assert call_count == 1
# Test error classification
def test_error_classification():
"""Test automatic error classification."""
from daita.core.exceptions import classify_exception
# Test standard library exceptions
assert classify_exception(ValueError("Invalid input")) == "permanent"
assert classify_exception(ConnectionError("Network failed")) == "transient"
assert classify_exception(OSError("System error")) == "transient"
# Test Daita exceptions
transient_error = TransientError("Rate limited")
assert classify_exception(transient_error) == "transient"
assert transient_error.is_transient() == True
assert transient_error.is_retryable() == True
assert transient_error.is_permanent() == False
The Daita error handling system provides the foundation for building resilient AI agent applications that gracefully handle failures, automatically recover from transient issues, and provide comprehensive debugging information for permanent problems.
Next Steps
- Explore Configuration for setting up retry policies and error handling preferences
- Check out Plugin System for database and external service error handling
- See Substrate Agent for agent-specific error handling customization