# Data Assertions
ItemAssertion enforces row-level data quality rules at query time. Any violation raises a DataQualityError — a permanent error that agents will not retry.
## Overview
ItemAssertion lets you declare expectations about every row returned by a query. When you pass assertions to query_checked(), the framework evaluates each rule against every row and raises DataQualityError if any rule fails — before the result ever reaches your agent or application code.
```python
from daita.plugins import sqlite
from daita import ItemAssertion

async with sqlite(path="transactions.db") as db:
    rows = await db.query_checked(
        "SELECT * FROM transactions",
        assertions=[
            ItemAssertion(lambda r: r["amount"] > 0, "All amounts must be positive"),
            ItemAssertion(lambda r: r["customer_id"] is not None, "customer_id required"),
        ],
    )
```

## ItemAssertion
```python
from daita import ItemAssertion

ItemAssertion(
    check: Callable[[dict], bool],
    description: str,
)
```

## Parameters
- `check` (callable): A function that receives a single row as a dict and returns `True` if the row passes, `False` if it violates the assertion.
- `description` (str): Human-readable description of the rule. Included in error messages and the `violations` list on `DataQualityError`.
## Examples
```python
from daita import ItemAssertion

# Field presence
ItemAssertion(lambda r: r.get("email") is not None, "Email required")

# Range check
ItemAssertion(lambda r: 0 < r["amount"] <= 1_000_000, "Amount out of range")

# Enum check
ItemAssertion(
    lambda r: r["status"] in ("pending", "shipped", "delivered"),
    "Invalid order status",
)

# Type check
ItemAssertion(lambda r: isinstance(r["user_id"], int), "user_id must be an integer")

# Cross-field rule
ItemAssertion(
    lambda r: r["shipped_at"] is None or r["shipped_at"] >= r["created_at"],
    "shipped_at cannot be before created_at",
)
```

## query_checked()
query_checked() is available on all database plugins. It runs the query and evaluates assertions against every row before returning results.
```python
async def query_checked(
    sql: str,
    params=None,
    assertions: list[ItemAssertion] = None,
) -> list[dict]
```

- When `assertions` is `None` or empty, it behaves identically to `query()`.
- All assertions are evaluated against all rows before any error is raised, so the `DataQualityError` carries the full list of failures, not just the first.
- Raises `DataQualityError` if any assertion has one or more violations.
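The collect-all-then-raise behaviour can be pictured with a short sketch. This is illustrative only: `Rule` and `evaluate_assertions` are stand-in names for this sketch, not daita's actual internals.

```python
from dataclasses import dataclass
from typing import Callable, Optional


@dataclass
class Rule:
    """Stand-in for ItemAssertion in this sketch."""
    check: Callable[[dict], bool]
    description: str


def evaluate_assertions(rows: list, assertions: Optional[list],
                        sample_size: int = 3) -> list:
    """Run every assertion over every row and collect all failures
    before anything is raised, mirroring the behaviour described above."""
    violations = []
    for assertion in assertions or []:
        failing = [r for r in rows if not assertion.check(r)]
        if failing:
            violations.append({
                "description": assertion.description,
                "violation_count": len(failing),
                "total_items": len(rows),
                "sample": failing[:sample_size],
            })
    return violations  # the caller raises DataQualityError if this is non-empty
```

Because every rule is checked against every row before any error surfaces, a single failure report can cover all broken assertions at once.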
## Supported plugins
query_checked() is available on every BaseDatabasePlugin:
- SQLite
- PostgreSQL
- MySQL
- Snowflake
## DataQualityError
DataQualityError is raised when one or more assertions fail. It is a PermanentError — agents will not retry the query, because retrying the same query against the same data won't fix bad data.
```python
from daita import DataQualityError

try:
    rows = await db.query_checked("SELECT * FROM orders", assertions=[...])
except DataQualityError as e:
    print(e.message)     # "Data quality violations: ..."
    print(e.violations)  # list of violation dicts
    print(e.table)       # table name if provided
```

## Violation structure
Each entry in e.violations is a dict:
```python
{
    "description": "All amounts must be positive",  # assertion description
    "violation_count": 3,                           # rows that failed
    "total_items": 100,                             # total rows checked
    "sample": [                                     # up to 3 failing rows
        {"id": 42, "amount": -10, ...},
        ...
    ],
}
```

## Behaviour Details
All violations are collected before raising. If multiple assertions fail, DataQualityError.violations contains an entry for each — you see the full picture in one error, not just the first failure.
Assertions that raise internally are skipped, not propagated. If a check callable throws an exception (e.g. KeyError on a missing field), that assertion is logged at DEBUG level and skipped. Other assertions still run.
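In practice this means a check that indexes a possibly missing field hides its own failures. Writing the check with `dict.get()` turns a missing field into a counted violation instead. Plain callables are shown here, as they would be passed as the `check` of an `ItemAssertion`:

```python
# Raises KeyError when "email" is absent, so the framework would skip
# this assertion entirely and only log it at DEBUG level:
fragile = lambda r: r["email"] is not None

# Returns False for the same row, so the missing field is counted
# as a violation instead of being silently skipped:
robust = lambda r: r.get("email") is not None

row = {"id": 1}  # no "email" field
print(robust(row))  # → False
```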
DataQualityError is permanent. Agent retry policies will not re-run a tool that raises DataQualityError. The violation must be fixed at the data level.
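Since the fix has to happen at the data level, it usually pays to surface every entry of `e.violations` to whoever owns the data. A small hypothetical helper (not part of daita) for turning the documented violation entries into log lines:

```python
def summarize_violations(violations: list) -> list:
    """Render each violation entry (shape documented above) as one log line."""
    return [
        f'{v["description"]}: {v["violation_count"]}/{v["total_items"]} rows failed'
        for v in violations
    ]


lines = summarize_violations([
    {"description": "All amounts must be positive",
     "violation_count": 3, "total_items": 100, "sample": []},
])
print(lines[0])  # → "All amounts must be positive: 3/100 rows failed"
```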
## Usage with Agents
Assertions are most useful in agent tools that need to guarantee data quality before acting on results:
```python
import os

from daita import Agent, tool, ItemAssertion
from daita.plugins import postgresql

db = postgresql(
    host=os.getenv("DB_HOST"),
    database=os.getenv("DB_NAME"),
    username=os.getenv("DB_USER"),
    password=os.getenv("DB_PASSWORD"),
)

@tool
async def get_active_orders() -> list:
    """Fetch active orders, guaranteed clean."""
    return await db.query_checked(
        "SELECT * FROM orders WHERE status = 'active'",
        assertions=[
            ItemAssertion(lambda r: r["total"] > 0, "Order total must be positive"),
            ItemAssertion(lambda r: r["customer_id"] is not None, "customer_id required"),
        ],
    )

agent = Agent(
    name="Order Processor",
    prompt="Process active orders. Do not proceed if data quality fails.",
    tools=[get_active_orders],
)
```

## Assertions vs DataQuality Plugin
| | ItemAssertion + query_checked() | DataQualityPlugin |
|---|---|---|
| Purpose | Enforcement — guarantee rules at consumption time | Analysis — profile, detect anomalies, report |
| When it runs | At the point of the query call | On-demand, typically by an agent |
| On failure | Raises DataQualityError (blocks execution) | Returns a report (agent decides what to do) |
| Best for | Critical data guarantees in tools and pipelines | Exploratory quality checks and monitoring |
Use ItemAssertion when bad data should stop execution. Use DataQualityPlugin when you want the agent to reason about data quality and decide.
## Next Steps
- DataQuality Plugin — Analytical profiling, anomaly detection, and quality reports
- SQLite Plugin — Lightweight database with `query_checked()` support
- Error Handling — How `PermanentError` and retry policies work