AgentSkillsCN

airflow-3x-migration

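A comprehensive guide and set of patterns for migrating Apache Airflow 2.x workflows to Airflow 3.x, covering import changes, deprecated features, and new paradigms such as Asset scheduling and the TaskFlow API.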

SKILL.md
---
name: airflow-3x-migration
description: Comprehensive guide and patterns for migrating Apache Airflow 2.x workflows to Airflow 3.x, covering import changes, deprecated features, and new paradigms like Asset scheduling and TaskFlow API.
version: 1.0.0
---

Airflow 3.x Skills

Import Path Changes

Operators

```python
# Airflow 2.x
from airflow.operators.python import PythonOperator

# Airflow 3.x (apache-airflow-providers-standard package)
from airflow.providers.standard.operators.python import PythonOperator
from airflow.providers.standard.operators.bash import BashOperator
from airflow.providers.standard.operators.empty import EmptyOperator
```

Sensors

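```python
# Airflow 2.x
from airflow.sensors.filesystem import FileSensor
from airflow.sensors.time_sensor import TimeSensor

# Airflow 3.x
from airflow.providers.standard.sensors.filesystem import FileSensor
from airflow.providers.standard.sensors.time import TimeSensor
```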
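Because these moves are one-to-one module renames, a first migration pass can be scripted. A minimal sketch, assuming a hand-maintained mapping: `IMPORT_MAP` and `rewrite_imports` are hypothetical names, and the 2.x sensor modules (`airflow.sensors.filesystem`, `airflow.sensors.time_sensor`) are the ones Airflow 2.x shipped.

```python
import re

# Hypothetical mapping from Airflow 2.x modules to their Airflow 3.x homes in
# the standard provider; extend it with the operators your DAGs actually use.
IMPORT_MAP = {
    "airflow.operators.python": "airflow.providers.standard.operators.python",
    "airflow.operators.bash": "airflow.providers.standard.operators.bash",
    "airflow.operators.empty": "airflow.providers.standard.operators.empty",
    "airflow.sensors.filesystem": "airflow.providers.standard.sensors.filesystem",
    "airflow.sensors.time_sensor": "airflow.providers.standard.sensors.time",
}

# Only "from <module> import ..." lines are matched; other import forms are left alone.
_FROM_IMPORT = re.compile(r"^([ \t]*from[ \t]+)([\w.]+)([ \t]+import\b)", re.MULTILINE)


def rewrite_imports(source: str) -> str:
    """Return source with known 2.x module paths swapped for 3.x paths."""

    def swap(match: re.Match) -> str:
        # Unknown modules map to themselves, so they pass through unchanged
        module = IMPORT_MAP.get(match.group(2), match.group(2))
        return f"{match.group(1)}{module}{match.group(3)}"

    return _FROM_IMPORT.sub(swap, source)
```

For example, `rewrite_imports("from airflow.operators.bash import BashOperator")` yields `from airflow.providers.standard.operators.bash import BashOperator`. Review the diff by hand afterwards; `import airflow.operators.python` style imports are not touched.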

Removed Features

| Removed | Replacement |
|---|---|
| `SubDagOperator` | `TaskGroup` |
| `packaged_dag_processor` | Use standard DAG loading |
| `airflow.contrib.*` | Provider packages |
| `schedule_interval` param | `schedule` param |
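The removed features listed above that show up directly in DAG source can be flagged with a quick scan before upgrading. A minimal regex-based sketch: `REMOVED_FEATURES` and `find_removed_features` are hypothetical names, and a regex is only a heuristic that will also match text inside strings or comments.

```python
import re

# (pattern, removed feature, suggested replacement) for rows that appear in DAG code.
REMOVED_FEATURES = [
    (re.compile(r"\bSubDagOperator\b"), "SubDagOperator", "TaskGroup"),
    (re.compile(r"\bairflow\.contrib\b"), "airflow.contrib.*", "provider packages"),
    (re.compile(r"\bschedule_interval\s*="), "schedule_interval", "schedule"),
]


def find_removed_features(source: str) -> list[tuple[int, str, str]]:
    """Return (line_number, removed_feature, replacement) for every hit, in line order."""
    hits = []
    for lineno, line in enumerate(source.splitlines(), start=1):
        for pattern, feature, replacement in REMOVED_FEATURES:
            if pattern.search(line):
                hits.append((lineno, feature, replacement))
    return hits
```

Run it over each file in the DAG folder and fix the reported lines before switching the environment to 3.x.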

DAG Definition Changes

```python
# Airflow 3.x preferred
from datetime import datetime

from airflow.sdk import DAG

with DAG(
    dag_id="my_dag",
    schedule="@daily",  # Not schedule_interval (removed in 3.x)
    start_date=datetime(2024, 1, 1),
    catchup=False,
    tags=["betting", "sports"],
) as dag:
    ...  # define tasks here
```
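Existing 2.x DAG files usually need only the keyword renamed, since `schedule` accepts the same cron strings, presets such as `"@daily"`, and `timedelta` values. A minimal regex sketch: `rename_schedule_interval` is a hypothetical helper; it skips `==` comparisons but would still rewrite matching text inside strings or comments.

```python
import re

# Match the keyword followed by "=", but not "==" (a comparison, not a kwarg).
_SCHEDULE_INTERVAL = re.compile(r"\bschedule_interval(\s*=)(?!=)")


def rename_schedule_interval(source: str) -> str:
    """Rewrite Airflow 2.x `schedule_interval=` keywords to `schedule=`."""
    # Keep whatever whitespace preceded "=" by re-emitting group 1
    return _SCHEDULE_INTERVAL.sub(r"schedule\1", source)
```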

TaskFlow API (Preferred)

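```python
from datetime import datetime

from airflow.sdk import dag, task  # Airflow 3.x Task SDK

@dag(schedule="@daily", start_date=datetime(2024, 1, 1), catchup=False)
def betting_workflow():

    @task
    def download_games(sport: str) -> list:
        # fetch_games is your own helper (not shown).
        # The return value is pushed to XCom automatically.
        return fetch_games(sport)

    @task
    def update_elo(games: list) -> dict:
        # calculate_elo is your own helper (not shown)
        return calculate_elo(games)

    # Passing one task's output into another sets the dependency
    # (download_games >> update_elo) and pulls the value from XCom at run time
    games = download_games("nba")
    ratings = update_elo(games)

betting_dag = betting_workflow()
```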
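What TaskFlow relies on is deferred execution: calling a decorated function inside the DAG does not run it, it returns a placeholder (an `XComArg` in Airflow), and passing that placeholder to another task both creates the dependency and tells Airflow to fetch the value at run time. The toy model below reproduces only that idea in plain Python; `TaskRef`, `task`, and `run` are illustrative names, not Airflow's implementation, and the game data is made up.

```python
from typing import Any, Callable, Dict, Optional


class TaskRef:
    """Placeholder for a task's future return value (plays the XComArg role)."""

    def __init__(self, func: Callable[..., Any], args: tuple) -> None:
        self.func = func
        self.args = args


def task(func: Callable[..., Any]) -> Callable[..., TaskRef]:
    """Toy decorator: calling the task records a node instead of running it."""

    def wrapper(*args: Any) -> TaskRef:
        return TaskRef(func, args)

    return wrapper


def run(ref: TaskRef, results: Optional[Dict[int, Any]] = None) -> Any:
    """Run upstream tasks first, then feed their return values downstream."""
    results = {} if results is None else results
    if id(ref) not in results:
        # Any TaskRef argument is an upstream dependency: resolve it first
        args = [run(a, results) if isinstance(a, TaskRef) else a for a in ref.args]
        results[id(ref)] = ref.func(*args)  # store the "pushed" return value
    return results[id(ref)]


@task
def download_games(sport: str) -> list:
    return [{"sport": sport, "home": "BOS", "away": "NYK"}]


@task
def count_games(games: list) -> int:
    return len(games)


games = download_games("nba")  # nothing has run yet
total = count_games(games)     # dependency: download_games -> count_games
```

`run(total)` executes `download_games` first and returns `1`, mirroring how the scheduler runs the upstream task before handing its XCom value downstream.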

Asset-Based Scheduling (Replaces Dataset)

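```python
from datetime import datetime

from airflow.sdk import Asset, dag, task

# Define assets
games_data = Asset("games_data")
elo_ratings = Asset("elo_ratings")

# Producer DAG: a successful run of download() records an update to games_data
@dag(schedule="@daily", start_date=datetime(2024, 1, 1), catchup=False)
def download_dag():
    @task(outlets=[games_data])
    def download():
        ...

    download()

# Consumer DAG - triggers when the asset updates (no time-based schedule)
@dag(schedule=[games_data], start_date=datetime(2024, 1, 1))
def process_dag():
    @task
    def process():
        ...

    process()

# The decorated functions must be called to register the DAGs
download_dag()
process_dag()
```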
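With a list of assets as the schedule, a consumer DAG is queued once every asset in the list has received at least one update since its last run. The sketch below models just that bookkeeping in plain Python; `AssetTracker`, its methods, and the second consumer `report_dag` are hypothetical, not part of `airflow.sdk`.

```python
from collections import defaultdict


class AssetTracker:
    """Toy model of asset-triggered scheduling (AND over the listed assets)."""

    def __init__(self) -> None:
        self.schedules: dict[str, set[str]] = {}              # dag_id -> assets it waits on
        self.seen: dict[str, set[str]] = defaultdict(set)     # dag_id -> assets updated since last run

    def register(self, dag_id: str, assets: list[str]) -> None:
        self.schedules[dag_id] = set(assets)

    def emit(self, asset: str) -> list[str]:
        """Record an asset update (a producer task succeeded); return DAGs to trigger."""
        triggered = []
        for dag_id, wanted in self.schedules.items():
            if asset in wanted:
                self.seen[dag_id].add(asset)
                if self.seen[dag_id] == wanted:
                    triggered.append(dag_id)
                    self.seen[dag_id].clear()  # wait for a fresh set of updates
        return triggered
```

A DAG waiting on `["games_data", "elo_ratings"]` only fires after both have been emitted, while one waiting on `["games_data"]` fires on every `games_data` update.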

Setup/Teardown Tasks

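```python
from datetime import datetime

from airflow.sdk import dag, setup, task, teardown

@dag(schedule=None, start_date=datetime(2024, 1, 1), catchup=False)
def db_workflow():

    @setup
    def setup_db_resource() -> str:
        # Tasks run in separate processes, so return an XCom-serializable
        # handle (e.g. a table name or connection id), not a live connection.
        # create_resource is your own helper (not shown).
        return create_resource()

    @task
    def process_data(resource_id: str):
        ...

    @teardown
    def cleanup_resource(resource_id: str):
        drop_resource(resource_id)  # your own helper (not shown)

    # Passing the setup's output into the teardown links the pair;
    # cleanup_resource runs even if process_data fails
    resource = setup_db_resource()
    process_data(resource) >> cleanup_resource(resource)

    # Equivalent with plain @task functions:
    # setup_task.as_setup() >> work >> cleanup_task.as_teardown(setups=setup_task)

db_workflow()
```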
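The guarantee a setup/teardown pair gives resembles `try`/`finally`: the teardown runs once the work has finished, even if the work failed, as long as the setup itself succeeded. The plain-Python analogy below shows only that ordering (`run_with_setup_teardown` is a hypothetical name); in Airflow each step is a separate task and the scheduler, not `finally`, enforces it.

```python
from typing import Any, Callable


def run_with_setup_teardown(
    setup: Callable[[], Any],
    work: Callable[[Any], Any],
    teardown: Callable[[Any], None],
) -> Any:
    """Plain-Python analogy of one setup -> work -> teardown group."""
    resource = setup()      # if setup raises, teardown is never attempted
    try:
        return work(resource)
    finally:
        teardown(resource)  # runs whether work returned or raised
```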

DAG Versioning

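```python
from datetime import datetime

from airflow.sdk import DAG

# Airflow 3.x versions DAGs automatically: when the DAG's structure changes,
# a new version is recorded and shown in the UI. There is no `version=`
# parameter to set by hand.
with DAG(
    dag_id="betting_workflow",
    schedule="@daily",
    start_date=datetime(2024, 1, 1),
) as dag:
    ...
```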
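Version tracking comes down to noticing when a DAG's structure has changed; Airflow 3.x does this itself from the serialized DAG. The toy sketch below captures the idea with a content hash. `VersionTracker` and the dict it hashes are hypothetical, and Airflow's real serialized format is far richer.

```python
import hashlib
import json


class VersionTracker:
    """Toy model: a new version is recorded only when the DAG structure changes."""

    def __init__(self) -> None:
        self.hashes: list[str] = []

    def record(self, structure: dict) -> int:
        """Hash a JSON-serializable description of the DAG; return its version number."""
        # sort_keys makes the hash independent of dict insertion order
        digest = hashlib.sha256(
            json.dumps(structure, sort_keys=True).encode("utf-8")
        ).hexdigest()
        if not self.hashes or self.hashes[-1] != digest:
            self.hashes.append(digest)
        return len(self.hashes)
```

Re-parsing an unchanged DAG keeps the same version; adding a task bumps it.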

Backfill Changes

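```bash
# Airflow 3.x: backfills are created as scheduler-managed jobs
# (replaces `airflow dags backfill`)
airflow backfill create --dag-id my_dag \
  --from-date 2024-01-01 --to-date 2024-01-15

# Or trigger a single run for one logical date through the REST API
# (v2 in Airflow 3.x; $TOKEN is an access token from POST /auth/token)
curl -X POST "http://localhost:8080/api/v2/dags/my_dag/dagRuns" \
  -H "Authorization: Bearer $TOKEN" \
  -H "Content-Type: application/json" \
  -d '{"logical_date": "2024-01-15T00:00:00Z"}'
```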
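Airflow's own backfill derives run dates from the DAG's schedule; when scripting the single-run REST call instead, you need one request body per logical date. A minimal sketch, assuming a daily DAG with runs at midnight UTC and an inclusive end date (`daily_trigger_payloads` is a hypothetical helper):

```python
from datetime import date, timedelta


def daily_trigger_payloads(start: date, end: date) -> list[dict[str, str]]:
    """One trigger body per day from start to end (inclusive), at midnight UTC."""
    payloads = []
    day = start
    while day <= end:
        payloads.append({"logical_date": f"{day.isoformat()}T00:00:00Z"})
        day += timedelta(days=1)
    return payloads
```

Each dict can be sent as the JSON body of the trigger request, one run per day.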

New REST API Endpoints

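```python
import requests

BASE_URL = "http://localhost:8080"

# Airflow 3.x serves the public REST API under /api/v2 and expects a bearer
# token; exchange username/password for one first
token = requests.post(
    f"{BASE_URL}/auth/token",
    json={"username": "admin", "password": "admin"},
).json()["access_token"]
headers = {"Authorization": f"Bearer {token}"}

# Get DAG runs
response = requests.get(
    f"{BASE_URL}/api/v2/dags/betting_workflow/dagRuns",
    headers=headers,
)

# Trigger DAG (logical_date may be null for a manual run)
response = requests.post(
    f"{BASE_URL}/api/v2/dags/betting_workflow/dagRuns",
    json={"logical_date": None, "conf": {"sport": "nba"}},
    headers=headers,
)
```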
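The DAG-runs list endpoint returns JSON that is easy to summarize. A small sketch, assuming a `{"dag_runs": [...], "total_entries": N}` response with a `state` field on each run (check the API reference for your version); `summarize_run_states` is a hypothetical helper you would feed `response.json()`.

```python
from collections import Counter
from typing import Any


def summarize_run_states(payload: dict[str, Any]) -> dict[str, int]:
    """Count DAG runs per state in a list-DAG-runs response body."""
    # Missing keys fall back to an empty list / "unknown" rather than raising
    counts = Counter(run.get("state", "unknown") for run in payload.get("dag_runs", []))
    return dict(counts)
```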

Edge Labels

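```python
from airflow.sdk import Label

# Inside a DAG where download, process, and alert are existing tasks.
# Labels only annotate edges in the Graph view; they do not change execution,
# so alert still needs its own trigger rule (e.g. trigger_rule="one_failed")
# to run only when download fails.
download >> Label("success") >> process
download >> Label("failure") >> alert
```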
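`Label` fits into a chain because `>>` is ordinary operator overloading: `a >> label >> b` evaluates left to right, so the label receives the upstream task first and creates the labeled edge once it sees the downstream task. A toy re-creation of that mechanism (not Airflow's code; `Graph`, `Node`, and `EdgeLabel` are hypothetical names):

```python
from typing import List, Optional, Tuple


class Graph:
    def __init__(self) -> None:
        self.edges: List[Tuple[str, str, Optional[str]]] = []

    def node(self, name: str) -> "Node":
        return Node(name, self)


class Node:
    def __init__(self, name: str, graph: Graph) -> None:
        self.name = name
        self.graph = graph

    def __rshift__(self, other):
        if isinstance(other, EdgeLabel):
            other.upstream = self  # hold the edge open until the label's right side
            return other
        self.graph.edges.append((self.name, other.name, None))
        return other  # returning the right operand is what makes chaining work


class EdgeLabel:
    def __init__(self, text: str) -> None:
        self.text = text
        self.upstream: Optional[Node] = None

    def __rshift__(self, other: Node) -> Node:
        if self.upstream is None:
            raise ValueError("a label must follow an upstream task")
        self.upstream.graph.edges.append((self.upstream.name, other.name, self.text))
        return other
```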

Migration Checklist

  • Update all operator imports to provider packages
  • Replace schedule_interval with schedule
  • Convert SubDags to TaskGroups
  • Replace Dataset with Asset
  • Test DAG parsing with python dags/my_dag.py
  • Update docker-compose to Airflow 3.x image
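Before loading files into a 3.x environment, a quick stdlib-only pass can catch DAG files that do not even parse. A sketch, assuming DAG files are `*.py` files under one folder (`find_syntax_errors` is a hypothetical helper); it checks syntax only, so running each file with Airflow 3.x installed (the `python dags/my_dag.py` step) is still needed to catch broken imports.

```python
import ast
from pathlib import Path


def find_syntax_errors(dag_dir: Path) -> dict[str, str]:
    """Map each *.py file under dag_dir that fails to parse to its error message."""
    errors = {}
    for path in sorted(dag_dir.rglob("*.py")):
        try:
            # Parse only; nothing is imported or executed
            ast.parse(path.read_text(encoding="utf-8"), filename=str(path))
        except SyntaxError as exc:  # also covers IndentationError
            errors[str(path)] = f"line {exc.lineno}: {exc.msg}"
    return errors
```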

Files to Reference

Airflow 3.x CLI

  • Has changed significantly since Airflow 2.
  • Check the latest CLI reference before running any CLI command.