Back to Examples
Advanced

Multi-Agent Orchestration with Workflows

Build intelligent multi-agent systems with automatic task routing, parallel execution, and complex workflow DAGs

Multi-AgentOrchestrationWorkflows

#Overview

Learn how to coordinate multiple specialized agents using the Orchestrator plugin. This example demonstrates intelligent task routing, parallel execution, sequential workflows, and complex dependency graphs - everything you need to build sophisticated multi-agent systems.

#What You'll Learn

  • Creating specialized agents with distinct capabilities
  • Setting up the orchestrator plugin with auto-registration
  • LLM-based intelligent task routing
  • Parallel task execution across multiple agents
  • Sequential workflow coordination
  • Building complex workflow DAGs with dependencies
  • Mixing explicit and automatic agent assignment

#Prerequisites

  • Solid understanding of basic agent creation
  • Experience with custom tools and plugins
  • Familiarity with async/await in Python
  • Understanding of workflow and orchestration concepts
  • LLM API key (OpenAI or Anthropic)

#The Multi-Agent Orchestration Problem

Scenario: You're building a financial analysis system that needs:

  • Data Analyst: SQL queries and quantitative analysis
  • Report Writer: Document generation and summaries
  • Risk Analyzer: Risk assessment and compliance

Challenges without orchestration:

  1. How do you route tasks to the right agent?
  2. How do you run tasks in parallel for speed?
  3. How do you coordinate multi-step workflows?
  4. How do you handle dependencies between tasks?

The orchestrator plugin solves all of these.

#Step 1: Create Specialized Agents

First, create agents with distinct capabilities:

python
from daita import Agent
from daita.core.tools import tool
import asyncio
 
# Define specialized tools
@tool
async def analyze_financial_data(query: str) -> dict:
    """Analyze financial data based on query."""
    return {
        "query": query,
        "revenue": 1250000,
        "profit_margin": 0.23,
        "growth_rate": 0.15,
        "analysis": f"Financial analysis for: {query}"
    }
 
@tool
async def generate_report(topic: str, data: str = "") -> dict:
    """Generate a formatted report on the given topic."""
    return {
        "topic": topic,
        "pages": 12,
        "sections": ["Executive Summary", "Analysis", "Recommendations"],
        "report_preview": f"Report on {topic}: Based on the data provided..."
    }
 
@tool
async def assess_risk(scenario: str) -> dict:
    """Assess risk level for a given scenario."""
    return {
        "scenario": scenario,
        "risk_level": "MEDIUM",
        "risk_score": 6.5,
        "factors": ["Market volatility", "Regulatory changes"],
        "recommendation": "Implement additional controls"
    }
 
# Create specialized agents
data_analyst = Agent(
    name="financial_data_analyst",
    model="gpt-4o-mini",
    prompt="""You are a financial data analyst specializing in data
    aggregation, SQL queries, and quantitative analysis. You provide
    precise numerical insights and data-driven recommendations."""
)
data_analyst.register_tool(analyze_financial_data)
 
report_writer = Agent(
    name="report_writer",
    model="gpt-4o-mini",
    prompt="""You are a professional report writer specializing in
    business documentation. You create clear, well-structured reports
    with executive summaries and actionable recommendations."""
)
report_writer.register_tool(generate_report)
 
risk_analyzer = Agent(
    name="risk_analyzer",
    model="gpt-4o-mini",
    prompt="""You are a risk assessment specialist. You evaluate
    business scenarios for potential risks, compliance issues, and
    provide mitigation strategies."""
)
risk_analyzer.register_tool(assess_risk)

Key points:

  • Each agent has a specialized prompt defining its expertise
  • Each agent has domain-specific tools
  • Agents are completely independent
  • Names identify agents in the orchestrator

#Step 2: Initialize the Orchestrator

Set up the orchestrator with auto-registration:

python
from daita.plugins import orchestrator
 
# Initialize orchestrator with all agents
orch = orchestrator(agents={
    "data_analyst": data_analyst,
    "report_writer": report_writer,
    "risk_analyzer": risk_analyzer
})
 
# Start all agents
await data_analyst.start()
await report_writer.start()
await risk_analyzer.start()

What happens:

  • Auto-registration: Agents are registered with capabilities inferred from tools and prompts
  • LLM-based routing: Orchestrator uses an LLM to intelligently route tasks
  • No manual configuration: No need to explicitly list capabilities
  • Dynamic discovery: Tasks are routed based on natural language descriptions

