146 lines
4.8 KiB
Python
146 lines
4.8 KiB
Python
from __future__ import annotations
|
|
|
|
import json
|
|
from dataclasses import dataclass
|
|
from typing import Any, Iterable, Optional, cast
|
|
|
|
from errors import ConfigError
|
|
|
|
|
|
@dataclass(frozen=True)
class Mapping:
    """Immutable pairing of one sensor with one thermostat.

    Both attributes are plain strings copied from the ``sensor`` and
    ``thermostate`` keys of an entry in the config's ``mappings`` list.
    What the strings identify (name, device id, ...) is not defined here.
    """

    # Value of the entry's "sensor" key.
    sensor: str
    # Value of the entry's "thermostate" key (spelling matches the config key).
    thermostate: str
|
|
|
|
|
|
@dataclass(frozen=True)
class FritzBoxConfig:
    """Immutable settings taken from the ``fritzbox`` section of the config.

    ``url`` and ``password`` are required; ``username`` is optional and
    defaults to ``None`` when the config does not provide one.
    """

    # Required, from "fritzbox.url".
    url: str
    # Required, from "fritzbox.password".
    password: str
    # Optional, from "fritzbox.username"; None when absent.
    username: Optional[str] = None
|
|
|
|
|
|
@dataclass(frozen=True)
class AppConfig:
    """Immutable, fully validated application configuration.

    The first three fields have no default and must always be supplied;
    every other field falls back to the default shown below. Field names
    match the top-level keys of the JSON config file.
    """

    # --- required -------------------------------------------------------
    fritzbox: FritzBoxConfig
    mappings: list[Mapping]
    update_timeout: int

    # --- optional, with defaults ----------------------------------------
    log_level: str = "INFO"
    offset_threshold: float = 0.5
    dry_run: bool = False
    force_ipv4: bool = True
    log_ws_messages: bool = False
    log_http_requests: bool = False
    request_timeout: int = 10
    request_retries: int = 2
|
|
|
|
|
|
def load_config(path: str) -> AppConfig:
    """Load and validate the application configuration from a JSON file.

    Args:
        path: Path of a UTF-8 encoded JSON document whose top level is an
            object.

    Returns:
        The validated, immutable ``AppConfig``.

    Raises:
        ConfigError: If the top level is not a JSON object, if a required
            key (``fritzbox``, ``mappings``, ``update_timeout``) is missing,
            or if any value has the wrong type. JSON booleans are rejected
            wherever a number is expected.

    Errors raised by ``open`` or ``json.load`` (unreadable file, malformed
    JSON) are not caught here and propagate to the caller unchanged.
    """
    with open(path, "r", encoding="utf-8") as config_file:
        raw = json.load(config_file)

    if not isinstance(raw, dict):
        raise ConfigError("Config must be a JSON object")
    raw = cast(dict[str, Any], raw)

    # Validation order is part of the observable behaviour: with several
    # invalid keys, the first one in this sequence is the one reported.
    fritzbox = _parse_fritzbox(raw.get("fritzbox"))
    mappings = _parse_mappings(raw.get("mappings"))

    update_timeout = raw.get("update_timeout")
    if not _is_int(update_timeout):
        raise ConfigError("Missing or invalid 'update_timeout' config")

    log_level = raw.get("log_level", "INFO")
    if not isinstance(log_level, str):
        raise ConfigError("Invalid 'log_level' config")

    offset_threshold = raw.get("offset_threshold", 0.5)
    if not (_is_int(offset_threshold) or isinstance(offset_threshold, float)):
        raise ConfigError("Invalid 'offset_threshold' config")

    dry_run = _optional_bool(raw, "dry_run", False)
    force_ipv4 = _optional_bool(raw, "force_ipv4", True)
    log_ws_messages = _optional_bool(raw, "log_ws_messages", False)
    log_http_requests = _optional_bool(raw, "log_http_requests", False)
    request_timeout = _optional_int(raw, "request_timeout", 10)
    request_retries = _optional_int(raw, "request_retries", 2)

    return AppConfig(
        fritzbox=fritzbox,
        mappings=mappings,
        update_timeout=update_timeout,
        log_level=log_level,
        # Integers are accepted in the file but stored as float.
        offset_threshold=float(offset_threshold),
        dry_run=dry_run,
        force_ipv4=force_ipv4,
        log_ws_messages=log_ws_messages,
        log_http_requests=log_http_requests,
        request_timeout=request_timeout,
        request_retries=request_retries,
    )


