Собесов

Python — Union-Find для связанных записей по id/phone/mail

PythonGraph / Union-FindСложнаяMiddle

Условие

Дана таблица:

key id phone mail
1 12345 89997776655 test@mail.ru
2 54321 87778885566 two@mail.ru
3 98765 87776664577 three@mail
4 66678 87778885566 four@mail.ru
5 34567 84547895566 four@mail.ru
6 34567 89087545678 five@mail.ru

Нужно: по заданному значению поля (id, phone или mail) вернуть все связанные записи — транзитивную связную компоненту, где две записи связаны, если у них совпадает хотя бы одно из значений id, phone, mail.

Для phone = 87778885566 ответ: записи 2, 4, 5, 6.

Парная задача с SQL-версией (sber-sql-connected-records-graph) — здесь решаем на Python.

Решение

Подход

Правильная структура для задач связной компоненты — DSU (Disjoint Set Union, Union-Find):

  • хранит для каждого элемента «лидера» его компоненты;
  • поддерживает операции find(x) (найти лидера) и union(x, y) (объединить компоненты);
  • работает за почти константное время на операцию (амортизированно).

Алгоритм:

  1. Каждая запись — отдельный элемент.
  2. Для каждого общего значения (id / phone / mail) объединяем все записи, у которых это значение совпадает.
  3. Для запроса находим компоненту, в которой лежит хотя бы одна стартовая запись.

Реализация

import pandas as pd
 
class DSU:
    def __init__(self, n):
        self.parent = list(range(n))
        self.rank = [0] * n
 
    def find(self, x):
        while self.parent[x] != x:
            self.parent[x] = self.parent[self.parent[x]]   # path compression
            x = self.parent[x]
        return x
 
    def union(self, a, b):
        ra, rb = self.find(a), self.find(b)
        if ra == rb:
            return
        if self.rank[ra] < self.rank[rb]:
            ra, rb = rb, ra
        self.parent[rb] = ra
        if self.rank[ra] == self.rank[rb]:
            self.rank[ra] += 1
 
 
def find_related(df: pd.DataFrame, search_field: str, value) -> pd.DataFrame:
    """
    df — таблица с колонками key, id, phone, mail.
    search_field — 'id' / 'phone' / 'mail'.
    value — искомое значение.
    """
    df = df.reset_index(drop=True).copy()
    n = len(df)
    dsu = DSU(n)
 
    # Объединяем записи по каждому из трёх полей
    for col in ("id", "phone", "mail"):
        groups = df.groupby(col).indices       # значение -> список индексов
        for val, idx_list in groups.items():
            if val is None or pd.isna(val):
                continue
            base = idx_list[0]
            for i in idx_list[1:]:
                dsu.union(base, i)
 
    # Находим стартовые индексы
    seed = df.index[df[search_field] == value].tolist()
    if not seed:
        return df.iloc[0:0]
 
    target_root = dsu.find(seed[0])
    # Все записи, у которых корень совпадает с целевым
    mask = [dsu.find(i) == target_root for i in df.index]
    return df.loc[mask].sort_values("key").reset_index(drop=True)
 
 
# Пример использования
data = pd.DataFrame({
    "key":   [1, 2, 3, 4, 5, 6],
    "id":    [12345, 54321, 98765, 66678, 34567, 34567],
    "phone": ["89997776655", "87778885566", "87776664577", "87778885566", "84547895566", "89087545678"],
    "mail":  ["test@mail.ru", "two@mail.ru", "three@mail", "four@mail.ru", "four@mail.ru", "five@mail.ru"],
})
 
print(find_related(data, "phone", "87778885566"))
#    key     id        phone           mail
# 0    2  54321  87778885566   two@mail.ru
# 1    4  66678  87778885566   four@mail.ru
# 2    5  34567  84547895566   four@mail.ru
# 3    6  34567  89087545678   five@mail.ru

Сложность

  • Построение DSU: O(N · α(N))O(N) на проход по таблице, α — обратная функция Аккермана (практически 4 для реальных размеров).
  • Каждый запрос — O(N) на фильтрацию записей с тем же корнем.
  • Память — O(N).

При стриминговом потоке записей DSU поддерживает инкрементальный апдейт: новая запись объединяется с уже существующими по совпадениям, и поиск компоненты остаётся быстрым.

Альтернатива: BFS/DFS на графе

Можно вместо DSU построить граф (ребро между записями с общим значением) и запустить BFS из стартовой вершины. На малых данных это проще, но для больших таблиц явный граф съедает память: при N записей и одном «общем» значении у K записей ребро — K2K^2. DSU обходит эту проблему: не строим явные рёбра, объединяем сразу в одну компоненту.

from collections import defaultdict, deque
 
def find_related_bfs(df, search_field, value):
    # value -> список key
    bucket = {col: defaultdict(list) for col in ("id", "phone", "mail")}
    for i, row in df.iterrows():
        for col in ("id", "phone", "mail"):
            bucket[col][row[col]].append(i)
 
    seed = df.index[df[search_field] == value].tolist()
    visited = set(seed)
    queue = deque(seed)
    while queue:
        i = queue.popleft()
        for col in ("id", "phone", "mail"):
            for j in bucket[col][df.at[i, col]]:
                if j not in visited:
                    visited.add(j)
                    queue.append(j)
    return df.loc[sorted(visited)]

Подводные камни

  1. NaN / None как «общее значение». Если у двух записей mail = NaN, BFS их объединит (если их положить в один bucket[NaN]). Это ошибка: «нет почты» — не признак связи. В DSU отдельно проверяем pd.isna(val) и пропускаем.
  2. «Грязные» значения. 'three@mail' без .ru, пробелы, телефон с/без +7 — нормализуйте до Union-Find: df.mail = df.mail.str.lower().str.strip().
  3. Path compression обязательна. Без неё DSU деградирует до O(N) на операцию на «вытянутых» цепочках.
  4. Стримовые обновления. Если данные приходят непрерывно, DSU поддерживает онлайн: новую запись объединяем по матчам со старыми. Удаления компоненты не поддерживает — для этого нужна другая структура.
  5. Большие данные. При >10M записей: DSU всё ещё работает, но memory-bound (массив parent). На Spark есть встроенный connected_components через GraphFrames, на Postgres — рекурсивный CTE (см. парную SQL-задачу).
  6. Несколько seed. Если по запросу нашлось несколько записей-стартов, проверьте, что они в одной компоненте (по идее должны быть — но это даст санити-проверку). Если в разных — одно из значений не уникально (например, phone ввели один и тот же двум разным людям).

Эталонный ответ

DSU + проход по таблице, объединяющий записи с общим id, phone, mail (с пропуском NaN).

def find_related(df, field, value):
    dsu = DSU(len(df))
    for col in ("id", "phone", "mail"):
        for val, idx in df.groupby(col).indices.items():
            if pd.isna(val): continue
            for i in idx[1:]:
                dsu.union(idx[0], i)
    seed = df.index[df[field] == value]
    if seed.empty: return df.iloc[0:0]
    root = dsu.find(seed[0])
    return df[[dsu.find(i) == root for i in df.index]]

Время: O(N·α(N)) для построения, O(N) на запрос. Правильная структура — DSU; BFS возможен, но менее эффективен на больших данных.

Хочешь увидеть разбор?

Зарегистрируйся бесплатно — откроется развёрнутое решение этой задачи и ещё 4 на выбор.

Зарегистрироваться и увидеть разбор
Уже есть аккаунт? Войти