#Step 3: Intelligent Agent Discovery

Let the orchestrator find the right agent for each task:

python
async def main():
    orch = orchestrator(agents={
        "data_analyst": data_analyst,
        "report_writer": report_writer,
        "risk_analyzer": risk_analyzer
    })
 
    await data_analyst.start()
    await report_writer.start()
    await risk_analyzer.start()
 
    # Just describe the task - LLM figures out which agent
    print("Task: Analyze Q4 revenue trends")
    agent_id = await orch.find_agent("Analyze Q4 revenue trends")
    print(f"Selected Agent: {agent_id}")
    # Output: financial_data_analyst
 
    print("\nTask: Generate executive summary report")
    agent_id = await orch.find_agent("Generate executive summary report")
    print(f"Selected Agent: {agent_id}")
    # Output: report_writer
 
    print("\nTask: Assess compliance risk")
    agent_id = await orch.find_agent("Assess compliance risk for trading")
    print(f"Selected Agent: {agent_id}")
    # Output: risk_analyzer
 
    await data_analyst.stop()
    await report_writer.stop()
    await risk_analyzer.stop()
 
if __name__ == "__main__":
    asyncio.run(main())

How it works:

  1. You provide a natural language task description
  2. Orchestrator uses LLM to analyze task requirements
  3. LLM matches task to agent capabilities (tools + prompts)
  4. Returns the best-fit agent ID
  5. Fallback to keyword matching if LLM unavailable

#Step 4: Parallel Task Execution

Execute multiple tasks concurrently for speed:

python
async def main():
    orch = orchestrator(agents={
        "data_analyst": data_analyst,
        "report_writer": report_writer,
        "risk_analyzer": risk_analyzer
    })
 
    await data_analyst.start()
    await report_writer.start()
    await risk_analyzer.start()
 
    # Define tasks - just strings with descriptions
    parallel_tasks = [
        "Calculate year-over-year revenue growth",
        "Generate monthly financial summary report",
        "Assess compliance risk for cryptocurrency trading"
    ]
 
    print(f"Executing {len(parallel_tasks)} tasks in parallel...")
    import time
    start_time = time.time()
 
    # Orchestrator auto-routes each task to the right agent
    result = await orch.run_parallel(parallel_tasks)
 
    end_time = time.time()
    total_time_ms = (end_time - start_time) * 1000
 
    if result["success"]:
        print(f"Completed {result['successful_tasks']} / {result['total_tasks']} tasks")
        print(f"Total Time: {total_time_ms:.0f}ms")
 
        for i, task_result in enumerate(result['results'], 1):
            print(f"\nTask {i}: {parallel_tasks[i-1]}")
            print(f"  Auto-routed to: {task_result['agent_id']}")
            print(f"  Success: {task_result['success']}")
            if task_result['success']:
                print(f"  Result: {task_result['result']}")
 
    await data_analyst.stop()
    await report_writer.stop()
    await risk_analyzer.stop()
 
if __name__ == "__main__":
    asyncio.run(main())

Result structure:

python
{
    "success": True,
    "total_tasks": 3,
    "successful_tasks": 3,
    "failed_tasks": 0,
    "results": [
        {
            "agent_id": "financial_data_analyst",
            "success": True,
            "result": {...},
            "error": None
        },
        # ... more results
    ]
}

Benefits of parallel execution:

  • Speed: Tasks run concurrently
  • Auto-routing: Each task finds its optimal agent
  • Error isolation: One failure doesn't stop others
  • Simple API: Just pass task strings

#Step 5: Sequential Workflows

Coordinate multi-step workflows where steps depend on previous results:

python
async def main():
    orch = orchestrator(agents={
        "data_analyst": data_analyst,
        "report_writer": report_writer,
        "risk_analyzer": risk_analyzer
    })
 
    await data_analyst.start()
    await report_writer.start()
    await risk_analyzer.start()
 
    # Mix explicit assignment and auto-routing
    sequential_tasks = [
        # Explicit agent assignment
        {
            "task": "Analyze customer churn data",
            "agent_id": "data_analyst"
        },
        # Auto-routed based on task description
        "Create a report based on the analysis",
        # Auto-routed
        "Assess risks from the retention strategies"
    ]
 
    print("Executing sequential workflow...")
    start_time = time.time()
 
    result = await orch.run_sequential(sequential_tasks)
 
    end_time = time.time()
    total_time_ms = (end_time - start_time) * 1000
 
    if result["success"]:
        print(f"Completed {result['successful_tasks']} / {result['total_tasks']} stages")
        print(f"Total Time: {total_time_ms:.0f}ms")
 
        for i, task_result in enumerate(result['results'], 1):
            task_input = sequential_tasks[i-1]
            task_desc = task_input['task'] if isinstance(task_input, dict) else task_input
            routing_type = "Explicit" if isinstance(task_input, dict) and 'agent_id' in task_input else "Auto-routed"
 
            print(f"\nStage {i}: {task_desc}")
            print(f"  {routing_type} to: {task_result['agent_id']}")
            print(f"  Success: {task_result['success']}")
 
    await data_analyst.stop()
    await report_writer.stop()
    await risk_analyzer.stop()
 
