Workflows

Workflows orchestrate multi-agent systems through relay-based communication. Agents publish results to named channels, and the workflow automatically routes those messages to subscribed agents. All workflow communication is traced automatically, with no configuration required.

#Overview

A Workflow manages agent lifecycle, connections, and communication. It provides:

  • Relay-based messaging: Agents communicate through named channels instead of direct calls
  • Automatic routing: Workflow handles message delivery between agents
  • Lifecycle management: Coordinated start/stop of all agents
  • Automatic tracing: All communication is traced for observability
  • Reliability features: Optional message acknowledgments, retries, and backpressure control
  • Horizontal scaling: Agent pools for parallel processing

#Core Concepts

#Relay Channels

Relay channels are named communication pathways. Agents publish messages to channels, and other agents subscribe to receive those messages.



Key Points:

  • Channels are identified by string names (e.g., "processed_data", "analysis_results")
  • Multiple agents can subscribe to the same channel
  • Messages are delivered to all subscribers
  • Channels are managed by the RelayManager (global singleton)
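
As a minimal sketch of channel fan-out (agent and channel names below are illustrative; add_agent() and connect() are covered in the sections that follow), one publisher can feed two subscribers through a single channel:

python
from daita import Agent
from daita.core.workflow import Workflow

# The publisher declares its output channel via the relay option
extractor = Agent(name="Extractor", relay="extracted_records")
validator = Agent(name="Validator")
indexer = Agent(name="Indexer")

workflow = Workflow("Fan-out Example")
workflow.add_agent("extractor", extractor)
workflow.add_agent("validator", validator)
workflow.add_agent("indexer", indexer)

# Both connections name the same channel, so every message the
# extractor publishes is delivered to both subscribers
workflow.connect("extractor", "extracted_records", "validator")
workflow.connect("extractor", "extracted_records", "indexer")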

#Workflow Execution Flow

  1. Create agents and workflow
  2. Add agents to workflow with add_agent()
  3. Connect agents via relay channels with connect()
  4. Start workflow (starts all agents and sets up subscriptions)
  5. Inject data into entry point agent(s)
  6. Process data flows automatically through connected agents
  7. Stop workflow (stops agents and cleans up subscriptions)

#Basic Workflow

python
from daita import Agent
from daita.core.workflow import Workflow
 
# Create agents
processor = Agent(
    name="Processor",
    relay="processed_data"  # Publishes to this channel
)
 
analyzer = Agent(name="Analyzer")
 
# Create workflow
workflow = Workflow("Data Pipeline")
 
# Add agents
workflow.add_agent("processor", processor)
workflow.add_agent("analyzer", analyzer)
 
# Connect: processor -> processed_data channel -> analyzer
workflow.connect("processor", "processed_data", "analyzer")
 
# Start workflow
await workflow.start()
 
# Trigger workflow by having first agent process data
# Agents autonomously decide how to process based on their tools and prompts
await processor.run("Process this data: [1, 2, 3]")
 
# Stop workflow
await workflow.stop()

#Constructor Parameters

python
Workflow(
    name: str,
    project_id: Optional[str] = None,
    relay_manager: Optional[RelayManager] = None
)

Parameters:

  • name (str): Workflow name for identification and tracing
  • project_id (str): Optional project identifier for multi-project environments
  • relay_manager (RelayManager): Custom relay manager (uses global singleton if not provided)
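
For example, a workflow scoped to a project (the project identifier below is illustrative):

python
from daita.core.workflow import Workflow

# project_id is only needed in multi-project environments;
# the value shown here is just an example
workflow = Workflow("Analysis Pipeline", project_id="analytics-team")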

#Core Methods

#Adding Agents

Add individual agents to the workflow:

python
workflow.add_agent(name: str, agent: Any) -> Workflow

Returns the workflow for method chaining.

#Connecting Agents

Connect agents through relay channels:

python
workflow.connect(
    from_agent: str,      # Source agent name
    channel: str,         # Relay channel name
    to_agent: str         # Destination agent name
) -> Workflow

The connection defines how data flows between agents. When from_agent publishes to channel, the data is routed to to_agent, which receives it via its receive_message() method and processes it autonomously using its tools.

Returns the workflow for method chaining.
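
Because add_agent() and connect() both return the workflow, a pipeline can also be declared as a single fluent chain (a sketch reusing the processor and analyzer agents from the basic example above):

python
# Fluent setup: add_agent() and connect() each return the workflow,
# so the whole pipeline can be declared in one expression
workflow = (
    Workflow("Data Pipeline")
    .add_agent("processor", processor)
    .add_agent("analyzer", analyzer)
    .connect("processor", "processed_data", "analyzer")
)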

#Starting and Stopping

Start the workflow (starts all agents and sets up relay subscriptions):

python
await workflow.start()

Stop the workflow (stops all agents and cleans up subscriptions):

python
await workflow.stop()

Always use try/finally or context managers to ensure proper cleanup.

#Triggering Workflows

Option 1: Direct Agent Execution (Recommended)

Trigger workflows by calling run() on the entry point agent:

python
# Start workflow
await workflow.start()
 
# Trigger via first agent - it processes and publishes to its relay channel
result = await entry_agent.run("Process this dataset")
 
# Results automatically flow through connected agents

Option 2: Legacy inject_data() Method

For backward compatibility, you can use inject_data(), which requires the target agent to implement a process() method:

python
await workflow.inject_data(
    agent_name: str,
    data: Any
)

Note: Most modern agents use run() instead of process(), so Option 1 is preferred.
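
If you do use inject_data(), a minimal call looks like the sketch below (the agent name and payload are illustrative, and the target agent must implement process()):

python
await workflow.start()

# Deliver a payload directly to the "processor" agent's process() method
await workflow.inject_data("processor", {"records": [1, 2, 3]})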

#Multi-Stage Pipeline

Create pipelines with multiple processing stages. Data flows sequentially through connected agents.

python
from daita import Agent
from daita.core.workflow import Workflow
from daita.core.tools import tool
 
# Define tools for each stage
@tool
async def clean_data(data: dict) -> dict:
    """Clean and normalize data."""
    # Cleaning logic
    return {"cleaned": data}
 
@tool
async def analyze_data(data: dict) -> dict:
    """Analyze cleaned data."""
    # Analysis logic
    return {"analysis": "results"}
 
# Stage 1: Data cleaning
cleaner = Agent(
    name="Cleaner",
    prompt="You are a data cleaning expert. Clean and normalize data.",
    relay="clean_data"
)
cleaner.register_tool(clean_data)
 
# Stage 2: Analysis
analyzer = Agent(
    name="Analyzer",
    prompt="You are a data analyst. Analyze cleaned data and extract insights.",
    relay="analysis_results"
)
analyzer.register_tool(analyze_data)
 
# Stage 3: Report generation
reporter = Agent(
    name="Reporter",
    prompt="You are a report writer. Generate clear, concise reports."
)
 
# Build workflow
workflow = Workflow("Analysis Pipeline")
workflow.add_agent("cleaner", cleaner)
workflow.add_agent("analyzer", analyzer)
workflow.add_agent("reporter", reporter)
 
# Connect stages sequentially
workflow.connect("cleaner", "clean_data", "analyzer")
workflow.connect("analyzer", "analysis_results", "reporter")
 
# Execute
await workflow.start()
 
# Trigger workflow through first agent
raw_data = {"values": [10, None, 30]}  # example input
await cleaner.run("Clean this data: " + str(raw_data))
 
# Data automatically flows: cleaner -> analyzer -> reporter
await workflow.stop()

#Parallel Processing

Multiple agents can subscribe to the same channel for parallel processing:

python
# Create parallel processors
preprocessor = Agent(name="Preprocessor", relay="preprocessed")
sentiment = Agent(name="Sentiment", relay="sentiment_results")
classifier = Agent(name="Classifier", relay="class_results")
aggregator = Agent(name="Aggregator")
 
workflow = Workflow("Parallel Pipeline")
workflow.add_agent("prep", preprocessor)
workflow.add_agent("sentiment", sentiment)
workflow.add_agent("classifier", classifier)
workflow.add_agent("aggregator", aggregator)
 
# Parallel branches: preprocessor feeds both sentiment and classifier
workflow.connect("prep", "preprocessed", "sentiment")
workflow.connect("prep", "preprocessed", "classifier")
 
# Aggregator receives from both branches
workflow.connect("sentiment", "sentiment_results", "aggregator")
workflow.connect("classifier", "class_results", "aggregator")

#Agent Pools (Horizontal Scaling)

Agent pools create multiple instances of the same agent for parallel processing. Tasks are distributed across pool instances automatically.

python
from daita import Agent
from daita.core.workflow import Workflow
 
# Agent factory function
def create_worker():
    return Agent(name="Worker", relay="processed")
 
# Add agent pool to workflow
workflow = Workflow("Scaled Pipeline")
workflow.add_agent_pool(
    name="workers",           # Pool name
    agent_factory=create_worker,  # Factory function
    instances=5               # Number of agent instances
)
 
# Entry agent feeds the pool; connect the pool like a regular agent
entry_agent = Agent(name="Entry", relay="raw_items")
workflow.add_agent("entry", entry_agent)
workflow.add_agent("aggregator", Agent(name="Aggregator"))
workflow.connect("entry", "raw_items", "workers")
workflow.connect("workers", "processed", "aggregator")
 
# Start and process
await workflow.start()
 
# Trigger processing - workers receive data via relay channels
# and process autonomously using their tools
data_items = ["record-1", "record-2", "record-3"]  # example inputs
for item in data_items:
    # Each worker autonomously processes items using its registered tools
    await entry_agent.run(f"Process item: {item}")
 
await workflow.stop()

Key Points:

  • Agent pools act like a single agent in workflow connections
  • Tasks are distributed using round-robin scheduling
  • Each instance has its own state and lifecycle
  • Useful for CPU-intensive or I/O-bound operations

#Reliability Configuration

For production workflows, enable reliability features to ensure message delivery and handle failures:

python
# Use predefined presets for common scenarios
workflow.configure_reliability(preset="basic")        # Development/testing
workflow.configure_reliability(preset="production")   # Production deployments
workflow.configure_reliability(preset="enterprise")   # Mission-critical systems

Available Presets:

  • basic: Message acknowledgments, task tracking, backpressure control, 3 retries
  • production: Basic features + circuit breaker, dead letter queue, monitoring, 5 retries
  • enterprise: Production features + transactional workflows, alerting, advanced monitoring

Most workflows don't need reliability features at first; start simple and add them as needed.
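
As a sketch, the production preset can be enabled once during setup (agent and channel names are illustrative; this assumes configure_reliability() is called after agents are connected and before start()):

python
workflow = Workflow("Order Pipeline")
workflow.add_agent("processor", processor)
workflow.add_agent("auditor", auditor)
workflow.connect("processor", "orders", "auditor")

# production preset: acknowledgments, retries (5), circuit breaker,
# dead letter queue, and monitoring
workflow.configure_reliability(preset="production")

await workflow.start()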

#Monitoring

Monitor workflow execution with built-in methods:

python
from daita.core.workflow import WorkflowStatus  # assumed import path, alongside Workflow

# Get workflow statistics
stats = workflow.get_stats()
print(f"Status: {stats['status']}")
print(f"Agents: {stats['agent_count']}")
print(f"Running time: {stats['running_time']}s")
 
# View communication log
comm_log = workflow.get_communication_log(count=20)
for event in comm_log:
    print(f"{event['from_agent']} -> {event['to_agent']} via {event['channel']}")
 
# Check workflow status
if workflow.status == WorkflowStatus.RUNNING:
    print("Workflow is active")

#Context Manager

Use async context managers for automatic cleanup:

python
async with Workflow("Pipeline") as workflow:
    workflow.add_agent("processor", processor)
    workflow.add_agent("analyzer", analyzer)
    workflow.connect("processor", "data", "analyzer")
 
    await processor.run("Analyze this data")
# Workflow automatically stopped

#Error Handling

Handle workflow errors with proper cleanup:

python
from daita.core.exceptions import WorkflowError
 
workflow = Workflow("Pipeline")
# ... setup agents and connections ...
 
try:
    await workflow.start()
 
    # Trigger workflow through entry agent
    await entry_agent.run("Process this data")
 
except WorkflowError as e:
    print(f"Workflow error: {e}")
    print(f"Workflow: {e.workflow_name}")
    print(f"Context: {e.context}")
finally:
    await workflow.stop()

Common Errors:

  • WorkflowError: Base exception for workflow operations
  • Agent not found when connecting or injecting
  • Workflow not running when injecting data
  • Agent failures during processing (with reliability context)

#Automatic Tracing

All workflow communication is automatically traced without configuration. The tracing system captures:

  • Workflow lifecycle: Start, stop, configuration changes
  • Agent communication: All messages between agents with timestamps
  • Data injection: Entry points and initial data
  • Errors: Failed communications and error context
  • Reliability events: Acknowledgments, retries, backpressure events

Access traces programmatically:

python
from daita.core.tracing import get_trace_manager
 
trace_manager = get_trace_manager()
 
# Get workflow-specific metrics
metrics = trace_manager.get_workflow_metrics(workflow.name)
print(f"Total messages: {metrics['message_count']}")
print(f"Average latency: {metrics['avg_latency_ms']}ms")

#Best Practices

Design:

  • Give each agent a single, clear responsibility
  • Use relay channels instead of direct agent references for loose coupling
  • Handle errors within agents to prevent workflow crashes
  • Design agents to be idempotent (safe to retry)

Performance:

  • Use agent pools for CPU-intensive or I/O-bound operations
  • Start with no reliability features, add as needed
  • Monitor pending message counts to identify bottlenecks
  • Use focus to filter data early and reduce message sizes

Lifecycle:

  • Always use try/finally or context managers for cleanup
  • Call stop() to clean up subscriptions properly
  • Check workflow.status == WorkflowStatus.RUNNING before triggering
  • Stop workflows when not in use to free resources

#Next Steps

  • Agent - Agent creation and tools
  • Tools - Creating custom tools
  • Plugins - Database and API integrations
  • Deployment - Deploy workflows to production