213 lines
7.1 KiB
Python
213 lines
7.1 KiB
Python
import os
|
||
from typing import Optional, Dict, List, Any
|
||
from supabase import create_client, Client
|
||
from dotenv import load_dotenv
|
||
|
||
# Загружаем переменные окружения
|
||
load_dotenv()
|
||
|
||
|
||
class SupabaseManager:
|
||
"""
|
||
Класс для управления подключением и операциями с Supabase
|
||
"""
|
||
|
||
def __init__(self):
|
||
"""
|
||
Инициализация подключения к Supabase
|
||
"""
|
||
self.url: str = os.getenv("SUPABASE_URL", "")
|
||
self.key: str = os.getenv("SUPABASE_KEY", "")
|
||
self.client: Optional[Client] = None
|
||
|
||
if not self.url or not self.key:
|
||
raise ValueError(
|
||
"SUPABASE_URL и SUPABASE_KEY должны быть установлены в переменных окружения"
|
||
)
|
||
|
||
self._connect()
|
||
|
||
def _connect(self) -> None:
|
||
"""
|
||
Создает подключение к Supabase
|
||
"""
|
||
try:
|
||
self.client = create_client(self.url, self.key)
|
||
print("✅ Успешное подключение к Supabase")
|
||
except Exception as e:
|
||
print(f"❌ Ошибка подключения к Supabase: {e}")
|
||
raise
|
||
|
||
def get_client(self) -> Client:
|
||
"""
|
||
Возвращает клиент Supabase
|
||
|
||
Returns:
|
||
Client: Клиент Supabase
|
||
"""
|
||
if not self.client:
|
||
raise ConnectionError("Клиент Supabase не инициализирован")
|
||
return self.client
|
||
|
||
# === Методы для работы с данными ===
|
||
|
||
def select(
|
||
self,
|
||
table: str,
|
||
columns: str = "*",
|
||
filters: Optional[Dict[str, Any]] = None,
|
||
limit: Optional[int] = None
|
||
) -> List[Dict]:
|
||
"""
|
||
Выполняет SELECT запрос к таблице
|
||
|
||
Args:
|
||
table: Название таблицы
|
||
columns: Колонки для выборки (по умолчанию все)
|
||
filters: Словарь с фильтрами {column: value}
|
||
limit: Лимит количества записей
|
||
|
||
Returns:
|
||
List[Dict]: Список словарей с данными
|
||
"""
|
||
try:
|
||
query = self.client.table(table).select(columns)
|
||
|
||
if filters:
|
||
for column, value in filters.items():
|
||
query = query.eq(column, value)
|
||
|
||
if limit:
|
||
query = query.limit(limit)
|
||
|
||
response = query.execute()
|
||
return response.data
|
||
except Exception as e:
|
||
print(f"❌ Ошибка при выполнении SELECT: {e}")
|
||
return []
|
||
|
||
def insert(self, table: str, data: Dict[str, Any]) -> Dict:
|
||
"""
|
||
Вставляет запись в таблицу
|
||
|
||
Args:
|
||
table: Название таблицы
|
||
data: Словарь с данными для вставки
|
||
|
||
Returns:
|
||
Dict: Вставленная запись
|
||
"""
|
||
try:
|
||
response = self.client.table(table).insert(data).execute()
|
||
print(f"✅ Запись успешно добавлена в таблицу {table}")
|
||
return response.data[0] if response.data else {}
|
||
except Exception as e:
|
||
print(f"❌ Ошибка при вставке данных: {e}")
|
||
return {}
|
||
|
||
def update(
|
||
self,
|
||
table: str,
|
||
data: Dict[str, Any],
|
||
filters: Dict[str, Any]
|
||
) -> List[Dict]:
|
||
"""
|
||
Обновляет записи в таблице
|
||
|
||
Args:
|
||
table: Название таблицы
|
||
data: Словарь с данными для обновления
|
||
filters: Словарь с фильтрами {column: value}
|
||
|
||
Returns:
|
||
List[Dict]: Обновленные записи
|
||
"""
|
||
try:
|
||
query = self.client.table(table).update(data)
|
||
|
||
for column, value in filters.items():
|
||
query = query.eq(column, value)
|
||
|
||
response = query.execute()
|
||
print(f"✅ Записи успешно обновлены в таблице {table}")
|
||
return response.data
|
||
except Exception as e:
|
||
print(f"❌ Ошибка при обновлении данных: {e}")
|
||
return []
|
||
|
||
def delete(self, table: str, filters: Dict[str, Any]) -> List[Dict]:
|
||
"""
|
||
Удаляет записи из таблицы
|
||
|
||
Args:
|
||
table: Название таблицы
|
||
filters: Словарь с фильтрами {column: value}
|
||
|
||
Returns:
|
||
List[Dict]: Удаленные записи
|
||
"""
|
||
try:
|
||
query = self.client.table(table).delete()
|
||
|
||
for column, value in filters.items():
|
||
query = query.eq(column, value)
|
||
|
||
response = query.execute()
|
||
print(f"✅ Записи успешно удалены из таблицы {table}")
|
||
return response.data
|
||
except Exception as e:
|
||
print(f"❌ Ошибка при удалении данных: {e}")
|
||
return []
|
||
|
||
# === Методы для работы с Storage ===
|
||
|
||
def upload_file(
|
||
self,
|
||
bucket: str,
|
||
file_path: str,
|
||
file_data: bytes,
|
||
content_type: Optional[str] = None
|
||
) -> bool:
|
||
"""
|
||
Загружает файл в Storage
|
||
|
||
Args:
|
||
bucket: Название bucket
|
||
file_path: Путь к файлу в bucket
|
||
file_data: Данные файла в байтах
|
||
content_type: MIME тип файла
|
||
|
||
Returns:
|
||
bool: True если загрузка успешна
|
||
"""
|
||
try:
|
||
options = {"content-type": content_type} if content_type else {}
|
||
self.client.storage.from_(bucket).upload(
|
||
file_path,
|
||
file_data,
|
||
file_options=options
|
||
)
|
||
print(f"✅ Файл {file_path} успешно загружен в bucket {bucket}")
|
||
return True
|
||
except Exception as e:
|
||
print(f"❌ Ошибка при загрузке файла: {e}")
|
||
return False
|
||
|
||
def get_public_url(self, bucket: str, file_path: str) -> str:
|
||
"""
|
||
Получает публичный URL файла
|
||
|
||
Args:
|
||
bucket: Название bucket
|
||
file_path: Путь к файлу в bucket
|
||
|
||
Returns:
|
||
str: Публичный URL
|
||
"""
|
||
try:
|
||
url = self.client.storage.from_(bucket).get_public_url(file_path)
|
||
return url
|
||
except Exception as e:
|
||
print(f"❌ Ошибка при получении URL: {e}")
|
||
return ""
|