if __name__ == "__main__":
    asyncio.run(main())

Sequential vs Parallel:

FeatureSequentialParallel
ExecutionOne at a timeAll at once
Use caseSteps depend on each otherIndependent tasks
SpeedSlower (waits for each)Faster (concurrent)
ContextLater steps see earlier resultsNo shared context

#Step 6: Complex Workflow DAGs

Build sophisticated workflows with dependencies:

python
async def main():
    orch = orchestrator(agents={
        "data_analyst": data_analyst,
        "report_writer": report_writer,
        "risk_analyzer": risk_analyzer
    })
 
    await data_analyst.start()
    await report_writer.start()
    await risk_analyzer.start()
 
    # Define workflow with dependencies
    workflow_steps = [
        {
            "step_id": "data_collection",
            "task": "Collect and analyze Q4 financial metrics",
            "depends_on": []
            # No agent_id - will be auto-routed!
        },
        {
            "step_id": "risk_assessment",
            "task": "Evaluate financial risks and compliance issues",
            "depends_on": [],
            "agent_id": "risk_analyzer"  # Explicit assignment
        },
        {
            "step_id": "final_report",
            "task": "Create comprehensive Q4 report with analysis and risk assessment",
            "depends_on": ["data_collection", "risk_assessment"]
            # No agent_id - will be auto-routed!
        }
    ]
 
    # Create the workflow
    workflow_result = await orch.create_workflow(
        name="q4_review",
        steps=workflow_steps
    )
 
    if workflow_result["success"]:
        print(f"Workflow created: {workflow_result['workflow_id']}")
        print(f"Total steps: {workflow_result['steps_count']}")
 
        print("\nWorkflow structure:")
        for step in workflow_steps:
            assignment = f"agent_id={step['agent_id']}" if 'agent_id' in step else "auto-routed"
            deps = step.get('depends_on', [])
            dep_str = f" (depends on: {', '.join(deps)})" if deps else ""
            print(f"  - {step['step_id']}: {assignment}{dep_str}")
 
        # Execute the workflow
        print("\nExecuting workflow...")
        start_time = time.time()
 
        execution_result = await orch.run_workflow("q4_review")
 
        end_time = time.time()
        exec_time_ms = (end_time - start_time) * 1000
 
        if execution_result["success"]:
            print(f"\nWorkflow completed!")
            print(f"  Completed steps: {execution_result['completed_steps']}/{execution_result['total_steps']}")
            print(f"  Total time: {exec_time_ms:.0f}ms")
 
            # Show execution order and results
            for step_id, step_result in execution_result.get('step_results', {}).items():
                print(f"\n  Step: {step_id}")
                print(f"    Agent: {step_result.get('agent_id', 'N/A')}")
                print(f"    Success: {step_result.get('success', False)}")
 
    await data_analyst.stop()
    await report_writer.stop()
    await risk_analyzer.stop()
 
if __name__ == "__main__":
    asyncio.run(main())

Workflow DAG features:

  • Dependency management: Steps wait for their dependencies
  • Parallel optimization: Independent steps run concurrently
  • Auto-routing: agent_id is optional
  • Named workflows: Reusable workflow definitions
  • Error propagation: Failed dependencies prevent downstream steps

Execution flow:

python
┌─────────────────┐     ┌─────────────────┐
│ data_collection │     │ risk_assessment │
│  (auto-routed)  │     │   (explicit)    │
└────────┬────────┘     └────────┬────────┘
         │                       │
         └───────┬───────────────┘

         ┌───────▼────────┐
         │  final_report  │
         │ (auto-routed)  │
         └────────────────┘

#Step 7: Understanding Task Routing

How the orchestrator decides which agent to use:

python
# 1. Explicit routing (you specify agent_id)
task_with_agent = {
    "task": "Analyze revenue",
    "agent_id": "data_analyst"
}
 
