import asyncio
from typing import Any, AsyncGenerator, Iterable

from aiogram import Bot
from aiogram.enums import ButtonStyle
from aiogram.exceptions import TelegramAPIError, TelegramRetryAfter
from aiogram.types import InlineKeyboardButton, InlineKeyboardMarkup

from models import RichText
from models.callback_data import PayInvoiceClb

# Maximum number of delivery attempts per recipient before moving on.
_MAX_SEND_ATTEMPTS = 5
# Pause (seconds) after a non-rate-limit API error before retrying.
_RETRY_DELAY = 0.2
# Pause (seconds) between consecutive recipients.
_SEND_INTERVAL = 0.2


def eclipse_text(text: str, size: int, eclipses: str = "...") -> str:
    """Shorten *text* to at most *size* characters, ending it with *eclipses*.

    Args:
        text: The string to shorten.
        size: Maximum length of the returned string.
        eclipses: Marker appended when the text is cut.

    Returns:
        *text* unchanged when it already fits; otherwise a prefix of *text*
        followed by *eclipses*, with a total length of *size*. When *size*
        leaves no room for any of the original text, the marker itself is
        truncated to *size* characters (an empty string for ``size <= 0``).
    """
    if len(text) <= size:
        return text

    keep = size - len(eclipses)
    if keep <= 0:
        # A negative slice end would count from the right and return a
        # string longer than ``size``; fall back to a truncated marker.
        return eclipses[: max(size, 0)]
    return f"{text[:keep]}{eclipses}"


async def _send_with_retry(
    bot: Bot,
    user_id: int,
    rich_text: RichText,
    **send_kwargs: Any,
) -> bool:
    """Call ``rich_text.send`` for one user, retrying on Telegram API errors.

    Extra keyword arguments are forwarded to ``rich_text.send`` unchanged.

    Returns:
        ``True`` if a send call completed without raising, ``False`` if all
        ``_MAX_SEND_ATTEMPTS`` attempts raised a Telegram API error.
    """
    for _ in range(_MAX_SEND_ATTEMPTS):
        try:
            await rich_text.send(bot, user_id, **send_kwargs)
        except TelegramRetryAfter as e:
            # Must precede the broader TelegramAPIError handler: wait for the
            # server-requested delay plus one extra second.
            await asyncio.sleep(e.retry_after + 1)
        except TelegramAPIError:
            await asyncio.sleep(_RETRY_DELAY)
        else:
            return True
    return False


async def publish_announcement(
    bot: Bot,
    users: Iterable[int],
    rich_text: RichText,
) -> AsyncGenerator[int, None]:
    """Send *rich_text* to every user, yielding progress after each one.

    Delivery is best-effort: each user gets up to ``_MAX_SEND_ATTEMPTS``
    attempts, and a user whose attempts all fail is skipped silently.

    Args:
        bot: Bot instance passed to ``rich_text.send``.
        users: Recipient user ids.
        rich_text: Message to send.

    Yields:
        The 1-based count of users processed so far, whether or not the
        delivery to that user succeeded.
    """
    for n, user_id in enumerate(users, start=1):
        await _send_with_retry(bot, user_id, rich_text)
        yield n
        # Throttle between recipients.
        await asyncio.sleep(_SEND_INTERVAL)


async def send_invoice(
    bot: Bot,
    users: Iterable[int],
    rich_text: RichText,
    invoice_id: int,
) -> AsyncGenerator[int, None]:
    """Send *rich_text* with a pay button to every user, yielding progress.

    The message carries a single inline button whose callback data is a
    packed ``PayInvoiceClb`` for *invoice_id*. Delivery is best-effort: each
    user gets up to ``_MAX_SEND_ATTEMPTS`` attempts, and a user whose
    attempts all fail is skipped silently.

    Args:
        bot: Bot instance passed to ``rich_text.send``.
        users: Recipient user ids.
        rich_text: Message to send.
        invoice_id: Invoice identifier embedded in the button callback data.

    Yields:
        The 1-based count of users processed so far, whether or not the
        delivery to that user succeeded.
    """
    pay_button = InlineKeyboardButton(
        text="Оплатить",
        style=ButtonStyle.PRIMARY,
        callback_data=PayInvoiceClb(invoice_id=invoice_id).pack(),
    )
    # The keyboard is identical for all recipients, so build it once.
    reply_markup = InlineKeyboardMarkup(inline_keyboard=[[pay_button]])

    for n, user_id in enumerate(users, start=1):
        await _send_with_retry(bot, user_id, rich_text, reply_markup=reply_markup)
        yield n
        # Throttle between recipients.
        await asyncio.sleep(_SEND_INTERVAL)