444 lines
17 KiB
Python
444 lines
17 KiB
Python
"""
|
||
Модуль для работы с Supabase (PostgreSQL)
|
||
"""
|
||
import os
|
||
from pathlib import Path
|
||
import psycopg2
|
||
from dotenv import load_dotenv
|
||
|
||
load_dotenv(Path(__file__).parent / '.env')
|
||
|
||
|
||
class SupabaseManager:
|
||
"""Менеджер для работы с Supabase"""
|
||
|
||
def __init__(self):
|
||
"""Инициализация подключения к Supabase"""
|
||
self.host = os.getenv('SUPABASE_HOST')
|
||
self.port = os.getenv('SUPABASE_PORT')
|
||
self.user = os.getenv('SUPABASE_USER')
|
||
self.password = os.getenv('SUPABASE_PASSWORD')
|
||
self.database = os.getenv('SUPABASE_DB')
|
||
|
||
self.conn = None
|
||
self.cursor = None
|
||
|
||
def connect(self):
|
||
"""Подключение к базе данных"""
|
||
try:
|
||
self.conn = psycopg2.connect(
|
||
host=self.host,
|
||
port=self.port,
|
||
user=self.user,
|
||
password=self.password,
|
||
database=self.database,
|
||
sslmode='require',
|
||
)
|
||
self.cursor = self.conn.cursor()
|
||
|
||
# Проверка версии
|
||
self.cursor.execute('SELECT version();')
|
||
db_version = self.cursor.fetchone()
|
||
print(f"Подключение к Supabase успешно!\nВерсия PostgreSQL: {db_version[0]}")
|
||
return True
|
||
except Exception as e:
|
||
print(f"✗ Ошибка подключения к Supabase: {e}")
|
||
return False
|
||
|
||
def close(self):
|
||
"""Закрытие соединения"""
|
||
if self.cursor:
|
||
self.cursor.close()
|
||
if self.conn:
|
||
self.conn.close()
|
||
print("✓ Соединение с базой данных закрыто")
|
||
|
||
# ============================================================================
|
||
# ПРОВЕРКА ТАБЛИЦ
|
||
# ============================================================================
|
||
|
||
def table_exists(self, table_name):
|
||
"""Проверка существования таблицы"""
|
||
self.cursor.execute("""
|
||
SELECT EXISTS (
|
||
SELECT FROM information_schema.tables
|
||
WHERE table_schema = 'public'
|
||
AND table_name = %s
|
||
);
|
||
""", (table_name,))
|
||
return self.cursor.fetchone()[0]
|
||
|
||
def table_has_data(self, table_name):
|
||
"""Проверка, что таблица не пустая"""
|
||
self.cursor.execute(f"SELECT EXISTS (SELECT 1 FROM {table_name} LIMIT 1);")
|
||
return self.cursor.fetchone()[0]
|
||
|
||
# ============================================================================
|
||
# СОЗДАНИЕ ТАБЛИЦ
|
||
# ============================================================================
|
||
|
||
def create_employee_table(self):
|
||
"""Создание таблицы employee"""
|
||
self.cursor.execute("""
|
||
CREATE TABLE IF NOT EXISTS employee (
|
||
id SERIAL PRIMARY KEY,
|
||
pg_load_dttm TIMESTAMPTZ NOT NULL,
|
||
yatracker_employee_id TEXT NOT NULL,
|
||
value JSONB NOT NULL
|
||
);
|
||
""")
|
||
self.conn.commit()
|
||
print("Таблица 'employee' создана")
|
||
|
||
def create_tasks_table(self):
|
||
"""Создание таблицы tasks"""
|
||
self.cursor.execute("""
|
||
CREATE TABLE IF NOT EXISTS tasks (
|
||
id SERIAL PRIMARY KEY,
|
||
pg_load_dttm TIMESTAMPTZ NOT NULL,
|
||
queue TEXT,
|
||
task_key TEXT NOT NULL,
|
||
task_title TEXT,
|
||
status_name TEXT,
|
||
created_at_dttm TIMESTAMPTZ,
|
||
updated_at_dttm TIMESTAMPTZ,
|
||
status_start_dttm TIMESTAMPTZ,
|
||
value JSONB NOT NULL
|
||
);
|
||
""")
|
||
self.conn.commit()
|
||
print("Таблица 'tasks' создана")
|
||
|
||
def create_employee_info_table(self):
|
||
"""Создание таблицы employee_info"""
|
||
self.cursor.execute("""
|
||
CREATE TABLE IF NOT EXISTS employee_info (
|
||
id SERIAL PRIMARY KEY,
|
||
pg_load_dttm TIMESTAMPTZ NOT NULL,
|
||
column_d TEXT,
|
||
column_n TEXT
|
||
);
|
||
""")
|
||
self.conn.commit()
|
||
print("Таблица 'employee_info' создана")
|
||
|
||
# ============================================================================
|
||
# ЗАГРУЗКА ДАННЫХ
|
||
# ============================================================================
|
||
|
||
def load_employee_data(self, employee_df):
|
||
"""Загрузка данных сотрудников в базу"""
|
||
if len(employee_df) > 0:
|
||
print("\nЗагружаем данные в таблицу 'employee'...")
|
||
for _, row in employee_df.iterrows():
|
||
self.cursor.execute("""
|
||
INSERT INTO employee (pg_load_dttm, yatracker_employee_id, value)
|
||
VALUES (%s, %s, %s::jsonb)
|
||
""", (row['pg_load_dttm'], row['yatracker_employee_id'], row['value']))
|
||
self.conn.commit()
|
||
print(f"✓ Загружено {len(employee_df)} записей в таблицу 'employee'")
|
||
else:
|
||
print("\nНет данных для загрузки в таблицу 'employee'")
|
||
|
||
def load_tasks_data(self, tasks_df):
|
||
"""Загрузка данных задач в базу"""
|
||
if len(tasks_df) > 0:
|
||
print("\nЗагружаем данные в таблицу 'tasks'...")
|
||
for _, row in tasks_df.iterrows():
|
||
self.cursor.execute("""
|
||
INSERT INTO tasks (pg_load_dttm, queue, task_key, task_title, status_name,
|
||
created_at_dttm, updated_at_dttm, status_start_dttm, value)
|
||
VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s::jsonb)
|
||
""", (
|
||
row['pg_load_dttm'],
|
||
row['queue'],
|
||
row['task_key'],
|
||
row['task_title'],
|
||
row['status_name'],
|
||
row['created_at_dttm'],
|
||
row['updated_at_dttm'],
|
||
row['status_start_dttm'],
|
||
row['value']
|
||
))
|
||
self.conn.commit()
|
||
print(f"✓ Загружено {len(tasks_df)} записей в таблицу 'tasks'")
|
||
else:
|
||
print("\nНет данных для загрузки в таблицу 'tasks'")
|
||
|
||
def load_employee_info_data(self, employee_info_df):
|
||
"""Загрузка данных employee_info в базу"""
|
||
if len(employee_info_df) > 0:
|
||
print("\nЗагружаем данные в таблицу 'employee_info'...")
|
||
for _, row in employee_info_df.iterrows():
|
||
self.cursor.execute("""
|
||
INSERT INTO employee_info (pg_load_dttm, column_d, column_n)
|
||
VALUES (%s, %s, %s)
|
||
""", (
|
||
row['pg_load_dttm'],
|
||
row['column_d'],
|
||
row['column_n']
|
||
))
|
||
self.conn.commit()
|
||
print(f"✓ Загружено {len(employee_info_df)} записей в таблицу 'employee_info'")
|
||
else:
|
||
print("\nНет данных для загрузки в таблицу 'employee_info'")
|
||
|
||
# ============================================================================
|
||
# ОСНОВНАЯ ЛОГИКА ПРОВЕРКИ И ЗАГРУЗКИ
|
||
# ============================================================================
|
||
|
||
def check_and_prepare_tables(self):
|
||
"""
|
||
Проверка существования таблиц и создание при необходимости
|
||
|
||
Returns:
|
||
dict: Словарь с флагами наличия данных для каждой таблицы
|
||
"""
|
||
tables_status = {}
|
||
|
||
# Проверяем таблицу employee
|
||
employee_exists = self.table_exists('employee')
|
||
employee_has_data = False
|
||
|
||
if employee_exists:
|
||
employee_has_data = self.table_has_data('employee')
|
||
if employee_has_data:
|
||
print("Таблица 'employee' существует и содержит данные. Пропускаем загрузку.")
|
||
else:
|
||
print("Таблица 'employee' существует, но пустая.")
|
||
else:
|
||
self.create_employee_table()
|
||
|
||
tables_status['employee'] = employee_has_data
|
||
|
||
# Проверяем таблицу tasks
|
||
tasks_exists = self.table_exists('tasks')
|
||
tasks_has_data = False
|
||
|
||
if tasks_exists:
|
||
tasks_has_data = self.table_has_data('tasks')
|
||
if tasks_has_data:
|
||
print("Таблица 'tasks' существует и содержит данные. Пропускаем загрузку.")
|
||
else:
|
||
print("Таблица 'tasks' существует, но пустая.")
|
||
else:
|
||
self.create_tasks_table()
|
||
|
||
tables_status['tasks'] = tasks_has_data
|
||
|
||
# Проверяем таблицу employee_info
|
||
employee_info_exists = self.table_exists('employee_info')
|
||
employee_info_has_data = False
|
||
|
||
if employee_info_exists:
|
||
employee_info_has_data = self.table_has_data('employee_info')
|
||
if employee_info_has_data:
|
||
print("Таблица 'employee_info' существует и содержит данные. Пропускаем загрузку.")
|
||
else:
|
||
print("Таблица 'employee_info' существует, но пустая.")
|
||
else:
|
||
self.create_employee_info_table()
|
||
|
||
tables_status['employee_info'] = employee_info_has_data
|
||
|
||
return tables_status
|
||
|
||
def load_all_data(self, tables_status, employee=None, tasks=None, employee_info=None):
|
||
"""
|
||
Загрузка всех данных в базу
|
||
|
||
Args:
|
||
tables_status (dict): Статус таблиц из check_and_prepare_tables
|
||
employee (pd.DataFrame): Данные сотрудников
|
||
tasks (pd.DataFrame): Данные задач
|
||
employee_info (pd.DataFrame): Информация о сотрудниках
|
||
"""
|
||
if employee is not None and not tables_status['employee']:
|
||
self.load_employee_data(employee)
|
||
elif tables_status['employee']:
|
||
print("\nТаблица 'employee' уже содержит данные, пропускаем загрузку")
|
||
|
||
if tasks is not None and not tables_status['tasks']:
|
||
self.load_tasks_data(tasks)
|
||
elif tables_status['tasks']:
|
||
print("\nТаблица 'tasks' уже содержит данные, пропускаем загрузку")
|
||
|
||
if employee_info is not None and not tables_status['employee_info']:
|
||
self.load_employee_info_data(employee_info)
|
||
elif tables_status['employee_info']:
|
||
print("\nТаблица 'employee_info' уже содержит данные, пропускаем загрузку")
|
||
|
||
# ============================================================================
|
||
# ПОЛУЧЕНИЕ ДАННЫХ ДЛЯ АНАЛИЗА
|
||
# ============================================================================
|
||
|
||
def get_tasks_for_analysis(self, task_keys=None):
|
||
"""
|
||
Получение задач для анализа с описанием и исполнителем
|
||
|
||
Args:
|
||
task_keys (list): Список ключей задач для фильтрации (опционально)
|
||
|
||
Returns:
|
||
list: Список словарей с данными задач
|
||
"""
|
||
if task_keys:
|
||
# Формируем строку с ключами для SQL IN
|
||
keys_str = ", ".join([f"'{key}'" for key in task_keys])
|
||
query = f"""
|
||
SELECT *
|
||
FROM
|
||
(SELECT
|
||
t.queue,
|
||
t.task_key,
|
||
t.task_title,
|
||
t.value -> 'description' as description,
|
||
t.value -> 'assignee' ->> 'display' as assignee,
|
||
row_number() over (partition by task_key order by updated_at_dttm desc) as rn
|
||
FROM tasks t
|
||
WHERE task_key IN ({keys_str})) T1
|
||
WHERE rn = 1;
|
||
"""
|
||
else:
|
||
query = """
|
||
SELECT
|
||
t.queue,
|
||
t.task_key,
|
||
t.task_title,
|
||
t.value -> 'description' as description,
|
||
t.value -> 'assignee' ->> 'display' as assignee
|
||
FROM tasks t
|
||
ORDER BY t.updated_at_dttm DESC
|
||
LIMIT 100;
|
||
"""
|
||
|
||
self.cursor.execute(query)
|
||
columns = [desc[0] for desc in self.cursor.description]
|
||
rows = self.cursor.fetchall()
|
||
|
||
tasks = []
|
||
for row in rows:
|
||
task_dict = dict(zip(columns, row))
|
||
tasks.append(task_dict)
|
||
|
||
return tasks
|
||
|
||
# ============================================================================
|
||
# WIKI PAGES
|
||
# ============================================================================
|
||
|
||
def create_wiki_pages_table(self):
|
||
"""Создание таблицы wiki_pages"""
|
||
self.cursor.execute("""
|
||
CREATE TABLE IF NOT EXISTS wiki_pages (
|
||
id SERIAL PRIMARY KEY,
|
||
pg_load_dttm TIMESTAMPTZ NOT NULL,
|
||
slug TEXT NOT NULL,
|
||
wiki_page_id INTEGER,
|
||
title TEXT,
|
||
page_type TEXT,
|
||
modified_at TIMESTAMPTZ,
|
||
content_hash TEXT NOT NULL,
|
||
value JSONB NOT NULL
|
||
);
|
||
CREATE INDEX IF NOT EXISTS idx_wiki_pages_slug
|
||
ON wiki_pages (slug);
|
||
CREATE INDEX IF NOT EXISTS idx_wiki_pages_load
|
||
ON wiki_pages (pg_load_dttm DESC);
|
||
""")
|
||
self.conn.commit()
|
||
print("Таблица 'wiki_pages' создана")
|
||
|
||
def get_latest_hashes(self) -> dict[str, str]:
|
||
"""Получить последний content_hash для каждого slug."""
|
||
self.cursor.execute("""
|
||
SELECT DISTINCT ON (slug) slug, content_hash
|
||
FROM wiki_pages
|
||
ORDER BY slug, pg_load_dttm DESC;
|
||
""")
|
||
return {row[0]: row[1] for row in self.cursor.fetchall()}
|
||
|
||
def upsert_wiki_pages(self, pages: list[dict]) -> dict:
|
||
"""
|
||
Вставить новые и изменённые страницы.
|
||
|
||
Логика: сравниваем content_hash с последним сохранённым.
|
||
Если хеш совпадает — пропускаем. Если новый или изменился — вставляем.
|
||
|
||
Returns:
|
||
dict: {'inserted': int, 'unchanged': int}
|
||
"""
|
||
import json
|
||
|
||
if not pages:
|
||
return {'inserted': 0, 'unchanged': 0}
|
||
|
||
if not self.table_exists('wiki_pages'):
|
||
self.create_wiki_pages_table()
|
||
|
||
latest_hashes = self.get_latest_hashes()
|
||
|
||
inserted = 0
|
||
unchanged = 0
|
||
|
||
for page in pages:
|
||
slug = page['slug']
|
||
new_hash = page['content_hash']
|
||
stored_hash = latest_hashes.get(slug)
|
||
|
||
if stored_hash == new_hash:
|
||
unchanged += 1
|
||
continue
|
||
|
||
value = {
|
||
'slug': slug,
|
||
'wiki_page_id': page.get('wiki_page_id'),
|
||
'title': page.get('title'),
|
||
'page_type': page.get('page_type'),
|
||
'modified_at': str(page.get('modified_at') or ''),
|
||
'content': page.get('content', ''),
|
||
}
|
||
|
||
self.cursor.execute("""
|
||
INSERT INTO wiki_pages
|
||
(pg_load_dttm, slug, wiki_page_id, title, page_type, modified_at, content_hash, value)
|
||
VALUES (%s, %s, %s, %s, %s, %s, %s, %s::jsonb)
|
||
""", (
|
||
page['pg_load_dttm'],
|
||
slug,
|
||
page.get('wiki_page_id'),
|
||
page.get('title'),
|
||
page.get('page_type'),
|
||
page.get('modified_at'),
|
||
new_hash,
|
||
json.dumps(value, ensure_ascii=False),
|
||
))
|
||
inserted += 1
|
||
|
||
self.conn.commit()
|
||
return {'inserted': inserted, 'unchanged': unchanged}
|
||
|
||
def execute_sql_file(self, sql_file_path):
|
||
"""
|
||
Выполнение SQL запроса из файла
|
||
|
||
Args:
|
||
sql_file_path (str): Путь к файлу с SQL запросом
|
||
|
||
Returns:
|
||
list: Список словарей с результатами
|
||
"""
|
||
with open(sql_file_path, 'r', encoding='utf-8') as f:
|
||
query = f.read()
|
||
|
||
self.cursor.execute(query)
|
||
columns = [desc[0] for desc in self.cursor.description]
|
||
rows = self.cursor.fetchall()
|
||
|
||
results = []
|
||
for row in rows:
|
||
result_dict = dict(zip(columns, row))
|
||
results.append(result_dict)
|
||
|
||
return results
|