From 75e99ca0712a2c09230e5c6f8d093dc526cc717d Mon Sep 17 00:00:00 2001 From: Tolmachev Igor Date: Mon, 20 Apr 2026 20:56:35 +0300 Subject: Add users command --- handlers/admin/users.py | 557 ++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 557 insertions(+) create mode 100644 handlers/admin/users.py (limited to 'handlers/admin/users.py') diff --git a/handlers/admin/users.py b/handlers/admin/users.py new file mode 100644 index 0000000..88dfd89 --- /dev/null +++ b/handlers/admin/users.py @@ -0,0 +1,557 @@ +from datetime import UTC, datetime +from math import ceil + +from aiogram import Bot, F, Router +from aiogram.enums import ButtonStyle +from aiogram.filters import Command +from aiogram.fsm.context import FSMContext +from aiogram.fsm.state import State, StatesGroup +from aiogram.types import ( + CallbackQuery, + InlineKeyboardButton, + InlineKeyboardMarkup, + KeyboardButton, + KeyboardButtonRequestUsers, + Message, + ReplyKeyboardMarkup, + ReplyKeyboardRemove, +) +from pydantic import BaseModel +from sqlalchemy import delete, select +from sqlalchemy.ext.asyncio import AsyncSession +from sqlalchemy.sql.functions import count + +from libs.fsm import edit_data, get_data, set_data +from libs.user import load_user_cache +from models import Payment, User, UserRole +from models.callback_data import ( + UserAddClb, + UserDeleteClb, + UserDeleteConfirmClb, + UserItemClb, + UserPageClb, + UserRoleClb, + UserRoleSetClb, + UserVpnLinkClb, +) + +router = Router(name="users") +PAGE_SIZE = 5 +ROLE_ICON = { + UserRole.ADMIN: "👑", + UserRole.REGULAR: "👤", +} +ROLE_NAME = { + UserRole.ADMIN: "Администратор", + UserRole.REGULAR: "Пользователь", +} + +LIST_TEXT = "Список пользователей:" +ADD_CANCEL_BUTTON = "Отменить добавление" +EDIT_CANCEL_BUTTON = "Отменить изменение" + + +class AddUserStates(StatesGroup): + user_id = State() + vpn_link = State() + + +class AddUserData(BaseModel): + user_id: int | None = None + + +class EditVpnLinkStates(StatesGroup): + vpn_link = State() + + +class EditVpnLinkData(BaseModel): + page: int + user_id: int + + +async def get_list_markup( + page: int, + bot: Bot, + session: AsyncSession, +) -> InlineKeyboardMarkup: + total = await session.scalar(select(count()).select_from(User)) + assert total is not None + total_pages = max(1, ceil(total / PAGE_SIZE)) + page = max(0, min(page, total_pages - 1)) + + users = await session.scalars( + select(User).offset(PAGE_SIZE * page).limit(PAGE_SIZE).order_by(User.id.desc()) + ) + + user_buttons = [] + for u in users: + user_cache = await load_user_cache(bot, u.id) + user_buttons.append( + [ + InlineKeyboardButton( + text=f"{ROLE_ICON[u.role]} {user_cache.full_name}", + callback_data=UserItemClb(page=page, user_id=u.id).pack(), + ) + ] + ) + + page_buttons = [] + if page > 0: + page_buttons.append( + InlineKeyboardButton( + text="◀️", + callback_data=UserPageClb(page=page - 1).pack(), + ) + ) + if page < total_pages - 1: + page_buttons.append( + InlineKeyboardButton( + text="▶️", + callback_data=UserPageClb(page=page + 1).pack(), + ) + ) + + add_row = [ + InlineKeyboardButton( + text="➕ Добавить пользователя", + callback_data=UserAddClb().pack(), + ) + ] + + return InlineKeyboardMarkup(inline_keyboard=[*user_buttons, page_buttons, add_row]) + + +def build_item_markup(user_id: int, page: int) -> InlineKeyboardMarkup: + return InlineKeyboardMarkup( + inline_keyboard=[ + [ + InlineKeyboardButton( + text="Поменять роль", + callback_data=UserRoleClb(page=page, user_id=user_id).pack(), + ), + ], + [ + InlineKeyboardButton( + text="Изменить VPN ссылку", + callback_data=UserVpnLinkClb(page=page, user_id=user_id).pack(), + ), + ], + [ + InlineKeyboardButton( + text="Удалить пользователя", + style=ButtonStyle.DANGER, + callback_data=UserDeleteClb(page=page, user_id=user_id).pack(), + ), + ], + [ + InlineKeyboardButton( + text="Назад к списку", + callback_data=UserPageClb(page=page).pack(), + ), + ], + ] + ) + + +def build_role_markup(user_id: int, page: int) -> InlineKeyboardMarkup: + return InlineKeyboardMarkup( + inline_keyboard=[ + [ + InlineKeyboardButton( + text=f"{ROLE_ICON[UserRole.REGULAR]} {ROLE_NAME[UserRole.REGULAR]}", + callback_data=UserRoleSetClb( + page=page, + user_id=user_id, + role=UserRole.REGULAR, + ).pack(), + ), + ], + [ + InlineKeyboardButton( + text=f"{ROLE_ICON[UserRole.ADMIN]} {ROLE_NAME[UserRole.ADMIN]}", + callback_data=UserRoleSetClb( + page=page, + user_id=user_id, + role=UserRole.ADMIN, + ).pack(), + ), + ], + [ + InlineKeyboardButton( + text="Назад", + callback_data=UserItemClb(page=page, user_id=user_id).pack(), + ), + ], + ] + ) + + +def build_delete_markup(user_id: int, page: int) -> InlineKeyboardMarkup: + return InlineKeyboardMarkup( + inline_keyboard=[ + [ + InlineKeyboardButton( + text="❌ Удалить", + style=ButtonStyle.DANGER, + callback_data=UserDeleteConfirmClb( + page=page, + user_id=user_id, + ).pack(), + ), + ], + [ + InlineKeyboardButton( + text="Назад", + callback_data=UserItemClb(page=page, user_id=user_id).pack(), + ), + ], + ] + ) + + +async def get_item_text(bot: Bot, target: User) -> str: + user_cache = await load_user_cache(bot, target.id) + return ( + f"Пользователь: {user_cache.mention}\n" + f"Роль: {ROLE_NAME[target.role]}\n" + f"VPN ссылка: {target.vpn_link}" + ) + + +@router.message(Command("users")) +async def command(msg: Message, bot: Bot, session: AsyncSession) -> None: + await msg.answer( + LIST_TEXT, + reply_markup=await get_list_markup(0, bot, session), + ) + + +@router.callback_query(UserPageClb.filter()) +async def page( + clb: CallbackQuery, + bot: Bot, + callback_data: UserPageClb, + session: AsyncSession, +) -> None: + assert isinstance(clb.message, Message) + await clb.message.edit_text( + LIST_TEXT, + reply_markup=await get_list_markup(callback_data.page, bot, session), + ) + await clb.answer() + + +@router.callback_query(UserItemClb.filter()) +async def item( + clb: CallbackQuery, + bot: Bot, + callback_data: UserItemClb, + session: AsyncSession, +) -> None: + assert isinstance(clb.message, Message) + target = await session.get(User, callback_data.user_id) + if target is None: + await clb.answer("Пользователь не найден.", show_alert=True) + return + + await clb.message.edit_text( + await get_item_text(bot, target), + reply_markup=build_item_markup(target.id, callback_data.page), + ) + await clb.answer() + + +@router.callback_query(UserRoleClb.filter()) +async def role_menu( + clb: CallbackQuery, + callback_data: UserRoleClb, + session: AsyncSession, + user: User, +) -> None: + assert isinstance(clb.message, Message) + if user.id == callback_data.user_id: + await clb.answer("Нельзя поменять свою роль.", show_alert=True) + return + + target = await session.get(User, callback_data.user_id) + if target is None: + await clb.answer("Пользователь не найден.", show_alert=True) + return + + await clb.message.edit_text( + f"Выберете новую роль для пользователя (текущая: {ROLE_NAME[target.role]}):", + reply_markup=build_role_markup(target.id, callback_data.page), + ) + await clb.answer() + + +@router.callback_query(UserRoleSetClb.filter()) +async def role_set( + clb: CallbackQuery, + bot: Bot, + callback_data: UserRoleSetClb, + session: AsyncSession, + user: User, +) -> None: + assert isinstance(clb.message, Message) + if user.id == callback_data.user_id: + await clb.answer("Нельзя поменять свою роль.", show_alert=True) + return + + target = await session.get(User, callback_data.user_id) + if target is None: + await clb.answer("Пользователь не найден.", show_alert=True) + return + + if target.role != callback_data.role: + target.role = callback_data.role + await session.flush() + + await clb.message.edit_text( + await get_item_text(bot, target), + reply_markup=build_item_markup(target.id, callback_data.page), + ) + await clb.answer(f"Роль: {ROLE_NAME[target.role]}.") + + +@router.callback_query(UserDeleteClb.filter()) +async def delete_menu( + clb: CallbackQuery, + bot: Bot, + callback_data: UserDeleteClb, + session: AsyncSession, + user: User, +) -> None: + assert isinstance(clb.message, Message) + if user.id == callback_data.user_id: + await clb.answer("Нельзя удалить себя.", show_alert=True) + return + + target = await session.get(User, callback_data.user_id) + if target is None: + await clb.answer("Пользователь не найден.", show_alert=True) + return + + user_cache = await load_user_cache(bot, target.id) + await clb.message.edit_text( + f"Удалить пользователя {user_cache.mention}?", + reply_markup=build_delete_markup(target.id, callback_data.page), + ) + await clb.answer() + + +@router.callback_query(UserDeleteConfirmClb.filter()) +async def delete_confirm( + clb: CallbackQuery, + bot: Bot, + callback_data: UserDeleteConfirmClb, + session: AsyncSession, + user: User, +) -> None: + assert isinstance(clb.message, Message) + if user.id == callback_data.user_id: + await clb.answer("Нельзя удалить себя.", show_alert=True) + return + + target = await session.get(User, callback_data.user_id) + if target is None: + await clb.answer("Пользователь не найден.", show_alert=True) + return + + await session.execute(delete(Payment).where(Payment.user_id == target.id)) + await session.delete(target) + await session.flush() + + await clb.message.edit_text( + LIST_TEXT, + reply_markup=await get_list_markup(callback_data.page, bot, session), + ) + await clb.answer("Пользователь удалён.") + + +@router.callback_query(UserVpnLinkClb.filter()) +async def vpn_link_button( + clb: CallbackQuery, + bot: Bot, + state: FSMContext, + callback_data: UserVpnLinkClb, + session: AsyncSession, +) -> None: + assert isinstance(clb.message, Message) + target = await session.get(User, callback_data.user_id) + if target is None: + await clb.answer("Пользователь не найден.", show_alert=True) + return + + await clb.message.delete() + await bot.send_message( + clb.from_user.id, + "Укажите новую ссылку для доступа к VPN.", + reply_markup=ReplyKeyboardMarkup( + keyboard=[ + [KeyboardButton(text=EDIT_CANCEL_BUTTON, style=ButtonStyle.DANGER)] + ], + resize_keyboard=True, + ), + ) + await set_data( + state, + EditVpnLinkData(page=callback_data.page, user_id=callback_data.user_id), + ) + await state.set_state(EditVpnLinkStates.vpn_link) + await clb.answer() + + +async def send_item(bot: Bot, chat_id: int, target: User, page: int) -> None: + await bot.send_message( + chat_id, + await get_item_text(bot, target), + reply_markup=build_item_markup(target.id, page), + ) + + +@router.message(EditVpnLinkStates.vpn_link, F.text == EDIT_CANCEL_BUTTON) +async def vpn_link_cancel( + msg: Message, + bot: Bot, + state: FSMContext, + session: AsyncSession, +) -> None: + data = await get_data(state, EditVpnLinkData) + await state.clear() + + await msg.answer( + "Изменение VPN ссылки отменено.", + reply_markup=ReplyKeyboardRemove(), + ) + + target = await session.get(User, data.user_id) + if target is None: + return + await send_item(bot, msg.chat.id, target, data.page) + + +@router.message(EditVpnLinkStates.vpn_link) +async def vpn_link_set( + msg: Message, + bot: Bot, + state: FSMContext, + session: AsyncSession, +) -> None: + if msg.text is None: + await msg.answer("Вы должны указать ссылку отправив текстовое сообщение.") + return + + data = await get_data(state, EditVpnLinkData) + target = await session.get(User, data.user_id) + if target is None: + await msg.answer( + "Пользователь не найден.", + reply_markup=ReplyKeyboardRemove(), + ) + await state.clear() + return + + target.vpn_link = msg.text + await session.flush() + await state.clear() + + await msg.answer("VPN ссылка обновлена.", reply_markup=ReplyKeyboardRemove()) + await send_item(bot, msg.chat.id, target, data.page) + + +@router.callback_query(UserAddClb.filter()) +async def add_button( + clb: CallbackQuery, + bot: Bot, + state: FSMContext, +) -> None: + assert isinstance(clb.message, Message) + await clb.message.delete() + await bot.send_message( + clb.from_user.id, + "Выберете пользователя которого хотите добавить.", + reply_markup=ReplyKeyboardMarkup( + keyboard=[ + [ + KeyboardButton( + text="Выбрать пользователя", + style=ButtonStyle.PRIMARY, + request_users=KeyboardButtonRequestUsers(request_id=0), + ), + ], + [KeyboardButton(text=ADD_CANCEL_BUTTON, style=ButtonStyle.DANGER)], + ], + resize_keyboard=True, + ), + ) + await set_data(state, AddUserData(user_id=None)) + await state.set_state(AddUserStates.user_id) + await clb.answer() + + +@router.message(AddUserStates(), F.text == ADD_CANCEL_BUTTON) +async def add_cancel( + msg: Message, + bot: Bot, + state: FSMContext, + session: AsyncSession, +) -> None: + await msg.answer( + "Добавление пользователя отменено.", + reply_markup=ReplyKeyboardRemove(), + ) + + await msg.answer( + LIST_TEXT, + reply_markup=await get_list_markup(0, bot, session), + ) + await state.clear() + + +@router.message(AddUserStates.user_id) +async def add_set_user_id(msg: Message, state: FSMContext) -> None: + if msg.users_shared is None: + await msg.answer("Вы должны воспользоваться кнопкой ниже.") + return + + async with edit_data(state, AddUserData) as data: + data.user_id = msg.users_shared.users[0].user_id + + await msg.answer( + "Укажите ссылку для доступа к VPN.", + reply_markup=ReplyKeyboardMarkup( + keyboard=[ + [KeyboardButton(text=ADD_CANCEL_BUTTON, style=ButtonStyle.DANGER)] + ], + resize_keyboard=True, + ), + ) + await state.set_state(AddUserStates.vpn_link) + + +@router.message(AddUserStates.vpn_link) +async def add_set_vpn_link( + msg: Message, + bot: Bot, + state: FSMContext, + session: AsyncSession, +) -> None: + if msg.text is None: + await msg.answer("Вы должны указать ссылку отправив текстовое сообщение.") + return + + data = await get_data(state, AddUserData) + assert data.user_id is not None + + session.add(User(id=data.user_id, vpn_link=msg.text, datetime=datetime.now(UTC))) + await session.flush() + + await msg.answer("Пользователь добавлен.", reply_markup=ReplyKeyboardRemove()) + + target = await session.get(User, data.user_id) + if target is None: + return + + await send_item(bot, msg.chat.id, target, 0) + await state.clear() -- cgit v1.3