Orchestrator Plugin

Multi-agent coordination and task routing for complex workflows.

#Quick Start

python
from daita import Agent
from daita.plugins import orchestrator
 
# Create specialized agents
data_agent = Agent(name="Data Analyst", prompt="You analyze data and generate insights.")
report_agent = Agent(name="Report Writer", prompt="You write clear, concise reports.")
 
# Create orchestrator with registered agents
orch = orchestrator(agents={
    "data_analyst": data_agent,
    "report_writer": report_agent
})
 
# Coordinator agent uses orchestrator tools
coordinator = Agent(
    name="Coordinator",
    prompt="You coordinate tasks across multiple specialized agents.",
    tools=[orch]
)
 
await coordinator.start()
result = await coordinator.run("Analyze sales data and generate a report")

#Direct Usage

The plugin can be used directly for programmatic orchestration:

python
from daita.plugins import orchestrator
 
orch = orchestrator(agents={
    "analyst": data_agent,
    "writer": report_agent
})
 
# Route task to best agent
result = await orch.route_task("Analyze quarterly revenue trends")
print(f"Routed to: {result['agent_id']}")
print(f"Result: {result['result']}")

#Configuration Parameters

python
orchestrator(
    agents: Optional[Dict[str, Agent]] = None,
    graph_store: Optional[Any] = None,
    organization_id: Optional[int] = None,
    llm_provider: Optional[Any] = None,
    routing_model: Optional[str] = "gpt-4o-mini",
    routing_api_key: Optional[str] = None,
    enable_llm_routing: bool = True,
    routing_cache_ttl: int = 300,
    max_description_length: int = 500
)

#Parameters

  • agents (Dict[str, Agent]): Dictionary of agent name to Agent instance
  • graph_store (Any): Optional graph storage for capability lookup
  • organization_id (int): Optional organization ID for multi-tenant graphs
  • llm_provider (Any): LLM provider for routing decisions
  • routing_model (str): Model to use for routing (default: "gpt-4o-mini")
  • routing_api_key (str): API key for routing LLM
  • enable_llm_routing (bool): Enable LLM-based routing (default: True)
  • routing_cache_ttl (int): Cache TTL in seconds (default: 300)
  • max_description_length (int): Max length for agent descriptions (default: 500)

#Agent Registration

#Auto-Registration

Agents are automatically registered when passed to the constructor:

python
from daita import Agent
from daita.plugins import orchestrator
 
# Create specialized agents
sql_agent = Agent(name="SQL Expert", prompt="You are a SQL database expert.")
python_agent = Agent(name="Python Expert", prompt="You are a Python programming expert.")
 
# Auto-register agents
orch = orchestrator(agents={
    "sql": sql_agent,
    "python": python_agent
})

#Manual Registration

Register agents programmatically:

python
from daita.plugins import orchestrator
 
orch = orchestrator()
 
# Register with capabilities
orch.register_agent(
    agent=sql_agent,
    agent_id="sql_expert",
    capabilities=["query_database", "schema_analysis"],
    entity_types=["database", "table"]
)

#Task Routing

#Find Best Agent

Find the most suitable agent for a task:

python
from daita.plugins import orchestrator
 
orch = orchestrator(agents={"analyst": data_agent, "writer": report_agent})
 
# Find best agent using LLM-based routing
agent_id = await orch.find_agent("Analyze customer churn data")
print(f"Best agent: {agent_id}")

#Route and Execute

Route a task to the best agent and execute it:

python
from daita.plugins import orchestrator
 
orch = orchestrator(agents={"analyst": data_agent, "writer": report_agent})
 
# Route and execute automatically
result = await orch.route_task("Generate a summary of Q4 performance")
 
print(f"Agent used: {result['agent_id']}")
print(f"Result: {result['result']}")
print(f"Execution time: {result['execution_time_ms']}ms")

#Parallel Execution

Run multiple tasks concurrently:

python
from daita.plugins import orchestrator
 
orch = orchestrator(agents={
    "analyst": data_agent,
    "writer": report_agent,
    "reviewer": review_agent
})
 
# Execute tasks in parallel
result = await orch.run_parallel([
    "Analyze sales trends",
    "Analyze customer demographics",
    "Analyze product performance"
])
 
print(f"Total tasks: {result['total_tasks']}")
print(f"Successful: {result['successful_tasks']}")
for task_result in result['results']:
    print(f"  - {task_result['agent_id']}: {task_result['result']}")

#Explicit Agent Assignment

python
from daita.plugins import orchestrator
 
orch = orchestrator(agents={"analyst": data_agent, "writer": report_agent})
 
# Specify agents explicitly
result = await orch.run_parallel([
    {"task": "Analyze data", "agent_id": "analyst"},
    {"task": "Write report", "agent_id": "writer"}
])

#Sequential Execution

Execute tasks in sequence, passing results between agents:

python
from daita.plugins import orchestrator
 
orch = orchestrator(agents={
    "analyst": data_agent,
    "writer": report_agent,
    "reviewer": review_agent
})
 
