1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
|
from pathlib import Path
from typing import Any, Mapping
from aiofiles import open as open
from aiogram.fsm.state import State
from aiogram.fsm.storage.base import (
BaseStorage,
DefaultKeyBuilder,
KeyBuilder,
StateType,
StorageKey,
)
from pydantic import TypeAdapter
from pydantic.main import BaseModel
class Record(BaseModel):
data: dict[str, Any] = {}
state: str | None = None
class JsonStorage(BaseStorage):
file_path: Path
records: dict[str, Record]
records_adapter: TypeAdapter
key_builder: KeyBuilder
def __init__(self, file_path: Path, key_builder: KeyBuilder | None = None) -> None:
self.file_path = file_path
self.records = {}
self.records_adapter = TypeAdapter(dict[str, Record])
self.key_builder = DefaultKeyBuilder() if key_builder is None else key_builder
async def read(self) -> None:
async with open(self.file_path, "rb") as file:
json = await file.read()
self.records = self.records_adapter.validate_json(json)
async def flush(self) -> None:
async with open(self.file_path, "wb") as file:
json = self.records_adapter.dump_json(self.records)
await file.write(json)
async def get_record(self, key: StorageKey) -> Record:
await self.read()
record_key = self.key_builder.build(key)
if record_key not in self.records:
self.records[record_key] = Record()
return self.records[record_key]
async def set_state(self, key: StorageKey, state: StateType = None) -> None:
record = await self.get_record(key)
record.state = state.state if isinstance(state, State) else state
await self.flush()
async def get_state(self, key: StorageKey) -> str | None:
record = await self.get_record(key)
return record.state
async def set_data(self, key: StorageKey, data: Mapping[str, Any]) -> None:
if not isinstance(data, dict):
raise TypeError(
f"Data must be a dict or dict-like object, got {type(data).__name__}",
data,
)
record = await self.get_record(key)
record.data = data.copy()
await self.flush()
async def get_data(self, key: StorageKey) -> dict[str, Any]:
record = await self.get_record(key)
return record.data
async def close(self) -> None:
await self.flush()
|