Workflows
Workflows orchestrate multiple agents through relay-based communication. Agents publish results to named channels, and the workflow automatically routes those messages to subscribed agents. All workflow communication is traced automatically, with no configuration required.
Overview
A Workflow manages agent lifecycle, connections, and communication. It provides:
- Relay-based messaging: Agents communicate through named channels instead of direct calls
- Automatic routing: Workflow handles message delivery between agents
- Lifecycle management: Coordinated start/stop of all agents
- Automatic tracing: All communication is traced for observability
- Reliability features: Optional message acknowledgments, retries, and backpressure control
- Horizontal scaling: Agent pools for parallel processing
Core Concepts
Relay Channels
Relay channels are named communication pathways. Agents publish messages to channels, and other agents subscribe to receive those messages.
Key Points:
- Channels are identified by string names (e.g., "processed_data", "analysis_results")
- Multiple agents can subscribe to the same channel
- Messages are delivered to all subscribers
- Channels are managed by the RelayManager (a global singleton); see the sketch below
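A minimal sketch of these points, reusing the workflow API described in the rest of this page; both downstream agents subscribe to the same "processed_data" channel and each receives every published message:
from daita.agents import SubstrateAgent
from daita.core.workflow import Workflow
publisher = SubstrateAgent(name="Publisher", relay="processed_data")  # publishes to this channel
audit = SubstrateAgent(name="Audit")
archive = SubstrateAgent(name="Archive")
workflow = Workflow("Fan-out Example")
workflow.add_agent("publisher", publisher)
workflow.add_agent("audit", audit)
workflow.add_agent("archive", archive)
workflow.connect("publisher", "processed_data", "audit")    # first subscriber
workflow.connect("publisher", "processed_data", "archive")  # second subscriber on the same channel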
Workflow Execution Flow
- Create agents and the workflow
- Add agents to the workflow with add_agent()
- Connect agents via relay channels with connect()
- Start the workflow (starts all agents and sets up subscriptions)
- Inject data into the entry point agent(s)
- Data flows automatically through the connected agents
- Stop the workflow (stops agents and cleans up subscriptions)
Basic Workflow
from daita.agents import SubstrateAgent
from daita.core.workflow import Workflow
# Create agents
processor = SubstrateAgent(
    name="Processor",
    relay="processed_data"  # Publishes to this channel
)
analyzer = SubstrateAgent(name="Analyzer")
# Create workflow
workflow = Workflow("Data Pipeline")
# Add agents
workflow.add_agent("processor", processor)
workflow.add_agent("analyzer", analyzer)
# Connect: processor -> processed_data channel -> analyzer
workflow.connect("processor", "processed_data", "analyzer")
# Start workflow
await workflow.start()
# Trigger workflow by having first agent process data
# Agents autonomously decide how to process based on their tools and prompts
await processor.run("Process this data: [1, 2, 3]")
# Stop workflow
await workflow.stop()
Constructor Parameters
Workflow(
    name: str,
    project_id: Optional[str] = None,
    relay_manager: Optional[RelayManager] = None
)
Parameters:
- name (str): Workflow name for identification and tracing
- project_id (str, optional): Project identifier for multi-project environments
- relay_manager (RelayManager, optional): Custom relay manager (uses the global singleton if not provided)
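For example, a workflow scoped to a specific project (the project identifier here is only illustrative):
workflow = Workflow("Nightly ETL", project_id="analytics")  # "analytics" is a placeholder project id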
Core Methods
Adding Agents
Add individual agents to the workflow:
workflow.add_agent(name: str, agent: Any) -> Workflow
Returns the workflow for method chaining.
Connecting Agents
Connect agents through relay channels:
workflow.connect(
    from_agent: str,  # Source agent name
    channel: str,     # Relay channel name
    to_agent: str     # Destination agent name
) -> Workflow
The connection defines how data flows between agents. When from_agent publishes to channel, the data is automatically routed to to_agent, which receives it via its receive_message() method and processes it autonomously using its tools.
Returns the workflow for method chaining.
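Because add_agent() and connect() both return the workflow, a pipeline can be assembled fluently. A short sketch, assuming processor and analyzer are agents defined as in the Basic Workflow example:
workflow = (
    Workflow("Chained Pipeline")
    .add_agent("processor", processor)
    .add_agent("analyzer", analyzer)
    .connect("processor", "processed_data", "analyzer")
)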
Starting and Stopping
Start the workflow (starts all agents and sets up relay subscriptions):
await workflow.start()
Stop the workflow (stops all agents and cleans up subscriptions):
await workflow.stop()
Always use try/finally or context managers to ensure proper cleanup.
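A minimal start/stop pattern with guaranteed cleanup (entry_agent stands for whichever agent kicks off your pipeline):
await workflow.start()
try:
    await entry_agent.run("Kick off processing")
finally:
    await workflow.stop()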
Triggering Workflows
Option 1: Direct Agent Execution (Recommended)
Trigger workflows by calling run() on the entry point agent:
# Start workflow
await workflow.start()
# Trigger via first agent - it processes and publishes to its relay channel
result = await entry_agent.run("Process this dataset")
# Results automatically flow through connected agents
Option 2: Legacy inject_data() Method
For backward compatibility, you can use inject_data(), which requires the agent to implement a process() method:
await workflow.inject_data(
    agent_name: str,
    data: Any
)
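For example, assuming the "processor" agent implements process():
await workflow.inject_data("processor", {"values": [1, 2, 3]})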
Note: Most modern agents use run() instead of process(), so Option 1 is preferred.
Multi-Stage Pipeline
Create pipelines with multiple processing stages. Data flows sequentially through connected agents.
from daita import SubstrateAgent
from daita.core.workflow import Workflow
from daita.core.tools import tool
# Define tools for each stage
@tool
async def clean_data(data: dict) -> dict:
    """Clean and normalize data."""
    # Cleaning logic
    return {"cleaned": data}
@tool
async def analyze_data(data: dict) -> dict:
    """Analyze cleaned data."""
    # Analysis logic
    return {"analysis": "results"}
# Stage 1: Data cleaning
cleaner = SubstrateAgent(
    name="Cleaner",
    prompt="You are a data cleaning expert. Clean and normalize data.",
    relay="clean_data"
)
cleaner.register_tool(clean_data)
# Stage 2: Analysis
analyzer = SubstrateAgent(
    name="Analyzer",
    prompt="You are a data analyst. Analyze cleaned data and extract insights.",
    relay="analysis_results"
)
analyzer.register_tool(analyze_data)
# Stage 3: Report generation
reporter = SubstrateAgent(
    name="Reporter",
    prompt="You are a report writer. Generate clear, concise reports."
)
# Build workflow
workflow = Workflow("Analysis Pipeline")
workflow.add_agent("cleaner", cleaner)
workflow.add_agent("analyzer", analyzer)
workflow.add_agent("reporter", reporter)
# Connect stages sequentially
workflow.connect("cleaner", "clean_data", "analyzer")
workflow.connect("analyzer", "analysis_results", "reporter")
# Execute
await workflow.start()
# Trigger the workflow through the first agent (raw_data is an illustrative input)
raw_data = {"records": ["  Alice ", "BOB", None]}
await cleaner.run("Clean this data: " + str(raw_data))
# Data automatically flows: cleaner -> analyzer -> reporter
await workflow.stop()
Parallel Processing
Multiple agents can subscribe to the same channel for parallel processing:
# Create the entry agent and parallel processors
preprocessor = SubstrateAgent(name="Preprocessor", relay="preprocessed")
sentiment = SubstrateAgent(name="Sentiment", relay="sentiment_results")
classifier = SubstrateAgent(name="Classifier", relay="class_results")
aggregator = SubstrateAgent(name="Aggregator")
workflow = Workflow("Parallel Pipeline")
workflow.add_agent("prep", preprocessor)
workflow.add_agent("sentiment", sentiment)
workflow.add_agent("classifier", classifier)
workflow.add_agent("aggregator", aggregator)
# Parallel branches: preprocessor feeds both sentiment and classifier
workflow.connect("prep", "preprocessed", "sentiment")
workflow.connect("prep", "preprocessed", "classifier")
# Aggregator receives from both branches
workflow.connect("sentiment", "sentiment_results", "aggregator")
workflow.connect("classifier", "class_results", "aggregator")
Agent Pools (Horizontal Scaling)
Agent pools create multiple instances of the same agent for parallel processing. Tasks are distributed across pool instances automatically.
from daita.agents import SubstrateAgent
from daita.core.workflow import Workflow
# Agent factory function
def create_worker():
    return SubstrateAgent(name="Worker", relay="processed")
# Add agent pool to workflow
workflow = Workflow("Scaled Pipeline")
workflow.add_agent_pool(
    name="workers",               # Pool name
    agent_factory=create_worker,  # Factory function
    instances=5                   # Number of agent instances
)
# Connect the pool like a regular agent
dispatcher = SubstrateAgent(name="Dispatcher", relay="tasks")  # illustrative entry agent feeding the pool
workflow.add_agent("dispatcher", dispatcher)
workflow.add_agent("aggregator", SubstrateAgent(name="Aggregator"))
workflow.connect("dispatcher", "tasks", "workers")
workflow.connect("workers", "processed", "aggregator")
# Start and process
await workflow.start()
# Trigger processing - the dispatcher publishes to the "tasks" channel; pool workers
# receive items via relay and process them autonomously using their registered tools
data_items = ["record-1", "record-2", "record-3"]  # illustrative input
for item in data_items:
    await dispatcher.run(f"Process item: {item}")
await workflow.stop()
Key Points:
- Agent pools act like a single agent in workflow connections
- Tasks are distributed using round-robin scheduling
- Each instance has its own state and lifecycle
- Useful for CPU-intensive or I/O-bound operations
Reliability Configuration
Workflows support optional reliability features for production environments. These features ensure message delivery, handle failures gracefully, and prevent system overload.
Configuration Method
Configure reliability after creating the workflow:
workflow = Workflow("Production Pipeline")
workflow.configure_reliability(
    preset="production",        # Use a predefined preset
    acknowledgments=True,       # Message acknowledgments
    task_tracking=True,         # Track task lifecycle
    backpressure_control=True,  # Prevent overload
    circuit_breaker=False,      # Circuit breaker pattern
    retry_policy=None,          # Custom retry policy
    dead_letter_queue=False,    # Failed message queue
    transactional=False,        # Transactional workflows
    monitoring=False,           # Advanced monitoring
    alerting=False              # Alerting system
)
Reliability Presets
Three predefined configurations for common scenarios:
Basic Preset:
workflow.configure_reliability(preset="basic")
- Message acknowledgments
- Task tracking
- Backpressure control
- 3 retry attempts
- Suitable for development and testing
Production Preset:
workflow.configure_reliability(preset="production")
- All basic features
- Circuit breaker pattern
- Dead letter queue
- Monitoring enabled
- 5 retry attempts
- Suitable for production deployments
Enterprise Preset:
workflow.configure_reliability(preset="enterprise")
- All production features
- Transactional workflows
- Alerting enabled
- Advanced monitoring
- Suitable for mission-critical systems
Custom Reliability Configuration
Override specific settings:
workflow.configure_reliability(
    preset="production",    # Start with the production preset
    circuit_breaker=False,  # But disable the circuit breaker
    retry_policy=RetryPolicy(max_retries=10, delay=2.0)  # Custom retry policy
)
Reliability Features Explained
Message Acknowledgments: Messages must be acknowledged after processing. Unacknowledged messages are retried.
Task Tracking: Each task has a lifecycle (pending, processing, completed, failed) tracked for visibility.
Backpressure Control: Agents reject new work when queue is full, preventing memory exhaustion.
Circuit Breaker: Automatically stops sending messages to failing agents, allowing recovery time.
Dead Letter Queue: Failed messages moved to special queue for manual inspection.
Transactional: Ensures all-or-nothing processing across multiple agents.
Monitoring: Collects detailed metrics on message processing and failures.
Alerting: Sends notifications when error thresholds are exceeded.
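As a sketch, individual features can be layered on top of a preset using the same keyword arguments listed under Configuration Method; the values below are illustrative:
workflow.configure_reliability(
    preset="basic",             # start from the basic preset
    dead_letter_queue=True,     # keep failed messages for manual inspection
    backpressure_control=True   # reject new work when queues are full
)
# Later, inspect messages still awaiting acknowledgment
pending = workflow.get_pending_messages()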
Monitoring and Observability
Communication Log
Access recent workflow communication events:
# Get recent communication (default: 20 events)
comm_log = workflow.get_communication_log(count=50)
for event in comm_log:
    print(f"From: {event['from_agent']}")
    print(f"To: {event['to_agent']}")
    print(f"Channel: {event['channel']}")
    print(f"Data: {event['data']}")
Workflow Statistics
Get comprehensive workflow statistics:
stats = workflow.get_stats()
print(f"Name: {stats['name']}")
print(f"Status: {stats['status']}")
print(f"Agent count: {stats['agent_count']}")
print(f"Connection count: {stats['connection_count']}")
print(f"Running time: {stats['running_time']} seconds")
print(f"Reliability enabled: {stats['reliability_enabled']}")
# If reliability is enabled
if stats.get('reliability_config'):
    print(f"Acknowledgments: {stats['reliability_config']['acknowledgments']}")
    print(f"Backpressure: {stats['reliability_config']['backpressure_control']}")
print(f"Pending messages: {stats.get('pending_messages', 0)}")
Workflow Status
Check workflow state:
# Workflow status enum
status = workflow.status
# WorkflowStatus.CREATED, STARTING, RUNNING, STOPPING, STOPPED, ERROR
Status Values:
- CREATED: Workflow created but not started
- STARTING: In the process of starting agents
- RUNNING: Active and processing data
- STOPPING: In the process of stopping
- STOPPED: Completely stopped
- ERROR: Error during a lifecycle operation
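A small sketch of guarding execution on the running state. The import location of WorkflowStatus is an assumption and may differ in your installation:
from daita.core.workflow import WorkflowStatus  # import path assumed
if workflow.status == WorkflowStatus.RUNNING:
    await entry_agent.run("Process this data")  # entry_agent: whichever agent starts the pipeline
else:
    print(f"Workflow not ready: {workflow.status}")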
Agent-Specific Reliability Stats
Get reliability statistics for individual agents:
# Only available when reliability is enabled
agent_stats = await workflow.get_agent_reliability_stats("processor")
print(f"Reliability enabled: {agent_stats['reliability_enabled']}")
print(f"Total tasks: {agent_stats.get('total_tasks', 0)}")
print(f"Tasks by status: {agent_stats.get('tasks_by_status', {})}")
print(f"Backpressure stats: {agent_stats.get('backpressure', {})}")
Pending Messages
View messages waiting for acknowledgment (reliability mode only):
pending = workflow.get_pending_messages()
for msg in pending:
    print(f"Message ID: {msg['message_id']}")
    print(f"Channel: {msg['channel']}")
    print(f"Timeout: {msg['timeout']}")
Utility Methods
Query Methods
# Get specific agent
agent = workflow.get_agent("processor")
# List all agent names
agent_names = workflow.list_agents()
# Returns: ['processor', 'analyzer', 'reporter']
# List all connections
connections = workflow.list_connections()
# Returns: ['processor -> processed_data -> analyzer', ...]
# Get latest data from channel
channel_data = workflow.get_channel_data("processed_data", count=5)
Context Manager Support
Use workflows with async context managers for automatic cleanup:
async with Workflow("Pipeline") as workflow:
    workflow.add_agent("processor", processor)
    workflow.add_agent("analyzer", analyzer)
    workflow.connect("processor", "data", "analyzer")
    # Trigger the workflow through the processor
    await processor.run("Analyze this data")
    # Results automatically flow to the analyzer via the relay channel
# Workflow automatically stopped
Error Handling
Handle workflow errors with proper cleanup:
from daita.core.exceptions import WorkflowError
workflow = Workflow("Pipeline")
# ... setup agents and connections ...
try:
    await workflow.start()
    # Trigger the workflow through the entry agent
    await entry_agent.run("Process this data")
except WorkflowError as e:
    print(f"Workflow error: {e}")
    print(f"Workflow: {e.workflow_name}")
    print(f"Context: {e.context}")
finally:
    await workflow.stop()
Common Errors:
- WorkflowError: Base exception for workflow operations. Common causes:
- Agent not found when connecting or injecting
- Workflow not running when injecting data
- Agent failures during processing (with reliability context)
Automatic Tracing
All workflow communication is automatically traced without configuration. The tracing system captures:
- Workflow lifecycle: Start, stop, configuration changes
- Agent communication: All messages between agents with timestamps
- Data injection: Entry points and initial data
- Errors: Failed communications and error context
- Reliability events: Acknowledgments, retries, backpressure events
Access traces programmatically:
from daita.core.tracing import get_trace_manager
trace_manager = get_trace_manager()
# Get workflow-specific metrics
metrics = trace_manager.get_workflow_metrics(workflow.name)
print(f"Total messages: {metrics['message_count']}")
print(f"Average latency: {metrics['avg_latency_ms']}ms")
Complete Example
from daita import SubstrateAgent
from daita.core.workflow import Workflow
from daita.core.tools import tool
from daita.config.base import FocusConfig
import asyncio
# Define tools
@tool
async def preprocess_text(text: str) -> dict:
    """Clean and normalize text data."""
    return {"text": text.strip().lower(), "cleaned": True}
@tool
async def analyze_sentiment(text: str) -> dict:
    """Analyze sentiment of text."""
    # Sentiment analysis logic
    return {"sentiment": "positive", "confidence": 0.95}
@tool
async def classify_content(text: str) -> dict:
    """Classify content by category."""
    # Classification logic
    return {"category": "news", "confidence": 0.88}
# Create processing agents with tools
preprocessor = SubstrateAgent(
    name="Preprocessor",
    prompt="You are a text preprocessor. Clean and normalize text data.",
    focus=FocusConfig(type="column", columns=["text", "metadata"]),
    relay="cleaned"
)
preprocessor.register_tool(preprocess_text)
sentiment = SubstrateAgent(
    name="Sentiment",
    prompt="You are a sentiment analyzer. Analyze the emotional tone of text.",
    relay="sentiment"
)
sentiment.register_tool(analyze_sentiment)
classifier = SubstrateAgent(
    name="Classifier",
    prompt="You are a content classifier. Categorize text into topics.",
    relay="classified"
)
classifier.register_tool(classify_content)
summarizer = SubstrateAgent(
    name="Summarizer",
    prompt="You are a summarizer. Combine sentiment and classification results into clear summaries."
)
# Build workflow
workflow = Workflow("Content Analysis")
workflow.add_agent("prep", preprocessor)
workflow.add_agent("sentiment", sentiment)
workflow.add_agent("classifier", classifier)
workflow.add_agent("summarizer", summarizer)
# Connect agents (parallel processing)
workflow.connect("prep", "cleaned", "sentiment")
workflow.connect("prep", "cleaned", "classifier")
workflow.connect("sentiment", "sentiment", "summarizer")
workflow.connect("classifier", "classified", "summarizer")
# Configure reliability
workflow.configure_reliability(preset="basic")
# Execute
async def main():
    # Illustrative input items; replace with your real data source
    data_items = [{"text": "Great launch, very positive reception."},
                  {"text": "The update broke several features."}]
    try:
        await workflow.start()
        # Process multiple items
        for item in data_items:
            # Trigger the workflow - the preprocessor processes and publishes to its relay channel
            await preprocessor.run(f"Preprocess: {item['text']}")
            # Results automatically flow through sentiment, classifier, and summarizer
        # View stats
        stats = workflow.get_stats()
        print(f"Connections: {stats['connection_count']}")
        print(f"Running time: {stats['running_time']}s")
    finally:
        await workflow.stop()
asyncio.run(main())
Best Practices
Design Patterns
- Single Responsibility: Each agent should have one clear purpose
- Loose Coupling: Use relay channels instead of direct agent references
- Error Isolation: Handle errors within agents to prevent workflow crashes
- Idempotency: Design agents to handle duplicate messages safely
Performance Optimization
- Use Agent Pools: Deploy pools for CPU-intensive or I/O-bound operations
- Appropriate Reliability: Don't enable enterprise features for simple workflows
- Monitor Queue Sizes: Watch pending message counts to identify bottlenecks
- Use Focus: Filter data early with focus to reduce message sizes
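A minimal sketch of that last point, reusing the FocusConfig pattern from the complete example above; the agent name and column list are illustrative:
from daita import SubstrateAgent
from daita.config.base import FocusConfig
slim_prep = SubstrateAgent(
    name="SlimPreprocessor",
    focus=FocusConfig(type="column", columns=["text"]),  # forward only the columns downstream agents need
    relay="slim_cleaned"
)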
Reliability Best Practices
- Always Cleanup: Use try/finally or context managers for proper shutdown
- Start Simple: Begin with basic reliability, upgrade to production/enterprise as needed
- Test Failures: Simulate agent failures to verify recovery behavior
- Monitor Dead Letters: Check dead letter queue regularly for systemic issues
Workflow Lifecycle
- Graceful Shutdown: Always call stop() to clean up subscriptions
- Check Status: Verify workflow.status == WorkflowStatus.RUNNING before injecting data
- Error Recovery: Log workflow errors with full context for debugging
- Resource Management: Stop workflows when not in use to free resources
CLI Usage
Workflows can be created, tested, and deployed using the Daita CLI:
# Create new workflow file
daita create workflow my_pipeline
# Test workflow locally
daita test my_pipeline
# Deploy to cloud
daita push
# View deployment status
daita status
Workflows must export a create_workflow() function for CLI compatibility:
def create_workflow():
    workflow = Workflow("My Pipeline")
    # ... setup agents and connections ...
    return workflow
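A slightly fuller module sketch built only from the APIs shown on this page; agent names and prompts are illustrative:
from daita import SubstrateAgent
from daita.core.workflow import Workflow
def create_workflow():
    cleaner = SubstrateAgent(name="Cleaner", prompt="Clean incoming records.", relay="cleaned")
    reporter = SubstrateAgent(name="Reporter", prompt="Summarize cleaned records.")
    workflow = Workflow("My Pipeline")
    workflow.add_agent("cleaner", cleaner)
    workflow.add_agent("reporter", reporter)
    workflow.connect("cleaner", "cleaned", "reporter")
    workflow.configure_reliability(preset="basic")
    return workflow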
Related Documentation
- SubstrateAgent - Agent creation and tools
- Plugins - Database and API integrations