2026-01-25 02:37:07 +01:00

146 lines
4.8 KiB
Python

from __future__ import annotations
import json
from dataclasses import dataclass
from typing import Any, Iterable, Optional, cast
from errors import ConfigError
@dataclass(frozen=True)
class Mapping:
    """Immutable pairing of one sensor with one thermostat.

    The second attribute is spelled ``thermostate`` because that is the
    key the configuration loader reads from each mapping entry.
    """

    # Value of the mapping entry's "sensor" key.
    sensor: str
    # Value of the mapping entry's "thermostate" key.
    thermostate: str
@dataclass(frozen=True)
class FritzBoxConfig:
    """Immutable connection settings taken from the ``fritzbox`` section."""

    # Required; the loader rejects a missing or empty value.
    url: str
    # Required; the loader rejects a missing or empty value.
    password: str
    # Optional; stays None when the configuration does not provide one.
    username: Optional[str] = None
@dataclass(frozen=True)
class AppConfig:
    """Immutable, fully validated application configuration.

    The first three fields are required; every other field carries the
    default that is used when the configuration file omits the key.
    """

    # --- required settings -------------------------------------------
    fritzbox: FritzBoxConfig
    mappings: list[Mapping]
    update_timeout: int

    # --- optional settings (defaults shown) --------------------------
    log_level: str = "INFO"
    offset_threshold: float = 0.5
    dry_run: bool = False
    force_ipv4: bool = True
    log_ws_messages: bool = False
    log_http_requests: bool = False
    request_timeout: int = 10
    request_retries: int = 2
def load_config(path: str) -> AppConfig:
    """Read the JSON file at *path* and return a validated ``AppConfig``.

    Required keys are ``fritzbox`` (an object with non-empty string ``url``
    and ``password`` plus an optional string ``username``), ``mappings`` and
    ``update_timeout``. Every other key falls back to its default when it is
    absent from the file.

    Numeric settings (``update_timeout``, ``offset_threshold``,
    ``request_timeout``, ``request_retries``) reject JSON ``true``/``false``:
    ``bool`` is a subclass of ``int`` in Python, so a plain ``isinstance``
    check would silently accept a boolean as the number 0 or 1.

    Args:
        path: Location of the JSON configuration file (read as UTF-8).

    Returns:
        The populated, immutable ``AppConfig``.

    Raises:
        ConfigError: If the top-level value is not an object or any key is
            missing or has the wrong type.
        OSError: Propagated unchanged when the file cannot be opened.
        ValueError: Propagated unchanged when the content is not valid
            UTF-8 or not valid JSON (``json.JSONDecodeError`` is a subclass).
    """
    with open(path, "r", encoding="utf-8") as config_file:
        raw = json.load(config_file)
    if not isinstance(raw, dict):
        raise ConfigError("Config must be a JSON object")
    raw = cast(dict[str, Any], raw)

    # --- fritzbox section --------------------------------------------
    fritzbox = raw.get("fritzbox")
    if not isinstance(fritzbox, dict):
        raise ConfigError("Missing or invalid 'fritzbox' config")
    fritzbox = cast(dict[str, Any], fritzbox)
    url = fritzbox.get("url")
    password = fritzbox.get("password")
    username = fritzbox.get("username")
    if not isinstance(url, str) or not url:
        raise ConfigError("Missing or invalid 'fritzbox.url' config")
    if not isinstance(password, str) or not password:
        raise ConfigError("Missing or invalid 'fritzbox.password' config")
    if username is not None and not isinstance(username, str):
        raise ConfigError("Invalid 'fritzbox.username' config")

    # --- required top-level keys -------------------------------------
    mappings = _parse_mappings(raw.get("mappings"))
    update_timeout = raw.get("update_timeout")
    if not _is_strict_int(update_timeout):
        raise ConfigError("Missing or invalid 'update_timeout' config")

    # --- optional keys, validated in the same order as before --------
    log_level = raw.get("log_level", "INFO")
    if not isinstance(log_level, str):
        raise ConfigError("Invalid 'log_level' config")
    offset_threshold = raw.get("offset_threshold", 0.5)
    # Accept ints and floats, but not booleans masquerading as ints.
    if isinstance(offset_threshold, bool) or not isinstance(
        offset_threshold, (int, float)
    ):
        raise ConfigError("Invalid 'offset_threshold' config")
    dry_run = _read_bool_flag(raw, "dry_run", False)
    force_ipv4 = _read_bool_flag(raw, "force_ipv4", True)
    log_ws_messages = _read_bool_flag(raw, "log_ws_messages", False)
    log_http_requests = _read_bool_flag(raw, "log_http_requests", False)
    request_timeout = raw.get("request_timeout", 10)
    if not _is_strict_int(request_timeout):
        raise ConfigError("Invalid 'request_timeout' config")
    request_retries = raw.get("request_retries", 2)
    if not _is_strict_int(request_retries):
        raise ConfigError("Invalid 'request_retries' config")

    return AppConfig(
        fritzbox=FritzBoxConfig(url=url, password=password, username=username),
        mappings=mappings,
        update_timeout=update_timeout,
        log_level=log_level,
        # Normalise an integer threshold (e.g. 1) to float.
        offset_threshold=float(offset_threshold),
        dry_run=dry_run,
        force_ipv4=force_ipv4,
        log_ws_messages=log_ws_messages,
        log_http_requests=log_http_requests,
        request_timeout=request_timeout,
        request_retries=request_retries,
    )


