PostgreSQL Plugin
Async PostgreSQL operations with automatic connection pooling built on `asyncpg`.
#Installation
pip install asyncpg
#Quick Start
from daita import Agent
from daita.plugins import postgresql
# Create plugin
db = postgresql(
host="localhost",
database="mydb",
username="user",
password="password"
)
# Agent uses database tools autonomously
agent = Agent(
name="DB Agent",
prompt="You are a database analyst. Help users query and analyze data.",
tools=[db]
)
await agent.start()
result = await agent.run("Show me all users")
#Direct Usage
The plugin can be used directly without agents for programmatic access. For comprehensive PostgreSQL documentation, see the official PostgreSQL docs. The main value of this plugin is agent integration - enabling LLMs to autonomously query and analyze database data.
#Connection Parameters
postgresql(
host: str = "localhost",
port: int = 5432,
database: str = "",
username: str = "",
user: Optional[str] = None, # Alias for username
password: str = "",
connection_string: Optional[str] = None,
min_size: int = 1,
max_size: int = 10,
command_timeout: int = 60
)
#Parameters
- `host` (str): Database host address
- `port` (int): Database port (default: 5432)
- `database` (str): Database name to connect to
- `username` or `user` (str): Username for authentication
- `password` (str): Password for authentication
- `connection_string` (str): Full PostgreSQL connection string (overrides individual params)
- `min_size` (int): Minimum connections in pool (default: 1)
- `max_size` (int): Maximum connections in pool (default: 10)
- `command_timeout` (int): Query timeout in seconds (default: 60)
#Connection Methods
Connection string or individual parameters:
from daita.plugins import postgresql
# Individual parameters
async with postgresql(
host="localhost",
database="analytics",
username="analyst",
password="secure_password"
) as db:
results = await db.query("SELECT * FROM events")
# Connection string
async with postgresql(
connection_string="postgresql://user:pass@localhost:5432/analytics"
) as db:
results = await db.query("SELECT * FROM events")
# With connection pooling
async with postgresql(
host="localhost",
database="mydb",
username="user",
password="password",
min_size=5,
max_size=20,
command_timeout=30
) as db:
results = await db.query("SELECT * FROM users")
#Query Operations
SELECT queries - Use query() method, returns list of dictionaries:
from daita.plugins import postgresql
async with postgresql(host="localhost", database="analytics") as db:
# Simple query
users = await db.query("SELECT id, name, email FROM users")
# Returns: [{"id": 1, "name": "John", "email": "john@example.com"}, ...]
# Parameterized query (use $1, $2, $3, etc.)
active_users = await db.query(
"SELECT * FROM users WHERE status = $1 AND created_at > $2",
["active", "2024-01-01"]
)
Parameter syntax: PostgreSQL uses $1, $2, $3 (not %s like MySQL)
#Data Modification
INSERT, UPDATE, DELETE - Use execute() method, returns row count:
from daita.plugins import postgresql
async with postgresql(host="localhost", database="app") as db:
# Insert
await db.execute(
"INSERT INTO users (name, email, status) VALUES ($1, $2, $3)",
["Jane Doe", "jane@example.com", "active"]
)
# Update
await db.execute(
"UPDATE users SET last_login = NOW() WHERE id = $1",
[user_id]
)
# Delete
await db.execute(
"DELETE FROM users WHERE status = $1 AND inactive_days > $2",
["inactive", 365]
)
#Bulk Operations & Schema
Bulk insert:
from daita.plugins import postgresql
async with postgresql(host="localhost", database="app") as db:
users_data = [
{"name": "Alice", "email": "alice@example.com"},
{"name": "Bob", "email": "bob@example.com"},
]
count = await db.insert_many("users", users_data)
List tables:
from daita.plugins import postgresql
async with postgresql(host="localhost", database="app") as db:
tables = await db.tables()
# Returns: ["users", "orders", "products"]
#Using with Agents
#Tool-Based Integration (Recommended)
PostgreSQL plugin exposes database operations as tools that agents can use autonomously:
from daita import Agent
from daita.plugins import postgresql
import os
# Create database plugin
db = postgresql(
host="localhost",
database="analytics",
username=os.getenv("DB_USER"),
password=os.getenv("DB_PASSWORD")
)
# Pass plugin to agent - agent can now use database tools autonomously
agent = Agent(
name="Data Analyst",
prompt="You are a database analyst. Help users query and analyze data.",
llm_provider="openai",
model="gpt-4",
tools=[db]
)
await agent.start()
# Agent autonomously uses database tools to answer questions
result = await agent.run("Show me the top 10 users by activity count")
# The agent will autonomously:
# 1. Use list_tables to explore database
# 2. Use get_table_schema to understand structure
# 3. Use query_database to fetch the data
# 4. Analyze and present results in natural language
await agent.stop()
#Direct Database Operations (Scripts)
For scripts that don't need agent capabilities:
from daita.plugins import postgresql
async with postgresql(
host="localhost",
database="analytics",
username="analyst",
password="password"
) as db:
# Direct queries
user = await db.query("SELECT * FROM users WHERE id = $1", [user_id])
events = await db.query(
"SELECT * FROM events WHERE user_id = $1 LIMIT 100",
[user_id]
)
print(f"User: {user[0] if user else None}")
print(f"Events: {len(events)}")
#Available Tools
The PostgreSQL plugin exposes these tools to LLM agents:
| Tool | Description | Parameters |
|---|---|---|
| query_database | Execute SELECT queries | sql (required), params (optional array) |
| execute_sql | Run INSERT/UPDATE/DELETE | sql (required), params (optional array) |
| list_tables | List all tables | None |
| get_table_schema | Get table column info | table_name (required) |
Tool Categories: database
Tool Source: plugin
#Tool Usage Example
from daita import Agent
from daita.plugins import postgresql
# Setup database with tool integration
db = postgresql(host="localhost", database="sales")
agent = Agent(
name="Sales Analyzer",
prompt="You are a sales analyst. Help users analyze sales data.",
llm_provider="openai",
model="gpt-4",
tools=[db]
)
await agent.start()
# Natural language query - agent uses tools autonomously
result = await agent.run("""
Analyze sales data:
1. What tables are available?
2. Find total revenue by product category
3. Show top 5 customers by purchase amount
""")
# Agent orchestrates tool calls autonomously:
# - list_tables() to discover schema
# - get_table_schema("sales") to understand structure
# - query_database(...) to fetch aggregated data
# - Returns formatted analysis in natural language
print(result)
await agent.stop()
#Error Handling
from daita.plugins import postgresql
try:
async with postgresql(host="localhost", database="mydb") as db:
results = await db.query("SELECT * FROM users WHERE id = $1", [user_id])
except RuntimeError as e:
if "asyncpg not installed" in str(e):
print("Install asyncpg: pip install asyncpg")
elif "connection" in str(e).lower():
print(f"Connection failed: {e}")
else:
print(f"Error: {e}")
#Best Practices
Connection Management:
- Always use context managers (`async with`) for automatic cleanup
- Configure pool size based on workload (`min_size`, `max_size`)
- Set appropriate timeouts to prevent hanging operations
Security:
- Use parameterized queries to prevent SQL injection (`$1`, `$2`, etc.)
- Store credentials in environment variables, never hardcode
- Use read-only accounts when possible
Performance:
- Use `insert_many()` for bulk inserts instead of loops
- Limit result sets to avoid memory issues
- Configure connection pools for high-throughput apps
#Troubleshooting
| Issue | Solution |
|---|---|
| asyncpg not installed | pip install asyncpg |
| Connection to database failed | Check database is running, verify host/port/credentials |
| Command timeout | Increase command_timeout, optimize queries |
| Access denied | Verify username/password, check user permissions |
#Next Steps
- MySQL Plugin - For MySQL databases
- MongoDB Plugin - For document databases
- Workflows - Use PostgreSQL in multi-agent workflows
- Plugin Overview - Learn about other plugins