Automatic Tracing

Daita provides zero-configuration automatic tracing: every agent execution, LLM call, plugin operation, and decision is tracked without any setup.

#What's Traced

The following operations are traced automatically:

  • Agent Execution - Task processing, timing, status, input/output
  • LLM Calls - Tokens used, cost, latency, model details
  • Plugin Operations - Database queries, API calls, file operations
  • Decisions - Reasoning chains, confidence scores, alternatives
  • Workflow Communication - Messages between agents
  • Lifecycle Events - Agent start/stop, errors, retries

#Basic Usage

Tracing happens automatically - no configuration needed:

python
from daita import Agent
 
agent = Agent(name="My Agent", llm_provider="openai", model="gpt-4")
 
# This is automatically traced
result = await agent.run("analyze this data", data={"data": [1, 2, 3]})
 
# Get trace statistics
stats = agent.get_trace_stats()
print(stats)
# {
#     'total_operations': 1,
#     'success_rate': 1.0,
#     'avg_latency': 234.5,
#     'total_tokens': 150,
#     'total_cost': 0.0023
# }

#Get Recent Operations

View recent agent operations:

python
# Get last 10 operations
operations = agent.get_recent_operations(limit=10)
 
for op in operations:
    print(f"Operation: {op['operation']}")
    print(f"Status: {op['status']}")
    print(f"Duration: {op['duration_ms']}ms")
    print(f"Start time: {op['start_time']}")
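
The records returned by get_recent_operations() can also be aggregated locally. The following is a minimal sketch, assuming each record is a dict with the status and duration_ms keys shown above, and assuming a successful operation reports the status string "success" (check the values your installation actually returns). summarize_operations is a hypothetical helper, not part of Daita.

```python
from statistics import mean


def summarize_operations(operations, success_status="success"):
    """Aggregate operation dicts into a few summary numbers.

    Assumes each dict has 'status' and 'duration_ms' keys. The value
    "success" for a successful operation is an assumption.
    """
    if not operations:
        return {"count": 0, "success_rate": 0.0, "avg_duration_ms": 0.0}
    successes = sum(1 for op in operations if op["status"] == success_status)
    return {
        "count": len(operations),
        "success_rate": successes / len(operations),
        "avg_duration_ms": mean(op["duration_ms"] for op in operations),
    }


# Sample records standing in for agent.get_recent_operations(limit=10)
sample_ops = [
    {"operation": "run", "status": "success", "duration_ms": 200.0},
    {"operation": "run", "status": "error", "duration_ms": 100.0},
]
summary = summarize_operations(sample_ops)
print(summary)
```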

#Decision Tracing

Agents automatically record their decisions together with the reasoning behind them:

python
# Get recent decisions
decisions = agent.get_recent_decisions(limit=10)
 
for decision in decisions:
    print(f"Operation: {decision['operation']}")
    print(f"Status: {decision['status']}")
    print(f"Duration: {decision['duration_ms']}ms")
 
# Get decision statistics
decision_stats = agent.get_decision_stats()
print(f"Total decisions: {decision_stats['total_decisions']}")
print(f"Avg confidence: {decision_stats['average_confidence']}")

#LLM Call Tracking

Every LLM call is automatically tracked:

python
from daita import Agent
 
agent = Agent(name="My Agent", llm_provider="openai", model="gpt-4")
 
# Placeholder inputs for illustration
data1 = {"data": [1, 2, 3]}
data2 = {"data": [4, 5, 6]}

# Process some tasks (LLM calls tracked automatically)
await agent.run("analyze this dataset", data=data1)
await agent.run("analyze this dataset", data=data2)
 
# Get token usage
usage = agent.get_token_usage()
print(f"Total tokens: {usage['total_tokens']}")
print(f"Prompt tokens: {usage['prompt_tokens']}")
print(f"Completion tokens: {usage['completion_tokens']}")
print(f"Total calls: {usage['total_calls']}")
print(f"Estimated cost: ${usage['estimated_cost']:.4f}")
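
How Daita derives estimated_cost is not described here. To cross-check the figure against your own pricing, a rough estimate can be computed from the prompt and completion token counts. This is a minimal sketch: estimate_cost is a hypothetical helper, and the per-1K-token prices are placeholders, not real provider prices.

```python
def estimate_cost(prompt_tokens, completion_tokens,
                  prompt_price_per_1k, completion_price_per_1k):
    """Return an estimated cost in dollars from token counts and per-1K prices."""
    prompt_cost = (prompt_tokens / 1000) * prompt_price_per_1k
    completion_cost = (completion_tokens / 1000) * completion_price_per_1k
    return prompt_cost + completion_cost


# Placeholder prices for illustration only; use your provider's price list.
cost = estimate_cost(
    prompt_tokens=1000,
    completion_tokens=500,
    prompt_price_per_1k=0.03,
    completion_price_per_1k=0.06,
)
print(f"Estimated cost: ${cost:.4f}")
```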

#Plugin Tracing

Plugin operations are automatically traced:

python
from daita import Agent
from daita.plugins import PostgreSQLPlugin
 
# Create plugin
db_plugin = PostgreSQLPlugin(host="localhost", database="mydb")
 
agent = Agent(
    name="DB Agent",
    llm_provider="openai",
    model="gpt-4",
    tools=[db_plugin]  # Plugin tools automatically traced
)
 
await agent.start()
 
# This traces both the agent execution AND the database query
result = await agent.run("Get all users from the database")
 
# View all operations including plugin calls
operations = agent.get_recent_operations(limit=10)

#Workflow Tracing

Workflows automatically trace communication between agents:

python
from daita import Agent
from daita.core.workflow import Workflow
 
# Create agents
agent1 = Agent(name="Agent 1", llm_provider="openai", relay="channel1")
agent2 = Agent(name="Agent 2", llm_provider="openai")
 
# Create workflow
workflow = Workflow("My Pipeline")
workflow.add_agent("agent1", agent1)
workflow.add_agent("agent2", agent2)
workflow.connect("agent1", "channel1", "agent2")
 
await workflow.start()
 
# Communication is automatically traced
await workflow.inject_data("agent1", {"data": "test"}, task="process")
 
# Get communication log
comm_log = workflow.trace_manager.get_workflow_communications(workflow.name)
for message in comm_log:
    print(f"From: {message['from_agent']}")
    print(f"To: {message['to_agent']}")
    print(f"Channel: {message['channel']}")
    print(f"Start time: {message['start_time']}")
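
To see which routes carry the most traffic, the communication log can be tallied by sender, receiver, and channel. This sketch assumes each log entry is a dict with the from_agent, to_agent, and channel keys printed above. count_routes is a hypothetical helper, and the sample entries stand in for a real log.

```python
from collections import Counter


def count_routes(comm_log):
    """Count messages per (from_agent, to_agent, channel) route."""
    return Counter(
        (msg["from_agent"], msg["to_agent"], msg["channel"]) for msg in comm_log
    )


# Sample entries standing in for get_workflow_communications(...)
sample_log = [
    {"from_agent": "agent1", "to_agent": "agent2", "channel": "channel1"},
    {"from_agent": "agent1", "to_agent": "agent2", "channel": "channel1"},
]
routes = count_routes(sample_log)
for (sender, receiver, channel), count in routes.most_common():
    print(f"{sender} -> {receiver} via {channel}: {count} message(s)")
```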

#Trace Types

Different operation types are tracked:

python
from daita.core.tracing import TraceType
 
# Available trace types:
# - TraceType.AGENT_EXECUTION: Agent task processing
# - TraceType.LLM_CALL: Language model calls
# - TraceType.TOOL_EXECUTION: Plugin/tool usage
# - TraceType.DECISION_TRACE: Decision-making processes
# - TraceType.WORKFLOW_COMMUNICATION: Agent-to-agent messages
# - TraceType.AGENT_LIFECYCLE: Start/stop events

#Manual Decision Recording

Record custom decisions with reasoning:

python
from daita.core.decision_tracing import DecisionRecorder
from daita.core.tools import tool
 
@tool
async def classify_data(data: dict) -> dict:
    """Classify data with decision recording."""
    async with DecisionRecorder("classification") as decision:
        # Add reasoning steps
        decision.add_reasoning("Checked field X")
        decision.add_reasoning("Validated constraint Y")
 
        # Set confidence
        decision.set_confidence(0.85)
 
        # Record alternatives considered
        decision.add_alternative("option_a")
        decision.add_alternative("option_b")
 
        return {"decision": "option_a"}

#Trace Statistics

Operation metrics, token usage, and decision statistics are available through separate methods:

python
# Operation metrics
metrics = agent.trace_manager.get_agent_metrics(agent.agent_id)
print(f"Total operations: {metrics['total_operations']}")
print(f"Successful operations: {metrics['successful_operations']}")
print(f"Failed operations: {metrics['failed_operations']}")
print(f"Success rate: {metrics['success_rate']:.2%}")
print(f"Average latency: {metrics['avg_latency_ms']}ms")
 
# Token and cost metrics
usage = agent.get_token_usage()
print(f"Total tokens: {usage['total_tokens']}")
print(f"Prompt tokens: {usage['prompt_tokens']}")
print(f"Completion tokens: {usage['completion_tokens']}")
print(f"Estimated cost: ${usage['estimated_cost']:.4f}")
 
# Decision metrics
decision_stats = agent.get_decision_stats()
print(f"Total decisions: {decision_stats['total_decisions']}")
print(f"Average confidence: {decision_stats['average_confidence']:.2f}")
print(f"Decision types: {decision_stats['decision_types']}")

#Exporting to Datadog, Jaeger, or Honeycomb

As of v0.13.0, the tracing backend is built on OpenTelemetry. You can attach any OTel-compatible exporter to forward spans to your observability platform of choice.

First, install the OTLP exporter:

bash
pip install 'daita-agents[otlp]'

Then call configure_tracing() before creating any agents:

python
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
from daita import Agent, configure_tracing
 
configure_tracing(
    exporters=[OTLPSpanExporter(endpoint="http://localhost:4317")]
)
 
# Now create your agents; all spans will be forwarded to the OTLP endpoint
agent = Agent(name="My Agent", llm_provider="openai", model="gpt-4")

Any exporter that implements the OTel SpanExporter interface works: OTLP (Datadog, Honeycomb, Grafana Tempo), Jaeger, Zipkin, and others.

Trace IDs follow the W3C Trace Context format (32-character hex trace ID, 16-character hex span ID), compatible with any OTel-native backend.
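
For reference, W3C Trace Context carries these IDs in a traceparent header of the form version-traceid-spanid-flags. The sketch below validates and splits such a header using the version 00 layout. parse_traceparent is a hypothetical helper written for illustration; whether and how Daita exposes this header is not covered here.

```python
import re

# Version 00 layout: version "-" trace-id "-" parent-id "-" trace-flags
TRACEPARENT_RE = re.compile(
    r"(?P<version>[0-9a-f]{2})-(?P<trace_id>[0-9a-f]{32})-"
    r"(?P<span_id>[0-9a-f]{16})-(?P<flags>[0-9a-f]{2})"
)


def parse_traceparent(header):
    """Return the header's fields as a dict, or None if it is malformed."""
    match = TRACEPARENT_RE.fullmatch(header)
    if match is None:
        return None
    fields = match.groupdict()
    # The spec forbids version "ff" and all-zero trace or span IDs.
    if fields["version"] == "ff":
        return None
    if set(fields["trace_id"]) == {"0"} or set(fields["span_id"]) == {"0"}:
        return None
    return fields


# Example header taken from the W3C Trace Context specification
parsed = parse_traceparent("00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01")
print(parsed)
```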

#Performance Impact

Tracing is designed to be lightweight:

  • In-memory storage - The 500 most recent spans are kept in a bounded, thread-safe buffer
  • Async reporting - Spans are uploaded to the dashboard in the background via BatchSpanProcessor, without blocking agent execution
  • Minimal overhead - Less than 1 ms per operation
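
To illustrate what a bounded, thread-safe buffer means in practice, here is a minimal sketch built on collections.deque and a lock. It is not Daita's implementation; BoundedSpanBuffer and its methods are hypothetical names chosen for this example.

```python
import threading
from collections import deque


class BoundedSpanBuffer:
    """Keep only the most recent `capacity` items; safe to share across threads."""

    def __init__(self, capacity=500):
        # A deque with maxlen drops the oldest entry once it is full.
        self._items = deque(maxlen=capacity)
        self._lock = threading.Lock()

    def add(self, span):
        with self._lock:
            self._items.append(span)

    def recent(self, limit=10):
        """Return up to `limit` of the most recent items, oldest first."""
        with self._lock:
            items = list(self._items)
        return items[-limit:] if limit > 0 else []


# Small capacity so the eviction behaviour is easy to see
buffer = BoundedSpanBuffer(capacity=3)
for i in range(5):
    buffer.add({"span": i})
latest = buffer.recent(limit=2)
print(latest)
```

Because the buffer has a fixed capacity, memory use stays constant no matter how long the process runs; the trade-off is that older spans are lost unless they are exported elsewhere.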

#Privacy Considerations

Traces may contain sensitive data:

  • Input/output data from operations
  • LLM prompts and responses
  • Database query results
  • Decision reasoning

Best practices:

  • Review what data is being traced
  • Use focus parameters to filter sensitive fields
  • Consider data retention policies
  • Sanitize data before processing if needed
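
One way to sanitize data before processing is to mask sensitive fields before the payload ever reaches an agent. The sketch below is a generic illustration, not a Daita feature: redact is a hypothetical helper, and the list of sensitive key names is an assumption you would replace with your own.

```python
# Hypothetical list of field names to mask; adapt it to your data.
SENSITIVE_KEYS = frozenset({"password", "api_key", "ssn", "email"})


def redact(value, sensitive_keys=SENSITIVE_KEYS, mask="[REDACTED]"):
    """Return a copy of `value` with sensitive dict fields masked, recursively."""
    if isinstance(value, dict):
        return {
            key: mask if str(key).lower() in sensitive_keys
            else redact(item, sensitive_keys, mask)
            for key, item in value.items()
        }
    if isinstance(value, list):
        return [redact(item, sensitive_keys, mask) for item in value]
    return value


record = {
    "user": {"name": "Ada", "email": "ada@example.com"},
    "rows": [{"ssn": "000-00-0000", "plan": "pro"}],
}
clean = redact(record)
print(clean)
```

The cleaned copy can then be passed as the data argument to agent.run(), so the original values never appear in the trace. The original record is left unmodified.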