Orchestrator Plugin
Multi-agent coordination and task routing for complex workflows.
# Quick Start
from daita import Agent
from daita.plugins import orchestrator
# Create specialized agents
data_agent = Agent(name="Data Analyst", prompt="You analyze data and generate insights.")
report_agent = Agent(name="Report Writer", prompt="You write clear, concise reports.")
# Create orchestrator with registered agents
orch = orchestrator(agents={
"data_analyst": data_agent,
"report_writer": report_agent
})
# Coordinator agent uses orchestrator tools
coordinator = Agent(
name="Coordinator",
prompt="You coordinate tasks across multiple specialized agents.",
tools=[orch]
)
await coordinator.start()
result = await coordinator.run("Analyze sales data and generate a report")

# Direct Usage
The plugin can be used directly for programmatic orchestration:
from daita.plugins import orchestrator
orch = orchestrator(agents={
"analyst": data_agent,
"writer": report_agent
})
# Route task to best agent
result = await orch.route_task("Analyze quarterly revenue trends")
print(f"Routed to: {result['agent_id']}")
print(f"Result: {result['result']}")

# Configuration Parameters
orchestrator(
agents: Optional[Dict[str, Agent]] = None,
graph_store: Optional[Any] = None,
organization_id: Optional[int] = None,
llm_provider: Optional[Any] = None,
routing_model: Optional[str] = "gpt-4o-mini",
routing_api_key: Optional[str] = None,
enable_llm_routing: bool = True,
routing_cache_ttl: int = 300,
max_description_length: int = 500
)

# Parameters
- agents (Dict[str, Agent]): Dictionary of agent name to Agent instance
- graph_store (Any): Optional graph storage for capability lookup
- organization_id (int): Optional organization ID for multi-tenant graphs
- llm_provider (Any): LLM provider for routing decisions
- routing_model (str): Model to use for routing (default: "gpt-4o-mini")
- routing_api_key (str): API key for routing LLM
- enable_llm_routing (bool): Enable LLM-based routing (default: True)
- routing_cache_ttl (int): Cache TTL in seconds (default: 300)
- max_description_length (int): Max length for agent descriptions (default: 500)
# Agent Registration
# Auto-Registration
Agents are automatically registered when passed to the constructor:
from daita import Agent
from daita.plugins import orchestrator
# Create specialized agents
sql_agent = Agent(name="SQL Expert", prompt="You are a SQL database expert.")
python_agent = Agent(name="Python Expert", prompt="You are a Python programming expert.")
# Auto-register agents
orch = orchestrator(agents={
"sql": sql_agent,
"python": python_agent
})

# Manual Registration
Register agents programmatically:
from daita.plugins import orchestrator
orch = orchestrator()
# Register with capabilities
orch.register_agent(
agent=sql_agent,
agent_id="sql_expert",
capabilities=["query_database", "schema_analysis"],
entity_types=["database", "table"]
)

# Task Routing
# Find Best Agent
Find the most suitable agent for a task:
from daita.plugins import orchestrator
orch = orchestrator(agents={"analyst": data_agent, "writer": report_agent})
# Find best agent using LLM-based routing
agent_id = await orch.find_agent("Analyze customer churn data")
print(f"Best agent: {agent_id}")

# Route and Execute
Route a task to the best agent and execute it:
from daita.plugins import orchestrator
orch = orchestrator(agents={"analyst": data_agent, "writer": report_agent})
# Route and execute automatically
result = await orch.route_task("Generate a summary of Q4 performance")
print(f"Agent used: {result['agent_id']}")
print(f"Result: {result['result']}")
print(f"Execution time: {result['execution_time_ms']}ms")

# Parallel Execution
Run multiple tasks concurrently:
from daita.plugins import orchestrator
orch = orchestrator(agents={
"analyst": data_agent,
"writer": report_agent,
"reviewer": review_agent
})
# Execute tasks in parallel
result = await orch.run_parallel([
"Analyze sales trends",
"Analyze customer demographics",
"Analyze product performance"
])
print(f"Total tasks: {result['total_tasks']}")
print(f"Successful: {result['successful_tasks']}")
for task_result in result['results']:
    print(f" - {task_result['agent_id']}: {task_result['result']}")

# Explicit Agent Assignment
from daita.plugins import orchestrator
orch = orchestrator(agents={"analyst": data_agent, "writer": report_agent})
# Specify agents explicitly
result = await orch.run_parallel([
{"task": "Analyze data", "agent_id": "analyst"},
{"task": "Write report", "agent_id": "writer"}
])

# Sequential Execution
Execute tasks in sequence, passing results between agents:
from daita.plugins import orchestrator
orch = orchestrator(agents={
"analyst": data_agent,
"writer": report_agent,
"reviewer": review_agent
})
# Execute sequentially - each agent receives previous results
result = await orch.run_sequential([
"Analyze the sales data",
"Write a report based on the analysis",
"Review the report for accuracy"
])
print(f"Final result: {result['final_result']}")
print(f"Completed tasks: {result['completed_tasks']}/{result['total_tasks']}")

