Условие
В S3 лежит 200 ГБ событий в parquet, партиционированных по dt=YYYY-MM-DD. Нужно за период [d1, d2] посчитать DAU и среднюю длительность сессии по странам. RAM 16 ГБ.
Решение
Вариант 1: Dask
import dask.dataframe as dd
ddf = dd.read_parquet(
's3://bucket/events/',
filters=[('dt', '>=', '2024-01-01'), ('dt', '<=', '2024-01-31')],
columns=['user_id', 'session_id', 'country', 'duration'],
)
result = (
ddf.groupby(['dt', 'country'])
.agg({'user_id': 'nunique', 'duration': 'mean'})
.compute()
)filters= важен: Dask применит push-down к parquet и не будет читать ненужные партиции.
Вариант 2: чистый pandas по партициям
import pandas as pd
from pathlib import Path
results = []
for day in pd.date_range('2024-01-01', '2024-01-31'):
path = f's3://bucket/events/dt={day:%Y-%m-%d}/'
df = pd.read_parquet(path, columns=['user_id', 'country', 'duration'])
agg = df.groupby('country').agg(
dau=('user_id', 'nunique'),
mean_dur=('duration', 'mean'),
).assign(dt=day)
results.append(agg)
combined = pd.concat(results).reset_index()Вариант 3: DuckDB
import duckdb
duckdb.sql("""
SELECT dt, country,
COUNT(DISTINCT user_id) AS dau,
AVG(duration) AS mean_dur
FROM read_parquet('s3://bucket/events/*/*.parquet',
hive_partitioning=1)
WHERE dt BETWEEN '2024-01-01' AND '2024-01-31'
GROUP BY dt, country
""").df()DuckDB на одной машине часто быстрее Dask за счёт оптимизатора и колоночного исполнения.
Подводные камни
- Без
filters=Dask прочитает все партиции — потратите время и деньги (S3 egress). nuniqueв Dask — приближённый по умолчанию (HyperLogLog). Точноеnuniqueтребует shuffle всего ключа.- Партиционирование по дате — счастье. По стране — обычно нет, т.к. сильный skew.
read_parquetна S3 безs3fs/pyarrowупадёт — установить зависимости.- Если все агрегаты по партиции дисъюнктны (день × страна) — pandas по партициям часто проще и предсказуемее Dask.
Эталонный ответ
Dask с filters= для push-down ИЛИ цикл по партициям в pandas ИЛИ один SQL в DuckDB. Не читать всё в память.