creating-openlineage-extractors

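Custom OpenLineage extractors for Airflow operators. For cases where the user needs lineage from unsupported or third-party operators, wants column-level lineage, or needs extraction logic more complex than inlets/outlets allow.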

SKILL.md
--- frontmatter
name: creating-openlineage-extractors
description: Create custom OpenLineage extractors for Airflow operators. Use when the user needs lineage from unsupported or third-party operators, wants column-level lineage, or needs complex extraction logic beyond what inlets/outlets provide.

Creating OpenLineage Extractors

This skill guides you through creating custom OpenLineage extractors to capture lineage from Airflow operators that do not have built-in support.

When to Use Each Approach

| Scenario | Approach |
| --- | --- |
| Operator you own or maintain | OpenLineage Methods (recommended) |
| Third-party operator you cannot modify | Custom Extractor |
| Need column-level lineage | OpenLineage Methods or Custom Extractor |
| Complex extraction logic | OpenLineage Methods or Custom Extractor |
| Simple table-level lineage | Inlets/Outlets (simplest, lowest priority) |

Approach 1: OpenLineage Methods (recommended)

python
from airflow.models import BaseOperator

class MyCustomOperator(BaseOperator):
    def __init__(self, source_table: str, target_table: str, **kwargs):
        super().__init__(**kwargs)
        self.source_table = source_table
        self.target_table = target_table
        self._rows_processed = 0

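    def execute(self, context):
        # _process_data() stands in for the operator's real work (not shown here);
        # it is expected to return the number of rows written.
        self._rows_processed = self._process_data()
        return self._rows_processed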

    def get_openlineage_facets_on_start(self):
        # Called when the task starts, before execute(): only static attributes are known.
        from openlineage.client.event_v2 import Dataset
        from airflow.providers.openlineage.extractors import OperatorLineage

        return OperatorLineage(
            inputs=[Dataset(namespace="postgres://db", name=self.source_table)],
            outputs=[Dataset(namespace="postgres://db", name=self.target_table)],
        )

    def get_openlineage_facets_on_complete(self, task_instance):
        # Called after execute() succeeds, so runtime values such as the row count are available.
        from openlineage.client.event_v2 import Dataset
        from openlineage.client.facet_v2 import output_statistics_output_dataset
        from airflow.providers.openlineage.extractors import OperatorLineage

        return OperatorLineage(
            inputs=[Dataset(namespace="postgres://db", name=self.source_table)],
            outputs=[
                Dataset(
                    namespace="postgres://db",
                    name=self.target_table,
                    facets={
                        "outputStatistics": output_statistics_output_dataset.OutputStatisticsOutputDatasetFacet(
                            rowCount=self._rows_processed
                        )
                    },
                )
            ],
        )
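
Column-level lineage can be attached to an output dataset in the same way as the outputStatistics facet in the example above. The sketch below only illustrates the shape of that information with plain dictionaries: each output column maps to the input fields it is derived from. It assumes the layout of the "fields" entry in OpenLineage's columnLineage facet; the helper name column_lineage_fields and the table and column names are hypothetical, and a real operator would build the equivalent facet objects from the OpenLineage client rather than dicts.

```python
def column_lineage_fields(namespace, source_table, mapping):
    """Map each output column to the input columns it is derived from.

    mapping: {output_column: [input_column, ...]}
    Returns a dict shaped like the "fields" entry of a columnLineage facet
    (assumed layout; plain dicts, not OpenLineage client objects).
    """
    return {
        out_col: {
            "inputFields": [
                {"namespace": namespace, "name": source_table, "field": in_col}
                for in_col in in_cols
            ]
        }
        for out_col, in_cols in mapping.items()
    }


# Hypothetical example: "total" is computed from two source columns.
fields = column_lineage_fields(
    "postgres://db",
    "orders",
    {"total": ["price", "quantity"], "order_id": ["id"]},
)
```

Keeping the mapping as data makes it easy to review which source columns feed each output column before wiring it into get_openlineage_facets_on_complete.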

Approach 2: Custom Extractors

python
from airflow.providers.openlineage.extractors.base import BaseExtractor, OperatorLineage
from openlineage.client.event_v2 import Dataset

class MyOperatorExtractor(BaseExtractor):
    @classmethod
    def get_operator_classnames(cls) -> list[str]:
        # Class names (not import paths) of the operators this extractor handles.
        return ["MyCustomOperator"]

    def _execute_extraction(self) -> OperatorLineage | None:
        # Runs when the task starts; self.operator is the operator instance being extracted.
        source_table = self.operator.source_table
        target_table = self.operator.target_table

        return OperatorLineage(
            inputs=[Dataset(namespace="postgres://mydb:5432", name=f"public.{source_table}")],
            outputs=[Dataset(namespace="postgres://mydb:5432", name=f"public.{target_table}")],
        )

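    def extract_on_complete(self, task_instance) -> OperatorLineage | None:
        # Called after the operator finishes; use it for lineage only known at runtime.
        # Returning None reports no lineage for the completion event; delete this
        # override to inherit the base-class behaviour instead.
        return None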
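
Both approaches hard-code the dataset namespace and name, and the two examples above do not format them the same way. A small helper can keep them consistent. This is a minimal sketch that assumes the pattern used in the extractor above (namespace postgres://host:port, schema-qualified table name); postgres_dataset_id and DatasetId are hypothetical names, and the default port 5432 and default schema public are assumptions.

```python
from typing import NamedTuple


class DatasetId(NamedTuple):
    """Namespace/name pair identifying a dataset (hypothetical helper type)."""
    namespace: str
    name: str


def postgres_dataset_id(
    host: str, table: str, schema: str = "public", port: int = 5432
) -> DatasetId:
    """Build a namespace and schema-qualified name for a Postgres table."""
    if not host or not table:
        raise ValueError("host and table are required")
    return DatasetId(
        namespace=f"postgres://{host}:{port}",
        name=f"{schema}.{table}",
    )


# Same values as the hard-coded strings in the extractor above.
source = postgres_dataset_id("mydb", "orders")
```

Inside _execute_extraction, the two fields can then be passed on as Dataset(namespace=source.namespace, name=source.name).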

Registering Extractors

In the configuration file (airflow.cfg), list the extractor classes as full import paths separated by semicolons:

ini
[openlineage]
extractors = mypackage.extractors.MyOperatorExtractor;mypackage.extractors.AnotherExtractor

Environment variable:

bash
AIRFLOW__OPENLINEAGE__EXTRACTORS='mypackage.extractors.MyOperatorExtractor;mypackage.extractors.AnotherExtractor'
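
A typo in one of these import paths is easy to miss, so it can help to check the value before deploying. The sketch below splits a semicolon-separated extractors string and reports the paths that cannot be imported. The function names split_extractor_paths and find_unimportable are hypothetical, this is not how Airflow itself loads extractors, and standard-library paths stand in for mypackage.extractors so the example runs anywhere.

```python
import importlib


def split_extractor_paths(value: str) -> list[str]:
    """Split a semicolon-separated extractors setting into dotted paths."""
    return [part.strip() for part in value.split(";") if part.strip()]


def find_unimportable(value: str) -> list[str]:
    """Return the configured paths whose module or class cannot be found."""
    missing = []
    for path in split_extractor_paths(value):
        module_name, _, class_name = path.rpartition(".")
        try:
            module = importlib.import_module(module_name)
        except (ImportError, ValueError):
            # Module not found, or the path had no module part at all.
            missing.append(path)
            continue
        if not hasattr(module, class_name):
            missing.append(path)
    return missing


# Standard-library stand-ins for real extractor paths.
setting = "collections.OrderedDict; collections.NoSuchClass;no_such_pkg.extractors.Foo"
missing = find_unimportable(setting)
```

Run in the same Python environment as the Airflow workers, a non-empty result points at entries that Airflow would also be unable to load.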