# Workflow DAGs
Define complex workflows as directed acyclic graphs:
from daita.plugins import orchestrator
orch = orchestrator(agents={
"data": data_agent,
"ml": ml_agent,
"report": report_agent
})
# Create workflow
await orch.create_workflow(
name="customer_churn_analysis",
steps=[
{
"step_id": "extract_data",
"task": "Extract customer data from database",
"depends_on": []
},
{
"step_id": "analyze_patterns",
"task": "Analyze churn patterns",
"depends_on": ["extract_data"]
},
{
"step_id": "train_model",
"task": "Train prediction model",
"depends_on": ["analyze_patterns"]
},
{
"step_id": "generate_report",
"task": "Generate churn report",
"depends_on": ["analyze_patterns", "train_model"]
}
]
)
# Execute workflow
result = await orch.run_workflow("customer_churn_analysis")
print(f"Workflow completed: {result['completed_steps']}/{result['total_steps']}")

# Agent Performance
# Get Performance Metrics
Track agent execution statistics:
from daita.plugins import orchestrator
orch = orchestrator(agents={"analyst": data_agent})
# Get performance metrics
metrics = await orch.get_performance(
agent_id="analyst",
time_range_days=30
)
print(f"Total executions: {metrics['total_executions']}")
print(f"Success rate: {metrics['success_rate_percent']}%")
print(f"Avg execution time: {metrics['avg_execution_time_ms']}ms")

# Get Agent Capabilities
Retrieve agent capabilities and specializations:
from daita.plugins import orchestrator
orch = orchestrator(agents={"analyst": data_agent})
# Get capabilities
capabilities = await orch.get_capabilities("analyst")
print(f"Capabilities: {capabilities['capabilities']}")
print(f"Entity types: {capabilities['entity_types']}")

# Using with Agents
The Orchestrator plugin exposes coordination operations as tools:
from daita import Agent
from daita.plugins import orchestrator
# Create specialized agents
data_agent = Agent(name="Data Analyst", prompt="Analyze data and generate insights.")
report_agent = Agent(name="Report Writer", prompt="Write clear reports.")
ml_agent = Agent(name="ML Engineer", prompt="Build and train ML models.")
# Create orchestrator
orch = orchestrator(agents={
"data": data_agent,
"report": report_agent,
"ml": ml_agent
})
# Coordinator agent
coordinator = Agent(
name="Workflow Coordinator",
prompt="You coordinate complex workflows across specialized agents.",
llm_provider="openai",
model="gpt-4",
tools=[orch]
)
await coordinator.start()
# Coordinator autonomously orchestrates multi-agent workflow
result = await coordinator.run("""
We need to analyze customer churn:
1. Extract and analyze customer data
2. Train a prediction model
3. Generate a comprehensive report
Coordinate these tasks across the appropriate agents.
""")
print(result)
await coordinator.stop()

# Available Tools
The Orchestrator plugin exposes these tools to agents:
| Tool | Description | Parameters |
|---|---|---|
| find_agent | Find best agent for task | task |
| route_task | Route and execute task | task, required_capabilities |
| run_parallel | Execute tasks in parallel | tasks |
| run_sequential | Execute tasks sequentially | tasks |
| create_workflow | Define workflow DAG | name, steps |
| run_workflow | Execute workflow | workflow_id, input_data |
| get_performance | Get agent performance metrics | agent_id, time_range_days |
| get_capabilities | Get agent capabilities | agent_id |
# LLM-Based Routing
The orchestrator uses LLM-based routing to intelligently match tasks to agents:
from daita.plugins import orchestrator
# Enable LLM routing with custom model
orch = orchestrator(
agents={"analyst": data_agent, "writer": report_agent},
enable_llm_routing=True,
routing_model="gpt-4o-mini",
routing_cache_ttl=600 # Cache for 10 minutes
)
# LLM automatically selects best agent based on task and agent descriptions
result = await orch.route_task("Analyze customer behavior patterns")

# Best Practices
Agent Design:
- Create focused agents with clear specializations
- Write descriptive agent prompts for better routing
- Register agents with relevant capabilities
Task Routing:
- Let LLM routing handle agent selection when possible
- Use explicit agent assignment for critical workflows
- Monitor performance metrics to optimize routing
Workflows:
- Define clear dependencies in DAG workflows
- Handle failures gracefully with error recovery
- Test workflows with small datasets first
Performance:
- Use routing cache to reduce LLM calls
- Monitor execution times and success rates
- Scale horizontally by adding specialized agents
# Next Steps
- Workflows - Multi-agent orchestration patterns
- Catalog Plugin - Discover database schemas
- Lineage Plugin - Track data flows