Files
tg-digest/collector.py
Тимур Абайдулин 9fd7d42c6a init
2026-02-07 14:46:15 +03:00

144 lines
5.0 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
Сбор сообщений из Telegram-каналов через Telethon.
"""
import asyncio
import logging
from datetime import datetime, timedelta, timezone
from dataclasses import dataclass, field
from telethon import TelegramClient
from telethon.errors import (
FloodWaitError,
ChannelPrivateError,
UsernameNotOccupiedError,
UsernameInvalidError,
)
# Module-level logger; collection progress and per-channel errors are reported here.
logger = logging.getLogger(__name__)
@dataclass
class Message:
    """A single text post collected from a Telegram channel."""

    # Channel identifier exactly as listed in the configuration.
    channel: str
    # Human-readable channel title (falls back to the identifier when the
    # resolved entity has no title).
    channel_title: str
    # Message text with surrounding whitespace stripped.
    text: str
    # Timestamp of the message as reported by Telegram.
    date: datetime
    # View counter; 0 when Telegram does not report one.
    views: int = field(default=0)
    # Public t.me link to the post; empty when the channel has no username.
    url: str = field(default="")
@dataclass
class CollectorResult:
    """Outcome of one collection run over all configured channels."""

    # Collected messages from every channel that succeeded.
    messages: list[Message] = field(default_factory=list)
    # Channels that failed: channel identifier -> human-readable error text.
    errors: dict[str, str] = field(default_factory=dict)
class MessageCollector:
    """Collect recent text posts from the configured Telegram channels.

    Settings are read from ``config["telegram"]``:

    * ``session_name``, ``api_id``, ``api_hash`` -- passed to ``TelegramClient``.
    * ``channels`` -- channel identifiers resolved with ``get_entity``.
    * ``hours_back`` (default 12) -- only messages newer than this are kept.
    * ``max_messages_per_channel`` (default 50) -- ``limit`` for
      ``iter_messages``; applied before the length filter, so a channel may
      yield fewer messages than this.
    * ``delay_between_channels`` (default 2) -- seconds to wait between
      channels.
    * ``min_message_length`` (default 50) -- shorter (stripped) texts are
      skipped.
    """

    def __init__(self, config: dict):
        tg = config["telegram"]
        self.client = TelegramClient(
            tg["session_name"],
            tg["api_id"],
            tg["api_hash"],
        )
        self.channels: list[str] = tg["channels"]
        self.hours_back: int = tg.get("hours_back", 12)
        self.max_per_channel: int = tg.get("max_messages_per_channel", 50)
        self.delay: float = tg.get("delay_between_channels", 2)
        self.min_length: int = tg.get("min_message_length", 50)

    async def auth(self):
        """Run the interactive authorization (first launch) via ``client.start()``.

        The client is disconnected afterwards even if authorization or
        ``get_me`` fails, so the session is never left connected.
        """
        try:
            await self.client.start()
            me = await self.client.get_me()
            logger.info(f"Авторизован как: {me.first_name} ({me.phone})")
        finally:
            await self.client.disconnect()

    async def collect(self) -> CollectorResult:
        """Collect messages from all configured channels.

        Per-channel failures are recorded in ``result.errors`` instead of
        aborting the run. The client is disconnected on every exit path.

        Returns:
            CollectorResult whose messages are sorted newest first.

        Raises:
            RuntimeError: if the stored session is not authorized.
        """
        result = CollectorResult()
        cutoff = datetime.now(timezone.utc) - timedelta(hours=self.hours_back)
        succeeded = 0

        try:
            await self.client.connect()
            if not await self.client.is_user_authorized():
                raise RuntimeError(
                    "Сессия не авторизована. Запусти: python main.py --auth"
                )
            last_index = len(self.channels) - 1
            for index, channel in enumerate(self.channels):
                if await self._collect_one(channel, cutoff, result):
                    succeeded += 1
                # Throttle between channels; waiting after the last one is pointless.
                if index < last_index:
                    await asyncio.sleep(self.delay)
        finally:
            # Previously skipped when the session was not authorized.
            await self.client.disconnect()

        # Sort by date (newest first)
        result.messages.sort(key=lambda m: m.date, reverse=True)
        logger.info(
            f"Итого: {len(result.messages)} сообщений из "
            f"{succeeded} каналов"
        )
        return result

    async def _collect_one(
        self, channel: str, cutoff: datetime, result: CollectorResult
    ) -> bool:
        """Collect one channel into ``result``; return True on success.

        Known Telethon errors become readable entries in ``result.errors``.
        A FloodWaitError triggers a single retry after sleeping for the
        number of seconds the server requested.
        """
        try:
            messages = await self._collect_channel(channel, cutoff)
        except ChannelPrivateError:
            self._record_error(result, channel, "Канал приватный или вы не подписаны")
            return False
        except (UsernameNotOccupiedError, UsernameInvalidError):
            self._record_error(result, channel, "Канал не найден")
            return False
        except FloodWaitError as e:
            logger.warning(f" ⏳ FloodWait: ждём {e.seconds}с")
            await asyncio.sleep(e.seconds)
            # Retry once; a second failure is recorded like any other error.
            try:
                messages = await self._collect_channel(channel, cutoff)
            except Exception as retry_err:
                self._record_error(result, channel, str(retry_err))
                return False
        except Exception as e:
            # Per-channel boundary: one broken channel must not stop the run.
            self._record_error(result, channel, str(e))
            return False

        result.messages.extend(messages)
        logger.info(f"{channel}: {len(messages)} сообщений")
        return True

    @staticmethod
    def _record_error(result: CollectorResult, channel: str, err: str) -> None:
        """Store ``err`` for ``channel`` in ``result.errors`` and log a warning."""
        result.errors[channel] = err
        logger.warning(f"{channel}: {err}")

    async def _collect_channel(
        self, channel: str, cutoff: datetime
    ) -> list[Message]:
        """Fetch recent, sufficiently long text messages from one channel.

        At most ``max_per_channel`` messages are requested from
        ``iter_messages``; iteration stops at the first message older than
        ``cutoff``.
        """
        entity = await self.client.get_entity(channel)
        channel_title = getattr(entity, "title", channel)
        # Loop-invariant: channels without a public username get no post URL.
        username = getattr(entity, "username", None)

        messages = []
        async for msg in self.client.iter_messages(
            entity, limit=self.max_per_channel
        ):
            # Assumes newest-first order (Telethon's default), so the first
            # message outside the window ends the scan.
            if msg.date < cutoff:
                break
            # Skip messages without text and those that are too short
            text = (msg.text or "").strip()
            if not text or len(text) < self.min_length:
                continue
            url = f"https://t.me/{username}/{msg.id}" if username else ""
            messages.append(
                Message(
                    channel=channel,
                    channel_title=channel_title,
                    text=text,
                    date=msg.date,
                    views=msg.views or 0,
                    url=url,
                )
            )
        return messages