#Overview
Learn how to coordinate multiple specialized agents using the Orchestrator plugin. This example demonstrates intelligent task routing, parallel execution, sequential workflows, and complex dependency graphs - everything you need to build sophisticated multi-agent systems.
#What You'll Learn
- Creating specialized agents with distinct capabilities
- Setting up the orchestrator plugin with auto-registration
- LLM-based intelligent task routing
- Parallel task execution across multiple agents
- Sequential workflow coordination
- Building complex workflow DAGs with dependencies
- Mixing explicit and automatic agent assignment
#Prerequisites
- Solid understanding of basic agent creation
- Experience with custom tools and plugins
- Familiarity with async/await in Python
- Understanding of workflow and orchestration concepts
- LLM API key (OpenAI or Anthropic)
#The Multi-Agent Orchestration Problem
Scenario: You're building a financial analysis system that needs:
- Data Analyst: SQL queries and quantitative analysis
- Report Writer: Document generation and summaries
- Risk Analyzer: Risk assessment and compliance
Challenges without orchestration:
- How do you route tasks to the right agent?
- How do you run tasks in parallel for speed?
- How do you coordinate multi-step workflows?
- How do you handle dependencies between tasks?
The orchestrator plugin solves all of these.
#Step 1: Create Specialized Agents
First, create agents with distinct capabilities:
from daita import Agent
from daita.core.tools import tool
import asyncio
# Define specialized tools
@tool
async def analyze_financial_data(query: str) -> dict:
"""Analyze financial data based on query."""
return {
"query": query,
"revenue": 1250000,
"profit_margin": 0.23,
"growth_rate": 0.15,
"analysis": f"Financial analysis for: {query}"
}
@tool
async def generate_report(topic: str, data: str = "") -> dict:
"""Generate a formatted report on the given topic."""
return {
"topic": topic,
"pages": 12,
"sections": ["Executive Summary", "Analysis", "Recommendations"],
"report_preview": f"Report on {topic}: Based on the data provided..."
}
@tool
async def assess_risk(scenario: str) -> dict:
"""Assess risk level for a given scenario."""
return {
"scenario": scenario,
"risk_level": "MEDIUM",
"risk_score": 6.5,
"factors": ["Market volatility", "Regulatory changes"],
"recommendation": "Implement additional controls"
}
# Create specialized agents
data_analyst = Agent(
name="financial_data_analyst",
model="gpt-4o-mini",
prompt="""You are a financial data analyst specializing in data
aggregation, SQL queries, and quantitative analysis. You provide
precise numerical insights and data-driven recommendations."""
)
data_analyst.register_tool(analyze_financial_data)
report_writer = Agent(
name="report_writer",
model="gpt-4o-mini",
prompt="""You are a professional report writer specializing in
business documentation. You create clear, well-structured reports
with executive summaries and actionable recommendations."""
)
report_writer.register_tool(generate_report)
risk_analyzer = Agent(
name="risk_analyzer",
model="gpt-4o-mini",
    prompt="""You are a risk assessment specialist. You evaluate
    business scenarios for potential risks and compliance issues,
    and provide mitigation strategies."""
)
risk_analyzer.register_tool(assess_risk)
Key points:
- Each agent has a specialized prompt defining its expertise
- Each agent has domain-specific tools
- Agents are completely independent
- Names identify agents in the orchestrator
#Step 2: Initialize the Orchestrator
Set up the orchestrator with auto-registration:
from daita.plugins import orchestrator
# Initialize orchestrator with all agents
orch = orchestrator(agents={
"data_analyst": data_analyst,
"report_writer": report_writer,
"risk_analyzer": risk_analyzer
})
# Start all agents
await data_analyst.start()
await report_writer.start()
await risk_analyzer.start()
What happens:
- Auto-registration: Agents are registered with capabilities inferred from tools and prompts
- LLM-based routing: Orchestrator uses an LLM to intelligently route tasks
- No manual configuration: No need to explicitly list capabilities
- Dynamic discovery: Tasks are routed based on natural language descriptions
#Step 3: Intelligent Agent Discovery
Let the orchestrator find the right agent for each task:
async def main():
orch = orchestrator(agents={
"data_analyst": data_analyst,
"report_writer": report_writer,
"risk_analyzer": risk_analyzer
})
await data_analyst.start()
await report_writer.start()
await risk_analyzer.start()
# Just describe the task - LLM figures out which agent
print("Task: Analyze Q4 revenue trends")
agent_id = await orch.find_agent("Analyze Q4 revenue trends")
print(f"Selected Agent: {agent_id}")
# Output: financial_data_analyst
print("\nTask: Generate executive summary report")
agent_id = await orch.find_agent("Generate executive summary report")
print(f"Selected Agent: {agent_id}")
# Output: report_writer
print("\nTask: Assess compliance risk")
agent_id = await orch.find_agent("Assess compliance risk for trading")
print(f"Selected Agent: {agent_id}")
# Output: risk_analyzer
await data_analyst.stop()
await report_writer.stop()
await risk_analyzer.stop()
if __name__ == "__main__":
    asyncio.run(main())
How it works:
- You provide a natural language task description
- Orchestrator uses LLM to analyze task requirements
- LLM matches task to agent capabilities (tools + prompts)
- Returns the best-fit agent ID
- Fallback to keyword matching if LLM unavailable
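The keyword fallback can be sketched as a simple overlap score. This is only an illustration of the idea; the `AGENT_KEYWORDS` table and `keyword_match` helper below are hypothetical, not part of the plugin:

```python
# Illustrative keyword fallback: score each agent by word overlap
# between the task description and a per-agent keyword set.
AGENT_KEYWORDS = {
    "data_analyst": {"analyze", "sql", "data", "revenue", "metrics"},
    "report_writer": {"report", "summary", "document", "write"},
    "risk_analyzer": {"risk", "compliance", "assess", "mitigation"},
}

def keyword_match(task: str) -> str:
    words = set(task.lower().split())
    # Pick the agent whose keyword set overlaps the task the most.
    return max(AGENT_KEYWORDS, key=lambda a: len(AGENT_KEYWORDS[a] & words))

print(keyword_match("Assess compliance risk for trading"))  # risk_analyzer
```

This is far weaker than LLM routing (no synonyms, no tool awareness), which is why it is only a fallback.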
#Step 4: Parallel Task Execution
Execute multiple tasks concurrently for speed:
async def main():
orch = orchestrator(agents={
"data_analyst": data_analyst,
"report_writer": report_writer,
"risk_analyzer": risk_analyzer
})
await data_analyst.start()
await report_writer.start()
await risk_analyzer.start()
# Define tasks - just strings with descriptions
parallel_tasks = [
"Calculate year-over-year revenue growth",
"Generate monthly financial summary report",
"Assess compliance risk for cryptocurrency trading"
]
print(f"Executing {len(parallel_tasks)} tasks in parallel...")
import time
start_time = time.time()
# Orchestrator auto-routes each task to the right agent
result = await orch.run_parallel(parallel_tasks)
end_time = time.time()
total_time_ms = (end_time - start_time) * 1000
if result["success"]:
print(f"Completed {result['successful_tasks']} / {result['total_tasks']} tasks")
print(f"Total Time: {total_time_ms:.0f}ms")
for i, task_result in enumerate(result['results'], 1):
print(f"\nTask {i}: {parallel_tasks[i-1]}")
print(f" Auto-routed to: {task_result['agent_id']}")
print(f" Success: {task_result['success']}")
if task_result['success']:
print(f" Result: {task_result['result']}")
await data_analyst.stop()
await report_writer.stop()
await risk_analyzer.stop()
if __name__ == "__main__":
    asyncio.run(main())
Result structure:
{
"success": True,
"total_tasks": 3,
"successful_tasks": 3,
"failed_tasks": 0,
"results": [
{
"agent_id": "financial_data_analyst",
"success": True,
"result": {...},
"error": None
},
# ... more results
]
}
Benefits of parallel execution:
- Speed: Tasks run concurrently
- Auto-routing: Each task finds its optimal agent
- Error isolation: One failure doesn't stop others
- Simple API: Just pass task strings
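Conceptually, parallel execution with error isolation maps onto `asyncio.gather(..., return_exceptions=True)`: a failed task becomes a result entry instead of aborting the batch. The sketch below uses a stand-in `run_task` coroutine; it is not the plugin's actual implementation:

```python
import asyncio

async def run_task(task: str) -> dict:
    # Stand-in for routing + agent execution; one task fails on
    # purpose to show that errors are isolated per task.
    if "fail" in task:
        raise RuntimeError(f"agent error on: {task}")
    return {"task": task, "success": True}

async def run_parallel_sketch(tasks: list[str]) -> list[dict]:
    # return_exceptions=True keeps one failure from cancelling the rest.
    outcomes = await asyncio.gather(
        *(run_task(t) for t in tasks), return_exceptions=True
    )
    return [
        o if isinstance(o, dict) else {"success": False, "error": str(o)}
        for o in outcomes
    ]

results = asyncio.run(
    run_parallel_sketch(["analyze data", "fail: report", "assess risk"])
)
```

Here `results[1]` records the failure while the other two tasks still complete.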
#Step 5: Sequential Workflows
Coordinate multi-step workflows where steps depend on previous results:
async def main():
orch = orchestrator(agents={
"data_analyst": data_analyst,
"report_writer": report_writer,
"risk_analyzer": risk_analyzer
})
await data_analyst.start()
await report_writer.start()
await risk_analyzer.start()
# Mix explicit assignment and auto-routing
sequential_tasks = [
# Explicit agent assignment
{
"task": "Analyze customer churn data",
"agent_id": "data_analyst"
},
# Auto-routed based on task description
"Create a report based on the analysis",
# Auto-routed
"Assess risks from the retention strategies"
]
    print("Executing sequential workflow...")
    import time
    start_time = time.time()
result = await orch.run_sequential(sequential_tasks)
end_time = time.time()
total_time_ms = (end_time - start_time) * 1000
if result["success"]:
print(f"Completed {result['successful_tasks']} / {result['total_tasks']} stages")
print(f"Total Time: {total_time_ms:.0f}ms")
for i, task_result in enumerate(result['results'], 1):
task_input = sequential_tasks[i-1]
task_desc = task_input['task'] if isinstance(task_input, dict) else task_input
routing_type = "Explicit" if isinstance(task_input, dict) and 'agent_id' in task_input else "Auto-routed"
print(f"\nStage {i}: {task_desc}")
print(f" {routing_type} to: {task_result['agent_id']}")
print(f" Success: {task_result['success']}")
await data_analyst.stop()
await report_writer.stop()
await risk_analyzer.stop()
if __name__ == "__main__":
    asyncio.run(main())
Sequential vs Parallel:
| Feature | Sequential | Parallel |
|---|---|---|
| Execution | One at a time | All at once |
| Use case | Steps depend on each other | Independent tasks |
| Speed | Slower (waits for each) | Faster (concurrent) |
| Context | Later steps see earlier results | No shared context |
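The context-threading behavior in the table can be sketched as a loop that feeds each result into the next call. The `execute` coroutine below is a placeholder for a routed agent call, not the plugin's internals:

```python
import asyncio

async def execute(task: str, context: str) -> str:
    # Placeholder for a routed agent call; echoes its inputs so the
    # context threading is visible in the output.
    return f"result({task} | context={context})"

async def run_sequential_sketch(tasks: list[str]) -> list[str]:
    results: list[str] = []
    context = ""
    for task in tasks:
        out = await execute(task, context)  # each step waits for the last
        results.append(out)
        context = out  # later steps see the previous result
    return results

steps = asyncio.run(run_sequential_sketch(["analyze churn", "write report"]))
```

The second step's output embeds the first step's result, which is exactly the property parallel execution gives up in exchange for speed.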
#Step 6: Complex Workflow DAGs
Build sophisticated workflows with dependencies:
async def main():
orch = orchestrator(agents={
"data_analyst": data_analyst,
"report_writer": report_writer,
"risk_analyzer": risk_analyzer
})
await data_analyst.start()
await report_writer.start()
await risk_analyzer.start()
# Define workflow with dependencies
workflow_steps = [
{
"step_id": "data_collection",
"task": "Collect and analyze Q4 financial metrics",
"depends_on": []
# No agent_id - will be auto-routed!
},
{
"step_id": "risk_assessment",
"task": "Evaluate financial risks and compliance issues",
"depends_on": [],
"agent_id": "risk_analyzer" # Explicit assignment
},
{
"step_id": "final_report",
"task": "Create comprehensive Q4 report with analysis and risk assessment",
"depends_on": ["data_collection", "risk_assessment"]
# No agent_id - will be auto-routed!
}
]
# Create the workflow
workflow_result = await orch.create_workflow(
name="q4_review",
steps=workflow_steps
)
if workflow_result["success"]:
print(f"Workflow created: {workflow_result['workflow_id']}")
print(f"Total steps: {workflow_result['steps_count']}")
print("\nWorkflow structure:")
for step in workflow_steps:
assignment = f"agent_id={step['agent_id']}" if 'agent_id' in step else "auto-routed"
deps = step.get('depends_on', [])
dep_str = f" (depends on: {', '.join(deps)})" if deps else ""
print(f" - {step['step_id']}: {assignment}{dep_str}")
# Execute the workflow
    print("\nExecuting workflow...")
    import time
    start_time = time.time()
execution_result = await orch.run_workflow("q4_review")
end_time = time.time()
exec_time_ms = (end_time - start_time) * 1000
if execution_result["success"]:
print(f"\nWorkflow completed!")
print(f" Completed steps: {execution_result['completed_steps']}/{execution_result['total_steps']}")
print(f" Total time: {exec_time_ms:.0f}ms")
# Show execution order and results
for step_id, step_result in execution_result.get('step_results', {}).items():
print(f"\n Step: {step_id}")
print(f" Agent: {step_result.get('agent_id', 'N/A')}")
print(f" Success: {step_result.get('success', False)}")
await data_analyst.stop()
await report_writer.stop()
await risk_analyzer.stop()
if __name__ == "__main__":
    asyncio.run(main())
Workflow DAG features:
- Dependency management: Steps wait for their dependencies
- Parallel optimization: Independent steps run concurrently
- Auto-routing: agent_id is optional
- Named workflows: Reusable workflow definitions
- Error propagation: Failed dependencies prevent downstream steps
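The dependency scheduling described above can be sketched with the standard library's `graphlib.TopologicalSorter`: every step whose dependencies are complete becomes ready at the same time, which is where the parallel optimization comes from. This mirrors the scheduling idea, not the plugin's internals:

```python
from graphlib import TopologicalSorter

# step_id -> set of dependencies, matching the workflow above.
dag = {
    "data_collection": set(),
    "risk_assessment": set(),
    "final_report": {"data_collection", "risk_assessment"},
}

ts = TopologicalSorter(dag)
ts.prepare()
batches = []
while ts.is_active():
    ready = list(ts.get_ready())   # all steps runnable concurrently
    batches.append(sorted(ready))
    ts.done(*ready)                # mark the batch finished

print(batches)
# [['data_collection', 'risk_assessment'], ['final_report']]
```

The two independent steps land in the first batch and can run in parallel; `final_report` only becomes ready once both are marked done.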
Execution flow:
┌─────────────────┐ ┌─────────────────┐
│ data_collection │ │ risk_assessment │
│ (auto-routed) │ │ (explicit) │
└────────┬────────┘ └────────┬────────┘
│ │
└───────┬───────────────┘
│
┌───────▼────────┐
│ final_report │
│ (auto-routed) │
        └────────────────┘
#Step 7: Understanding Task Routing
How the orchestrator decides which agent to use:
# 1. Explicit routing (you specify agent_id)
task_with_agent = {
"task": "Analyze revenue",
"agent_id": "data_analyst"
}
# 2. Auto-routing (LLM decides based on task description)
task_auto = "Analyze revenue trends for Q4"
# 3. Mixed routing
mixed_tasks = [
{"task": "Fetch data", "agent_id": "data_analyst"}, # Explicit
"Generate a report from the data", # Auto-routed
]
LLM-based routing process:
- Orchestrator analyzes task description
- Reviews each agent's capabilities (tools + prompts)
- LLM determines best fit based on:
- Tool descriptions and parameters
- Agent prompt expertise
- Task requirements
- Returns agent ID
- Falls back to keyword matching if LLM unavailable
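A routing prompt along these lines might look like the sketch below. The exact prompt the orchestrator sends is internal, so treat `build_routing_prompt` as an illustration of the process, not the real thing:

```python
def build_routing_prompt(task: str, agents: dict[str, str]) -> str:
    # agents maps agent_id -> short capability summary (tools + prompt).
    catalog = "\n".join(f"- {aid}: {caps}" for aid, caps in agents.items())
    return (
        "Pick the single best agent for the task.\n"
        f"Task: {task}\n"
        f"Agents:\n{catalog}\n"
        "Answer with the agent id only."
    )

prompt = build_routing_prompt(
    "Assess compliance risk",
    {
        "data_analyst": "SQL queries, quantitative analysis",
        "risk_analyzer": "risk assessment, compliance tools",
    },
)
```

The LLM's one-token-style answer ("the agent id only") is what makes the response cheap to parse into a routing decision.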
#Complete Example
Full working multi-agent orchestration system:
from daita import Agent
from daita.plugins import orchestrator
from daita.core.tools import tool
import asyncio
import time
# Define specialized tools
@tool
async def analyze_financial_data(query: str) -> dict:
"""Analyze financial data based on query."""
return {
"query": query,
"revenue": 1250000,
"profit_margin": 0.23,
"growth_rate": 0.15,
"analysis": f"Financial analysis for: {query}"
}
@tool
async def generate_report(topic: str, data: str = "") -> dict:
"""Generate a formatted report on the given topic."""
return {
"topic": topic,
"pages": 12,
"sections": ["Executive Summary", "Analysis", "Recommendations"],
"report_preview": f"Report on {topic}: Based on the data provided..."
}
@tool
async def assess_risk(scenario: str) -> dict:
"""Assess risk level for a given scenario."""
return {
"scenario": scenario,
"risk_level": "MEDIUM",
"risk_score": 6.5,
"factors": ["Market volatility", "Regulatory changes"],
"recommendation": "Implement additional controls"
}
async def main():
print("="*70)
print("MULTI-AGENT ORCHESTRATION")
print("="*70)
# Create specialized agents
print("\n1. Creating specialized agents...")
data_analyst = Agent(
name="financial_data_analyst",
model="gpt-4o-mini",
prompt="""You are a financial data analyst specializing in data
aggregation, SQL queries, and quantitative analysis."""
)
data_analyst.register_tool(analyze_financial_data)
print(" ✓ Financial Data Analyst")
report_writer = Agent(
name="report_writer",
model="gpt-4o-mini",
prompt="""You are a professional report writer specializing in
business documentation."""
)
report_writer.register_tool(generate_report)
print(" ✓ Report Writer")
risk_analyzer = Agent(
name="risk_analyzer",
model="gpt-4o-mini",
prompt="""You are a risk assessment specialist."""
)
risk_analyzer.register_tool(assess_risk)
print(" ✓ Risk Analyzer")
# Initialize orchestrator
print("\n2. Initializing orchestrator...")
orch = orchestrator(agents={
"data_analyst": data_analyst,
"report_writer": report_writer,
"risk_analyzer": risk_analyzer
})
print(f" ✓ Orchestrator ready with {len(orch._agents)} agents")
# Start all agents
await data_analyst.start()
await report_writer.start()
await risk_analyzer.start()
# Agent discovery
print("\n3. Testing agent discovery...")
agent_id = await orch.find_agent("Analyze Q4 revenue trends")
print(f" Task: 'Analyze Q4 revenue trends' → {agent_id}")
agent_id = await orch.find_agent("Generate executive summary")
print(f" Task: 'Generate executive summary' → {agent_id}")
# Parallel execution
print("\n4. Executing parallel tasks...")
parallel_tasks = [
"Calculate year-over-year revenue growth",
"Generate monthly financial summary report",
"Assess compliance risk for cryptocurrency trading"
]
start_time = time.time()
result = await orch.run_parallel(parallel_tasks)
elapsed = (time.time() - start_time) * 1000
if result["success"]:
print(f" ✓ Completed {result['successful_tasks']}/{result['total_tasks']} tasks in {elapsed:.0f}ms")
# Sequential workflow
print("\n5. Executing sequential workflow...")
sequential_tasks = [
{"task": "Analyze customer churn data", "agent_id": "data_analyst"},
"Create a report based on the analysis",
"Assess risks from the strategies"
]
start_time = time.time()
result = await orch.run_sequential(sequential_tasks)
elapsed = (time.time() - start_time) * 1000
if result["success"]:
print(f" ✓ Completed {result['successful_tasks']}/{result['total_tasks']} stages in {elapsed:.0f}ms")
# Workflow DAG
print("\n6. Creating and executing workflow DAG...")
workflow_steps = [
{
"step_id": "data_collection",
"task": "Collect and analyze Q4 financial metrics",
"depends_on": []
},
{
"step_id": "risk_assessment",
"task": "Evaluate financial risks",
"depends_on": [],
"agent_id": "risk_analyzer"
},
{
"step_id": "final_report",
"task": "Create comprehensive Q4 report",
"depends_on": ["data_collection", "risk_assessment"]
}
]
workflow_result = await orch.create_workflow(
name="q4_review",
steps=workflow_steps
)
if workflow_result["success"]:
print(f" ✓ Workflow created with {workflow_result['steps_count']} steps")
start_time = time.time()
execution_result = await orch.run_workflow("q4_review")
elapsed = (time.time() - start_time) * 1000
if execution_result["success"]:
print(f" ✓ Workflow completed: {execution_result['completed_steps']}/{execution_result['total_steps']} steps in {elapsed:.0f}ms")
print("\n" + "="*70)
print("ORCHESTRATION COMPLETE")
print("="*70 + "\n")
# Cleanup
await data_analyst.stop()
await report_writer.stop()
await risk_analyzer.stop()
if __name__ == "__main__":
    asyncio.run(main())
#Framework Internals
How the orchestrator plugin works:
- Agent Registration: Agents registered with metadata (tools, prompts, capabilities)
- LLM-Based Router: Uses LLM to match tasks to agent capabilities
- Execution Engine: Manages parallel and sequential task execution
- Workflow Manager: Handles DAG creation, dependency resolution, execution
- Result Aggregation: Collects and structures results from all agents
Orchestrator architecture:
┌─────────────────────────────────────┐
│ Orchestrator Plugin │
├─────────────────────────────────────┤
│ ┌──────────────────────────────┐ │
│ │ LLM-Based Task Router │ │
│ └──────────────────────────────┘ │
│ ┌──────────────────────────────┐ │
│ │ Agent Registry │ │
│ │ - data_analyst │ │
│ │ - report_writer │ │
│ │ - risk_analyzer │ │
│ └──────────────────────────────┘ │
│ ┌──────────────────────────────┐ │
│ │ Execution Engine │ │
│ │ - Parallel executor │ │
│ │ - Sequential executor │ │
│ │ - Workflow DAG executor │ │
│ └──────────────────────────────┘ │
└─────────────────────────────────────┘
#Use Cases
When to use multi-agent orchestration:
- Specialized expertise: Different domains require different agents
- Parallel processing: Speed up by running tasks concurrently
- Complex workflows: Multi-step processes with dependencies
- Scalability: Distribute work across multiple agents
- Separation of concerns: Each agent focuses on its specialty
Real-world examples:
- Customer support: Routing → Technical → Billing agents
- Content pipeline: Research → Writing → Editing → Publishing agents
- Data analysis: Collection → Analysis → Visualization → Reporting agents
- DevOps: Monitoring → Diagnosis → Remediation → Reporting agents
#Best Practices
- Clear agent prompts: Define expertise clearly for better routing
- Descriptive tool names: Help LLM understand capabilities
- Meaningful task descriptions: Better descriptions = better routing
- Use parallel when possible: Speed up independent tasks
- Model dependencies explicitly: Use DAGs for complex workflows
- Error handling: Check success flags and handle failures
- Monitor execution times: Optimize slow steps
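For the error-handling advice, a defensive pattern that assumes the result shape shown in Step 4 might look like:

```python
def collect_failures(result: dict) -> list[str]:
    # Walk the per-task results and surface every failure instead of
    # assuming the whole batch succeeded.
    failures = []
    for entry in result.get("results", []):
        if not entry.get("success"):
            failures.append(entry.get("error") or "unknown error")
    return failures

batch = {
    "success": False,
    "results": [
        {"agent_id": "data_analyst", "success": True, "result": {}},
        {"agent_id": "report_writer", "success": False, "error": "timeout"},
    ],
}
errors = collect_failures(batch)
```

Checking per-task flags rather than only the top-level `success` lets you retry or report the specific tasks that failed.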
#Key Takeaways
- Orchestrator simplifies multi-agent systems with automatic routing
- LLM-based routing eliminates manual capability management
- Flexible task input: Strings (auto) or dicts (explicit)
- Parallel execution speeds up independent tasks
- Sequential workflows coordinate dependent steps
- Workflow DAGs handle complex dependencies
- Auto-registration infers capabilities from tools and prompts
#Next Steps
- Multi-Plugin Architecture to combine different plugin types
- PostgreSQL Custom Handlers for specialized agent logic
- Database Query Agent for data-driven agents in orchestration