""" Сбор сообщений из Telegram-каналов через Telethon. """ import asyncio import logging from datetime import datetime, timedelta, timezone from dataclasses import dataclass, field from pathlib import Path from telethon import TelegramClient from telethon.errors import ( FloodWaitError, ChannelPrivateError, UsernameNotOccupiedError, UsernameInvalidError, ) logger = logging.getLogger(__name__) SESSION_DIR = Path("/data") @dataclass class Message: channel: str channel_title: str text: str date: datetime views: int = 0 url: str = "" @dataclass class CollectorResult: messages: list[Message] = field(default_factory=list) errors: dict[str, str] = field(default_factory=dict) class MessageCollector: def __init__(self, config: dict): tg = config["telegram"] session_path = SESSION_DIR / tg["session_name"] self.client = TelegramClient( str(session_path), tg["api_id"], tg["api_hash"] ) self.channels: list[str] = tg["channels"] self.hours_back: int = tg.get("hours_back", 12) self.max_per_channel: int = tg.get("max_messages_per_channel", 50) self.delay: float = tg.get("delay_between_channels", 2) self.min_length: int = tg.get("min_message_length", 50) async def auth(self): """Интерактивная авторизация (первый запуск).""" await self.client.start() me = await self.client.get_me() logger.info(f"Авторизован как: {me.first_name} ({me.phone})") await self.client.disconnect() async def collect(self) -> CollectorResult: """Собрать сообщения из всех каналов.""" result = CollectorResult() cutoff = datetime.now(timezone.utc) - timedelta(hours=self.hours_back) await self.client.connect() if not await self.client.is_user_authorized(): raise RuntimeError( "Сессия не авторизована. Запусти:\n" "docker compose run --rm app python main.py --auth" ) for channel in self.channels: try: messages = await self._collect_channel(channel, cutoff) result.messages.extend(messages) logger.info(f" ✓ {channel}: {len(messages)} сообщений") except ChannelPrivateError: err = "Канал приватный или вы не подписаны" result.errors[channel] = err logger.warning(f" ✗ {channel}: {err}") except (UsernameNotOccupiedError, UsernameInvalidError): err = "Канал не найден" result.errors[channel] = err logger.warning(f" ✗ {channel}: {err}") except FloodWaitError as e: logger.warning(f" ⏳ FloodWait: ждём {e.seconds}с") await asyncio.sleep(e.seconds) try: messages = await self._collect_channel(channel, cutoff) result.messages.extend(messages) except Exception as retry_err: result.errors[channel] = str(retry_err) except Exception as e: result.errors[channel] = str(e) logger.warning(f" ✗ {channel}: {e}") await asyncio.sleep(self.delay) await self.client.disconnect() result.messages.sort(key=lambda m: m.date, reverse=True) logger.info( f"Итого: {len(result.messages)} сообщений из " f"{len(self.channels) - len(result.errors)} каналов" ) return result async def _collect_channel( self, channel: str, cutoff: datetime ) -> list[Message]: entity = await self.client.get_entity(channel) channel_title = getattr(entity, "title", channel) messages = [] async for msg in self.client.iter_messages( entity, limit=self.max_per_channel ): if msg.date < cutoff: break if not msg.text or len(msg.text.strip()) < self.min_length: continue username = getattr(entity, "username", None) url = f"https://t.me/{username}/{msg.id}" if username else "" messages.append( Message( channel=channel, channel_title=channel_title, text=msg.text.strip(), date=msg.date, views=msg.views or 0, url=url, ) ) return messages