# Catalog Plugin

Schema discovery and metadata management for databases, APIs, and organizational systems.

## Installation

```bash
pip install asyncpg   # For PostgreSQL support
pip install aiomysql  # For MySQL support
pip install motor     # For MongoDB support
pip install httpx     # For API discovery
```

## Quick Start

```python
from daita import Agent
from daita.plugins import catalog

# Create catalog plugin
cat = catalog()

# Agent uses catalog tools autonomously
agent = Agent(
    name="Schema Analyst",
    prompt="You are a database schema expert. Help users discover and analyze database structures.",
    tools=[cat]
)

await agent.start()
result = await agent.run("Discover the PostgreSQL database schema at localhost")
```

## Direct Usage

The plugin can be used directly for programmatic schema discovery:

```python
from daita.plugins import catalog

cat = catalog()

# Discover PostgreSQL schema
result = await cat.discover_postgres(
    connection_string="postgresql://user:pass@localhost:5432/mydb",
    schema="public"
)
schema = result['schema']

print(f"Found {schema['table_count']} tables")
print(f"Total columns: {schema['column_count']}")
```

## Configuration Parameters

```python
catalog(
    backend: Optional[Any] = None,
    organization_id: Optional[int] = None,
    auto_persist: bool = False
)
```

### Parameters

- `backend` (Any): Optional graph backend override. If `None`, the backend is selected automatically based on the runtime environment.
- `organization_id` (int): Optional organization ID for multi-tenant storage.
- `auto_persist` (bool): Automatically persist discoveries to graph storage.

## Schema Discovery

### PostgreSQL

Discover PostgreSQL database schemas including tables, columns, foreign keys, and indexes:

```python
from daita.plugins import catalog

cat = catalog()

result = await cat.discover_postgres(
    connection_string="postgresql://user:pass@localhost:5432/analytics",
    schema="public",
    persist=False
)
schema = result['schema']

print(f"Database: {schema['database_type']}")
print(f"Tables: {schema['table_count']}")
for table in schema['tables']:
    print(f"  - {table['table_name']}: {table['row_count']} rows")
```

### MySQL

Discover MySQL/MariaDB database schemas:

```python
from daita.plugins import catalog

cat = catalog()

result = await cat.discover_mysql(
    connection_string="mysql://user:pass@localhost:3306/sales",
    schema="sales"
)
schema = result['schema']

print(f"Found {len(schema['tables'])} tables")
print(f"Foreign keys: {len(schema['foreign_keys'])}")
```

### MongoDB

Infer MongoDB schema by sampling documents:

```python
from daita.plugins import catalog

cat = catalog()

result = await cat.discover_mongodb(
    connection_string="mongodb://localhost:27017",
    database="analytics",
    sample_size=100
)
schema = result['schema']

for collection in schema['collections']:
    print(f"{collection['collection_name']}: {collection['document_count']} documents")
    print(f"  Sampled: {collection['sampled_count']}")
    print(f"  Fields: {len(collection['fields'])}")
```

### OpenAPI Discovery

Discover API structure from OpenAPI/Swagger specifications:

```python
from daita.plugins import catalog

cat = catalog()

result = await cat.discover_openapi(
    spec_url="https://api.example.com/openapi.json",
    service_name="Example API"
)
schema = result['schema']

print(f"Service: {schema['service_name']}")
print(f"Version: {schema['version']}")
print(f"Endpoints: {schema['endpoint_count']}")
```

## Schema Analysis

### Compare Schemas

Identify differences between two schemas for migration planning:

```python
from daita.plugins import catalog

cat = catalog()

# Discover two schemas
result_dev = await cat.discover_postgres(
    connection_string="postgresql://localhost/dev_db"
)

result_prod = await cat.discover_postgres(
    connection_string="postgresql://localhost/prod_db"
)

# Compare (pass the inner schema dicts)
result = await cat.compare_schemas(result_dev['schema'], result_prod['schema'])
comparison = result['comparison']

print(f"Added tables: {comparison['added_tables']}")
print(f"Removed tables: {comparison['removed_tables']}")
print(f"Modified columns: {len(comparison['modified_columns'])}")
print(f"Breaking changes: {comparison['breaking_changes']}")
```
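
In a CI pipeline, the comparison result can gate deployments. A minimal sketch (the helper name is hypothetical, and it assumes `breaking_changes` holds a list of change descriptions):

```python
def assert_no_breaking_changes(comparison: dict) -> None:
    """Raise if the schema comparison reports breaking changes.

    Assumes 'breaking_changes' is a list of change descriptions;
    adapt if your plugin version reports a count instead.
    """
    breaking = comparison.get("breaking_changes") or []
    if breaking:
        raise RuntimeError(
            f"{len(breaking)} breaking schema change(s) detected: {breaking}"
        )
```

Calling this after `compare_schemas` turns schema drift into a hard failure instead of a log line.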

### Export Diagrams

Export schemas as visual diagrams:

```python
from daita.plugins import catalog

cat = catalog()

result = await cat.discover_postgres(
    connection_string="postgresql://localhost/mydb"
)

# Export as Mermaid diagram
mermaid = await cat.export_diagram(result['schema'], format="mermaid")
print(mermaid['diagram'])

# Export as JSON Schema
json_schema = await cat.export_diagram(result['schema'], format="json_schema")
print(json_schema['schema'])
```
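
The Mermaid text can be written straight to a `.mmd` file for rendering with the Mermaid CLI or embedding in docs. A small sketch (the helper and file name are illustrative):

```python
from pathlib import Path

def save_mermaid(diagram_text: str, path: str = "schema.mmd") -> Path:
    """Write Mermaid diagram source to disk for later rendering."""
    out = Path(path)
    out.write_text(diagram_text, encoding="utf-8")
    return out

# usage sketch:
# save_mermaid(mermaid['diagram'])
```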

## Using with Agents

The Catalog plugin exposes discovery operations as tools that agents can use autonomously:

```python
from daita import Agent
from daita.plugins import catalog

# Create catalog plugin
cat = catalog(auto_persist=True)

# Agent with catalog tools
agent = Agent(
    name="Schema Expert",
    prompt="You are a database schema analyst. Help users discover and understand database structures.",
    llm_provider="openai",
    model="gpt-4",
    tools=[cat]
)

await agent.start()

# Agent autonomously discovers and analyzes schemas
result = await agent.run("""
Discover the PostgreSQL schema at localhost:5432/analytics.
Then compare it to the MySQL schema at localhost:3306/sales.
Identify any structural differences.
""")

print(result)
await agent.stop()
```

## Available Tools

The Catalog plugin exposes these tools to agents:

| Tool | Description | Parameters |
|------|-------------|------------|
| `discover_postgres` | Discover PostgreSQL schema | `connection_string`, `schema`, `persist` |
| `discover_mysql` | Discover MySQL schema | `connection_string`, `schema`, `persist` |
| `discover_mongodb` | Discover MongoDB schema | `connection_string`, `database`, `sample_size` |
| `discover_openapi` | Discover API from OpenAPI spec | `spec_url`, `service_name`, `persist` |
| `compare_schemas` | Compare two schemas | `schema_a`, `schema_b` |
| `export_diagram` | Export schema as diagram | `schema`, `format` (`mermaid` or `json_schema`) |

## Pruning Stale Catalog Entries

Remove catalog entries that haven't been refreshed recently. This is useful after a full discovery pass to evict databases or services that are no longer in use:

```python
from daita.plugins import catalog

cat = catalog()

# Run a full discovery pass...
await cat.discover_postgres(connection_string="postgresql://localhost/db1", persist=True)
await cat.discover_postgres(connection_string="postgresql://localhost/db2", persist=True)

# Remove entries not refreshed in the last 7 days (604800 seconds)
result = await cat.prune_stale_catalog(max_age_seconds=604800)
print(f"Removed stale entries: {result['removed']}")
```

Entries with no `last_seen` timestamp (written before persistence was enabled) are left untouched.
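
Raw second counts are easy to mistype; `datetime.timedelta` makes the retention window explicit:

```python
from datetime import timedelta

# Seven-day retention window, expressed readably
max_age = int(timedelta(days=7).total_seconds())

# usage sketch:
# result = await cat.prune_stale_catalog(max_age_seconds=max_age)
```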

## Best Practices

Discovery:

- Use appropriate sample sizes for MongoDB (larger samples give more accurate type inference)
- Enable persistence when building organizational knowledge graphs
- Compare schemas regularly to track drift
  • Enable persistence when building organizational knowledge graphs
  • Compare schemas regularly to track drift

Performance:

- Discovery can be slow for large databases
- Use specific schema names to limit scope
- Consider timeouts for large-scale discovery operations
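
One way to enforce the timeout advice is to wrap a discovery call in `asyncio.wait_for`; the helper below is a sketch, not part of the plugin API:

```python
import asyncio

async def discover_with_timeout(coro, timeout_s: float = 300.0):
    """Run a discovery coroutine under a hard deadline.

    Returns None on timeout so callers can retry or skip the source.
    """
    try:
        return await asyncio.wait_for(coro, timeout=timeout_s)
    except asyncio.TimeoutError:
        return None

# usage sketch:
# result = await discover_with_timeout(
#     cat.discover_postgres(connection_string=dsn), timeout_s=600)
```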

Security:

- Use read-only database accounts for discovery
- Store credentials securely; never hardcode them
- Limit discovery to necessary schemas only
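
To keep credentials out of source, the connection string can be assembled from environment variables. The variable names below are illustrative, not required by the plugin:

```python
import os

def pg_dsn_from_env() -> str:
    """Build a PostgreSQL DSN from environment variables."""
    user = os.environ["PGUSER"]
    password = os.environ["PGPASSWORD"]
    host = os.environ.get("PGHOST", "localhost")
    port = os.environ.get("PGPORT", "5432")
    database = os.environ["PGDATABASE"]
    return f"postgresql://{user}:{password}@{host}:{port}/{database}"
```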

## Next Steps