def _is_int(value: Any) -> bool:
    """Return True for genuine integers, excluding ``bool``.

    ``bool`` is a subclass of ``int``, so a bare ``isinstance(value, int)``
    would let JSON ``true``/``false`` through as the numbers 1 and 0.
    """
    return isinstance(value, int) and not isinstance(value, bool)


def _optional_bool(raw: dict[str, Any], key: str, default: bool) -> bool:
    """Return the boolean stored under *key*, or *default* when it is absent."""
    value = raw.get(key, default)
    if not isinstance(value, bool):
        raise ConfigError(f"Invalid '{key}' config")
    return value


def _optional_int(raw: dict[str, Any], key: str, default: int) -> int:
    """Return the integer stored under *key*, or *default* when it is absent."""
    value = raw.get(key, default)
    if not _is_int(value):
        raise ConfigError(f"Invalid '{key}' config")
    return value


def _parse_fritzbox(value: Any) -> FritzBoxConfig:
    """Validate the ``fritzbox`` section and build a ``FritzBoxConfig``."""
    if not isinstance(value, dict):
        raise ConfigError("Missing or invalid 'fritzbox' config")
    section = cast(dict[str, Any], value)

    url = section.get("url")
    password = section.get("password")
    username = section.get("username")

    # url and password must be non-empty strings; username may be omitted.
    if not isinstance(url, str) or not url:
        raise ConfigError("Missing or invalid 'fritzbox.url' config")
    if not isinstance(password, str) or not password:
        raise ConfigError("Missing or invalid 'fritzbox.password' config")
    if username is not None and not isinstance(username, str):
        raise ConfigError("Invalid 'fritzbox.username' config")

    return FritzBoxConfig(url=url, password=password, username=username)
|
|
|
|
|
|
def _parse_mappings(value: Any) -> list[Mapping]:
    """Validate the ``mappings`` config value and convert it to ``Mapping`` objects.

    Args:
        value: The raw value stored under the ``mappings`` key; expected to
            be a list of objects with string ``sensor`` and ``thermostate``
            keys. List entries that are not objects are skipped.

    Returns:
        The parsed mappings, in the order they appear in *value*.

    Raises:
        ConfigError: If *value* is not a list, an entry lacks a string
            ``sensor`` or ``thermostate``, the same pair occurs twice, a
            thermostat is mapped more than once, or no mapping remains.
    """
    if not isinstance(value, list):
        raise ConfigError("Missing or invalid 'mappings' config")
    entries = cast(list[Any], value)

    mappings: list[Mapping] = []
    seen_pairs: set[tuple[str, str]] = set()
    seen_thermostats: set[str] = set()

    # Enumerate the raw list (not a pre-filtered view) so the index in an
    # error message is the entry's real position in the config file.
    for idx, entry in enumerate(entries):
        if not isinstance(entry, dict):
            continue

        sensor = entry.get("sensor")
        thermostate = entry.get("thermostate")
        if not isinstance(sensor, str) or not isinstance(thermostate, str):
            raise ConfigError(f"Invalid mapping at index {idx}")

        pair = (sensor, thermostate)
        # The exact-duplicate check has to run first: a repeated pair always
        # repeats its thermostat too, so in the opposite order this branch
        # could never be reached.
        if pair in seen_pairs:
            raise ConfigError(f"Duplicate mapping at index {idx}: {pair}")
        if thermostate in seen_thermostats:
            raise ConfigError(
                f"Duplicate thermostate mapping at index {idx}: {thermostate}"
            )

        seen_pairs.add(pair)
        seen_thermostats.add(thermostate)
        mappings.append(Mapping(sensor=sensor, thermostate=thermostate))

    if not mappings:
        raise ConfigError("'mappings' config must not be empty")
    return mappings
|
|
|
|
|
|
def _iter_dicts(items: Iterable[Any]) -> Iterable[dict[str, Any]]:
    """Lazily yield the elements of *items* that are ``dict`` instances.

    Elements of any other type are dropped; the relative order of the
    remaining elements is unchanged.
    """
    yield from (candidate for candidate in items if isinstance(candidate, dict))
|