from math import ceil from aiogram import Bot, Router from aiogram.enums import ButtonStyle from aiogram.filters import Command from aiogram.types import ( CallbackQuery, InlineKeyboardButton, InlineKeyboardMarkup, Message, ) from sqlalchemy import select from sqlalchemy.ext.asyncio import AsyncSession from sqlalchemy.sql.functions import count from libs.invoice import get_payment_status from libs.msg import eclipse_text from libs.user import load_user_cache from models import Invoice, Payment, PaymentStatus, User from models.callback_data import ( PayInvoiceClb, PaymentItemClb, PaymentPageClb, PaymentStatusClb, ) router = Router(name="payments") PAGE_SIZE = 5 PAYMENT_STATUS = { PaymentStatus.PENDING: "🟡", PaymentStatus.ACCEPTED: "🟢", PaymentStatus.REJECTED: "🔴", } def get_text(user: User) -> str: if user.is_admin(): return "Выберете платёж для управления:" else: return "Выберете счёт для оплаты:" def get_no_data_text(user: User) -> str: if user.is_admin(): return "Нету платежей." else: return "Нету счетов для оплаты." def build_markup( page: int, total_pages: int, item_buttons: list[list[InlineKeyboardButton]], ) -> InlineKeyboardMarkup: page_buttons = [] if page > 0: page_buttons.append( InlineKeyboardButton( text="◀️", callback_data=PaymentPageClb(page=page - 1).pack(), ) ) if page < total_pages - 1: page_buttons.append( InlineKeyboardButton( text="▶️", callback_data=PaymentPageClb(page=page + 1).pack(), ) ) return InlineKeyboardMarkup(inline_keyboard=[*item_buttons, page_buttons]) async def get_user_reply_markup( page: int, user: User, session: AsyncSession, ) -> InlineKeyboardMarkup | None: total = await session.scalar( select(count()).select_from(Invoice).where(Invoice.datetime >= user.datetime) ) assert total is not None total_pages = ceil(total / PAGE_SIZE) if total == 0: return None page = max(0, min(page, total_pages - 1)) invoices = await session.scalars( select(Invoice) .where(Invoice.datetime >= user.datetime) .offset(PAGE_SIZE * page) .limit(PAGE_SIZE) .order_by(Invoice.id.desc()) ) invoice_buttons = [] for i in invoices: status = PAYMENT_STATUS[await get_payment_status(session, i.id, user.id)] button = InlineKeyboardButton( text=( f"{status} " f"{eclipse_text(i.message.text, 10)} " f"({i.datetime.strftime('%d %b %y г.')})" ), callback_data=PayInvoiceClb(invoice_id=i.id).pack(), ) invoice_buttons.append([button]) return build_markup(page, total_pages, invoice_buttons) async def get_admin_reply_markup( page: int, bot: Bot, session: AsyncSession, ) -> InlineKeyboardMarkup | None: total = await session.scalar(select(count()).select_from(Payment)) assert total is not None total_pages = ceil(total / PAGE_SIZE) if total == 0: return None page = max(0, min(page, total_pages - 1)) payments = await session.scalars( select(Payment) .offset(PAGE_SIZE * page) .limit(PAGE_SIZE) .order_by(Payment.id.desc()) ) payment_buttons = [] for p in payments: invoice = await session.get(Invoice, p.invoice_id) assert invoice is not None chat = await bot.get_chat(p.user_id) button = InlineKeyboardButton( text=( f"{PAYMENT_STATUS[p.status]} " f"{chat.full_name} - " f"{eclipse_text(invoice.message.text, 10)} " f"({p.datetime.strftime('%d %b %y г.')})" ), callback_data=PaymentItemClb(page=page, payment_id=p.id).pack(), ) payment_buttons.append([button]) return build_markup(page, total_pages, payment_buttons) async def get_reply_markup( page: int, bot: Bot, user: User, session: AsyncSession, ) -> InlineKeyboardMarkup | None: if user.is_admin(): return await get_admin_reply_markup(page, bot, session) else: return await get_user_reply_markup(page, user, session) @router.message(Command("payments")) async def command(msg: Message, bot: Bot, session: AsyncSession, user: User) -> None: reply_markup = await get_reply_markup(0, bot, user, session) if reply_markup is None: await msg.answer(get_no_data_text(user)) return await msg.answer(get_text(user), reply_markup=reply_markup) @router.callback_query(PaymentPageClb.filter()) async def page( clb: CallbackQuery, bot: Bot, callback_data: PaymentPageClb, session: AsyncSession, user: User, ) -> None: assert isinstance(clb.message, Message) text = get_text(user) reply_markup = await get_reply_markup(callback_data.page, bot, user, session) if clb.message.photo is not None or clb.message.document is not None: await clb.message.delete() await bot.send_message(clb.from_user.id, text, reply_markup=reply_markup) else: await clb.message.edit_text(text, reply_markup=reply_markup) await clb.answer() @router.callback_query(PaymentItemClb.filter()) async def item( clb: CallbackQuery, bot: Bot, callback_data: PaymentItemClb, session: AsyncSession, user: User, ) -> None: assert isinstance(clb.message, Message) if not user.is_admin(): await clb.answer("У вас нет прав для данного действия.", show_alert=True) return payment = await session.get(Payment, callback_data.payment_id) if payment is None: await clb.answer("Платёж был удален.", show_alert=True) return invoice = await session.get(Invoice, payment.invoice_id) assert invoice is not None user_cache = await load_user_cache(bot, payment.user_id) status_buttons = [] if payment.status != PaymentStatus.ACCEPTED: status_buttons.append( InlineKeyboardButton( text="Подтвердить", style=ButtonStyle.SUCCESS, callback_data=PaymentStatusClb( payment_id=payment.id, payment_status=PaymentStatus.ACCEPTED, ).pack(), ) ) if payment.status != PaymentStatus.REJECTED: status_buttons.append( InlineKeyboardButton( text="Отклонить", style=ButtonStyle.DANGER, callback_data=PaymentStatusClb( payment_id=payment.id, payment_status=PaymentStatus.REJECTED, ).pack(), ) ) back_button = InlineKeyboardButton( text="Назад к выбору", callback_data=PaymentPageClb(page=callback_data.page).pack(), ) reply_markup = InlineKeyboardMarkup(inline_keyboard=[status_buttons, [back_button]]) caption = ( f"Платёж от {user_cache.mention}\n" f"Счёт: {eclipse_text(invoice.message.text, 30)}\n" f"Дата: {payment.datetime.strftime('%d %b %y г.')}\n" f"Статус: {PAYMENT_STATUS[payment.status]}" ) await clb.message.delete() await payment.receipt_file.send( bot, clb.from_user.id, caption=caption, reply_markup=reply_markup, ) await clb.answer()