"""Telegram bot handlers for listing, adding, editing and deleting users."""

from datetime import UTC, datetime
from math import ceil

from aiogram import Bot, F, Router
from aiogram.enums import ButtonStyle
from aiogram.filters import Command
from aiogram.fsm.context import FSMContext
from aiogram.fsm.state import State, StatesGroup
from aiogram.types import (
    CallbackQuery,
    InlineKeyboardButton,
    InlineKeyboardMarkup,
    KeyboardButton,
    KeyboardButtonRequestUsers,
    Message,
    ReplyKeyboardMarkup,
    ReplyKeyboardRemove,
)
from pydantic import BaseModel
from sqlalchemy import delete, select
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.sql.functions import count

from libs.fsm import edit_data, get_data, set_data
from libs.user import load_user_cache
from models import Payment, User, UserRole
from models.callback_data import (
    UserAddClb,
    UserDeleteClb,
    UserDeleteConfirmClb,
    UserItemClb,
    UserPageClb,
    UserRoleClb,
    UserRoleSetClb,
    UserVpnLinkClb,
)

router = Router(name="users")

# Number of users shown on a single page of the list keyboard.
PAGE_SIZE = 5

# Emoji prefix shown next to a user's name in the list, keyed by role.
ROLE_ICON = {
    UserRole.ADMIN: "👑",
    UserRole.REGULAR: "👤",
}
# Human-readable role names shown in messages and buttons.
ROLE_NAME = {
    UserRole.ADMIN: "Администратор",
    UserRole.REGULAR: "Пользователь",
}

LIST_TEXT = "Список пользователей:"
# Reply-keyboard button captions; handlers match incoming text against them.
ADD_CANCEL_BUTTON = "Отменить добавление"
EDIT_CANCEL_BUTTON = "Отменить изменение"


class AddUserStates(StatesGroup):
    """FSM states of the "add user" dialogue."""

    user_id = State()
    vpn_link = State()


class AddUserData(BaseModel):
    """FSM payload of the "add user" dialogue."""

    # Telegram id of the user being added; None until one has been shared.
    user_id: int | None = None


class EditVpnLinkStates(StatesGroup):
    """FSM states of the "edit VPN link" dialogue."""

    vpn_link = State()


class EditVpnLinkData(BaseModel):
    """FSM payload of the "edit VPN link" dialogue."""

    # List page to return to once the dialogue ends.
    page: int
    # Id of the user whose link is being edited.
    user_id: int


async def get_list_markup(
    page: int,
    bot: Bot,
    session: AsyncSession,
) -> InlineKeyboardMarkup:
    """Build the inline keyboard for one page of the user list.

    The requested page is clamped into the valid range, so an out-of-range
    index (for example after deleting the only user of the last page) still
    produces a keyboard for an existing page.

    Args:
        page: Zero-based page index requested by the caller.
        bot: Bot instance passed through to ``load_user_cache``.
        session: Database session used to count and load users.

    Returns:
        A keyboard with one row per user, an optional pagination row and a
        final "add user" row.
    """
    total = await session.scalar(select(count()).select_from(User))
    assert total is not None
    # At least one page is always reported so the clamp below stays valid
    # even when there are no users at all.
    total_pages = max(1, ceil(total / PAGE_SIZE))
    page = max(0, min(page, total_pages - 1))

    users = await session.scalars(
        select(User).offset(PAGE_SIZE * page).limit(PAGE_SIZE).order_by(User.id.desc())
    )

    rows: list[list[InlineKeyboardButton]] = []
    for u in users:
        user_cache = await load_user_cache(bot, u.id)
        rows.append(
            [
                InlineKeyboardButton(
                    text=f"{ROLE_ICON[u.role]} {user_cache.full_name}",
                    callback_data=UserItemClb(page=page, user_id=u.id).pack(),
                )
            ]
        )

    page_buttons: list[InlineKeyboardButton] = []
    if page > 0:
        page_buttons.append(
            InlineKeyboardButton(
                text="◀️",
                callback_data=UserPageClb(page=page - 1).pack(),
            )
        )
    if page < total_pages - 1:
        page_buttons.append(
            InlineKeyboardButton(
                text="▶️",
                callback_data=UserPageClb(page=page + 1).pack(),
            )
        )
    # With a single page there is nothing to paginate; do not emit an empty
    # keyboard row in that case.
    if page_buttons:
        rows.append(page_buttons)

    rows.append(
        [
            InlineKeyboardButton(
                text="➕ Добавить пользователя",
                callback_data=UserAddClb().pack(),
            )
        ]
    )
    return InlineKeyboardMarkup(inline_keyboard=rows)


