from pathlib import Path from typing import Any, Mapping from aiofiles import open as open from aiogram.fsm.state import State from aiogram.fsm.storage.base import ( BaseStorage, DefaultKeyBuilder, KeyBuilder, StateType, StorageKey, ) from pydantic import TypeAdapter from pydantic.main import BaseModel class Record(BaseModel): data: dict[str, Any] = {} state: str | None = None class JsonStorage(BaseStorage): file_path: Path records: dict[str, Record] records_adapter: TypeAdapter key_builder: KeyBuilder def __init__(self, file_path: Path, key_builder: KeyBuilder | None = None) -> None: self.file_path = file_path self.records = {} self.records_adapter = TypeAdapter(dict[str, Record]) self.key_builder = DefaultKeyBuilder() if key_builder is None else key_builder async def read(self) -> None: async with open(self.file_path, "rb") as file: json = await file.read() self.records = self.records_adapter.validate_json(json) async def flush(self) -> None: async with open(self.file_path, "wb") as file: json = self.records_adapter.dump_json(self.records) await file.write(json) async def get_record(self, key: StorageKey) -> Record: await self.read() record_key = self.key_builder.build(key) if record_key not in self.records: self.records[record_key] = Record() return self.records[record_key] async def set_state(self, key: StorageKey, state: StateType = None) -> None: record = await self.get_record(key) record.state = state.state if isinstance(state, State) else state await self.flush() async def get_state(self, key: StorageKey) -> str | None: record = await self.get_record(key) return record.state async def set_data(self, key: StorageKey, data: Mapping[str, Any]) -> None: if not isinstance(data, dict): raise TypeError( f"Data must be a dict or dict-like object, got {type(data).__name__}", data, ) record = await self.get_record(key) record.data = data.copy() await self.flush() async def get_data(self, key: StorageKey) -> dict[str, Any]: record = await self.get_record(key) return record.data async def close(self) -> None: await self.flush()