# Catalog Plugin
Schema discovery and metadata management for databases, APIs, and organizational systems.
## Installation

```bash
pip install asyncpg   # For PostgreSQL support
pip install aiomysql  # For MySQL support
pip install motor     # For MongoDB support
pip install httpx     # For API discovery
```

## Quick Start
```python
from daita import Agent
from daita.plugins import catalog

# Create catalog plugin
cat = catalog()

# Agent uses catalog tools autonomously
agent = Agent(
    name="Schema Analyst",
    prompt="You are a database schema expert. Help users discover and analyze database structures.",
    tools=[cat]
)

await agent.start()
result = await agent.run("Discover the PostgreSQL database schema at localhost")
```

## Direct Usage
The plugin can be used directly for programmatic schema discovery:
```python
from daita.plugins import catalog

cat = catalog()

# Discover PostgreSQL schema
result = await cat.discover_postgres(
    connection_string="postgresql://user:pass@localhost:5432/mydb",
    schema="public"
)

schema = result['schema']
print(f"Found {schema['table_count']} tables")
print(f"Total columns: {schema['column_count']}")
```

## Configuration Parameters
```python
catalog(
    backend: Optional[Any] = None,
    organization_id: Optional[int] = None,
    auto_persist: bool = False
)
```

### Parameters

- `backend` (Any): Optional graph backend override. If `None`, the backend is selected automatically based on the runtime environment.
- `organization_id` (int): Optional organization ID for multi-tenant storage.
- `auto_persist` (bool): Automatically persist discoveries to graph storage.
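The library's actual backend-selection logic is internal; as a rough, purely illustrative mental model of "explicit override wins, otherwise pick from the environment" (the environment variable name and backend labels below are assumptions, not daita's API):

```python
import os

def pick_backend(explicit=None):
    # Illustrative only -- not daita's actual selection logic.
    # An explicit backend always wins; otherwise choose by environment.
    if explicit is not None:
        return explicit
    return "neo4j" if os.environ.get("NEO4J_URI") else "memory"

os.environ.pop("NEO4J_URI", None)
print(pick_backend())          # memory (no NEO4J_URI set)
print(pick_backend("custom"))  # custom (explicit override wins)
```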
## Schema Discovery

### PostgreSQL
Discover PostgreSQL database schemas including tables, columns, foreign keys, and indexes:
```python
from daita.plugins import catalog

cat = catalog()

result = await cat.discover_postgres(
    connection_string="postgresql://user:pass@localhost:5432/analytics",
    schema="public",
    persist=False
)

schema = result['schema']
print(f"Database: {schema['database_type']}")
print(f"Tables: {schema['table_count']}")
for table in schema['tables']:
    print(f"  - {table['table_name']}: {table['row_count']} rows")
```

### MySQL
Discover MySQL/MariaDB database schemas:
```python
from daita.plugins import catalog

cat = catalog()

result = await cat.discover_mysql(
    connection_string="mysql://user:pass@localhost:3306/sales",
    schema="sales"
)

schema = result['schema']
print(f"Found {len(schema['tables'])} tables")
print(f"Foreign keys: {len(schema['foreign_keys'])}")
```

### MongoDB
Infer MongoDB schema by sampling documents:
```python
from daita.plugins import catalog

cat = catalog()

result = await cat.discover_mongodb(
    connection_string="mongodb://localhost:27017",
    database="analytics",
    sample_size=100
)

schema = result['schema']
for collection in schema['collections']:
    print(f"{collection['collection_name']}: {collection['document_count']} documents")
    print(f"  Sampled: {collection['sampled_count']}")
    print(f"  Fields: {len(collection['fields'])}")
```

## OpenAPI Discovery
Discover API structure from OpenAPI/Swagger specifications:
```python
from daita.plugins import catalog

cat = catalog()

result = await cat.discover_openapi(
    spec_url="https://api.example.com/openapi.json",
    service_name="Example API"
)

schema = result['schema']
print(f"Service: {schema['service_name']}")
print(f"Version: {schema['version']}")
print(f"Endpoints: {schema['endpoint_count']}")
```

## Schema Analysis
### Compare Schemas
Identify differences between two schemas for migration planning:
```python
from daita.plugins import catalog

cat = catalog()

# Discover two schemas
result_dev = await cat.discover_postgres(
    connection_string="postgresql://localhost/dev_db"
)
result_prod = await cat.discover_postgres(
    connection_string="postgresql://localhost/prod_db"
)

# Compare (pass the inner schema dicts)
result = await cat.compare_schemas(result_dev['schema'], result_prod['schema'])

comparison = result['comparison']
print(f"Added tables: {comparison['added_tables']}")
print(f"Removed tables: {comparison['removed_tables']}")
print(f"Modified columns: {len(comparison['modified_columns'])}")
print(f"Breaking changes: {comparison['breaking_changes']}")
```

### Export Diagrams
Export schemas as visual diagrams:
```python
from daita.plugins import catalog

cat = catalog()

result = await cat.discover_postgres(
    connection_string="postgresql://localhost/mydb"
)

# Export as Mermaid diagram
mermaid = await cat.export_diagram(result['schema'], format="mermaid")
print(mermaid['diagram'])

# Export as JSON Schema
json_schema = await cat.export_diagram(result['schema'], format="json_schema")
print(json_schema['schema'])
```

## Using with Agents
The Catalog plugin exposes discovery operations as tools that agents can use autonomously:
```python
from daita import Agent
from daita.plugins import catalog

# Create catalog plugin
cat = catalog(auto_persist=True)

# Agent with catalog tools
agent = Agent(
    name="Schema Expert",
    prompt="You are a database schema analyst. Help users discover and understand database structures.",
    llm_provider="openai",
    model="gpt-4",
    tools=[cat]
)

await agent.start()

# Agent autonomously discovers and analyzes schemas
result = await agent.run("""
Discover the PostgreSQL schema at localhost:5432/analytics.
Then compare it to the MySQL schema at localhost:3306/sales.
Identify any structural differences.
""")

print(result)
await agent.stop()
```

## Available Tools
The Catalog plugin exposes these tools to agents:
| Tool | Description | Parameters |
|---|---|---|
| `discover_postgres` | Discover PostgreSQL schema | `connection_string`, `schema`, `persist` |
| `discover_mysql` | Discover MySQL schema | `connection_string`, `schema`, `persist` |
| `discover_mongodb` | Discover MongoDB schema | `connection_string`, `database`, `sample_size` |
| `discover_openapi` | Discover API from OpenAPI spec | `spec_url`, `service_name`, `persist` |
| `compare_schemas` | Compare two schemas | `schema_a`, `schema_b` |
| `export_diagram` | Export schema as diagram | `schema`, `format` (`mermaid` or `json_schema`) |
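Conceptually, an agent invokes a tool by name with keyword arguments. A minimal sketch of that name-to-coroutine dispatch, using stub coroutines in place of the real plugin methods (the stubs and their return shapes are illustrative, not daita's implementation):

```python
import asyncio

# Stub coroutines standing in for the plugin's real tools.
async def discover_postgres(connection_string, schema="public", persist=False):
    return {"schema": {"database_type": "postgresql", "table_count": 0}}

async def compare_schemas(schema_a, schema_b):
    return {"comparison": {"added_tables": [], "removed_tables": []}}

TOOLS = {
    "discover_postgres": discover_postgres,
    "compare_schemas": compare_schemas,
}

async def call_tool(name, **kwargs):
    # Look up the tool by name and await it with the given arguments.
    return await TOOLS[name](**kwargs)

result = asyncio.run(call_tool("discover_postgres",
                               connection_string="postgresql://localhost/db"))
print(result["schema"]["database_type"])  # postgresql
```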
## Pruning Stale Catalog Entries
Remove catalog entries that have not been refreshed recently. This is useful after a full discovery pass, to evict databases or services that are no longer in use:
```python
from daita.plugins import catalog

cat = catalog()

# Run a full discovery pass...
await cat.discover_postgres(connection_string="postgresql://localhost/db1", persist=True)
await cat.discover_postgres(connection_string="postgresql://localhost/db2", persist=True)

# Remove entries not refreshed in the last 7 days (604800 seconds)
result = await cat.prune_stale_catalog(max_age_seconds=604800)
print(f"Removed stale entries: {result['removed']}")
```

Entries with no `last_seen` timestamp (written before persistence was enabled) are left untouched.
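The pruning rule itself is simple age-based filtering. A self-contained sketch of the same idea (`prune_stale` and the entry dicts are illustrative, not the plugin's internals), which also keeps entries lacking a `last_seen` timestamp:

```python
import time

SEVEN_DAYS = 7 * 24 * 60 * 60  # 604800 seconds

def prune_stale(entries, max_age_seconds, now=None):
    """Drop entries whose last_seen is older than the cutoff.
    Entries without a last_seen timestamp are kept."""
    now = time.time() if now is None else now
    cutoff = now - max_age_seconds
    kept, removed = [], 0
    for entry in entries:
        last_seen = entry.get("last_seen")
        if last_seen is not None and last_seen < cutoff:
            removed += 1
        else:
            kept.append(entry)
    return kept, removed

now = 1_000_000_000
entries = [
    {"name": "db1", "last_seen": now - 60},             # fresh: kept
    {"name": "db2", "last_seen": now - 2 * SEVEN_DAYS}, # stale: removed
    {"name": "legacy"},                                 # no timestamp: kept
]
kept, removed = prune_stale(entries, SEVEN_DAYS, now=now)
print(removed)                    # 1
print([e["name"] for e in kept])  # ['db1', 'legacy']
```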
## Best Practices

**Discovery:**
- Use appropriate sample sizes for MongoDB (larger samples give more accurate inference)
- Enable persistence when building organizational knowledge graphs
- Compare schemas regularly to track drift

**Performance:**
- Discovery can be slow for large databases
- Use specific schema names to limit scope
- Consider timeouts for large-scale discovery operations

**Security:**
- Use read-only database accounts for discovery
- Store credentials securely; never hardcode them
- Limit discovery to necessary schemas only
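For the credential guidance above, one common pattern is to assemble the connection string from environment variables at runtime, percent-encoding the credentials so special characters survive. The environment variable names here are illustrative, and the two assignments only seed demo values (in real use they come from the shell or a secrets manager):

```python
import os
from urllib.parse import quote

# Demo only: in real use these are set outside the process.
os.environ["DAITA_DB_USER"] = "readonly_user"
os.environ["DAITA_DB_PASSWORD"] = "s3cret/with:odd@chars"

# Percent-encode so '/', ':' and '@' cannot break the URL structure.
user = quote(os.environ["DAITA_DB_USER"], safe="")
password = quote(os.environ["DAITA_DB_PASSWORD"], safe="")
conn = f"postgresql://{user}:{password}@localhost:5432/analytics"
print(conn)  # special characters are percent-encoded
```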
## Next Steps
- Lineage Plugin - Track data flows and dependencies
- Neo4j Plugin - Store schemas in graph databases
- Workflows - Use catalog in multi-agent workflows