Transformer Plugin

Versioned SQL transformation management for agents — create, run, test, diff, and list named SQL transforms backed by a persistent graph store.

# Quick Start

```python
from daita import Agent
from daita.plugins import sqlite, transformer

db = sqlite(path="app.db")
tx = transformer(db=db)

agent = Agent(
    name="Transform Agent",
    prompt="You are a data engineer. Create and manage SQL transformations.",
    tools=[db, tx]
)

await agent.start()
result = await agent.run("Create a daily_revenue transformation that aggregates orders by day")
```

# Configuration

```python
transformer(
    db=None,       # Any BaseDatabasePlugin — required at execution time
    lineage=None,  # Optional LineagePlugin for auto lineage capture on transform_run
    backend=None,  # Optional graph backend (auto-selected at agent start if None)
)
```

# Parameters

- db (BaseDatabasePlugin): The database plugin to run transformations against. Required when tools are called.
- lineage (LineagePlugin): Optional lineage plugin. When provided, source/target tables are automatically captured as lineage edges on each transform_run.
- backend: Optional graph backend for persisting transformation definitions and version history. Auto-selected at agent start if not provided.

# Usage

# Create a Transformation

Register a named SQL transformation:

```python
from daita.plugins import sqlite, transformer

async with sqlite(path="app.db") as db:
    tx = transformer(db=db)
    result = await tx.transform_create(
        name="daily_revenue",
        sql="INSERT INTO daily_revenue SELECT date(created_at), SUM(total) FROM orders GROUP BY 1",
        description="Aggregates order totals by day",
    )
    # Source and target tables are auto-parsed from the SQL
```
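Here the plugin would infer daily_revenue as the target table and orders as the source. As a rough illustration of how such inference can work (a naive regex sketch, not the plugin's actual parser):

```python
import re

def parse_tables(sql: str) -> dict:
    """Naively extract the target (INSERT INTO) and source (FROM/JOIN) tables."""
    target = re.search(r"INSERT\s+INTO\s+(\w+)", sql, re.IGNORECASE)
    sources = re.findall(r"(?:FROM|JOIN)\s+(\w+)", sql, re.IGNORECASE)
    return {
        "target": target.group(1) if target else None,
        "sources": sorted(set(sources)),
    }

sql = "INSERT INTO daily_revenue SELECT date(created_at), SUM(total) FROM orders GROUP BY 1"
print(parse_tables(sql))  # {'target': 'daily_revenue', 'sources': ['orders']}
```

A real parser must also handle quoted identifiers, schemas, CTEs, and subqueries, which is why clear INSERT INTO/SELECT FROM patterns (see Best Practices) parse most reliably.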

# Run a Transformation

Execute a registered transformation against the database:

```python
result = await tx.transform_run(db, "daily_revenue")
# Executes the SQL and returns rows affected + execution metadata
```
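transform_run also accepts an optional parameters mapping for :param placeholders (see Available Tools below). With SQLite, for instance, named placeholders bind natively; this standalone sqlite3 sketch shows the underlying mechanism, independent of the plugin:

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE orders (created_at TEXT, total REAL)")
conn.executemany(
    "INSERT INTO orders VALUES (?, ?)",
    [("2024-01-01", 10.0), ("2024-01-01", 5.0), ("2024-01-02", 7.5)],
)

# :start is a named placeholder, bound from the mapping at execution time
rows = conn.execute(
    "SELECT date(created_at), SUM(total) FROM orders "
    "WHERE created_at >= :start GROUP BY 1",
    {"start": "2024-01-02"},
).fetchall()
print(rows)  # [('2024-02-02'-style date string, summed total)]
```

Binding values rather than interpolating them into the SQL string avoids injection and lets the same registered transform run with different parameters.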

# Test a Transformation (Dry Run)

Validate the transformation SQL using EXPLAIN without executing it:

```python
result = await tx.transform_test(db, "daily_revenue")
# Sends EXPLAIN <sql> to the database — returns the query plan or an error
```
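EXPLAIN asks the engine for its plan without running the statement, which is what makes the dry run safe. A standalone sqlite3 demonstration of that property (independent of the plugin):

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE orders (created_at TEXT, total REAL)")

# EXPLAIN returns the engine's plan (opcodes, in SQLite's case)
# without executing the INSERT itself
plan = conn.execute(
    "EXPLAIN INSERT INTO orders SELECT created_at, total FROM orders"
).fetchall()

# The table is untouched: the dry run wrote no rows
row_count = conn.execute("SELECT COUNT(*) FROM orders").fetchone()[0]
```

An invalid statement (unknown table, syntax error) fails at the EXPLAIN stage instead, which is how transform_test surfaces errors before any data is modified.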

# Snapshot a Version

Save the current SQL as a new version snapshot:

```python
result = await tx.transform_version("daily_revenue")
# Appends the current SQL as a new snapshot; returns the snapshot index
```

# Diff Versions

Compare two versions of a transformation:

```python
diff = await tx.transform_diff("daily_revenue", version_a=1, version_b=2)
# Returns a line-by-line diff of the SQL between versions
```
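Conceptually, a line-by-line SQL diff is what Python's difflib produces. This illustrative sketch (not the plugin's internals) shows the shape of such output:

```python
import difflib

v1 = "SELECT date, SUM(total)\nFROM orders\nGROUP BY 1"
v2 = "SELECT date, SUM(total), COUNT(*)\nFROM orders\nGROUP BY 1"

# Unified diff: '-' lines were removed, '+' lines were added,
# ' ' lines are unchanged context
diff = list(difflib.unified_diff(
    v1.splitlines(), v2.splitlines(),
    fromfile="version_1", tofile="version_2", lineterm="",
))
print("\n".join(diff))
```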

# List Transformations

List all registered transformations:

```python
transforms = await tx.transform_list()
# Returns names, descriptions, source/target tables, and version counts
```

# Using with Agents

```python
from daita import Agent
from daita.plugins import postgresql, transformer, lineage
import os

db = postgresql(
    host=os.getenv("DB_HOST"),
    database=os.getenv("DB_NAME"),
    username=os.getenv("DB_USER"),
    password=os.getenv("DB_PASSWORD"),
)
lin = lineage()  # Optional — for automatic lineage capture
tx = transformer(db=db, lineage=lin)

agent = Agent(
    name="Data Engineer",
    prompt="You are a data engineer. Build and manage SQL transformations.",
    tools=[db, tx, lin]
)

await agent.start()
result = await agent.run("""
    1. Create a transformation called weekly_signups that counts new users per week
    2. Test it to validate the SQL
    3. Run it to populate the weekly_signups table
""")
await agent.stop()
```
await agent.stop()

# Available Tools

| Tool | Description | Key Parameters |
| --- | --- | --- |
| transform_create | Register a named SQL transformation | name, sql (required); description (optional) |
| transform_run | Execute a transformation against the database | name (required); parameters (optional :param substitutions) |
| transform_test | Validate SQL via EXPLAIN without executing | name (required) |
| transform_version | Snapshot the current SQL as a new version | name (required) |
| transform_diff | Diff SQL between two versions | name, version_a, version_b (required; use integer index or "current") |
| transform_list | List all registered transformations | None |

# Lineage Integration

When a LineagePlugin is provided, transform_run automatically captures source → target table relationships as lineage edges:

```python
from daita.plugins import postgresql, transformer, lineage

db = postgresql(host="localhost", database="warehouse")
lin = lineage()
tx = transformer(db=db, lineage=lin)

# Source/target tables are auto-parsed from the SQL at create time
# Running the transform automatically writes lineage edges: orders → daily_revenue
await tx.transform_run(db, "daily_revenue")
```

See the Lineage Plugin docs for more on lineage capture and querying.

# Version Control Pattern

Transformations support an evolving SQL history. Call transform_version to snapshot the current SQL before updating it, then use transform_diff to compare:

```python
# Create initial transformation
await tx.transform_create("revenue", sql="SELECT date, SUM(total) FROM orders GROUP BY 1")

# Snapshot current SQL as version 0 before changing it
await tx.transform_version("revenue")

# Update the SQL (overwrites the live SQL, snapshot is preserved)
await tx.transform_create("revenue", sql="SELECT date, SUM(total), COUNT(*) FROM orders GROUP BY 1")

# Snapshot updated SQL as version 1
await tx.transform_version("revenue")

# Diff snapshot 0 vs snapshot 1
diff = await tx.transform_diff("revenue", version_a=0, version_b=1)

# Diff snapshot 0 vs the live unsaved SQL
diff = await tx.transform_diff("revenue", version_a=0, version_b="current")
```
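The snapshot/overwrite/diff lifecycle above can be modeled in a few lines of plain Python. This in-memory sketch mirrors the pattern (a hypothetical model, not the plugin's actual storage):

```python
import difflib

class VersionStore:
    """Minimal in-memory sketch of the snapshot/diff pattern."""
    def __init__(self):
        self.live = {}     # name -> current (live) SQL
        self.history = {}  # name -> list of SQL snapshots

    def create(self, name, sql):
        self.live[name] = sql           # overwrites live SQL; snapshots kept
        self.history.setdefault(name, [])

    def version(self, name):
        self.history[name].append(self.live[name])
        return len(self.history[name]) - 1   # snapshot index

    def diff(self, name, a, b):
        sql_a = self.live[name] if a == "current" else self.history[name][a]
        sql_b = self.live[name] if b == "current" else self.history[name][b]
        return list(difflib.unified_diff(
            sql_a.splitlines(), sql_b.splitlines(), lineterm=""))

store = VersionStore()
store.create("revenue", "SELECT date, SUM(total) FROM orders GROUP BY 1")
assert store.version("revenue") == 0
store.create("revenue", "SELECT date, SUM(total), COUNT(*) FROM orders GROUP BY 1")
assert store.version("revenue") == 1
assert store.diff("revenue", 0, 1)  # non-empty: the SQL changed
```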

# Error Handling

```python
from daita.plugins import sqlite, transformer

db = sqlite(path="app.db")
tx = transformer(db=db)

try:
    await tx.transform_run(db, "nonexistent")
except ValueError as e:
    print(f"Transform not found: {e}")
except Exception as e:
    print(f"Execution error: {e}")
```

# Best Practices

- Always transform_test before transform_run in production pipelines
- Write SQL with clear INSERT INTO/SELECT FROM patterns — source and target tables are auto-parsed from SQL for lineage
- Use transform_version to snapshot SQL before updating it, so diffs remain meaningful
- Use descriptive name and description values so agents can discover and reason about available transforms
- Pair with DataQualityPlugin to profile target tables after running transformations
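The first practice, test before run, can be wrapped in a small guard. This sketch assumes transform_test raises on invalid SQL (adjust if your version signals errors through the return value) and uses a hypothetical stub in place of the real plugin:

```python
import asyncio

async def safe_run(tx, db, name):
    """Run a transform only if its SQL first validates via a dry run.

    Assumes transform_test raises on failure; swap in a return-value
    check if your plugin reports errors that way instead.
    """
    await tx.transform_test(db, name)   # raises -> transform_run never executes
    return await tx.transform_run(db, name)

# Hypothetical stub standing in for the transformer plugin
class StubTx:
    async def transform_test(self, db, name):
        if name != "daily_revenue":
            raise ValueError(f"unknown transform: {name}")
    async def transform_run(self, db, name):
        return {"rows_affected": 3}

result = asyncio.run(safe_run(StubTx(), None, "daily_revenue"))
print(result)  # {'rows_affected': 3}
```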

# Next Steps