def _is_strict_int(value: Any) -> bool:
    """Return True when *value* is an ``int`` that is not a ``bool``."""
    return isinstance(value, int) and not isinstance(value, bool)


def _read_bool_flag(raw: dict[str, Any], key: str, default: bool) -> bool:
    """Return the boolean stored under *key*, or *default* when absent.

    Raises:
        ConfigError: If the key is present with a non-boolean value.
    """
    flag = raw.get(key, default)
    if not isinstance(flag, bool):
        raise ConfigError(f"Invalid '{key}' config")
    return flag
def _parse_mappings(value: Any) -> list[Mapping]:
    """Validate the raw ``mappings`` value and convert it to ``Mapping`` objects.

    Each dict entry must provide string values under ``"sensor"`` and
    ``"thermostate"``. A thermostat may appear in at most one mapping.
    Entries that are not dicts are skipped rather than rejected.

    Indices in error messages refer to the entry's position in the original
    list, so they stay correct even when skipped entries precede the
    offending one.

    Args:
        value: The value found under the ``mappings`` key (any JSON type).

    Returns:
        The mappings in file order; never empty.

    Raises:
        ConfigError: If *value* is not a list, an entry lacks string
            ``sensor``/``thermostate`` values, a mapping or thermostat is
            repeated, or no usable mapping remains.
    """
    if not isinstance(value, list):
        raise ConfigError("Missing or invalid 'mappings' config")
    entries = cast(list[Any], value)

    mappings: list[Mapping] = []
    seen_pairs: set[tuple[str, str]] = set()
    seen_thermostats: set[str] = set()
    for idx, item in enumerate(entries):
        if not isinstance(item, dict):
            # Skip non-object entries, but keep counting them so that idx
            # matches the position the user sees in the file.
            continue
        entry = cast(dict[str, Any], item)
        sensor = entry.get("sensor")
        thermostate = entry.get("thermostate")
        if not isinstance(sensor, str) or not isinstance(thermostate, str):
            raise ConfigError(f"Invalid mapping at index {idx}")
        pair = (sensor, thermostate)
        # The exact-duplicate check must come first: a repeated pair always
        # repeats its thermostat too, so the broader check below would
        # otherwise shadow this more specific message.
        if pair in seen_pairs:
            raise ConfigError(f"Duplicate mapping at index {idx}: {pair}")
        if thermostate in seen_thermostats:
            raise ConfigError(
                f"Duplicate thermostate mapping at index {idx}: {thermostate}"
            )
        seen_pairs.add(pair)
        seen_thermostats.add(thermostate)
        mappings.append(Mapping(sensor=sensor, thermostate=thermostate))

    if not mappings:
        raise ConfigError("'mappings' config must not be empty")
    return mappings
def _iter_dicts(items: Iterable[Any]) -> Iterable[dict[str, Any]]:
for item in items:
if isinstance(item, dict):
yield item