144 lines
5.0 KiB
Python
144 lines
5.0 KiB
Python
"""
|
||
Сбор сообщений из Telegram-каналов через Telethon.
|
||
"""
|
||
|
||
import asyncio
|
||
import logging
|
||
from datetime import datetime, timedelta, timezone
|
||
from dataclasses import dataclass, field
|
||
|
||
from telethon import TelegramClient
|
||
from telethon.errors import (
|
||
FloodWaitError,
|
||
ChannelPrivateError,
|
||
UsernameNotOccupiedError,
|
||
UsernameInvalidError,
|
||
)
|
||
|
||
# Module-level logger named after this module's import path.
logger = logging.getLogger(__name__)


@dataclass
class Message:
    """A single message collected from a channel."""

    # Channel identifier as listed in the config.
    channel: str
    # Channel display title (the collector falls back to ``channel`` when the
    # entity has no title).
    channel_title: str
    # Message text with surrounding whitespace stripped.
    text: str
    # Message date as reported by Telegram.
    date: datetime
    # View counter; 0 when Telegram reports none.
    views: int = 0
    # Public link to the post; empty when the channel has no username.
    url: str = ""


@dataclass
class CollectorResult:
    """Outcome of one collection run."""

    # Collected messages; MessageCollector.collect sorts them newest first.
    messages: list[Message] = field(default_factory=list)
    # Channel identifier -> error description for channels that failed.
    errors: dict[str, str] = field(default_factory=dict)


class MessageCollector:
    """Collect recent text messages from a configured list of Telegram channels.

    All settings come from ``config["telegram"]``; see ``__init__`` for the
    keys that are read and their defaults.
    """

    def __init__(self, config: dict):
        """Create the Telethon client and read collection settings.

        Args:
            config: Mapping with a ``"telegram"`` section. Required keys:
                ``session_name``, ``api_id``, ``api_hash``, ``channels``.
                Optional keys (default in parentheses): ``hours_back`` (12),
                ``max_messages_per_channel`` (50),
                ``delay_between_channels`` (2), ``min_message_length`` (50).

        Raises:
            KeyError: If ``"telegram"`` or a required key is missing.
        """
        tg = config["telegram"]
        self.client = TelegramClient(
            tg["session_name"],
            tg["api_id"],
            tg["api_hash"],
        )
        # Channel identifiers, each passed as-is to ``client.get_entity``.
        self.channels: list[str] = tg["channels"]
        # Look-back window: messages older than now - hours_back are ignored.
        self.hours_back: int = tg.get("hours_back", 12)
        # ``limit`` for ``iter_messages``: max messages read per channel.
        self.max_per_channel: int = tg.get("max_messages_per_channel", 50)
        # Pause, in seconds, between two consecutive channels.
        self.delay: float = tg.get("delay_between_channels", 2)
        # Messages whose stripped text is shorter than this are skipped.
        self.min_length: int = tg.get("min_message_length", 50)

    async def auth(self):
        """Interactively sign in via ``client.start()`` (intended for the first run).

        Logs the signed-in account and always disconnects afterwards, even if
        sign-in fails.
        """
        try:
            await self.client.start()
            me = await self.client.get_me()
            logger.info("Авторизован как: %s (%s)", me.first_name, me.phone)
        finally:
            await self.client.disconnect()

    async def collect(self) -> CollectorResult:
        """Collect recent messages from every configured channel.

        Channels are processed sequentially, pausing ``self.delay`` seconds
        between them. A failure in one channel is recorded in
        ``result.errors`` and does not stop the remaining channels.

        Returns:
            CollectorResult whose ``messages`` are sorted newest first and whose
            ``errors`` map channel identifier -> error description.

        Raises:
            RuntimeError: If the stored session is not authorized.
        """
        result = CollectorResult()
        cutoff = datetime.now(timezone.utc) - timedelta(hours=self.hours_back)

        await self.client.connect()
        # Disconnect on every exit path: the unauthorized-session error,
        # unexpected exceptions and task cancellation included.
        try:
            if not await self.client.is_user_authorized():
                raise RuntimeError(
                    "Сессия не авторизована. Запусти: python main.py --auth"
                )
            for index, channel in enumerate(self.channels):
                # The delay separates channels; no pointless wait after the last.
                if index:
                    await asyncio.sleep(self.delay)
                await self._collect_one(channel, cutoff, result)
        finally:
            await self.client.disconnect()

        # Newest messages first.
        result.messages.sort(key=lambda m: m.date, reverse=True)
        logger.info(
            "Итого: %d сообщений из %d каналов",
            len(result.messages),
            len(self.channels) - len(result.errors),
        )
        return result

    async def _collect_one(
        self, channel: str, cutoff: datetime, result: CollectorResult
    ) -> None:
        """Collect one channel into *result*, recording failures instead of raising.

        On FloodWaitError, sleeps for the number of seconds carried by the error
        and retries exactly once; a failed retry is recorded like any other
        error.
        """
        try:
            messages = await self._collect_channel(channel, cutoff)
        except ChannelPrivateError:
            self._record_error(
                result, channel, "Канал приватный или вы не подписаны"
            )
            return
        except (UsernameNotOccupiedError, UsernameInvalidError):
            self._record_error(result, channel, "Канал не найден")
            return
        except FloodWaitError as e:
            logger.warning(" ⏳ FloodWait: ждём %sс", e.seconds)
            await asyncio.sleep(e.seconds)
            # Single retry; any failure now is stored with its raw message.
            try:
                messages = await self._collect_channel(channel, cutoff)
            except Exception as retry_err:
                self._record_error(result, channel, str(retry_err))
                return
        except Exception as e:
            self._record_error(result, channel, str(e))
            return

        result.messages.extend(messages)
        logger.info(" ✓ %s: %d сообщений", channel, len(messages))

    @staticmethod
    def _record_error(result: CollectorResult, channel: str, err: str) -> None:
        """Store *err* for *channel* in ``result.errors`` and log a warning."""
        result.errors[channel] = err
        logger.warning(" ✗ %s: %s", channel, err)

    async def _collect_channel(
        self, channel: str, cutoff: datetime
    ) -> list[Message]:
        """Fetch messages from one channel dated at or after *cutoff*.

        Reads at most ``self.max_per_channel`` messages. Messages with no text,
        whitespace-only text, or fewer than ``self.min_length`` characters after
        stripping are skipped. Telethon errors are not caught here.

        Returns:
            Message objects in the order Telethon yields them.
        """
        entity = await self.client.get_entity(channel)
        channel_title = getattr(entity, "title", channel)
        # The username belongs to the channel, so resolve it once; channels
        # without a public username get no link.
        username = getattr(entity, "username", None)

        messages: list[Message] = []
        async for msg in self.client.iter_messages(
            entity, limit=self.max_per_channel
        ):
            # iter_messages yields newest first by default, so the first
            # message older than the cutoff means the rest are older too.
            if msg.date < cutoff:
                break

            text = (msg.text or "").strip()
            if not text or len(text) < self.min_length:
                continue

            messages.append(
                Message(
                    channel=channel,
                    channel_title=channel_title,
                    text=text,
                    date=msg.date,
                    views=msg.views or 0,
                    url=f"https://t.me/{username}/{msg.id}" if username else "",
                )
            )

        return messages