Условие
Дана таблица:
| key | id | phone | |
|---|---|---|---|
| 1 | 12345 | 89997776655 | test@mail.ru |
| 2 | 54321 | 87778885566 | two@mail.ru |
| 3 | 98765 | 87776664577 | three@mail |
| 4 | 66678 | 87778885566 | four@mail.ru |
| 5 | 34567 | 84547895566 | four@mail.ru |
| 6 | 34567 | 89087545678 | five@mail.ru |
Нужно: по заданному значению поля (id, phone или mail) вернуть все связанные записи — транзитивную связную компоненту, где две записи связаны, если у них совпадает хотя бы одно из значений id, phone, mail.
Для phone = 87778885566 ответ: записи 2, 4, 5, 6.
Парная задача с SQL-версией (sber-sql-connected-records-graph) — здесь решаем на Python.
Решение
Подход
Правильная структура для задач связной компоненты — DSU (Disjoint Set Union, Union-Find):
- хранит для каждого элемента «лидера» его компоненты;
- поддерживает операции
find(x)(найти лидера) иunion(x, y)(объединить компоненты); - работает за почти константное время на операцию (амортизированно).
Алгоритм:
- Каждая запись — отдельный элемент.
- Для каждого общего значения (
id/phone/mail) объединяем все записи, у которых это значение совпадает. - Для запроса находим компоненту, в которой лежит хотя бы одна стартовая запись.
Реализация
import pandas as pd
class DSU:
def __init__(self, n):
self.parent = list(range(n))
self.rank = [0] * n
def find(self, x):
while self.parent[x] != x:
self.parent[x] = self.parent[self.parent[x]] # path compression
x = self.parent[x]
return x
def union(self, a, b):
ra, rb = self.find(a), self.find(b)
if ra == rb:
return
if self.rank[ra] < self.rank[rb]:
ra, rb = rb, ra
self.parent[rb] = ra
if self.rank[ra] == self.rank[rb]:
self.rank[ra] += 1
def find_related(df: pd.DataFrame, search_field: str, value) -> pd.DataFrame:
"""
df — таблица с колонками key, id, phone, mail.
search_field — 'id' / 'phone' / 'mail'.
value — искомое значение.
"""
df = df.reset_index(drop=True).copy()
n = len(df)
dsu = DSU(n)
# Объединяем записи по каждому из трёх полей
for col in ("id", "phone", "mail"):
groups = df.groupby(col).indices # значение -> список индексов
for val, idx_list in groups.items():
if val is None or pd.isna(val):
continue
base = idx_list[0]
for i in idx_list[1:]:
dsu.union(base, i)
# Находим стартовые индексы
seed = df.index[df[search_field] == value].tolist()
if not seed:
return df.iloc[0:0]
target_root = dsu.find(seed[0])
# Все записи, у которых корень совпадает с целевым
mask = [dsu.find(i) == target_root for i in df.index]
return df.loc[mask].sort_values("key").reset_index(drop=True)
# Пример использования
data = pd.DataFrame({
"key": [1, 2, 3, 4, 5, 6],
"id": [12345, 54321, 98765, 66678, 34567, 34567],
"phone": ["89997776655", "87778885566", "87776664577", "87778885566", "84547895566", "89087545678"],
"mail": ["test@mail.ru", "two@mail.ru", "three@mail", "four@mail.ru", "four@mail.ru", "five@mail.ru"],
})
print(find_related(data, "phone", "87778885566"))
# key id phone mail
# 0 2 54321 87778885566 two@mail.ru
# 1 4 66678 87778885566 four@mail.ru
# 2 5 34567 84547895566 four@mail.ru
# 3 6 34567 89087545678 five@mail.ruСложность
- Построение DSU:
O(N · α(N))≈O(N)на проход по таблице,α— обратная функция Аккермана (практически 4 для реальных размеров). - Каждый запрос —
O(N)на фильтрацию записей с тем же корнем. - Память —
O(N).
При стриминговом потоке записей DSU поддерживает инкрементальный апдейт: новая запись объединяется с уже существующими по совпадениям, и поиск компоненты остаётся быстрым.
Альтернатива: BFS/DFS на графе
Можно вместо DSU построить граф (ребро между записями с общим значением) и запустить BFS из стартовой вершины. На малых данных это проще, но для больших таблиц явный граф съедает память: при N записей и одном «общем» значении у K записей ребро — . DSU обходит эту проблему: не строим явные рёбра, объединяем сразу в одну компоненту.
from collections import defaultdict, deque
def find_related_bfs(df, search_field, value):
# value -> список key
bucket = {col: defaultdict(list) for col in ("id", "phone", "mail")}
for i, row in df.iterrows():
for col in ("id", "phone", "mail"):
bucket[col][row[col]].append(i)
seed = df.index[df[search_field] == value].tolist()
visited = set(seed)
queue = deque(seed)
while queue:
i = queue.popleft()
for col in ("id", "phone", "mail"):
for j in bucket[col][df.at[i, col]]:
if j not in visited:
visited.add(j)
queue.append(j)
return df.loc[sorted(visited)]Подводные камни
NaN/Noneкак «общее значение». Если у двух записейmail = NaN, BFS их объединит (если их положить в одинbucket[NaN]). Это ошибка: «нет почты» — не признак связи. В DSU отдельно проверяемpd.isna(val)и пропускаем.- «Грязные» значения.
'three@mail'без.ru, пробелы, телефон с/без+7— нормализуйте до Union-Find:df.mail = df.mail.str.lower().str.strip(). - Path compression обязательна. Без неё DSU деградирует до
O(N)на операцию на «вытянутых» цепочках. - Стримовые обновления. Если данные приходят непрерывно, DSU поддерживает онлайн: новую запись объединяем по матчам со старыми. Удаления компоненты не поддерживает — для этого нужна другая структура.
- Большие данные. При >10M записей: DSU всё ещё работает, но memory-bound (массив
parent). На Spark есть встроенныйconnected_componentsчерез GraphFrames, на Postgres — рекурсивный CTE (см. парную SQL-задачу). - Несколько
seed. Если по запросу нашлось несколько записей-стартов, проверьте, что они в одной компоненте (по идее должны быть — но это даст санити-проверку). Если в разных — одно из значений не уникально (например,phoneввели один и тот же двум разным людям).
Эталонный ответ
DSU + проход по таблице, объединяющий записи с общим id, phone, mail (с пропуском NaN).
def find_related(df, field, value):
dsu = DSU(len(df))
for col in ("id", "phone", "mail"):
for val, idx in df.groupby(col).indices.items():
if pd.isna(val): continue
for i in idx[1:]:
dsu.union(idx[0], i)
seed = df.index[df[field] == value]
if seed.empty: return df.iloc[0:0]
root = dsu.find(seed[0])
return df[[dsu.find(i) == root for i in df.index]]Время: O(N·α(N)) для построения, O(N) на запрос. Правильная структура — DSU; BFS возможен, но менее эффективен на больших данных.