Условие
Таблица orders в Postgres растёт по 1 млн строк в день. Полная перезагрузка нереалистична. Какие есть стратегии инкрементальной загрузки? Когда какая работает? Что делать с «опоздавшими» данными и удалениями?
Решение
Стратегии
| Стратегия | Как работает | Источник | Late data | Deletes |
|---|---|---|---|---|
| Append-only | WHERE id > max(id) |
autoincrement id | нет | нет |
| Watermark по updated_at | WHERE updated_at > last_seen - δ |
колонка с updated_at |
окно δ | нет |
| CDC (log-based) | Debezium/log mining | binlog/WAL | да | да |
| Snapshot diff (hash) | сравнить hash строк | полная выгрузка | да | да |
| Full reload | каждый раз TRUNCATE + COPY |
любой | да | да |
Append-only
INSERT INTO target (id, ...)
SELECT id, ...
FROM source
WHERE id > (SELECT COALESCE(MAX(id), 0) FROM target);Прост, но не ловит UPDATE/DELETE и опоздания. Годится для event-логов, не для бизнес-сущностей.
Watermark с overlap
INSERT INTO staging
SELECT *
FROM source
WHERE updated_at > (SELECT MAX(updated_at) FROM target) - INTERVAL '6 hour';
MERGE INTO target t USING staging s ON t.id = s.id
WHEN MATCHED AND s.updated_at > t.updated_at THEN UPDATE SET ...
WHEN NOT MATCHED THEN INSERT VALUES (...);- INTERVAL '6 hour' — overlap для опаздавших строк. Не ловит delete.
CDC (Debezium)
Слушаем WAL Postgres / binlog MySQL → Kafka → consumer льёт op = c/u/d в DWH. Видим всё: insert, update, delete.
postgres ──WAL──► Debezium ──Kafka──► Sink (S3/BQ) ──merge──► DWH
Плюсы: near-real-time, ловит deletes. Минусы: операционная сложность, доп. сервисы.
Snapshot diff
Каждый день — полная выгрузка с hash:
SELECT id,
md5(concat_ws('|', col1, col2, col3, col4)) AS row_hash
FROM source;Сравниваем со вчерашним снимком: новые id → insert, изменённые hash → update, пропавшие id → delete. Дороже, чем watermark, но проще CDC.
Опоздания
«Заказ создан 1 мая, но updated_at обновился только 5 мая» — watermark на updated_at ловит. Опаснее: если опоздавшая строка не имеет свежего updated_at, нужны:
- window backfill (раз в неделю — полная сверка),
- CDC (ловит UPDATE),
out-of-order toleranceна стороне источника (запрет ретро-вставок).
Удаления
watermark их не видит. Варианты:
- soft-delete (
is_deleted = true) — самое простое, источник должен поддерживать. - CDC —
op = 'd'. - Snapshot diff — есть в источнике, нет в target → пометить как удалённый.
Реализация в dbt
-- models/orders.sql
{{ config(
materialized = 'incremental',
unique_key = 'order_id',
on_schema_change = 'sync_all_columns'
) }}
SELECT * FROM {{ source('raw', 'orders') }}
{% if is_incremental() %}
WHERE updated_at > (SELECT COALESCE(MAX(updated_at), '1970-01-01') FROM {{ this }})
- INTERVAL '6 hour'
{% endif %}Подводные камни
MAX(updated_at)после неудачной загрузки уйдёт «вперёд», и часть данных пропустится — храните last_loaded в отдельной control-таблице.updated_atне обновляется при пересчётах через триггер — частая боль в legacy-схемах.- CDC и tombstones: Kafka после compaction оставляет
nullpayload — не ломайте sink на NULL. - Snapshot diff на 1 млрд строк — дорого; делать на партициях.
- Часовые пояса:
updated_atв UTC vs локальном часовом поясе источника — рассинхрон на 3-12 часов. - DST переходы: окно с overlap должно быть «с запасом» (не 1h, а 6h).
- deletes без CDC и soft-delete не отловить инкрементально — придётся snapshot diff раз в неделю.
Эталонный ответ
Стратегии: append-only (для event-логов), watermark по updated_at с overlap (для бизнес-сущностей с soft-delete), CDC (для near-real-time и удалений), snapshot diff (когда нет ни updated_at, ни CDC). Опоздания — overlap-окно или периодический snapshot. Удаления — soft-delete, CDC или snapshot diff. В dbt — materialized=incremental с unique_key и watermark в is_incremental().