Files
wiki_embedding/supabase.py
Тимур Абайдулин 84b8246562 Init
2026-03-10 16:33:39 +03:00

444 lines
17 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
Модуль для работы с Supabase (PostgreSQL)
"""
import os
from pathlib import Path
import psycopg2
from dotenv import load_dotenv
load_dotenv(Path(__file__).parent / '.env')
class SupabaseManager:
"""Менеджер для работы с Supabase"""
def __init__(self):
"""Инициализация подключения к Supabase"""
self.host = os.getenv('SUPABASE_HOST')
self.port = os.getenv('SUPABASE_PORT')
self.user = os.getenv('SUPABASE_USER')
self.password = os.getenv('SUPABASE_PASSWORD')
self.database = os.getenv('SUPABASE_DB')
self.conn = None
self.cursor = None
def connect(self):
"""Подключение к базе данных"""
try:
self.conn = psycopg2.connect(
host=self.host,
port=self.port,
user=self.user,
password=self.password,
database=self.database,
sslmode='require',
)
self.cursor = self.conn.cursor()
# Проверка версии
self.cursor.execute('SELECT version();')
db_version = self.cursor.fetchone()
print(f"Подключение к Supabase успешно!\nВерсия PostgreSQL: {db_version[0]}")
return True
except Exception as e:
print(f"✗ Ошибка подключения к Supabase: {e}")
return False
def close(self):
"""Закрытие соединения"""
if self.cursor:
self.cursor.close()
if self.conn:
self.conn.close()
print("✓ Соединение с базой данных закрыто")
# ============================================================================
# ПРОВЕРКА ТАБЛИЦ
# ============================================================================
def table_exists(self, table_name):
"""Проверка существования таблицы"""
self.cursor.execute("""
SELECT EXISTS (
SELECT FROM information_schema.tables
WHERE table_schema = 'public'
AND table_name = %s
);
""", (table_name,))
return self.cursor.fetchone()[0]
def table_has_data(self, table_name):
"""Проверка, что таблица не пустая"""
self.cursor.execute(f"SELECT EXISTS (SELECT 1 FROM {table_name} LIMIT 1);")
return self.cursor.fetchone()[0]
# ============================================================================
# СОЗДАНИЕ ТАБЛИЦ
# ============================================================================
def create_employee_table(self):
"""Создание таблицы employee"""
self.cursor.execute("""
CREATE TABLE IF NOT EXISTS employee (
id SERIAL PRIMARY KEY,
pg_load_dttm TIMESTAMPTZ NOT NULL,
yatracker_employee_id TEXT NOT NULL,
value JSONB NOT NULL
);
""")
self.conn.commit()
print("Таблица 'employee' создана")
def create_tasks_table(self):
"""Создание таблицы tasks"""
self.cursor.execute("""
CREATE TABLE IF NOT EXISTS tasks (
id SERIAL PRIMARY KEY,
pg_load_dttm TIMESTAMPTZ NOT NULL,
queue TEXT,
task_key TEXT NOT NULL,
task_title TEXT,
status_name TEXT,
created_at_dttm TIMESTAMPTZ,
updated_at_dttm TIMESTAMPTZ,
status_start_dttm TIMESTAMPTZ,
value JSONB NOT NULL
);
""")
self.conn.commit()
print("Таблица 'tasks' создана")
def create_employee_info_table(self):
"""Создание таблицы employee_info"""
self.cursor.execute("""
CREATE TABLE IF NOT EXISTS employee_info (
id SERIAL PRIMARY KEY,
pg_load_dttm TIMESTAMPTZ NOT NULL,
column_d TEXT,
column_n TEXT
);
""")
self.conn.commit()
print("Таблица 'employee_info' создана")
# ============================================================================
# ЗАГРУЗКА ДАННЫХ
# ============================================================================
def load_employee_data(self, employee_df):
"""Загрузка данных сотрудников в базу"""
if len(employee_df) > 0:
print("\nЗагружаем данные в таблицу 'employee'...")
for _, row in employee_df.iterrows():
self.cursor.execute("""
INSERT INTO employee (pg_load_dttm, yatracker_employee_id, value)
VALUES (%s, %s, %s::jsonb)
""", (row['pg_load_dttm'], row['yatracker_employee_id'], row['value']))
self.conn.commit()
print(f"✓ Загружено {len(employee_df)} записей в таблицу 'employee'")
else:
print("\nНет данных для загрузки в таблицу 'employee'")
def load_tasks_data(self, tasks_df):
"""Загрузка данных задач в базу"""
if len(tasks_df) > 0:
print("\nЗагружаем данные в таблицу 'tasks'...")
for _, row in tasks_df.iterrows():
self.cursor.execute("""
INSERT INTO tasks (pg_load_dttm, queue, task_key, task_title, status_name,
created_at_dttm, updated_at_dttm, status_start_dttm, value)
VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s::jsonb)
""", (
row['pg_load_dttm'],
row['queue'],
row['task_key'],
row['task_title'],
row['status_name'],
row['created_at_dttm'],
row['updated_at_dttm'],
row['status_start_dttm'],
row['value']
))
self.conn.commit()
print(f"✓ Загружено {len(tasks_df)} записей в таблицу 'tasks'")
else:
print("\nНет данных для загрузки в таблицу 'tasks'")
def load_employee_info_data(self, employee_info_df):
"""Загрузка данных employee_info в базу"""
if len(employee_info_df) > 0:
print("\nЗагружаем данные в таблицу 'employee_info'...")
for _, row in employee_info_df.iterrows():
self.cursor.execute("""
INSERT INTO employee_info (pg_load_dttm, column_d, column_n)
VALUES (%s, %s, %s)
""", (
row['pg_load_dttm'],
row['column_d'],
row['column_n']
))
self.conn.commit()
print(f"✓ Загружено {len(employee_info_df)} записей в таблицу 'employee_info'")
else:
print("\nНет данных для загрузки в таблицу 'employee_info'")
# ============================================================================
# ОСНОВНАЯ ЛОГИКА ПРОВЕРКИ И ЗАГРУЗКИ
# ============================================================================
def check_and_prepare_tables(self):
"""
Проверка существования таблиц и создание при необходимости
Returns:
dict: Словарь с флагами наличия данных для каждой таблицы
"""
tables_status = {}
# Проверяем таблицу employee
employee_exists = self.table_exists('employee')
employee_has_data = False
if employee_exists:
employee_has_data = self.table_has_data('employee')
if employee_has_data:
print("Таблица 'employee' существует и содержит данные. Пропускаем загрузку.")
else:
print("Таблица 'employee' существует, но пустая.")
else:
self.create_employee_table()
tables_status['employee'] = employee_has_data
# Проверяем таблицу tasks
tasks_exists = self.table_exists('tasks')
tasks_has_data = False
if tasks_exists:
tasks_has_data = self.table_has_data('tasks')
if tasks_has_data:
print("Таблица 'tasks' существует и содержит данные. Пропускаем загрузку.")
else:
print("Таблица 'tasks' существует, но пустая.")
else:
self.create_tasks_table()
tables_status['tasks'] = tasks_has_data
# Проверяем таблицу employee_info
employee_info_exists = self.table_exists('employee_info')
employee_info_has_data = False
if employee_info_exists:
employee_info_has_data = self.table_has_data('employee_info')
if employee_info_has_data:
print("Таблица 'employee_info' существует и содержит данные. Пропускаем загрузку.")
else:
print("Таблица 'employee_info' существует, но пустая.")
else:
self.create_employee_info_table()
tables_status['employee_info'] = employee_info_has_data
return tables_status
def load_all_data(self, tables_status, employee=None, tasks=None, employee_info=None):
"""
Загрузка всех данных в базу
Args:
tables_status (dict): Статус таблиц из check_and_prepare_tables
employee (pd.DataFrame): Данные сотрудников
tasks (pd.DataFrame): Данные задач
employee_info (pd.DataFrame): Информация о сотрудниках
"""
if employee is not None and not tables_status['employee']:
self.load_employee_data(employee)
elif tables_status['employee']:
print("\nТаблица 'employee' уже содержит данные, пропускаем загрузку")
if tasks is not None and not tables_status['tasks']:
self.load_tasks_data(tasks)
elif tables_status['tasks']:
print("\nТаблица 'tasks' уже содержит данные, пропускаем загрузку")
if employee_info is not None and not tables_status['employee_info']:
self.load_employee_info_data(employee_info)
elif tables_status['employee_info']:
print("\nТаблица 'employee_info' уже содержит данные, пропускаем загрузку")
# ============================================================================
# ПОЛУЧЕНИЕ ДАННЫХ ДЛЯ АНАЛИЗА
# ============================================================================
def get_tasks_for_analysis(self, task_keys=None):
"""
Получение задач для анализа с описанием и исполнителем
Args:
task_keys (list): Список ключей задач для фильтрации (опционально)
Returns:
list: Список словарей с данными задач
"""
if task_keys:
# Формируем строку с ключами для SQL IN
keys_str = ", ".join([f"'{key}'" for key in task_keys])
query = f"""
SELECT *
FROM
(SELECT
t.queue,
t.task_key,
t.task_title,
t.value -> 'description' as description,
t.value -> 'assignee' ->> 'display' as assignee,
row_number() over (partition by task_key order by updated_at_dttm desc) as rn
FROM tasks t
WHERE task_key IN ({keys_str})) T1
WHERE rn = 1;
"""
else:
query = """
SELECT
t.queue,
t.task_key,
t.task_title,
t.value -> 'description' as description,
t.value -> 'assignee' ->> 'display' as assignee
FROM tasks t
ORDER BY t.updated_at_dttm DESC
LIMIT 100;
"""
self.cursor.execute(query)
columns = [desc[0] for desc in self.cursor.description]
rows = self.cursor.fetchall()
tasks = []
for row in rows:
task_dict = dict(zip(columns, row))
tasks.append(task_dict)
return tasks
# ============================================================================
# WIKI PAGES
# ============================================================================
def create_wiki_pages_table(self):
"""Создание таблицы wiki_pages"""
self.cursor.execute("""
CREATE TABLE IF NOT EXISTS wiki_pages (
id SERIAL PRIMARY KEY,
pg_load_dttm TIMESTAMPTZ NOT NULL,
slug TEXT NOT NULL,
wiki_page_id INTEGER,
title TEXT,
page_type TEXT,
modified_at TIMESTAMPTZ,
content_hash TEXT NOT NULL,
value JSONB NOT NULL
);
CREATE INDEX IF NOT EXISTS idx_wiki_pages_slug
ON wiki_pages (slug);
CREATE INDEX IF NOT EXISTS idx_wiki_pages_load
ON wiki_pages (pg_load_dttm DESC);
""")
self.conn.commit()
print("Таблица 'wiki_pages' создана")
def get_latest_hashes(self) -> dict[str, str]:
"""Получить последний content_hash для каждого slug."""
self.cursor.execute("""
SELECT DISTINCT ON (slug) slug, content_hash
FROM wiki_pages
ORDER BY slug, pg_load_dttm DESC;
""")
return {row[0]: row[1] for row in self.cursor.fetchall()}
def upsert_wiki_pages(self, pages: list[dict]) -> dict:
"""
Вставить новые и изменённые страницы.
Логика: сравниваем content_hash с последним сохранённым.
Если хеш совпадает — пропускаем. Если новый или изменился — вставляем.
Returns:
dict: {'inserted': int, 'unchanged': int}
"""
import json
if not pages:
return {'inserted': 0, 'unchanged': 0}
if not self.table_exists('wiki_pages'):
self.create_wiki_pages_table()
latest_hashes = self.get_latest_hashes()
inserted = 0
unchanged = 0
for page in pages:
slug = page['slug']
new_hash = page['content_hash']
stored_hash = latest_hashes.get(slug)
if stored_hash == new_hash:
unchanged += 1
continue
value = {
'slug': slug,
'wiki_page_id': page.get('wiki_page_id'),
'title': page.get('title'),
'page_type': page.get('page_type'),
'modified_at': str(page.get('modified_at') or ''),
'content': page.get('content', ''),
}
self.cursor.execute("""
INSERT INTO wiki_pages
(pg_load_dttm, slug, wiki_page_id, title, page_type, modified_at, content_hash, value)
VALUES (%s, %s, %s, %s, %s, %s, %s, %s::jsonb)
""", (
page['pg_load_dttm'],
slug,
page.get('wiki_page_id'),
page.get('title'),
page.get('page_type'),
page.get('modified_at'),
new_hash,
json.dumps(value, ensure_ascii=False),
))
inserted += 1
self.conn.commit()
return {'inserted': inserted, 'unchanged': unchanged}
def execute_sql_file(self, sql_file_path):
"""
Выполнение SQL запроса из файла
Args:
sql_file_path (str): Путь к файлу с SQL запросом
Returns:
list: Список словарей с результатами
"""
with open(sql_file_path, 'r', encoding='utf-8') as f:
query = f.read()
self.cursor.execute(query)
columns = [desc[0] for desc in self.cursor.description]
rows = self.cursor.fetchall()
results = []
for row in rows:
result_dict = dict(zip(columns, row))
results.append(result_dict)
return results