# Execute sequentially - each agent receives previous results
result = await orch.run_sequential([
    "Analyze the sales data",
    "Write a report based on the analysis",
    "Review the report for accuracy"
])
 
print(f"Final result: {result['final_result']}")
print(f"Completed tasks: {result['completed_tasks']}/{result['total_tasks']}")

#Workflow DAGs

Define complex workflows as directed acyclic graphs:

python
from daita.plugins import orchestrator
 
orch = orchestrator(agents={
    "data": data_agent,
    "ml": ml_agent,
    "report": report_agent
})
 
# Create workflow
await orch.create_workflow(
    name="customer_churn_analysis",
    steps=[
        {
            "step_id": "extract_data",
            "task": "Extract customer data from database",
            "depends_on": []
        },
        {
            "step_id": "analyze_patterns",
            "task": "Analyze churn patterns",
            "depends_on": ["extract_data"]
        },
        {
            "step_id": "train_model",
            "task": "Train prediction model",
            "depends_on": ["analyze_patterns"]
        },
        {
            "step_id": "generate_report",
            "task": "Generate churn report",
            "depends_on": ["analyze_patterns", "train_model"]
        }
    ]
)
 
# Execute workflow
result = await orch.run_workflow("customer_churn_analysis")
print(f"Workflow completed: {result['completed_steps']}/{result['total_steps']}")

#Agent Performance

#Get Performance Metrics

Track agent execution statistics:

python
from daita.plugins import orchestrator
 
orch = orchestrator(agents={"analyst": data_agent})
 
# Get performance metrics
metrics = await orch.get_performance(
    agent_id="analyst",
    time_range_days=30
)
 
print(f"Total executions: {metrics['total_executions']}")
print(f"Success rate: {metrics['success_rate_percent']}%")
print(f"Avg execution time: {metrics['avg_execution_time_ms']}ms")

#Get Agent Capabilities

Retrieve agent capabilities and specializations:

python
from daita.plugins import orchestrator
 
orch = orchestrator(agents={"analyst": data_agent})
 
# Get capabilities
capabilities = await orch.get_capabilities("analyst")
 
print(f"Capabilities: {capabilities['capabilities']}")
print(f"Entity types: {capabilities['entity_types']}")

#Using with Agents

The Orchestrator plugin exposes coordination operations as tools:

python
from daita import Agent
from daita.plugins import orchestrator
 
# Create specialized agents
data_agent = Agent(name="Data Analyst", prompt="Analyze data and generate insights.")
report_agent = Agent(name="Report Writer", prompt="Write clear reports.")
ml_agent = Agent(name="ML Engineer", prompt="Build and train ML models.")
 
# Create orchestrator
orch = orchestrator(agents={
    "data": data_agent,
    "report": report_agent,
    "ml": ml_agent
})
 
# Coordinator agent
coordinator = Agent(
    name="Workflow Coordinator",
    prompt="You coordinate complex workflows across specialized agents.",
    llm_provider="openai",
    model="gpt-4",
    tools=[orch]
)
 
await coordinator.start()
 
# Coordinator autonomously orchestrates multi-agent workflow
result = await coordinator.run("""
We need to analyze customer churn:
1. Extract and analyze customer data
2. Train a prediction model
3. Generate a comprehensive report
 
Coordinate these tasks across the appropriate agents.
""")
 
print(result)
await coordinator.stop()

#Available Tools

The Orchestrator plugin exposes these tools to agents:

ToolDescriptionParameters
find_agentFind best agent for tasktask
route_taskRoute and execute tasktask, required_capabilities
run_parallelExecute tasks in paralleltasks
run_sequentialExecute tasks sequentiallytasks
create_workflowDefine workflow DAGname, steps
run_workflowExecute workflowworkflow_id, input_data
get_performanceGet agent performance metricsagent_id, time_range_days
get_capabilitiesGet agent capabilitiesagent_id

#LLM-Based Routing

The orchestrator uses LLM-based routing to intelligently match tasks to agents:

python
from daita.plugins import orchestrator
 
# Enable LLM routing with custom model
orch = orchestrator(
    agents={"analyst": data_agent, "writer": report_agent},
    enable_llm_routing=True,
    routing_model="gpt-4o-mini",
    routing_cache_ttl=600  # Cache for 10 minutes
)
 
# LLM automatically selects best agent based on task and agent descriptions
result = await orch.route_task("Analyze customer behavior patterns")

#Best Practices

Agent Design:

  • Create focused agents with clear specializations
  • Write descriptive agent prompts for better routing
  • Register agents with relevant capabilities

Task Routing:

  • Let LLM routing handle agent selection when possible
  • Use explicit agent assignment for critical workflows
  • Monitor performance metrics to optimize routing

Workflows:

  • Define clear dependencies in DAG workflows
  • Handle failures gracefully with error recovery
  • Test workflows with small datasets first

Performance:

  • Use routing cache to reduce LLM calls
  • Monitor execution times and success rates
  • Scale horizontally by adding specialized agents

#Next Steps