# 2. Auto-routing (LLM decides based on task description)
task_auto = "Analyze revenue trends for Q4"
 
# 3. Mixed routing
mixed_tasks = [
    {"task": "Fetch data", "agent_id": "data_analyst"},  # Explicit
    "Generate a report from the data",  # Auto-routed
]

LLM-based routing process:

  1. Orchestrator analyzes task description
  2. Reviews each agent's capabilities (tools + prompts)
  3. LLM determines best fit based on:
    • Tool descriptions and parameters
    • Agent prompt expertise
    • Task requirements
  4. Returns agent ID
  5. Falls back to keyword matching if LLM unavailable

#Complete Example

Full working multi-agent orchestration system:

python
from daita import Agent
from daita.plugins import orchestrator
from daita.core.tools import tool
import asyncio
import time
 
 
# Define specialized tools
@tool
async def analyze_financial_data(query: str) -> dict:
    """Analyze financial data based on query."""
    return {
        "query": query,
        "revenue": 1250000,
        "profit_margin": 0.23,
        "growth_rate": 0.15,
        "analysis": f"Financial analysis for: {query}"
    }
 
 
@tool
async def generate_report(topic: str, data: str = "") -> dict:
    """Generate a formatted report on the given topic."""
    return {
        "topic": topic,
        "pages": 12,
        "sections": ["Executive Summary", "Analysis", "Recommendations"],
        "report_preview": f"Report on {topic}: Based on the data provided..."
    }
 
 
@tool
async def assess_risk(scenario: str) -> dict:
    """Assess risk level for a given scenario."""
    return {
        "scenario": scenario,
        "risk_level": "MEDIUM",
        "risk_score": 6.5,
        "factors": ["Market volatility", "Regulatory changes"],
        "recommendation": "Implement additional controls"
    }
 
 
async def main():
    print("="*70)
    print("MULTI-AGENT ORCHESTRATION")
    print("="*70)
 
    # Create specialized agents
    print("\n1. Creating specialized agents...")
 
    data_analyst = Agent(
        name="financial_data_analyst",
        model="gpt-4o-mini",
        prompt="""You are a financial data analyst specializing in data
        aggregation, SQL queries, and quantitative analysis."""
    )
    data_analyst.register_tool(analyze_financial_data)
    print("  ✓ Financial Data Analyst")
 
    report_writer = Agent(
        name="report_writer",
        model="gpt-4o-mini",
        prompt="""You are a professional report writer specializing in
        business documentation."""
    )
    report_writer.register_tool(generate_report)
    print("  ✓ Report Writer")
 
    risk_analyzer = Agent(
        name="risk_analyzer",
        model="gpt-4o-mini",
        prompt="""You are a risk assessment specialist."""
    )
    risk_analyzer.register_tool(assess_risk)
    print("  ✓ Risk Analyzer")
 
    # Initialize orchestrator
    print("\n2. Initializing orchestrator...")
    orch = orchestrator(agents={
        "data_analyst": data_analyst,
        "report_writer": report_writer,
        "risk_analyzer": risk_analyzer
    })
    print(f"  ✓ Orchestrator ready with {len(orch._agents)} agents")
 
    # Start all agents
    await data_analyst.start()
    await report_writer.start()
    await risk_analyzer.start()
 
    # Agent discovery
    print("\n3. Testing agent discovery...")
    agent_id = await orch.find_agent("Analyze Q4 revenue trends")
    print(f"  Task: 'Analyze Q4 revenue trends' → {agent_id}")
 
    agent_id = await orch.find_agent("Generate executive summary")
    print(f"  Task: 'Generate executive summary' → {agent_id}")
 
    # Parallel execution
    print("\n4. Executing parallel tasks...")
    parallel_tasks = [
        "Calculate year-over-year revenue growth",
        "Generate monthly financial summary report",
        "Assess compliance risk for cryptocurrency trading"
    ]
 
    start_time = time.time()
    result = await orch.run_parallel(parallel_tasks)
    elapsed = (time.time() - start_time) * 1000
 
    if result["success"]:
        print(f"  ✓ Completed {result['successful_tasks']}/{result['total_tasks']} tasks in {elapsed:.0f}ms")
 
    # Sequential workflow
    print("\n5. Executing sequential workflow...")
    sequential_tasks = [
        {"task": "Analyze customer churn data", "agent_id": "data_analyst"},
        "Create a report based on the analysis",
        "Assess risks from the strategies"
    ]
 
    start_time = time.time()
    result = await orch.run_sequential(sequential_tasks)
    elapsed = (time.time() - start_time) * 1000
 
    if result["success"]:
        print(f"  ✓ Completed {result['successful_tasks']}/{result['total_tasks']} stages in {elapsed:.0f}ms")
 
    # Workflow DAG
    print("\n6. Creating and executing workflow DAG...")
    workflow_steps = [
        {
            "step_id": "data_collection",
            "task": "Collect and analyze Q4 financial metrics",
            "depends_on": []
        },
        {
            "step_id": "risk_assessment",
            "task": "Evaluate financial risks",
            "depends_on": [],
            "agent_id": "risk_analyzer"
        },
        {
            "step_id": "final_report",
            "task": "Create comprehensive Q4 report",
            "depends_on": ["data_collection", "risk_assessment"]
        }
    ]
 
    workflow_result = await orch.create_workflow(
        name="q4_review",
        steps=workflow_steps
    )
 
    if workflow_result["success"]:
        print(f"  ✓ Workflow created with {workflow_result['steps_count']} steps")
 
        start_time = time.time()
        execution_result = await orch.run_workflow("q4_review")
        elapsed = (time.time() - start_time) * 1000
 
        if execution_result["success"]:
            print(f"  ✓ Workflow completed: {execution_result['completed_steps']}/{execution_result['total_steps']} steps in {elapsed:.0f}ms")
 
    print("\n" + "="*70)
    print("ORCHESTRATION COMPLETE")
    print("="*70 + "\n")
 
    # Cleanup
    await data_analyst.stop()
    await report_writer.stop()
    await risk_analyzer.stop()
 
 
