
# Data Quality Plugin

Statistical profiling, anomaly detection, freshness checks, and consolidated quality reporting for any database plugin.

## Installation

```bash
pip install 'daita-agents[data]'
```

## Quick Start

```python
from daita import Agent
from daita.plugins import postgresql, data_quality

db = postgresql(host="localhost", database="mydb")
dq = data_quality(db=db)

agent = Agent(
    name="Quality Checker",
    prompt="You monitor data quality across our database.",
    tools=[db, dq]
)

await agent.start()
result = await agent.run("Generate a quality report for the orders table")
```

## Constructor Parameters

```python
data_quality(
    db=None,           # database plugin (required at tool execution time)
    backend=None,      # graph backend for persisting metrics (auto-selected if None)
    thresholds=None,   # dict of custom quality thresholds
)
```

### Parameters

- `db`: Database plugin to run quality checks against. Required at tool execution time. Works with any `BaseDatabasePlugin` (PostgreSQL, MySQL, SQLite, Snowflake).
- `backend`: Optional graph backend for persisting quality metrics as stable METRIC nodes. Auto-selected from registered backends when `None`.
- `thresholds`: Optional dict of custom thresholds for anomaly detection and quality scoring (see the sketch after this list).
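
The exact threshold keys the plugin recognizes are not documented here, so the snippet below is a minimal sketch using hypothetical keys (`max_null_rate`, `zscore_cutoff`) chosen for illustration only:

```python
from daita.plugins import postgresql, data_quality

db = postgresql(host="localhost", database="mydb")

# NOTE: these threshold keys are hypothetical -- consult the plugin
# reference for the names it actually accepts.
dq = data_quality(
    db=db,
    thresholds={
        "max_null_rate": 0.10,   # e.g. flag columns with >10% nulls
        "zscore_cutoff": 2.5,    # e.g. tighten the default |z| > 3 rule
    },
)
```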

## Profiling

Compute statistical profiles for each column in a table:

```python
from daita.plugins import postgresql, data_quality

db = postgresql(host="localhost", database="mydb")
await db.connect()

dq = data_quality(db=db)

profile = await dq.dq_profile("orders")
# Returns:
# {
#   "table": "orders",
#   "row_count": 15420,
#   "columns": [
#     {
#       "column": "amount",
#       "null_rate": 0.02,
#       "cardinality": 8543,
#       "min": 0.99,
#       "max": 9999.99,
#       "avg": 124.50
#     },
#     ...
#   ]
# }
```
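
Because the profile is a plain dict, it is easy to act on programmatically. For example, the loop below flags columns with a null rate above 10%; the cutoff is an arbitrary example, not a plugin default:

```python
# Flag columns whose null rate exceeds an example cutoff of 10%.
for col in profile["columns"]:
    if col["null_rate"] > 0.10:
        print(f"warning: {col['column']} is {col['null_rate']:.0%} null")
```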

## Anomaly Detection

Detect statistical outliers using the z-score method (via SciPy, when available) or the IQR method:

```python
result = await dq.dq_detect_anomaly(
    table="orders",
    column="amount",
    method="zscore"    # or "iqr"
)
# Returns: {"anomalies": [...], "count": 12, "method": "zscore"}
```

Methods:

- `zscore`: Uses `scipy.stats.zscore` when available and falls back to NumPy. Values with |z| > 3 are flagged.
- `iqr`: Interquartile range method. Values below Q1 - 1.5×IQR or above Q3 + 1.5×IQR are flagged. A standalone sketch of both rules follows this list.
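
For intuition, here is a minimal self-contained sketch of both rules in NumPy. It mirrors the cutoffs described above but is illustrative, not the plugin's actual implementation:

```python
import numpy as np

# Toy data: 200 values near 100 plus one obvious outlier.
rng = np.random.default_rng(0)
values = np.append(rng.normal(100, 5, 200), 9999.0)

# z-score rule: flag values with |z| > 3.
z = (values - values.mean()) / values.std()
zscore_outliers = values[np.abs(z) > 3]

# IQR rule: flag values below Q1 - 1.5*IQR or above Q3 + 1.5*IQR.
q1, q3 = np.percentile(values, [25, 75])
iqr = q3 - q1
iqr_outliers = values[(values < q1 - 1.5 * iqr) | (values > q3 + 1.5 * iqr)]

print(zscore_outliers, iqr_outliers)  # both methods flag the 9999.0 point
```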

## Freshness Checks

Verify that data has been updated recently by checking a timestamp column:

```python
result = await dq.dq_check_freshness(
    table="events",
    timestamp_column="created_at",
    max_age_hours=24
)
# Returns:
# {
#   "fresh": True,
#   "latest_timestamp": "2024-01-15T10:30:00Z",
#   "age_hours": 3.5,
#   "max_age_hours": 24
# }
```
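
A typical pattern is to branch on the `fresh` flag, for example to raise an alert when a table goes stale. The `print` here is a placeholder for whatever notification channel you use:

```python
result = await dq.dq_check_freshness(
    table="events",
    timestamp_column="created_at",
    max_age_hours=24
)

if not result["fresh"]:
    # Placeholder alert -- swap in your own notification channel.
    print(
        f"events is stale: last updated {result['age_hours']:.1f}h ago "
        f"(limit: {result['max_age_hours']}h)"
    )
```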

## Quality Reports

Generate a consolidated report combining profile, anomaly counts, and a completeness score:

```python
report = await dq.dq_report("orders")
# Returns:
# {
#   "table": "orders",
#   "completeness_score": 0.94,
#   "row_count": 15420,
#   "profile": {...},
#   "issues": [
#     {"type": "high_null_rate", "column": "notes", "null_rate": 0.45},
#     ...
#   ]
# }
```

Reports are persisted as stable METRIC graph nodes using a deterministic node ID, so repeated calls update the existing metric rather than creating duplicates.
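
A common way to get a stable, deterministic node ID is to hash the metric's identifying fields. The sketch below illustrates that general idea; it is not the plugin's actual ID scheme:

```python
import hashlib

def metric_node_id(table: str, metric: str = "quality_report") -> str:
    """Illustrative only: derive a stable node ID from the metric's identity."""
    key = f"metric:{metric}:{table}"
    return hashlib.sha256(key.encode()).hexdigest()[:16]

# Same inputs always produce the same ID, so a re-run updates the
# existing METRIC node instead of creating a duplicate.
assert metric_node_id("orders") == metric_node_id("orders")
```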

## Using with Agents

```python
from daita import Agent
from daita.plugins import postgresql, data_quality

db = postgresql(host="localhost", database="analytics")
dq = data_quality(db=db)

agent = Agent(
    name="Data Monitor",
    prompt="You monitor data quality. Profile tables, detect anomalies, and check freshness.",
    llm_provider="openai",
    model="gpt-4",
    tools=[db, dq]
)

await agent.start()

result = await agent.run("""
Check the quality of the 'transactions' table:
1. Profile all columns
2. Detect anomalies in the 'amount' column
3. Verify data was updated in the last 12 hours
4. Generate a full quality report
""")

await agent.stop()
```

## Available Tools

| Tool | Description | Key Parameters |
| --- | --- | --- |
| `dq_profile` | Statistical profile of all columns | `table` (required) |
| `dq_detect_anomaly` | Find outliers in a numeric column | `table` (required), `column` (required), `method` (`"zscore"` or `"iqr"`) |
| `dq_check_freshness` | Verify data recency via a timestamp column | `table` (required), `timestamp_column` (required), `max_age_hours` |
| `dq_report` | Consolidated quality report with completeness score | `table` (required) |

Tool Categories: `data_quality`
Tool Source: `plugin`

## Error Handling

```python
from daita.plugins import data_quality

dq = data_quality()

try:
    report = await dq.dq_report("nonexistent_table")
except ValueError as e:
    # No db configured or invalid identifier
    print(f"Configuration error: {e}")
except RuntimeError as e:
    # Database query failed
    print(f"Query failed: {e}")
```

## Best Practices

- **Profile before building pipelines**: Run `dq_profile` on source tables before writing transformations to understand data shape and null rates.
- **Schedule regular reports**: Use `dq_report` on a schedule (via Scheduling) to track quality over time; metric nodes are updated in place, not duplicated.
- **Combine with Transformer**: Run `dq_report` after `transform_run` to verify output quality immediately.
- **Use IQR for skewed data**: If your numeric columns follow a non-normal distribution, `iqr` anomaly detection is more reliable than `zscore`. A helper sketch follows this list.
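
If you are unsure whether a column is skewed, a quick sample check can drive the choice of method. The helper below is a hypothetical pattern built on `scipy.stats.skew`; the `db.query` call is an assumption about the database plugin's API, so adapt it to the plugin you actually use:

```python
from scipy.stats import skew

async def detect_with_best_method(dq, db, table: str, column: str):
    """Hypothetical helper: use 'iqr' for skewed columns, else 'zscore'."""
    # Sample the column to estimate skewness. `db.query` returning rows
    # as dicts is an assumed interface, not a documented one.
    rows = await db.query(f"SELECT {column} FROM {table} LIMIT 10000")
    sample = [row[column] for row in rows if row[column] is not None]
    method = "iqr" if abs(skew(sample)) > 1 else "zscore"
    return await dq.dq_detect_anomaly(table=table, column=column, method=method)
```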

## Next Steps