#Overview
Data engineering teams face the same dangerous question before every deployment: "If I change this table, what breaks?" Without lineage tracking, the answer comes from reading ETL code, querying job logs, and asking colleagues — and it's still often incomplete.
This example builds a lineage system where an agent does the work. You describe your pipeline in natural language or hand it raw SQL, and the agent builds the graph. When a change is proposed, the agent traces downstream, scores the risk, and tells you exactly what will break. The lineage plugin provides the tools; the LLM decides how to use them.
What you will build:
- An agent that ingests ETL job descriptions and autonomously registers the lineage graph
- A pre-deployment impact gate that blocks high-risk changes with a plain-language explanation
- A data steward agent that answers lineage questions and diagrams data flows on demand
- Automatic lineage capture via SQL parsing and decorators for existing codebases
#Prerequisites
```shell
pip install daita-agents openai

export OPENAI_API_KEY=sk-...
export DAITA_API_KEY=...  # Required for cloud deployment
```
#Building the Lineage Graph with an Agent
The simplest way to get a lineage graph up and running: describe your pipeline in plain language and let the agent register it. No boilerplate, no manual register_flow calls per edge.
```python
import asyncio
from daita import Agent
from daita.plugins import lineage

async def main():
    tracker = lineage()

    agent = Agent(
        name="Lineage-Builder",
        model="gpt-4o-mini",
        prompt="""You are a data lineage engineer. When given pipeline descriptions,
register the data flows using register_flow or register_pipeline.

Use consistent entity ID conventions:
- table:name for database tables
- api:name for external API or service sources
- model:name for ML model outputs

Always include a descriptive transformation label for each flow.""",
        tools=[tracker]
    )
    await agent.start()

    # Describe your pipelines in plain language — the agent handles registration
    await agent.run("""
    Register our nightly warehouse pipelines:

    Customer 360 pipeline (runs at 6am daily):
    - CRM API pulls customer data into raw_customers
    - raw_customers is cleaned and deduplicated into customers_clean
    - customers_clean is joined with order history and NPS scores into customer_360
    - customer_360 feeds a churn prediction model that writes to churn_risk_scores

    Revenue pipeline (runs hourly):
    - Stripe API streams payment events into raw_transactions
    - raw_transactions is aggregated by day into revenue_daily
    - revenue_daily feeds MTD/QTD/YTD rollups into revenue_dashboard
    """)

    # Verify what was registered
    response = await agent.run(
        "Trace everything downstream of raw_customers. What systems depend on it?"
    )
    print(response)

    await agent.stop()

asyncio.run(main())
```

The agent calls register_pipeline for each described pipeline, infers entity IDs from the description, assigns appropriate flow types, and writes the graph — then immediately uses trace_downstream to answer the verification question.
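Under the hood, a downstream trace is just a graph walk. As a mental model, here is a minimal breadth-first sketch over a hand-written adjacency map (the edges are hypothetical, mirroring the Customer 360 pipeline described above, not the plugin's internal storage):

```python
from collections import deque

# Hypothetical edge map: entity -> direct downstream consumers
EDGES = {
    "api:crm": ["table:raw_customers"],
    "table:raw_customers": ["table:customers_clean"],
    "table:customers_clean": ["table:customer_360"],
    "table:customer_360": ["model:churn_risk_scores"],
}

def trace_downstream(entity_id: str) -> list[str]:
    """Breadth-first walk over the edge map, collecting every descendant once."""
    seen, queue, order = set(), deque([entity_id]), []
    while queue:
        for child in EDGES.get(queue.popleft(), []):
            if child not in seen:
                seen.add(child)
                queue.append(child)
                order.append(child)
    return order

print(trace_downstream("table:raw_customers"))
# ['table:customers_clean', 'table:customer_360', 'model:churn_risk_scores']
```

The plugin's real graph adds transformation labels and metadata to each edge, but the traversal the agent triggers is this same idea.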
#Pre-Deployment Impact Gate
The most valuable use of lineage is blocking dangerous changes before they ship. This pattern runs an agent as a gate in your deployment workflow: it traces downstream impact, scores risk, and returns a structured go/no-go decision.
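Before wiring up the agent, it helps to see the shape of the decision itself. A toy scoring heuristic (the thresholds and change types here are illustrative assumptions, not the plugin's actual risk model):

```python
def score_risk(affected_count: int, change_type: str) -> tuple[str, str]:
    """Map blast radius and change type to a (risk, decision) pair."""
    # Assumed set of change types that can break consumers outright
    breaking = change_type in {"schema_change", "deprecation"}
    if affected_count == 0:
        return "LOW", "APPROVED"
    if affected_count <= 2:
        return ("MEDIUM", "CAUTION") if breaking else ("LOW", "APPROVED")
    return ("HIGH", "BLOCKED") if breaking else ("MEDIUM", "CAUTION")

print(score_risk(4, "schema_change"))  # ('HIGH', 'BLOCKED')
print(score_risk(2, "deprecation"))    # ('MEDIUM', 'CAUTION')
```

The agent version below replaces this fixed heuristic with an LLM that reasons over the real lineage graph, so it can weigh context (active vs. stale flows, migration status) that a threshold cannot.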
```python
import asyncio
from daita import Agent
from daita.plugins import lineage

IMPACT_GATE_PROMPT = """You are a data change risk assessor. When given a proposed change,
you must:
1. Call analyze_impact to assess downstream risk
2. Call trace_downstream to enumerate every affected system
3. Return a structured decision: APPROVED, CAUTION, or BLOCKED

Format your response as:
DECISION: [APPROVED|CAUTION|BLOCKED]
RISK: [LOW|MEDIUM|HIGH]
AFFECTED SYSTEMS: [count]
REASON: [one sentence]
RECOMMENDATION: [what the team should do next]"""

async def impact_gate(entity_id: str, change_type: str, change_description: str) -> str:
    tracker = lineage()
    agent = Agent(
        name="Impact-Gate",
        model="gpt-4o-mini",
        prompt=IMPACT_GATE_PROMPT,
        tools=[tracker]
    )
    await agent.start()
    decision = await agent.run(f"""
    Proposed change: {change_description}
    Entity: {entity_id}
    Change type: {change_type}

    Assess the downstream impact and return your decision.
    """)
    await agent.stop()
    return decision

async def main():
    # Scenario 1: renaming a column in a heavily-used table
    print("=== Change Request: Rename customer_id in customers_clean ===")
    decision = await impact_gate(
        entity_id="table:customers_clean",
        change_type="schema_change",
        change_description="Rename column customer_id to cust_id for naming consistency"
    )
    print(decision)

    # Scenario 2: deprecating a legacy table
    print("\n=== Change Request: Deprecate legacy events table ===")
    decision = await impact_gate(
        entity_id="table:raw_events_v1",
        change_type="deprecation",
        change_description="Decommission raw_events_v1; all consumers should have migrated to v2"
    )
    print(decision)

asyncio.run(main())
```

Sample output:
```
=== Change Request: Rename customer_id in customers_clean ===
DECISION: BLOCKED
RISK: HIGH
AFFECTED SYSTEMS: 4
REASON: customer_360, churn_risk_scores, revenue_dashboard, and executive_kpis all
reference customers_clean — a column rename will break all four pipelines.
RECOMMENDATION: Create a migration plan, add the new column alongside the old one,
migrate consumers one at a time, then drop the original column.

=== Change Request: Deprecate legacy events table ===
DECISION: CAUTION
RISK: MEDIUM
AFFECTED SYSTEMS: 2
REASON: session_events and session_metrics still show active flows from raw_events_v1.
RECOMMENDATION: Confirm both consumers have been updated before proceeding with deprecation.
```
#Data Steward Agent
For day-to-day lineage questions — tracing a table's origins, generating diagrams for PR reviews, or registering flows from new pipelines — a persistent data steward agent handles everything through natural language.
```python
import asyncio
from daita import Agent
from daita.plugins import lineage

async def main():
    tracker = lineage()

    steward = Agent(
        name="Data-Steward",
        model="gpt-4o-mini",
        prompt="""You are the data steward for our warehouse.
You maintain an accurate lineage graph and answer questions about data flows.

When tracing lineage, be thorough — use both trace_upstream and trace_downstream
when the question asks about dependencies in either direction.
When asked about proposed changes, always run analyze_impact first.
When asked for a diagram, use export_lineage with format='mermaid'.

Entity ID conventions: table:name, api:name, model:name""",
        tools=[tracker]
    )
    await steward.start()

    questions = [
        "Where does the data in revenue_dashboard ultimately come from? "
        "Trace it all the way back to the original sources.",

        "The ML team wants to add a new flow: reading from customer_360 and writing "
        "churn predictions to model:churn_v3. Register that flow for them.",

        "Generate a Mermaid diagram showing the full lineage of customer_360 "
        "in both directions.",

        "How risky would it be to change the schema of raw_transactions? "
        "What is the blast radius?",
    ]
    for question in questions:
        print(f"Q: {question}\n")
        answer = await steward.run(question)
        print(f"A: {answer}\n")
        print("-" * 60)

    await steward.stop()

asyncio.run(main())
```

The steward selects the right tool for each question automatically — trace_upstream for origin questions, register_flow for registration requests, export_lineage for diagrams, analyze_impact for risk assessment — without you specifying which tool to call.
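The Mermaid diagrams the steward returns are, conceptually, just the edge list serialized into graph TD syntax. A rough sketch of that serialization over hand-written edge tuples (illustrative only; the plugin's actual export format may differ):

```python
def to_mermaid(edges: list[tuple[str, str, str]]) -> str:
    """Render (source, target, label) edges as a Mermaid flowchart."""
    def node(entity_id: str) -> str:
        # Mermaid node IDs should not contain ':', so swap it for '_'
        return entity_id.replace(":", "_")
    lines = ["graph TD"]
    for source, target, label in edges:
        lines.append(f'    {node(source)} -->|"{label}"| {node(target)}')
    return "\n".join(lines)

# Hypothetical edges from the revenue pipeline registered earlier
edges = [
    ("api:stripe", "table:raw_transactions", "stream payments"),
    ("table:raw_transactions", "table:revenue_daily", "daily aggregate"),
]
print(to_mermaid(edges))
```

Pasting the resulting text into any Mermaid renderer (GitHub markdown, for instance) draws the flow left to right with labeled arrows.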
#Automatic Capture from Existing SQL
For existing ETL jobs, capture_sql_lineage parses the SQL and registers flows automatically. Wire it alongside your query execution and lineage builds itself as jobs run — no agent required for this part.
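For straightforward INSERT ... SELECT statements, the extraction boils down to finding the INTO target and the FROM/JOIN sources. A naive regex sketch of the idea (real SQL needs a proper parser to handle CTEs, subqueries, and quoting; this is illustration, not the plugin's implementation):

```python
import re

def naive_sql_lineage(sql: str) -> dict[str, list[str]]:
    """Extract target and source tables from an INSERT ... SELECT statement."""
    targets = re.findall(r"INSERT\s+INTO\s+(\w+)", sql, re.IGNORECASE)
    sources = re.findall(r"(?:FROM|JOIN)\s+(\w+)", sql, re.IGNORECASE)
    return {"source_tables": sorted(set(sources)), "target_tables": targets}

sql = """INSERT INTO customer_order_summary
         SELECT c.customer_id, COUNT(o.order_id)
         FROM orders_clean o
         JOIN customers c ON o.customer_id = c.customer_id
         GROUP BY c.customer_id"""
print(naive_sql_lineage(sql))
# {'source_tables': ['customers', 'orders_clean'], 'target_tables': ['customer_order_summary']}
```

Each extracted pair becomes one edge in the graph, so lineage accumulates as a side effect of running the jobs you already have.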
```python
import asyncio
from daita.plugins import lineage

async def run_etl_job(tracker, sql: str, description: str):
    # Execute your actual query here...
    # await db.execute(sql)

    # Capture lineage from the SQL automatically
    result = await tracker.capture_sql_lineage(
        sql=sql,
        transformation=description
    )
    print(f"  Captured: {result['source_tables']} -> {result['target_tables']}")

async def main():
    tracker = lineage()

    etl_jobs = [
        (
            """INSERT INTO orders_clean
               SELECT order_id, customer_id, amount_cents / 100.0 AS amount_usd, created_at
               FROM raw_orders
               WHERE amount_cents > 0 AND customer_id IS NOT NULL""",
            "Clean and normalize raw orders"
        ),
        (
            """INSERT INTO customer_order_summary
               SELECT c.customer_id, c.email,
                      COUNT(o.order_id) AS total_orders,
                      SUM(o.amount_usd) AS lifetime_value
               FROM orders_clean o
               JOIN customers c ON o.customer_id = c.customer_id
               GROUP BY c.customer_id, c.email""",
            "Aggregate customer order history"
        ),
    ]

    print("Running ETL jobs with automatic lineage capture:")
    for sql, description in etl_jobs:
        await run_etl_job(tracker, sql, description)

asyncio.run(main())
```

For Python transforms, the @track decorator does the same thing at the function level:
```python
from daita.plugins import lineage

tracker = lineage()

@tracker.track(
    source="table:raw_events",
    target="table:session_metrics",
    transformation="Parse events, filter bots, compute session duration and conversion"
)
async def compute_session_metrics(raw_df):
    # Lineage is captured automatically when this function is called
    ...
```
#Deploying to Production
In production, the lineage plugin upgrades to a managed graph backend automatically after daita push. The graph persists across invocations, multiple agents can write concurrently, and queries remain performant as the graph grows.

```shell
daita push production
```

The entity IDs, agent prompts, and tool calls are identical in both environments. No code changes are needed to move from local development to production.
#Complete Example
A self-contained run that uses an agent for the full workflow: build the graph, run a pre-change impact check, answer a lineage question, and export a diagram.
```python
import asyncio
from daita import Agent
from daita.plugins import lineage

STEWARD_PROMPT = """You are a data lineage engineer and change risk assessor.

When registering pipelines, use entity ID conventions:
table:name, api:name, model:name

When assessing change impact:
1. Call analyze_impact to get the risk score
2. Call trace_downstream to enumerate affected systems
3. Give a clear APPROVED / CAUTION / BLOCKED recommendation with your reasoning

When asked for a diagram, use export_lineage with format='mermaid'."""

async def main():
    tracker = lineage()

    agent = Agent(
        name="Data-Steward",
        model="gpt-4o-mini",
        prompt=STEWARD_PROMPT,
        tools=[tracker]
    )
    await agent.start()

    # Step 1: Build the lineage graph from a pipeline description
    await agent.run("""
    Register our nightly warehouse pipeline:
    - Postgres production DB extracts into raw_orders
    - raw_orders is cleaned and normalized into orders_clean
    - orders_clean is aggregated into revenue_daily
    - revenue_daily feeds MTD/QTD/YTD rollups into executive_kpis
    Schedule: nightly at 2am
    """)

    # Step 2: Capture an additional flow from SQL
    await tracker.capture_sql_lineage(
        sql="""INSERT INTO at_risk_accounts
               SELECT a.account_id, a.mrr FROM accounts a
               JOIN orders_clean o ON a.account_id = o.account_id
               WHERE o.created_at < NOW() - INTERVAL '60 days'""",
        transformation="Flag accounts with no orders in 60 days"
    )

    # Step 3: Impact assessment for a proposed change
    impact_response = await agent.run("""
    The team wants to change the schema of orders_clean — adding a new required
    column. Assess the downstream impact and tell me if we should proceed.
    """)
    print("Impact Assessment:")
    print(impact_response)

    # Step 4: Generate a diagram for the PR description
    diagram_response = await agent.run(
        "Generate a Mermaid diagram showing the full upstream lineage of executive_kpis."
    )
    print("\nLineage Diagram:")
    print(diagram_response)

    await agent.stop()

asyncio.run(main())
```