Условие
В S3 каждый день складываются Parquet-файлы заказов в s3://bronze/orders/dt=YYYY-MM-DD/*.parquet. Настройте загрузку в Snowflake-таблицу raw.orders так, чтобы:
- Использовался external stage с IAM-ролью.
- Каждый файл загружался ровно один раз (идемпотентность).
- При ошибке в одной строке — fail только этой строки, не всей загрузки.
- Был автоматический trigger через Snowpipe.
Решение
Шаги
-- 1. Storage integration (one-time, by admin)
CREATE OR REPLACE STORAGE INTEGRATION s3_bronze
TYPE = EXTERNAL_STAGE
STORAGE_PROVIDER = 'S3'
ENABLED = TRUE
STORAGE_AWS_ROLE_ARN = 'arn:aws:iam::123456789012:role/snowflake-bronze'
STORAGE_ALLOWED_LOCATIONS = ('s3://bronze/');
-- 2. File format
CREATE OR REPLACE FILE FORMAT ff_parquet
TYPE = PARQUET
COMPRESSION = SNAPPY;
-- 3. External stage
CREATE OR REPLACE STAGE bronze_orders_stage
URL = 's3://bronze/orders/'
STORAGE_INTEGRATION = s3_bronze
FILE_FORMAT = ff_parquet;
-- 4. Target table
CREATE OR REPLACE TABLE raw.orders (
order_id BIGINT,
user_id BIGINT,
created_at TIMESTAMP_NTZ,
amount NUMBER(18, 2),
status STRING,
dt DATE, -- partition column
_loaded_at TIMESTAMP_NTZ DEFAULT CURRENT_TIMESTAMP()
);COPY INTO с идемпотентностью
COPY INTO raw.orders (order_id, user_id, created_at, amount, status, dt)
FROM (
SELECT
$1:order_id::BIGINT,
$1:user_id::BIGINT,
$1:created_at::TIMESTAMP_NTZ,
$1:amount::NUMBER(18, 2),
$1:status::STRING,
TO_DATE(REGEXP_SUBSTR(METADATA$FILENAME, 'dt=(\\d{4}-\\d{2}-\\d{2})', 1, 1, 'e', 1))
FROM @bronze_orders_stage
)
PATTERN = '.*orders/dt=.*\\.parquet'
ON_ERROR = 'CONTINUE' -- битые строки скипаются
FORCE = FALSE -- не перезагружать уже видимые файлы
MATCH_BY_COLUMN_NAME = NONE;FORCE = FALSE(default) → Snowflake помнит загруженные файлы 64 дня и не дублирует.ON_ERROR = 'CONTINUE'→ проблемная строка попадает вLOAD_HISTORYс пометкой, остальные грузятся.PATTERNфильтрует только нужные файлы внутри stage.
Альтернатива — MERGE для повторной загрузки той же даты
Если файл переписали (исправленные данные), FORCE = TRUE или явный DELETE + COPY:
BEGIN;
DELETE FROM raw.orders WHERE dt = '2025-05-26';
COPY INTO raw.orders
FROM @bronze_orders_stage/dt=2025-05-26/
FILE_FORMAT = (FORMAT_NAME = ff_parquet)
FORCE = TRUE;
COMMIT;Snowpipe для автозагрузки
CREATE OR REPLACE PIPE pipe_orders
AUTO_INGEST = TRUE
AS
COPY INTO raw.orders (order_id, user_id, created_at, amount, status, dt)
FROM (
SELECT $1:order_id::BIGINT,
$1:user_id::BIGINT,
$1:created_at::TIMESTAMP_NTZ,
$1:amount::NUMBER(18,2),
$1:status::STRING,
TO_DATE(REGEXP_SUBSTR(METADATA$FILENAME,'dt=(\\d{4}-\\d{2}-\\d{2})',1,1,'e',1))
FROM @bronze_orders_stage
)
ON_ERROR = 'CONTINUE';
-- получить ARN SNS-уведомлений
DESC PIPE pipe_orders;На S3 настраивается S3 Event Notification → SNS → Snowflake. Файл попадает в bucket — через ~1 минуту строки в таблице.
Контроль загрузок
-- что и когда грузилось
SELECT * FROM TABLE(INFORMATION_SCHEMA.COPY_HISTORY(
TABLE_NAME => 'raw.orders',
START_TIME => DATEADD(day, -7, CURRENT_TIMESTAMP())));
-- ошибочные строки
SELECT * FROM TABLE(VALIDATE(raw.orders, JOB_ID => '_last'));Подводные камни
ON_ERROR = 'ABORT_STATEMENT'(default) — одна битая строка отменит всю партию. Для прода чащеCONTINUE+ мониторингVALIDATE.FORCE = TRUEубивает дедупликацию по filename — использовать только при осознанной перезагрузке.METADATA$FILENAMEдаёт полный путь от bucket'а; partition column парсится регуляркой.MATCH_BY_COLUMN_NAME = CASE_INSENSITIVEработает только для Parquet/ORC/Avro с именованными колонками; для CSV — позиционно.- Snowpipe ≠ real-time: задержка 30 сек – пара минут. Для миллисекунд — Snowpipe Streaming / Kafka connector.
- Объёмы файлов: оптимально 100-250 МБ сжатых на файл; меньше — overhead, больше — медленный COPY.
MERGEс_loaded_atне дедуплицирует строки внутри файла — для этого нужен PK и явныйMERGEпосле COPY.
Эталонный ответ
External stage с storage integration → COPY INTO с PATTERN + ON_ERROR='CONTINUE' + FORCE=FALSE (Snowflake запоминает файлы 64 дня — идемпотентно). Для автозагрузки — PIPE ... AUTO_INGEST=TRUE с S3 event notifications через SNS. Partition column парсится из METADATA$FILENAME. Контроль — COPY_HISTORY + VALIDATE.