# Catalog Plugin

Pluggable infrastructure discovery, schema profiling, and metadata management across databases, APIs, and cloud services.

## Installation

Install the optional extras for the sources you plan to discover:

```bash
pip install 'daita-agents[postgresql]'    # PostgreSQL discovery
pip install 'daita-agents[mysql]'         # MySQL discovery
pip install 'daita-agents[mongodb]'       # MongoDB discovery
pip install 'daita-agents[aws]'           # AWS infrastructure discovery
pip install 'daita-agents[opensearch]'    # OpenSearch discovery
pip install 'daita-agents[github]'        # GitHub repository discovery
```

## Quick Start

```python
from daita import Agent
from daita.plugins import catalog

cat = catalog()

agent = Agent(
    name="Schema Analyst",
    prompt="You are a database schema expert. Help users discover and analyze database structures.",
    tools=[cat]
)

await agent.start()
result = await agent.run("Discover the PostgreSQL database schema at localhost")
```

## Direct Usage

```python
from daita.plugins import catalog

cat = catalog()

result = await cat.discover_postgres(
    connection_string="postgresql://user:pass@localhost:5432/mydb",
    schema="public"
)
schema = result['schema']

print(f"Found {schema['table_count']} tables")
print(f"Total columns: {schema['column_count']}")
```

## Configuration Parameters

```python
catalog(
    backend: Optional[Any] = None,
    organization_id: Optional[int] = None,
    auto_persist: bool = False
)
```

### Parameters

- `backend` (Any): Optional graph backend override. If `None`, the backend is selected automatically based on the runtime environment.
- `organization_id` (int): Optional organization ID for multi-tenant storage.
- `auto_persist` (bool): Automatically persist discoveries to graph storage.

## Schema Discovery

### PostgreSQL

```python
result = await cat.discover_postgres(
    connection_string="postgresql://user:pass@localhost:5432/analytics",
    schema="public",
    ssl_mode="verify-full",  # or "require" for pgbouncer poolers
    persist=False
)
schema = result['schema']

print(f"Database: {schema['database_type']}")
print(f"Tables: {schema['table_count']}")
for table in schema['tables']:
    print(f"  - {table['table_name']}: {table['row_count']} rows")
```

### MySQL

```python
result = await cat.discover_mysql(
    connection_string="mysql://user:pass@localhost:3306/sales",
    schema="sales"
)
```

### MongoDB

```python
result = await cat.discover_mongodb(
    connection_string="mongodb://localhost:27017",
    database="analytics",
    sample_size=100
)
```

### OpenAPI

```python
result = await cat.discover_openapi(
    spec_url="https://api.example.com/openapi.json",
    service_name="Example API"
)
```

## Infrastructure Discovery

New in 0.14.0. The Catalog plugin supports pluggable discoverers that enumerate data stores across cloud accounts, config files, and service registries — and profilers that extract normalized schemas from each discovered store.

### Built-in Discoverers

The AWS discoverer scans your account for the following services:

| Service | Store Type | Discoverer |
| --- | --- | --- |
| RDS PostgreSQL | `postgresql` | `postgres` |
| RDS MySQL | `mysql` | `mysql` |
| DynamoDB | `dynamodb` | `dynamodb` |
| S3 | `s3` | `s3` |
| DocumentDB (MongoDB-compat) | `mongodb` | `documentdb` |
| API Gateway | `apigateway` | `apigateway` |
| Kinesis | `kinesis` | `kinesis` |
| OpenSearch | `opensearch` | `opensearch` |
| SNS | `sns` | `sns` |
| SQS | `sqs` | `sqs` |

A GitHub scanner (`GitHubScanner`) is also included for scanning repositories and OpenAPI specs in GitHub organizations.

### Using Infrastructure Discovery

```python
from daita.plugins.catalog import CatalogPlugin
from daita.plugins.catalog.aws import AWSDiscoverer
from daita.plugins.catalog.github import GitHubScanner

cat = CatalogPlugin(auto_persist=True)

# Register discoverers
cat.add_discoverer(AWSDiscoverer(regions=["us-east-1", "us-west-2"]))
cat.add_discoverer(GitHubScanner(org="my-company", token="ghp_..."))

# Discover all infrastructure
result = await cat.discover_all(concurrency=5)

print(f"Found {len(result.stores)} stores")
for store in result.stores:
    print(f"  {store.display_name} ({store.store_type}) — {store.region}")

if result.errors:
    for err in result.errors:
        print(f"  Error in {err.discoverer_name}: {err.error}")
```

### Profiling Discovered Stores

After discovery, profile individual stores to extract their full schema:

```python
from daita.plugins.catalog.profiler import build_default_profilers

# Register profilers for all supported store types
for profiler in build_default_profilers():
    cat.add_profiler(profiler)

# Profile a specific store
store = result.stores[0]
profiler = cat._find_profiler(store.store_type)
schema = await profiler.profile(store)

print(f"Tables: {len(schema.tables)}")
for table in schema.tables:
    print(f"  {table.name}: {len(table.columns)} columns")
```

Or discover and profile in one call:

```python
result = await cat.discover_and_profile(concurrency=5)
```

### Custom Discoverers

Extend `BaseDiscoverer` to add support for new infrastructure sources:

```python
from daita.plugins.catalog import BaseDiscoverer, DiscoveredStore

class MyDiscoverer(BaseDiscoverer):
    name = "my-source"

    async def authenticate(self):
        # Set up credentials
        ...

    async def enumerate(self):
        # Yield DiscoveredStore instances
        yield DiscoveredStore(
            id="my-store-1",
            store_type="postgresql",
            display_name="My Database",
            source="my-source",
            region="us-east-1",
        )

cat.add_discoverer(MyDiscoverer())
```

### Custom Profilers

Extend `BaseProfiler` to add schema extraction for new store types:

```python
from daita.plugins.catalog import BaseProfiler, NormalizedSchema

class MyProfiler(BaseProfiler):
    def supports(self, store_type: str) -> bool:
        return store_type == "my-store-type"

    async def profile(self, store) -> NormalizedSchema:
        # Connect and extract schema
        ...

cat.add_profiler(MyProfiler())
```

## Schema Analysis

### Compare Schemas

```python
result_dev = await cat.discover_postgres(connection_string="postgresql://localhost/dev_db")
result_prod = await cat.discover_postgres(connection_string="postgresql://localhost/prod_db")

result = await cat.compare_schemas(result_dev['schema'], result_prod['schema'])
comparison = result['comparison']

print(f"Added tables: {comparison['added_tables']}")
print(f"Removed tables: {comparison['removed_tables']}")
print(f"Modified columns: {len(comparison['modified_columns'])}")
print(f"Breaking changes: {comparison['breaking_changes']}")
```

### Compare Store to Baseline

Compare a discovered store's current schema against its last persisted snapshot. This operation is available as an agent tool (`compare_store_to_baseline`) — the agent calls it automatically when asked to check for schema drift:

```python
# Via agent (recommended)
result = await agent.run("Compare store abc123 against its baseline")
```
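
For direct (non-agent) usage, the same check can be sketched as a small helper. This assumes `compare_store_to_baseline` is also exposed as a plugin method taking the `store_id` listed under Available Tools, and that its result mirrors the `comparison` payload returned by `compare_schemas`; both are assumptions to verify against your installed version.

```python
# Hypothetical direct call; result shape is assumed, not confirmed above.
async def check_drift(cat, store_id: str) -> dict:
    result = await cat.compare_store_to_baseline(store_id=store_id)
    # Assumed: a 'comparison' payload like the one compare_schemas returns.
    comparison = result.get("comparison", result)
    if comparison.get("breaking_changes"):
        print(f"Breaking changes in {store_id}: {comparison['breaking_changes']}")
    return comparison
```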

### Export Diagrams

```python
result = await cat.discover_postgres(connection_string="postgresql://localhost/mydb")

# Export as Mermaid ER diagram
mermaid = await cat.export_diagram(result['schema'], format="mermaid")
print(mermaid['diagram'])

# Also supports "json_schema" format
```

## Using with Agents

```python
from daita import Agent
from daita.plugins import catalog

cat = catalog(auto_persist=True)

agent = Agent(
    name="Schema Expert",
    prompt="You are a database schema analyst. Help users discover and understand database structures.",
    tools=[cat]
)

await agent.start()

result = await agent.run("""
Discover the PostgreSQL schema at localhost:5432/analytics.
Then compare it to the MySQL schema at localhost:3306/sales.
Identify any structural differences.
""")

await agent.stop()
```

## Available Tools

| Tool | Description | Key Parameters |
| --- | --- | --- |
| `discover_postgres` | Discover PostgreSQL schema | `connection_string`, `schema`, `persist`, `ssl_mode`, `table_filter`, `max_tables` |
| `discover_mysql` | Discover MySQL schema | `connection_string`, `schema`, `persist`, `table_filter`, `max_tables` |
| `discover_mongodb` | Discover MongoDB schema | `connection_string`, `database`, `sample_size`, `persist` |
| `discover_openapi` | Discover API from OpenAPI spec | `spec_url`, `service_name`, `persist` |
| `discover_infrastructure` | Run all registered discoverers | `concurrency`, `offset`, `limit`, `refresh` |
| `profile_store` | Profile a discovered store's full schema | `store_id` |
| `find_store` | Search catalog by name, type, environment, or tags | `query`, `store_type`, `environment`, `tag`, `offset`, `limit` |
| `compare_schemas` | Compare two schemas | `schema_a`, `schema_b` |
| `compare_store_to_baseline` | Compare store against last persisted snapshot | `store_id` |
| `export_diagram` | Export schema as diagram | `schema`, `format` (`mermaid` or `json_schema`) |
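
Once a catalog exists, `find_store` answers "which stores match X?" questions without re-scanning. A sketch of a direct call, assuming the tool is also exposed as a plugin method and returns a dict with a `stores` list (both are assumptions to verify against your installed version):

```python
# Hypothetical direct call; the 'stores' result key is an assumption.
async def find_postgres_stores(cat, text: str):
    result = await cat.find_store(
        query=text,
        store_type="postgresql",
        limit=20,
    )
    return result.get("stores", [])
```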

## Persistence

### Local (default)

Schemas are persisted to `.daita/catalog.json` when `auto_persist=True`, or when `persist=True` is passed to individual discovery calls.

### Custom Backend

Register a custom storage backend at application startup:

```python
from daita.plugins.catalog import register_catalog_backend_factory

register_catalog_backend_factory(lambda: MyStorageBackend())
```

The factory must return an object with an async `persist_schema(schema: dict) -> bool` method.
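
As an illustration of that contract, a minimal file-based backend might look like the following. The class name and JSON-lines layout are invented for the example; only the `persist_schema` signature comes from the docs above.

```python
import json

class JSONLinesBackend:
    """Illustrative backend: appends each schema as one JSON line."""

    def __init__(self, path: str = "schemas.jsonl"):
        self.path = path

    async def persist_schema(self, schema: dict) -> bool:
        try:
            with open(self.path, "a", encoding="utf-8") as f:
                f.write(json.dumps(schema) + "\n")
            return True
        except OSError:
            return False

# At application startup (requires daita installed):
# register_catalog_backend_factory(lambda: JSONLinesBackend())
```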

### Pruning Stale Entries

Remove catalog entries that haven't been refreshed recently:

```python
result = await cat.prune_stale_catalog(max_age_seconds=604800)  # 7 days
print(f"Removed stale entries: {result['removed']}")
```

Entries with no `last_seen` timestamp are left untouched.

## Best Practices

Discovery:

- Register only the discoverers you need to keep scans fast
- Use `concurrency` to control parallel discovery (default: 5)
- Use `table_filter` and `max_tables` to limit output size for large databases
- Enable `auto_persist` when building organizational knowledge graphs
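
As a sketch of a bounded scan with those parameters (the connection string is illustrative, and whether `table_filter` matches by substring or pattern is not specified here, so treat the value as an assumption):

```python
# Bounded discovery on a large database. table_filter / max_tables come
# from the discover_postgres signature in Available Tools; their exact
# matching semantics are assumed for this example.
async def discover_order_tables(cat):
    return await cat.discover_postgres(
        connection_string="postgresql://readonly:secret@localhost:5432/warehouse",
        schema="public",
        table_filter="order",
        max_tables=50,
    )
```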

Performance:

- Use `refresh=False` on `discover_infrastructure` to reuse cached results for pagination
- Discovery can be slow for large accounts — consider running with higher `concurrency`
- Use `find_store` to search the catalog instead of re-scanning
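
The first two points combine into a paging loop: scan once, then walk the cached results. This sketch assumes `discover_infrastructure` is callable as a plugin method and returns a dict with a `stores` list; verify both against your version.

```python
PAGE_SIZE = 50

async def iter_stores(cat):
    """Scan once, then page through cached results with refresh=False."""
    offset = 0
    while True:
        result = await cat.discover_infrastructure(
            refresh=(offset == 0),  # only the first call triggers a scan
            offset=offset,
            limit=PAGE_SIZE,
        )
        stores = result["stores"]  # assumed result key
        for store in stores:
            yield store
        if len(stores) < PAGE_SIZE:
            break
        offset += PAGE_SIZE
```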

Security:

- Use read-only database accounts for discovery
- Store credentials securely; never hardcode them
- AWS discoverers use your default boto3 credential chain

## Next Steps