def build_item_markup(user_id: int, page: int) -> InlineKeyboardMarkup:
    """Build the action keyboard shown on a single user's card.

    Args:
        user_id: Id of the user the actions apply to.
        page: List page to return to via the "back" button.
    """
    return InlineKeyboardMarkup(
        inline_keyboard=[
            [
                InlineKeyboardButton(
                    text="Поменять роль",
                    callback_data=UserRoleClb(page=page, user_id=user_id).pack(),
                ),
            ],
            [
                InlineKeyboardButton(
                    text="Изменить VPN ссылку",
                    callback_data=UserVpnLinkClb(page=page, user_id=user_id).pack(),
                ),
            ],
            [
                InlineKeyboardButton(
                    text="Удалить пользователя",
                    style=ButtonStyle.DANGER,
                    callback_data=UserDeleteClb(page=page, user_id=user_id).pack(),
                ),
            ],
            [
                InlineKeyboardButton(
                    text="Назад к списку",
                    callback_data=UserPageClb(page=page).pack(),
                ),
            ],
        ]
    )


def build_role_markup(user_id: int, page: int) -> InlineKeyboardMarkup:
    """Build the role-selection keyboard for a user.

    Args:
        user_id: Id of the user whose role is being changed.
        page: List page carried along so "back" returns to the same card.
    """
    return InlineKeyboardMarkup(
        inline_keyboard=[
            [
                InlineKeyboardButton(
                    text=f"{ROLE_ICON[UserRole.REGULAR]} {ROLE_NAME[UserRole.REGULAR]}",
                    callback_data=UserRoleSetClb(
                        page=page,
                        user_id=user_id,
                        role=UserRole.REGULAR,
                    ).pack(),
                ),
            ],
            [
                InlineKeyboardButton(
                    text=f"{ROLE_ICON[UserRole.ADMIN]} {ROLE_NAME[UserRole.ADMIN]}",
                    callback_data=UserRoleSetClb(
                        page=page,
                        user_id=user_id,
                        role=UserRole.ADMIN,
                    ).pack(),
                ),
            ],
            [
                InlineKeyboardButton(
                    text="Назад",
                    callback_data=UserItemClb(page=page, user_id=user_id).pack(),
                ),
            ],
        ]
    )


def build_delete_markup(user_id: int, page: int) -> InlineKeyboardMarkup:
    """Build the delete-confirmation keyboard for a user.

    Args:
        user_id: Id of the user about to be deleted.
        page: List page carried along for the follow-up screens.
    """
    return InlineKeyboardMarkup(
        inline_keyboard=[
            [
                InlineKeyboardButton(
                    text="❌ Удалить",
                    style=ButtonStyle.DANGER,
                    callback_data=UserDeleteConfirmClb(
                        page=page,
                        user_id=user_id,
                    ).pack(),
                ),
            ],
            [
                InlineKeyboardButton(
                    text="Назад",
                    callback_data=UserItemClb(page=page, user_id=user_id).pack(),
                ),
            ],
        ]
    )


async def get_item_text(bot: Bot, target: User) -> str:
    """Return the card text (mention, role, VPN link) for ``target``."""
    user_cache = await load_user_cache(bot, target.id)
    return (
        f"Пользователь: {user_cache.mention}\n"
        f"Роль: {ROLE_NAME[target.role]}\n"
        f"VPN ссылка: {target.vpn_link}"
    )


