Условие
Нужно каждый день в 03:00 МСК выгружать новые заказы из Postgres за вчерашний день, складывать сырьё в S3, потом грузить в Snowflake и пересчитывать витрину mart_orders_daily. Напишите Airflow DAG. Опишите, как обрабатываются ошибки и backfill.
Решение
Подход
DAG из четырёх шагов с idempotent поведением (можно перезапускать) и партиционированием по ds (Airflow execution date):
extract_pg → upload_s3 → copy_into_snowflake → refresh_mart
DAG
from datetime import datetime, timedelta
from airflow import DAG
from airflow.providers.postgres.hooks.postgres import PostgresHook
from airflow.providers.amazon.aws.hooks.s3 import S3Hook
from airflow.providers.snowflake.operators.snowflake import SnowflakeOperator
from airflow.operators.python import PythonOperator
import pandas as pd, io
default_args = {
"owner": "data-eng",
"retries": 3,
"retry_delay": timedelta(minutes=10),
"email_on_failure": True,
"email": ["data-alerts@company.io"],
}
def extract_pg(**ctx):
ds = ctx["ds"] # YYYY-MM-DD
sql = """
SELECT order_id, user_id, created_at, amount, status
FROM public.orders
WHERE created_at::date = %(ds)s
"""
df = PostgresHook("pg_oltp").get_pandas_df(sql, parameters={"ds": ds})
ctx["ti"].xcom_push(key="rows", value=len(df))
buf = io.BytesIO()
df.to_parquet(buf, index=False)
return buf.getvalue()
def upload_s3(**ctx):
body = ctx["ti"].xcom_pull(task_ids="extract_pg")
key = f"orders/dt={ctx['ds']}/orders.parquet"
S3Hook("s3_default").load_bytes(body, key=key, bucket_name="bronze", replace=True)
with DAG(
dag_id="orders_daily",
start_date=datetime(2025, 1, 1),
schedule="0 0 * * *", # 03:00 МСК ≈ 00:00 UTC
catchup=True,
max_active_runs=1,
default_args=default_args,
tags=["orders", "etl"],
) as dag:
extract = PythonOperator(task_id="extract_pg", python_callable=extract_pg)
upload = PythonOperator(task_id="upload_s3", python_callable=upload_s3)
copy = SnowflakeOperator(
task_id="copy_into_snowflake",
snowflake_conn_id="snowflake_default",
sql="""
BEGIN;
DELETE FROM raw.orders WHERE dt = '{{ ds }}';
COPY INTO raw.orders
FROM @bronze_stage/orders/dt={{ ds }}/
FILE_FORMAT = (TYPE = PARQUET) MATCH_BY_COLUMN_NAME = CASE_INSENSITIVE;
COMMIT;
""",
)
refresh = SnowflakeOperator(
task_id="refresh_mart",
snowflake_conn_id="snowflake_default",
sql="""
MERGE INTO mart.orders_daily t
USING (
SELECT date_trunc('day', created_at)::date AS dt,
COUNT(*) AS n_orders,
SUM(amount) AS gmv,
COUNT(DISTINCT user_id) AS dau
FROM raw.orders
WHERE dt = '{{ ds }}'
GROUP BY 1
) s
ON t.dt = s.dt
WHEN MATCHED THEN UPDATE SET
n_orders = s.n_orders, gmv = s.gmv, dau = s.dau, updated_at = CURRENT_TIMESTAMP
WHEN NOT MATCHED THEN INSERT VALUES (s.dt, s.n_orders, s.gmv, s.dau, CURRENT_TIMESTAMP);
""",
)
extract >> upload >> copy >> refreshИдемпотентность
DELETE WHERE dt = '{{ ds }}'передCOPY INTO— повторный запуск не дублирует строки.MERGEв витрину — update-or-insert, повторный запуск безопасен.- S3 ключ с
dt=...партицией — повторная заливка перетирает партицию.
Backfill
# одиночная дата
airflow dags backfill orders_daily -s 2025-04-01 -e 2025-04-01
# диапазон с ограничением параллелизма
airflow dags backfill orders_daily -s 2025-01-01 -e 2025-04-30 --reset-dagrunsС catchup=True Airflow сам прогонит DAG за пропущенные даты.
Подводные камни
catchup=Trueбезmax_active_runs=1— при первом запуске DAG может стартануть 365 параллельных runs и положить Postgres.schedule_intervalvsstart_date— DAG заds=2025-05-26запустится в начале следующего интервала, т.е. в 00:00 UTC 27 мая.- Большие данные через XCom — анти-паттерн. Передавать через S3-ключ, а не байты.
- Часовые пояса: Airflow по умолчанию в UTC. Хочешь МСК —
pendulum.timezone("Europe/Moscow")вstart_date. - Idempotency при retry: если
extract_pgупал после загрузки в S3, повторный запуск не должен дублировать. Поэтому S3 ключ детерминированный поds. - SLA / alerts:
sla=timedelta(hours=4)на DAG; алерт в Slack/Telegram черезon_failure_callback. - Параметризация ds: использовать Jinja-шаблоны (
{{ ds }}), а неdatetime.now()— иначе backfill сломается.
Эталонный ответ
DAG из четырёх задач: extract из Postgres → upload в S3 (партиция dt=ds) → DELETE + COPY INTO в Snowflake → MERGE в витрину. Идемпотентность — через детерминированные ключи S3, DELETE WHERE dt = ds и MERGE. Backfill — airflow dags backfill с catchup=True и max_active_runs=1. Алерты — on_failure_callback, retries=3, SLA на DAG.