# PostgreSQL Plugin

Async PostgreSQL operations with automatic connection pooling built on `asyncpg`.

## Installation

```bash
pip install asyncpg
```

## Quick Start

```python
from daita import Agent
from daita.plugins import postgresql
 
# Create plugin
db = postgresql(
    host="localhost",
    database="mydb",
    username="user",
    password="password"
)
 
# Agent uses database tools autonomously
agent = Agent(
    name="DB Agent",
    prompt="You are a database analyst. Help users query and analyze data.",
    tools=[db]
)
 
await agent.start()
result = await agent.run("Show me all users")
```

## Direct Usage

The plugin can also be used directly, without agents, for programmatic access; a minimal sketch follows. For comprehensive PostgreSQL documentation, see the official PostgreSQL docs. The main value of this plugin, however, is agent integration: enabling LLMs to autonomously query and analyze database data.
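
A minimal direct-usage sketch, using the `query()` API described below (connection values are placeholders):

```python
from daita.plugins import postgresql

# Connect, run one query, and let the context manager clean up the pool
async with postgresql(
    host="localhost",
    database="mydb",
    username="user",
    password="password"
) as db:
    rows = await db.query("SELECT COUNT(*) AS n FROM users")
    print(rows[0]["n"])
```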

## Connection Parameters

```python
postgresql(
    host: str = "localhost",
    port: int = 5432,
    database: str = "",
    username: str = "",
    user: Optional[str] = None,  # Alias for username
    password: str = "",
    connection_string: Optional[str] = None,
    min_size: int = 1,
    max_size: int = 10,
    command_timeout: int = 60
)
```

### Parameters

- `host` (str): Database host address
- `port` (int): Database port (default: 5432)
- `database` (str): Database name to connect to
- `username` or `user` (str): Username for authentication
- `password` (str): Password for authentication
- `connection_string` (str): Full PostgreSQL connection string (overrides the individual parameters)
- `min_size` (int): Minimum connections in the pool (default: 1)
- `max_size` (int): Maximum connections in the pool (default: 10)
- `command_timeout` (int): Query timeout in seconds (default: 60)

## Connection Methods

Connect with individual parameters or a full connection string:

```python
from daita.plugins import postgresql
 
# Individual parameters
async with postgresql(
    host="localhost",
    database="analytics",
    username="analyst",
    password="secure_password"
) as db:
    results = await db.query("SELECT * FROM events")
 
# Connection string
async with postgresql(
    connection_string="postgresql://user:pass@localhost:5432/analytics"
) as db:
    results = await db.query("SELECT * FROM events")
 
# Custom pool sizing and timeout
async with postgresql(
    host="localhost",
    database="mydb",
    username="user",
    password="password",
    min_size=5,
    max_size=20,
    command_timeout=30
) as db:
    results = await db.query("SELECT * FROM users")
```

## Query Operations

**SELECT queries** use the `query()` method, which returns a list of dictionaries:

```python
from daita.plugins import postgresql
 
async with postgresql(host="localhost", database="analytics") as db:
    # Simple query
    users = await db.query("SELECT id, name, email FROM users")
    # Returns: [{"id": 1, "name": "John", "email": "john@example.com"}, ...]
 
    # Parameterized query (use $1, $2, $3, etc.)
    active_users = await db.query(
        "SELECT * FROM users WHERE status = $1 AND created_at > $2",
        ["active", "2024-01-01"]
    )
```

**Parameter syntax:** PostgreSQL uses `$1`, `$2`, `$3` placeholders (not `%s` as in MySQL). Placeholders bind positionally to the params list, as the sketch below shows.
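
A small sketch of positional binding (the table and column names are illustrative):

```python
from daita.plugins import postgresql

async with postgresql(host="localhost", database="app") as db:
    # $1 binds to the first list element, $2 to the second
    rows = await db.query(
        "SELECT name FROM orders WHERE total > $1 AND region = $2",
        [100.0, "EMEA"]
    )
```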

## Data Modification

**INSERT, UPDATE, DELETE** statements use the `execute()` method, which returns the affected row count:

```python
from daita.plugins import postgresql
 
async with postgresql(host="localhost", database="app") as db:
    # Insert
    await db.execute(
        "INSERT INTO users (name, email, status) VALUES ($1, $2, $3)",
        ["Jane Doe", "jane@example.com", "active"]
    )
 
    # Update (user_id stands in for a real id)
    user_id = 42
    await db.execute(
        "UPDATE users SET last_login = NOW() WHERE id = $1",
        [user_id]
    )
 
    # Delete
    await db.execute(
        "DELETE FROM users WHERE status = $1 AND inactive_days > $2",
        ["inactive", 365]
    )
```

## Bulk Operations & Schema

Bulk insert:

```python
from daita.plugins import postgresql
 
async with postgresql(host="localhost", database="app") as db:
    users_data = [
        {"name": "Alice", "email": "alice@example.com"},
        {"name": "Bob", "email": "bob@example.com"},
    ]
    count = await db.insert_many("users", users_data)
```

List tables:

```python
from daita.plugins import postgresql
 
async with postgresql(host="localhost", database="app") as db:
    tables = await db.tables()
    # Returns: ["users", "orders", "products"]

## Using with Agents

The PostgreSQL plugin exposes database operations as tools that agents can use autonomously:

```python
from daita import Agent
from daita.plugins import postgresql
import os
 
# Create database plugin
db = postgresql(
    host="localhost",
    database="analytics",
    username=os.getenv("DB_USER"),
    password=os.getenv("DB_PASSWORD")
)
 
# Pass plugin to agent - agent can now use database tools autonomously
agent = Agent(
    name="Data Analyst",
    prompt="You are a database analyst. Help users query and analyze data.",
    llm_provider="openai",
    model="gpt-4",
    tools=[db]
)
 
await agent.start()
 
# Agent autonomously uses database tools to answer questions
result = await agent.run("Show me the top 10 users by activity count")
 
# The agent will autonomously:
# 1. Use list_tables to explore database
# 2. Use get_table_schema to understand structure
# 3. Use query_database to fetch the data
# 4. Analyze and present results in natural language
 
await agent.stop()
```

## Direct Database Operations (Scripts)

For scripts that don't need agent capabilities:

```python
from daita.plugins import postgresql
 
async with postgresql(
    host="localhost",
    database="analytics",
    username="analyst",
    password="password"
) as db:
    # Direct queries
    user_id = 42  # example id
    user = await db.query("SELECT * FROM users WHERE id = $1", [user_id])
    events = await db.query(
        "SELECT * FROM events WHERE user_id = $1 LIMIT 100",
        [user_id]
    )
 
    print(f"User: {user[0] if user else None}")
    print(f"Events: {len(events)}")

## Available Tools

The PostgreSQL plugin exposes these tools to LLM agents:

| Tool | Description | Parameters |
| --- | --- | --- |
| `query_database` | Execute SELECT queries | `sql` (required), `params` (optional array) |
| `execute_sql` | Run INSERT/UPDATE/DELETE | `sql` (required), `params` (optional array) |
| `list_tables` | List all tables | None |
| `get_table_schema` | Get table column info | `table_name` (required) |

Tool category: `database`. Tool source: plugin.

## Tool Usage Example

```python
from daita import Agent
from daita.plugins import postgresql
 
# Setup database with tool integration
db = postgresql(host="localhost", database="sales")
 
agent = Agent(
    name="Sales Analyzer",
    prompt="You are a sales analyst. Help users analyze sales data.",
    llm_provider="openai",
    model="gpt-4",
    tools=[db]
)
 
await agent.start()
 
# Natural language query - agent uses tools autonomously
result = await agent.run("""
Analyze sales data:
1. What tables are available?
2. Find total revenue by product category
3. Show top 5 customers by purchase amount
""")
 
# Agent orchestrates tool calls autonomously:
# - list_tables() to discover schema
# - get_table_schema("sales") to understand structure
# - query_database(...) to fetch aggregated data
# - Returns formatted analysis in natural language
 
print(result)
await agent.stop()
```

## Error Handling

```python
from daita.plugins import postgresql
 
try:
    async with postgresql(host="localhost", database="mydb") as db:
        user_id = 42  # example id
        results = await db.query("SELECT * FROM users WHERE id = $1", [user_id])
except RuntimeError as e:
    if "asyncpg not installed" in str(e):
        print("Install asyncpg: pip install asyncpg")
    elif "connection" in str(e).lower():
        print(f"Connection failed: {e}")
    else:
        print(f"Error: {e}")

## Best Practices

**Connection Management:**

- Always use context managers (`async with`) for automatic cleanup
- Configure pool size for your workload (`min_size`, `max_size`)
- Set appropriate timeouts (`command_timeout`) to prevent hanging operations

**Security:**

- Use parameterized queries (`$1`, `$2`, etc.) to prevent SQL injection
- Store credentials in environment variables, never hardcoded (see the sketch below)
- Use read-only accounts when possible
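
A short sketch combining both habits, assuming `DB_USER` and `DB_PASSWORD` are set in the environment (the variable names are illustrative):

```python
import os

from daita.plugins import postgresql

# Credentials come from the environment, never from source code
async with postgresql(
    host="localhost",
    database="analytics",
    username=os.getenv("DB_USER"),
    password=os.getenv("DB_PASSWORD")
) as db:
    # Untrusted input is bound to $1, never string-formatted into the SQL
    email = "jane@example.com"
    rows = await db.query("SELECT id FROM users WHERE email = $1", [email])
```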

**Performance:**

- Use `insert_many()` for bulk inserts instead of row-by-row loops (see the sketch below)
- Limit result sets to avoid memory issues
- Configure connection pool sizes for high-throughput apps
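
For example, a single `insert_many()` call replaces a loop of per-row `execute()` calls (a sketch; the table and columns are illustrative):

```python
from daita.plugins import postgresql

async with postgresql(host="localhost", database="app") as db:
    rows = [
        {"name": f"user{i}", "email": f"user{i}@example.com"}
        for i in range(1000)
    ]

    # One bulk call instead of 1000 individual INSERT round trips
    count = await db.insert_many("users", rows)
    print(f"Inserted {count} rows")
```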

## Troubleshooting

| Issue | Solution |
| --- | --- |
| `asyncpg` not installed | `pip install asyncpg` |
| Connection to database failed | Check that the database is running; verify host, port, and credentials |
| Command timeout | Increase `command_timeout`; optimize slow queries |
| Access denied | Verify username/password; check user permissions |
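
When a connection fails, a standalone smoke test like this can isolate the database from the rest of the app (a sketch using the plugin API above; connection values are placeholders):

```python
from daita.plugins import postgresql

# Minimal connectivity check: connect, run a trivial query, report any failure
try:
    async with postgresql(
        host="localhost",
        database="mydb",
        username="user",
        password="password"
    ) as db:
        await db.query("SELECT 1")
        print("Connection OK")
except RuntimeError as e:
    print(f"Connection check failed: {e}")
```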

## Next Steps