Skip to main content

PostgreSQL Plugin

Async PostgreSQL operations with automatic connection pooling built on asyncpg.

Installation

pip install asyncpg

Quick Start

Direct Usage (Scripts)

from daita.plugins import postgresql

# Direct usage in scripts
async with postgresql(
host="localhost",
database="mydb",
username="user",
password="password"
) as db:
results = await db.query("SELECT * FROM users")
print(results)

Usage with Agents

from daita import SubstrateAgent
from daita.plugins import postgresql

# Create plugin
db = postgresql(
host="localhost",
database="mydb",
username="user",
password="password"
)

# Agent uses database tools autonomously
agent = SubstrateAgent(
name="DB Agent",
prompt="You are a database analyst. Help users query and analyze data.",
tools=[db]
)

await agent.start()
result = await agent.run("Show me all users")

Connection Parameters

postgresql(
host: str = "localhost",
port: int = 5432,
database: str = "",
username: str = "",
user: Optional[str] = None, # Alias for username
password: str = "",
connection_string: Optional[str] = None,
min_size: int = 1,
max_size: int = 10,
command_timeout: int = 60
)

Parameters

  • host (str): Database host address
  • port (int): Database port (default: 5432)
  • database (str): Database name to connect to
  • username or user (str): Username for authentication
  • password (str): Password for authentication
  • connection_string (str): Full PostgreSQL connection string (overrides individual params)
  • min_size (int): Minimum connections in pool (default: 1)
  • max_size (int): Maximum connections in pool (default: 10)
  • command_timeout (int): Query timeout in seconds (default: 60)

Connection Methods

Connect using either individual parameters or a full connection string:

from daita.plugins import postgresql

# Individual parameters
async with postgresql(
host="localhost",
database="analytics",
username="analyst",
password="secure_password"
) as db:
results = await db.query("SELECT * FROM events")

# Connection string
async with postgresql(
connection_string="postgresql://user:pass@localhost:5432/analytics"
) as db:
results = await db.query("SELECT * FROM events")

# With connection pooling
async with postgresql(
host="localhost",
database="mydb",
username="user",
password="password",
min_size=5,
max_size=20,
command_timeout=30
) as db:
results = await db.query("SELECT * FROM users")

Query Operations

SELECT queries - Use query() method, returns list of dictionaries:

from daita.plugins import postgresql

async with postgresql(host="localhost", database="analytics") as db:
# Simple query
users = await db.query("SELECT id, name, email FROM users")
# Returns: [{"id": 1, "name": "John", "email": "john@example.com"}, ...]

# Parameterized query (use $1, $2, $3, etc.)
active_users = await db.query(
"SELECT * FROM users WHERE status = $1 AND created_at > $2",
["active", "2024-01-01"]
)

Parameter syntax: PostgreSQL uses $1, $2, $3 (not %s like MySQL)

Data Modification

INSERT, UPDATE, DELETE - Use execute() method, returns row count:

from daita.plugins import postgresql

async with postgresql(host="localhost", database="app") as db:
# Insert
await db.execute(
"INSERT INTO users (name, email, status) VALUES ($1, $2, $3)",
["Jane Doe", "jane@example.com", "active"]
)

# Update
await db.execute(
"UPDATE users SET last_login = NOW() WHERE id = $1",
[user_id]
)

# Delete
await db.execute(
"DELETE FROM users WHERE status = $1 AND inactive_days > $2",
["inactive", 365]
)

Bulk Operations & Schema

Bulk insert:

from daita.plugins import postgresql

async with postgresql(host="localhost", database="app") as db:
users_data = [
{"name": "Alice", "email": "alice@example.com"},
{"name": "Bob", "email": "bob@example.com"},
]
count = await db.insert_many("users", users_data)

List tables:

from daita.plugins import postgresql

async with postgresql(host="localhost", database="app") as db:
tables = await db.tables()
# Returns: ["users", "orders", "products"]

Using with Agents

