Условие
Спроектируйте Airflow DAG для ежедневного ETL: выгрузить вчерашние заказы из Postgres → положить в S3 как Parquet → загрузить в DWH (Snowflake) → пересобрать dbt-марты → отправить отчёт в Slack. Учтите идемпотентность, retries, мониторинг.
Решение
Подход
Принципы хорошего DAG.
- Идемпотентность. Повторный запуск за тот же день не должен дублировать данные. Достигается через partition по дате и
WHERE date = '{ds}'илиMERGEвместоINSERT. - Каждая таска делает одно действие. Отдельная задача на extract, отдельная на load, отдельная на transform.
- Зависимости явные через
>>. Никаких «таска А пишет файл, таска Б надеется, что он там есть». - Retries с экспоненциальным backoff для сетевых операций.
- Алерты на failures и на SLA нарушения (
sla_miss_callback).
DAG-скелет.
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.bash import BashOperator
from airflow.providers.slack.operators.slack import SlackAPIPostOperator
from airflow.providers.postgres.hooks.postgres import PostgresHook
from airflow.providers.snowflake.operators.snowflake import SnowflakeOperator
from datetime import datetime, timedelta
default_args = {
"owner": "data-eng",
"retries": 3,
"retry_delay": timedelta(minutes=5),
"retry_exponential_backoff": True,
"sla": timedelta(hours=2),
"email_on_failure": True,
}
with DAG(
dag_id="daily_orders_etl",
default_args=default_args,
schedule="0 3 * * *", # 3:00 UTC ежедневно
start_date=datetime(2025, 1, 1),
catchup=False,
max_active_runs=1,
tags=["etl", "orders"],
) as dag:
def extract_to_s3(ds, **kwargs):
# ds = логическая дата запуска, '2025-05-25'
hook = PostgresHook("orders_db")
df = hook.get_pandas_df(
"SELECT * FROM orders WHERE order_date::date = %(ds)s",
parameters={"ds": ds},
)
df.to_parquet(f"s3://lake/raw/orders/dt={ds}/data.parquet")
extract = PythonOperator(
task_id="extract_to_s3",
python_callable=extract_to_s3,
)
copy_to_snowflake = SnowflakeOperator(
task_id="copy_to_snowflake",
sql="""
COPY INTO raw.orders
FROM 's3://lake/raw/orders/dt={{ ds }}/'
FILE_FORMAT = (TYPE = PARQUET)
FORCE = FALSE
""",
)
dbt_run = BashOperator(
task_id="dbt_run",
bash_command="cd /opt/dbt && dbt run --select tag:daily --vars '{date: {{ ds }}}'",
)
dbt_test = BashOperator(
task_id="dbt_test",
bash_command="cd /opt/dbt && dbt test --select tag:daily",
)
notify = SlackAPIPostOperator(
task_id="notify_slack",
text="Daily orders ETL finished for {{ ds }}.",
channel="#data-alerts",
)
extract >> copy_to_snowflake >> dbt_run >> dbt_test >> notifyИдемпотентность на каждом шаге
- S3: путь содержит
dt={{ ds }}— повторный запуск перепишет тот же файл. - Snowflake COPY:
FORCE = FALSEпропустит уже загруженные файлы. Альтернатива —TRUNCATEпартиции перед загрузкой, илиMERGEвместоCOPY. - dbt: модели должны быть incremental с
unique_key = order_id— повторный run обновит только пересекающиеся записи. - Slack: уведомления отправляются после успешного выполнения, повтор = повторное уведомление (это допустимо).
Мониторинг
- SLA: 2 часа — если не успели, алерт.
- Data quality:
dbt_testпадает приnot_null/uniqueнарушении, что блокирует следующие шаги. - Row count check: дополнительная задача с порогом «вчера загрузили 100k заказов, сегодня — 10. Это аномалия» (great-expectations или ручной SQL-check).
Подводные камни
catchup=True— Airflow попытается запустить DAG за все пропущенные дни. На исторических данных это может стать DDoS-атакой на Postgres. По умолчанию ставьтеcatchup=False.max_active_runs=1— критично для ETL: иначе параллельные запуски будут перезаписывать друг другу данные.{{ ds }}vs{{ data_interval_start }}. В Airflow 2.x — разные семантики; для ежедневного DAGds= логическая дата, это вчера, если schedule в 3:00.- State в таске. Не хранить состояние в памяти Python-процесса. Если retry — новый процесс ничего не знает. Передавайте через XCom (для маленьких) или общий S3/storage.
- Не использовать PythonOperator для тяжёлой обработки — он держит Airflow worker. Лучше — KubernetesPodOperator или EmrAddStepsOperator.
Эталонный ответ
DAG с явными тасками extract → load → transform (dbt run) → test → notify; идемпотентность через partition-by-date в S3, FORCE=FALSE в COPY и unique_key в incremental dbt-моделях; retries с backoff; catchup=False, max_active_runs=1; SLA-алерты и data-quality тесты обязательны. Тяжёлая обработка вынесена в Spark/Snowflake, Airflow только оркестрирует.