1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
|
import asyncio
from typing import AsyncGenerator, Iterable
from aiogram import Bot
from aiogram.enums import ButtonStyle
from aiogram.exceptions import TelegramAPIError, TelegramRetryAfter
from aiogram.types import InlineKeyboardButton, InlineKeyboardMarkup
from models import RichText
from models.callback_data import PayInvoiceData
async def publish_announcement(
bot: Bot,
users: Iterable[int],
rich_text: RichText,
) -> AsyncGenerator[int]:
for n, user_id in enumerate(users, start=1):
for _ in range(5):
try:
await rich_text.send(bot, user_id)
break
except TelegramRetryAfter as e:
await asyncio.sleep(e.retry_after + 1)
except TelegramAPIError:
await asyncio.sleep(5)
yield n
async def send_invoice(
bot: Bot,
users: Iterable[int],
rich_text: RichText,
invoice_id: int,
) -> AsyncGenerator[int]:
callback_data = PayInvoiceData(invoice_id=invoice_id).pack()
reply_markup = InlineKeyboardMarkup(
inline_keyboard=[
[
InlineKeyboardButton(
text="Оплатить",
style=ButtonStyle.PRIMARY,
callback_data=callback_data,
)
]
]
)
for n, user_id in enumerate(users, start=1):
for _ in range(5):
try:
await rich_text.send(bot, user_id, reply_markup=reply_markup)
break
except TelegramRetryAfter as e:
await asyncio.sleep(e.retry_after + 1)
except TelegramAPIError:
await asyncio.sleep(5)
yield n
|