PostgreSQL plugin exposes database operations as tools that agents can use autonomously:

from daita import SubstrateAgent
from daita.plugins import postgresql
import os

# Create database plugin
db = postgresql(
host="localhost",
database="analytics",
username=os.getenv("DB_USER"),
password=os.getenv("DB_PASSWORD")
)

# Pass plugin to agent - agent can now use database tools autonomously
agent = SubstrateAgent(
name="Data Analyst",
prompt="You are a database analyst. Help users query and analyze data.",
llm_provider="openai",
model="gpt-4",
tools=[db]
)

await agent.start()

# Agent autonomously uses database tools to answer questions
result = await agent.run("Show me the top 10 users by activity count")

# The agent will autonomously:
# 1. Use list_tables to explore database
# 2. Use get_table_schema to understand structure
# 3. Use query_database to fetch the data
# 4. Analyze and present results in natural language

await agent.stop()

Direct Database Operations (Scripts)

For scripts that don't need agent capabilities:

from daita.plugins import postgresql

async with postgresql(
host="localhost",
database="analytics",
username="analyst",
password="password"
) as db:
# Direct queries
user = await db.query("SELECT * FROM users WHERE id = $1", [user_id])
events = await db.query(
"SELECT * FROM events WHERE user_id = $1 LIMIT 100",
[user_id]
)

print(f"User: {user[0] if user else None}")
print(f"Events: {len(events)}")

Available Tools

The PostgreSQL plugin exposes these tools to LLM agents:

| Tool | Description | Parameters |
|------|-------------|------------|
| query_database | Execute SELECT queries | sql (required), params (optional array) |
| execute_sql | Run INSERT/UPDATE/DELETE | sql (required), params (optional array) |
| list_tables | List all tables | None |
| get_table_schema | Get table column info | table_name (required) |

Tool Categories: database
Tool Source: plugin

Tool Usage Example

from daita import SubstrateAgent
from daita.plugins import postgresql

# Setup database with tool integration
db = postgresql(host="localhost", database="sales")

agent = SubstrateAgent(
name="Sales Analyzer",
prompt="You are a sales analyst. Help users analyze sales data.",
llm_provider="openai",
model="gpt-4",
tools=[db]
)

await agent.start()

# Natural language query - agent uses tools autonomously
result = await agent.run("""
Analyze sales data:
1. What tables are available?
2. Find total revenue by product category
3. Show top 5 customers by purchase amount
""")

# Agent orchestrates tool calls autonomously:
# - list_tables() to discover schema
# - get_table_schema("sales") to understand structure
# - query_database(...) to fetch aggregated data
# - Returns formatted analysis in natural language

print(result)
await agent.stop()

Error Handling

from daita.plugins import postgresql

try:
async with postgresql(host="localhost", database="mydb") as db:
results = await db.query("SELECT * FROM users WHERE id = $1", [user_id])
except RuntimeError as e:
if "asyncpg not installed" in str(e):
print("Install asyncpg: pip install asyncpg")
elif "connection" in str(e).lower():
print(f"Connection failed: {e}")
else:
print(f"Error: {e}")

Best Practices

Connection Management:

  • Always use context managers (async with) for automatic cleanup
  • Configure pool size based on workload (min_size, max_size)
  • Set appropriate timeouts to prevent hanging operations

Security:

  • Use parameterized queries to prevent SQL injection ($1, $2, etc.)
  • Store credentials in environment variables, never hardcode
  • Use read-only accounts when possible

Performance:

  • Use insert_many() for bulk inserts instead of loops
  • Limit result sets to avoid memory issues
  • Configure connection pools for high-throughput apps

Troubleshooting

| Issue | Solution |
|-------|----------|
| asyncpg not installed | pip install asyncpg |
| Connection to database failed | Check database is running, verify host/port/credentials |
| Command timeout | Increase command_timeout, optimize queries |
| Access denied | Verify username/password, check user permissions |

Next Steps