# Lineage Plugin

Data lineage and flow tracking for understanding data dependencies across systems.

# Quick Start

```python
from daita import Agent
from daita.plugins import lineage

# Create lineage plugin
lin = lineage()

# Agent uses lineage tools autonomously
agent = Agent(
    name="Lineage Analyst",
    prompt="You are a data lineage expert. Help users understand data flows and dependencies.",
    tools=[lin]
)

await agent.start()
result = await agent.run("Trace the lineage of the orders table")
```

# Direct Usage

The plugin can be used directly for programmatic lineage tracking:

```python
from daita.plugins import lineage

lin = lineage()

# Register a data flow
await lin.register_flow(
    source_id="table:raw_orders",
    target_id="table:processed_orders",
    flow_type="TRANSFORMS",
    transformation="Clean and deduplicate orders"
)

# Trace lineage
lineage_data = await lin.trace_lineage("table:processed_orders", direction="both")
print(f"Upstream sources: {lineage_data['upstream_count']}")
print(f"Downstream consumers: {lineage_data['downstream_count']}")
```

# Configuration Parameters

```python
lineage(
    storage: Optional[Any] = None,
    organization_id: Optional[int] = None,
    backend: Optional[Any] = None,
    risk_thresholds: Optional[Dict[str, int]] = None
)
```

# Parameters

  • storage (Any): Optional legacy storage backend for persistent lineage
  • organization_id (int): Optional organization ID for multi-tenant storage
  • backend (Any): Optional graph backend override. If None, the backend is selected automatically based on the runtime environment (local or cloud)
  • risk_thresholds (dict): Custom risk scoring thresholds for analyze_impact. Keys: "HIGH" and "MEDIUM", values are integer downstream counts. Defaults to {"HIGH": 20, "MEDIUM": 5}
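The `risk_thresholds` mapping can be read as a simple rule: compare an entity's downstream consumer count against the thresholds to pick a risk level. A minimal sketch of that logic, assuming the documented defaults (the function here is illustrative, not the plugin's implementation):

```python
from typing import Dict, Optional

def score_risk(downstream_count: int,
               thresholds: Optional[Dict[str, int]] = None) -> str:
    """Map a downstream consumer count to a risk level.

    Illustrative only: mirrors the documented defaults
    {"HIGH": 20, "MEDIUM": 5}, not the plugin's actual code.
    """
    thresholds = thresholds or {"HIGH": 20, "MEDIUM": 5}
    if downstream_count >= thresholds["HIGH"]:
        return "HIGH"
    if downstream_count >= thresholds["MEDIUM"]:
        return "MEDIUM"
    return "LOW"

print(score_risk(3))    # LOW under the defaults
print(score_risk(7))    # MEDIUM
print(score_risk(25, {"HIGH": 10, "MEDIUM": 2}))  # HIGH with custom thresholds
```

Lowering the thresholds makes impact analysis more conservative, which is useful for tables with many indirect consumers.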

# Lineage Operations

# Register Data Flows

Track data flows between entities:

```python
from daita.plugins import lineage

lin = lineage()

# Register ETL flow
await lin.register_flow(
    source_id="table:raw_data",
    target_id="table:clean_data",
    flow_type="FLOWS_TO",
    transformation="Data cleaning and validation",
    schedule="0 */6 * * *"  # Every 6 hours
)
```

Flow Types:

  • FLOWS_TO - General data flow
  • SYNCS_TO - Data synchronization
  • TRIGGERS - Event-driven transformation
  • TRANSFORMS - Data transformation

# Trace Lineage

Discover upstream sources and downstream consumers:

```python
from daita.plugins import lineage

lin = lineage()

# Trace both directions
lineage_data = await lin.trace_lineage(
    entity_id="table:users",
    direction="both",
    max_depth=5
)

# Upstream only
upstream = await lin.trace_lineage("table:analytics", direction="upstream", max_depth=3)
print(f"Sources: {upstream['upstream_count']}")

# Downstream only
downstream = await lin.trace_lineage("table:raw_events", direction="downstream")
print(f"Consumers: {downstream['downstream_count']}")
```
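Conceptually, tracing is a graph traversal bounded by `max_depth`. The following self-contained sketch shows upstream tracing as a breadth-first walk over a plain edge list; it illustrates the idea only and is not the plugin's internals:

```python
from collections import deque

def trace_upstream(edges, entity_id, max_depth=5):
    """Breadth-first walk over (source, target) edges, collecting
    every entity that feeds into entity_id within max_depth hops.
    Illustrative sketch, not the plugin's implementation."""
    parents = {}
    for src, tgt in edges:
        parents.setdefault(tgt, []).append(src)

    seen = set()
    queue = deque([(entity_id, 0)])
    while queue:
        node, depth = queue.popleft()
        if depth >= max_depth:
            continue  # stop expanding past the depth limit
        for src in parents.get(node, []):
            if src not in seen:
                seen.add(src)
                queue.append((src, depth + 1))
    return seen

edges = [("table:raw_orders", "table:clean_orders"),
         ("table:clean_orders", "table:order_metrics")]
print(sorted(trace_upstream(edges, "table:order_metrics")))
# ['table:clean_orders', 'table:raw_orders']
```

Downstream tracing is the same walk with the edge direction reversed, which is why limiting `max_depth` matters on large graphs: each extra hop can fan out to many more entities.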

# Impact Analysis

Analyze the impact of changes to data entities:

```python
from daita.plugins import lineage

lin = lineage()

# Analyze impact of schema change
impact = await lin.analyze_impact(
    entity_id="table:customers",
    change_type="schema_change"
)

print(f"Risk level: {impact['risk_level']}")
print(f"Directly affected: {impact['directly_affected_count']}")
print(f"Total affected: {impact['total_affected_count']}")
print(f"Recommendation: {impact['recommendation']}")
```

Change Types:

  • schema_change - Database schema modifications
  • deprecation - Entity deprecation
  • deletion - Entity removal
  • data_quality - Data quality issues

# Pipeline Registration

Register multi-step pipelines:

```python
from daita.plugins import lineage

lin = lineage()

# Define pipeline
await lin.register_pipeline(
    name="customer_analytics",
    steps=[
        {
            "source_id": "table:raw_customers",
            "target_id": "table:clean_customers",
            "transformation": "Clean and validate"
        },
        {
            "source_id": "table:clean_customers",
            "target_id": "table:customer_metrics",
            "transformation": "Calculate metrics"
        },
        {
            "source_id": "table:customer_metrics",
            "target_id": "table:customer_segments",
            "transformation": "Segment customers"
        }
    ],
    schedule="0 2 * * *"  # Daily at 2 AM
)
```

# SQL Lineage Parsing

Automatically extract lineage from SQL queries:

```python
from daita.plugins import lineage

lin = lineage()

# Parse SQL to extract dependencies
parsed = lin.parse_sql_lineage(
    sql="INSERT INTO analytics SELECT * FROM raw_data WHERE created_at > '2024-01-01'",
    context_table="analytics"
)

print(f"Source tables: {parsed['source_tables']}")
print(f"Target tables: {parsed['target_tables']}")
print(f"Is read-only: {parsed['is_read_only']}")

# Automatically capture lineage from query execution
await lin.capture_sql_lineage(
    sql="INSERT INTO orders_summary SELECT customer_id, COUNT(*) FROM orders GROUP BY customer_id",
    transformation="Aggregate order counts by customer"
)
```
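At its simplest, this kind of parsing means pulling table names out of the statement. A rough, self-contained sketch using regular expressions follows; it is illustrative only (a regex cannot handle full SQL, and the plugin's parser is not shown here):

```python
import re

def parse_sql_tables(sql: str):
    """Naive extraction of source/target tables from a single
    INSERT ... SELECT statement. Illustrative only -- real SQL
    (CTEs, subqueries, quoted identifiers) needs a proper parser."""
    targets = re.findall(r"INSERT\s+INTO\s+(\w+)", sql, re.IGNORECASE)
    sources = re.findall(r"(?:FROM|JOIN)\s+(\w+)", sql, re.IGNORECASE)
    return {
        "target_tables": targets,
        "source_tables": [s for s in sources if s not in targets],
        "is_read_only": not targets,  # no write target found
    }

parsed = parse_sql_tables(
    "INSERT INTO analytics SELECT * FROM raw_data WHERE created_at > '2024-01-01'"
)
print(parsed["source_tables"])  # ['raw_data']
print(parsed["target_tables"])  # ['analytics']
print(parsed["is_read_only"])   # False
```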

# Export Lineage

Export lineage as visual diagrams:

```python
from daita.plugins import lineage

lin = lineage()

# Export as Mermaid diagram
mermaid = await lin.export_lineage(
    entity_id="table:orders",
    format="mermaid",
    direction="both"
)
print(mermaid['diagram'])

# Export as DOT format
dot = await lin.export_lineage(
    entity_id="table:orders",
    format="dot",
    direction="downstream"
)
print(dot['diagram'])
```
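Generating a Mermaid diagram from lineage edges is mostly string templating. A minimal sketch under that assumption (not the plugin's exporter; the prefix-stripping is an illustrative choice):

```python
def edges_to_mermaid(edges):
    """Render (source, target) pairs as a Mermaid flowchart.
    Strips the 'table:' prefix from node IDs for readability.
    Illustrative sketch only."""
    lines = ["graph LR"]
    for src, tgt in edges:
        lines.append(f"    {src.split(':')[-1]} --> {tgt.split(':')[-1]}")
    return "\n".join(lines)

diagram = edges_to_mermaid([
    ("table:raw_orders", "table:orders"),
    ("table:orders", "table:orders_summary"),
])
print(diagram)
# graph LR
#     raw_orders --> orders
#     orders --> orders_summary
```

The resulting string can be pasted into any Mermaid renderer; DOT export is the same idea with `digraph { a -> b; }` syntax.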

# Decorator Pattern

Track lineage automatically using function decorators:

```python
from daita.plugins import lineage

lin = lineage()

@lin.track(source="table:raw_data", target="table:processed_data")
async def transform_data(input_df):
    processed_df = input_df.dropna()  # your transformation logic here
    return processed_df

# Lineage automatically recorded when function executes
result = await transform_data(df)
```
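If you are curious how such a decorator can work, here is a self-contained sketch that records an edge each time the wrapped coroutine completes. The `recorded_flows` list stands in for the plugin's storage; everything here is illustrative:

```python
import asyncio
import functools

recorded_flows = []  # stands in for the plugin's lineage store

def track(source: str, target: str):
    """Decorator sketch: record a lineage edge on each successful call."""
    def decorator(fn):
        @functools.wraps(fn)
        async def wrapper(*args, **kwargs):
            result = await fn(*args, **kwargs)
            recorded_flows.append((source, target))  # record after success
            return result
        return wrapper
    return decorator

@track(source="table:raw_data", target="table:processed_data")
async def transform_data(rows):
    return [r for r in rows if r is not None]

result = asyncio.run(transform_data([1, None, 2]))
print(result)          # [1, 2]
print(recorded_flows)  # [('table:raw_data', 'table:processed_data')]
```

Recording only after the awaited call returns means failed runs leave no edge, which keeps the graph limited to flows that actually produced data.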

# Using with Agents

The Lineage plugin exposes tracking operations as tools that agents can use autonomously:

```python
from daita import Agent
from daita.plugins import lineage

# Create lineage plugin
lin = lineage()

# Agent with lineage tools
agent = Agent(
    name="Lineage Tracker",
    prompt="You are a data lineage expert. Help users understand data flows and dependencies.",
    llm_provider="openai",
    model="gpt-4",
    tools=[lin]
)

await agent.start()

# Agent autonomously traces lineage and analyzes impact
result = await agent.run("""
Trace the lineage of the customers table.
Then analyze the impact if we change the schema.
What downstream systems would be affected?
""")

print(result)
await agent.stop()
```

# Available Tools

The Lineage plugin exposes these tools to agents:

| Tool | Description | Parameters |
| --- | --- | --- |
| `trace_lineage` | Trace full lineage | `entity_id`, `direction`, `max_depth` |
| `trace_upstream` | Get upstream sources | `entity_id`, `max_depth` |
| `trace_downstream` | Get downstream consumers | `entity_id`, `max_depth` |
| `register_flow` | Register data flow | `source_id`, `target_id`, `flow_type` |
| `register_pipeline` | Register multi-step pipeline | `name`, `steps`, `schedule` |
| `analyze_impact` | Analyze change impact | `entity_id`, `change_type` |
| `export_lineage` | Export lineage diagram | `entity_id`, `format`, `direction` |
| `prune_stale_lineage` | Remove old lineage edges | `max_age_hours` |

# Best Practices

Flow Registration:

  • Use consistent entity ID naming (e.g., table:name, api:endpoint)
  • Register flows as they're created, not retrospectively
  • Include descriptive transformations for clarity
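A tiny helper can enforce the naming convention above. This is an illustrative utility, not part of the plugin, and the set of allowed kinds is an assumption:

```python
def entity_id(kind: str, name: str) -> str:
    """Build a consistent entity ID like 'table:orders' or 'api:checkout'.
    Illustrative helper; the plugin only expects consistent strings.
    The allowed kinds below are an example convention, not a plugin rule."""
    allowed = {"table", "api", "file", "topic"}
    if kind not in allowed:
        raise ValueError(f"unknown entity kind: {kind!r}")
    return f"{kind}:{name.strip().lower()}"

print(entity_id("table", "Raw_Orders"))  # table:raw_orders
```

Routing every `source_id` and `target_id` through one helper like this keeps IDs consistent, so traces do not silently miss edges due to casing or prefix mismatches.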

Impact Analysis:

  • Run impact analysis before schema changes
  • Track high-risk changes with additional monitoring
  • Use recommendations to plan migrations

Performance:

  • Limit max_depth for large lineage graphs
  • Use specific directions (upstream/downstream) when possible
  • Consider storage backends for production use

# Next Steps