if __name__ == "__main__":
    asyncio.run(main())

#Framework Internals

How the orchestrator plugin works:

  1. Agent Registration: Agents registered with metadata (tools, prompts, capabilities)
  2. LLM-Based Router: Uses LLM to match tasks to agent capabilities
  3. Execution Engine: Manages parallel and sequential task execution
  4. Workflow Manager: Handles DAG creation, dependency resolution, execution
  5. Result Aggregation: Collects and structures results from all agents

Orchestrator architecture:

python
┌─────────────────────────────────────┐
│      Orchestrator Plugin            │
├─────────────────────────────────────┤
│  ┌──────────────────────────────┐  │
│  │   LLM-Based Task Router      │  │
│  └──────────────────────────────┘  │
│  ┌──────────────────────────────┐  │
│  │   Agent Registry             │  │
│  │   - data_analyst             │  │
│  │   - report_writer            │  │
│  │   - risk_analyzer            │  │
│  └──────────────────────────────┘  │
│  ┌──────────────────────────────┐  │
│  │   Execution Engine           │  │
│  │   - Parallel executor        │  │
│  │   - Sequential executor      │  │
│  │   - Workflow DAG executor    │  │
│  └──────────────────────────────┘  │
└─────────────────────────────────────┘

#Use Cases

When to use multi-agent orchestration:

  1. Specialized expertise: Different domains require different agents
  2. Parallel processing: Speed up by running tasks concurrently
  3. Complex workflows: Multi-step processes with dependencies
  4. Scalability: Distribute work across multiple agents
  5. Separation of concerns: Each agent focuses on its specialty

Real-world examples:

  • Customer support: Routing → Technical → Billing agents
  • Content pipeline: Research → Writing → Editing → Publishing agents
  • Data analysis: Collection → Analysis → Visualization → Reporting agents
  • DevOps: Monitoring → Diagnosis → Remediation → Reporting agents

#Best Practices

  1. Clear agent prompts: Define expertise clearly for better routing
  2. Descriptive tool names: Help LLM understand capabilities
  3. Meaningful task descriptions: Better descriptions = better routing
  4. Use parallel when possible: Speed up independent tasks
  5. Model dependencies explicitly: Use DAGs for complex workflows
  6. Error handling: Check success flags and handle failures
  7. Monitor execution times: Optimize slow steps

#Key Takeaways

  1. Orchestrator simplifies multi-agent systems with automatic routing
  2. LLM-based routing eliminates manual capability management
  3. Flexible task input: Strings (auto) or dicts (explicit)
  4. Parallel execution speeds up independent tasks
  5. Sequential workflows coordinate dependent steps
  6. Workflow DAGs handle complex dependencies
  7. Auto-registration infers capabilities from tools and prompts

#Next Steps