@router.message(Command("users"))
async def command(msg: Message, bot: Bot, session: AsyncSession) -> None:
    """Handle ``/users``: send the first page of the user list."""
    await msg.answer(
        LIST_TEXT,
        reply_markup=await get_list_markup(0, bot, session),
    )


@router.callback_query(UserPageClb.filter())
async def page(
    clb: CallbackQuery,
    bot: Bot,
    callback_data: UserPageClb,
    session: AsyncSession,
) -> None:
    """Show the requested list page in place of the current message."""
    assert isinstance(clb.message, Message)
    await clb.message.edit_text(
        LIST_TEXT,
        reply_markup=await get_list_markup(callback_data.page, bot, session),
    )
    await clb.answer()


@router.callback_query(UserItemClb.filter())
async def item(
    clb: CallbackQuery,
    bot: Bot,
    callback_data: UserItemClb,
    session: AsyncSession,
) -> None:
    """Show the card of the selected user, or an alert if it no longer exists."""
    assert isinstance(clb.message, Message)
    target = await session.get(User, callback_data.user_id)
    if target is None:
        await clb.answer("Пользователь не найден.", show_alert=True)
        return
    await clb.message.edit_text(
        await get_item_text(bot, target),
        reply_markup=build_item_markup(target.id, callback_data.page),
    )
    await clb.answer()


@router.callback_query(UserRoleClb.filter())
async def role_menu(
    clb: CallbackQuery,
    callback_data: UserRoleClb,
    session: AsyncSession,
    user: User,
) -> None:
    """Show the role-selection menu for the selected user.

    ``user`` is compared against the target id to refuse changing one's own
    role; it is supplied from outside this module.
    """
    assert isinstance(clb.message, Message)
    if user.id == callback_data.user_id:
        await clb.answer("Нельзя поменять свою роль.", show_alert=True)
        return
    target = await session.get(User, callback_data.user_id)
    if target is None:
        await clb.answer("Пользователь не найден.", show_alert=True)
        return
    await clb.message.edit_text(
        f"Выберете новую роль для пользователя (текущая: {ROLE_NAME[target.role]}):",
        reply_markup=build_role_markup(target.id, callback_data.page),
    )
    await clb.answer()


@router.callback_query(UserRoleSetClb.filter())
async def role_set(
    clb: CallbackQuery,
    bot: Bot,
    callback_data: UserRoleSetClb,
    session: AsyncSession,
    user: User,
) -> None:
    """Apply the chosen role to the selected user and show the card again."""
    assert isinstance(clb.message, Message)
    if user.id == callback_data.user_id:
        await clb.answer("Нельзя поменять свою роль.", show_alert=True)
        return
    target = await session.get(User, callback_data.user_id)
    if target is None:
        await clb.answer("Пользователь не найден.", show_alert=True)
        return
    # Skip the write when the role is unchanged.
    if target.role != callback_data.role:
        target.role = callback_data.role
        await session.flush()
    await clb.message.edit_text(
        await get_item_text(bot, target),
        reply_markup=build_item_markup(target.id, callback_data.page),
    )
    await clb.answer(f"Роль: {ROLE_NAME[target.role]}.")


@router.callback_query(UserDeleteClb.filter())
async def delete_menu(
    clb: CallbackQuery,
    bot: Bot,
    callback_data: UserDeleteClb,
    session: AsyncSession,
    user: User,
) -> None:
    """Ask for confirmation before deleting the selected user."""
    assert isinstance(clb.message, Message)
    if user.id == callback_data.user_id:
        await clb.answer("Нельзя удалить себя.", show_alert=True)
        return
    target = await session.get(User, callback_data.user_id)
    if target is None:
        await clb.answer("Пользователь не найден.", show_alert=True)
        return
    user_cache = await load_user_cache(bot, target.id)
    await clb.message.edit_text(
        f"Удалить пользователя {user_cache.mention}?",
        reply_markup=build_delete_markup(target.id, callback_data.page),
    )
    await clb.answer()


