Intermediate

Real-Time Streaming Responses

Build responsive AI applications with real-time text streaming, live tool call updates, and event-driven UI feedback

Agents · Streaming · LLM

#Overview

Learn how to implement real-time streaming in your agents to create responsive, transparent AI applications. This example shows how to stream LLM responses character-by-character, display tool calls as they happen, and build event-driven UIs that keep users engaged.

#What You'll Learn

  • Understanding streaming vs non-streaming responses
  • Setting up streaming event handlers
  • Processing different event types (thinking, tool calls, completion)
  • Building real-time UI feedback
  • Streaming with multiple tool calls
  • Best practices for streaming UX

#Prerequisites

  • A recent version of Python with async/await support
  • The daita package installed
  • An OpenAI API key (the examples use gpt-4o-mini via the openai provider)

#Why Streaming Matters

Without streaming:

text
User: "What's the weather in Tokyo?"
[30 second wait...]
Agent: "The weather in Tokyo is 68°F and partly cloudy."

With streaming:

text
User: "What's the weather in Tokyo?"
Agent: "Let me check" [streams in real-time]
🔧 Calling tool: get_current_weather...
Agent: "the weather for you..." [streams as it generates]
Agent: "The weather in Tokyo is 68°F and partly cloudy."

Benefits:

  • Better UX: Users see progress immediately
  • Transparency: Watch the agent's decision-making process
  • Engagement: No black-box waiting periods
  • Debugging: See exactly what the agent is doing

#Step 1: Basic Streaming Setup

Enable streaming by providing an event handler:

python
from daita import Agent
from daita.core.streaming import AgentEvent, EventType
import asyncio
 
async def handle_event(event: AgentEvent):
    """Handle streaming events in real-time"""
    if event.type == EventType.THINKING:
        # Stream text character by character
        print(event.content, end="", flush=True)
 
async def main():
    agent = Agent(
        name="streaming_agent",
        model="gpt-4o-mini",
        llm_provider="openai",
        prompt="You are a helpful assistant."
    )
 
    await agent.start()
 
    # Enable streaming with on_event parameter
    result = await agent.run(
        "Tell me a short joke",
        on_event=handle_event  # This enables streaming!
    )
 
    await agent.stop()
 
if __name__ == "__main__":
    asyncio.run(main())

What happens:

  1. Agent generates response
  2. Each text chunk triggers a THINKING event
  3. Your handler receives events in real-time
  4. Text streams to console character-by-character (a buffering variant is sketched below)
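
Chunks arrive in order, so a handler can also buffer them if you need the full text afterwards. A minimal buffering variant (the chunks list is our own addition, not part of the framework):

python
chunks = []

async def buffering_handler(event: AgentEvent):
    """Stream to the console while keeping a copy of the full text."""
    if event.type == EventType.THINKING:
        print(event.content, end="", flush=True)
        chunks.append(event.content)

# After agent.run(..., on_event=buffering_handler):
# full_text = "".join(chunks)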

#Step 2: Understanding Event Types

Daita streaming provides different event types for different stages:

python
from daita.core.streaming import AgentEvent, EventType
 
async def handle_event(event: AgentEvent):
    """Process different event types"""
 
    if event.type == EventType.ITERATION:
        # New reasoning iteration started
        print(f"\n🔄 Iteration {event.iteration}/{event.max_iterations}")
 
    elif event.type == EventType.THINKING:
        # LLM is generating text
        print(event.content, end="", flush=True)
 
    elif event.type == EventType.TOOL_CALL:
        # Agent is about to call a tool
        print(f"\n🔧 Calling: {event.tool_name}")
        print(f"   Args: {event.tool_args}")
 
    elif event.type == EventType.TOOL_RESULT:
        # Tool execution completed
        print(f"   ✅ Result: {event.result}")
 
    elif event.type == EventType.COMPLETE:
        # Agent finished completely
        print(f"\n✅ Complete!")
        print(f"   Final answer: {event.final_result}")
        print(f"   Tokens used: {event.token_usage.get('total_tokens')}")
        print(f"   Cost: ${event.cost:.4f}")
 
    elif event.type == EventType.ERROR:
        # An error occurred
        print(f"\n❌ Error: {event.error}")

Event types:

| Event Type | When | Data Available |
|------------|------|----------------|
| ITERATION | New reasoning loop starts | iteration, max_iterations |
| THINKING | LLM generates text | content (text chunk) |
| TOOL_CALL | Before tool execution | tool_name, tool_args |
| TOOL_RESULT | After tool execution | result |
| COMPLETE | Task finished | final_result, token_usage, cost |
| ERROR | Error occurred | error |
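
As handlers grow, the if/elif chain can be swapped for a dispatch table keyed on event type. A sketch using only the event types above (the helper names are our own):

python
from typing import Awaitable, Callable

async def on_thinking(event: AgentEvent):
    print(event.content, end="", flush=True)

async def on_tool_call(event: AgentEvent):
    print(f"\n🔧 Calling: {event.tool_name}")

# Dispatch table: event type -> coroutine handler (extend with the other types as needed)
HANDLERS: dict[EventType, Callable[[AgentEvent], Awaitable[None]]] = {
    EventType.THINKING: on_thinking,
    EventType.TOOL_CALL: on_tool_call,
}

async def handle_event(event: AgentEvent):
    handler = HANDLERS.get(event.type)
    if handler:
        await handler(event)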

#Step 3: Create Tools for Streaming

Define tools that your agent can call:

python
from daita.core.tools import tool
import asyncio
 
@tool
async def get_current_weather(location: str, unit: str = "fahrenheit") -> dict:
    """
    Get the current weather for a location.
 
    Args:
        location: The city and state, e.g. San Francisco, CA
        unit: Temperature unit (celsius or fahrenheit)
 
    Returns:
        Weather information including temperature and conditions
    """
    # Simulate API call
    await asyncio.sleep(0.5)
 
    # Mock weather data
    weather_data = {
        "San Francisco, CA": {"temp": 72, "condition": "Sunny", "humidity": 65},
        "New York, NY": {"temp": 58, "condition": "Cloudy", "humidity": 75},
        "Tokyo, Japan": {"temp": 68, "condition": "Partly Cloudy", "humidity": 70},
    }
 
    data = weather_data.get(location, {"temp": 70, "condition": "Unknown", "humidity": 50})
 
    if unit.lower() == "celsius":
        data["temp"] = round((data["temp"] - 32) * 5/9, 1)
        data["unit"] = "C"
    else:
        data["unit"] = "F"
 
    return {
        "location": location,
        "temperature": data["temp"],
        "unit": data["unit"],
        "condition": data["condition"],
        "humidity": data["humidity"]
    }
 
 
@tool
async def get_forecast(location: str, days: int = 3) -> dict:
    """
    Get weather forecast for upcoming days.
 
    Args:
        location: The city and state
        days: Number of days to forecast (1-5)
 
    Returns:
        Multi-day weather forecast
    """
    await asyncio.sleep(0.3)
 
    forecast = []
    for i in range(min(days, 5)):
        forecast.append({
            "day": i + 1,
            "temperature": 72 - i * 2,
            "condition": ["Sunny", "Partly Cloudy", "Cloudy", "Rainy", "Clear"][i],
            "precipitation_chance": [10, 20, 30, 60, 15][i]
        })
 
    return {
        "location": location,
        "forecast": forecast
    }

#Step 4: Build a Streaming UI Handler

Create a reusable class to handle streaming events with visual feedback:

python
class StreamingUI:
    """Handle real-time streaming events with visual feedback."""
 
    def __init__(self):
        self.iteration_count = 0
        self.thinking_buffer = ""
 
    def handle_event(self, event: AgentEvent):
        """Process streaming events and display them in real-time."""
 
        if event.type == EventType.ITERATION:
            self.iteration_count += 1
            print(f"\n{'='*70}")
            print(f"🔄 ITERATION {event.iteration}/{event.max_iterations}")
            print(f"{'='*70}")
            self.thinking_buffer = ""
 
        elif event.type == EventType.THINKING:
            # Stream text character by character
            print(event.content, end="", flush=True)
            self.thinking_buffer += event.content
 
        elif event.type == EventType.TOOL_CALL:
            print(f"\n\n🔧 TOOL CALL: {event.tool_name}")
            print(f"   Arguments: {event.tool_args}")
            print("   Executing...", end="", flush=True)
 
        elif event.type == EventType.TOOL_RESULT:
            print(" ✅ Done")
            # Show abbreviated result
            result_str = str(event.result)
            if len(result_str) > 100:
                result_str = result_str[:100] + "..."
            print(f"   Result: {result_str}")
 
        elif event.type == EventType.COMPLETE:
            print(f"\n\n{'='*70}")
            print("✅ COMPLETE")
            print(f"{'='*70}")
            print(f"Final Answer:\n{event.final_result}")
            print(f"\nStats:")
            print(f"  - Iterations: {event.iterations or 'N/A'}")
            print(f"  - Tokens: {event.token_usage.get('total_tokens', 'N/A') if event.token_usage else 'N/A'}")
            if event.cost:
                print(f"  - Cost: ${event.cost:.4f}")
 
        elif event.type == EventType.ERROR:
            print(f"\n❌ ERROR: {event.error}")

What this provides:

  • Visual separators between iterations
  • Real-time text streaming
  • Tool call progress indicators
  • Result summaries
  • Final statistics
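
To use the class, pass its bound method as the event callback; Step 5 shows the full program:

python
# Inside your async main(), after agent.start():
ui = StreamingUI()
result = await agent.run(
    "What's the weather like in Tokyo?",
    on_event=ui.handle_event
)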

#Step 5: Complete Streaming Example

Put it all together:

python
from daita import Agent
from daita.core.tools import tool
from daita.core.streaming import AgentEvent, EventType
import asyncio
 
 
@tool
async def get_current_weather(location: str, unit: str = "fahrenheit") -> dict:
    """Get the current weather for a location."""
    await asyncio.sleep(0.5)
 
    weather_data = {
        "San Francisco, CA": {"temp": 72, "condition": "Sunny", "humidity": 65},
        "New York, NY": {"temp": 58, "condition": "Cloudy", "humidity": 75},
        "Tokyo, Japan": {"temp": 68, "condition": "Partly Cloudy", "humidity": 70},
    }
 
    data = weather_data.get(location, {"temp": 70, "condition": "Unknown", "humidity": 50})
 
    if unit.lower() == "celsius":
        data["temp"] = round((data["temp"] - 32) * 5/9, 1)
        data["unit"] = "C"
    else:
        data["unit"] = "F"
 
    return {
        "location": location,
        "temperature": data["temp"],
        "unit": data["unit"],
        "condition": data["condition"],
        "humidity": data["humidity"]
    }
 
 
class StreamingUI:
    """Handle streaming events with visual feedback."""
 
    def __init__(self):
        self.iteration_count = 0
 
    def handle_event(self, event: AgentEvent):
        if event.type == EventType.ITERATION:
            self.iteration_count += 1
            print(f"\n{'='*70}")
            print(f"🔄 ITERATION {event.iteration}/{event.max_iterations}")
            print(f"{'='*70}")
 
        elif event.type == EventType.THINKING:
            print(event.content, end="", flush=True)
 
        elif event.type == EventType.TOOL_CALL:
            print(f"\n\n🔧 TOOL CALL: {event.tool_name}")
            print(f"   Arguments: {event.tool_args}")
            print("   Executing...", end="", flush=True)
 
        elif event.type == EventType.TOOL_RESULT:
            print(" ✅ Done")
 
        elif event.type == EventType.COMPLETE:
            print(f"\n\n{'='*70}")
            print("✅ COMPLETE")
            print(f"{'='*70}")
            print(f"\nFinal Answer: {event.final_result}")
            if event.token_usage:
                print(f"Tokens: {event.token_usage.get('total_tokens', 'N/A')}")
            if event.cost:
                print(f"Cost: ${event.cost:.4f}")
 
        elif event.type == EventType.ERROR:
            print(f"\n❌ ERROR: {event.error}")
 
 
async def main():
    print("="*70)
    print("REAL-TIME STREAMING DEMO")
    print("="*70)
 
    # Create agent
    agent = Agent(
        name="weather_assistant",
        model="gpt-4o-mini",
        llm_provider="openai",
        prompt="""You are a helpful weather assistant. When users ask about
        weather, use the available tools to get accurate information."""
    )
 
    # Register tools
    agent.register_tool(get_current_weather)
 
    # Start agent
    await agent.start()
 
    # Create UI handler
    ui = StreamingUI()
 
    # Run with streaming
    print("\nQuery: What's the weather like in Tokyo?\n")
 
    result = await agent.run(
        "What's the weather like in Tokyo?",
        on_event=ui.handle_event  # Enable streaming!
    )
 
    print("\n" + "="*70)
 
    # Cleanup
    await agent.stop()
 
 
if __name__ == "__main__":
    asyncio.run(main())

Output:

text
======================================================================
REAL-TIME STREAMING DEMO
======================================================================
 
Query: What's the weather like in Tokyo?
 
======================================================================
🔄 ITERATION 1/5
======================================================================
 
🔧 TOOL CALL: get_current_weather
   Arguments: {'location': 'Tokyo, Japan'}
   Executing... ✅ Done
 
======================================================================
🔄 ITERATION 2/5
======================================================================
The weather in Tokyo is currently 68°F and partly cloudy with 70% humidity.
 
======================================================================
✅ COMPLETE
======================================================================
 
Final Answer: The weather in Tokyo is currently 68°F and partly cloudy with 70% humidity.
Tokens: 127
Cost: $0.0003
======================================================================

#Step 6: Multiple Tool Calls

Streaming automatically handles complex multi-tool scenarios:

python
async def main():
    agent = Agent(
        name="weather_assistant",
        model="gpt-4o-mini",
        llm_provider="openai",
        prompt="You are a helpful weather assistant."
    )
 
    agent.register_tool(get_current_weather)
    agent.register_tool(get_forecast)
 
    await agent.start()
 
    ui = StreamingUI()
 
    # Complex query requiring multiple tools
    result = await agent.run(
        "Compare the current weather in New York and San Francisco, then give me a 3-day forecast for the warmer city.",
        on_event=ui.handle_event
    )
 
    await agent.stop()
 
if __name__ == "__main__":
    asyncio.run(main())

What happens:

  1. Agent calls get_current_weather for New York - streams
  2. Agent calls get_current_weather for San Francisco - streams
  3. Agent compares temperatures - streams thinking
  4. Agent calls get_forecast for warmer city - streams
  5. Agent generates final answer - streams

All visible in real-time!

#Framework Internals

How streaming works:

  1. Event Generation: Agent emits events at each step
  2. Event Handler: Your callback receives events immediately
  3. Non-Blocking: Events processed asynchronously (see the sketch below)
  4. Buffering: Text chunks buffered for smooth streaming
  5. Error Handling: Errors captured as events
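
As a mental model, the loop awaits your callback as each chunk is parsed. A simplified, self-contained sketch (not Daita's actual source; the stream here is faked):

python
import asyncio

async def fake_llm_stream():
    """Stand-in for a provider stream yielding text chunks."""
    for piece in ["The weather ", "in Tokyo ", "is 68°F."]:
        await asyncio.sleep(0.1)
        yield piece

async def run_with_events(stream, on_event):
    """Core idea: await the user's callback as each chunk arrives."""
    async for chunk in stream:
        await on_event(chunk)  # handler runs before the next chunk is read

async def print_chunk(text: str):
    print(text, end="", flush=True)

asyncio.run(run_with_events(fake_llm_stream(), print_chunk))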

Streaming architecture:

text
Agent
  ↓
LLM Provider (OpenAI/Anthropic)
  ↓
Stream Parser
  ↓
Event Generator
  ↓ (emits events)
Your Event Handler
  ↓
UI Update

Performance:

  • Streaming adds ~50ms latency (negligible)
  • No impact on token usage or cost
  • Same final result as non-streaming (demonstrated below)
  • Better perceived performance
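
Because on_event is optional, the "same final result" point is easy to check yourself, reusing the agent and handler from Step 1:

python
# Inside an async function, after agent.start():
streamed = await agent.run("Tell me a short joke", on_event=handle_event)
plain = await agent.run("Tell me a short joke")  # no handler: non-streaming

# Both calls return the same kind of result; only the delivery differs.
# (LLM output is nondeterministic, so the joke itself may vary per run.)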

#UI Patterns

#Simple Console Output

python
async def simple_handler(event: AgentEvent):
    if event.type == EventType.THINKING:
        print(event.content, end="", flush=True)

#Structured Progress

python
async def progress_handler(event: AgentEvent):
    if event.type == EventType.ITERATION:
        print(f"\n[Iteration {event.iteration}]")
    elif event.type == EventType.THINKING:
        print(event.content, end="", flush=True)
    elif event.type == EventType.TOOL_CALL:
        print(f"\n→ Using tool: {event.tool_name}")

#Web UI with WebSocket

python
import json
from functools import partial
 
async def websocket_handler(event: AgentEvent, websocket):
    """Stream events to web clients via WebSocket"""
    await websocket.send(json.dumps({
        "type": event.type.value,
        "content": getattr(event, "content", None),
        "tool_name": getattr(event, "tool_name", None)
    }))
 
# on_event expects a single-argument callback, so bind the socket first:
# on_event=partial(websocket_handler, websocket=ws)

#Logging and Metrics

python
import logging
 
class MetricsHandler:
    def __init__(self):
        self.tool_calls = 0
        self.iterations = 0
 
    async def handle(self, event: AgentEvent):
        if event.type == EventType.TOOL_CALL:
            self.tool_calls += 1
            logging.info(f"Tool called: {event.tool_name}")
        elif event.type == EventType.ITERATION:
            self.iterations += 1
        elif event.type == EventType.COMPLETE:
            logging.info(f"Complete - {self.tool_calls} tools, {self.iterations} iterations")

#Best Practices

  1. Always flush output: Use flush=True for real-time display
  2. Handle all event types: Especially ERROR for robustness
  3. Don't block in handlers: Keep event handlers fast (see the sketch after this list)
  4. Buffer text appropriately: For smoother streaming UX
  5. Show progress indicators: Let users know something is happening
  6. Graceful degradation: Fall back if streaming fails
  7. Test with slow connections: Ensure streaming works everywhere
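
For point 3, one way to keep handlers fast is to enqueue events and do the slow work (database writes, network calls) in a separate consumer task. A minimal sketch using only asyncio:

python
import asyncio

event_queue: asyncio.Queue = asyncio.Queue()

async def fast_handler(event: AgentEvent):
    """Runs inside the agent's streaming loop: just enqueue and return."""
    event_queue.put_nowait(event)

async def slow_consumer():
    """Runs as its own task, free to do slow work per event."""
    while True:
        event = await event_queue.get()
        await asyncio.sleep(0.2)  # stand-in for a DB write or network call
        event_queue.task_done()

# consumer = asyncio.create_task(slow_consumer())
# result = await agent.run(query, on_event=fast_handler)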

#Streaming vs Non-Streaming

| Feature | Non-Streaming | Streaming |
|---------|---------------|-----------|
| User sees progress | ❌ No | ✅ Yes |
| Perceived speed | Slower | Faster |
| Transparency | Black box | Fully visible |
| Debugging | Harder | Easier |
| Code complexity | Simpler | Slightly more |
| Works offline | ✅ Yes | ✅ Yes |

When to use streaming:

  • Interactive applications (chatbots, assistants)
  • Long-running tasks (research, analysis)
  • User-facing applications
  • Debugging and development

When to skip streaming:

  • Batch processing
  • Background jobs
  • API endpoints returning JSON
  • Very short tasks (less than 1 second)
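
In those cases, simply omit the callback:

python
# Batch/background usage: no on_event means no streaming, just the final result.
result = await agent.run("Summarize today's weather reports")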

#Key Takeaways

  1. Streaming improves UX by showing real-time progress
  2. Event handlers receive different event types as they occur
  3. All LLM providers supported - works with OpenAI, Anthropic, etc.
  4. Tool calls stream automatically - no extra configuration
  5. Non-blocking and async - doesn't slow down execution
  6. Production-ready - use in real applications

#Next Steps