From f7b7e87cffc9dcb2817b070d7a003ac234c96ec3 Mon Sep 17 00:00:00 2001 From: Tolmachev Igor Date: Mon, 23 Mar 2026 18:07:30 +0300 Subject: Add new_announcement command --- libs/storage.py | 75 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 75 insertions(+) create mode 100644 libs/storage.py (limited to 'libs/storage.py') diff --git a/libs/storage.py b/libs/storage.py new file mode 100644 index 0000000..6220cfc --- /dev/null +++ b/libs/storage.py @@ -0,0 +1,75 @@ +from pathlib import Path +from typing import Any, Mapping + +from aiofiles import open as open +from aiogram.fsm.state import State +from aiogram.fsm.storage.base import ( + BaseStorage, + DefaultKeyBuilder, + KeyBuilder, + StateType, + StorageKey, +) +from pydantic import TypeAdapter +from pydantic.main import BaseModel + + +class Record(BaseModel): + data: dict[str, Any] = {} + state: str | None = None + + +class JsonStorage(BaseStorage): + file_path: Path + records: dict[str, Record] + records_adapter: TypeAdapter + key_builder: KeyBuilder + + def __init__(self, file_path: Path, key_builder: KeyBuilder | None = None) -> None: + self.file_path = file_path + self.records = {} + self.records_adapter = TypeAdapter(dict[str, Record]) + self.key_builder = DefaultKeyBuilder() if key_builder is None else key_builder + + async def read(self) -> None: + async with open(self.file_path, "rb") as file: + json = await file.read() + self.records = self.records_adapter.validate_json(json) + + async def flush(self) -> None: + async with open(self.file_path, "wb") as file: + json = self.records_adapter.dump_json(self.records) + await file.write(json) + + async def get_record(self, key: StorageKey) -> Record: + await self.read() + record_key = self.key_builder.build(key) + if record_key not in self.records: + self.records[record_key] = Record() + return self.records[record_key] + + async def set_state(self, key: StorageKey, state: StateType = None) -> None: + record = await self.get_record(key) + record.state = state.state if isinstance(state, State) else state + await self.flush() + + async def get_state(self, key: StorageKey) -> str | None: + record = await self.get_record(key) + return record.state + + async def set_data(self, key: StorageKey, data: Mapping[str, Any]) -> None: + if not isinstance(data, dict): + raise TypeError( + f"Data must be a dict or dict-like object, got {type(data).__name__}", + data, + ) + record = await self.get_record(key) + record.data = data.copy() + await self.flush() + + async def get_data(self, key: StorageKey) -> dict[str, Any]: + record = await self.get_record(key) + return record.data + + async def close(self) -> None: + await self.flush() -- cgit v1.3