# PostgreSQL Plugin
Async PostgreSQL operations with automatic connection pooling built on asyncpg.
## Installation

```bash
pip install asyncpg
```
## Quick Start

### Direct Usage (Scripts)
```python
from daita.plugins import postgresql

# Direct usage in scripts
async with postgresql(
    host="localhost",
    database="mydb",
    username="user",
    password="password"
) as db:
    results = await db.query("SELECT * FROM users")
    print(results)
```
### Agent Integration (Recommended)
```python
from daita import SubstrateAgent
from daita.plugins import postgresql

# Create plugin
db = postgresql(
    host="localhost",
    database="mydb",
    username="user",
    password="password"
)

# Agent uses database tools autonomously
agent = SubstrateAgent(
    name="DB Agent",
    prompt="You are a database analyst. Help users query and analyze data.",
    tools=[db]
)

await agent.start()
result = await agent.run("Show me all users")
```
## Connection Parameters
```python
postgresql(
    host: str = "localhost",
    port: int = 5432,
    database: str = "",
    username: str = "",
    user: Optional[str] = None,  # Alias for username
    password: str = "",
    connection_string: Optional[str] = None,
    min_size: int = 1,
    max_size: int = 10,
    command_timeout: int = 60
)
```
### Parameters

- `host` (str): Database host address
- `port` (int): Database port (default: `5432`)
- `database` (str): Database name to connect to
- `username` or `user` (str): Username for authentication
- `password` (str): Password for authentication
- `connection_string` (str): Full PostgreSQL connection string (overrides individual params)
- `min_size` (int): Minimum connections in pool (default: `1`)
- `max_size` (int): Maximum connections in pool (default: `10`)
- `command_timeout` (int): Query timeout in seconds (default: `60`)
## Connection Methods

Connect with either individual parameters or a connection string:
```python
from daita.plugins import postgresql

# Individual parameters
async with postgresql(
    host="localhost",
    database="analytics",
    username="analyst",
    password="secure_password"
) as db:
    results = await db.query("SELECT * FROM events")

# Connection string
async with postgresql(
    connection_string="postgresql://user:pass@localhost:5432/analytics"
) as db:
    results = await db.query("SELECT * FROM events")

# With connection pooling
async with postgresql(
    host="localhost",
    database="mydb",
    username="user",
    password="password",
    min_size=5,
    max_size=20,
    command_timeout=30
) as db:
    results = await db.query("SELECT * FROM users")
```
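If you assemble connection strings from separate settings, note that characters like `@` or `:` in a password will break the URL. A minimal sketch of a hypothetical `build_dsn` helper (not part of the plugin) that percent-encodes the credentials:

```python
from urllib.parse import quote

def build_dsn(host: str, database: str, user: str, password: str, port: int = 5432) -> str:
    """Assemble a postgresql:// connection string, URL-encoding the credentials."""
    return (
        f"postgresql://{quote(user, safe='')}:{quote(password, safe='')}"
        f"@{host}:{port}/{database}"
    )

dsn = build_dsn("localhost", "analytics", "analyst", "p@ss:word")
print(dsn)  # postgresql://analyst:p%40ss%3Aword@localhost:5432/analytics
```

The encoded string can then be passed as `connection_string=` above.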
## Query Operations

**SELECT queries** - use the `query()` method, which returns a list of dictionaries:
```python
from daita.plugins import postgresql

async with postgresql(host="localhost", database="analytics") as db:
    # Simple query
    users = await db.query("SELECT id, name, email FROM users")
    # Returns: [{"id": 1, "name": "John", "email": "john@example.com"}, ...]

    # Parameterized query (use $1, $2, $3, etc.)
    active_users = await db.query(
        "SELECT * FROM users WHERE status = $1 AND created_at > $2",
        ["active", "2024-01-01"]
    )
```
**Parameter syntax:** PostgreSQL uses numbered placeholders `$1`, `$2`, `$3` (not `%s` as in MySQL).
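Because the placeholders are numbered rather than named, dynamically assembled filters need consecutive indices. A small sketch of a hypothetical `build_where` helper (not part of the plugin) that turns a dict of equality filters into a clause plus parameter list:

```python
def build_where(filters: dict) -> tuple[str, list]:
    """Build a WHERE clause with numbered placeholders ($1, $2, ...) from a dict.

    Column names must come from trusted code, never from user input --
    only the *values* are passed as parameters.
    """
    clauses = [f"{col} = ${i}" for i, col in enumerate(filters, start=1)]
    return " AND ".join(clauses), list(filters.values())

where, params = build_where({"status": "active", "created_at": "2024-01-01"})
print(where)   # status = $1 AND created_at = $2
print(params)  # ['active', '2024-01-01']
```

The resulting clause and params can then be passed to `db.query(f"SELECT * FROM users WHERE {where}", params)`.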
## Data Modification

**INSERT, UPDATE, DELETE** - use the `execute()` method, which returns the affected row count:
```python
from daita.plugins import postgresql

async with postgresql(host="localhost", database="app") as db:
    # Insert
    await db.execute(
        "INSERT INTO users (name, email, status) VALUES ($1, $2, $3)",
        ["Jane Doe", "jane@example.com", "active"]
    )

    # Update
    await db.execute(
        "UPDATE users SET last_login = NOW() WHERE id = $1",
        [user_id]
    )

    # Delete
    await db.execute(
        "DELETE FROM users WHERE status = $1 AND inactive_days > $2",
        ["inactive", 365]
    )
```
## Bulk Operations & Schema

**Bulk insert:**
```python
from daita.plugins import postgresql

async with postgresql(host="localhost", database="app") as db:
    users_data = [
        {"name": "Alice", "email": "alice@example.com"},
        {"name": "Bob", "email": "bob@example.com"},
    ]
    count = await db.insert_many("users", users_data)
```
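To see how a single multi-row statement with numbered placeholders can be assembled, here is an illustrative sketch of a hypothetical `build_insert_many` helper; the plugin's `insert_many()` may work differently internally:

```python
def build_insert_many(table: str, rows: list[dict]) -> tuple[str, list]:
    """Build one multi-row INSERT with numbered placeholders.

    Assumes all rows share the same keys; the table and column names must
    come from trusted code, not user input -- only values are parameterized.
    """
    cols = list(rows[0])
    groups, params = [], []
    for r, row in enumerate(rows):
        placeholders = [f"${r * len(cols) + c + 1}" for c in range(len(cols))]
        groups.append("(" + ", ".join(placeholders) + ")")
        params.extend(row[col] for col in cols)
    sql = f"INSERT INTO {table} ({', '.join(cols)}) VALUES {', '.join(groups)}"
    return sql, params

sql, params = build_insert_many("users", [
    {"name": "Alice", "email": "alice@example.com"},
    {"name": "Bob", "email": "bob@example.com"},
])
print(sql)
# INSERT INTO users (name, email) VALUES ($1, $2), ($3, $4)
```

A single multi-row statement makes one round trip to the server, which is why bulk helpers beat inserting in a loop.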
**List tables:**
```python
from daita.plugins import postgresql

async with postgresql(host="localhost", database="app") as db:
    tables = await db.tables()
    # Returns: ["users", "orders", "products"]
```
## Using with Agents

### Tool-Based Integration (Recommended)

The PostgreSQL plugin exposes database operations as tools that agents can use autonomously:
```python
from daita import SubstrateAgent
from daita.plugins import postgresql
import os

# Create database plugin
db = postgresql(
    host="localhost",
    database="analytics",
    username=os.getenv("DB_USER"),
    password=os.getenv("DB_PASSWORD")
)

# Pass plugin to agent - agent can now use database tools autonomously
agent = SubstrateAgent(
    name="Data Analyst",
    prompt="You are a database analyst. Help users query and analyze data.",
    llm_provider="openai",
    model="gpt-4",
    tools=[db]
)

await agent.start()

# Agent autonomously uses database tools to answer questions
result = await agent.run("Show me the top 10 users by activity count")

# The agent will autonomously:
# 1. Use list_tables to explore the database
# 2. Use get_table_schema to understand structure
# 3. Use query_database to fetch the data
# 4. Analyze and present results in natural language

await agent.stop()
```
### Direct Database Operations (Scripts)
For scripts that don't need agent capabilities:
```python
from daita.plugins import postgresql

async with postgresql(
    host="localhost",
    database="analytics",
    username="analyst",
    password="password"
) as db:
    # Direct queries
    user = await db.query("SELECT * FROM users WHERE id = $1", [user_id])
    events = await db.query(
        "SELECT * FROM events WHERE user_id = $1 LIMIT 100",
        [user_id]
    )

    print(f"User: {user[0] if user else None}")
    print(f"Events: {len(events)}")
```
## Available Tools
The PostgreSQL plugin exposes these tools to LLM agents:
| Tool | Description | Parameters |
|---|---|---|
| `query_database` | Execute SELECT queries | `sql` (required), `params` (optional array) |
| `execute_sql` | Run INSERT/UPDATE/DELETE | `sql` (required), `params` (optional array) |
| `list_tables` | List all tables | None |
| `get_table_schema` | Get table column info | `table_name` (required) |
**Tool Categories:** `database`

**Tool Source:** `plugin`
### Tool Usage Example
```python
from daita import SubstrateAgent
from daita.plugins import postgresql

# Setup database with tool integration
db = postgresql(host="localhost", database="sales")

agent = SubstrateAgent(
    name="Sales Analyzer",
    prompt="You are a sales analyst. Help users analyze sales data.",
    llm_provider="openai",
    model="gpt-4",
    tools=[db]
)

await agent.start()

# Natural language query - agent uses tools autonomously
result = await agent.run("""
Analyze sales data:
1. What tables are available?
2. Find total revenue by product category
3. Show top 5 customers by purchase amount
""")

# Agent orchestrates tool calls autonomously:
# - list_tables() to discover schema
# - get_table_schema("sales") to understand structure
# - query_database(...) to fetch aggregated data
# - Returns formatted analysis in natural language
print(result)

await agent.stop()
```
## Error Handling
```python
from daita.plugins import postgresql

try:
    async with postgresql(host="localhost", database="mydb") as db:
        results = await db.query("SELECT * FROM users WHERE id = $1", [user_id])
except RuntimeError as e:
    if "asyncpg not installed" in str(e):
        print("Install asyncpg: pip install asyncpg")
    elif "connection" in str(e).lower():
        print(f"Connection failed: {e}")
    else:
        print(f"Error: {e}")
```
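Transient connection failures are often worth retrying. A generic sketch (not part of the plugin) of retrying an async operation with exponential backoff, assuming errors surface as `RuntimeError` as above:

```python
import asyncio

async def with_retries(op, attempts: int = 3, base_delay: float = 0.5):
    """Run an async operation, retrying on RuntimeError with exponential backoff."""
    for attempt in range(attempts):
        try:
            return await op()
        except RuntimeError:
            if attempt == attempts - 1:
                raise  # Out of retries; propagate the last error
            await asyncio.sleep(base_delay * 2 ** attempt)

# Demo with a flaky operation that fails twice, then succeeds
calls = {"n": 0}

async def flaky():
    calls["n"] += 1
    if calls["n"] < 3:
        raise RuntimeError("connection reset")
    return "ok"

print(asyncio.run(with_retries(flaky, base_delay=0.01)))  # ok
```

In practice `op` would be a closure around your `db.query(...)` call; retry only idempotent operations (reads), not inserts.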
## Best Practices
**Connection Management:**
- Always use context managers (`async with`) for automatic cleanup
- Configure pool size based on workload (`min_size`, `max_size`)
- Set appropriate timeouts to prevent hanging operations

**Security:**
- Use parameterized queries to prevent SQL injection (`$1`, `$2`, etc.)
- Store credentials in environment variables, never hardcode
- Use read-only accounts when possible

**Performance:**
- Use `insert_many()` for bulk inserts instead of loops
- Limit result sets to avoid memory issues
- Configure connection pools for high-throughput apps
## Troubleshooting
| Issue | Solution |
|---|---|
| `asyncpg` not installed | `pip install asyncpg` |
| Connection to database failed | Check that the database is running; verify host/port/credentials |
| Command timeout | Increase `command_timeout`, optimize queries |
| Access denied | Verify username/password, check user permissions |
## Next Steps
- MySQL Plugin - For MySQL databases
- MongoDB Plugin - For document databases
- Workflows - Use PostgreSQL in multi-agent workflows
- Plugin Overview - Learn about other plugins