Собесов

DataLearn DE-101: Airflow DAG для ежедневной загрузки заказов

Кейсы и метрикиOrchestrationСредняяMiddle

Условие

Нужно каждый день в 03:00 МСК выгружать новые заказы из Postgres за вчерашний день, складывать сырьё в S3, потом грузить в Snowflake и пересчитывать витрину mart_orders_daily. Напишите Airflow DAG. Опишите, как обрабатываются ошибки и backfill.

Решение

Подход

DAG из четырёх шагов с idempotent поведением (можно перезапускать) и партиционированием по ds (Airflow execution date):

extract_pg → upload_s3 → copy_into_snowflake → refresh_mart

DAG

from datetime import datetime, timedelta
from airflow import DAG
from airflow.providers.postgres.hooks.postgres import PostgresHook
from airflow.providers.amazon.aws.hooks.s3 import S3Hook
from airflow.providers.snowflake.operators.snowflake import SnowflakeOperator
from airflow.operators.python import PythonOperator
import pandas as pd, io
 
default_args = {
    "owner": "data-eng",
    "retries": 3,
    "retry_delay": timedelta(minutes=10),
    "email_on_failure": True,
    "email": ["data-alerts@company.io"],
}
 
def extract_pg(**ctx):
    ds = ctx["ds"]                                  # YYYY-MM-DD
    sql = """
        SELECT order_id, user_id, created_at, amount, status
        FROM public.orders
        WHERE created_at::date = %(ds)s
    """
    df = PostgresHook("pg_oltp").get_pandas_df(sql, parameters={"ds": ds})
    ctx["ti"].xcom_push(key="rows", value=len(df))
    buf = io.BytesIO()
    df.to_parquet(buf, index=False)
    return buf.getvalue()
 
def upload_s3(**ctx):
    body = ctx["ti"].xcom_pull(task_ids="extract_pg")
    key  = f"orders/dt={ctx['ds']}/orders.parquet"
    S3Hook("s3_default").load_bytes(body, key=key, bucket_name="bronze", replace=True)
 
with DAG(
    dag_id="orders_daily",
    start_date=datetime(2025, 1, 1),
    schedule="0 0 * * *",            # 03:00 МСК ≈ 00:00 UTC
    catchup=True,
    max_active_runs=1,
    default_args=default_args,
    tags=["orders", "etl"],
) as dag:
 
    extract = PythonOperator(task_id="extract_pg",  python_callable=extract_pg)
    upload  = PythonOperator(task_id="upload_s3",   python_callable=upload_s3)
 
    copy = SnowflakeOperator(
        task_id="copy_into_snowflake",
        snowflake_conn_id="snowflake_default",
        sql="""
            BEGIN;
            DELETE FROM raw.orders WHERE dt = '{{ ds }}';
            COPY INTO raw.orders
            FROM @bronze_stage/orders/dt={{ ds }}/
            FILE_FORMAT = (TYPE = PARQUET) MATCH_BY_COLUMN_NAME = CASE_INSENSITIVE;
            COMMIT;
        """,
    )
 
    refresh = SnowflakeOperator(
        task_id="refresh_mart",
        snowflake_conn_id="snowflake_default",
        sql="""
            MERGE INTO mart.orders_daily t
            USING (
                SELECT date_trunc('day', created_at)::date AS dt,
                       COUNT(*)                            AS n_orders,
                       SUM(amount)                         AS gmv,
                       COUNT(DISTINCT user_id)             AS dau
                FROM raw.orders
                WHERE dt = '{{ ds }}'
                GROUP BY 1
            ) s
            ON t.dt = s.dt
            WHEN MATCHED THEN UPDATE SET
                 n_orders = s.n_orders, gmv = s.gmv, dau = s.dau, updated_at = CURRENT_TIMESTAMP
            WHEN NOT MATCHED THEN INSERT VALUES (s.dt, s.n_orders, s.gmv, s.dau, CURRENT_TIMESTAMP);
        """,
    )
 
    extract >> upload >> copy >> refresh

Идемпотентность

  • DELETE WHERE dt = '{{ ds }}' перед COPY INTO — повторный запуск не дублирует строки.
  • MERGE в витрину — update-or-insert, повторный запуск безопасен.
  • S3 ключ с dt=... партицией — повторная заливка перетирает партицию.

Backfill

# одиночная дата
airflow dags backfill orders_daily -s 2025-04-01 -e 2025-04-01
 
# диапазон с ограничением параллелизма
airflow dags backfill orders_daily -s 2025-01-01 -e 2025-04-30 --reset-dagruns

С catchup=True Airflow сам прогонит DAG за пропущенные даты.

Подводные камни

  1. catchup=True без max_active_runs=1 — при первом запуске DAG может стартануть 365 параллельных runs и положить Postgres.
  2. schedule_interval vs start_date — DAG за ds=2025-05-26 запустится в начале следующего интервала, т.е. в 00:00 UTC 27 мая.
  3. Большие данные через XCom — анти-паттерн. Передавать через S3-ключ, а не байты.
  4. Часовые пояса: Airflow по умолчанию в UTC. Хочешь МСК — pendulum.timezone("Europe/Moscow") в start_date.
  5. Idempotency при retry: если extract_pg упал после загрузки в S3, повторный запуск не должен дублировать. Поэтому S3 ключ детерминированный по ds.
  6. SLA / alerts: sla=timedelta(hours=4) на DAG; алерт в Slack/Telegram через on_failure_callback.
  7. Параметризация ds: использовать Jinja-шаблоны ({{ ds }}), а не datetime.now() — иначе backfill сломается.

Эталонный ответ

DAG из четырёх задач: extract из Postgres → upload в S3 (партиция dt=ds) → DELETE + COPY INTO в Snowflake → MERGE в витрину. Идемпотентность — через детерминированные ключи S3, DELETE WHERE dt = ds и MERGE. Backfill — airflow dags backfill с catchup=True и max_active_runs=1. Алерты — on_failure_callback, retries=3, SLA на DAG.

Хочешь увидеть разбор?

Зарегистрируйся бесплатно — откроется развёрнутое решение этой задачи и ещё 4 на выбор.

Зарегистрироваться и увидеть разбор
Уже есть аккаунт? Войти