# Lineage Plugin
Data lineage and flow tracking for understanding data dependencies across systems.
## Quick Start

```python
from daita import Agent
from daita.plugins import lineage

# Create lineage plugin
lin = lineage()

# Agent uses lineage tools autonomously
agent = Agent(
    name="Lineage Analyst",
    prompt="You are a data lineage expert. Help users understand data flows and dependencies.",
    tools=[lin]
)

await agent.start()
result = await agent.run("Trace the lineage of the orders table")
```

## Direct Usage
The plugin can be used directly for programmatic lineage tracking:
```python
from daita.plugins import lineage

lin = lineage()

# Register a data flow
await lin.register_flow(
    source_id="table:raw_orders",
    target_id="table:processed_orders",
    flow_type="TRANSFORMS",
    transformation="Clean and deduplicate orders"
)

# Trace lineage
lineage_data = await lin.trace_lineage("table:processed_orders", direction="both")
print(f"Upstream sources: {lineage_data['upstream_count']}")
print(f"Downstream consumers: {lineage_data['downstream_count']}")
```

## Configuration Parameters
```python
lineage(
    storage: Optional[Any] = None,
    organization_id: Optional[int] = None,
    backend: Optional[Any] = None,
    risk_thresholds: Optional[Dict[str, int]] = None
)
```

### Parameters

- `storage` (Any): Optional legacy storage backend for persistent lineage
- `organization_id` (int): Optional organization ID for multi-tenant storage
- `backend` (Any): Optional graph backend override. If `None`, the backend is selected automatically based on the runtime environment (local or cloud)
- `risk_thresholds` (dict): Custom risk scoring thresholds for `analyze_impact`. Keys are `"HIGH"` and `"MEDIUM"`; values are integer downstream counts. Defaults to `{"HIGH": 20, "MEDIUM": 5}`
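To illustrate how these thresholds are used, here is a minimal, hypothetical sketch of mapping a downstream consumer count to a risk level with the default values. This is not the plugin's actual implementation, and inclusive (`>=`) boundaries are an assumption:

```python
# Hypothetical illustration of risk_thresholds; the real scoring
# happens inside analyze_impact.
DEFAULT_RISK_THRESHOLDS = {"HIGH": 20, "MEDIUM": 5}

def classify_risk(downstream_count: int,
                  thresholds: dict = DEFAULT_RISK_THRESHOLDS) -> str:
    """Return 'HIGH', 'MEDIUM', or 'LOW' for a downstream consumer count."""
    if downstream_count >= thresholds["HIGH"]:
        return "HIGH"
    if downstream_count >= thresholds["MEDIUM"]:
        return "MEDIUM"
    return "LOW"
```

Under these defaults, an entity with 20 or more downstream consumers would be flagged HIGH and one with 5-19 MEDIUM; pass a custom dict via `lineage(risk_thresholds=...)` to tighten or relax the bands.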
## Lineage Operations

### Register Data Flows
Track data flows between entities:
```python
from daita.plugins import lineage

lin = lineage()

# Register ETL flow
await lin.register_flow(
    source_id="table:raw_data",
    target_id="table:clean_data",
    flow_type="FLOWS_TO",
    transformation="Data cleaning and validation",
    schedule="0 */6 * * *"  # Every 6 hours
)
```

Flow Types:

- `FLOWS_TO` - General data flow
- `SYNCS_TO` - Data synchronization
- `TRIGGERS` - Event-driven transformation
- `TRANSFORMS` - Data transformation
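Registered flows form a directed graph over entity IDs. As a rough, hypothetical sketch (independent of the plugin's storage backend), accumulating edges and traversing them upstream or downstream might look like:

```python
from collections import defaultdict

# Hypothetical in-memory edge store: each registered flow adds one edge.
downstream_edges = defaultdict(list)  # source_id -> [target_id, ...]
upstream_edges = defaultdict(list)    # target_id -> [source_id, ...]

def add_flow(source_id: str, target_id: str) -> None:
    downstream_edges[source_id].append(target_id)
    upstream_edges[target_id].append(source_id)

def walk(entity_id: str, edges, max_depth: int = 5) -> set:
    """Breadth-first traversal collecting all reachable entities."""
    seen, frontier = set(), [entity_id]
    for _ in range(max_depth):
        frontier = [n for cur in frontier for n in edges[cur] if n not in seen]
        seen.update(frontier)
        if not frontier:
            break
    return seen

add_flow("table:raw_orders", "table:clean_orders")
add_flow("table:clean_orders", "table:order_metrics")
# walking downstream from raw_orders reaches clean_orders and order_metrics
```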
### Trace Lineage
Discover upstream sources and downstream consumers:
```python
from daita.plugins import lineage

lin = lineage()

# Trace both directions
lineage_data = await lin.trace_lineage(
    entity_id="table:users",
    direction="both",
    max_depth=5
)

# Upstream only
upstream = await lin.trace_lineage("table:analytics", direction="upstream", max_depth=3)
print(f"Sources: {upstream['upstream_count']}")

# Downstream only
downstream = await lin.trace_lineage("table:raw_events", direction="downstream")
print(f"Consumers: {downstream['downstream_count']}")
```

### Impact Analysis
Analyze the impact of changes to data entities:
```python
from daita.plugins import lineage

lin = lineage()

# Analyze impact of schema change
impact = await lin.analyze_impact(
    entity_id="table:customers",
    change_type="schema_change"
)

print(f"Risk level: {impact['risk_level']}")
print(f"Directly affected: {impact['directly_affected_count']}")
print(f"Total affected: {impact['total_affected_count']}")
print(f"Recommendation: {impact['recommendation']}")
```

Change Types:

- `schema_change` - Database schema modifications
- `deprecation` - Entity deprecation
- `deletion` - Entity removal
- `data_quality` - Data quality issues
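The returned risk level can gate risky changes in CI or deployment scripts. A minimal, hypothetical helper (the dictionary keys follow the `analyze_impact` result fields shown above) might look like:

```python
def change_is_safe(impact: dict, allowed=("LOW", "MEDIUM")) -> bool:
    """Hypothetical pre-change gate: approve only low/medium-risk changes."""
    return impact["risk_level"] in allowed

# Example payload shaped like the analyze_impact result above
impact = {
    "risk_level": "HIGH",
    "directly_affected_count": 12,
    "total_affected_count": 47,
    "recommendation": "Coordinate with downstream owners",
}
if not change_is_safe(impact):
    print("Blocked: plan a coordinated migration instead")
```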
### Pipeline Registration
Register multi-step pipelines:
```python
from daita.plugins import lineage

lin = lineage()

# Define pipeline
await lin.register_pipeline(
    name="customer_analytics",
    steps=[
        {
            "source_id": "table:raw_customers",
            "target_id": "table:clean_customers",
            "transformation": "Clean and validate"
        },
        {
            "source_id": "table:clean_customers",
            "target_id": "table:customer_metrics",
            "transformation": "Calculate metrics"
        },
        {
            "source_id": "table:customer_metrics",
            "target_id": "table:customer_segments",
            "transformation": "Segment customers"
        }
    ],
    schedule="0 2 * * *"  # Daily at 2 AM
)
```

### SQL Lineage Parsing
Automatically extract lineage from SQL queries:
```python
from daita.plugins import lineage

lin = lineage()

# Parse SQL to extract dependencies
parsed = lin.parse_sql_lineage(
    sql="INSERT INTO analytics SELECT * FROM raw_data WHERE created_at > '2024-01-01'",
    context_table="analytics"
)

print(f"Source tables: {parsed['source_tables']}")
print(f"Target tables: {parsed['target_tables']}")
print(f"Is read-only: {parsed['is_read_only']}")

# Automatically capture lineage from query execution
await lin.capture_sql_lineage(
    sql="INSERT INTO orders_summary SELECT customer_id, COUNT(*) FROM orders GROUP BY customer_id",
    transformation="Aggregate order counts by customer"
)
```

### Export Lineage
Export lineage as visual diagrams:
```python
from daita.plugins import lineage

lin = lineage()

# Export as Mermaid diagram
mermaid = await lin.export_lineage(
    entity_id="table:orders",
    format="mermaid",
    direction="both"
)
print(mermaid['diagram'])

# Export as DOT format
dot = await lin.export_lineage(
    entity_id="table:orders",
    format="dot",
    direction="downstream"
)
print(dot['diagram'])
```

### Decorator Pattern
Track lineage automatically using function decorators:
```python
from daita.plugins import lineage

lin = lineage()

@lin.track(source="table:raw_data", target="table:processed_data")
async def transform_data(input_df):
    # Process data (placeholder for your transformation logic)
    processed_df = input_df
    return processed_df

# Lineage automatically recorded when function executes
result = await transform_data(df)
```

## Using with Agents
The Lineage plugin exposes tracking operations as tools that agents can use autonomously:
```python
from daita import Agent
from daita.plugins import lineage

# Create lineage plugin
lin = lineage()

# Agent with lineage tools
agent = Agent(
    name="Lineage Tracker",
    prompt="You are a data lineage expert. Help users understand data flows and dependencies.",
    llm_provider="openai",
    model="gpt-4",
    tools=[lin]
)

await agent.start()

# Agent autonomously traces lineage and analyzes impact
result = await agent.run("""
Trace the lineage of the customers table.
Then analyze the impact if we change the schema.
What downstream systems would be affected?
""")
print(result)

await agent.stop()
```

### Available Tools
The Lineage plugin exposes these tools to agents:
| Tool | Description | Parameters |
|---|---|---|
| `trace_lineage` | Trace full lineage | `entity_id`, `direction`, `max_depth` |
| `trace_upstream` | Get upstream sources | `entity_id`, `max_depth` |
| `trace_downstream` | Get downstream consumers | `entity_id`, `max_depth` |
| `register_flow` | Register data flow | `source_id`, `target_id`, `flow_type` |
| `register_pipeline` | Register multi-step pipeline | `name`, `steps`, `schedule` |
| `analyze_impact` | Analyze change impact | `entity_id`, `change_type` |
| `export_lineage` | Export lineage diagram | `entity_id`, `format`, `direction` |
| `prune_stale_lineage` | Remove old lineage edges | `max_age_hours` |
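The `prune_stale_lineage` tool takes a `max_age_hours` cutoff. As a hypothetical sketch (not the plugin's implementation), age-based pruning over timestamped edges could work like this:

```python
import time

def prune_stale(edges: list, max_age_hours: float, now: float = None) -> list:
    """Hypothetical age-based pruning: keep edges recorded within the window.
    Each edge dict carries a 'recorded_at' epoch timestamp (illustrative schema)."""
    now = time.time() if now is None else now
    cutoff = now - max_age_hours * 3600
    return [e for e in edges if e["recorded_at"] >= cutoff]

edges = [
    {"source_id": "table:a", "target_id": "table:b", "recorded_at": 1_000_000},
    {"source_id": "table:b", "target_id": "table:c", "recorded_at": 1_050_000},
]
# 10 hours = 36,000 s; with now=1_060_000 the cutoff is 1_024_000,
# so only the second edge survives
fresh = prune_stale(edges, max_age_hours=10, now=1_060_000)
```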
## Best Practices

Flow Registration:

- Use consistent entity ID naming (e.g., `table:name`, `api:endpoint`)
- Register flows as they're created, not retrospectively
- Include descriptive transformations for clarity

Impact Analysis:

- Run impact analysis before schema changes
- Track high-risk changes with additional monitoring
- Use recommendations to plan migrations

Performance:

- Limit `max_depth` for large lineage graphs
- Use specific directions (upstream/downstream) when possible
- Consider storage backends for production use
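Consistent entity IDs are easiest to enforce with a small helper that centralizes the naming scheme. This is a hypothetical convention: the `table:`/`api:` prefixes follow the examples in this document, while the other kinds and the lowercasing rule are illustrative choices:

```python
ENTITY_KINDS = {"table", "api", "file", "topic"}  # illustrative set of kinds

def make_entity_id(kind: str, name: str) -> str:
    """Build a consistent entity ID such as 'table:orders' (hypothetical helper)."""
    if kind not in ENTITY_KINDS:
        raise ValueError(f"unknown entity kind: {kind!r}")
    return f"{kind}:{name.strip().lower()}"
```

Routing every `register_flow` call through one such helper keeps IDs like `table:orders` uniform across teams, so traces and impact analyses never miss edges due to casing or prefix drift.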
## Next Steps
- Catalog Plugin - Discover database schemas
- Orchestrator Plugin - Coordinate multi-agent workflows
- Workflows - Use lineage in workflow orchestration