@router.callback_query(UserDeleteConfirmClb.filter())
async def delete_confirm(
    clb: CallbackQuery,
    bot: Bot,
    callback_data: UserDeleteConfirmClb,
    session: AsyncSession,
    user: User,
) -> None:
    """Delete the selected user together with their payments.

    The self-deletion check is repeated here because this callback can be
    triggered independently of ``delete_menu``.
    """
    assert isinstance(clb.message, Message)
    if user.id == callback_data.user_id:
        await clb.answer("Нельзя удалить себя.", show_alert=True)
        return
    target = await session.get(User, callback_data.user_id)
    if target is None:
        await clb.answer("Пользователь не найден.", show_alert=True)
        return
    # Payments are removed first, then the user row itself.
    await session.execute(delete(Payment).where(Payment.user_id == target.id))
    await session.delete(target)
    await session.flush()
    await clb.message.edit_text(
        LIST_TEXT,
        reply_markup=await get_list_markup(callback_data.page, bot, session),
    )
    await clb.answer("Пользователь удалён.")


@router.callback_query(UserVpnLinkClb.filter())
async def vpn_link_button(
    clb: CallbackQuery,
    bot: Bot,
    state: FSMContext,
    callback_data: UserVpnLinkClb,
    session: AsyncSession,
) -> None:
    """Start the "edit VPN link" dialogue for the selected user.

    The card message is deleted and a new message with a reply keyboard is
    sent, then the target user and page are stored in the FSM.
    """
    assert isinstance(clb.message, Message)
    target = await session.get(User, callback_data.user_id)
    if target is None:
        await clb.answer("Пользователь не найден.", show_alert=True)
        return
    await clb.message.delete()
    await bot.send_message(
        clb.from_user.id,
        "Укажите новую ссылку для доступа к VPN.",
        reply_markup=ReplyKeyboardMarkup(
            keyboard=[
                [KeyboardButton(text=EDIT_CANCEL_BUTTON, style=ButtonStyle.DANGER)]
            ],
            resize_keyboard=True,
        ),
    )
    await set_data(
        state,
        EditVpnLinkData(page=callback_data.page, user_id=callback_data.user_id),
    )
    await state.set_state(EditVpnLinkStates.vpn_link)
    await clb.answer()


async def send_item(bot: Bot, chat_id: int, target: User, page: int) -> None:
    """Send the card of ``target`` as a new message to ``chat_id``.

    Args:
        bot: Bot used to send the message.
        chat_id: Chat that receives the card.
        target: User whose card is rendered.
        page: List page encoded into the card's buttons.
    """
    await bot.send_message(
        chat_id,
        await get_item_text(bot, target),
        reply_markup=build_item_markup(target.id, page),
    )


@router.message(EditVpnLinkStates.vpn_link, F.text == EDIT_CANCEL_BUTTON)
async def vpn_link_cancel(
    msg: Message,
    bot: Bot,
    state: FSMContext,
    session: AsyncSession,
) -> None:
    """Abort the "edit VPN link" dialogue and show the user's card again."""
    # Read the payload before clearing the state; it is needed to re-send
    # the card afterwards.
    data = await get_data(state, EditVpnLinkData)
    await state.clear()
    await msg.answer(
        "Изменение VPN ссылки отменено.",
        reply_markup=ReplyKeyboardRemove(),
    )
    target = await session.get(User, data.user_id)
    if target is None:
        return
    await send_item(bot, msg.chat.id, target, data.page)


@router.message(EditVpnLinkStates.vpn_link)
async def vpn_link_set(
    msg: Message,
    bot: Bot,
    state: FSMContext,
    session: AsyncSession,
) -> None:
    """Store the text of the message as the new VPN link of the target user."""
    if msg.text is None:
        await msg.answer("Вы должны указать ссылку отправив текстовое сообщение.")
        return
    data = await get_data(state, EditVpnLinkData)
    target = await session.get(User, data.user_id)
    if target is None:
        await msg.answer(
            "Пользователь не найден.",
            reply_markup=ReplyKeyboardRemove(),
        )
        await state.clear()
        return
    target.vpn_link = msg.text
    await session.flush()
    await state.clear()
    await msg.answer("VPN ссылка обновлена.", reply_markup=ReplyKeyboardRemove())
    await send_item(bot, msg.chat.id, target, data.page)


