Plugin System Overview

The Daita plugin system provides clean, async interfaces for database, API, storage, search, and collaboration integrations. All plugins follow consistent patterns with context manager support for automatic resource cleanup.

Available Plugins

The framework includes 9 built-in plugins covering the most common integration needs:

Database Plugins

  • PostgreSQL - Relational database with connection pooling
  • MySQL - MySQL/MariaDB database operations
  • MongoDB - Document database with aggregation support
  • Snowflake - Cloud data warehouse with stage operations

API & Integration Plugins

  • REST - HTTP client with authentication and file transfer
  • S3 - AWS S3 object storage with format detection
  • Slack - Team messaging and collaboration

Search & Analytics

  • Elasticsearch - Document search and indexing

Messaging

  • Redis - Pub/sub messaging and caching

Common Characteristics

All plugins share these features:

  • Async/await pattern for non-blocking operations
  • Context manager support (async with) for automatic cleanup
  • Simple initialization with sensible defaults
  • Focus system integration for data filtering
  • Consistent error handling with descriptive messages
  • Automatic connection management with connection pooling where applicable
  • Agent tool integration - plugins can expose their operations as LLM-usable tools

Plugin Access Patterns

There are two main ways to use plugins:

1. Agent Integration

Pass plugins to agents so the LLM can autonomously use their tools:

from daita import SubstrateAgent
from daita.plugins import PostgreSQLPlugin

# Create plugin
db = PostgreSQLPlugin(host="localhost", database="mydb")

# Pass to agent
agent = SubstrateAgent(
    name="Data Agent",
    prompt="You are a database analyst.",
    tools=[db]
)

await agent.start()

# Agent autonomously uses plugin tools
result = await agent.run("Show me all users")

2. Direct Usage

Use plugins directly in scripts without agents:

from daita.plugins import PostgreSQLPlugin

# Direct instantiation for scripts
async with PostgreSQLPlugin(host="localhost", database="mydb") as db:
    results = await db.query("SELECT * FROM users")
    print(results)

Context Managers

All plugins support context managers for automatic connection and cleanup:

from daita.plugins import RESTPlugin, PostgreSQLPlugin

# Automatic connection and disconnection
async with RESTPlugin(base_url="https://api.example.com") as api:
    data = await api.get("/users")
# Connection automatically closed after block

# Manual connection management (when needed)
db = PostgreSQLPlugin(host="localhost", database="mydb")
await db.connect()
try:
    results = await db.query("SELECT * FROM users")
finally:
    await db.disconnect()

Best Practice: Always use context managers (async with) for automatic cleanup unless you have a specific reason to manage connections manually.

Note: When using plugins with agents, the agent manages the plugin lifecycle automatically - you don't need to use context managers or manually connect/disconnect.

Direct Plugin Instantiation

Plugins can be instantiated directly for scripts and utilities:

from daita.plugins import PostgreSQLPlugin, RESTPlugin, S3Plugin

# Direct instantiation (useful for scripts without agents)
async with PostgreSQLPlugin(host="localhost", database="mydb") as db:
    results = await db.query("SELECT * FROM users")
    print(results)

# Multiple plugins in a script
async with RESTPlugin(base_url="https://api.example.com") as api:
    data = await api.get("/users")
    print(data)

This is useful for scripts or utilities that don't need full agent capabilities or LLM integration.

Using Plugins in Agents

Plugins integrate seamlessly with agents by exposing their capabilities as tools that the LLM can autonomously use:

from daita import SubstrateAgent
from daita.plugins import PostgreSQLPlugin

# Create plugin instance
db_plugin = PostgreSQLPlugin(host="localhost", database="analytics")

# Pass plugin to agent - its tools are automatically registered
agent = SubstrateAgent(
    name="DB Agent",
    prompt="You are a database analyst. Help users query and analyze data.",
    llm_provider="openai",
    model="gpt-4",
    tools=[db_plugin]
)

await agent.start()

# Agent autonomously uses database tools from plugin
result = await agent.run("Show me all events for user 12345")
# Agent will use the query_database tool from PostgreSQLPlugin

Plugin Tools - LLM Integration

Plugins expose their operations as agent tools that LLMs can autonomously use. When you pass a plugin instance to an agent, the plugin's capabilities become available for the LLM to call:

from daita import SubstrateAgent
from daita.plugins import PostgreSQLPlugin

# Create plugin instance with database connection
db = PostgreSQLPlugin(
    host="localhost",
    database="analytics",
    username="user",
    password="pass"
)

# Pass plugin to agent - tools are automatically registered
agent = SubstrateAgent(
    name="Data Analyst",
    prompt="You are a data analyst. Help users analyze data from the database.",
    llm_provider="openai",
    model="gpt-4",
    tools=[db]  # Plugin tools are now available to the LLM
)

await agent.start()

# LLM autonomously uses database tools
result = await agent.run("Analyze user activity for the past month")
# The LLM will automatically use query_database, list_tables, get_table_schema tools

Available Tools by Plugin

Different plugins expose different tools:

Database Plugins (PostgreSQL, MySQL, MongoDB, Snowflake):

  • query_database - Execute SELECT queries
  • list_tables / list_collections - List database objects
  • get_table_schema - Inspect table/collection structure
  • execute_sql - Run INSERT/UPDATE/DELETE operations
  • insert_document / update_documents / delete_documents (MongoDB)
  • list_warehouses / switch_warehouse - Warehouse management (Snowflake)
  • list_stages / upload_to_stage / load_from_stage - Data loading (Snowflake)

API & Storage Plugins (REST, S3):

  • http_get, http_post, http_put, http_delete (REST)
  • read_s3_file, write_s3_file, list_s3_objects, delete_s3_file (S3)

Communication Plugins (Slack):

  • send_slack_message - Send messages to channels
  • send_slack_summary - Send formatted agent summaries
  • list_slack_channels - List available channels

Search Plugins (Elasticsearch):

  • search_elasticsearch - Query documents
  • index_document - Index new documents
  • get_index_mapping - Get index schema

Tool Usage Example

from daita import SubstrateAgent
from daita.plugins import PostgreSQLPlugin, SlackPlugin
import os

# Setup plugins
db = PostgreSQLPlugin(host="localhost", database="analytics")
slack_client = SlackPlugin(token=os.getenv("SLACK_BOT_TOKEN"))

# Create agent with multiple plugins
agent = SubstrateAgent(
    name="Report Generator",
    prompt="You are a report generator. Query databases and share insights via Slack.",
    llm_provider="openai",
    model="gpt-4",
    tools=[db, slack_client]
)

await agent.start()

# LLM autonomously orchestrates operations across plugins
result = await agent.run(
    "Query the top 10 users by activity and post a summary to #analytics"
)
# The LLM will:
# 1. Use query_database to get user data
# 2. Analyze the results
# 3. Use send_slack_message to post the summary

Checking Available Tools

You can inspect which tools a plugin provides:

from daita.plugins import PostgreSQLPlugin

db = PostgreSQLPlugin(host="localhost", database="mydb")

# Check if plugin has tools
if db.has_tools:
    tools = db.get_tools()
    for tool in tools:
        print(f"Tool: {tool.name}")
        print(f"  Description: {tool.description}")
        print(f"  Category: {tool.category}")

# Example output:
# Tool: query_database
# Description: Execute a SQL SELECT query...
# Category: database
# Tool: list_tables
# Description: List all tables in the PostgreSQL database
# Category: database

Focus System Integration

The focus system filters tool results at the agent level, reducing token usage when plugins return large datasets:

from daita import SubstrateAgent
from daita.plugins import PostgreSQLPlugin, S3Plugin
from daita.config.base import FocusConfig

# Focus filters plugin tool results before sending to LLM
agent = SubstrateAgent(
    name="DB Agent",
    prompt="You are a database analyst.",
    tools=[PostgreSQLPlugin(host="localhost", database="mydb")],
    focus=FocusConfig(
        type="column",
        columns=["user_id", "email", "created_at"]  # Only these columns
    )
)

await agent.start()

# When query_database tool returns 50 columns, focus filters to 3
# This cuts the result payload sent to the LLM by roughly 94%
result = await agent.run("Show me all users")

Error Handling

All plugins raise RuntimeError with descriptive messages for:

  • Connection failures
  • Authentication errors
  • Missing dependencies
  • Operation failures

from daita.plugins import PostgreSQLPlugin

try:
    async with PostgreSQLPlugin(host="invalid-host", database="test") as db:
        results = await db.query("SELECT 1")
except RuntimeError as e:
    print(f"Database error: {e}")
    # Handle error appropriately

# When using with agents, errors are automatically traced
from daita import SubstrateAgent

agent = SubstrateAgent(
    name="DB Agent",
    tools=[PostgreSQLPlugin(host="localhost", database="mydb")]
)

await agent.start()

# Agent handles errors gracefully and reports them in results
result = await agent.run("Show me all users")

Environment-Based Configuration

Configure plugins using environment variables for production deployments:

import os
from daita import SubstrateAgent
from daita.plugins import PostgreSQLPlugin

db_config = {
    "host": os.getenv("DB_HOST", "localhost"),
    "database": os.getenv("DB_NAME"),
    "username": os.getenv("DB_USER"),
    "password": os.getenv("DB_PASSWORD")
}

# Direct usage
async with PostgreSQLPlugin(**db_config) as db:
    results = await db.query("SELECT * FROM users")

# Or with agents
db_plugin = PostgreSQLPlugin(**db_config)
agent = SubstrateAgent(
    name="DB Agent",
    prompt="You are a database analyst.",
    tools=[db_plugin]
)

Best Practices

Connection Management

  1. Use context managers (async with) for direct plugin usage
  2. Let agents manage lifecycle when using plugins with agents
  3. Configure connection pools for high-throughput applications
  4. Set appropriate timeouts to prevent hanging operations
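
The timeout advice above can be enforced at the call site with standard `asyncio.wait_for`, regardless of whether a given plugin exposes its own timeout setting (plugin-level timeout parameters are not documented on this page). A minimal, framework-free sketch:

```python
import asyncio

async def slow_operation() -> str:
    # Stand-in for a plugin call that may hang (e.g. a stalled query)
    await asyncio.sleep(10)
    return "done"

async def main() -> str:
    try:
        # Bound the call: cancel it if it exceeds the timeout
        return await asyncio.wait_for(slow_operation(), timeout=0.1)
    except asyncio.TimeoutError:
        return "timed out"

print(asyncio.run(main()))  # → timed out
```

The same wrapper works around any awaited plugin call, e.g. `await asyncio.wait_for(db.query(...), timeout=30)`.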

Performance

  1. Use bulk operations when inserting multiple records
  2. Leverage connection pooling for database plugins
  3. Apply focus early to reduce data transfer overhead
  4. Batch API requests when possible
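
Batching API requests (point 4) can be done with `asyncio.gather`, which runs independent calls concurrently; the `fetch_user` coroutine below is a stand-in for a real plugin call, not part of the framework:

```python
import asyncio

async def fetch_user(user_id: int) -> dict:
    # Stand-in for one API request, e.g. api.get(f"/users/{user_id}")
    await asyncio.sleep(0.01)
    return {"id": user_id}

async def fetch_all(user_ids: list[int]) -> list[dict]:
    # Issue all requests concurrently; results come back in input order
    return await asyncio.gather(*(fetch_user(uid) for uid in user_ids))

users = asyncio.run(fetch_all([1, 2, 3]))
print(users)  # → [{'id': 1}, {'id': 2}, {'id': 3}]
```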

Security

  1. Never hardcode credentials in source code
  2. Use environment variables or secret management systems
  3. Validate input data before passing to plugins
  4. Use parameterized queries to prevent SQL injection
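
For point 4, parameterized queries pass user input as bound values rather than splicing it into the SQL string. The sketch below uses the standard-library `sqlite3` driver only to illustrate the principle; whether daita's database plugins accept bound parameters, and with what placeholder syntax, is not covered on this page:

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE users (id INTEGER, email TEXT)")
conn.execute("INSERT INTO users VALUES (1, 'a@example.com')")

user_input = "a@example.com"  # imagine this came from an untrusted source

# Unsafe: f-string interpolation lets crafted input rewrite the SQL
# rows = conn.execute(f"SELECT * FROM users WHERE email = '{user_input}'")

# Safe: the driver binds the value as data, never as SQL text
rows = conn.execute(
    "SELECT * FROM users WHERE email = ?", (user_input,)
).fetchall()
print(rows)  # → [(1, 'a@example.com')]
```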

Error Handling

  1. Handle connection errors gracefully with fallbacks
  2. Log errors with context for debugging
  3. Implement retries for transient failures
  4. Validate data before operations
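
Retrying transient failures (point 3) needs no extra dependency; since plugins raise RuntimeError on failure, that is the exception to catch. The `with_retries` helper and the `flaky_query` stand-in below are illustrative, not part of the framework:

```python
import asyncio

async def with_retries(coro_factory, attempts: int = 3, base_delay: float = 0.05):
    """Await coro_factory(), retrying with exponential backoff on RuntimeError."""
    for attempt in range(attempts):
        try:
            return await coro_factory()
        except RuntimeError:
            if attempt == attempts - 1:
                raise  # out of retries: surface the error to the caller
            await asyncio.sleep(base_delay * 2 ** attempt)

# Simulated transient failure: errors twice, then succeeds
calls = {"n": 0}

async def flaky_query():
    calls["n"] += 1
    if calls["n"] < 3:
        raise RuntimeError("transient connection error")
    return "ok"

print(asyncio.run(with_retries(flaky_query)))  # → ok
```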

Plugin Development

Want to create a custom plugin? Follow these patterns:

from daita.plugins.base import BasePlugin
from daita.core.tools import AgentTool
from typing import List, Dict, Any

class CustomPlugin(BasePlugin):
    """Custom plugin for external service integration."""

    def __init__(self, api_key: str, **kwargs):
        self._client = None
        self.api_key = api_key
        # Store configuration

    async def connect(self):
        """Initialize connection."""
        if self._client is not None:
            return
        # Setup connection
        # self._client = SomeClient(api_key=self.api_key)

    async def disconnect(self):
        """Close connection."""
        if self._client:
            # Cleanup
            self._client = None

    async def __aenter__(self):
        await self.connect()
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        await self.disconnect()

    # Expose plugin capabilities as tools
    def get_tools(self) -> List[AgentTool]:
        """Expose plugin operations as agent tools."""
        return [
            AgentTool(
                name="custom_operation",
                description="Perform a custom operation using the external service",
                parameters={
                    "type": "object",
                    "properties": {
                        "input": {
                            "type": "string",
                            "description": "Input data for the operation"
                        }
                    },
                    "required": ["input"]
                },
                handler=self._tool_operation,
                category="custom",
                source="plugin",
                plugin_name="Custom"
            )
        ]

    async def _tool_operation(self, args: Dict[str, Any]) -> Dict[str, Any]:
        """Tool handler for custom_operation."""
        input_data = args.get("input")
        # Perform operation
        result = await self.do_operation(input_data)
        return {"result": result}

    async def do_operation(self, input_data: str):
        """Your custom operation logic."""
        # Implementation here
        return {"processed": input_data}

Key Points:

  • Inherit from BasePlugin
  • Implement get_tools() to expose capabilities as LLM tools
  • Support context managers (__aenter__ / __aexit__)
  • Handle connection lifecycle in connect() / disconnect()
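
Stripped of the daita imports, the lifecycle pattern in the last two points reduces to the following framework-free sketch (`MiniPlugin` is purely illustrative):

```python
import asyncio

class MiniPlugin:
    """Bare-bones connect/disconnect lifecycle with context-manager support."""

    def __init__(self):
        self._client = None

    async def connect(self):
        if self._client is None:
            self._client = object()  # stand-in for a real client handle

    async def disconnect(self):
        self._client = None

    async def __aenter__(self):
        await self.connect()
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        await self.disconnect()

async def main():
    async with MiniPlugin() as plugin:
        assert plugin._client is not None  # connected inside the block
    assert plugin._client is None  # cleaned up on exit, even on error

asyncio.run(main())
```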

Next Steps

Explore individual plugin documentation:

  • Database Operations: Start with PostgreSQL, MySQL, MongoDB, or Snowflake
  • API Integration: Check out REST for HTTP APIs
  • Cloud Storage: See S3 for object storage operations
  • Team Collaboration: Learn about Slack integration
  • Search & Analytics: Explore Elasticsearch
  • Messaging: Review Redis for pub/sub patterns