Собесов

Сценарий: чанки и Dask для агрегата по большим parquet

PythonOptimisation и big dataСредняяMiddle

Условие

В S3 лежит 200 ГБ событий в parquet, партиционированных по dt=YYYY-MM-DD. Нужно за период [d1, d2] посчитать DAU и среднюю длительность сессии по странам. RAM 16 ГБ.

Решение

Вариант 1: Dask

import dask.dataframe as dd
 
ddf = dd.read_parquet(
    's3://bucket/events/',
    filters=[('dt', '>=', '2024-01-01'), ('dt', '<=', '2024-01-31')],
    columns=['user_id', 'session_id', 'country', 'duration'],
)
 
result = (
    ddf.groupby(['dt', 'country'])
       .agg({'user_id': 'nunique', 'duration': 'mean'})
       .compute()
)

filters= важен: Dask применит push-down к parquet и не будет читать ненужные партиции.

Вариант 2: чистый pandas по партициям

import pandas as pd
from pathlib import Path
 
results = []
for day in pd.date_range('2024-01-01', '2024-01-31'):
    path = f's3://bucket/events/dt={day:%Y-%m-%d}/'
    df = pd.read_parquet(path, columns=['user_id', 'country', 'duration'])
    agg = df.groupby('country').agg(
        dau=('user_id', 'nunique'),
        mean_dur=('duration', 'mean'),
    ).assign(dt=day)
    results.append(agg)
 
combined = pd.concat(results).reset_index()

Вариант 3: DuckDB

import duckdb
duckdb.sql("""
SELECT dt, country,
       COUNT(DISTINCT user_id) AS dau,
       AVG(duration) AS mean_dur
FROM read_parquet('s3://bucket/events/*/*.parquet',
                  hive_partitioning=1)
WHERE dt BETWEEN '2024-01-01' AND '2024-01-31'
GROUP BY dt, country
""").df()

DuckDB на одной машине часто быстрее Dask за счёт оптимизатора и колоночного исполнения.

Подводные камни

  1. Без filters= Dask прочитает все партиции — потратите время и деньги (S3 egress).
  2. nunique в Dask — приближённый по умолчанию (HyperLogLog). Точное nunique требует shuffle всего ключа.
  3. Партиционирование по дате — счастье. По стране — обычно нет, т.к. сильный skew.
  4. read_parquet на S3 без s3fs/pyarrow упадёт — установить зависимости.
  5. Если все агрегаты по партиции дисъюнктны (день × страна) — pandas по партициям часто проще и предсказуемее Dask.

Эталонный ответ

Dask с filters= для push-down ИЛИ цикл по партициям в pandas ИЛИ один SQL в DuckDB. Не читать всё в память.

Хочешь увидеть разбор?

Зарегистрируйся бесплатно — откроется развёрнутое решение этой задачи и ещё 4 на выбор.

Зарегистрироваться и увидеть разбор
Уже есть аккаунт? Войти