Transformer Plugin
Versioned SQL transformation management for agents — create, run, test, diff, and list named SQL transforms backed by a persistent graph store.
#Quick Start

```python
from daita import Agent
from daita.plugins import sqlite, transformer

db = sqlite(path="app.db")
tx = transformer(db=db)

agent = Agent(
    name="Transform Agent",
    prompt="You are a data engineer. Create and manage SQL transformations.",
    tools=[db, tx]
)

await agent.start()
result = await agent.run("Create a daily_revenue transformation that aggregates orders by day")
```

#Configuration
```python
transformer(
    db=None,       # Any BaseDatabasePlugin — required at execution time
    lineage=None,  # Optional LineagePlugin for auto lineage capture on transform_run
    backend=None,  # Optional graph backend (auto-selected at agent start if None)
)
```

#Parameters
- db (BaseDatabasePlugin): The database plugin to run transformations against. Required when tools are called.
- lineage (LineagePlugin): Optional lineage plugin. When provided, source/target tables are automatically captured as lineage edges on each transform_run.
- backend: Optional graph backend for persisting transformation definitions and version history. Auto-selected at agent start if not provided.
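Several tools below rely on source and target tables being auto-parsed from the SQL. The plugin's actual parser is not shown in these docs; as a rough illustration of the idea, a minimal sketch with two regexes (parse_tables is a hypothetical helper, not part of the daita API):

```python
import re

def parse_tables(sql: str):
    """Naively extract target tables (INSERT INTO ...) and source tables (FROM/JOIN ...)."""
    targets = re.findall(r"INSERT\s+INTO\s+([\w.]+)", sql, re.IGNORECASE)
    sources = re.findall(r"(?:FROM|JOIN)\s+([\w.]+)", sql, re.IGNORECASE)
    return sources, targets

sql = "INSERT INTO daily_revenue SELECT date(created_at), SUM(total) FROM orders GROUP BY 1"
print(parse_tables(sql))  # (['orders'], ['daily_revenue'])
```

A real parser would need to handle CTEs, subqueries, and quoting, but this captures the common INSERT INTO / SELECT FROM shape the Best Practices section recommends.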
#Usage
#Create a Transformation
Register a named SQL transformation:

```python
from daita.plugins import sqlite, transformer

async with sqlite(path="app.db") as db:
    tx = transformer(db=db)
    result = await tx.transform_create(
        name="daily_revenue",
        sql="INSERT INTO daily_revenue SELECT date(created_at), SUM(total) FROM orders GROUP BY 1",
        description="Aggregates order totals by day",
    )
    # Source and target tables are auto-parsed from the SQL
```

#Run a Transformation
Execute a registered transformation against the database:

```python
result = await tx.transform_run(db, "daily_revenue")
# Executes the SQL and returns rows affected + execution metadata
```

#Test a Transformation (Dry Run)
Validate the transformation SQL using EXPLAIN without executing it:

```python
result = await tx.transform_test(db, "daily_revenue")
# Sends EXPLAIN <sql> to the database — returns the query plan or an error
```

#Snapshot a Version
Save the current SQL as a new version snapshot:

```python
result = await tx.transform_version("daily_revenue")
# Appends the current SQL as a new snapshot; returns the snapshot index
```

#Diff Versions
Compare two versions of a transformation:

```python
diff = await tx.transform_diff("daily_revenue", version_a=1, version_b=2)
# Returns a line-by-line diff of the SQL between versions
```

#List Transformations
List all registered transformations:

```python
transforms = await tx.transform_list()
# Returns names, descriptions, source/target tables, and version counts
```

#Using with Agents
#Tool-Based Integration (Recommended)

```python
from daita import Agent
from daita.plugins import postgresql, transformer, lineage
import os

db = postgresql(
    host=os.getenv("DB_HOST"),
    database=os.getenv("DB_NAME"),
    username=os.getenv("DB_USER"),
    password=os.getenv("DB_PASSWORD"),
)
lin = lineage()  # Optional — for automatic lineage capture
tx = transformer(db=db, lineage=lin)

agent = Agent(
    name="Data Engineer",
    prompt="You are a data engineer. Build and manage SQL transformations.",
    tools=[db, tx, lin]
)

await agent.start()
result = await agent.run("""
1. Create a transformation called weekly_signups that counts new users per week
2. Test it to validate the SQL
3. Run it to populate the weekly_signups table
""")
await agent.stop()
```

#Available Tools
| Tool | Description | Key Parameters |
|---|---|---|
| transform_create | Register a named SQL transformation | name, sql (required); description (optional) |
| transform_run | Execute a transformation against the database | name (required); parameters (optional :param substitutions) |
| transform_test | Validate SQL via EXPLAIN without executing | name (required) |
| transform_version | Snapshot the current SQL as a new version | name (required) |
| transform_diff | Diff SQL between two versions | name, version_a, version_b (required; use integer index or "current") |
| transform_list | List all registered transformations | None |
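Two database features underpin transform_test and transform_run above: EXPLAIN dry runs and :param substitution. For a SQLite backend, both presumably reduce to standard sqlite3 behavior, which can be sketched without daita at all:

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE orders (created_at TEXT, total REAL)")
conn.execute("INSERT INTO orders VALUES ('2024-01-01', 10.0), ('2024-01-01', 5.0)")

sql = "SELECT date(created_at) AS day, SUM(total) FROM orders WHERE total > :min_total GROUP BY 1"

# Dry run: EXPLAIN compiles the statement and returns its opcode plan without touching rows
plan = conn.execute("EXPLAIN " + sql, {"min_total": 0}).fetchall()
print(len(plan) > 0)  # a non-empty plan means the SQL is valid for this schema

# Real run with a :param substitution
rows = conn.execute(sql, {"min_total": 1.0}).fetchall()
print(rows)  # [('2024-01-01', 15.0)]
```

If the SQL references a missing table or column, the EXPLAIN step raises an error instead of returning a plan, which is exactly what makes it a safe pre-flight check.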
#Lineage Integration
When a LineagePlugin is provided, transform_run automatically captures source → target table relationships as lineage edges:
```python
from daita.plugins import postgresql, transformer, lineage

db = postgresql(host="localhost", database="warehouse")
lin = lineage()
tx = transformer(db=db, lineage=lin)

# Source/target tables are auto-parsed from the SQL at create time
# Running the transform automatically writes lineage edges: orders → daily_revenue
await tx.transform_run(db, "daily_revenue")
```

See the Lineage Plugin docs for more on lineage capture and querying.
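The captured edges can be thought of as a directed graph keyed by table name, where each run contributes source → target edges that can later be walked. A toy in-memory version (record_run and downstream are hypothetical helpers, not the LineagePlugin API):

```python
from collections import defaultdict

edges = defaultdict(set)  # table -> set of downstream tables

def record_run(sources, target):
    """Record one transform run as source -> target edges."""
    for src in sources:
        edges[src].add(target)

# Edges as a lineage plugin might capture them; weekly_report is a hypothetical downstream transform
record_run(["orders"], "daily_revenue")
record_run(["daily_revenue"], "weekly_report")

def downstream(table, acc=None):
    """Walk the graph to find every table derived from `table`."""
    acc = set() if acc is None else acc
    for t in edges[table]:
        if t not in acc:
            acc.add(t)
            downstream(t, acc)
    return acc

print(sorted(downstream("orders")))  # ['daily_revenue', 'weekly_report']
```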
#Version Control Pattern
Transformations support an evolving SQL history. Call transform_version to snapshot the current SQL before updating it, then use transform_diff to compare:

```python
# Create initial transformation
await tx.transform_create("revenue", sql="SELECT date, SUM(total) FROM orders GROUP BY 1")

# Snapshot current SQL as version 0 before changing it
await tx.transform_version("revenue")

# Update the SQL (overwrites the live SQL; the snapshot is preserved)
await tx.transform_create("revenue", sql="SELECT date, SUM(total), COUNT(*) FROM orders GROUP BY 1")

# Snapshot updated SQL as version 1
await tx.transform_version("revenue")

# Diff snapshot 0 vs snapshot 1
diff = await tx.transform_diff("revenue", version_a=0, version_b=1)

# Diff snapshot 0 vs the live unsaved SQL
diff = await tx.transform_diff("revenue", version_a=0, version_b="current")
```

#Error Handling
```python
from daita.plugins import sqlite, transformer

db = sqlite(path="app.db")
tx = transformer(db=db)

try:
    await tx.transform_run(db, "nonexistent")
except ValueError as e:
    print(f"Transform not found: {e}")
except Exception as e:
    print(f"Execution error: {e}")
```

#Best Practices
- Always transform_test before transform_run in production pipelines
- Write SQL with clear INSERT INTO / SELECT FROM patterns — source and target tables are auto-parsed from the SQL for lineage
- Use transform_version to snapshot SQL before updating it, so diffs remain meaningful
- Use descriptive name and description values so agents can discover and reason about available transforms
- Pair with DataQualityPlugin to profile target tables after running transformations
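The snapshot-then-diff practice recommended above can be sketched end to end with Python's difflib. The in-memory store below is illustrative only (the plugin persists versions in its graph backend, and these helper names are hypothetical):

```python
import difflib

snapshots = {}  # name -> list of saved SQL versions
live_sql = {}   # name -> current (possibly unsaved) SQL

def create(name, sql):
    live_sql[name] = sql

def version(name):
    """Snapshot the live SQL; returns the snapshot index."""
    snapshots.setdefault(name, []).append(live_sql[name])
    return len(snapshots[name]) - 1

def diff(name, a, b):
    """Line-by-line diff between two snapshots, or a snapshot and 'current'."""
    sql_a = live_sql[name] if a == "current" else snapshots[name][a]
    sql_b = live_sql[name] if b == "current" else snapshots[name][b]
    return "\n".join(difflib.unified_diff(
        sql_a.splitlines(), sql_b.splitlines(), f"v{a}", f"v{b}", lineterm=""))

create("revenue", "SELECT date, SUM(total)\nFROM orders GROUP BY 1")
version("revenue")  # snapshot 0
create("revenue", "SELECT date, SUM(total), COUNT(*)\nFROM orders GROUP BY 1")
version("revenue")  # snapshot 1
print(diff("revenue", 0, 1))  # shows the added COUNT(*) column
```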
#Next Steps
- DataQuality Plugin — Profile and validate output tables after transforms
- Lineage Plugin — Automatic lineage capture for data pipelines
- SQLite Plugin — Lightweight database to pair with Transformer
- Plugin Overview — Learn about other plugins