Собесов

DataLearn DE-101: Загрузка данных в Snowflake через COPY INTO

SQLSnowflakeСредняяMiddle

Условие

В S3 каждый день складываются Parquet-файлы заказов в s3://bronze/orders/dt=YYYY-MM-DD/*.parquet. Настройте загрузку в Snowflake-таблицу raw.orders так, чтобы:

  1. Использовался external stage с IAM-ролью.
  2. Каждый файл загружался ровно один раз (идемпотентность).
  3. При ошибке в одной строке — fail только этой строки, не всей загрузки.
  4. Был автоматический trigger через Snowpipe.

Решение

Шаги

-- 1. Storage integration (one-time, by admin)
CREATE OR REPLACE STORAGE INTEGRATION s3_bronze
    TYPE = EXTERNAL_STAGE
    STORAGE_PROVIDER = 'S3'
    ENABLED = TRUE
    STORAGE_AWS_ROLE_ARN = 'arn:aws:iam::123456789012:role/snowflake-bronze'
    STORAGE_ALLOWED_LOCATIONS = ('s3://bronze/');
 
-- 2. File format
CREATE OR REPLACE FILE FORMAT ff_parquet
    TYPE = PARQUET
    COMPRESSION = SNAPPY;
 
-- 3. External stage
CREATE OR REPLACE STAGE bronze_orders_stage
    URL = 's3://bronze/orders/'
    STORAGE_INTEGRATION = s3_bronze
    FILE_FORMAT = ff_parquet;
 
-- 4. Target table
CREATE OR REPLACE TABLE raw.orders (
    order_id      BIGINT,
    user_id       BIGINT,
    created_at    TIMESTAMP_NTZ,
    amount        NUMBER(18, 2),
    status        STRING,
    dt            DATE,                          -- partition column
    _loaded_at    TIMESTAMP_NTZ DEFAULT CURRENT_TIMESTAMP()
);

COPY INTO с идемпотентностью

COPY INTO raw.orders (order_id, user_id, created_at, amount, status, dt)
FROM (
    SELECT
        $1:order_id::BIGINT,
        $1:user_id::BIGINT,
        $1:created_at::TIMESTAMP_NTZ,
        $1:amount::NUMBER(18, 2),
        $1:status::STRING,
        TO_DATE(REGEXP_SUBSTR(METADATA$FILENAME, 'dt=(\\d{4}-\\d{2}-\\d{2})', 1, 1, 'e', 1))
    FROM @bronze_orders_stage
)
PATTERN = '.*orders/dt=.*\\.parquet'
ON_ERROR = 'CONTINUE'                  -- битые строки скипаются
FORCE = FALSE                          -- не перезагружать уже видимые файлы
MATCH_BY_COLUMN_NAME = NONE;
  • FORCE = FALSE (default) → Snowflake помнит загруженные файлы 64 дня и не дублирует.
  • ON_ERROR = 'CONTINUE' → проблемная строка попадает в LOAD_HISTORY с пометкой, остальные грузятся.
  • PATTERN фильтрует только нужные файлы внутри stage.

Альтернатива — MERGE для повторной загрузки той же даты

Если файл переписали (исправленные данные), FORCE = TRUE или явный DELETE + COPY:

BEGIN;
DELETE FROM raw.orders WHERE dt = '2025-05-26';
COPY INTO raw.orders
FROM @bronze_orders_stage/dt=2025-05-26/
FILE_FORMAT = (FORMAT_NAME = ff_parquet)
FORCE = TRUE;
COMMIT;

Snowpipe для автозагрузки

CREATE OR REPLACE PIPE pipe_orders
    AUTO_INGEST = TRUE
    AS
COPY INTO raw.orders (order_id, user_id, created_at, amount, status, dt)
FROM (
    SELECT $1:order_id::BIGINT,
           $1:user_id::BIGINT,
           $1:created_at::TIMESTAMP_NTZ,
           $1:amount::NUMBER(18,2),
           $1:status::STRING,
           TO_DATE(REGEXP_SUBSTR(METADATA$FILENAME,'dt=(\\d{4}-\\d{2}-\\d{2})',1,1,'e',1))
    FROM @bronze_orders_stage
)
ON_ERROR = 'CONTINUE';
 
-- получить ARN SNS-уведомлений
DESC PIPE pipe_orders;

На S3 настраивается S3 Event Notification → SNS → Snowflake. Файл попадает в bucket — через ~1 минуту строки в таблице.

Контроль загрузок

-- что и когда грузилось
SELECT * FROM TABLE(INFORMATION_SCHEMA.COPY_HISTORY(
    TABLE_NAME => 'raw.orders',
    START_TIME => DATEADD(day, -7, CURRENT_TIMESTAMP())));
 
-- ошибочные строки
SELECT * FROM TABLE(VALIDATE(raw.orders, JOB_ID => '_last'));

Подводные камни

  1. ON_ERROR = 'ABORT_STATEMENT' (default) — одна битая строка отменит всю партию. Для прода чаще CONTINUE + мониторинг VALIDATE.
  2. FORCE = TRUE убивает дедупликацию по filename — использовать только при осознанной перезагрузке.
  3. METADATA$FILENAME даёт полный путь от bucket'а; partition column парсится регуляркой.
  4. MATCH_BY_COLUMN_NAME = CASE_INSENSITIVE работает только для Parquet/ORC/Avro с именованными колонками; для CSV — позиционно.
  5. Snowpipe ≠ real-time: задержка 30 сек – пара минут. Для миллисекунд — Snowpipe Streaming / Kafka connector.
  6. Объёмы файлов: оптимально 100-250 МБ сжатых на файл; меньше — overhead, больше — медленный COPY.
  7. MERGE с _loaded_at не дедуплицирует строки внутри файла — для этого нужен PK и явный MERGE после COPY.

Эталонный ответ

External stage с storage integration → COPY INTO с PATTERN + ON_ERROR='CONTINUE' + FORCE=FALSE (Snowflake запоминает файлы 64 дня — идемпотентно). Для автозагрузки — PIPE ... AUTO_INGEST=TRUE с S3 event notifications через SNS. Partition column парсится из METADATA$FILENAME. Контроль — COPY_HISTORY + VALIDATE.

Хочешь увидеть разбор?

Зарегистрируйся бесплатно — откроется развёрнутое решение этой задачи и ещё 4 на выбор.

Зарегистрироваться и увидеть разбор
Уже есть аккаунт? Войти