This commit is contained in:
Тимур Абайдулин
2026-02-07 16:31:46 +03:00
commit ba1245a06c
11 changed files with 754 additions and 0 deletions

136
collector.py Normal file
View File

@@ -0,0 +1,136 @@
"""
Сбор сообщений из Telegram-каналов через Telethon.
"""
import asyncio
import logging
from datetime import datetime, timedelta, timezone
from dataclasses import dataclass, field
from pathlib import Path
from telethon import TelegramClient
from telethon.errors import (
FloodWaitError,
ChannelPrivateError,
UsernameNotOccupiedError,
UsernameInvalidError,
)
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# Directory that the configured session name is joined onto to build the
# session path handed to TelegramClient.
# NOTE(review): "/data" is presumably a mounted container volume — confirm
# against the deployment configuration.
SESSION_DIR = Path("/data")
@dataclass
class Message:
    """A single text post collected from a Telegram channel."""

    # Channel identifier exactly as it was requested by the collector.
    channel: str
    # Human-readable channel title.
    channel_title: str
    # Message text.
    text: str
    # Timestamp of the message.
    date: datetime
    # View counter; defaults to 0.
    views: int = 0
    # Link to the message; defaults to an empty string.
    url: str = ""
@dataclass
class CollectorResult:
    """Outcome of a collection run: gathered messages plus per-channel errors."""

    # Collected messages; a fresh empty list per instance.
    messages: list[Message] = field(default_factory=list)
    # Error descriptions keyed by string; a fresh empty dict per instance.
    errors: dict[str, str] = field(default_factory=dict)
class MessageCollector:
    """Collect recent text messages from Telegram channels via Telethon.

    The collector is configured from the ``"telegram"`` section of *config*:

    * ``session_name``, ``api_id``, ``api_hash`` -- required client settings;
      the session is placed under ``SESSION_DIR``.
    * ``channels`` -- required list of channel identifiers to read.
    * ``hours_back`` -- how far back to look, in hours (default 12).
    * ``max_messages_per_channel`` -- fetch limit per channel (default 50).
    * ``delay_between_channels`` -- pause between channels, seconds (default 2).
    * ``min_message_length`` -- minimum stripped text length kept (default 50).
    """

    def __init__(self, config: dict):
        """Build the Telethon client and read collection settings.

        Raises:
            KeyError: if a required key is missing from ``config["telegram"]``.
        """
        tg = config["telegram"]
        session_path = SESSION_DIR / tg["session_name"]
        self.client = TelegramClient(
            str(session_path), tg["api_id"], tg["api_hash"]
        )
        self.channels: list[str] = tg["channels"]
        self.hours_back: int = tg.get("hours_back", 12)
        self.max_per_channel: int = tg.get("max_messages_per_channel", 50)
        self.delay: float = tg.get("delay_between_channels", 2)
        self.min_length: int = tg.get("min_message_length", 50)

    async def auth(self):
        """Run the interactive authorization flow (first launch).

        The client is disconnected even if fetching the account info fails.
        """
        await self.client.start()
        try:
            me = await self.client.get_me()
            logger.info(f"Авторизован как: {me.first_name} ({me.phone})")
        finally:
            await self.client.disconnect()

    async def collect(self) -> CollectorResult:
        """Collect messages from every configured channel.

        Per-channel failures are recorded in ``result.errors`` instead of
        aborting the run. Messages are returned newest first.

        Returns:
            A ``CollectorResult`` with the gathered messages and the errors.

        Raises:
            RuntimeError: if the stored session is not authorized.
        """
        result = CollectorResult()
        cutoff = datetime.now(timezone.utc) - timedelta(hours=self.hours_back)
        succeeded = 0

        await self.client.connect()
        try:
            if not await self.client.is_user_authorized():
                raise RuntimeError(
                    "Сессия не авторизована. Запусти:\n"
                    "docker compose run --rm app python main.py --auth"
                )

            for index, channel in enumerate(self.channels):
                # Pause only *between* channels; no trailing delay after the
                # last one.
                if index:
                    await asyncio.sleep(self.delay)
                if await self._collect_safely(channel, cutoff, result):
                    succeeded += 1
        finally:
            # Always release the connection, including on the authorization
            # error above and on task cancellation.
            await self.client.disconnect()

        result.messages.sort(key=lambda m: m.date, reverse=True)
        # Count successes explicitly: ``errors`` is keyed by channel, so
        # subtracting its size miscounts when a channel is listed twice.
        logger.info(
            f"Итого: {len(result.messages)} сообщений из "
            f"{succeeded} каналов"
        )
        return result

    async def _collect_safely(
        self, channel: str, cutoff: datetime, result: CollectorResult
    ) -> bool:
        """Collect one channel, recording any failure; return True on success.

        A ``FloodWaitError`` is retried once after sleeping for the number of
        seconds carried by the exception.
        """
        try:
            await self._fetch_into(result, channel, cutoff)
        except ChannelPrivateError:
            self._record_error(
                result, channel, "Канал приватный или вы не подписаны"
            )
        except (UsernameNotOccupiedError, UsernameInvalidError):
            self._record_error(result, channel, "Канал не найден")
        except FloodWaitError as e:
            logger.warning(f" ⏳ FloodWait: ждём {e.seconds}с")
            await asyncio.sleep(e.seconds)
            try:
                await self._fetch_into(result, channel, cutoff)
            except Exception as retry_err:
                # Best-effort run: a failed retry is recorded, not raised.
                self._record_error(result, channel, str(retry_err))
            else:
                return True
        except Exception as e:
            # Best-effort run: one broken channel must not stop the others.
            self._record_error(result, channel, str(e))
        else:
            return True
        return False

    async def _fetch_into(
        self, result: CollectorResult, channel: str, cutoff: datetime
    ) -> None:
        """Fetch one channel's messages and append them to *result*."""
        messages = await self._collect_channel(channel, cutoff)
        result.messages.extend(messages)
        logger.info(f"{channel}: {len(messages)} сообщений")

    @staticmethod
    def _record_error(
        result: CollectorResult, channel: str, err: str
    ) -> None:
        """Store *err* for *channel* in *result* and log it as a warning."""
        result.errors[channel] = err
        logger.warning(f"{channel}: {err}")

    async def _collect_channel(
        self, channel: str, cutoff: datetime
    ) -> list[Message]:
        """Return qualifying messages of *channel* dated at or after *cutoff*.

        Iteration stops at the first message older than *cutoff*. Messages
        with no text, or whose stripped text is shorter than
        ``self.min_length``, are skipped.
        """
        entity = await self.client.get_entity(channel)
        channel_title = getattr(entity, "title", channel)
        # Loop-invariant: entities without a username get an empty URL.
        username = getattr(entity, "username", None)

        messages = []
        async for msg in self.client.iter_messages(
            entity, limit=self.max_per_channel
        ):
            if msg.date < cutoff:
                break
            # Read the text once and reuse it for the checks and the record.
            raw_text = msg.text
            if not raw_text:
                continue
            text = raw_text.strip()
            if len(text) < self.min_length:
                continue
            url = f"https://t.me/{username}/{msg.id}" if username else ""
            messages.append(
                Message(
                    channel=channel,
                    channel_title=channel_title,
                    text=text,
                    date=msg.date,
                    views=msg.views or 0,
                    url=url,
                )
            )
        return messages