#Overview
Learn how to implement real-time streaming in your agents to create responsive, transparent AI applications. This example shows how to stream LLM responses character-by-character, display tool calls as they happen, and build event-driven UIs that keep users engaged.
#What You'll Learn
- Understanding streaming vs non-streaming responses
- Setting up streaming event handlers
- Processing different event types (thinking, tool calls, completion)
- Building real-time UI feedback
- Streaming with multiple tool calls
- Best practices for streaming UX
#Prerequisites
- Understanding of basic agent creation
- Familiarity with custom tools
- Basic knowledge of async/await in Python
- LLM API key (OpenAI or Anthropic)
#Why Streaming Matters
Without streaming:
User: "What's the weather in Tokyo?"
[30 second wait...]
Agent: "The weather in Tokyo is 68°F and partly cloudy."With streaming:
User: "What's the weather in Tokyo?"
Agent: "Let me check" [streams in real-time]
🔧 Calling tool: get_current_weather...
Agent: "the weather for you..." [streams as it generates]
Agent: "The weather in Tokyo is 68°F and partly cloudy."Benefits:
- Better UX: Users see progress immediately
- Transparency: Watch the agent's decision-making process
- Engagement: No black-box waiting periods
- Debugging: See exactly what the agent is doing
#Step 1: Basic Streaming Setup
Enable streaming by providing an event handler:
from daita import Agent
from daita.core.streaming import AgentEvent, EventType
import asyncio
async def handle_event(event: AgentEvent):
    """Handle streaming events in real-time"""
    if event.type == EventType.THINKING:
        # Stream text character by character
        print(event.content, end="", flush=True)

async def main():
    agent = Agent(
        name="streaming_agent",
        model="gpt-4o-mini",
        llm_provider="openai",
        prompt="You are a helpful assistant."
    )
    await agent.start()

    # Enable streaming with on_event parameter
    result = await agent.run(
        "Tell me a short joke",
        on_event=handle_event  # This enables streaming!
    )

    await agent.stop()

if __name__ == "__main__":
    asyncio.run(main())

What happens:
- Agent generates response
- Each text chunk triggers a THINKING event
- Your handler receives events in real-time
- Text streams to console character-by-character
#Step 2: Understanding Event Types
Daita streaming provides different event types for different stages:
from daita.core.streaming import AgentEvent, EventType
async def handle_event(event: AgentEvent):
    """Process different event types"""
    if event.type == EventType.ITERATION:
        # New reasoning iteration started
        print(f"\n🔄 Iteration {event.iteration}/{event.max_iterations}")
    elif event.type == EventType.THINKING:
        # LLM is generating text
        print(event.content, end="", flush=True)
    elif event.type == EventType.TOOL_CALL:
        # Agent is about to call a tool
        print(f"\n🔧 Calling: {event.tool_name}")
        print(f"   Args: {event.tool_args}")
    elif event.type == EventType.TOOL_RESULT:
        # Tool execution completed
        print(f"   ✅ Result: {event.result}")
    elif event.type == EventType.COMPLETE:
        # Agent finished completely
        print(f"\n✅ Complete!")
        print(f"   Final answer: {event.final_result}")
        print(f"   Tokens used: {event.token_usage.get('total_tokens')}")
        print(f"   Cost: ${event.cost:.4f}")
    elif event.type == EventType.ERROR:
        # An error occurred
        print(f"\n❌ Error: {event.error}")

Event types:
| Event Type | When | Data Available |
|---|---|---|
| ITERATION | New reasoning loop starts | iteration, max_iterations |
| THINKING | LLM generates text | content (text chunk) |
| TOOL_CALL | Before tool execution | tool_name, tool_args |
| TOOL_RESULT | After tool execution | result |
| COMPLETE | Task finished | final_result, token_usage, cost |
| ERROR | Error occurred | error |
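The if/elif chain works fine, but a dispatch table can keep handlers tidy as the number of event types grows. A minimal sketch; the handler names are illustrative, using the EventType members from the table above:

from daita.core.streaming import AgentEvent, EventType

def on_thinking(event: AgentEvent):
    print(event.content, end="", flush=True)

def on_tool_call(event: AgentEvent):
    print(f"\n🔧 Calling: {event.tool_name}")

def on_error(event: AgentEvent):
    print(f"\n❌ Error: {event.error}")

# Map each event type to its handler; unhandled types are simply ignored
HANDLERS = {
    EventType.THINKING: on_thinking,
    EventType.TOOL_CALL: on_tool_call,
    EventType.ERROR: on_error,
}

async def handle_event(event: AgentEvent):
    handler = HANDLERS.get(event.type)
    if handler:
        handler(event)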
#Step 3: Create Tools for Streaming
Define tools that your agent can call:
from daita.core.tools import tool
@tool
async def get_current_weather(location: str, unit: str = "fahrenheit") -> dict:
    """
    Get the current weather for a location.

    Args:
        location: The city and state, e.g. San Francisco, CA
        unit: Temperature unit (celsius or fahrenheit)

    Returns:
        Weather information including temperature and conditions
    """
    # Simulate API call
    await asyncio.sleep(0.5)

    # Mock weather data
    weather_data = {
        "San Francisco, CA": {"temp": 72, "condition": "Sunny", "humidity": 65},
        "New York, NY": {"temp": 58, "condition": "Cloudy", "humidity": 75},
        "Tokyo, Japan": {"temp": 68, "condition": "Partly Cloudy", "humidity": 70},
    }
    data = weather_data.get(location, {"temp": 70, "condition": "Unknown", "humidity": 50})

    if unit.lower() == "celsius":
        data["temp"] = round((data["temp"] - 32) * 5/9, 1)
        data["unit"] = "C"
    else:
        data["unit"] = "F"

    return {
        "location": location,
        "temperature": data["temp"],
        "unit": data["unit"],
        "condition": data["condition"],
        "humidity": data["humidity"]
    }

@tool
async def get_forecast(location: str, days: int = 3) -> dict:
    """
    Get weather forecast for upcoming days.

    Args:
        location: The city and state
        days: Number of days to forecast (1-5)

    Returns:
        Multi-day weather forecast
    """
    await asyncio.sleep(0.3)
    forecast = []
    for i in range(min(days, 5)):
        forecast.append({
            "day": i + 1,
            "temperature": 72 - i * 2,
            "condition": ["Sunny", "Partly Cloudy", "Cloudy", "Rainy", "Clear"][i],
            "precipitation_chance": [10, 20, 30, 60, 15][i]
        })
    return {
        "location": location,
        "forecast": forecast
    }

#Step 4: Build a Streaming UI Handler
Create a reusable class to handle streaming events with visual feedback:
class StreamingUI:
    """Handle real-time streaming events with visual feedback."""

    def __init__(self):
        self.iteration_count = 0
        self.thinking_buffer = ""

    def handle_event(self, event: AgentEvent):
        """Process streaming events and display them in real-time."""
        if event.type == EventType.ITERATION:
            self.iteration_count += 1
            print(f"\n{'='*70}")
            print(f"🔄 ITERATION {event.iteration}/{event.max_iterations}")
            print(f"{'='*70}")
            self.thinking_buffer = ""
        elif event.type == EventType.THINKING:
            # Stream text character by character
            print(event.content, end="", flush=True)
            self.thinking_buffer += event.content
        elif event.type == EventType.TOOL_CALL:
            print(f"\n\n🔧 TOOL CALL: {event.tool_name}")
            print(f"   Arguments: {event.tool_args}")
            print("   Executing...", end="", flush=True)
        elif event.type == EventType.TOOL_RESULT:
            print(" ✅ Done")
            # Show abbreviated result
            result_str = str(event.result)
            if len(result_str) > 100:
                result_str = result_str[:100] + "..."
            print(f"   Result: {result_str}")
        elif event.type == EventType.COMPLETE:
            print(f"\n\n{'='*70}")
            print("✅ COMPLETE")
            print(f"{'='*70}")
            print(f"Final Answer:\n{event.final_result}")
            print(f"\nStats:")
            print(f"  - Iterations: {event.iterations or 'N/A'}")
            print(f"  - Tokens: {event.token_usage.get('total_tokens', 'N/A') if event.token_usage else 'N/A'}")
            if event.cost:
                print(f"  - Cost: ${event.cost:.4f}")
        elif event.type == EventType.ERROR:
            print(f"\n❌ ERROR: {event.error}")

What this provides:
- Visual separators between iterations
- Real-time text streaming
- Tool call progress indicators
- Result summaries
- Final statistics
#Step 5: Complete Streaming Example
Put it all together:
from daita import Agent
from daita.core.tools import tool
from daita.core.streaming import AgentEvent, EventType
import asyncio
@tool
async def get_current_weather(location: str, unit: str = "fahrenheit") -> dict:
    """Get the current weather for a location."""
    await asyncio.sleep(0.5)
    weather_data = {
        "San Francisco, CA": {"temp": 72, "condition": "Sunny", "humidity": 65},
        "New York, NY": {"temp": 58, "condition": "Cloudy", "humidity": 75},
        "Tokyo, Japan": {"temp": 68, "condition": "Partly Cloudy", "humidity": 70},
    }
    data = weather_data.get(location, {"temp": 70, "condition": "Unknown", "humidity": 50})
    if unit.lower() == "celsius":
        data["temp"] = round((data["temp"] - 32) * 5/9, 1)
        data["unit"] = "C"
    else:
        data["unit"] = "F"
    return {
        "location": location,
        "temperature": data["temp"],
        "unit": data["unit"],
        "condition": data["condition"],
        "humidity": data["humidity"]
    }

class StreamingUI:
    """Handle streaming events with visual feedback."""

    def __init__(self):
        self.iteration_count = 0

    def handle_event(self, event: AgentEvent):
        if event.type == EventType.ITERATION:
            self.iteration_count += 1
            print(f"\n{'='*70}")
            print(f"🔄 ITERATION {event.iteration}/{event.max_iterations}")
            print(f"{'='*70}")
        elif event.type == EventType.THINKING:
            print(event.content, end="", flush=True)
        elif event.type == EventType.TOOL_CALL:
            print(f"\n\n🔧 TOOL CALL: {event.tool_name}")
            print(f"   Arguments: {event.tool_args}")
            print("   Executing...", end="", flush=True)
        elif event.type == EventType.TOOL_RESULT:
            print(" ✅ Done")
        elif event.type == EventType.COMPLETE:
            print(f"\n\n{'='*70}")
            print("✅ COMPLETE")
            print(f"{'='*70}")
            print(f"\nFinal Answer: {event.final_result}")
            if event.token_usage:
                print(f"Tokens: {event.token_usage.get('total_tokens', 'N/A')}")
            if event.cost:
                print(f"Cost: ${event.cost:.4f}")
        elif event.type == EventType.ERROR:
            print(f"\n❌ ERROR: {event.error}")

async def main():
    print("="*70)
    print("REAL-TIME STREAMING DEMO")
    print("="*70)

    # Create agent
    agent = Agent(
        name="weather_assistant",
        model="gpt-4o-mini",
        llm_provider="openai",
        prompt="""You are a helpful weather assistant. When users ask about
        weather, use the available tools to get accurate information."""
    )

    # Register tools
    agent.register_tool(get_current_weather)

    # Start agent
    await agent.start()

    # Create UI handler
    ui = StreamingUI()

    # Run with streaming
    print("\nQuery: What's the weather like in Tokyo?\n")
    result = await agent.run(
        "What's the weather like in Tokyo?",
        on_event=ui.handle_event  # Enable streaming!
    )

    print("\n" + "="*70)

    # Cleanup
    await agent.stop()

if __name__ == "__main__":
    asyncio.run(main())

Output:
======================================================================
REAL-TIME STREAMING DEMO
======================================================================
Query: What's the weather like in Tokyo?
======================================================================
🔄 ITERATION 1/5
======================================================================
🔧 TOOL CALL: get_current_weather
Arguments: {'location': 'Tokyo, Japan'}
Executing... ✅ Done
======================================================================
🔄 ITERATION 2/5
======================================================================
The weather in Tokyo is currently 68°F and partly cloudy with 70% humidity.
======================================================================
✅ COMPLETE
======================================================================
Final Answer: The weather in Tokyo is currently 68°F and partly cloudy with 70% humidity.
Tokens: 127
Cost: $0.0003
======================================================================

#Step 6: Multiple Tool Calls
Streaming automatically handles complex multi-tool scenarios:
async def main():
    agent = Agent(
        name="weather_assistant",
        model="gpt-4o-mini",
        llm_provider="openai",
        prompt="You are a helpful weather assistant."
    )
    agent.register_tool(get_current_weather)
    agent.register_tool(get_forecast)
    await agent.start()

    ui = StreamingUI()

    # Complex query requiring multiple tools
    result = await agent.run(
        "Compare the current weather in New York and San Francisco, then give me a 3-day forecast for the warmer city.",
        on_event=ui.handle_event
    )

    await agent.stop()

if __name__ == "__main__":
    asyncio.run(main())

What happens:
- Agent calls get_current_weather for New York - streams
- Agent calls get_current_weather for San Francisco - streams
- Agent compares temperatures - streams thinking
- Agent calls get_forecast for the warmer city - streams
- Agent generates final answer - streams
All visible in real-time!
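When debugging multi-tool runs like this, it can help to record the event sequence as it arrives. A minimal sketch; TraceHandler is illustrative, not part of the framework:

class TraceHandler:
    """Record the order of events for post-run inspection."""

    def __init__(self):
        self.trace = []

    def handle_event(self, event: AgentEvent):
        if event.type == EventType.TOOL_CALL:
            self.trace.append(("tool_call", event.tool_name))
        elif event.type == EventType.ITERATION:
            self.trace.append(("iteration", event.iteration))

# After a run like the one above, trace might look like:
# [("iteration", 1), ("tool_call", "get_current_weather"),
#  ("tool_call", "get_current_weather"), ("iteration", 2),
#  ("tool_call", "get_forecast"), ...]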
#Framework Internals
How streaming works:
- Event Generation: Agent emits events at each step
- Event Handler: Your callback receives events immediately
- Non-Blocking: Events processed asynchronously
- Buffering: Text chunks buffered for smooth streaming
- Error Handling: Errors captured as events
Streaming architecture:
Agent
↓
LLM Provider (OpenAI/Anthropic)
↓
Stream Parser
↓
Event Generator
↓ (emits events)
Your Event Handler
↓
UI Update

Performance:
- Streaming adds ~50ms latency (negligible)
- No impact on token usage or cost
- Same final result as non-streaming
- Better perceived performance
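To make the flow concrete, here is a toy version of the emit-and-handle pattern described above. This is a sketch of the general pattern only, not daita's actual internals; Event, run_with_events, and the chunking are stand-ins:

import asyncio
from dataclasses import dataclass
from typing import Awaitable, Callable

@dataclass
class Event:
    type: str
    content: str = ""

async def run_with_events(chunks, on_event: Callable[[Event], Awaitable[None]]):
    """Emit a 'thinking' event per chunk, then a final 'complete' event."""
    text = ""
    for chunk in chunks:  # stand-in for the provider's token stream
        text += chunk
        await on_event(Event("thinking", chunk))
    await on_event(Event("complete", text))

async def demo():
    async def handler(event: Event):
        if event.type == "thinking":
            print(event.content, end="", flush=True)

    await run_with_events(["Hello", ", ", "streaming!"], handler)

asyncio.run(demo())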
#UI Patterns
#Simple Console Output
async def simple_handler(event: AgentEvent):
    if event.type == EventType.THINKING:
        print(event.content, end="", flush=True)

#Structured Progress
async def progress_handler(event: AgentEvent):
    if event.type == EventType.ITERATION:
        print(f"\n[Iteration {event.iteration}]")
    elif event.type == EventType.THINKING:
        print(event.content, end="", flush=True)
    elif event.type == EventType.TOOL_CALL:
        print(f"\n→ Using tool: {event.tool_name}")

#Web UI with WebSocket
import json

async def websocket_handler(event: AgentEvent, websocket):
    """Stream events to web clients via WebSocket"""
    await websocket.send(json.dumps({
        "type": event.type.value,
        "content": event.content if hasattr(event, 'content') else None,
        "tool_name": event.tool_name if hasattr(event, 'tool_name') else None
    }))
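Since on_event expects a single-argument callback, bind the socket before passing the handler. A hedged wiring sketch; stream_to_client and the websocket object come from your server framework, not from daita:

from functools import partial

async def stream_to_client(agent, websocket, query: str):
    # Bind the socket so the handler matches on_event's one-argument shape
    handler = partial(websocket_handler, websocket=websocket)
    return await agent.run(query, on_event=handler)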
#Logging and Metrics
import logging

class MetricsHandler:
    def __init__(self):
        self.tool_calls = 0
        self.iterations = 0

    async def handle(self, event: AgentEvent):
        if event.type == EventType.TOOL_CALL:
            self.tool_calls += 1
            logging.info(f"Tool called: {event.tool_name}")
        elif event.type == EventType.ITERATION:
            self.iterations += 1
        elif event.type == EventType.COMPLETE:
            logging.info(f"Complete - {self.tool_calls} tools, {self.iterations} iterations")
#Best Practices
- Always flush output: Use flush=True for real-time display
- Handle all event types: Especially ERROR for robustness
- Don't block in handlers: Keep event handlers fast
- Buffer text appropriately: For smoother streaming UX
- Show progress indicators: Let users know something is happening
- Graceful degradation: Fall back if streaming fails (see the sketch after this list)
- Test with slow connections: Ensure streaming works everywhere
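For graceful degradation, one option is to retry without a handler if the streaming run raises. A minimal sketch, assuming agent.run returns the same result with or without on_event:

import logging

async def run_with_fallback(agent, query: str, handler):
    """Try streaming first; fall back to a plain run if it fails."""
    try:
        return await agent.run(query, on_event=handler)
    except Exception as exc:
        logging.warning("Streaming failed (%s); retrying without streaming", exc)
        return await agent.run(query)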
#Streaming vs Non-Streaming
| Feature | Non-Streaming | Streaming |
|---|---|---|
| User sees progress | ❌ No | ✅ Yes |
| Perceived speed | Slower | Faster |
| Transparency | Black box | Fully visible |
| Debugging | Harder | Easier |
| Code complexity | Simpler | Slightly more |
| Works offline | ✅ Yes | ✅ Yes |
When to use streaming:
- Interactive applications (chatbots, assistants)
- Long-running tasks (research, analysis)
- User-facing applications
- Debugging and development
When to skip streaming:
- Batch processing
- Background jobs
- API endpoints returning JSON
- Very short tasks (less than 1 second)
#Key Takeaways
- Streaming improves UX by showing real-time progress
- Event handlers receive different event types as they occur
- All LLM providers supported - works with OpenAI, Anthropic, etc.
- Tool calls stream automatically - no extra configuration
- Non-blocking and async - doesn't slow down execution
- Production-ready - use in real applications
#Next Steps
- Multi-Agent Orchestration to stream multi-agent workflows
- Database Query Agent to stream database operations
- Custom Handlers to add streaming to custom logic