Automatic Tracing
Daita provides zero-configuration automatic tracing for all operations. Every agent execution, LLM call, plugin usage, and decision is automatically tracked without any setup required.
# What's Traced
All operations are automatically traced:
- Agent Execution - Task processing, timing, status, input/output
- LLM Calls - Tokens used, cost, latency, model details
- Plugin Operations - Database queries, API calls, file operations
- Decisions - Reasoning chains, confidence scores, alternatives
- Workflow Communication - Messages between agents
- Lifecycle Events - Agent start/stop, errors, retries
# Basic Usage
Tracing happens automatically - no configuration needed:
```python
from daita import Agent

agent = Agent(name="My Agent", llm_provider="openai", model="gpt-4")

# This is automatically traced
result = await agent.run("analyze this data", data={"data": [1, 2, 3]})

# Get trace statistics
stats = agent.get_trace_stats()
print(stats)
# {
#     'total_operations': 1,
#     'success_rate': 1.0,
#     'avg_latency': 234.5,
#     'total_tokens': 150,
#     'total_cost': 0.0023
# }
```

# Get Recent Operations
View recent agent operations:
```python
# Get last 10 operations
operations = agent.get_recent_operations(limit=10)

for op in operations:
    print(f"Operation: {op['operation']}")
    print(f"Status: {op['status']}")
    print(f"Duration: {op['duration_ms']}ms")
    print(f"Start time: {op['start_time']}")
```

# Decision Tracing
Agents automatically track decision-making with reasoning:
```python
# Get recent decisions
decisions = agent.get_recent_decisions(limit=10)

for decision in decisions:
    print(f"Operation: {decision['operation']}")
    print(f"Status: {decision['status']}")
    print(f"Duration: {decision['duration_ms']}ms")

# Get decision statistics
decision_stats = agent.get_decision_stats()
print(f"Total decisions: {decision_stats['total_decisions']}")
print(f"Avg confidence: {decision_stats['average_confidence']}")
```

# LLM Call Tracking
Every LLM call is automatically tracked:
```python
from daita import Agent

agent = Agent(name="My Agent", llm_provider="openai", model="gpt-4")

# Process some tasks (LLM calls tracked automatically)
await agent.run("analyze this dataset", data=data1)
await agent.run("analyze this dataset", data=data2)

# Get token usage
usage = agent.get_token_usage()
print(f"Total tokens: {usage['total_tokens']}")
print(f"Prompt tokens: {usage['prompt_tokens']}")
print(f"Completion tokens: {usage['completion_tokens']}")
print(f"Total calls: {usage['total_calls']}")
print(f"Estimated cost: ${usage['estimated_cost']:.4f}")
```

# Plugin Tracing
Plugin operations are automatically traced:
```python
from daita import Agent
from daita.plugins import PostgreSQLPlugin

# Create plugin
db_plugin = PostgreSQLPlugin(host="localhost", database="mydb")

agent = Agent(
    name="DB Agent",
    llm_provider="openai",
    model="gpt-4",
    tools=[db_plugin]  # Plugin tools automatically traced
)
await agent.start()

# This traces both the agent execution AND the database query
result = await agent.run("Get all users from the database")

# View all operations including plugin calls
operations = agent.get_recent_operations(limit=10)
```

# Workflow Tracing
Workflows automatically trace communication between agents:
```python
from daita import Agent
from daita.core.workflow import Workflow

# Create agents
agent1 = Agent(name="Agent 1", llm_provider="openai", relay="channel1")
agent2 = Agent(name="Agent 2", llm_provider="openai")

# Create workflow
workflow = Workflow("My Pipeline")
workflow.add_agent("agent1", agent1)
workflow.add_agent("agent2", agent2)
workflow.connect("agent1", "channel1", "agent2")

await workflow.start()

# Communication is automatically traced
await workflow.inject_data("agent1", {"data": "test"}, task="process")

# Get communication log
comm_log = workflow.trace_manager.get_workflow_communications(workflow.name)
for message in comm_log:
    print(f"From: {message['from_agent']}")
    print(f"To: {message['to_agent']}")
    print(f"Channel: {message['channel']}")
    print(f"Start time: {message['start_time']}")
```

# Trace Types
Different operation types are tracked:
```python
from daita.core.tracing import TraceType

# Available trace types:
# - TraceType.AGENT_EXECUTION: Agent task processing
# - TraceType.LLM_CALL: Language model calls
# - TraceType.TOOL_EXECUTION: Plugin/tool usage
# - TraceType.DECISION_TRACE: Decision-making processes
# - TraceType.WORKFLOW_COMMUNICATION: Agent-to-agent messages
# - TraceType.AGENT_LIFECYCLE: Start/stop events
```

# Manual Decision Recording
Record custom decisions with reasoning:
```python
from daita.core.decision_tracing import DecisionRecorder
from daita.core.tools import tool

@tool
async def classify_data(data: dict) -> dict:
    """Classify data with decision recording."""
    async with DecisionRecorder("classification") as decision:
        # Add reasoning steps
        decision.add_reasoning("Checked field X")
        decision.add_reasoning("Validated constraint Y")

        # Set confidence
        decision.set_confidence(0.85)

        # Record alternatives considered
        decision.add_alternative("option_a")
        decision.add_alternative("option_b")

        return {"decision": "option_a"}
```

# Trace Statistics
Agent metrics and token usage are available through separate methods:
```python
# Operation metrics
metrics = agent.trace_manager.get_agent_metrics(agent.agent_id)
print(f"Total operations: {metrics['total_operations']}")
print(f"Successful operations: {metrics['successful_operations']}")
print(f"Failed operations: {metrics['failed_operations']}")
print(f"Success rate: {metrics['success_rate']:.2%}")
print(f"Average latency: {metrics['avg_latency_ms']}ms")

# Token and cost metrics
usage = agent.get_token_usage()
print(f"Total tokens: {usage['total_tokens']}")
print(f"Prompt tokens: {usage['prompt_tokens']}")
print(f"Completion tokens: {usage['completion_tokens']}")
print(f"Estimated cost: ${usage['estimated_cost']:.4f}")

# Decision metrics
decision_stats = agent.get_decision_stats()
print(f"Total decisions: {decision_stats['total_decisions']}")
print(f"Average confidence: {decision_stats['average_confidence']:.2f}")
print(f"Decision types: {decision_stats['decision_types']}")
```

# Exporting to Datadog, Jaeger, or Honeycomb
As of v0.13.0, the tracing backend is built on OpenTelemetry. You can attach any OTel-compatible exporter to forward spans to your observability platform of choice.
First, install the OTLP exporter:
```bash
pip install 'daita-agents[otlp]'
```

Then call `configure_tracing()` before creating any agents:
```python
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
from daita import Agent, configure_tracing

configure_tracing(
    exporters=[OTLPSpanExporter(endpoint="http://localhost:4317")]
)

# Now create your agents: all spans will be forwarded to the OTLP endpoint
agent = Agent(name="My Agent", llm_provider="openai", model="gpt-4")
```

Any exporter that implements the OTel SpanExporter interface works: OTLP (Datadog, Honeycomb, Grafana Tempo), Jaeger, Zipkin, and others.
Trace IDs follow the W3C Trace Context format (32-character hex trace ID, 16-character hex span ID), compatible with any OTel-native backend.
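Because the IDs follow the W3C format, you can sanity-check them when correlating Daita traces with other systems. A standalone sketch (independent of Daita) of the validity rules: lowercase hex, fixed length, and all-zero values rejected per the spec:

```python
import re

# W3C Trace Context: trace-id is 16 bytes (32 hex chars),
# span-id is 8 bytes (16 hex chars); all-zero values are invalid
TRACE_ID_RE = re.compile(r"^(?!0{32}$)[0-9a-f]{32}$")
SPAN_ID_RE = re.compile(r"^(?!0{16}$)[0-9a-f]{16}$")

def is_valid_trace_id(trace_id: str) -> bool:
    return bool(TRACE_ID_RE.match(trace_id))

def is_valid_span_id(span_id: str) -> bool:
    return bool(SPAN_ID_RE.match(span_id))

print(is_valid_trace_id("4bf92f3577b34da6a3ce929d0e0e4736"))  # True
print(is_valid_span_id("00f067aa0ba902b7"))                   # True
print(is_valid_trace_id("0" * 32))                            # False: all-zero is invalid
```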
# Performance Impact
Tracing is designed to be lightweight:
- In-memory storage - The most recent 500 spans are kept in a bounded, thread-safe buffer
- Async reporting - Non-blocking dashboard uploads via BatchSpanProcessor
- Minimal overhead - Sub-1ms per operation
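The bounded-buffer idea can be illustrated in a few lines of standard-library Python. This is a sketch of the concept only, not Daita's actual implementation:

```python
import threading
from collections import deque

class SpanBuffer:
    """Keep only the most recent `maxlen` spans, safely across threads."""

    def __init__(self, maxlen: int = 500):
        self._spans = deque(maxlen=maxlen)  # oldest spans drop off automatically
        self._lock = threading.Lock()

    def add(self, span: dict) -> None:
        with self._lock:
            self._spans.append(span)

    def recent(self, limit: int = 10) -> list:
        with self._lock:
            return list(self._spans)[-limit:]

buf = SpanBuffer(maxlen=500)
for i in range(600):
    buf.add({"span_id": i})

print(len(buf.recent(limit=500)))  # 500: the first 100 spans were evicted
print(buf.recent(limit=2))         # [{'span_id': 598}, {'span_id': 599}]
```

Because `deque(maxlen=...)` evicts from the opposite end on append, memory stays constant no matter how many operations an agent performs.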
# Privacy Considerations
Traces may contain sensitive data:
- Input/output data from operations
- LLM prompts and responses
- Database query results
- Decision reasoning
Best practices:
- Review what data is being traced
- Use focus parameters to filter sensitive fields
- Consider data retention policies
- Sanitize data before processing if needed
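For the last point, a minimal sketch of redacting sensitive fields before handing data to an agent. The field names and the `redact` helper are illustrative, not part of Daita:

```python
SENSITIVE_KEYS = {"password", "ssn", "api_key", "email"}  # example field names

def redact(value, keys=SENSITIVE_KEYS):
    """Recursively replace sensitive fields so they never enter the trace."""
    if isinstance(value, dict):
        return {
            k: "[REDACTED]" if k.lower() in keys else redact(v, keys)
            for k, v in value.items()
        }
    if isinstance(value, list):
        return [redact(item, keys) for item in value]
    return value

record = {"name": "Ada", "email": "ada@example.com",
          "orders": [{"id": 1, "api_key": "sk-123"}]}
print(redact(record))
# {'name': 'Ada', 'email': '[REDACTED]', 'orders': [{'id': 1, 'api_key': '[REDACTED]'}]}
```

The sanitized result can then be passed to `agent.run(...)` as usual, so neither the traced input nor any LLM prompt built from it contains the raw values.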
# Related Documentation
- Agents - Agent creation and usage
- Workflows - Multi-agent orchestration
- Plugins - Database and API integrations
- Error Handling - Error tracing and retry logic