1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
|
from typing import Any, Awaitable, Callable
from aiogram import BaseMiddleware
from aiogram.types import CallbackQuery, Message, TelegramObject
from sqlalchemy.ext.asyncio import AsyncSession
from database import sessions
from libs.user import set_user_cache
from models import User
from models.user import UserCache
class InjectSessionMiddleware(BaseMiddleware):
    """Middleware that provides a database session for a single event.

    The session obtained from ``sessions.begin()`` is stored under
    ``data["session"]`` so that later middlewares and the handler can use it.
    """

    async def __call__(
        self,
        handler: Callable[[TelegramObject, dict[str, Any]], Awaitable[Any]],
        event: TelegramObject,
        data: dict[str, Any],
    ) -> Any:
        """Call ``handler`` inside a ``sessions.begin()`` context.

        Args:
            handler: Next callable in the middleware chain.
            event: Incoming Telegram object being processed.
            data: Per-event context dict; receives the ``"session"`` key.

        Returns:
            Whatever ``handler`` returns.
        """
        # NOTE(review): ``sessions`` is presumably an SQLAlchemy
        # ``async_sessionmaker``; if so, leaving this block commits on success
        # and rolls back when the handler raises — confirm in ``database``.
        async with sessions.begin() as session:
            data["session"] = session
            handler_result = await handler(event, data)
        # The result is handed back only after the context manager has exited.
        return handler_result
class UserAccessMiddleware(BaseMiddleware):
    """Middleware that lets an event through only for users found in the database.

    Expects ``data["session"]`` to already hold an ``AsyncSession``. When the
    user is found, the ``User`` row is exposed to the handler as
    ``data["user"]``.
    """

    async def __call__(
        self,
        handler: Callable[[TelegramObject, dict[str, Any]], Awaitable[Any]],
        event: TelegramObject,
        data: dict[str, Any],
    ) -> Any:
        """Look up the sender and either forward the event or reject it.

        Args:
            handler: Next callable in the middleware chain.
            event: Incoming event; must be a ``Message`` or ``CallbackQuery``.
            data: Per-event context dict; read for ``"session"``, updated
                with ``"user"`` on success.

        Returns:
            The handler's result, or ``None`` when the user is not registered.

        Raises:
            TypeError: If ``event`` is neither ``Message`` nor ``CallbackQuery``.
        """
        # Messages are identified by their chat id, callback queries by the
        # id of the user who pressed the button.
        if isinstance(event, Message):
            telegram_id = event.chat.id
        elif isinstance(event, CallbackQuery):
            telegram_id = event.from_user.id
        else:
            raise TypeError(
                f"UserAccessMiddleware doesn't support event with type: {type(event).__name__}",
                event,
            )

        db_session: AsyncSession = data["session"]
        known_user = await db_session.get(User, telegram_id)
        if known_user is not None:
            data["user"] = known_user
            return await handler(event, data)

        # Unknown user: reply with the refusal text ("You are not added to the
        # VPN user list") and stop the chain. Both supported event types
        # provide ``answer``.
        await event.answer("Вы не добавлены в список пользователей VPN")
        return None
class UserCacheMiddleware(BaseMiddleware):
    """Middleware that refreshes the cached user info before handling an event.

    A ``UserCache`` is built from the event's chat (for ``Message``) or from
    the sending user (for ``CallbackQuery``) and passed to ``set_user_cache``.
    """

    async def __call__(
        self,
        handler: Callable[[TelegramObject, dict[str, Any]], Awaitable[Any]],
        event: TelegramObject,
        data: dict[str, Any],
    ) -> Any:
        """Store a ``UserCache`` for the event's sender, then call ``handler``.

        Args:
            handler: Next callable in the middleware chain.
            event: Incoming event; must be a ``Message`` or ``CallbackQuery``.
            data: Per-event context dict, passed through unchanged.

        Returns:
            Whatever ``handler`` returns.

        Raises:
            TypeError: If ``event`` is neither ``Message`` nor ``CallbackQuery``.
        """
        if not isinstance(event, (Message, CallbackQuery)):
            # The message names this class (it previously named
            # UserAccessMiddleware), so the error points at the right place.
            raise TypeError(
                f"UserCacheMiddleware doesn't support event with type: {type(event).__name__}",
                event,
            )

        if isinstance(event, Message):
            user_cache = UserCache.from_chat(event.chat)
        else:
            user_cache = UserCache.from_user(event.from_user)

        await set_user_cache(user_cache)
        return await handler(event, data)
|