Upstream Lineage: Sources
Trace the origins of data - answer "Where does this data come from?"
Lineage Investigation
Step 1: Identify the Target Type
Determine what we're tracing:
- •Table: Trace what populates this table
- •Column: Trace where this specific column comes from
- •DAG: Trace what data sources this DAG reads from
Step 2: Find the Producing DAG
Tables are typically populated by Airflow DAGs. Find the connection:
- •Search DAGs by name: Use list_dags and look for DAG names matching the table name
    - •load_customers -> customers table
    - •etl_daily_orders -> orders table
- •Explore DAG source code: Use get_dag_source to read the DAG definition
    - •Look for INSERT, MERGE, CREATE TABLE statements
    - •Find the target table in the code
- •Check DAG tasks: Use list_tasks to see what operations the DAG performs
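The name-matching heuristic can be sketched in a few lines. This is a minimal illustration, assuming the DAG ids returned by list_dags have already been collected into a plain list of strings; the function name candidate_dags and the sample ids are hypothetical.

```python
import re

def candidate_dags(table_name, dag_ids):
    """Rank DAG ids by how many name tokens they share with the table name."""
    # Split "analytics.orders_daily" into {"analytics", "orders", "daily"}.
    table_tokens = set(re.split(r"[._\-]+", table_name.lower())) - {""}
    ranked = []
    for dag_id in dag_ids:
        dag_tokens = set(re.split(r"[._\-]+", dag_id.lower())) - {""}
        overlap = table_tokens & dag_tokens
        if overlap:
            ranked.append((len(overlap), dag_id))
    # Highest overlap first; ties broken alphabetically for stable output.
    ranked.sort(key=lambda pair: (-pair[0], pair[1]))
    return [dag_id for _, dag_id in ranked]

# Hypothetical DAG ids, as if copied from a list_dags result.
dag_ids = ["load_customers", "etl_daily_orders", "ingest_orders", "send_reports"]
print(candidate_dags("analytics.orders_daily", dag_ids))
```

A name match is only a lead. Confirm it by reading the DAG source for a statement that actually writes to the table.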
Step 3: Trace Data Sources
From the DAG code, identify source tables and systems:
SQL Sources (look for FROM clauses):
# In DAG code:
SELECT * FROM source_schema.source_table  # <- This is an upstream source
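When a DAG embeds a lot of SQL, a rough regex pass can list candidate source and target tables before you read the code in detail. The sketch below is an assumption-laden helper (extract_tables is a hypothetical name, not part of any tool mentioned here), and it is not a SQL parser: it will miss subqueries in parentheses and can report false positives such as CTE names or EXTRACT(day FROM ts).

```python
import re

# Table name that follows FROM or JOIN (optionally schema-qualified).
SOURCE_RE = re.compile(r"\b(?:FROM|JOIN)\s+([A-Za-z_][\w.]*)", re.IGNORECASE)
# Table written by INSERT INTO, MERGE INTO, or CREATE TABLE.
TARGET_RE = re.compile(
    r"\b(?:INSERT\s+INTO|MERGE\s+INTO|"
    r"CREATE\s+(?:OR\s+REPLACE\s+)?TABLE(?:\s+IF\s+NOT\s+EXISTS)?)"
    r"\s+([A-Za-z_][\w.]*)",
    re.IGNORECASE,
)

def extract_tables(sql):
    """Return candidate target and source tables found in a SQL string."""
    targets = sorted(set(TARGET_RE.findall(sql)))
    # A table that is written by the statement is not counted as a source.
    sources = sorted(set(SOURCE_RE.findall(sql)) - set(targets))
    return {"targets": targets, "sources": sources}

# Hypothetical SQL, as it might appear inside a DAG definition.
sql = """
INSERT INTO analytics.orders_daily
SELECT o.order_date, c.region, SUM(o.amount) AS total_amount
FROM raw.orders AS o
JOIN dim.customers AS c ON c.id = o.customer_id
GROUP BY o.order_date, c.region
"""
print(extract_tables(sql))
```

Treat the result as a checklist to verify against the DAG code rather than as a definitive answer.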
External Sources (look for connection references):
- •S3Operator -> S3 bucket source
- •PostgresOperator -> Postgres database source
- •SalesforceOperator -> Salesforce API source
- •HttpOperator -> REST API source
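The operator-to-source mapping above can be applied mechanically to DAG source text. The following sketch assumes the source has been retrieved as a string (for example via get_dag_source) and uses the operator names from the list as name fragments; actual operator class names and connection parameters vary by provider package, so the mapping and the helper name external_sources are illustrative assumptions.

```python
import re

# Operator-name fragments taken from the list above; extend for your providers.
OPERATOR_SOURCES = {
    "S3": "S3 bucket",
    "Postgres": "Postgres database",
    "Salesforce": "Salesforce API",
    "Http": "REST API",
}

# Class name of any call that looks like SomethingOperator(...).
OPERATOR_CALL_RE = re.compile(r"\b(\w+Operator)\s*\(")
# String value of any keyword argument ending in conn_id.
CONN_ID_RE = re.compile(r"\b\w*conn_id\s*=\s*[\"']([^\"']+)[\"']")

def external_sources(dag_source):
    """Return (source types, connection ids) referenced in DAG source text."""
    found = set()
    for operator in OPERATOR_CALL_RE.findall(dag_source):
        for fragment, source_type in OPERATOR_SOURCES.items():
            if fragment.lower() in operator.lower():
                found.add(source_type)
    conn_ids = sorted(set(CONN_ID_RE.findall(dag_source)))
    return sorted(found), conn_ids

# Hypothetical DAG snippet used only to exercise the helper.
dag_source = '''
extract = PostgresOperator(task_id="extract", postgres_conn_id="crm_db", sql="SELECT 1")
fetch = HttpOperator(task_id="fetch", http_conn_id="partner_api", endpoint="/v1/orders")
'''
print(external_sources(dag_source))
```

The connection ids are worth recording because they identify the specific external system behind each operator.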
File Sources:
- •CSV/Parquet files in object storage
- •SFTP drops
- •Local file paths
Step 4: Build the Lineage Chain
Recursively trace each source:
TARGET: analytics.orders_daily
    ^
    +-- DAG: etl_daily_orders
            ^
            +-- SOURCE: raw.orders (table)
            |       ^
            |       +-- DAG: ingest_orders
            |               ^
            |               +-- SOURCE: Salesforce API (external)
            |
            +-- SOURCE: dim.customers (table)
                    ^
                    +-- DAG: load_customers
                            ^
                            +-- SOURCE: PostgreSQL (external DB)
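The recursion can be sketched as a small depth-first walk. This example assumes the findings from Steps 2 and 3 have been recorded by hand in a dictionary (PRODUCERS and trace_upstream are hypothetical names); any node without a known producing DAG is treated as an external system, and a shared seen set keeps shared or cyclic dependencies from being traced twice.

```python
# Hypothetical findings from Steps 2 and 3, keyed by table name.
# Each entry records the producing DAG and the sources that DAG reads.
PRODUCERS = {
    "analytics.orders_daily": ("etl_daily_orders", ["raw.orders", "dim.customers"]),
    "raw.orders": ("ingest_orders", ["Salesforce API"]),
    "dim.customers": ("load_customers", ["PostgreSQL"]),
}

def trace_upstream(node, depth=0, seen=None):
    """Return indented lines describing everything upstream of node."""
    seen = set() if seen is None else seen
    indent = "  " * depth
    if node in seen:
        # Shared or cyclic dependency: report it once and stop.
        return [f"{indent}{node} (already traced)"]
    seen.add(node)
    if node not in PRODUCERS:
        # No known producing DAG: treat as an external system and stop.
        return [f"{indent}{node} (external)"]
    dag_id, sources = PRODUCERS[node]
    lines = [f"{indent}{node} <- DAG: {dag_id}"]
    for source in sources:
        lines.extend(trace_upstream(source, depth + 1, seen))
    return lines

print("\n".join(trace_upstream("analytics.orders_daily")))
```

Each level of indentation in the output corresponds to one hop further upstream, matching the chain drawn above.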
Step 5: Check Source Health
For each upstream source:
- •Tables: Check freshness with the checking-freshness skill
- •DAGs: Check recent run status with get_dag_stats
- •External systems: Note connection info from DAG code
Lineage for Columns
When tracing a specific column:
- •Find the column in the target table schema
- •Search DAG source code for references to that column name
- •Trace through transformations:
    - •Direct mappings: source.col AS target_col
    - •Transformations: COALESCE(a.col, b.col) AS target_col
    - •Aggregations: SUM(detail.amount) AS total_amount
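To locate the expression behind a column, you can search the DAG's SQL for the alias and classify what you find. The sketch below is a rough heuristic under stated assumptions: column_expression and classify are hypothetical helpers, the column must be introduced with an explicit AS alias, and nested parentheses are not handled.

```python
import re

def column_expression(sql, target_col):
    """Return the expression aliased to target_col, or None if not found."""
    # Capture text between a SELECT/comma boundary and "AS target_col",
    # allowing commas only inside one level of parentheses.
    pattern = re.compile(
        r"(?:SELECT|,)\s*((?:[^,()]|\([^()]*\))+?)\s+AS\s+"
        + re.escape(target_col) + r"\b",
        re.IGNORECASE,
    )
    match = pattern.search(sql)
    return match.group(1).strip() if match else None

def classify(expression):
    """Label an expression using the three categories listed above."""
    if re.search(r"\b(SUM|COUNT|AVG|MIN|MAX)\s*\(", expression, re.IGNORECASE):
        return "aggregation"
    if re.fullmatch(r"[\w.]+", expression):
        return "direct mapping"
    return "transformation"

# Hypothetical SELECT list combining the three patterns.
sql = """
SELECT
    source.col AS target_col,
    COALESCE(a.col, b.col) AS merged_col,
    SUM(detail.amount) AS total_amount
FROM detail
"""
for col in ("target_col", "merged_col", "total_amount"):
    expr = column_expression(sql, col)
    print(col, "<-", expr, f"({classify(expr)})")
```

Once the expression is known, repeat the search for each column it references in the upstream table.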
Output: Lineage Report
Summary
One-line answer: "This table is populated by DAG X from sources Y and Z"
Lineage Diagram
[Salesforce] --> [raw.opportunities] --> [stg.opportunities] --> [fct.sales]
                        |                        |
                 DAG: ingest_sfdc         DAG: transform_sales
Source Details
| Source | Type | Connection | Freshness | Owner |
|---|---|---|---|---|
| raw.orders | Table | Internal | 2h ago | data-team |
| Salesforce | API | salesforce_conn | Real-time | sales-ops |
Transformation Chain
Describe how data flows and transforms:
- •Raw data lands in raw.orders via Salesforce API sync
- •DAG transform_orders cleans and dedupes into stg.orders
- •DAG build_order_facts joins with dimensions into fct.orders
Data Quality Implications
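- •Single points of failure: does everything downstream depend on one DAG or one external system?
- •Stale upstream sources: is any source refreshed less often than the target expects?
- •Complex transformation chains: are there long multi-DAG paths where one failure breaks the rest?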
Related Skills
- •Check source freshness: checking-freshness skill
- •Debug source DAG: debugging-dags skill
- •Trace downstream impacts: tracing-downstream-lineage skill
- •Add manual lineage annotations: annotating-task-lineage skill
- •Build custom lineage extractors: creating-openlineage-extractors skill