Files
tg-digest-docker/collector.py
Тимур Абайдулин ba1245a06c init
2026-02-07 16:31:46 +03:00

137 lines
4.7 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
Сбор сообщений из Telegram-каналов через Telethon.
"""
import asyncio
import logging
from datetime import datetime, timedelta, timezone
from dataclasses import dataclass, field
from pathlib import Path
from telethon import TelegramClient
from telethon.errors import (
FloodWaitError,
ChannelPrivateError,
UsernameNotOccupiedError,
UsernameInvalidError,
)
# Module-level logger named after this module, so its level/handlers can be
# configured independently of other modules.
logger = logging.getLogger(name=__name__)
# Directory where MessageCollector keeps the Telethon session file.
SESSION_DIR = Path("/", "data")
@dataclass
class Message:
    """One text post collected from a Telegram channel."""

    # Channel identifier exactly as it appears in the configured channel list.
    channel: str
    # Human-readable channel title (the collector falls back to `channel`).
    channel_title: str
    # Post text with surrounding whitespace stripped.
    text: str
    # Publication timestamp of the post.
    date: datetime
    # View counter; 0 when Telegram reports none.
    views: int = field(default=0)
    # t.me permalink; empty when the channel has no public username.
    url: str = field(default="")
@dataclass
class CollectorResult:
    """Outcome of one collection run.

    Each instance gets its own fresh list and dict via ``default_factory``,
    so results from separate runs never share state.
    """

    # Every collected post across all channels.
    messages: list[Message] = field(default_factory=list)
    # Channel identifier -> human-readable reason the channel failed.
    errors: dict[str, str] = field(default_factory=dict)
class MessageCollector:
    """Collect recent text posts from a configured list of Telegram channels.

    Settings are read from the ``telegram`` section of the config dict:
    ``session_name``, ``api_id``, ``api_hash`` and ``channels`` are required;
    ``hours_back`` (default 12), ``max_messages_per_channel`` (default 50),
    ``delay_between_channels`` (default 2) and ``min_message_length``
    (default 50) are optional.
    """

    def __init__(self, config: dict):
        tg = config["telegram"]
        # The session file is stored under SESSION_DIR, named by session_name.
        session_path = SESSION_DIR / tg["session_name"]
        self.client = TelegramClient(
            str(session_path), tg["api_id"], tg["api_hash"]
        )
        self.channels: list[str] = tg["channels"]
        self.hours_back: int = tg.get("hours_back", 12)
        self.max_per_channel: int = tg.get("max_messages_per_channel", 50)
        self.delay: float = tg.get("delay_between_channels", 2)
        self.min_length: int = tg.get("min_message_length", 50)

    async def auth(self):
        """Run the interactive login (first run) and log the account used.

        The client is disconnected afterwards even if the login fails.
        """
        try:
            await self.client.start()
            me = await self.client.get_me()
            logger.info(f"Авторизован как: {me.first_name} ({me.phone})")
        finally:
            await self.client.disconnect()

    async def collect(self) -> CollectorResult:
        """Collect messages from every configured channel.

        Channels are processed sequentially with ``self.delay`` seconds
        between them. A failure in one channel is recorded in
        ``result.errors`` and does not stop the run.

        Returns:
            A ``CollectorResult`` whose ``messages`` are sorted newest first.

        Raises:
            RuntimeError: if the stored session is not authorised.
        """
        result = CollectorResult()
        cutoff = datetime.now(timezone.utc) - timedelta(hours=self.hours_back)

        await self.client.connect()
        try:
            if not await self.client.is_user_authorized():
                raise RuntimeError(
                    "Сессия не авторизована. Запусти:\n"
                    "docker compose run --rm app python main.py --auth"
                )
            for index, channel in enumerate(self.channels):
                if index:
                    # Throttle between channels; no point waiting after the last.
                    await asyncio.sleep(self.delay)
                await self._collect_channel_safely(channel, cutoff, result)
        finally:
            # Always release the connection, including on the auth error above
            # and on cancellation.
            await self.client.disconnect()

        result.messages.sort(key=lambda m: m.date, reverse=True)
        logger.info(
            f"Итого: {len(result.messages)} сообщений из "
            f"{len(self.channels) - len(result.errors)} каналов"
        )
        return result

    async def _collect_channel_safely(
        self, channel: str, cutoff: datetime, result: CollectorResult
    ) -> None:
        """Collect one channel into *result*; record any failure in ``result.errors``."""
        try:
            messages = await self._collect_channel(channel, cutoff)
        except ChannelPrivateError:
            self._record_error(
                result, channel, "Канал приватный или вы не подписаны"
            )
            return
        except (UsernameNotOccupiedError, UsernameInvalidError):
            self._record_error(result, channel, "Канал не найден")
            return
        except FloodWaitError as e:
            # Telegram asks us to wait e.seconds; wait once, then retry.
            logger.warning(f"{channel}: ⏳ FloodWait: ждём {e.seconds}с")
            await asyncio.sleep(e.seconds)
            try:
                messages = await self._collect_channel(channel, cutoff)
            except Exception as retry_err:
                self._record_error(result, channel, str(retry_err))
                return
        except Exception as e:
            self._record_error(result, channel, str(e))
            return
        result.messages.extend(messages)
        logger.info(f"{channel}: {len(messages)} сообщений")

    @staticmethod
    def _record_error(result: CollectorResult, channel: str, reason: str) -> None:
        """Store *reason* for *channel* in ``result.errors`` and log a warning."""
        result.errors[channel] = reason
        logger.warning(f"{channel}: {reason}")

    async def _collect_channel(
        self, channel: str, cutoff: datetime
    ) -> list[Message]:
        """Return text posts from *channel* published at or after *cutoff*.

        At most ``max_per_channel`` messages are fetched. Posts without text,
        or whose stripped text is shorter than ``min_length``, are skipped.
        """
        entity = await self.client.get_entity(channel)
        channel_title = getattr(entity, "title", channel)
        # A public username allows building a t.me permalink; otherwise no URL.
        username = getattr(entity, "username", None)
        messages = []
        async for msg in self.client.iter_messages(
            entity, limit=self.max_per_channel
        ):
            # iter_messages goes from newest to oldest by default, so the
            # first message older than the cutoff ends the scan.
            if msg.date < cutoff:
                break
            if not msg.text:
                continue
            text = msg.text.strip()
            if len(text) < self.min_length:
                continue
            url = f"https://t.me/{username}/{msg.id}" if username else ""
            messages.append(
                Message(
                    channel=channel,
                    channel_title=channel_title,
                    text=text,
                    date=msg.date,
                    views=msg.views or 0,
                    url=url,
                )
            )
        return messages