Intermediate

ETL Pipeline Lineage Tracking and Impact Analysis

Use an autonomous agent to build and maintain a data lineage graph, answer lineage questions in plain language, and assess the impact of schema changes before deploying them.

Plugins · Lineage · Data Engineering

#Overview

Data engineering teams face the same dangerous question before every deployment: "If I change this table, what breaks?" Without lineage tracking, the answer comes from reading ETL code, querying job logs, and asking colleagues — and it's still often incomplete.

This example builds a lineage system where an agent does the work. You describe your pipeline in natural language or hand it raw SQL, and the agent builds the graph. When a change is proposed, the agent traces downstream, scores the risk, and tells you exactly what will break. The lineage plugin provides the tools; the LLM decides how to use them.

What you will build:

  • An agent that ingests ETL job descriptions and autonomously registers the lineage graph
  • A pre-deployment impact gate that blocks high-risk changes with a plain-language explanation
  • A data steward agent that answers lineage questions and diagrams data flows on demand
  • Automatic lineage capture via SQL parsing and decorators for existing codebases

#Prerequisites

bash
pip install daita-agents openai
export OPENAI_API_KEY=sk-...
export DAITA_API_KEY=...       # Required for cloud deployment

#Building the Lineage Graph with an Agent

The simplest way to get a lineage graph up and running: describe your pipeline in plain language and let the agent register it. No boilerplate, no manual register_flow calls per edge.

python
import asyncio
from daita import Agent
from daita.plugins import lineage
 
async def main():
    tracker = lineage()
 
    agent = Agent(
        name="Lineage-Builder",
        model="gpt-4o-mini",
        prompt="""You are a data lineage engineer. When given pipeline descriptions,
        register the data flows using register_flow or register_pipeline.
 
        Use consistent entity ID conventions:
        - table:name for database tables
        - api:name for external API or service sources
        - model:name for ML model outputs
 
        Always include a descriptive transformation label for each flow.""",
        tools=[tracker]
    )
 
    await agent.start()
 
    # Describe your pipelines in plain language — the agent handles registration
    await agent.run("""
        Register our nightly warehouse pipelines:
 
        Customer 360 pipeline (runs at 6am daily):
        - CRM API pulls customer data into raw_customers
        - raw_customers is cleaned and deduplicated into customers_clean
        - customers_clean is joined with order history and NPS scores into customer_360
        - customer_360 feeds a churn prediction model that writes to churn_risk_scores
 
        Revenue pipeline (runs hourly):
        - Stripe API streams payment events into raw_transactions
        - raw_transactions is aggregated by day into revenue_daily
        - revenue_daily feeds MTD/QTD/YTD rollups into revenue_dashboard
    """)
 
    # Verify what was registered
    response = await agent.run(
        "Trace everything downstream of raw_customers. What systems depend on it?"
    )
    print(response)
 
    await agent.stop()
 
asyncio.run(main())

The agent calls register_pipeline for each described pipeline, infers entity IDs from the description, assigns appropriate flow types, and writes the graph — then immediately uses trace_downstream to answer the verification question.
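Under the hood, a downstream trace is just a reachability query over a directed graph. This pure-Python sketch is not the plugin's internals, only an illustration of what trace_downstream computes, using edges that mirror the Customer 360 pipeline described above:

```python
from collections import deque

# Edges mirror the Customer 360 pipeline above: source -> list of targets
EDGES = {
    "api:crm": ["table:raw_customers"],
    "table:raw_customers": ["table:customers_clean"],
    "table:customers_clean": ["table:customer_360"],
    "table:customer_360": ["table:churn_risk_scores"],
}

def trace_downstream(graph: dict, entity: str) -> list:
    """Breadth-first walk collecting every entity reachable from `entity`."""
    seen, order, queue = set(), [], deque([entity])
    while queue:
        node = queue.popleft()
        for child in graph.get(node, []):
            if child not in seen:
                seen.add(child)
                order.append(child)
                queue.append(child)
    return order

print(trace_downstream(EDGES, "table:raw_customers"))
# ['table:customers_clean', 'table:customer_360', 'table:churn_risk_scores']
```

The impact gate in the next section builds on exactly this kind of traversal: the size of the reachable set is the blast radius of a change.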


#Pre-Deployment Impact Gate

The most valuable use of lineage is blocking dangerous changes before they ship. This pattern runs an agent as a gate in your deployment workflow: it traces downstream impact, scores risk, and returns a structured go/no-go decision.

python
import asyncio
from daita import Agent
from daita.plugins import lineage
 
IMPACT_GATE_PROMPT = """You are a data change risk assessor. When given a proposed change,
you must:
1. Call analyze_impact to assess downstream risk
2. Call trace_downstream to enumerate every affected system
3. Return a structured decision: APPROVED, CAUTION, or BLOCKED
 
Format your response as:
DECISION: [APPROVED|CAUTION|BLOCKED]
RISK: [LOW|MEDIUM|HIGH]
AFFECTED SYSTEMS: [count]
REASON: [one sentence]
RECOMMENDATION: [what the team should do next]"""
 
 
async def impact_gate(entity_id: str, change_type: str, change_description: str) -> str:
    tracker = lineage()
 
    agent = Agent(
        name="Impact-Gate",
        model="gpt-4o-mini",
        prompt=IMPACT_GATE_PROMPT,
        tools=[tracker]
    )
 
    await agent.start()
 
    decision = await agent.run(f"""
        Proposed change: {change_description}
        Entity: {entity_id}
        Change type: {change_type}
 
        Assess the downstream impact and return your decision.
    """)
 
    await agent.stop()
    return decision
 
 
async def main():
    # Scenario 1: renaming a column in a heavily-used table
    print("=== Change Request: Rename customer_id in customers_clean ===")
    decision = await impact_gate(
        entity_id="table:customers_clean",
        change_type="schema_change",
        change_description="Rename column customer_id to cust_id for naming consistency"
    )
    print(decision)
 
    print("\n=== Change Request: Deprecate legacy events table ===")
    decision = await impact_gate(
        entity_id="table:raw_events_v1",
        change_type="deprecation",
        change_description="Decommission raw_events_v1; all consumers should have migrated to v2"
    )
    print(decision)
 
 
asyncio.run(main())

Sample output:

text
=== Change Request: Rename customer_id in customers_clean ===
DECISION: BLOCKED
RISK: HIGH
AFFECTED SYSTEMS: 4
REASON: customer_360, churn_risk_scores, revenue_dashboard, and executive_kpis all
        reference customers_clean — a column rename will break all four pipelines.
RECOMMENDATION: Create a migration plan, add the new column alongside the old one,
                migrate consumers one at a time, then drop the original column.
 
=== Change Request: Deprecate legacy events table ===
DECISION: CAUTION
RISK: MEDIUM
AFFECTED SYSTEMS: 2
REASON: session_events and session_metrics still show active flows from raw_events_v1.
RECOMMENDATION: Confirm both consumers have been updated before proceeding with deprecation.
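
Because the gate prompt pins the response format, a CI step can act on the decision mechanically. A minimal parser sketch (the parse_gate_decision helper is illustrative, not part of the plugin):

```python
def parse_gate_decision(text: str) -> dict:
    """Pull DECISION / RISK / AFFECTED SYSTEMS fields out of the gate's response."""
    fields = {}
    for line in text.splitlines():
        if ":" in line:
            key, _, value = line.partition(":")
            fields[key.strip()] = value.strip()
    count = fields.get("AFFECTED SYSTEMS", "")
    return {
        "decision": fields.get("DECISION", "BLOCKED"),  # fail closed if missing
        "risk": fields.get("RISK"),
        "affected": int(count) if count.isdigit() else None,
    }

sample = """DECISION: BLOCKED
RISK: HIGH
AFFECTED SYSTEMS: 4
REASON: four downstream pipelines reference customers_clean"""

result = parse_gate_decision(sample)
print(result["decision"], result["affected"])  # BLOCKED 4
```

In CI, map BLOCKED to a nonzero exit code so the deploy halts, and treat an unparseable response as BLOCKED rather than letting the change through.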

#Data Steward Agent

For day-to-day lineage questions — tracing a table's origins, generating diagrams for PR reviews, or registering flows from new pipelines — a persistent data steward agent handles everything through natural language.

python
import asyncio
from daita import Agent
from daita.plugins import lineage
 
async def main():
    tracker = lineage()
 
    steward = Agent(
        name="Data-Steward",
        model="gpt-4o-mini",
        prompt="""You are the data steward for our warehouse.
 
        You maintain an accurate lineage graph and answer questions about data flows.
        When tracing lineage, be thorough — use both trace_upstream and trace_downstream
        when the question asks about dependencies in either direction.
        When asked about proposed changes, always run analyze_impact first.
        When asked for a diagram, use export_lineage with format='mermaid'.
 
        Entity ID conventions: table:name, api:name, model:name""",
        tools=[tracker]
    )
 
    await steward.start()
 
    questions = [
        "Where does the data in revenue_dashboard ultimately come from? "
        "Trace it all the way back to the original sources.",
 
        "The ML team wants to add a new flow: reading from customer_360 and writing "
        "churn predictions to model:churn_v3. Register that flow for them.",
 
        "Generate a Mermaid diagram showing the full lineage of customer_360 "
        "in both directions.",
 
        "How risky would it be to change the schema of raw_transactions? "
        "What is the blast radius?",
    ]
 
    for question in questions:
        print(f"Q: {question}\n")
        answer = await steward.run(question)
        print(f"A: {answer}\n")
        print("-" * 60)
 
    await steward.stop()
 
asyncio.run(main())

The steward selects the right tool for each question automatically — trace_upstream for origin questions, register_flow for registration requests, export_lineage for diagrams, analyze_impact for risk assessment — without you specifying which tool to call.


#Automatic Capture from Existing SQL

For existing ETL jobs, capture_sql_lineage parses the SQL and registers flows automatically. Wire it alongside your query execution and lineage builds itself as jobs run — no agent required for this part.

python
import asyncio
from daita.plugins import lineage
 
async def run_etl_job(tracker, sql: str, description: str):
    # Execute your actual query here...
    # await db.execute(sql)
 
    # Capture lineage from the SQL automatically
    result = await tracker.capture_sql_lineage(
        sql=sql,
        transformation=description
    )
    print(f"  Captured: {result['source_tables']} -> {result['target_tables']}")
 
 
async def main():
    tracker = lineage()
 
    etl_jobs = [
        (
            """INSERT INTO orders_clean
               SELECT order_id, customer_id, amount_cents / 100.0 AS amount_usd, created_at
               FROM raw_orders
               WHERE amount_cents > 0 AND customer_id IS NOT NULL""",
            "Clean and normalize raw orders"
        ),
        (
            """INSERT INTO customer_order_summary
               SELECT c.customer_id, c.email,
                      COUNT(o.order_id) AS total_orders,
                      SUM(o.amount_usd) AS lifetime_value
               FROM orders_clean o
               JOIN customers c ON o.customer_id = c.customer_id
               GROUP BY c.customer_id, c.email""",
            "Aggregate customer order history"
        ),
    ]
 
    print("Running ETL jobs with automatic lineage capture:")
    for sql, description in etl_jobs:
        await run_etl_job(tracker, sql, description)
 
 
asyncio.run(main())
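
To make the mechanism concrete, here is a deliberately naive regex sketch of the kind of source/target extraction a SQL lineage parser performs. This is not the plugin's actual parser, which is more robust; this version handles only simple INSERT ... SELECT statements:

```python
import re

def naive_sql_lineage(sql: str) -> dict:
    """Extract target (INSERT INTO) and source (FROM/JOIN) tables from simple SQL."""
    targets = re.findall(r"INSERT\s+INTO\s+(\w+)", sql, re.IGNORECASE)
    sources = re.findall(r"(?:FROM|JOIN)\s+(\w+)", sql, re.IGNORECASE)
    return {"source_tables": sorted(set(sources)), "target_tables": targets}

sql = """INSERT INTO customer_order_summary
         SELECT c.customer_id, COUNT(o.order_id)
         FROM orders_clean o
         JOIN customers c ON o.customer_id = c.customer_id
         GROUP BY c.customer_id"""

print(naive_sql_lineage(sql))
# {'source_tables': ['customers', 'orders_clean'], 'target_tables': ['customer_order_summary']}
```

A production parser also has to handle CTEs, subqueries, aliases, and CREATE TABLE AS, which is why delegating this to the plugin beats maintaining regexes.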

For Python transforms, the @track decorator does the same thing at the function level:

python
from daita.plugins import lineage
 
tracker = lineage()
 
@tracker.track(
    source="table:raw_events",
    target="table:session_metrics",
    transformation="Parse events, filter bots, compute session duration and conversion"
)
async def compute_session_metrics(raw_df):
    # Lineage is captured automatically when this function is called
    ...
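
The decorator pattern itself is simple: wrap the function and record the edge each time it runs. A minimal pure-Python sketch of the idea, sync for brevity (the plugin's @track does more, including async support; the LINEAGE_EDGES list here is illustrative):

```python
import functools

LINEAGE_EDGES = []  # in the plugin this would be written to the lineage graph

def track(source: str, target: str, transformation: str):
    """Record a (source, target, transformation) edge whenever the wrapped function runs."""
    def decorator(fn):
        @functools.wraps(fn)
        def wrapper(*args, **kwargs):
            LINEAGE_EDGES.append((source, target, transformation))
            return fn(*args, **kwargs)
        return wrapper
    return decorator

@track("table:raw_events", "table:session_metrics", "Compute session duration")
def compute_session_metrics(rows):
    return len(rows)

compute_session_metrics([1, 2, 3])
print(LINEAGE_EDGES)
# [('table:raw_events', 'table:session_metrics', 'Compute session duration')]
```

Capturing lineage at call time, rather than at definition time, means only the flows that actually execute show up in the graph.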

#Deploying to Production

In production the Lineage Plugin upgrades to a managed graph backend automatically after daita push. The graph persists across invocations, multiple agents can write concurrently, and queries remain performant as the graph grows.

bash
daita push production

The entity IDs, agent prompts, and tool calls are identical in both environments. No code changes needed to move from local development to production.


#Complete Example

A self-contained run that uses an agent for the full workflow: build the graph, run a pre-change impact check, answer a lineage question, and export a diagram.

python
import asyncio
from daita import Agent
from daita.plugins import lineage
 
 
STEWARD_PROMPT = """You are a data lineage engineer and change risk assessor.
 
When registering pipelines, use entity ID conventions:
table:name, api:name, model:name
 
When assessing change impact:
1. Call analyze_impact to get the risk score
2. Call trace_downstream to enumerate affected systems
3. Give a clear APPROVED / CAUTION / BLOCKED recommendation with your reasoning
 
When asked for a diagram, use export_lineage with format='mermaid'."""
 
 
async def main():
    tracker = lineage()
 
    agent = Agent(
        name="Data-Steward",
        model="gpt-4o-mini",
        prompt=STEWARD_PROMPT,
        tools=[tracker]
    )
 
    await agent.start()
 
    # Step 1: Build the lineage graph from a pipeline description
    await agent.run("""
        Register our nightly warehouse pipeline:
        - Postgres production DB extracts into raw_orders
        - raw_orders is cleaned and normalized into orders_clean
        - orders_clean is aggregated into revenue_daily
        - revenue_daily feeds MTD/QTD/YTD rollups into executive_kpis
        Schedule: nightly at 2am
    """)
 
    # Step 2: Capture an additional flow from SQL
    await tracker.capture_sql_lineage(
        sql="""INSERT INTO at_risk_accounts
               SELECT a.account_id, a.mrr FROM accounts a
               JOIN orders_clean o ON a.account_id = o.account_id
               WHERE o.created_at < NOW() - INTERVAL '60 days'""",
        transformation="Flag accounts with no orders in 60 days"
    )
 
    # Step 3: Impact assessment for a proposed change
    impact_response = await agent.run("""
        The team wants to change the schema of orders_clean — adding a new required
        column. Assess the downstream impact and tell me if we should proceed.
    """)
    print("Impact Assessment:")
    print(impact_response)
 
    # Step 4: Generate a diagram for the PR description
    diagram_response = await agent.run(
        "Generate a Mermaid diagram showing the full upstream lineage of executive_kpis."
    )
    print("\nLineage Diagram:")
    print(diagram_response)
 
    await agent.stop()
 
 
asyncio.run(main())