"""Handlers for listing invoices with pagination and showing payment details."""

from math import ceil

from aiogram import Bot, Router
from aiogram.filters import Command
from aiogram.types import (
    CallbackQuery,
    InlineKeyboardButton,
    InlineKeyboardMarkup,
    Message,
)
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.sql.functions import count

from libs.invoice import (
    InvoiceStatus,
    get_invoice_payments,
    get_payment_status,
)
from libs.msg import eclipse_text
from libs.user import load_user_cache
from models import Invoice, PaymentStatus, User
from models.callback_data import InvoiceItemClb, InvoicePageClb, PayInvoiceClb

router = Router(name="invoices")

# Number of invoice buttons shown on a single keyboard page.
PAGE_SIZE = 5

# Emoji markers prepended to button / list entries for each status value.
PAYMENT_STATUS = {
    PaymentStatus.PENDING: "🟡",
    PaymentStatus.ACCEPTED: "🟢",
    PaymentStatus.REJECTED: "🔴",
}
INVOICE_STATUS = {
    InvoiceStatus.PAID: "🟢",
    InvoiceStatus.UNPAID: "🔴",
}

# Shown whenever the user has no invoices to list.
NO_INVOICES_TEXT = "Нету счетов для оплаты."


def get_text(user: User) -> str:
    """Return the prompt shown above the invoice keyboard for *user*."""
    if user.is_admin():
        return "Выберете счёт для просмотра информации:"
    return "Выберете счёт для оплаты:"


async def get_reply_markup(
    page: int,
    user: User,
    session: AsyncSession,
) -> InlineKeyboardMarkup | None:
    """Build the inline keyboard for one page of invoices.

    Only invoices whose ``datetime`` is not earlier than ``user.datetime`` are
    listed, newest id first. Admins get buttons that open the invoice details
    (``InvoiceItemClb``); other users get buttons that start payment
    (``PayInvoiceClb``).

    Args:
        page: Zero-based page index; out-of-range values are clamped to the
            nearest valid page.
        user: The user the keyboard is built for.
        session: Database session used for all queries.

    Returns:
        The keyboard, or ``None`` when there are no invoices to show.
    """
    visible = Invoice.datetime >= user.datetime

    total = await session.scalar(select(count()).select_from(Invoice).where(visible))
    # Covers both "no rows" (0) and a missing scalar result (None).
    if not total:
        return None

    total_pages = ceil(total / PAGE_SIZE)
    page = max(0, min(page, total_pages - 1))

    query = (
        select(Invoice)
        .where(visible)
        .order_by(Invoice.id.desc())
        .offset(PAGE_SIZE * page)
        .limit(PAGE_SIZE)
    )
    invoices = await session.scalars(query)

    rows: list[list[InlineKeyboardButton]] = []
    for invoice in invoices:
        if user.is_admin():
            invoice_payments = await get_invoice_payments(session, invoice)
            status = INVOICE_STATUS[invoice_payments.status]
            callback_data = InvoiceItemClb(page=page, invoice_id=invoice.id).pack()
        else:
            payment_status = await get_payment_status(session, invoice.id, user.id)
            status = PAYMENT_STATUS[payment_status]
            callback_data = PayInvoiceClb(invoice_id=invoice.id).pack()

        button = InlineKeyboardButton(
            text=(
                f"{status} "
                f"{eclipse_text(invoice.message.text, 10)} "
                f"({invoice.datetime.strftime('%d %b %y г.')})"
            ),
            callback_data=callback_data,
        )
        rows.append([button])

    page_buttons: list[InlineKeyboardButton] = []
    if page > 0:
        page_buttons.append(
            InlineKeyboardButton(
                text="◀️",
                callback_data=InvoicePageClb(page=page - 1).pack(),
            )
        )
    if page < total_pages - 1:
        page_buttons.append(
            InlineKeyboardButton(
                text="▶️",
                callback_data=InvoicePageClb(page=page + 1).pack(),
            )
        )
    # Do not send an empty keyboard row when everything fits on one page.
    if page_buttons:
        rows.append(page_buttons)

    return InlineKeyboardMarkup(inline_keyboard=rows)


@router.message(Command("invoices"))
async def command(msg: Message, session: AsyncSession, user: User) -> None:
    """Handle ``/invoices``: reply with the first page of the invoice list."""
    reply_markup = await get_reply_markup(0, user, session)
    if reply_markup is None:
        await msg.answer(NO_INVOICES_TEXT)
        return

    await msg.answer(get_text(user), reply_markup=reply_markup)


@router.callback_query(InvoicePageClb.filter())
async def page(
    clb: CallbackQuery,
    callback_data: InvoicePageClb,
    session: AsyncSession,
    user: User,
) -> None:
    """Switch the invoice list message to the page requested by the button."""
    if not isinstance(clb.message, Message):
        # The originating message cannot be edited; just acknowledge the
        # callback instead of failing the handler.
        await clb.answer()
        return

    reply_markup = await get_reply_markup(callback_data.page, user, session)
    if reply_markup is None:
        # Same wording as the /invoices command when nothing is left to list.
        await clb.message.edit_text(NO_INVOICES_TEXT)
    else:
        await clb.message.edit_text(get_text(user), reply_markup=reply_markup)
    await clb.answer()


@router.callback_query(InvoiceItemClb.filter())
async def item(
    clb: CallbackQuery,
    bot: Bot,
    callback_data: InvoiceItemClb,
    session: AsyncSession,
    user: User,
) -> None:
    """Show an admin the payment status of one invoice, listed per user.

    Non-admin users receive an alert and the message is left untouched. The
    resulting message carries a single button leading back to the list page
    the invoice was opened from.
    """
    if not isinstance(clb.message, Message):
        # The originating message cannot be edited; just acknowledge the
        # callback instead of failing the handler.
        await clb.answer()
        return

    if not user.is_admin():
        await clb.answer("У вас нет прав для данного действия.", show_alert=True)
        return

    invoice = await session.get(Invoice, callback_data.invoice_id)
    if invoice is None:
        # A stale button may reference an invoice that no longer exists.
        await clb.answer("Счёт не найден.", show_alert=True)
        return

    invoice_payments = await get_invoice_payments(session, invoice)

    user_lines = []
    for user_id, payment_status in invoice_payments.user_status.items():
        user_cache = await load_user_cache(bot, user_id)
        user_lines.append(f"{PAYMENT_STATUS[payment_status]} - {user_cache.mention}")

    text = (
        f"Статус оплаты счёта: {INVOICE_STATUS[invoice_payments.status]}\n"
        "Пользователи оплатившие счёт:\n"
    ) + "\n".join(user_lines)

    reply_markup = InlineKeyboardMarkup(
        inline_keyboard=[
            [
                InlineKeyboardButton(
                    text="Назад к выбору",
                    callback_data=InvoicePageClb(page=callback_data.page).pack(),
                )
            ]
        ]
    )

    await clb.message.edit_text(
        text,
        reply_markup=reply_markup,
        disable_web_page_preview=True,
    )
    await clb.answer()