Условие
Ежедневный ETL должен класть результирующий DataFrame в S3 в parquet, партиционируя по dt и country. Как сделать это надёжно и атомарно?
Решение
Простой путь — pandas + s3fs
df.to_parquet(
's3://mybucket/marts/orders/dt=2024-01-15/country=RU/data.parquet',
engine='pyarrow',
compression='snappy',
)Через s3fs/fsspec pandas пишет напрямую. Один файл на путь — атомарно (S3 PUT либо есть, либо нет).
Многофайловое партиционирование — awswrangler
import awswrangler as wr
wr.s3.to_parquet(
df=df,
path='s3://mybucket/marts/orders/',
dataset=True,
mode='overwrite_partitions', # перепишет только затронутые партиции
partition_cols=['dt', 'country'],
compression='snappy',
)mode='overwrite_partitions' критично для идемпотентного ETL — повторный прогон не задвоит данные.
Атомарность
S3 не транзакционен. Решения:
- Писать в
marts/orders/_tmp/..., потомwr.s3.copy_objectsв финальный путь. - Использовать Iceberg/Delta Lake — даёт ACID на S3.
- Писать партицию целиком, не дописывать в существующий префикс.
Glue Catalog
wr.s3.to_parquet(
df=df,
path='s3://mybucket/marts/orders/',
dataset=True,
database='analytics',
table='orders',
partition_cols=['dt', 'country'],
)Создаст/обновит таблицу в Glue Catalog — её сразу видно из Athena/Spectrum.
Подводные камни
to_parquetбезdataset=Trueпишет один файл. Партиций не получится — нуженwr.s3.to_parquet(dataset=True).mode='append'дублирует данные при повторном запуске. Для идемпотентности —overwrite_partitions.- Большой партиционный список (
dt × country × city) → тысячи мелких файлов → медленные запросы. Партиционируйте экономно. - IAM роль ETL-машины должна иметь права на
s3:PutObjectиglue:*для таблицы. s3fsиногда отваливается на больших файлах — установите свежую версию или используйтеboto3.upload_file.
Эталонный ответ
awswrangler.s3.to_parquet(dataset=True, mode='overwrite_partitions', partition_cols=...) — идемпотентно, регистрирует в Glue, корректно по партициям.