""" Модуль для работы с Supabase (PostgreSQL) """ import os from pathlib import Path import psycopg2 from dotenv import load_dotenv load_dotenv(Path(__file__).parent / '.env') class SupabaseManager: """Менеджер для работы с Supabase""" def __init__(self): """Инициализация подключения к Supabase""" self.host = os.getenv('SUPABASE_HOST') self.port = os.getenv('SUPABASE_PORT') self.user = os.getenv('SUPABASE_USER') self.password = os.getenv('SUPABASE_PASSWORD') self.database = os.getenv('SUPABASE_DB') self.conn = None self.cursor = None def connect(self): """Подключение к базе данных""" try: self.conn = psycopg2.connect( host=self.host, port=self.port, user=self.user, password=self.password, database=self.database, sslmode='require', ) self.cursor = self.conn.cursor() # Проверка версии self.cursor.execute('SELECT version();') db_version = self.cursor.fetchone() print(f"Подключение к Supabase успешно!\nВерсия PostgreSQL: {db_version[0]}") return True except Exception as e: print(f"✗ Ошибка подключения к Supabase: {e}") return False def close(self): """Закрытие соединения""" if self.cursor: self.cursor.close() if self.conn: self.conn.close() print("✓ Соединение с базой данных закрыто") # ============================================================================ # ПРОВЕРКА ТАБЛИЦ # ============================================================================ def table_exists(self, table_name): """Проверка существования таблицы""" self.cursor.execute(""" SELECT EXISTS ( SELECT FROM information_schema.tables WHERE table_schema = 'public' AND table_name = %s ); """, (table_name,)) return self.cursor.fetchone()[0] def table_has_data(self, table_name): """Проверка, что таблица не пустая""" self.cursor.execute(f"SELECT EXISTS (SELECT 1 FROM {table_name} LIMIT 1);") return self.cursor.fetchone()[0] # ============================================================================ # СОЗДАНИЕ ТАБЛИЦ # ============================================================================ def create_employee_table(self): """Создание таблицы employee""" self.cursor.execute(""" CREATE TABLE IF NOT EXISTS employee ( id SERIAL PRIMARY KEY, pg_load_dttm TIMESTAMPTZ NOT NULL, yatracker_employee_id TEXT NOT NULL, value JSONB NOT NULL ); """) self.conn.commit() print("Таблица 'employee' создана") def create_tasks_table(self): """Создание таблицы tasks""" self.cursor.execute(""" CREATE TABLE IF NOT EXISTS tasks ( id SERIAL PRIMARY KEY, pg_load_dttm TIMESTAMPTZ NOT NULL, queue TEXT, task_key TEXT NOT NULL, task_title TEXT, status_name TEXT, created_at_dttm TIMESTAMPTZ, updated_at_dttm TIMESTAMPTZ, status_start_dttm TIMESTAMPTZ, value JSONB NOT NULL ); """) self.conn.commit() print("Таблица 'tasks' создана") def create_employee_info_table(self): """Создание таблицы employee_info""" self.cursor.execute(""" CREATE TABLE IF NOT EXISTS employee_info ( id SERIAL PRIMARY KEY, pg_load_dttm TIMESTAMPTZ NOT NULL, column_d TEXT, column_n TEXT ); """) self.conn.commit() print("Таблица 'employee_info' создана") # ============================================================================ # ЗАГРУЗКА ДАННЫХ # ============================================================================ def load_employee_data(self, employee_df): """Загрузка данных сотрудников в базу""" if len(employee_df) > 0: print("\nЗагружаем данные в таблицу 'employee'...") for _, row in employee_df.iterrows(): self.cursor.execute(""" INSERT INTO employee (pg_load_dttm, yatracker_employee_id, value) VALUES (%s, %s, %s::jsonb) """, (row['pg_load_dttm'], row['yatracker_employee_id'], row['value'])) self.conn.commit() print(f"✓ Загружено {len(employee_df)} записей в таблицу 'employee'") else: print("\nНет данных для загрузки в таблицу 'employee'") def load_tasks_data(self, tasks_df): """Загрузка данных задач в базу""" if len(tasks_df) > 0: print("\nЗагружаем данные в таблицу 'tasks'...") for _, row in tasks_df.iterrows(): self.cursor.execute(""" INSERT INTO tasks (pg_load_dttm, queue, task_key, task_title, status_name, created_at_dttm, updated_at_dttm, status_start_dttm, value) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s::jsonb) """, ( row['pg_load_dttm'], row['queue'], row['task_key'], row['task_title'], row['status_name'], row['created_at_dttm'], row['updated_at_dttm'], row['status_start_dttm'], row['value'] )) self.conn.commit() print(f"✓ Загружено {len(tasks_df)} записей в таблицу 'tasks'") else: print("\nНет данных для загрузки в таблицу 'tasks'") def load_employee_info_data(self, employee_info_df): """Загрузка данных employee_info в базу""" if len(employee_info_df) > 0: print("\nЗагружаем данные в таблицу 'employee_info'...") for _, row in employee_info_df.iterrows(): self.cursor.execute(""" INSERT INTO employee_info (pg_load_dttm, column_d, column_n) VALUES (%s, %s, %s) """, ( row['pg_load_dttm'], row['column_d'], row['column_n'] )) self.conn.commit() print(f"✓ Загружено {len(employee_info_df)} записей в таблицу 'employee_info'") else: print("\nНет данных для загрузки в таблицу 'employee_info'") # ============================================================================ # ОСНОВНАЯ ЛОГИКА ПРОВЕРКИ И ЗАГРУЗКИ # ============================================================================ def check_and_prepare_tables(self): """ Проверка существования таблиц и создание при необходимости Returns: dict: Словарь с флагами наличия данных для каждой таблицы """ tables_status = {} # Проверяем таблицу employee employee_exists = self.table_exists('employee') employee_has_data = False if employee_exists: employee_has_data = self.table_has_data('employee') if employee_has_data: print("Таблица 'employee' существует и содержит данные. Пропускаем загрузку.") else: print("Таблица 'employee' существует, но пустая.") else: self.create_employee_table() tables_status['employee'] = employee_has_data # Проверяем таблицу tasks tasks_exists = self.table_exists('tasks') tasks_has_data = False if tasks_exists: tasks_has_data = self.table_has_data('tasks') if tasks_has_data: print("Таблица 'tasks' существует и содержит данные. Пропускаем загрузку.") else: print("Таблица 'tasks' существует, но пустая.") else: self.create_tasks_table() tables_status['tasks'] = tasks_has_data # Проверяем таблицу employee_info employee_info_exists = self.table_exists('employee_info') employee_info_has_data = False if employee_info_exists: employee_info_has_data = self.table_has_data('employee_info') if employee_info_has_data: print("Таблица 'employee_info' существует и содержит данные. Пропускаем загрузку.") else: print("Таблица 'employee_info' существует, но пустая.") else: self.create_employee_info_table() tables_status['employee_info'] = employee_info_has_data return tables_status def load_all_data(self, tables_status, employee=None, tasks=None, employee_info=None): """ Загрузка всех данных в базу Args: tables_status (dict): Статус таблиц из check_and_prepare_tables employee (pd.DataFrame): Данные сотрудников tasks (pd.DataFrame): Данные задач employee_info (pd.DataFrame): Информация о сотрудниках """ if employee is not None and not tables_status['employee']: self.load_employee_data(employee) elif tables_status['employee']: print("\nТаблица 'employee' уже содержит данные, пропускаем загрузку") if tasks is not None and not tables_status['tasks']: self.load_tasks_data(tasks) elif tables_status['tasks']: print("\nТаблица 'tasks' уже содержит данные, пропускаем загрузку") if employee_info is not None and not tables_status['employee_info']: self.load_employee_info_data(employee_info) elif tables_status['employee_info']: print("\nТаблица 'employee_info' уже содержит данные, пропускаем загрузку") # ============================================================================ # ПОЛУЧЕНИЕ ДАННЫХ ДЛЯ АНАЛИЗА # ============================================================================ def get_tasks_for_analysis(self, task_keys=None): """ Получение задач для анализа с описанием и исполнителем Args: task_keys (list): Список ключей задач для фильтрации (опционально) Returns: list: Список словарей с данными задач """ if task_keys: # Формируем строку с ключами для SQL IN keys_str = ", ".join([f"'{key}'" for key in task_keys]) query = f""" SELECT * FROM (SELECT t.queue, t.task_key, t.task_title, t.value -> 'description' as description, t.value -> 'assignee' ->> 'display' as assignee, row_number() over (partition by task_key order by updated_at_dttm desc) as rn FROM tasks t WHERE task_key IN ({keys_str})) T1 WHERE rn = 1; """ else: query = """ SELECT t.queue, t.task_key, t.task_title, t.value -> 'description' as description, t.value -> 'assignee' ->> 'display' as assignee FROM tasks t ORDER BY t.updated_at_dttm DESC LIMIT 100; """ self.cursor.execute(query) columns = [desc[0] for desc in self.cursor.description] rows = self.cursor.fetchall() tasks = [] for row in rows: task_dict = dict(zip(columns, row)) tasks.append(task_dict) return tasks # ============================================================================ # WIKI PAGES # ============================================================================ def create_wiki_pages_table(self): """Создание таблицы wiki_pages""" self.cursor.execute(""" CREATE TABLE IF NOT EXISTS wiki_pages ( id SERIAL PRIMARY KEY, pg_load_dttm TIMESTAMPTZ NOT NULL, slug TEXT NOT NULL, wiki_page_id INTEGER, title TEXT, page_type TEXT, modified_at TIMESTAMPTZ, content_hash TEXT NOT NULL, value JSONB NOT NULL ); CREATE INDEX IF NOT EXISTS idx_wiki_pages_slug ON wiki_pages (slug); CREATE INDEX IF NOT EXISTS idx_wiki_pages_load ON wiki_pages (pg_load_dttm DESC); """) self.conn.commit() print("Таблица 'wiki_pages' создана") def get_latest_hashes(self) -> dict[str, str]: """Получить последний content_hash для каждого slug.""" self.cursor.execute(""" SELECT DISTINCT ON (slug) slug, content_hash FROM wiki_pages ORDER BY slug, pg_load_dttm DESC; """) return {row[0]: row[1] for row in self.cursor.fetchall()} def upsert_wiki_pages(self, pages: list[dict]) -> dict: """ Вставить новые и изменённые страницы. Логика: сравниваем content_hash с последним сохранённым. Если хеш совпадает — пропускаем. Если новый или изменился — вставляем. Returns: dict: {'inserted': int, 'unchanged': int} """ import json if not pages: return {'inserted': 0, 'unchanged': 0} if not self.table_exists('wiki_pages'): self.create_wiki_pages_table() latest_hashes = self.get_latest_hashes() inserted = 0 unchanged = 0 for page in pages: slug = page['slug'] new_hash = page['content_hash'] stored_hash = latest_hashes.get(slug) if stored_hash == new_hash: unchanged += 1 continue value = { 'slug': slug, 'wiki_page_id': page.get('wiki_page_id'), 'title': page.get('title'), 'page_type': page.get('page_type'), 'modified_at': str(page.get('modified_at') or ''), 'content': page.get('content', ''), } self.cursor.execute(""" INSERT INTO wiki_pages (pg_load_dttm, slug, wiki_page_id, title, page_type, modified_at, content_hash, value) VALUES (%s, %s, %s, %s, %s, %s, %s, %s::jsonb) """, ( page['pg_load_dttm'], slug, page.get('wiki_page_id'), page.get('title'), page.get('page_type'), page.get('modified_at'), new_hash, json.dumps(value, ensure_ascii=False), )) inserted += 1 self.conn.commit() return {'inserted': inserted, 'unchanged': unchanged} def execute_sql_file(self, sql_file_path): """ Выполнение SQL запроса из файла Args: sql_file_path (str): Путь к файлу с SQL запросом Returns: list: Список словарей с результатами """ with open(sql_file_path, 'r', encoding='utf-8') as f: query = f.read() self.cursor.execute(query) columns = [desc[0] for desc in self.cursor.description] rows = self.cursor.fetchall() results = [] for row in rows: result_dict = dict(zip(columns, row)) results.append(result_dict) return results