aboutsummaryrefslogtreecommitdiff
path: root/handlers/middleware.py
blob: 376f4a2c8e1ac3c34c6517532581cd5c809913d0 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
from typing import Any, Awaitable, Callable

from aiogram import BaseMiddleware
from aiogram.types import CallbackQuery, Message, TelegramObject
from sqlalchemy.ext.asyncio import AsyncSession

from database import sessions
from libs.user import set_user_cache
from models import User
from models.user import UserCache


class InjectSessionMiddleware(BaseMiddleware):
    async def __call__[T](
        self,
        handler: Callable[[TelegramObject, dict[str, Any]], Awaitable[Any]],
        event: TelegramObject,
        data: dict[str, Any],
    ) -> Any:
        async with sessions.begin() as session:
            data["session"] = session
            handler_result = await handler(event, data)
        return handler_result


class UserAccessMiddleware(BaseMiddleware):
    async def __call__(
        self,
        handler: Callable[[TelegramObject, dict[str, Any]], Awaitable[Any]],
        event: TelegramObject,
        data: dict[str, Any],
    ) -> Any:
        if not isinstance(event, (Message, CallbackQuery)):
            raise TypeError(
                f"UserAccessMiddleware doesn't support event with type: {type(event).__name__}",
                event,
            )

        if isinstance(event, Message):
            user_id = event.chat.id
        else:
            user_id = event.from_user.id

        session: AsyncSession = data["session"]
        user = await session.get(User, user_id)
        if user is None:
            error_text = "Вы не добавлены в список пользователей VPN"

            if isinstance(event, Message):
                await event.answer(error_text)
            else:
                await event.answer(error_text)

            return

        data["user"] = user

        return await handler(event, data)


class UserCacheMiddleware(BaseMiddleware):
    async def __call__(
        self,
        handler: Callable[[TelegramObject, dict[str, Any]], Awaitable[Any]],
        event: TelegramObject,
        data: dict[str, Any],
    ) -> Any:
        if not isinstance(event, (Message, CallbackQuery)):
            raise TypeError(
                f"UserAccessMiddleware doesn't support event with type: {type(event).__name__}",
                event,
            )

        if isinstance(event, Message):
            user_cache = UserCache.from_chat(event.chat)
        else:
            user_cache = UserCache.from_user(event.from_user)

        await set_user_cache(user_cache)
        return await handler(event, data)