Условие
Есть «сырые» данные о покупках авиабилетов (data.csv). Поля:
booking_id— ID бронирования.booking_month— месяц покупки билета.passengers— количество пассажиров в бронировании.price— полная стоимость билета (на всё бронирование, не на одного пассажира).flights_info— список рейсов внутри билета (JSON-строка). Считаем, что это все рейсы без обратного билета, но по пути могут быть пересадки. Пример:
[
{"origin": "SVO", "destination": "MCT", "airline": "WY", "baggage": "With baggage"},
{"origin": "MCT", "destination": "HKT", "airline": "WY", "baggage": "With baggage"}
]Нужно привести данные так, чтобы на каждый рейс внутри билета была своя строка. Из примера выше получится 2 строки: SVO->MCT и MCT->HKT.
Итоговая таблица должна содержать столбцы:
booking_id— ID бронирования.booking_month— месяц покупки.itinerary— маршрут вида{start}-{change_1}-...-{finish}(для примера вышеSVO-MCT-HKT).flights_count— число рейсов в маршруте (1 если прямой).flight_id— ID конкретного рейса (md5 отbooking_id+ индекса рейса).flight_index— номер рейса в маршруте, начиная с 0.origin,destination— коды аэропортов.airline— код авиакомпании.baggage— тариф с багажом или без.passengers,price— продублированы из бронирования.
Можно добавить дополнительные столбцы, если они улучшают данные.
Структура данных
data.csv, ~10–100k строк, формат CSV. В колонке flights_info лежит JSON-массив объектов.
Решение
Подход
- Прочитать CSV, распарсить
flights_infoчерезjson.loads. - Для каждой строки построить
itineraryкакorigin0-dest0-dest1-...-destN(или эквивалентно — всеoriginподряд + последнийdestination). - Для каждой строки посчитать
flights_count,transfer_count,companies_count,with_baggage,passenger_price. - Развернуть список рейсов через
df.explode(или пересобрать вручную через list-comprehension). - Добавить
flight_indexиflight_id(md5).
Реализация
import json
import hashlib
import pandas as pd
df = pd.read_csv("data.csv")
# 1. Парсим JSON
df["flights"] = df["flights_info"].apply(json.loads)
# 2. Агрегаты на уровне бронирования
def itinerary(flights):
if not flights:
return ""
return "-".join([flights[0]["origin"]] + [f["destination"] for f in flights])
df["itinerary"] = df["flights"].apply(itinerary)
df["flights_count"] = df["flights"].apply(len)
df["transfer_count"] = df["flights_count"] - 1
df["companies_count"] = df["flights"].apply(lambda fs: len({f["airline"] for f in fs}))
df["with_baggage"] = df["flights"].apply(
lambda fs: all(f["baggage"] == "With baggage" for f in fs)
)
df["passenger_price"] = df["price"] / df["passengers"]
# 3. Разворачиваем рейсы в строки
keep = ["booking_id", "booking_month", "itinerary", "flights_count",
"transfer_count", "companies_count", "with_baggage",
"passengers", "price", "passenger_price"]
exploded = df[keep + ["flights"]].explode("flights").reset_index(drop=True)
# 4. Достаём поля рейса
flights_df = pd.json_normalize(exploded["flights"])
flights_df.columns = ["origin", "destination", "airline", "baggage"]
result = pd.concat(
[exploded[keep].reset_index(drop=True), flights_df.reset_index(drop=True)],
axis=1,
)
# 5. flight_index — номер рейса внутри маршрута
result["flight_index"] = result.groupby("booking_id").cumcount()
# 6. flight_id = md5(booking_id + flight_index)
result["flight_id"] = result.apply(
lambda r: hashlib.md5(f"{r['booking_id']}{r['flight_index']}".encode()).hexdigest(),
axis=1,
)
# 7. Финальный порядок колонок
cols = ["booking_id", "booking_month", "itinerary", "flights_count",
"transfer_count", "companies_count", "with_baggage",
"flight_id", "flight_index", "origin", "destination",
"airline", "baggage", "passengers", "price", "passenger_price"]
result = result[cols]
print(result.head())Проверка
Sanity-check после ETL:
# Сумма всех flights_count должна совпасть с числом строк
assert result.shape[0] == df["flights_count"].sum()
# В пределах одного booking_id flight_index идёт 0,1,2,... подряд
seq = result.groupby("booking_id")["flight_index"].apply(list)
assert all(s == list(range(len(s))) for s in seq)
# itinerary длина = flights_count + 1 (по числу аэропортов в маршруте)
assert (result["itinerary"].str.count("-") + 1 == result["flights_count"] + 1).all()Подводные камни
flights_infoприходит строкой JSON.pd.read_csvне парсит её — обязательноjson.loads. Если используетеast.literal_eval— сработает, но безопаснееjson.loads.itineraryсобирается не из всехoriginилиdestination. Если просто'-'.join(origin)— потеряете финальный аэропорт. Нужно[origin0] + [destination_i for i in flights].flights_count = 1для прямого рейса. Не путайте сtransfer_count = 0.md5-хэш должен быть детерминирован. Если завязать наflightsобъект напрямую (hash(dict)) — упадёт. Братьbooking_id + str(flight_index).passenger_priceсчитается через деление полной цены на число пассажиров — но налоги/сборы могут быть «на бронь», поэтому это не всегда «честная цена за пассажира». В тестовом — приемлемо.- Дубликаты
booking_idв исходных данных — теоретически быть не должно, но проверить черезdf["booking_id"].is_unique. Если есть дубликаты —cumcount()даст некорректныйflight_index. with_baggage— мы считаем «Trueесли ВСЕ рейсы с багажом». Альтернативно — отдельная колонка на уровне рейса (она у нас уже есть вbaggage). На уровне маршрута лучше всего —True/False/Mixed.
Альтернативы
- Вместо
explode+json_normalizeможно сделать вручную через цикл:
rows = []
for _, r in df.iterrows():
for i, f in enumerate(r["flights"]):
rows.append({**r[keep].to_dict(), **f, "flight_index": i})
result = pd.DataFrame(rows)Это медленнее на больших данных, но читаемо.
- Можно сохранить обе таблицы —
bookings(без раскрытия) иflights(нормализованная), как в OLAP-схеме. Это удобнее для BI, чем «всё в одной плоской».
Эталонный ответ
Pandas-pipeline: read_csv → json.loads(flights_info) → explode → json_normalize → cumcount + md5 → final columns. Ключевые моменты — корректный itinerary (origin первого + все destination), детерминированный flight_id, sanity-check после преобразования.