Transformer Plugin
Named SQL transformations with versioning, parameter substitution, dry-run validation, and optional lineage capture. Stores definitions in a graph backend with in-memory fallback.
#Installation
```shell
pip install 'daita-agents[data]'
```

#Quick Start
```python
from daita import Agent
from daita.plugins import postgresql, transformer

db = postgresql(host="localhost", database="mydb")
tx = transformer(db=db)

agent = Agent(
    name="ETL Agent",
    prompt="You manage data transformations.",
    tools=[db, tx]
)

await agent.start()
await agent.run("Create a daily orders summary transformation and run it")
```

#Constructor Parameters
```python
transformer(
    db=None,       # database plugin (required at execution time)
    lineage=None,  # LineagePlugin for auto lineage capture
    backend=None,  # graph backend (auto-selected if None)
)
```

#Parameters
- db: Database plugin to execute transformations against. Required when calling transform_run or transform_test. Can be set after construction.
- lineage: Optional LineagePlugin instance. When provided, lineage edges are automatically captured on every transform_run.
- backend: Optional graph backend for persisting transformation definitions. Auto-selected from registered backends when None; falls back to an in-memory dict if none are available.
#Core Operations
#Create a Transformation
Define a named SQL transformation. Source and target tables are extracted from the SQL automatically:
```python
from daita.plugins import postgresql, transformer

db = postgresql(host="localhost", database="mydb")
tx = transformer(db=db)

await tx.transform_create(
    "orders_summary",
    sql="INSERT INTO orders_summary SELECT customer_id, SUM(amount) FROM orders GROUP BY 1",
    description="Aggregate orders by customer"
)
```

#Run a Transformation
Execute a named transformation against the database:
```python
# Simple execution
result = await tx.transform_run("orders_summary")

# With parameter substitution
result = await tx.transform_run(
    "orders_by_region",
    parameters={"region": "EU", "min_amount": "100"}
)
```

#Test (Dry Run)
Validate SQL without executing it using EXPLAIN:
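Conceptually, the dry run asks the database engine for a query plan instead of executing the statement, so invalid SQL fails fast without touching data. Here is a rough, hypothetical sketch of that idea using SQLite from the standard library (the plugin itself runs EXPLAIN against whatever database plugin is configured, and its return shape may differ):

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE orders (customer_id INTEGER, amount REAL)")

def dry_run(conn, sql):
    """Validate SQL by requesting a query plan rather than executing it."""
    try:
        plan = conn.execute("EXPLAIN QUERY PLAN " + sql).fetchall()
        return {"success": True, "valid": True, "plan": plan}
    except sqlite3.Error as exc:
        return {"success": False, "valid": False, "error": str(exc)}

ok = dry_run(conn, "SELECT customer_id, SUM(amount) FROM orders GROUP BY 1")
bad = dry_run(conn, "SELECT FROM nowhere")  # syntax error: valid=False
```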
```python
result = await tx.transform_test("orders_summary")
# Returns: {"success": True, "valid": True, "plan": [...]}
```

#Version History
Every transform_run saves a version snapshot of the current SQL:
```python
# List versions for a transformation
versions = await tx.transform_version("orders_summary")
# Returns: [{"index": 0, "created_at": "...", "sql": "..."}]
```

#Diff Between Versions
Compare SQL between two versions (integer index or "current"):
```python
diff = await tx.transform_diff("orders_summary", from_version=0, to_version="current")
# Returns: {"diff": ["- INSERT INTO ...", "+ INSERT INTO ..."]}
```

#List All Transformations
```python
transformations = await tx.transform_list()
# Returns: [{"name": "orders_summary", "description": "...", "run_count": 5, ...}]
```

#Versioning
Each time a transformation is run, the current SQL is snapshotted into the version history. Versions are stored under integer indices (0, 1, 2, …) inside the graph node's properties, and the live SQL is always addressable as "current".
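The diff lines returned by transform_diff resemble Python difflib output. As a hedged illustration (assuming snapshots are plain SQL strings; the plugin's actual diff routine may differ), two versions could be compared with the standard library like this:

```python
import difflib

v0 = "INSERT INTO orders_summary SELECT customer_id, SUM(amount) FROM orders GROUP BY 1"
current = "INSERT INTO orders_summary SELECT customer_id, COUNT(*), SUM(amount) FROM orders GROUP BY 1"

# Keep only the "-" (removed) and "+" (added) lines, dropping "?" hint lines
diff = [line for line in difflib.ndiff([v0], [current]) if line[0] in "-+"]
```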
```python
# Get version 0 vs current SQL diff
diff = await tx.transform_diff("my_transform", from_version=0, to_version="current")

# List available snapshots
snapshots = await tx.transform_version("my_transform")
for v in snapshots:
    print(f"v{v['index']}: {v['created_at']}")
```

#Parameter Substitution
Use :param_name placeholders in SQL. Pass a parameters dict at run time:
```python
await tx.transform_create(
    "filtered_export",
    sql="INSERT INTO export SELECT * FROM events WHERE region = :region AND amount > :min_amount"
)

await tx.transform_run(
    "filtered_export",
    parameters={"region": "EU", "min_amount": "500"}
)
```

Parameters use :name style placeholders (not ? or $1). All values are string-substituted into the SQL before execution.
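A minimal sketch of this kind of placeholder substitution (an illustration of the behavior described above, not the plugin's actual code; note that plain string substitution performs no quoting or escaping, so parameter values must come from trusted sources):

```python
import re

def substitute_params(sql: str, parameters: dict) -> str:
    """Replace each :name placeholder with the corresponding string value."""
    def repl(match):
        name = match.group(1)
        if name not in parameters:
            raise KeyError(f"missing parameter: {name}")
        return str(parameters[name])
    return re.sub(r":(\w+)", repl, sql)

sql = "SELECT * FROM events WHERE amount > :min_amount"
print(substitute_params(sql, {"min_amount": "500"}))
# SELECT * FROM events WHERE amount > 500
```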
#Lineage Integration
When a LineagePlugin is passed, each transform_run automatically creates lineage edges from source to target tables:
```python
from daita import Agent
from daita.plugins import postgresql, lineage, transformer

db = postgresql(host="localhost", database="mydb")
lin = lineage()
tx = transformer(db=db, lineage=lin)

agent = Agent(
    name="ETL Agent",
    tools=[db, lin, tx]
)

# Each transform_run creates FLOWS_TO lineage edges automatically
await tx.transform_run("orders_summary")
```

Without a lineage plugin, a minimal regex SQL parser is still used to infer table relationships, but lineage edges are not persisted.
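The fallback parser mentioned above can be imagined as a couple of regexes over the SQL. This is a hypothetical sketch, not the plugin's implementation; real SQL with CTEs, subqueries, aliases, or quoted identifiers needs a proper parser:

```python
import re

def extract_tables(sql: str):
    """Infer the INSERT target and the FROM/JOIN source tables of a statement."""
    target = re.search(r"INSERT\s+INTO\s+(\w+)", sql, re.IGNORECASE)
    sources = re.findall(r"(?:FROM|JOIN)\s+(\w+)", sql, re.IGNORECASE)
    return (target.group(1) if target else None, sources)

sql = "INSERT INTO orders_summary SELECT customer_id, SUM(amount) FROM orders GROUP BY 1"
print(extract_tables(sql))  # ('orders_summary', ['orders'])
```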
#Using with Agents
```python
from daita import Agent
from daita.plugins import postgresql, transformer

db = postgresql(host="localhost", database="analytics")
tx = transformer(db=db)

agent = Agent(
    name="ETL Agent",
    prompt="You manage data transformations. Create, test, and run SQL transformations as needed.",
    llm_provider="openai",
    model="gpt-4",
    tools=[db, tx]
)

await agent.start()

result = await agent.run("""
    Create a transformation called 'daily_revenue' that inserts into
    the daily_revenue table the sum of amounts from orders grouped by date,
    then test it and run it.
""")

await agent.stop()
```

#Available Tools
| Tool | Description | Key Parameters |
|---|---|---|
| transform_create | Define a named SQL transformation | name (required), sql (required), description |
| transform_run | Execute a named transformation | name (required), parameters |
| transform_test | Dry-run validate SQL using EXPLAIN | name (required) |
| transform_version | List version snapshots | name (required) |
| transform_diff | Compare SQL between versions | name, from_version, to_version |
| transform_list | List all defined transformations | None |
Tool Categories: transformer
Tool Source: plugin
#Error Handling
```python
from daita.plugins import postgresql, transformer

tx = transformer()

try:
    await tx.transform_run("nonexistent")
except KeyError as e:
    print(f"Transformation not found: {e}")
except ValueError as e:
    # Invalid identifier or missing db plugin
    print(f"Configuration error: {e}")
```

#Best Practices
- Test before running: Always call transform_test before transform_run in production pipelines.
- Use descriptive names: Transformation names become node IDs in the graph — choose stable, meaningful names.
- Pair with lineage: Use lineage=lin to automatically track data flow across transformations.
- Version review: Before overwriting logic, use transform_diff to understand what changed.
- Parameter substitution over string formatting: Use :param placeholders rather than Python f-strings to keep SQL safe and reusable.
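To illustrate the last practice (hypothetical SQL, not tied to the plugin API): an f-string bakes the value into the definition, while a :param template stays reusable across runs:

```python
region = "EU"

# Avoid: the value is frozen into the SQL at definition time
sql_fstring = f"SELECT * FROM events WHERE region = '{region}'"

# Prefer: one stable definition, values supplied per run via parameters={...}
sql_template = "SELECT * FROM events WHERE region = :region"
```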
#Next Steps
- Data Quality Plugin — Profile and validate data after transformations
- Lineage Plugin — Track data provenance across transformations
- PostgreSQL Plugin — Primary database backend for transformations
- Plugin Overview — All available plugins