Catalog Plugin

Pluggable infrastructure discovery, schema profiling, and metadata management across databases, APIs, and cloud services.

#Installation

Install the optional extras for the sources you plan to discover:

bash
pip install 'daita-agents[postgresql]'    # PostgreSQL discovery
pip install 'daita-agents[mysql]'         # MySQL discovery
pip install 'daita-agents[mongodb]'       # MongoDB discovery
pip install 'daita-agents[aws]'           # AWS infrastructure discovery
pip install 'daita-agents[gcp]'           # GCP infrastructure discovery
pip install 'daita-agents[azure]'         # Azure infrastructure discovery
pip install 'daita-agents[opensearch]'    # OpenSearch discovery
pip install 'daita-agents[github]'        # GitHub repository discovery

#Quick Start

python
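import asyncio

from daita import Agent
from daita.plugins import catalog


async def main():
    cat = catalog()

    agent = Agent(
        name="Schema Analyst",
        prompt="You are a database schema expert. Help users discover and analyze database structures.",
        tools=[cat]
    )

    await agent.start()
    result = await agent.run("Discover the PostgreSQL database schema at localhost")
    print(result)
    await agent.stop()


# The other snippets on this page use bare `await`; run them inside an async function like main().
asyncio.run(main())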

#Direct Usage

python
from daita.plugins import catalog
 
cat = catalog()
 
result = await cat.discover_postgres(
    connection_string="postgresql://user:pass@localhost:5432/mydb",
    schema="public"
)
schema = result['schema']
 
print(f"Found {schema['table_count']} tables")
print(f"Total columns: {schema['column_count']}")

#Configuration Parameters

python
catalog(
    backend: Optional[Any] = None,
    organization_id: Optional[int] = None,
    auto_persist: bool = False
)

#Parameters

  • backend (Any): Optional graph backend override. If None, the backend is selected automatically based on the runtime environment.
  • organization_id (int): Optional organization ID for multi-tenant storage
  • auto_persist (bool): Automatically persist discoveries to graph storage

#Schema Discovery

#PostgreSQL

python
result = await cat.discover_postgres(
    connection_string="postgresql://user:pass@localhost:5432/analytics",
    schema="public",
    ssl_mode="verify-full",  # or "require" for pgbouncer poolers
    persist=False
)
schema = result['schema']
 
print(f"Database: {schema['database_type']}")
print(f"Tables: {schema['table_count']}")
for table in schema['tables']:
    print(f"  - {table['table_name']}: {table['row_count']} rows")
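
Because the returned schema is a plain dictionary, it can be post-processed without further database calls. The sketch below ranks tables by row count. It assumes only the keys used above (tables, table_name, row_count); the helper name largest_tables and the sample data are illustrative and not part of the plugin.

```python
from typing import Any


def largest_tables(schema: dict[str, Any], limit: int = 5) -> list[tuple[str, int]]:
    """Return up to `limit` (table_name, row_count) pairs, largest first."""
    rows = [
        # Treat a missing or None row_count as 0 so sorting never fails.
        (table["table_name"], table.get("row_count") or 0)
        for table in schema.get("tables", [])
    ]
    # Sort by row count descending; ties are broken alphabetically by name.
    return sorted(rows, key=lambda item: (-item[1], item[0]))[:limit]


# Hypothetical schema shaped like result['schema'] from discover_postgres.
sample_schema = {
    "database_type": "postgresql",
    "table_count": 3,
    "tables": [
        {"table_name": "events", "row_count": 120_000},
        {"table_name": "users", "row_count": 4_500},
        {"table_name": "audit_log", "row_count": None},
    ],
}

for name, count in largest_tables(sample_schema, limit=2):
    print(f"{name}: {count} rows")
```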

#MySQL

python
result = await cat.discover_mysql(
    connection_string="mysql://user:pass@localhost:3306/sales",
    schema="sales"
)

#MongoDB

python
result = await cat.discover_mongodb(
    connection_string="mongodb://localhost:27017",
    database="analytics",
    sample_size=100
)

#OpenAPI

python
result = await cat.discover_openapi(
    spec_url="https://api.example.com/openapi.json",
    service_name="Example API"
)

#Infrastructure Discovery

The Catalog plugin supports two kinds of pluggable components: discoverers, which enumerate data stores across cloud accounts, config files, and service registries, and profilers, which extract a normalized schema from each discovered store.

#Built-in Discoverers

AWS (AWSDiscoverer) scans your account for:

| Service | Store Type | Discoverer |
| --- | --- | --- |
| RDS PostgreSQL | postgresql | _postgres |
| RDS MySQL | mysql | _mysql |
| DynamoDB | dynamodb | _dynamodb |
| S3 | s3 | _s3 |
| DocumentDB (MongoDB-compat) | mongodb | _documentdb |
| API Gateway | apigateway | _apigateway |
| Kinesis | kinesis | _kinesis |
| OpenSearch | opensearch | _opensearch |
| SNS | sns | _sns |
| SQS | sqs | _sqs |

GCP (GCPDiscoverer, new in 0.16.0) scans your projects for:

| Service | Store Type | Discoverer |
| --- | --- | --- |
| Cloud SQL PostgreSQL | postgresql | _postgres |
| Cloud SQL MySQL | mysql | _mysql |
| BigQuery | bigquery | _bigquery |
| Firestore | firestore | _firestore |
| Bigtable | bigtable | _bigtable |
| Cloud Storage | gcs | _gcs |
| Pub/Sub | pubsub | _pubsub |
| Memorystore (Redis) | memorystore | _memorystore |
| API Gateway | gcp_apigateway | _gcp_apigateway |

Install GCP support with pip install 'daita-agents[gcp]'.

Azure (AzureDiscoverer, new in 0.17.0) scans your subscriptions for:

| Service | Store Type | Discoverer |
| --- | --- | --- |
| Azure SQL | sqlserver | _azure_sql |
| Azure Database for PostgreSQL | postgresql | _azure_postgresql |
| Azure Database for MySQL | mysql | _azure_mysql |
| Cosmos DB | cosmosdb | _azure_cosmosdb |
| Blob Storage containers | azure_blob | _azure_blob |
| Azure Cache for Redis | redis | _azure_redis |
| Event Hubs | eventhub | _azure_eventhub |
| Service Bus queues | servicebus_queue | _azure_servicebus |
| Service Bus topics | servicebus_topic | _azure_servicebus |
| API Management | azure_apim | _azure_apim |

Install Azure support with pip install 'daita-agents[azure]'.

GitHub (GitHubScanner) scans repositories and OpenAPI specs in GitHub organizations.

#Using Infrastructure Discovery

python
from daita.plugins.catalog import CatalogPlugin
from daita.plugins.catalog.aws import AWSDiscoverer
from daita.plugins.catalog.azure import AzureDiscoverer
from daita.plugins.catalog.gcp import GCPDiscoverer
from daita.plugins.catalog.github import GitHubScanner
 
cat = CatalogPlugin(auto_persist=True)
 
# Register discoverers
cat.add_discoverer(AWSDiscoverer(regions=["us-east-1", "us-west-2"]))
cat.add_discoverer(AzureDiscoverer(
    subscriptions=["00000000-0000-0000-0000-000000000000"],
    locations=["eastus", "westus2"],
))
cat.add_discoverer(GCPDiscoverer(
    projects=["my-project-prod", "my-project-staging"],
    credentials_path="service-account.json",     # or use ADC
))
cat.add_discoverer(GitHubScanner(org="my-company", token="ghp_..."))
 
# Discover all infrastructure
result = await cat.discover_all(concurrency=5)
 
print(f"Found {len(result.stores)} stores")
for store in result.stores:
    print(f"  {store.display_name} ({store.store_type}) — {store.region}")
 
if result.errors:
    for err in result.errors:
        print(f"  Error in {err.discoverer_name}: {err.error}")
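
For large accounts, a flat list of stores is easier to read once grouped. The following sketch groups stores by store_type using only the attributes shown above (display_name, store_type, region). The helper name and the SimpleNamespace stand-ins are illustrative; real entries come from result.stores.

```python
from collections import defaultdict
from types import SimpleNamespace
from typing import Iterable


def group_by_store_type(stores: Iterable) -> dict[str, list[str]]:
    """Map each store_type to the sorted display names of its stores."""
    grouped: dict[str, list[str]] = defaultdict(list)
    for store in stores:
        grouped[store.store_type].append(store.display_name)
    # Return a plain dict with store types and names in alphabetical order.
    return {store_type: sorted(names) for store_type, names in sorted(grouped.items())}


# Stand-ins for result.stores (hypothetical data).
demo_stores = [
    SimpleNamespace(display_name="orders-db", store_type="postgresql", region="us-east-1"),
    SimpleNamespace(display_name="raw-events", store_type="s3", region="us-west-2"),
    SimpleNamespace(display_name="billing-db", store_type="postgresql", region="us-east-1"),
]

for store_type, names in group_by_store_type(demo_stores).items():
    print(f"{store_type}: {', '.join(names)}")
```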

#GCP Authentication

GCPDiscoverer supports three credential modes, checked in order:

  1. Service account key — pass credentials_path="/path/to/key.json" or set GOOGLE_APPLICATION_CREDENTIALS.
  2. Service account impersonation — pass impersonate_service_account="target@project.iam.gserviceaccount.com".
  3. Application Default Credentials (ADC) — falls back to gcloud auth application-default login or the ambient workload identity.
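
The precedence above can be sketched as a small pure function. This illustrates the documented order only and is not GCPDiscoverer's actual implementation; the function name and the returned labels are hypothetical.

```python
import os
from typing import Mapping, Optional


def select_gcp_credential_mode(
    credentials_path: Optional[str] = None,
    impersonate_service_account: Optional[str] = None,
    env: Optional[Mapping[str, str]] = None,
) -> str:
    """Return which credential mode would win under the documented order."""
    env = os.environ if env is None else env
    # 1. An explicit key file, or one named by GOOGLE_APPLICATION_CREDENTIALS.
    if credentials_path or env.get("GOOGLE_APPLICATION_CREDENTIALS"):
        return "service_account_key"
    # 2. Impersonation of a target service account.
    if impersonate_service_account:
        return "impersonation"
    # 3. Fall back to Application Default Credentials.
    return "adc"
```

Passing env explicitly keeps the function easy to test without touching the real process environment.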

Project and location lists can also be read from the GCP_PROJECTS and GCP_LOCATIONS environment variables (comma-separated values), mirroring the BigQueryPlugin conventions.

#Azure Authentication

AzureDiscoverer uses DefaultAzureCredential, so it honors the standard Azure SDK credential chain including environment credentials, managed identity, Azure CLI login, and workload identity.

python
from daita.plugins.catalog.azure import AzureDiscoverer
 
cat.add_discoverer(AzureDiscoverer(
    subscriptions=["00000000-0000-0000-0000-000000000000"],
    locations=["eastus"],
    services=["sql", "postgresql", "cosmosdb", "blob", "eventhub"],
))

Configuration can be passed directly or loaded from environment variables:

  • AZURE_SUBSCRIPTIONS - comma-separated subscription IDs
  • AZURE_LOCATIONS - comma-separated Azure regions
  • AZURE_TENANT_ID - tenant ID for credential selection
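
As a rough illustration of how these variables map onto the constructor arguments, the sketch below parses them into a settings dictionary. The helper names are hypothetical, and trimming whitespace and skipping empty items are assumptions about how comma-separated values are handled, not confirmed plugin behavior.

```python
import os
from typing import Mapping, Optional


def split_csv(value: Optional[str]) -> list[str]:
    """Split a comma-separated string, dropping blanks and surrounding spaces."""
    if not value:
        return []
    return [item.strip() for item in value.split(",") if item.strip()]


def azure_settings_from_env(env: Optional[Mapping[str, str]] = None) -> dict:
    """Collect the Azure discovery settings named in the list above."""
    env = os.environ if env is None else env
    return {
        "subscriptions": split_csv(env.get("AZURE_SUBSCRIPTIONS")),
        "locations": split_csv(env.get("AZURE_LOCATIONS")),
        "tenant_id": env.get("AZURE_TENANT_ID"),  # None when unset
    }
```

The subscriptions and locations values could then be passed to AzureDiscoverer(subscriptions=..., locations=...) as in the example above.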

#Profiling Discovered Stores

After discovery, profile individual stores to extract their full schema:

python
from daita.plugins.catalog.profiler import build_default_profilers
 
# Register profilers for all supported store types
for profiler in build_default_profilers():
    cat.add_profiler(profiler)
 
# Profile a specific store
store = result.stores[0]
# Note: _find_profiler is underscore-prefixed, i.e. private by Python convention
profiler = cat._find_profiler(store.store_type)
schema = await profiler.profile(store)
 
print(f"Tables: {len(schema.tables)}")
for table in schema.tables:
    print(f"  {table.name}: {len(table.columns)} columns")

Or discover and profile in one call:

python
result = await cat.discover_and_profile(concurrency=5)

When CatalogPlugin(auto_persist=True) is enabled, discover_and_profile() persists profiled schemas automatically. This writes local catalog snapshots and graph entities for supported source types, including Azure Blob, Cosmos DB, Event Hubs, Service Bus, and API Management.

#Custom Discoverers

Extend BaseDiscoverer to add support for new infrastructure sources:

python
from daita.plugins.catalog import BaseDiscoverer, DiscoveredStore
 
class MyDiscoverer(BaseDiscoverer):
    name = "my-source"
 
    async def authenticate(self):
        # Set up credentials
        ...
 
    async def enumerate(self):
        # Yield DiscoveredStore instances
        yield DiscoveredStore(
            id="my-store-1",
            store_type="postgresql",
            display_name="My Database",
            source="my-source",
            region="us-east-1",
        )
 
cat.add_discoverer(MyDiscoverer())

#Custom Profilers

Extend BaseProfiler to add schema extraction for new store types:

python
from daita.plugins.catalog import BaseProfiler, NormalizedSchema
 
class MyProfiler(BaseProfiler):
    def supports(self, store_type: str) -> bool:
        return store_type == "my-store-type"
 
    async def profile(self, store) -> NormalizedSchema:
        # Connect and extract schema
        ...
 
cat.add_profiler(MyProfiler())

#Schema Analysis

#Compare Schemas

python
result_dev = await cat.discover_postgres(connection_string="postgresql://localhost/dev_db")
result_prod = await cat.discover_postgres(connection_string="postgresql://localhost/prod_db")
 
result = await cat.compare_schemas(result_dev['schema'], result_prod['schema'])
comparison = result['comparison']
 
print(f"Added tables: {comparison['added_tables']}")
print(f"Removed tables: {comparison['removed_tables']}")
print(f"Modified columns: {len(comparison['modified_columns'])}")
print(f"Breaking changes: {comparison['breaking_changes']}")
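
A comparison result like this can gate a deployment or CI step. The sketch below treats any reported breaking change or removed table as blocking. It assumes the comparison keys shown above and relies only on their truthiness, since the exact value types are not specified here; the helper name and sample data are hypothetical.

```python
from typing import Any


def blocking_reasons(comparison: dict[str, Any]) -> list[str]:
    """List human-readable reasons to block; an empty list means safe to proceed."""
    reasons = []
    # Truthiness works whether the value is a list of changes or a count.
    if comparison.get("breaking_changes"):
        reasons.append(f"breaking changes: {comparison['breaking_changes']}")
    if comparison.get("removed_tables"):
        reasons.append(f"removed tables: {comparison['removed_tables']}")
    return reasons


# Hypothetical comparison shaped like result['comparison'].
sample_comparison = {
    "added_tables": ["audit_log"],
    "removed_tables": ["legacy_orders"],
    "modified_columns": [],
    "breaking_changes": [],
}

for reason in blocking_reasons(sample_comparison):
    print(f"BLOCKED: {reason}")
```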

#Compare Store to Baseline

Compare a discovered store's current schema against its last persisted snapshot. This operation is exposed as an agent tool (compare_store_to_baseline), which the agent calls when asked to check for schema drift:

python
# Via agent (recommended)
result = await agent.run("Compare store abc123 against its baseline")

#Export Diagrams

python
result = await cat.discover_postgres(connection_string="postgresql://localhost/mydb")
 
# Export as Mermaid ER diagram
mermaid = await cat.export_diagram(result['schema'], format="mermaid")
print(mermaid['diagram'])
 
# Also supports "json_schema" format

#Using with Agents

python
from daita import Agent
from daita.plugins import catalog
 
cat = catalog(auto_persist=True)
 
agent = Agent(
    name="Schema Expert",
    prompt="You are a database schema analyst. Help users discover and understand database structures.",
    tools=[cat]
)
 
await agent.start()
 
result = await agent.run("""
Discover the PostgreSQL schema at localhost:5432/analytics.
Then compare it to the MySQL schema at localhost:3306/sales.
Identify any structural differences.
""")
 
await agent.stop()

#Available Tools

| Tool | Description | Key Parameters |
| --- | --- | --- |
| discover_postgres | Discover PostgreSQL schema | connection_string, schema, persist, ssl_mode, table_filter, max_tables |
| discover_mysql | Discover MySQL schema | connection_string, schema, persist, table_filter, max_tables |
| discover_mongodb | Discover MongoDB schema | connection_string, database, sample_size, persist |
| discover_openapi | Discover API from OpenAPI spec | spec_url, service_name, persist |
| discover_infrastructure | Run all registered discoverers across AWS, GCP, Azure, GitHub, and custom sources | concurrency, offset, limit, refresh |
| profile_store | Profile a discovered store's full schema | store_id |
| find_store | Search catalog by name, type, environment, or tags | query, store_type, environment, tag, offset, limit |
| compare_schemas | Compare two schemas | schema_a, schema_b |
| compare_store_to_baseline | Compare store against last persisted snapshot | store_id |
| export_diagram | Export schema as diagram | schema, format (mermaid or json_schema) |

#Persistence

#Local (default)

Schemas are persisted to .daita/catalog.json when auto_persist=True or when persist=True is passed to individual discovery calls.

#Custom Backend

Register a custom storage backend at application startup:

python
from daita.plugins.catalog import register_catalog_backend_factory
 
register_catalog_backend_factory(lambda: MyStorageBackend())

The factory must return an object with an async persist_schema(schema: dict) -> bool method.
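
One possible shape for such a backend is sketched below. Only the async persist_schema(schema: dict) -> bool method comes from the contract above; the JSON Lines file format, the constructor argument, and the choice to return False on a failed write are assumptions made for illustration.

```python
import asyncio
import json
import tempfile
from pathlib import Path
from typing import Union


class MyStorageBackend:
    """Illustrative backend: appends every schema as one JSON line."""

    def __init__(self, path: Union[str, Path]) -> None:
        self._path = Path(path)

    async def persist_schema(self, schema: dict) -> bool:
        # default=str keeps non-JSON values (e.g. datetimes) from raising.
        line = json.dumps(schema, sort_keys=True, default=str)
        try:
            # Run the blocking file write off the event loop thread.
            await asyncio.to_thread(self._append, line)
        except OSError:
            return False  # assumption: False signals a failed write
        return True

    def _append(self, line: str) -> None:
        self._path.parent.mkdir(parents=True, exist_ok=True)
        with self._path.open("a", encoding="utf-8") as handle:
            handle.write(line + "\n")


async def demo() -> list[dict]:
    """Persist two sample schemas to a temp file and read them back."""
    with tempfile.TemporaryDirectory() as tmp:
        target = Path(tmp) / "catalog.jsonl"
        backend = MyStorageBackend(target)
        await backend.persist_schema({"database_type": "postgresql", "table_count": 2})
        await backend.persist_schema({"database_type": "mysql", "table_count": 5})
        text = target.read_text(encoding="utf-8")
        return [json.loads(line) for line in text.splitlines()]


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

A backend like this would be registered with register_catalog_backend_factory(lambda: MyStorageBackend("some/path.jsonl")), where the path is a placeholder.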

#Pruning Stale Entries

Remove catalog entries that haven't been refreshed recently:

python
result = await cat.prune_stale_catalog(max_age_seconds=604800)  # 7 days
print(f"Removed stale entries: {result['removed']}")

Entries with no last_seen timestamp are left untouched.
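
The rule can be pictured with a small standalone function. This is a sketch of the behavior described above, not the plugin's implementation: it assumes entries are dictionaries, that last_seen is a Unix timestamp in seconds, and that an entry exactly at the age limit is kept.

```python
import time
from typing import Any, Optional


def split_stale(
    entries: list[dict[str, Any]],
    max_age_seconds: float,
    now: Optional[float] = None,
) -> tuple[list[dict[str, Any]], list[dict[str, Any]]]:
    """Return (kept, removed) according to each entry's last_seen age."""
    now = time.time() if now is None else now
    kept, removed = [], []
    for entry in entries:
        last_seen = entry.get("last_seen")
        # Entries without a timestamp are never pruned.
        if last_seen is not None and now - last_seen > max_age_seconds:
            removed.append(entry)
        else:
            kept.append(entry)
    return kept, removed


# Hypothetical catalog entries, evaluated at a fixed "now" for repeatability.
NOW = 1_700_000_000
WEEK = 7 * 24 * 60 * 60  # 604800 seconds
entries = [
    {"id": "fresh", "last_seen": NOW - 3600},
    {"id": "stale", "last_seen": NOW - WEEK - 1},
    {"id": "unknown"},
]
kept, removed = split_stale(entries, max_age_seconds=WEEK, now=NOW)
print([e["id"] for e in kept], [e["id"] for e in removed])
```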

#Best Practices

Discovery:

  • Register only the discoverers you need to keep scans fast
  • Use concurrency to control parallel discovery (default: 5)
  • Use table_filter and max_tables to limit output size for large databases
  • Enable auto_persist when building organizational knowledge graphs

Performance:

  • Use refresh=False on discover_infrastructure to reuse cached results for pagination
  • Discovery can be slow for large accounts; consider raising concurrency above the default
  • Use find_store to search the catalog instead of re-scanning

Security:

  • Use read-only database accounts for discovery
  • Store credentials securely, never hardcode
  • AWS discoverers use your default boto3 credential chain
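
One way to avoid hardcoded credentials is to assemble the connection string from environment variables at runtime. The sketch below uses the standard libpq variable names (PGUSER, PGPASSWORD, PGHOST, PGPORT, PGDATABASE); reading them this way is a choice made for this example, not something the plugin does on its own, and the helper name is hypothetical.

```python
import os
from typing import Mapping, Optional
from urllib.parse import quote


def postgres_url_from_env(env: Optional[Mapping[str, str]] = None) -> str:
    """Build a postgresql:// URL; raises KeyError if a required variable is missing."""
    env = os.environ if env is None else env
    # Percent-encode values so characters like '@' or '/' cannot break the URL.
    user = quote(env["PGUSER"], safe="")
    password = quote(env["PGPASSWORD"], safe="")
    database = quote(env["PGDATABASE"], safe="")
    host = env.get("PGHOST", "localhost")
    port = env.get("PGPORT", "5432")
    return f"postgresql://{user}:{password}@{host}:{port}/{database}"
```

The resulting string can be passed as connection_string to discover_postgres, ideally for a read-only account.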

#Next Steps