@router.callback_query(UserAddClb.filter())
async def add_button(
    clb: CallbackQuery,
    bot: Bot,
    state: FSMContext,
) -> None:
    """Start the "add user" dialogue by asking the caller to share a user."""
    assert isinstance(clb.message, Message)
    await clb.message.delete()
    await bot.send_message(
        clb.from_user.id,
        "Выберете пользователя которого хотите добавить.",
        reply_markup=ReplyKeyboardMarkup(
            keyboard=[
                [
                    KeyboardButton(
                        text="Выбрать пользователя",
                        style=ButtonStyle.PRIMARY,
                        request_users=KeyboardButtonRequestUsers(request_id=0),
                    ),
                ],
                [KeyboardButton(text=ADD_CANCEL_BUTTON, style=ButtonStyle.DANGER)],
            ],
            resize_keyboard=True,
        ),
    )
    await set_data(state, AddUserData(user_id=None))
    await state.set_state(AddUserStates.user_id)
    await clb.answer()


@router.message(AddUserStates(), F.text == ADD_CANCEL_BUTTON)
async def add_cancel(
    msg: Message,
    bot: Bot,
    state: FSMContext,
    session: AsyncSession,
) -> None:
    """Abort the "add user" dialogue from any of its states and show the list."""
    await msg.answer(
        "Добавление пользователя отменено.",
        reply_markup=ReplyKeyboardRemove(),
    )
    await msg.answer(
        LIST_TEXT,
        reply_markup=await get_list_markup(0, bot, session),
    )
    await state.clear()


@router.message(AddUserStates.user_id)
async def add_set_user_id(msg: Message, state: FSMContext) -> None:
    """Remember the shared user and ask for their VPN link."""
    if msg.users_shared is None:
        await msg.answer("Вы должны воспользоваться кнопкой ниже.")
        return
    async with edit_data(state, AddUserData) as data:
        # Only the first shared user is used.
        data.user_id = msg.users_shared.users[0].user_id
    await msg.answer(
        "Укажите ссылку для доступа к VPN.",
        reply_markup=ReplyKeyboardMarkup(
            keyboard=[
                [KeyboardButton(text=ADD_CANCEL_BUTTON, style=ButtonStyle.DANGER)]
            ],
            resize_keyboard=True,
        ),
    )
    await state.set_state(AddUserStates.vpn_link)


@router.message(AddUserStates.vpn_link)
async def add_set_vpn_link(
    msg: Message,
    bot: Bot,
    state: FSMContext,
    session: AsyncSession,
) -> None:
    """Create the user chosen earlier, using the message text as VPN link.

    If a user with the chosen id already exists, nothing is inserted: the
    dialogue ends and the existing user's card is shown instead.
    """
    if msg.text is None:
        await msg.answer("Вы должны указать ссылку отправив текстовое сообщение.")
        return
    data = await get_data(state, AddUserData)
    assert data.user_id is not None

    # Inserting an id that is already present would fail on flush with a
    # primary-key conflict and leave the dialogue stuck in this state.
    existing = await session.get(User, data.user_id)
    if existing is not None:
        await state.clear()
        await msg.answer(
            "Пользователь уже добавлен.",
            reply_markup=ReplyKeyboardRemove(),
        )
        await send_item(bot, msg.chat.id, existing, 0)
        return

    target = User(id=data.user_id, vpn_link=msg.text, datetime=datetime.now(UTC))
    session.add(target)
    await session.flush()
    # Leave the dialogue as soon as the user is stored, so a failure while
    # sending the confirmation cannot keep the FSM in the "add" state.
    await state.clear()
    await msg.answer("Пользователь добавлен.", reply_markup=ReplyKeyboardRemove())
    await send_item(bot, msg.chat.id, target, 0)