Собесов

OBenner/data-engineering: спроектировать ежедневный DAG

Кейсы и метрикиData EngineeringСредняяMiddle

Условие

Спроектируйте Airflow DAG для ежедневного ETL: выгрузить вчерашние заказы из Postgres → положить в S3 как Parquet → загрузить в DWH (Snowflake) → пересобрать dbt-марты → отправить отчёт в Slack. Учтите идемпотентность, retries, мониторинг.

Решение

Подход

Принципы хорошего DAG.

  1. Идемпотентность. Повторный запуск за тот же день не должен дублировать данные. Достигается через partition по дате и WHERE date = '{ds}' или MERGE вместо INSERT.
  2. Каждая таска делает одно действие. Отдельная задача на extract, отдельная на load, отдельная на transform.
  3. Зависимости явные через >>. Никаких «таска А пишет файл, таска Б надеется, что он там есть».
  4. Retries с экспоненциальным backoff для сетевых операций.
  5. Алерты на failures и на SLA нарушения (sla_miss_callback).

DAG-скелет.

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.bash import BashOperator
from airflow.providers.slack.operators.slack import SlackAPIPostOperator
from airflow.providers.postgres.hooks.postgres import PostgresHook
from airflow.providers.snowflake.operators.snowflake import SnowflakeOperator
from datetime import datetime, timedelta
 
default_args = {
    "owner": "data-eng",
    "retries": 3,
    "retry_delay": timedelta(minutes=5),
    "retry_exponential_backoff": True,
    "sla": timedelta(hours=2),
    "email_on_failure": True,
}
 
with DAG(
    dag_id="daily_orders_etl",
    default_args=default_args,
    schedule="0 3 * * *",          # 3:00 UTC ежедневно
    start_date=datetime(2025, 1, 1),
    catchup=False,
    max_active_runs=1,
    tags=["etl", "orders"],
) as dag:
 
    def extract_to_s3(ds, **kwargs):
        # ds = логическая дата запуска, '2025-05-25'
        hook = PostgresHook("orders_db")
        df = hook.get_pandas_df(
            "SELECT * FROM orders WHERE order_date::date = %(ds)s",
            parameters={"ds": ds},
        )
        df.to_parquet(f"s3://lake/raw/orders/dt={ds}/data.parquet")
 
    extract = PythonOperator(
        task_id="extract_to_s3",
        python_callable=extract_to_s3,
    )
 
    copy_to_snowflake = SnowflakeOperator(
        task_id="copy_to_snowflake",
        sql="""
            COPY INTO raw.orders
            FROM 's3://lake/raw/orders/dt={{ ds }}/'
            FILE_FORMAT = (TYPE = PARQUET)
            FORCE = FALSE
        """,
    )
 
    dbt_run = BashOperator(
        task_id="dbt_run",
        bash_command="cd /opt/dbt && dbt run --select tag:daily --vars '{date: {{ ds }}}'",
    )
 
    dbt_test = BashOperator(
        task_id="dbt_test",
        bash_command="cd /opt/dbt && dbt test --select tag:daily",
    )
 
    notify = SlackAPIPostOperator(
        task_id="notify_slack",
        text="Daily orders ETL finished for {{ ds }}.",
        channel="#data-alerts",
    )
 
    extract >> copy_to_snowflake >> dbt_run >> dbt_test >> notify

Идемпотентность на каждом шаге

  1. S3: путь содержит dt={{ ds }} — повторный запуск перепишет тот же файл.
  2. Snowflake COPY: FORCE = FALSE пропустит уже загруженные файлы. Альтернатива — TRUNCATE партиции перед загрузкой, или MERGE вместо COPY.
  3. dbt: модели должны быть incremental с unique_key = order_id — повторный run обновит только пересекающиеся записи.
  4. Slack: уведомления отправляются после успешного выполнения, повтор = повторное уведомление (это допустимо).

Мониторинг

  • SLA: 2 часа — если не успели, алерт.
  • Data quality: dbt_test падает при not_null / unique нарушении, что блокирует следующие шаги.
  • Row count check: дополнительная задача с порогом «вчера загрузили 100k заказов, сегодня — 10. Это аномалия» (great-expectations или ручной SQL-check).

Подводные камни

  1. catchup=True — Airflow попытается запустить DAG за все пропущенные дни. На исторических данных это может стать DDoS-атакой на Postgres. По умолчанию ставьте catchup=False.
  2. max_active_runs=1 — критично для ETL: иначе параллельные запуски будут перезаписывать друг другу данные.
  3. {{ ds }} vs {{ data_interval_start }}. В Airflow 2.x — разные семантики; для ежедневного DAG ds = логическая дата, это вчера, если schedule в 3:00.
  4. State в таске. Не хранить состояние в памяти Python-процесса. Если retry — новый процесс ничего не знает. Передавайте через XCom (для маленьких) или общий S3/storage.
  5. Не использовать PythonOperator для тяжёлой обработки — он держит Airflow worker. Лучше — KubernetesPodOperator или EmrAddStepsOperator.

Эталонный ответ

DAG с явными тасками extract → load → transform (dbt run) → test → notify; идемпотентность через partition-by-date в S3, FORCE=FALSE в COPY и unique_key в incremental dbt-моделях; retries с backoff; catchup=False, max_active_runs=1; SLA-алерты и data-quality тесты обязательны. Тяжёлая обработка вынесена в Spark/Snowflake, Airflow только оркестрирует.

Хочешь увидеть разбор?

Зарегистрируйся бесплатно — откроется развёрнутое решение этой задачи и ещё 4 на выбор.

Зарегистрироваться и увидеть разбор
Уже есть аккаунт? Войти