Catalog Plugin
Pluggable infrastructure discovery, schema profiling, and metadata management across databases, APIs, and cloud services.
#Installation
Install the optional extras for the sources you plan to discover:
```bash
pip install 'daita-agents[postgresql]'  # PostgreSQL discovery
pip install 'daita-agents[mysql]'       # MySQL discovery
pip install 'daita-agents[mongodb]'     # MongoDB discovery
pip install 'daita-agents[aws]'         # AWS infrastructure discovery
pip install 'daita-agents[opensearch]'  # OpenSearch discovery
pip install 'daita-agents[github]'      # GitHub repository discovery
```
#Quick Start
```python
from daita import Agent
from daita.plugins import catalog

cat = catalog()

agent = Agent(
    name="Schema Analyst",
    prompt="You are a database schema expert. Help users discover and analyze database structures.",
    tools=[cat]
)

await agent.start()
result = await agent.run("Discover the PostgreSQL database schema at localhost")
```
#Direct Usage
```python
from daita.plugins import catalog

cat = catalog()

result = await cat.discover_postgres(
    connection_string="postgresql://user:pass@localhost:5432/mydb",
    schema="public"
)

schema = result['schema']
print(f"Found {schema['table_count']} tables")
print(f"Total columns: {schema['column_count']}")
```
#Configuration Parameters
```python
catalog(
    backend: Optional[Any] = None,
    organization_id: Optional[int] = None,
    auto_persist: bool = False
)
```
#Parameters
- `backend` (Any): Optional graph backend override. If `None`, the backend is selected automatically based on the runtime environment
- `organization_id` (int): Optional organization ID for multi-tenant storage
- `auto_persist` (bool): Automatically persist discoveries to graph storage
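The discovery calls in the sections below all take standard connection strings. If a password contains special characters, it must be percent-encoded first; a small stdlib helper for building one safely might look like this (the function name is illustrative, not part of the plugin):

```python
from urllib.parse import quote_plus

def pg_connection_string(user: str, password: str, host: str, port: int, db: str) -> str:
    """Build a PostgreSQL connection string, percent-encoding the password."""
    return f"postgresql://{user}:{quote_plus(password)}@{host}:{port}/{db}"

print(pg_connection_string("analyst", "p@ss/w0rd", "localhost", 5432, "analytics"))
# → postgresql://analyst:p%40ss%2Fw0rd@localhost:5432/analytics
```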
#Schema Discovery
#PostgreSQL
```python
result = await cat.discover_postgres(
    connection_string="postgresql://user:pass@localhost:5432/analytics",
    schema="public",
    ssl_mode="verify-full",  # or "require" for pgbouncer poolers
    persist=False
)

schema = result['schema']
print(f"Database: {schema['database_type']}")
print(f"Tables: {schema['table_count']}")
for table in schema['tables']:
    print(f"  - {table['table_name']}: {table['row_count']} rows")
```
#MySQL
```python
result = await cat.discover_mysql(
    connection_string="mysql://user:pass@localhost:3306/sales",
    schema="sales"
)
```
#MongoDB
```python
result = await cat.discover_mongodb(
    connection_string="mongodb://localhost:27017",
    database="analytics",
    sample_size=100
)
```
#OpenAPI
```python
result = await cat.discover_openapi(
    spec_url="https://api.example.com/openapi.json",
    service_name="Example API"
)
```
#Infrastructure Discovery
New in 0.14.0. The Catalog plugin supports pluggable discoverers that enumerate data stores across cloud accounts, config files, and service registries — and profilers that extract normalized schemas from each discovered store.
#Built-in Discoverers
The AWS discoverer scans your account for the following services:
| Service | Store Type | Discoverer |
|---|---|---|
| RDS PostgreSQL | postgresql | _postgres |
| RDS MySQL | mysql | _mysql |
| DynamoDB | dynamodb | _dynamodb |
| S3 | s3 | _s3 |
| DocumentDB (MongoDB-compat) | mongodb | _documentdb |
| API Gateway | apigateway | _apigateway |
| Kinesis | kinesis | _kinesis |
| OpenSearch | opensearch | _opensearch |
| SNS | sns | _sns |
| SQS | sqs | _sqs |
A GitHub scanner (`GitHubScanner`) is also included for scanning repositories and OpenAPI specs in GitHub organizations.
#Using Infrastructure Discovery
```python
from daita.plugins.catalog import CatalogPlugin
from daita.plugins.catalog.aws import AWSDiscoverer
from daita.plugins.catalog.github import GitHubScanner

cat = CatalogPlugin(auto_persist=True)

# Register discoverers
cat.add_discoverer(AWSDiscoverer(regions=["us-east-1", "us-west-2"]))
cat.add_discoverer(GitHubScanner(org="my-company", token="ghp_..."))

# Discover all infrastructure
result = await cat.discover_all(concurrency=5)

print(f"Found {len(result.stores)} stores")
for store in result.stores:
    print(f"  {store.display_name} ({store.store_type}) — {store.region}")

if result.errors:
    for err in result.errors:
        print(f"  Error in {err.discoverer_name}: {err.error}")
```
#Profiling Discovered Stores
After discovery, profile individual stores to extract their full schema:
```python
from daita.plugins.catalog.profiler import build_default_profilers

# Register profilers for all supported store types
for profiler in build_default_profilers():
    cat.add_profiler(profiler)

# Profile a specific store
store = result.stores[0]
profiler = cat._find_profiler(store.store_type)
schema = await profiler.profile(store)

print(f"Tables: {len(schema.tables)}")
for table in schema.tables:
    print(f"  {table.name}: {len(table.columns)} columns")
```
Or discover and profile in one call:
```python
result = await cat.discover_and_profile(concurrency=5)
```
#Custom Discoverers
Extend `BaseDiscoverer` to add support for new infrastructure sources:
```python
from daita.plugins.catalog import BaseDiscoverer, DiscoveredStore

class MyDiscoverer(BaseDiscoverer):
    name = "my-source"

    async def authenticate(self):
        # Set up credentials
        ...

    async def enumerate(self):
        # Yield DiscoveredStore instances
        yield DiscoveredStore(
            id="my-store-1",
            store_type="postgresql",
            display_name="My Database",
            source="my-source",
            region="us-east-1",
        )

cat.add_discoverer(MyDiscoverer())
```
#Custom Profilers
Extend `BaseProfiler` to add schema extraction for new store types:
```python
from daita.plugins.catalog import BaseProfiler, NormalizedSchema

class MyProfiler(BaseProfiler):
    def supports(self, store_type: str) -> bool:
        return store_type == "my-store-type"

    async def profile(self, store) -> NormalizedSchema:
        # Connect and extract schema
        ...

cat.add_profiler(MyProfiler())
```
#Schema Analysis
#Compare Schemas
```python
result_dev = await cat.discover_postgres(connection_string="postgresql://localhost/dev_db")
result_prod = await cat.discover_postgres(connection_string="postgresql://localhost/prod_db")

result = await cat.compare_schemas(result_dev['schema'], result_prod['schema'])

comparison = result['comparison']
print(f"Added tables: {comparison['added_tables']}")
print(f"Removed tables: {comparison['removed_tables']}")
print(f"Modified columns: {len(comparison['modified_columns'])}")
print(f"Breaking changes: {comparison['breaking_changes']}")
```
#Compare Store to Baseline
Compare a discovered store's current schema against its last persisted snapshot. This operation is available as an agent tool (`compare_store_to_baseline`) — the agent calls it automatically when asked to check for schema drift:
```python
# Via agent (recommended)
result = await agent.run("Compare store abc123 against its baseline")
```
#Export Diagrams
```python
result = await cat.discover_postgres(connection_string="postgresql://localhost/mydb")

# Export as Mermaid ER diagram
mermaid = await cat.export_diagram(result['schema'], format="mermaid")
print(mermaid['diagram'])

# Also supports "json_schema" format
```
#Using with Agents
```python
from daita import Agent
from daita.plugins import catalog

cat = catalog(auto_persist=True)

agent = Agent(
    name="Schema Expert",
    prompt="You are a database schema analyst. Help users discover and understand database structures.",
    tools=[cat]
)

await agent.start()
result = await agent.run("""
Discover the PostgreSQL schema at localhost:5432/analytics.
Then compare it to the MySQL schema at localhost:3306/sales.
Identify any structural differences.
""")
await agent.stop()
```
#Available Tools
| Tool | Description | Key Parameters |
|---|---|---|
| discover_postgres | Discover PostgreSQL schema | connection_string, schema, persist, ssl_mode, table_filter, max_tables |
| discover_mysql | Discover MySQL schema | connection_string, schema, persist, table_filter, max_tables |
| discover_mongodb | Discover MongoDB schema | connection_string, database, sample_size, persist |
| discover_openapi | Discover API from OpenAPI spec | spec_url, service_name, persist |
| discover_infrastructure | Run all registered discoverers | concurrency, offset, limit, refresh |
| profile_store | Profile a discovered store's full schema | store_id |
| find_store | Search catalog by name, type, environment, or tags | query, store_type, environment, tag, offset, limit |
| compare_schemas | Compare two schemas | schema_a, schema_b |
| compare_store_to_baseline | Compare store against last persisted snapshot | store_id |
| export_diagram | Export schema as diagram | schema, format (mermaid or json_schema) |
#Persistence
#Local (default)
Schemas are persisted to `.daita/catalog.json` when `auto_persist=True` or when `persist=True` is passed to individual discovery calls.
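The exact layout of the local catalog file is not specified here, but a quick way to check whether anything has been persisted yet is to load it as ordinary JSON (the helper name and fallback behavior below are illustrative):

```python
import json
from pathlib import Path

def load_local_catalog(path: str = ".daita/catalog.json"):
    """Return the parsed local catalog, or None if nothing has been persisted yet."""
    p = Path(path)
    if not p.exists():
        return None
    return json.loads(p.read_text())

catalog_data = load_local_catalog()
if catalog_data is None:
    print("No local catalog yet -- run a discovery with persist=True first")
```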
#Custom Backend
Register a custom storage backend at application startup:
```python
from daita.plugins.catalog import register_catalog_backend_factory

register_catalog_backend_factory(lambda: MyStorageBackend())
```
The factory must return an object with an async `persist_schema(schema: dict) -> bool` method.
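A minimal backend satisfying that contract might append each schema to a JSON-lines file; this is a sketch, and the class name and file path are illustrative:

```python
import json
from pathlib import Path

class JsonlBackend:
    """Minimal catalog backend: appends each persisted schema to a JSON-lines file."""

    def __init__(self, path: str = "catalog_backup.jsonl"):
        self.path = Path(path)

    async def persist_schema(self, schema: dict) -> bool:
        try:
            with self.path.open("a", encoding="utf-8") as f:
                f.write(json.dumps(schema) + "\n")
            return True
        except OSError:
            return False

# At application startup (requires daita):
# register_catalog_backend_factory(lambda: JsonlBackend())
```

Returning `False` on write failure lets the plugin treat persistence errors as non-fatal rather than aborting discovery.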
#Pruning Stale Entries
Remove catalog entries that haven't been refreshed recently:
```python
result = await cat.prune_stale_catalog(max_age_seconds=604800)  # 7 days
print(f"Removed stale entries: {result['removed']}")
```
Entries with no `last_seen` timestamp are left untouched.
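Rather than hand-computing the age threshold, `timedelta` yields the same number and documents the intent:

```python
from datetime import timedelta

# Seconds in a 7-day retention window
max_age = int(timedelta(days=7).total_seconds())
print(max_age)  # → 604800
```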
#Best Practices
Discovery:
- Register only the discoverers you need to keep scans fast
- Use `concurrency` to control parallel discovery (default: 5)
- Use `table_filter` and `max_tables` to limit output size for large databases
- Enable `auto_persist` when building organizational knowledge graphs

Performance:
- Use `refresh=False` on `discover_infrastructure` to reuse cached results for pagination
- Discovery can be slow for large accounts — consider running with higher concurrency
- Use `find_store` to search the catalog instead of re-scanning
Security:
- Use read-only database accounts for discovery
- Store credentials securely, never hardcode
- AWS discoverers use your default boto3 credential chain
#Next Steps
- Lineage Plugin — Track data flows and dependencies
- Neo4j Plugin — Store schemas in graph databases
- Workflows — Use catalog in multi-agent workflows