Собесов

Aviasales Aviastats — ETL: разворот flights_info в строки

PythonETL и подготовка данныхСредняяMiddle

Условие

Есть «сырые» данные о покупках авиабилетов (data.csv). Поля:

  • booking_id — ID бронирования.
  • booking_month — месяц покупки билета.
  • passengers — количество пассажиров в бронировании.
  • price — полная стоимость билета (на всё бронирование, не на одного пассажира).
  • flights_info — список рейсов внутри билета (JSON-строка). Считаем, что это все рейсы без обратного билета, но по пути могут быть пересадки. Пример:
[
  {"origin": "SVO", "destination": "MCT", "airline": "WY", "baggage": "With baggage"},
  {"origin": "MCT", "destination": "HKT", "airline": "WY", "baggage": "With baggage"}
]

Нужно привести данные так, чтобы на каждый рейс внутри билета была своя строка. Из примера выше получится 2 строки: SVO->MCT и MCT->HKT.

Итоговая таблица должна содержать столбцы:

  • booking_id — ID бронирования.
  • booking_month — месяц покупки.
  • itinerary — маршрут вида {start}-{change_1}-...-{finish} (для примера выше SVO-MCT-HKT).
  • flights_count — число рейсов в маршруте (1 если прямой).
  • flight_id — ID конкретного рейса (md5 от booking_id + индекса рейса).
  • flight_index — номер рейса в маршруте, начиная с 0.
  • origin, destination — коды аэропортов.
  • airline — код авиакомпании.
  • baggage — тариф с багажом или без.
  • passengers, price — продублированы из бронирования.

Можно добавить дополнительные столбцы, если они улучшают данные.

Структура данных

data.csv, ~10–100k строк, формат CSV. В колонке flights_info лежит JSON-массив объектов.

Решение

Подход

  1. Прочитать CSV, распарсить flights_info через json.loads.
  2. Для каждой строки построить itinerary как origin0-dest0-dest1-...-destN (или эквивалентно — все origin подряд + последний destination).
  3. Для каждой строки посчитать flights_count, transfer_count, companies_count, with_baggage, passenger_price.
  4. Развернуть список рейсов через df.explode (или пересобрать вручную через list-comprehension).
  5. Добавить flight_index и flight_id (md5).

Реализация

import json
import hashlib
import pandas as pd
 
df = pd.read_csv("data.csv")
 
# 1. Парсим JSON
df["flights"] = df["flights_info"].apply(json.loads)
 
# 2. Агрегаты на уровне бронирования
def itinerary(flights):
    if not flights:
        return ""
    return "-".join([flights[0]["origin"]] + [f["destination"] for f in flights])
 
df["itinerary"] = df["flights"].apply(itinerary)
df["flights_count"] = df["flights"].apply(len)
df["transfer_count"] = df["flights_count"] - 1
df["companies_count"] = df["flights"].apply(lambda fs: len({f["airline"] for f in fs}))
df["with_baggage"] = df["flights"].apply(
    lambda fs: all(f["baggage"] == "With baggage" for f in fs)
)
df["passenger_price"] = df["price"] / df["passengers"]
 
# 3. Разворачиваем рейсы в строки
keep = ["booking_id", "booking_month", "itinerary", "flights_count",
        "transfer_count", "companies_count", "with_baggage",
        "passengers", "price", "passenger_price"]
 
exploded = df[keep + ["flights"]].explode("flights").reset_index(drop=True)
 
# 4. Достаём поля рейса
flights_df = pd.json_normalize(exploded["flights"])
flights_df.columns = ["origin", "destination", "airline", "baggage"]
 
result = pd.concat(
    [exploded[keep].reset_index(drop=True), flights_df.reset_index(drop=True)],
    axis=1,
)
 
# 5. flight_index — номер рейса внутри маршрута
result["flight_index"] = result.groupby("booking_id").cumcount()
 
# 6. flight_id = md5(booking_id + flight_index)
result["flight_id"] = result.apply(
    lambda r: hashlib.md5(f"{r['booking_id']}{r['flight_index']}".encode()).hexdigest(),
    axis=1,
)
 
# 7. Финальный порядок колонок
cols = ["booking_id", "booking_month", "itinerary", "flights_count",
        "transfer_count", "companies_count", "with_baggage",
        "flight_id", "flight_index", "origin", "destination",
        "airline", "baggage", "passengers", "price", "passenger_price"]
result = result[cols]
print(result.head())

Проверка

Sanity-check после ETL:

# Сумма всех flights_count должна совпасть с числом строк
assert result.shape[0] == df["flights_count"].sum()
 
# В пределах одного booking_id flight_index идёт 0,1,2,... подряд
seq = result.groupby("booking_id")["flight_index"].apply(list)
assert all(s == list(range(len(s))) for s in seq)
 
# itinerary длина = flights_count + 1 (по числу аэропортов в маршруте)
assert (result["itinerary"].str.count("-") + 1 == result["flights_count"] + 1).all()

Подводные камни

  1. flights_info приходит строкой JSON. pd.read_csv не парсит её — обязательно json.loads. Если используете ast.literal_eval — сработает, но безопаснее json.loads.
  2. itinerary собирается не из всех origin или destination. Если просто '-'.join(origin) — потеряете финальный аэропорт. Нужно [origin0] + [destination_i for i in flights].
  3. flights_count = 1 для прямого рейса. Не путайте с transfer_count = 0.
  4. md5-хэш должен быть детерминирован. Если завязать на flights объект напрямую (hash(dict)) — упадёт. Брать booking_id + str(flight_index).
  5. passenger_price считается через деление полной цены на число пассажиров — но налоги/сборы могут быть «на бронь», поэтому это не всегда «честная цена за пассажира». В тестовом — приемлемо.
  6. Дубликаты booking_id в исходных данных — теоретически быть не должно, но проверить через df["booking_id"].is_unique. Если есть дубликаты — cumcount() даст некорректный flight_index.
  7. with_baggage — мы считаем «True если ВСЕ рейсы с багажом». Альтернативно — отдельная колонка на уровне рейса (она у нас уже есть в baggage). На уровне маршрута лучше всего — True/False/Mixed.

Альтернативы

  • Вместо explode + json_normalize можно сделать вручную через цикл:
rows = []
for _, r in df.iterrows():
    for i, f in enumerate(r["flights"]):
        rows.append({**r[keep].to_dict(), **f, "flight_index": i})
result = pd.DataFrame(rows)

Это медленнее на больших данных, но читаемо.

  • Можно сохранить обе таблицы — bookings (без раскрытия) и flights (нормализованная), как в OLAP-схеме. Это удобнее для BI, чем «всё в одной плоской».

Эталонный ответ

Pandas-pipeline: read_csv → json.loads(flights_info) → explode → json_normalize → cumcount + md5 → final columns. Ключевые моменты — корректный itinerary (origin первого + все destination), детерминированный flight_id, sanity-check после преобразования.

Хочешь увидеть разбор?

Зарегистрируйся бесплатно — откроется развёрнутое решение этой задачи и ещё 4 на выбор.

Зарегистрироваться и увидеть разбор
Уже есть аккаунт? Войти