
# Transformer Plugin

Named SQL transformations with versioning, parameter substitution, dry-run validation, and optional lineage capture. Stores definitions in a graph backend with in-memory fallback.

## Installation

```bash
pip install 'daita-agents[data]'
```

## Quick Start

```python
from daita import Agent
from daita.plugins import postgresql, transformer

db = postgresql(host="localhost", database="mydb")
tx = transformer(db=db)

agent = Agent(
    name="ETL Agent",
    prompt="You manage data transformations.",
    tools=[db, tx]
)

await agent.start()
await agent.run("Create a daily orders summary transformation and run it")
```

## Constructor Parameters

```python
transformer(
    db=None,         # database plugin (required at execution time)
    lineage=None,    # LineagePlugin for auto lineage capture
    backend=None,    # graph backend (auto-selected if None)
)
```

### Parameters

- **db:** Database plugin to execute transformations against. Required when calling `transform_run` or `transform_test`. Can be set after construction.
- **lineage:** Optional `LineagePlugin` instance. When provided, lineage edges are captured automatically on every `transform_run`.
- **backend:** Optional graph backend for persisting transformation definitions. Auto-selected from registered backends when `None`; falls back to an in-memory dict if none are available.

## Core Operations

### Create a Transformation

Define a named SQL transformation. Source and target tables are extracted from the SQL automatically:

```python
from daita.plugins import postgresql, transformer

db = postgresql(host="localhost", database="mydb")
tx = transformer(db=db)

await tx.transform_create(
    "orders_summary",
    sql="INSERT INTO orders_summary SELECT customer_id, SUM(amount) FROM orders GROUP BY 1",
    description="Aggregate orders by customer"
)
```

### Run a Transformation

Execute a named transformation against the database:

```python
# Simple execution
result = await tx.transform_run("orders_summary")

# With parameter substitution
result = await tx.transform_run(
    "orders_by_region",
    parameters={"region": "EU", "min_amount": "100"}
)
```

### Test (Dry Run)

Validate SQL without executing it, using `EXPLAIN`:

```python
result = await tx.transform_test("orders_summary")
# Returns: {"success": True, "valid": True, "plan": [...]}
```
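The same validate-without-executing idea is easy to try locally against SQLite. This is only an analogy, not the plugin's implementation (the plugin issues `EXPLAIN` through the configured database plugin); the `dry_run` helper below is a hypothetical sketch:

```python
import sqlite3

def dry_run(conn: sqlite3.Connection, sql: str) -> dict:
    """Ask the engine for a query plan instead of executing the statement."""
    try:
        plan = conn.execute(f"EXPLAIN QUERY PLAN {sql}").fetchall()
        return {"success": True, "valid": True, "plan": plan}
    except sqlite3.Error as e:
        return {"success": False, "valid": False, "error": str(e)}

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE orders (customer_id INTEGER, amount REAL)")
result = dry_run(conn, "SELECT customer_id, SUM(amount) FROM orders GROUP BY 1")
```

A statement that references a missing table fails validation without touching any data.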

### Version History

Every `transform_run` saves a version snapshot of the current SQL:

```python
# List versions for a transformation
versions = await tx.transform_version("orders_summary")
# Returns: [{"index": 0, "created_at": "...", "sql": "..."}]
```

### Diff Between Versions

Compare SQL between two versions (integer index or `"current"`):

```python
diff = await tx.transform_diff("orders_summary", from_version=0, to_version="current")
# Returns: {"diff": ["- INSERT INTO ...", "+ INSERT INTO ..."]}
```
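The `"- "`/`"+ "` line markers resemble what Python's `difflib` produces. How such a diff could be built with the standard library (the plugin's actual mechanism is not documented here, so treat this as an assumption):

```python
import difflib

old_sql = "INSERT INTO orders_summary SELECT customer_id, SUM(amount) FROM orders GROUP BY 1"
new_sql = "INSERT INTO orders_summary SELECT customer_id, SUM(amount), COUNT(*) FROM orders GROUP BY 1"

# ndiff prefixes removed lines with "- " and added lines with "+ ";
# "? " hint lines are dropped to match the format shown above.
diff = [line for line in difflib.ndiff([old_sql], [new_sql])
        if line[:2] in ("- ", "+ ")]
```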

### List All Transformations

```python
transformations = await tx.transform_list()
# Returns: [{"name": "orders_summary", "description": "...", "run_count": 5, ...}]
```

## Versioning

Each time a transformation is run, the current SQL is snapshotted into the version history. Versions are stored as integer indices (0, 1, 2, …) in the graph node's properties; the live SQL is always addressable as `"current"`.

```python
# Get version 0 vs current SQL diff
diff = await tx.transform_diff("my_transform", from_version=0, to_version="current")

# List available snapshots
snapshots = await tx.transform_version("my_transform")
for v in snapshots:
    print(f"v{v['index']}: {v['created_at']}")
```

## Parameter Substitution

Use `:param_name` placeholders in SQL and pass a `parameters` dict at run time:

```python
await tx.transform_create(
    "filtered_export",
    sql="INSERT INTO export SELECT * FROM events WHERE region = :region AND amount > :min_amount"
)

await tx.transform_run(
    "filtered_export",
    parameters={"region": "EU", "min_amount": "500"}
)
```

Parameters use the `:name` style (not `?` or `$1`). All values are string-substituted before execution.
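A minimal sketch of that substitution rule (illustrative only, not the plugin's implementation; because values are spliced in as plain strings, they must come from trusted input):

```python
import re

def substitute(sql: str, parameters: dict) -> str:
    """Replace each :name placeholder with its string value."""
    def replace(match: re.Match) -> str:
        name = match.group(1)
        if name not in parameters:
            raise KeyError(f"missing parameter: {name}")
        return parameters[name]
    return re.sub(r":([A-Za-z_]\w*)", replace, sql)

rendered = substitute(
    "SELECT * FROM events WHERE region = :region AND amount > :min_amount",
    {"region": "EU", "min_amount": "500"},
)
```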

## Lineage Integration

When a `LineagePlugin` is passed, each `transform_run` automatically creates lineage edges from source to target tables:

```python
from daita import Agent
from daita.plugins import postgresql, lineage, transformer

db = postgresql(host="localhost", database="mydb")
lin = lineage()
tx = transformer(db=db, lineage=lin)

agent = Agent(
    name="ETL Agent",
    tools=[db, lin, tx]
)

# Each transform_run creates FLOWS_TO lineage edges automatically
await tx.transform_run("orders_summary")
```

Without a lineage plugin, a minimal regex SQL parser is used to infer table relationships, but lineage edges are not persisted.
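The fallback parser mentioned above can be approximated in a few lines. A hedged sketch of the idea (the plugin's actual regexes are not shown here, and real SQL parsing is considerably harder than this):

```python
import re

def infer_tables(sql: str) -> dict:
    """Best-effort extraction of the target and source tables of a statement."""
    target = re.search(r"INSERT\s+INTO\s+([\w.]+)", sql, re.IGNORECASE)
    sources = re.findall(r"\b(?:FROM|JOIN)\s+([\w.]+)", sql, re.IGNORECASE)
    return {"target": target.group(1) if target else None, "sources": sources}

edges = infer_tables(
    "INSERT INTO orders_summary "
    "SELECT o.customer_id, SUM(o.amount) FROM orders o "
    "JOIN customers c ON c.id = o.customer_id GROUP BY 1"
)
```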

## Using with Agents

```python
from daita import Agent
from daita.plugins import postgresql, transformer

db = postgresql(host="localhost", database="analytics")
tx = transformer(db=db)

agent = Agent(
    name="ETL Agent",
    prompt="You manage data transformations. Create, test, and run SQL transformations as needed.",
    llm_provider="openai",
    model="gpt-4",
    tools=[db, tx]
)

await agent.start()

result = await agent.run("""
Create a transformation called 'daily_revenue' that inserts into
the daily_revenue table the sum of amounts from orders grouped by date,
then test it and run it.
""")

await agent.stop()
```

## Available Tools

| Tool | Description | Key Parameters |
|------|-------------|----------------|
| `transform_create` | Define a named SQL transformation | `name` (required), `sql` (required), `description` |
| `transform_run` | Execute a named transformation | `name` (required), `parameters` |
| `transform_test` | Dry-run validate SQL using `EXPLAIN` | `name` (required) |
| `transform_version` | List version snapshots | `name` (required) |
| `transform_diff` | Compare SQL between versions | `name`, `from_version`, `to_version` |
| `transform_list` | List all defined transformations | None |

**Tool Categories:** `transformer` · **Tool Source:** plugin

## Error Handling

```python
from daita.plugins import transformer

tx = transformer()

try:
    await tx.transform_run("nonexistent")
except KeyError as e:
    print(f"Transformation not found: {e}")
except ValueError as e:
    # Invalid identifier or missing db plugin
    print(f"Configuration error: {e}")
```

## Best Practices

- **Test before running:** Always call `transform_test` before `transform_run` in production pipelines.
- **Use descriptive names:** Transformation names become node IDs in the graph, so choose stable, meaningful names.
- **Pair with lineage:** Pass `lineage=lin` to automatically track data flow across transformations.
- **Review versions:** Before overwriting logic, use `transform_diff` to understand what changed.
- **Prefer parameter substitution over string formatting:** Use `:param` placeholders rather than Python f-strings to keep SQL safe and reusable.
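The test-before-run practice can be wrapped in a small helper. A sketch assuming the return shapes shown earlier; `safe_run` and the stub class are hypothetical, with the stub standing in for a real `transformer` instance so the pattern can be exercised without a database:

```python
import asyncio

async def safe_run(tx, name: str, **kwargs) -> dict:
    """Dry-run a transformation and only execute it if the plan validates."""
    check = await tx.transform_test(name)
    if not check.get("valid"):
        raise RuntimeError(f"{name} failed validation: {check}")
    return await tx.transform_run(name, **kwargs)

class StubTransformer:
    """Minimal stand-in so the helper can run without a real plugin."""
    async def transform_test(self, name):
        return {"success": True, "valid": True, "plan": []}

    async def transform_run(self, name, **kwargs):
        return {"success": True, "name": name}

result = asyncio.run(safe_run(StubTransformer(), "orders_summary"))
```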

## Next Steps