From 4aaddf23e0ddfec021aa2b10cedf268168785930 Mon Sep 17 00:00:00 2001 From: Horlabs Date: Sun, 25 Jan 2026 02:37:07 +0100 Subject: [PATCH] Improve stability etc --- .gitignore | 2 + fritz_temp_sync/Dockerfile | 2 +- fritz_temp_sync/README.md | 56 ++ fritz_temp_sync/config.py | 145 ++++ fritz_temp_sync/config.yaml | 19 +- fritz_temp_sync/device.py | 950 ++++++++------------------- fritz_temp_sync/errors.py | 10 + fritz_temp_sync/fritzbox.py | 280 +++++--- fritz_temp_sync/ha_types.py | 23 + fritz_temp_sync/homeassistant.py | 96 +-- fritz_temp_sync/logging_config.py | 16 + fritz_temp_sync/sync_controller.py | 215 ++++++ fritz_temp_sync/sync_ha_fb.py | 146 ++-- fritz_temp_sync/tests/test_config.py | 38 ++ 14 files changed, 1083 insertions(+), 915 deletions(-) create mode 100644 fritz_temp_sync/README.md create mode 100644 fritz_temp_sync/config.py create mode 100644 fritz_temp_sync/errors.py create mode 100644 fritz_temp_sync/ha_types.py create mode 100644 fritz_temp_sync/logging_config.py create mode 100644 fritz_temp_sync/sync_controller.py create mode 100644 fritz_temp_sync/tests/test_config.py diff --git a/.gitignore b/.gitignore index c0c8839..578a288 100644 --- a/.gitignore +++ b/.gitignore @@ -244,3 +244,5 @@ fabric.properties # local config test options.json .DS_Store + +options.json \ No newline at end of file diff --git a/fritz_temp_sync/Dockerfile b/fritz_temp_sync/Dockerfile index 67d9b0c..e8b78bd 100755 --- a/fritz_temp_sync/Dockerfile +++ b/fritz_temp_sync/Dockerfile @@ -2,7 +2,7 @@ ARG BUILD_FROM FROM $BUILD_FROM # Install requirements for add-on -RUN apk update && apk add --no-cache python3 py3-pip py3-websockets py3-requests +RUN apk update && apk add --no-cache python3 py3-pip py3-websockets py3-httpx WORKDIR /data diff --git a/fritz_temp_sync/README.md b/fritz_temp_sync/README.md new file mode 100644 index 0000000..8fe7ed0 --- /dev/null +++ b/fritz_temp_sync/README.md @@ -0,0 +1,56 @@ +# Fritz!Box Temperature Sync + +Dieses Add-on synchronisiert die gemessene Temperatur von Home-Assistant-Sensoren mit Fritz!DECT-Thermostaten, indem die Offset-Korrektur an der Fritz!Box angepasst wird. + +## Voraussetzungen +- Home Assistant Add-on Umgebung +- Fritz!Box mit Fritz!DECT-Thermostaten +- Home Assistant Entitäten für Thermostate und Temperatursensoren + +## Konfiguration +Die Optionen werden über das Add-on-Formular oder die `options.json` gesetzt. + +### Optionen +- `fritzbox.url` (URL, erforderlich): Basis-URL der Fritz!Box, z. B. `http://fritz.box` +- `fritzbox.username` (string, optional): Benutzername +- `fritzbox.password` (string, erforderlich): Passwort +- `mappings` (Liste, erforderlich): Zuordnung von Sensor → Thermostat +- `update_timeout` (int, erforderlich): Mindestabstand in Minuten zwischen Offset-Updates pro Thermostat +- `log_level` (string, optional): z. B. `DEBUG`, `INFO`, `WARNING` +- `offset_threshold` (float, optional): Mindestabweichung in °C, ab der ein Offset gesetzt wird (Default 0.5) +- `dry_run` (bool, optional): Wenn `true`, werden keine Änderungen an der Fritz!Box vorgenommen + +### Beispiel +```json +{ + "fritzbox": { + "url": "http://fritz.box", + "username": "ha", + "password": "***" + }, + "mappings": [ + { + "sensor": "sensor.room_temperature", + "thermostate": "climate.room" + } + ], + "update_timeout": 15, + "log_level": "INFO", + "offset_threshold": 0.5, + "dry_run": false +} +``` + +## Verhalten +- Bei jedem relevanten `state_changed`-Event wird die Differenz zwischen Thermostat- und Sensorwert geprüft. +- Erst ab `offset_threshold` erfolgt eine Offset-Korrektur. +- Pro Thermostat wird maximal alle `update_timeout` Minuten ein Update durchgeführt. + +## Logging +- `INFO`: Start/Stop, erfolgreiche Authentifizierung, Offset-Korrekturen +- `WARNING`: Verbindungsprobleme, Retries, Dry-Run-Hinweise +- `DEBUG`: Detailzustände und Entscheidungslogik + +## Hinweise +- Das Add-on schreibt keine Payload-Snapshots mehr auf die Platte. +- Die Zuordnung erfolgt über die Entity-ID-Namen aus Home Assistant. diff --git a/fritz_temp_sync/config.py b/fritz_temp_sync/config.py new file mode 100644 index 0000000..32e2dff --- /dev/null +++ b/fritz_temp_sync/config.py @@ -0,0 +1,145 @@ +from __future__ import annotations + +import json +from dataclasses import dataclass +from typing import Any, Iterable, Optional, cast + +from errors import ConfigError + + +@dataclass(frozen=True) +class Mapping: + sensor: str + thermostate: str + + +@dataclass(frozen=True) +class FritzBoxConfig: + url: str + password: str + username: Optional[str] = None + + +@dataclass(frozen=True) +class AppConfig: + fritzbox: FritzBoxConfig + mappings: list[Mapping] + update_timeout: int + log_level: str = "INFO" + offset_threshold: float = 0.5 + dry_run: bool = False + force_ipv4: bool = True + log_ws_messages: bool = False + log_http_requests: bool = False + request_timeout: int = 10 + request_retries: int = 2 + + +def load_config(path: str) -> AppConfig: + with open(path, "r", encoding="utf-8") as config_file: + raw = json.load(config_file) + + if not isinstance(raw, dict): + raise ConfigError("Config must be a JSON object") + raw = cast(dict[str, Any], raw) + + fritzbox = raw.get("fritzbox") + if not isinstance(fritzbox, dict): + raise ConfigError("Missing or invalid 'fritzbox' config") + fritzbox = cast(dict[str, Any], fritzbox) + + url = fritzbox.get("url") + password = fritzbox.get("password") + username = fritzbox.get("username") + + if not isinstance(url, str) or not url: + raise ConfigError("Missing or invalid 'fritzbox.url' config") + if not isinstance(password, str) or not password: + raise ConfigError("Missing or invalid 'fritzbox.password' config") + if username is not None and not isinstance(username, str): + raise ConfigError("Invalid 'fritzbox.username' config") + + mappings = _parse_mappings(raw.get("mappings")) + + update_timeout = raw.get("update_timeout") + if not isinstance(update_timeout, int): + raise ConfigError("Missing or invalid 'update_timeout' config") + + log_level = raw.get("log_level", "INFO") + if not isinstance(log_level, str): + raise ConfigError("Invalid 'log_level' config") + + offset_threshold = raw.get("offset_threshold", 0.5) + if not isinstance(offset_threshold, (int, float)): + raise ConfigError("Invalid 'offset_threshold' config") + + dry_run = raw.get("dry_run", False) + if not isinstance(dry_run, bool): + raise ConfigError("Invalid 'dry_run' config") + + force_ipv4 = raw.get("force_ipv4", True) + if not isinstance(force_ipv4, bool): + raise ConfigError("Invalid 'force_ipv4' config") + + log_ws_messages = raw.get("log_ws_messages", False) + if not isinstance(log_ws_messages, bool): + raise ConfigError("Invalid 'log_ws_messages' config") + + log_http_requests = raw.get("log_http_requests", False) + if not isinstance(log_http_requests, bool): + raise ConfigError("Invalid 'log_http_requests' config") + + request_timeout = raw.get("request_timeout", 10) + if not isinstance(request_timeout, int): + raise ConfigError("Invalid 'request_timeout' config") + + request_retries = raw.get("request_retries", 2) + if not isinstance(request_retries, int): + raise ConfigError("Invalid 'request_retries' config") + + return AppConfig( + fritzbox=FritzBoxConfig(url=url, password=password, username=username), + mappings=mappings, + update_timeout=update_timeout, + log_level=log_level, + offset_threshold=float(offset_threshold), + dry_run=dry_run, + force_ipv4=force_ipv4, + log_ws_messages=log_ws_messages, + log_http_requests=log_http_requests, + request_timeout=request_timeout, + request_retries=request_retries, + ) + + +def _parse_mappings(value: Any) -> list[Mapping]: + if not isinstance(value, list): + raise ConfigError("Missing or invalid 'mappings' config") + value = cast(list[Any], value) + mappings: list[Mapping] = [] + seen_pairs: set[tuple[str, str]] = set() + seen_thermostats: set[str] = set() + for idx, entry in enumerate(_iter_dicts(value)): + sensor = entry.get("sensor") + thermostate = entry.get("thermostate") + if not isinstance(sensor, str) or not isinstance(thermostate, str): + raise ConfigError(f"Invalid mapping at index {idx}") + if thermostate in seen_thermostats: + raise ConfigError( + f"Duplicate thermostate mapping at index {idx}: {thermostate}" + ) + pair = (sensor, thermostate) + if pair in seen_pairs: + raise ConfigError(f"Duplicate mapping at index {idx}: {pair}") + seen_thermostats.add(thermostate) + seen_pairs.add(pair) + mappings.append(Mapping(sensor=sensor, thermostate=thermostate)) + if not mappings: + raise ConfigError("'mappings' config must not be empty") + return mappings + + +def _iter_dicts(items: Iterable[Any]) -> Iterable[dict[str, Any]]: + for item in items: + if isinstance(item, dict): + yield item diff --git a/fritz_temp_sync/config.yaml b/fritz_temp_sync/config.yaml index 25e7e4f..2cab808 100755 --- a/fritz_temp_sync/config.yaml +++ b/fritz_temp_sync/config.yaml @@ -1,9 +1,9 @@ name: "Fritz!Box Temperature Sync Dev" description: "Sync Fritz!DECT thermostat temperatures with other sensors in Home Assistant" -version: "0.4.3" +version: "0.5" startup: "application" stage: "stable" -slug: "fritz_temp_sync_dev" +slug: "fritz_temp_sync" homeassistant_api: true init: false arch: @@ -20,6 +20,14 @@ options: - sensor: null thermostate: null update_timeout: 15 + log_level: "INFO" + offset_threshold: 0.5 + dry_run: false + force_ipv4: true + log_ws_messages: false + log_http_requests: false + request_timeout: 10 + request_retries: 2 schema: fritzbox: url: url @@ -30,3 +38,10 @@ schema: thermostate: str update_timeout: int log_level: "str?" + offset_threshold: float? + dry_run: bool? + force_ipv4: bool? + log_ws_messages: bool? + log_http_requests: bool? + request_timeout: int? + request_retries: int? diff --git a/fritz_temp_sync/device.py b/fritz_temp_sync/device.py index a192e2d..a69ead5 100755 --- a/fritz_temp_sync/device.py +++ b/fritz_temp_sync/device.py @@ -1,714 +1,276 @@ - from __future__ import annotations -from abc import ABC, abstractmethod -from enum import Enum, IntFlag, auto -from typing import Dict, List, Optional, Union -import typing +from dataclasses import dataclass +from typing import Any, Dict, List, Optional, cast -class WeekDay(IntFlag): - MON = 0b1 - TUE = 0b10 - WED = 0b100 - THU = 0b1000 - FRI = 0b10000 - SAT = 0b100000 - SUN = 0b1000000 +@dataclass +class TemperatureSensor: + current_celsius: Optional[float] + offset: Optional[float] -class Manufacturer: - def __init__(self, name: str): - self.name: str = name +@dataclass +class Device: + id: int + display_name: str + temp_sensor: TemperatureSensor + raw_state: Dict[str, Any] - def __repr__(self): - return f"name: {self.name}" - - def to_json(self): - return {"name": self.name} - - @staticmethod - def parse_dict(manufacturer: Dict): - return Manufacturer(manufacturer["name"]) - - -class FirmwareVersion: - def __init__(self, search: bool, current: str, update: bool, running: bool): - self.search: bool = search - self.current: str = current - self.update: bool = update - self.running: bool = running - - def __repr__(self): - return f"search: {self.search}; current: {self.current}; update: {self.update}; running: {self.running}" - - def to_json(self): - return {"search": self.search, "current": self.current, "update": self.update, "running": self.running} - - @staticmethod - def parse_dict(firmware_version: dict): - return FirmwareVersion(firmware_version["search"], - firmware_version["current"], - firmware_version["update"], - firmware_version["running"]) - - -class PushService: - def __init__(self, mail_address: str, unit_settings: List, is_enabled: bool): - self.mail_address: str = mail_address - self.unit_settings: List = unit_settings - self.is_enabled: bool = is_enabled - - def __repr__(self): - return f"" - - def to_json(self): - return {"mailAddress": self.mail_address, "unitSettings": self.unit_settings, "isEnabled": self.is_enabled} - - @staticmethod - def parse_dict(push_service: Dict): - return PushService(push_service["mailAddress"], push_service["unitSettings"], push_service["isEnabled"]) - - -class SkillType(Enum): - SmartHomeTemperatureSensor = auto() - SmartHomeThermostat = auto() - SmartHomeBattery = auto() - - -class Skill(ABC): - def __init__(self, skill_type: SkillType): - self.type: SkillType = skill_type - - @abstractmethod - def to_json(self): - pass - - @abstractmethod - def to_web_data(self, device_id: int): - pass - - @staticmethod - def parse_dict(skill: Dict): - skill_type = SkillType[skill["type"]] - if skill_type == SkillType.SmartHomeTemperatureSensor: - return TemperatureSkill.parse_dict(skill) - elif skill_type == SkillType.SmartHomeThermostat: - return ThermostatSkill.parse_dict(skill) - elif skill_type == SkillType.SmartHomeBattery: - return BatterySkill.parse_dict(skill) + def __init__(self, state: Dict[str, Any]): + self.raw_state = state + id_value = state.get("id") + if isinstance(id_value, int): + self.id = id_value else: - raise NotImplementedError(skill_type) - - -class PresetName(Enum): - LOWER_TEMPERATURE = auto() - UPPER_TEMPERATURE = auto() - HOLIDAY_TEMPERATURE = auto() - - -class Preset: - def __init__(self, name: PresetName, temperature: Optional[int]): - self.name: PresetName = name - self.temperature: Optional[int] = temperature - - def __repr__(self): - return f"" - - def to_json(self): - return {"name": self.name.name, "temperature": self.temperature} - - -class Description: - def __init__(self, action: str, preset_temperature: Optional[Preset]): - self.action: str = action - self.preset_temperature: Optional[Preset] = preset_temperature - - def __repr__(self): - desc = f"" - - def to_json(self): - return {"description": self.description.to_json(), "timeSetting": self.time_setting.to_json()} - - @staticmethod - def parse_dict(change: Dict): - return Change(Description.parse_dict(change["description"]), TimeSetting.parse_dict(change["timeSetting"])) - - -class TemperatureDropDetection: - def __init__(self, do_not_heat_offset_in_minutes: int, sensitivity: int): - self.do_not_heat_offset_in_minutes: int = do_not_heat_offset_in_minutes - self.sensitivity: int = sensitivity - - def __repr__(self): - # ToDo - pass - - def to_json(self): - return { - "doNotHeatOffsetInMinutes": self.do_not_heat_offset_in_minutes, - "sensitivity": self.sensitivity - } - - def to_web_data(self, device_id: int): - return { - "WindowOpenTrigger": self.sensitivity + 3, - "WindowOpenTimer": self.do_not_heat_offset_in_minutes - } - - @staticmethod - def parse_dict(drop_detection: Dict): - return TemperatureDropDetection(drop_detection["doNotHeatOffsetInMinutes"], - drop_detection["sensitivity"]) - - -class ScheduleKind(Enum): - REPETITIVE = auto() - WEEKLY_TIMETABLE = auto() - - -class ThermostatSkillMode(Enum): - TARGET_TEMPERATURE = auto() - - -class Action: - def __init__(self, is_enabled: bool, time_setting: TimeSetting, description: Description): - self.is_enabled: bool = is_enabled - self.time_setting: TimeSetting = time_setting - self.description: Description = description - - def __repr__(self): - return f"" - - def to_json(self): - return {"isEnabled": self.is_enabled, - "timeSetting": self.time_setting.to_json(), - "description": self.description.to_json()} - - @staticmethod - def parse_dict(action: Dict): - return Action(action["isEnabled"], TimeSetting.parse_dict(action["timeSetting"]), - Description.parse_dict(action["description"])) - - -class Schedule: - def __init__(self, is_enabled: bool, kind: ScheduleKind, name: str, actions: List[Action]): - self.is_enabled: bool = is_enabled - self.kind: ScheduleKind = kind - self.name: str = name - self.actions: List[Action] = actions - - def __repr__(self): - return f"" - - def to_json(self): - return { - "isEnabled": self.is_enabled, - "kind": self.kind.name, - "name": self.name, - "actions": [action.to_json() for action in self.actions] - } - - def to_web_data(self, device_id: int): - data = {} - if self.kind == ScheduleKind.REPETITIVE and self.name == "HOLIDAYS": # ToDo: Enum for names? - enabled_count = 0 - for num, holiday in enumerate(self.actions): - num += 1 - _, start_month, start_day = holiday.time_setting.start_date.split("-") - _, end_month, end_day = holiday.time_setting.end_date.split("-") - data.update({ - f"Holiday{num}StartDay": start_day, - f"Holiday{num}StartMonth": start_month, - f"Holiday{num}StartHour": holiday.time_setting.start_time.split(":")[0], - f"Holiday{num}EndDay": end_day, - f"Holiday{num}EndMonth": end_month, - f"Holiday{num}EndHour": holiday.time_setting.end_time.split(":")[0], - f"Holiday{num}Enabled": 1 if holiday.is_enabled else 0, - f"Holiday{num}ID": num - }) - if holiday.is_enabled: - enabled_count += 1 - data["HolidayEnabledCount"] = enabled_count - elif self.kind == ScheduleKind.REPETITIVE and self.name == "SUMMER_TIME": - _, start_month, start_day = self.actions[0].time_setting.start_date.split("-") - _, end_month, end_day = self.actions[0].time_setting.end_date.split("-") - data = { - "SummerStartDay": start_day, - "SummerStartMonth": start_month, - "SummerEndDay": end_day, - "SummerEndMonth": end_month, - "SummerEnabled": 1 if self.is_enabled else 0 - } - elif self.kind == ScheduleKind.WEEKLY_TIMETABLE and self.name == "TEMPERATURE": - timer_items = {} - for action in self.actions: - if not action.is_enabled: - continue - time = "".join(action.time_setting.start_time.split(":")[:-1]) - if time not in timer_items.keys(): - timer_items[time] = [0, 0] - heat = 1 if action.description.preset_temperature.name == PresetName.UPPER_TEMPERATURE else 0 - days = timer_items[time][heat] - days |= action.time_setting.day_of_week - timer_items[time][heat] = days - num = 0 - for key in timer_items.keys(): - if timer_items[key][0] != 0: - data.update({f"timer_item_{num}": f"{key};0;{timer_items[key][0].as_integer_ratio()[0]}"}) - num += 1 - if timer_items[key][1] != 0: - data.update({f"timer_item_{num}": f"{key};1;{timer_items[key][1].as_integer_ratio()[0]}"}) - num += 1 + self.id = -1 + name_value = state.get("displayName") + if isinstance(name_value, str): + self.display_name = name_value else: - raise NotImplementedError(self.name) - - return data - - @staticmethod - def parse_dict(schedule: Dict): # ToDo: Make TypedDicts for all those dicts - return Schedule(schedule["isEnabled"], - ScheduleKind[schedule["kind"]], - schedule["name"], - [Action.parse_dict(action) for action in schedule["actions"]]) - - -class TimeControl: - def __init__(self, is_enabled: bool, time_schedules: List[Schedule]): - self.is_enabled: bool = is_enabled - self.time_schedules: List[Schedule] = time_schedules - - def __repr__(self): - return f" Dict[str, Any]: + data: Dict[str, Any] = { "device": self.id, - "ule_device_name": self.display_name + "ule_device_name": self.display_name, } - for unit in self.units: - data.update(unit.to_web_data(self.id)) + + units = self._get_units() + for unit in units: + data.update(self._unit_to_web_data(unit)) return data - def to_json(self): - state = {"type": self.type.name, - "isDeletable": self.is_deletable, - "id": self.id, - "masterConnectionState": self.master_connection_state, - "displayName": self.display_name, - "category": self.category, - "units": [unit.to_json() for unit in self.units], - "firmwareVersion": self.firmware_version.to_json(), - "model": self.model, - "isEditable": self.is_editable, - "manufacturer": self.manufacturer.to_json(), - "pushService": self.push_service.to_json(), - "actorIdentificationNumber": self.actor_identification_number} + def set_offset(self, offset: float) -> None: + self.temp_sensor.offset = offset + self._set_temp_value("offset", offset) - return state + def get_offset(self) -> Optional[float]: + return self.temp_sensor.offset - def set_offset(self, offset: float): - temp_sensor: Optional[Unit] = None - for unit in self.units: - if unit.type == UnitTypes.TEMPERATURE_SENSOR: - temp_sensor = unit - break - temp_skill: Optional[TemperatureSkill] = None - for skill in temp_sensor.skills: - if skill.type == SkillType.SmartHomeTemperatureSensor: - temp_skill = typing.cast(TemperatureSkill, skill) - break - temp_skill.offset = offset + def get_temperature(self) -> Optional[float]: + return self.temp_sensor.current_celsius - def get_offset(self): - temp_sensor: Optional[Unit] = None - for unit in self.units: - if unit.type == UnitTypes.TEMPERATURE_SENSOR: - temp_sensor = unit - break - temp_skill: Optional[TemperatureSkill] = None - for skill in temp_sensor.skills: - if skill.type == SkillType.SmartHomeTemperatureSensor: - temp_skill = typing.cast(TemperatureSkill, skill) - break - return temp_skill.offset + def _get_temp_value(self, key: str) -> Optional[float]: + sensor_skill = self._find_device_temp_skill() + if sensor_skill is None: + return None + value = sensor_skill.get(key) + if isinstance(value, (int, float)): + return float(value) + return None - def get_temperature(self): - temp_sensor: Optional[Unit] = None - for unit in self.units: - if unit.type == UnitTypes.TEMPERATURE_SENSOR: - temp_sensor = unit - break - temp_skill: Optional[TemperatureSkill] = None - for skill in temp_sensor.skills: - if skill.type == SkillType.SmartHomeTemperatureSensor: - temp_skill = typing.cast(TemperatureSkill, skill) - break - return temp_skill.current_in_celsius + def _set_temp_value(self, key: str, value: float) -> None: + sensor_skill = self._find_device_temp_skill() + if sensor_skill is None: + return + sensor_skill[key] = value - def to_short_json(self): - return { - "masterConnectionState": self.master_connection_state, - "type": self.type.name, - "model": self.model, - "id": self.id, - "manufacturer": self.manufacturer.to_json(), - "actorIdentificationNumber": self.actor_identification_number, - "displayName": self.display_name - } \ No newline at end of file + def _find_device_temp_skill(self) -> Optional[Dict[str, Any]]: + for unit in self._as_dict_list(self.raw_state.get("units")): + if unit.get("type") != "TEMPERATURE_SENSOR": + continue + if unit.get("id") != self.id: + continue + for skill in self._as_dict_list(unit.get("skills")): + if skill.get("type") == "SmartHomeTemperatureSensor": + return skill + return None + + def _get_units(self) -> List[Dict[str, Any]]: + return self._as_dict_list(self.raw_state.get("units")) + + def _unit_to_web_data(self, unit: Dict[str, Any]) -> Dict[str, Any]: + data: Dict[str, Any] = {} + for skill in self._as_dict_list(unit.get("skills")): + skill_type = skill.get("type") + if skill_type == "SmartHomeThermostat": + data.update(self._thermostat_to_web_data(skill)) + elif skill_type == "SmartHomeTemperatureSensor": + data.update(self._temperature_to_web_data(skill)) + return data + + def _temperature_to_web_data(self, skill: Dict[str, Any]) -> Dict[str, Any]: + data: Dict[str, Any] = {} + current = skill.get("currentInCelsius") + offset = skill.get("offset") + if current is not None: + data["Roomtemp"] = current + if offset is not None: + data["Offset"] = offset + return data + + def _thermostat_to_web_data(self, skill: Dict[str, Any]) -> Dict[str, Any]: + data: Dict[str, Any] = {} + + upper = 0.0 + lower = 0.0 + for preset in self._as_dict_list(skill.get("presets")): + if preset.get("name") == "UPPER_TEMPERATURE": + upper = self._as_float(preset.get("temperature"), upper) + elif preset.get("name") == "LOWER_TEMPERATURE": + lower = self._as_float(preset.get("temperature"), lower) + data.update({"Heiztemp": upper, "Absenktemp": lower}) + + used_temp_sensor = self._as_dict(skill.get("usedTempSensor")) + if used_temp_sensor is not None: + used_id = used_temp_sensor.get("id") + if used_id == self.id: + data.update({"ExtTempsensorID": "tochoose", "tempsensor": "own"}) + else: + data.update({"ExtTempsensorID": used_id, "tempsensor": "extern"}) + + time_control = self._as_dict(skill.get("timeControl")) + if time_control is not None: + data.update(self._time_control_to_web_data(time_control)) + + drop_detection = self._as_dict(skill.get("temperatureDropDetection")) + if drop_detection is not None: + data.update(self._drop_detection_to_web_data(drop_detection)) + + return data + + def _drop_detection_to_web_data(self, drop: Dict[str, Any]) -> Dict[str, Any]: + sensitivity = drop.get("sensitivity") + do_not_heat = drop.get("doNotHeatOffsetInMinutes") + data: Dict[str, Any] = {} + if sensitivity is not None: + data["WindowOpenTrigger"] = sensitivity + 3 + if do_not_heat is not None: + data["WindowOpenTimer"] = do_not_heat + return data + + def _time_control_to_web_data(self, time_control: Dict[str, Any]) -> Dict[str, Any]: + data: Dict[str, Any] = {} + for schedule in self._as_dict_list(time_control.get("timeSchedules")): + data.update(self._schedule_to_web_data(schedule)) + return data + + def _schedule_to_web_data(self, schedule: Dict[str, Any]) -> Dict[str, Any]: + data: Dict[str, Any] = {} + kind = schedule.get("kind") + name = schedule.get("name") + actions = self._as_dict_list(schedule.get("actions")) + + if kind == "REPETITIVE" and name == "HOLIDAYS": + enabled_count = 0 + for num, holiday in enumerate(actions, start=1): + time_setting = self._as_dict(holiday.get("timeSetting")) + if time_setting is None: + continue + start_date = self._as_str(time_setting.get("startDate"), "0000-00-00") + end_date = self._as_str(time_setting.get("endDate"), "0000-00-00") + start_time = self._as_str(time_setting.get("startTime"), "00:00:00") + end_time = self._as_str(time_setting.get("endTime"), "00:00:00") + _, start_month, start_day = start_date.split("-") + _, end_month, end_day = end_date.split("-") + data.update( + { + f"Holiday{num}StartDay": start_day, + f"Holiday{num}StartMonth": start_month, + f"Holiday{num}StartHour": start_time.split(":")[0], + f"Holiday{num}EndDay": end_day, + f"Holiday{num}EndMonth": end_month, + f"Holiday{num}EndHour": end_time.split(":")[0], + f"Holiday{num}Enabled": 1 if holiday.get("isEnabled") else 0, + f"Holiday{num}ID": num, + } + ) + if holiday.get("isEnabled"): + enabled_count += 1 + data["HolidayEnabledCount"] = enabled_count + elif kind == "REPETITIVE" and name == "SUMMER_TIME" and actions: + action = actions[0] + time_setting = self._as_dict(action.get("timeSetting")) + if time_setting is not None: + start_date = self._as_str(time_setting.get("startDate"), "0000-00-00") + end_date = self._as_str(time_setting.get("endDate"), "0000-00-00") + _, start_month, start_day = start_date.split("-") + _, end_month, end_day = end_date.split("-") + data = { + "SummerStartDay": start_day, + "SummerStartMonth": start_month, + "SummerEndDay": end_day, + "SummerEndMonth": end_month, + "SummerEnabled": 1 if schedule.get("isEnabled") else 0, + } + elif kind == "WEEKLY_TIMETABLE" and name == "TEMPERATURE": + timer_items: Dict[str, List[int]] = {} + for action in actions: + if not action.get("isEnabled"): + continue + time_setting = self._as_dict(action.get("timeSetting")) + if time_setting is None: + continue + start_time = self._as_str(time_setting.get("startTime"), "") + if not start_time: + continue + time = "".join(start_time.split(":")[:-1]) + if time not in timer_items: + timer_items[time] = [0, 0] + description = self._as_dict(action.get("description")) + preset_temperature = None + if description is not None: + preset_temperature = self._as_dict( + description.get("presetTemperature") + ) + heat = 0 + if ( + preset_temperature is not None + and preset_temperature.get("name") == "UPPER_TEMPERATURE" + ): + heat = 1 + day_of_week = self._as_int(time_setting.get("dayOfWeek"), 0) + timer_items[time][heat] |= day_of_week + num = 0 + for key, value in timer_items.items(): + if value[0] != 0: + data.update({f"timer_item_{num}": f"{key};0;{value[0]}"}) + num += 1 + if value[1] != 0: + data.update({f"timer_item_{num}": f"{key};1;{value[1]}"}) + num += 1 + + return data + + @staticmethod + def _as_dict_list(value: Any) -> List[Dict[str, Any]]: + result: List[Dict[str, Any]] = [] + if not isinstance(value, list): + return result + items = cast(List[Any], value) + for item in items: + if isinstance(item, dict): + result.append(cast(Dict[str, Any], item)) + return result + + @staticmethod + def _as_dict(value: Any) -> Optional[Dict[str, Any]]: + if isinstance(value, dict): + return cast(Dict[str, Any], value) + return None + + @staticmethod + def _as_str(value: Any, default: str) -> str: + if isinstance(value, str): + return value + return default + + @staticmethod + def _as_int(value: Any, default: int) -> int: + if isinstance(value, int): + return value + return default + + @staticmethod + def _as_float(value: Any, default: float) -> float: + if isinstance(value, (int, float)): + return float(value) + return default + + +__all__ = ["Device", "TemperatureSensor"] diff --git a/fritz_temp_sync/errors.py b/fritz_temp_sync/errors.py new file mode 100644 index 0000000..b6be1d6 --- /dev/null +++ b/fritz_temp_sync/errors.py @@ -0,0 +1,10 @@ +class ConfigError(ValueError): + pass + + +class HomeAssistantError(RuntimeError): + pass + + +class FritzBoxError(RuntimeError): + pass diff --git a/fritz_temp_sync/fritzbox.py b/fritz_temp_sync/fritzbox.py index 36e1191..7c6dd2b 100755 --- a/fritz_temp_sync/fritzbox.py +++ b/fritz_temp_sync/fritzbox.py @@ -8,81 +8,153 @@ import re import xml.etree.ElementTree as ET from asyncio import Task from datetime import datetime, timedelta -from typing import Optional, Dict, List - -import requests +from typing import Any, Dict, List, Optional +from urllib.parse import urlparse +import httpx # type: ignore[import-not-found] from device import Device +from errors import FritzBoxError + +logger = logging.getLogger(__name__) + class FritzBox: - def __init__(self, url: str, password: str, update_timeout: int, user: str = None, dry_run: bool = False) -> None: + def __init__( + self, + url: str, + password: str, + update_timeout: int, + user: Optional[str] = None, + dry_run: bool = False, + force_ipv4: bool = False, + request_timeout: int = 10, + request_retries: int = 2, + ) -> None: self._endpoints = { "login": "login_sid.lua?version=2", "logout": "index.lua", - "data": "data.lua" + "data": "data.lua", } self.url: str = url self.dry_run: bool = dry_run self.user: Optional[str] = user - self.session: requests.Session = requests.Session() + if force_ipv4: + transport = httpx.AsyncHTTPTransport(local_address="0.0.0.0") + else: + transport = None + self.session: httpx.AsyncClient = httpx.AsyncClient(transport=transport) self.password: str = password self.sid: Optional[str] = None self.update_timeout: int = update_timeout self.update_time: Dict[str, datetime] = {} - self.hold_connection: Optional[Task] = None + self.hold_connection: Optional[Task[None]] = None + self.request_timeout: int = request_timeout + self.request_retries: int = request_retries async def hold_connection_alive(self) -> None: while True: # Session automatically destroyed after 20m of inactivity # according to the manual await asyncio.sleep(19 * 60) - self.check_session() + await self.check_session() + + async def _request( + self, + method: str, + url: str, + data: Optional[Dict[str, Any]] = None, + follow_redirects: bool = True, + ) -> Optional[Any]: + for attempt in range(self.request_retries + 1): + try: + return await self.session.request( + method, + url, + data=data, + timeout=self.request_timeout, + follow_redirects=follow_redirects, + ) + except Exception as exc: + if attempt >= self.request_retries: + logger.error("Request failed (%s %s): %s", method, url, exc) + return None + backoff = 2**attempt + logger.warning( + "Request failed (%s %s), retry in %ss: %s", + method, + url, + backoff, + exc, + ) + await asyncio.sleep(backoff) + return None def _calc_challenge_v2(self, challenge: str) -> str: - logging.debug(f"Calculate v2 challenge: {challenge}") + logger.debug("Calculate v2 challenge") chall_regex = re.compile( - r"2\$(?P[0-9a-zA-Z]+)\$(?P[0-9a-zA-Z]+)\$(?P[0-9a-zA-Z]+)\$(?P[0-9a-zA-Z]+)") + r"2\$(?P[0-9a-zA-Z]+)\$(?P[0-9a-zA-Z]+)\$(?P[0-9a-zA-Z]+)\$(?P[0-9a-zA-Z]+)" + ) - chall_parts = chall_regex.match(challenge).groupdict() + chall_match = chall_regex.match(challenge) + if not chall_match: + logger.error("Invalid Fritz!Box challenge format") + return "" + chall_parts = chall_match.groupdict() salt1: bytes = bytes.fromhex(chall_parts["salt1"]) iter1: int = int(chall_parts["iter1"]) salt2: bytes = bytes.fromhex(chall_parts["salt2"]) iter2: int = int(chall_parts["iter2"]) - hash1 = hashlib.pbkdf2_hmac('sha256', self.password.encode(), salt1, iter1) - response = salt2.hex() + "$" + hashlib.pbkdf2_hmac('sha256', hash1, salt2, iter2).hex() + hash1 = hashlib.pbkdf2_hmac("sha256", self.password.encode(), salt1, iter1) + response = ( + salt2.hex() + "$" + hashlib.pbkdf2_hmac("sha256", hash1, salt2, iter2).hex() + ) return response def _calc_challenge_v1(self, challenge: str) -> str: - """ Calculate the response for a challenge using legacy MD5 """ - logging.debug(f"Calculate v1 challenge: {challenge}") + """Calculate the response for a challenge using legacy MD5""" + logger.debug("Calculate v1 challenge") response = f"{challenge}-{self.password}" - response = response.encode("utf_16_le") - response = challenge + "-" + hashlib.md5(response).hexdigest() - return response + response_bytes = response.encode("utf_16_le") + return challenge + "-" + hashlib.md5(response_bytes).hexdigest() - def check_session(self) -> None: - data = { + async def check_session(self) -> None: + data: Dict[str, Any] = { "xhr": 1, "sid": self.sid, "lang": "de", "page": "overview", "xhrId": "first", - "noMenuRef": 1 + "noMenuRef": 1, } - r = self.session.post(f"{self.url}/{self._endpoints['data']}", data=data) + r = await self._request("POST", f"{self.url}/{self._endpoints['data']}", data) + if r is None: + return if len(r.history) > 0: - if not self.login(): - logging.error("Failed to login to Fritz!Box") + if not await self.login(): + logger.error("Failed to login to Fritz!Box") else: - logging.debug("Already logged in") + logger.debug("Already logged in") - def login(self) -> bool: - logging.info(f"Login user {self.user} to Fritz!Box") + async def login(self) -> bool: + logger.info("Login user %s to Fritz!Box", self.user) challenge = None - r = self.session.get(f"{self.url}/{self._endpoints['login']}") + login_url = f"{self.url}/{self._endpoints['login']}" + r = await self._request("GET", login_url, follow_redirects=False) + if r is None: + return False + if getattr(r, "status_code", 200) in {301, 302, 303, 307, 308}: + location = getattr(r, "headers", {}).get("Location") + if location: + retry_url, new_base = self._normalize_login_redirect(location) + logger.debug("Login redirect to %s, retry %s", location, retry_url) + r = await self._request("GET", retry_url, follow_redirects=False) + if r is None: + return False + if new_base: + self.url = new_base xml = ET.fromstring(r.text) for elem in xml: if elem.tag == "SID": @@ -91,59 +163,70 @@ class FritzBox: challenge = elem.text elif self.user is None and elem.tag == "Users": for user_elem in elem: - if "fritz" in user_elem.text: + if user_elem.text and "fritz" in user_elem.text: self.user = user_elem.text - assert challenge is not None and self.user is not None + if challenge is None or self.user is None: + raise FritzBoxError("Missing Fritz!Box login challenge or user") if challenge.startswith("2$"): response = self._calc_challenge_v2(challenge) else: response = self._calc_challenge_v1(challenge) + if not response: + raise FritzBoxError("Failed to compute Fritz!Box login response") - data = { - "username": self.user, - "response": response - } - - r = self.session.post(f"{self.url}/{self._endpoints['login']}", data=data) - logging.debug(r.text) + data = {"username": self.user, "response": response} + r = await self._request("POST", f"{self.url}/{self._endpoints['login']}", data) + if r is None: + return False + logger.debug("Login response received") xml = ET.fromstring(r.text) for elem in xml: if elem.tag == "SID": self.sid = elem.text - logging.info(f"Authenticated Fritz!Box: {len(self.sid) != self.sid.count('0')}") - if len(self.sid) != self.sid.count("0"): + sid = self.sid or "" + logger.info("Authenticated Fritz!Box: %s", len(sid) != sid.count("0")) + if len(sid) != sid.count("0"): self.hold_connection = asyncio.create_task(self.hold_connection_alive()) - return len(self.sid) != self.sid.count("0") + return len(sid) != sid.count("0") - def logout(self) -> bool: - logging.info("logout") - data = { + async def logout(self) -> bool: + logger.info("Logout from Fritz!Box") + data: Dict[str, Any] = { "xhr": 1, "sid": self.sid, "logout": 1, - "no_sidrenew": ""} - r = self.session.post(f"{self.url}/{self._endpoints['logout']}", data=data) + "no_sidrenew": "", + } + r = await self._request("POST", f"{self.url}/{self._endpoints['logout']}", data) + if r is None: + return False if self.hold_connection is not None: self.hold_connection.cancel() return r.status_code == 200 - def list_devices(self) -> Optional[List[Device]]: - data = { + async def list_devices(self) -> Optional[List[Device]]: + data: Dict[str, Any] = { "xhr": 1, "sid": self.sid, "lang": "de", "page": "sh_dev", - "xhrId": "all" + "xhrId": "all", } - r = self.session.post(f"{self.url}/{self._endpoints['data']}", data=data) - logging.debug(r.text[:100]) + r = await self._request("POST", f"{self.url}/{self._endpoints['data']}", data) + if r is None: + return None + logger.debug("Devices response received") if len(r.history) > 0: - if self.login(): - r = self.session.post(f"{self.url}/{self._endpoints['data']}", data=data) + if await self.login(): + r = await self._request( + "POST", f"{self.url}/{self._endpoints['data']}", data + ) + if r is None: + return None else: return None devices: List[Device] = [] @@ -152,36 +235,40 @@ class FritzBox: return devices - def get_device_data(self, idx: int = None, name: str = None) -> Optional[Device]: + async def get_device_data( + self, idx: Optional[int] = None, name: Optional[str] = None + ) -> Optional[Device]: if idx is None and name is None: - logging.debug("No id or name given") + logger.debug("No id or name given") return None - devices = self.list_devices() - device = None - for device in devices: - if device.id == idx or device.display_name == name: + devices = await self.list_devices() + if devices is None: + return None + found_device: Optional[Device] = None + for candidate in devices: + if candidate.id == idx or candidate.display_name == name: + found_device = candidate break - device = None - if device is None: - logging.debug(f"Device {idx} {name} not found") + if found_device is None: + logger.debug("Device not found for id=%s name=%s", idx, name) return None - return device + return found_device - def set_offset(self, device: Device) -> None: + async def set_offset(self, device: Device) -> None: if self.dry_run: - logging.warning("No updates in dry-run-mode") + logger.warning("Dry-run enabled, skipping update") return - data = { + data: Dict[str, Any] = { "xhr": 1, "sid": self.sid, "lang": "de", "device": device.id, - "page": "home_auto_hkr_edit" + "page": "home_auto_hkr_edit", } - self.session.post(f"{self.url}/{self._endpoints['data']}", data=data) + await self._request("POST", f"{self.url}/{self._endpoints['data']}", data) data = { "xhr": 1, @@ -190,24 +277,63 @@ class FritzBox: "view": "", "back_to_page": "sh_dev", "apply": "", - "oldpage": "/net/home_auto_hkr_edit.lua" + "oldpage": "/net/home_auto_hkr_edit.lua", } data.update(device.to_web_data()) - self.session.post(f"{self.url}/{self._endpoints['data']}", data=data) + await self._request("POST", f"{self.url}/{self._endpoints['data']}", data) - def correct_offset(self, device_name: str, real_temp: float): + async def correct_offset(self, device_name: str, real_temp: float) -> None: elapsed = None if device_name in self.update_time.keys(): elapsed = datetime.now() - self.update_time[device_name] - logging.info(f"Last update for {device_name} {elapsed} ago") + logger.debug("Last update for %s %s ago", device_name, elapsed) delta = timedelta(minutes=self.update_timeout) - if device_name not in self.update_time.keys() or elapsed > delta: - device: Optional[Device] = self.get_device_data(name=device_name) + if ( + device_name not in self.update_time.keys() + or elapsed is None + or elapsed > delta + ): + device: Optional[Device] = await self.get_device_data(name=device_name) if device is None: return - new_offset = device.get_offset() + real_temp - device.get_temperature() - logging.info(f"Update offset from {device.get_offset()} to {new_offset}") + current_offset = device.get_offset() + current_temp = device.get_temperature() + if current_offset is None or current_temp is None: + logger.warning( + "Skipping offset update for %s: missing temperature data", + device_name, + ) + return + new_offset = current_offset + real_temp - current_temp + logger.info( + "Update offset for %s from %.2f to %.2f", + device_name, + current_offset, + new_offset, + ) device.set_offset(new_offset) - self.set_offset(device) - self.update_time[device.display_name] = datetime.now() + await self.set_offset(device) + update_key = device.display_name or device_name + self.update_time[update_key] = datetime.now() + else: + logger.debug( + "Skip offset update for %s: last update %s ago (min %s)", + device_name, + elapsed, + delta, + ) + + async def close(self) -> None: + await self.session.aclose() + + def _normalize_login_redirect(self, location: str) -> tuple[str, Optional[str]]: + parsed_location = urlparse(location) + if parsed_location.scheme and parsed_location.netloc: + base = f"{parsed_location.scheme}://{parsed_location.netloc}" + else: + parsed_base = urlparse(self.url) + base = f"{parsed_base.scheme}://{parsed_base.netloc}" + if location.endswith(self._endpoints["login"]): + return location, base + return f"{base}/{self._endpoints['login']}", base diff --git a/fritz_temp_sync/ha_types.py b/fritz_temp_sync/ha_types.py new file mode 100644 index 0000000..89dff23 --- /dev/null +++ b/fritz_temp_sync/ha_types.py @@ -0,0 +1,23 @@ +from __future__ import annotations + +from typing import TypedDict + + +class HAAttributes(TypedDict, total=False): + current_temperature: float + friendly_name: str + + +class HAState(TypedDict, total=False): + entity_id: str + state: str + attributes: HAAttributes + + +class HAEventData(TypedDict): + entity_id: str + new_state: HAState + + +class HAEvent(TypedDict): + data: HAEventData diff --git a/fritz_temp_sync/homeassistant.py b/fritz_temp_sync/homeassistant.py index 19f5f72..a35a007 100755 --- a/fritz_temp_sync/homeassistant.py +++ b/fritz_temp_sync/homeassistant.py @@ -3,34 +3,39 @@ from __future__ import annotations import asyncio import json import logging -from asyncio import Queue, Task, Event, Lock -from typing import Callable, Dict, Optional -import websockets +from asyncio import Event, Lock, Queue, Task +from typing import Any, Dict, Optional, cast + +from errors import HomeAssistantError +from ha_types import HAEvent +from websockets.client import connect as ws_connect from websockets.exceptions import InvalidStatusCode +logger = logging.getLogger(__name__) + class HomeAssistantAPI: def __init__(self, token: str, url: str) -> None: self.token = token self.msg_id = 1 self.msg_id_lock = Lock() - self.ws: websockets.WebSocketClientProtocol = None + self.ws: Any = None self.url = url - self.receiver: Optional[Task] = None - self.sender: Optional[Task] = None - self.sending_queue: Queue = Queue() + self.receiver: Optional[Task[Any]] = None + self.sender: Optional[Task[Any]] = None + self.sending_queue: Queue[Dict[str, Any]] = Queue() self.authenticated: Event = Event() - self.events: Dict[int, Queue] = {} - self.responses: Dict[int, Dict] = {} + self.events: Dict[int, Queue[HAEvent]] = {} + self.responses: Dict[int, Dict[str, Any]] = {} self.response_events: Dict[int, Event] = {} self.response_lock: Lock = Lock() async def connect(self): retries = 5 - logging.info("Connect to home assistant...") + logger.info("Connect to Home Assistant") while True: try: - self.ws = await websockets.connect(self.url) + self.ws = await ws_connect(self.url) self.sender = asyncio.create_task(self.sending()) await self.auth() self.receiver = asyncio.create_task(self.receiving()) @@ -39,23 +44,30 @@ class HomeAssistantAPI: if retries > 0: retries -= 1 await asyncio.sleep(30) - logging.info(f"Retry home assistant connection... ({retries})") + logger.warning( + "Retry Home Assistant connection (%s retries left)", retries + ) continue - else: - logging.error("Invalid status code while connecting to Home Assistant") - await self.exit_loop() - return False + logger.error("Invalid status code while connecting to Home Assistant") + await self.exit_loop() + raise HomeAssistantError( + "Invalid status code while connecting to Home Assistant" + ) async def wait_for_close(self): await self.ws.wait_closed() async def receiving(self): - logging.debug("Start receiving") + logger.debug("Start receiving") async for message in self.ws: - msg: Dict = json.loads(message) + msg: Dict[str, Any] = json.loads(cast(str, message)) if msg["type"] == "event": if msg["id"] not in self.events.keys(): - logging.error(f"Received event for not subscribted id: {msg['id']} {msg['event_type']}") + logger.warning( + "Received event for not subscribed id: %s %s", + msg["id"], + msg.get("event_type"), + ) continue await self.events[msg["id"]].put(msg["event"]) else: @@ -64,7 +76,7 @@ class HomeAssistantAPI: if msg["id"] in self.response_events.keys(): self.response_events[msg["id"]].set() - async def wait_for(self, idx): + async def wait_for(self, idx: int): async with self.response_lock: if idx in self.responses.keys(): msg = self.responses[idx] @@ -76,7 +88,7 @@ class HomeAssistantAPI: async with self.response_lock: del self.response_events[idx] if idx not in self.responses.keys(): - logging.error("Response ID not found") + logger.warning("Response ID not found") return None msg = self.responses[idx] del self.responses[idx] @@ -88,26 +100,31 @@ class HomeAssistantAPI: if self.receiver is not None: self.receiver.cancel() + async def close(self) -> None: + await self.exit_loop() + if self.ws is not None: + try: + await self.ws.close() + except Exception: + pass + async def auth(self): msg = json.loads(await self.ws.recv()) if msg["type"] != "auth_required": - logging.error("Authentication error: Not required") await self.exit_loop() - response = { - "type": "auth", - "access_token": self.token - } + raise HomeAssistantError("Authentication error: Not required") + response: Dict[str, Any] = {"type": "auth", "access_token": self.token} await self.sending_queue.put(response) msg = json.loads(await self.ws.recv()) if msg["type"] == "auth_invalid": - logging.info("Auth failed") await self.exit_loop() + raise HomeAssistantError("Auth failed") elif msg["type"] == "auth_ok": - logging.debug("Authenticated") + logger.info("Authenticated") self.authenticated.set() else: - logging.error(f"Unknown answer for auth: {msg}") await self.exit_loop() + raise HomeAssistantError(f"Unknown answer for auth: {msg}") async def sending(self): while msg := await self.sending_queue.get(): @@ -116,34 +133,33 @@ class HomeAssistantAPI: async def subscribe_event(self, event_type: str): await self.authenticated.wait() - logging.info(f"Subscribe to {event_type}") + logger.info("Subscribe to %s", event_type) async with self.msg_id_lock: msg_id = self.msg_id - response = { + response: Dict[str, Any] = { "id": msg_id, "type": "subscribe_events", - "event_type": event_type + "event_type": event_type, } self.events[msg_id] = Queue() self.msg_id += 1 await self.sending_queue.put(response) return msg_id - async def get_states(self): + async def get_states(self) -> list[Dict[str, Any]]: await self.authenticated.wait() async with self.msg_id_lock: - message = { - "id": self.msg_id, - "type": "get_states" - } + message: Dict[str, Any] = {"id": self.msg_id, "type": "get_states"} self.msg_id += 1 await self.sending_queue.put(message) - response = await self.wait_for(message["id"]) + response = await self.wait_for(cast(int, message["id"])) # ToDo: Error handling - return response["result"] + if response is None: + return [] + return cast(list[Dict[str, Any]], response.get("result", [])) - async def get_device_state(self, entity_id: str): + async def get_device_state(self, entity_id: str) -> Optional[Dict[str, Any]]: device_states = await self.get_states() for device_state in device_states: if device_state["entity_id"] == entity_id: diff --git a/fritz_temp_sync/logging_config.py b/fritz_temp_sync/logging_config.py new file mode 100644 index 0000000..975b7f3 --- /dev/null +++ b/fritz_temp_sync/logging_config.py @@ -0,0 +1,16 @@ +from __future__ import annotations + +import logging +from typing import Union + + +def configure_logging(level: Union[str, int]) -> None: + numeric_level = level + if isinstance(level, str): + numeric_level = logging.getLevelName(level.upper()) + if isinstance(numeric_level, str): + numeric_level = logging.INFO + logging.basicConfig( + level=numeric_level, + format="[%(asctime)s] [%(levelname)s] [%(name)s] %(message)s", + ) diff --git a/fritz_temp_sync/sync_controller.py b/fritz_temp_sync/sync_controller.py new file mode 100644 index 0000000..04a744f --- /dev/null +++ b/fritz_temp_sync/sync_controller.py @@ -0,0 +1,215 @@ +from __future__ import annotations + +import asyncio +import logging +from typing import Any, Optional, cast + +from config import Mapping +from errors import FritzBoxError, HomeAssistantError +from fritzbox import FritzBox +from ha_types import HAEvent +from homeassistant import HomeAssistantAPI + +logger = logging.getLogger(__name__) + + +class SyncController: + def __init__( + self, ha: HomeAssistantAPI, fb: FritzBox, offset_threshold: float + ) -> None: + self.ha = ha + self.fb = fb + self.offset_threshold = offset_threshold + self.sensor_mappings: dict[str, list[str]] = {} + self.thermostate_mappings: dict[str, str] = {} + self._offset_queue: asyncio.Queue[Optional[tuple[str, float]]] = asyncio.Queue() + self._worker_task: Optional[asyncio.Task[None]] = None + self._event_task: Optional[asyncio.Task[None]] = None + + def load_mappings(self, mappings: list[Mapping]) -> None: + for mapping in mappings: + sensor = mapping.sensor + thermostate = mapping.thermostate + if sensor not in self.sensor_mappings.keys(): + self.sensor_mappings[sensor] = [] + self.sensor_mappings[sensor].append(thermostate) + self.thermostate_mappings[thermostate] = sensor + logger.debug( + "Loaded %s sensor mappings and %s thermostat mappings", + len(self.sensor_mappings), + len(self.thermostate_mappings), + ) + + async def handle_event(self, idx: int): + logger.debug("Wait for events for subscription %s", idx) + + event_queue = cast(Any, self.ha).events[idx] + while event := await event_queue.get(): + event = cast(HAEvent, event) + try: + entity_id = event["data"]["entity_id"] + if not entity_id: + continue + if ( + entity_id in self.sensor_mappings.keys() + or entity_id in self.thermostate_mappings.keys() + ): + state = cast(Any, await self.ha.get_device_state(entity_id)) + if not isinstance(state, dict): + continue + state = cast(dict[str, Any], state) + new_state = cast(dict[str, Any], event["data"]["new_state"]) + logger.debug( + "state_changed for %s (is_thermostat=%s is_sensor=%s state=%s)", + entity_id, + entity_id in self.thermostate_mappings.keys(), + entity_id in self.sensor_mappings.keys(), + state.get("state"), + ) + if ( + entity_id in self.thermostate_mappings.keys() + and state.get("state") != "unavailable" + ): + therm_temp = cast( + float, new_state["attributes"]["current_temperature"] + ) + therm_name = cast(str, new_state["attributes"]["friendly_name"]) + sensor = self.thermostate_mappings[entity_id] + sensor_state = cast(Any, await self.ha.get_device_state(sensor)) + if not isinstance(sensor_state, dict): + continue + sensor_state = cast(dict[str, Any], sensor_state) + sensor_temp = ( + round(float(cast(str, sensor_state["state"])) * 2) / 2 + ) + logger.debug( + "Temps for %s: thermostat=%s sensor=%s", + entity_id, + therm_temp, + sensor_temp, + ) + diff = abs(therm_temp - sensor_temp) + if diff < self.offset_threshold: + logger.debug( + "Offset diff %.2f below threshold %.2f for %s", + diff, + self.offset_threshold, + therm_name, + ) + continue + if therm_temp != sensor_temp: + logger.info( + "Offset correction: %s thermostat=%s sensor=%s (from %s)", + therm_name, + therm_temp, + sensor_temp, + sensor, + ) + await self._enqueue_offset(therm_name, sensor_temp) + + elif entity_id in self.sensor_mappings.keys(): + logger.debug("sensor update for %s", entity_id) + sensor_temp = ( + round(float(cast(str, new_state["state"])) * 2) / 2 + ) + logger.debug( + "thermostats for %s: %s", + entity_id, + self.sensor_mappings[entity_id], + ) + for thermostate in self.sensor_mappings[entity_id]: + logger.debug("check thermostat %s", thermostate) + therm_state = cast( + Any, await self.ha.get_device_state(thermostate) + ) + if not isinstance(therm_state, dict): + continue + therm_state = cast(dict[str, Any], therm_state) + if therm_state.get("state") == "unavailable": + logger.debug("thermostat %s unavailable", thermostate) + continue + therm_temp = float( + cast( + float, + therm_state["attributes"]["current_temperature"], + ) + ) + therm_name = cast( + str, therm_state["attributes"]["friendly_name"] + ) + logger.debug( + "Temps for %s: thermostat=%s sensor=%s", + thermostate, + therm_temp, + sensor_temp, + ) + diff = abs(therm_temp - sensor_temp) + if diff < self.offset_threshold: + logger.debug( + "Offset diff %.2f below threshold %.2f for %s", + diff, + self.offset_threshold, + therm_name, + ) + continue + if therm_temp != sensor_temp: + logger.info( + "Offset correction: %s thermostat=%s sensor=%s (from %s)", + therm_name, + therm_temp, + sensor_temp, + entity_id, + ) + await self._enqueue_offset(therm_name, sensor_temp) + except KeyError: + continue + + async def _enqueue_offset(self, device_name: str, sensor_temp: float) -> None: + await self._offset_queue.put((device_name, sensor_temp)) + + async def _offset_worker(self) -> None: + while True: + item = await self._offset_queue.get() + if item is None: + break + device_name, sensor_temp = item + try: + await self.fb.correct_offset(device_name, sensor_temp) + except FritzBoxError as exc: + logger.error("Offset update failed for %s: %s", device_name, exc) + + async def run(self) -> None: + try: + await self.ha.connect() + except HomeAssistantError as exc: + logger.error("Home Assistant error: %s", exc) + return + logger.debug("Subscribing to state_changed") + state_changed_id = await self.ha.subscribe_event("state_changed") + logger.debug(state_changed_id) + self._event_task = asyncio.create_task(self.handle_event(state_changed_id)) + try: + try: + if not await self.fb.login(): + raise FritzBoxError("Failed to login to Fritz!Box") + except FritzBoxError as exc: + logger.error("Fritz!Box error: %s", exc) + return + logger.info("Health check OK") + self._worker_task = asyncio.create_task(self._offset_worker()) + try: + await self.ha.wait_for_close() + except asyncio.CancelledError: + pass + finally: + await self.shutdown() + + async def shutdown(self) -> None: + if self._event_task is not None: + self._event_task.cancel() + await self._offset_queue.put(None) + if self._worker_task is not None: + await self._worker_task + await self.ha.close() + await self.fb.close() + logger.info("Shutdown complete") diff --git a/fritz_temp_sync/sync_ha_fb.py b/fritz_temp_sync/sync_ha_fb.py index 84c7f50..342af65 100755 --- a/fritz_temp_sync/sync_ha_fb.py +++ b/fritz_temp_sync/sync_ha_fb.py @@ -1,125 +1,69 @@ #!/usr/bin/env python3 import asyncio -import json import logging import os import sys +from config import load_config +from errors import ConfigError from fritzbox import FritzBox from homeassistant import HomeAssistantAPI +from logging_config import configure_logging +from sync_controller import SyncController -sensor_mappings = {} -thermostate_mappings = {} - - -async def handle_event(idx: int): - global ha, fb - logging.debug(f"Wait for events for {idx}") - - while event := await ha.events[idx].get(): - try: - entity_id = event["data"]["entity_id"] - if ( - entity_id in sensor_mappings.keys() - or entity_id in thermostate_mappings.keys() - ): - state = await ha.get_device_state(entity_id) - new_state = event["data"]["new_state"] - logging.info( - f"received changed state from {entity_id} {entity_id in thermostate_mappings.keys()} {state['state']} {entity_id in sensor_mappings.keys()}" - ) - if ( - entity_id in thermostate_mappings.keys() - and state["state"] != "unavailable" - ): - therm_temp = new_state["attributes"]["current_temperature"] - therm_name = new_state["attributes"]["friendly_name"] - sensor = thermostate_mappings[entity_id] - sensor_state = await ha.get_device_state(sensor) - sensor_temp = round(float(sensor_state["state"]) * 2) / 2 - logging.info(f"temps: {therm_temp} {sensor_temp}") - if therm_temp != sensor_temp: - logging.info( - f"{therm_name}: {therm_temp}\n{sensor}: {sensor_state['state']} ({sensor_temp})" - ) - fb.correct_offset(therm_name, sensor_temp) - - elif entity_id in sensor_mappings.keys(): - logging.info(f"here {sensor_mappings} {entity_id}") - logging.info(f"{new_state}") - sensor_temp = round(float(new_state["state"]) * 2) / 2 - logging.info(f"entry: {sensor_mappings[entity_id]}") - for thermostate in sensor_mappings[entity_id]: - logging.info(thermostate) - therm_state = await ha.get_device_state(thermostate) - logging.info(f"{thermostate} {therm_state}") - if therm_state["state"] == "unavailable": - continue - therm_temp = float( - therm_state["attributes"]["current_temperature"] - ) - therm_name = therm_state["attributes"]["friendly_name"] - logging.info(f"Temps: {therm_temp} {sensor_temp}") - if therm_temp != sensor_temp: - logging.info( - f"{therm_name}: {therm_temp}\n{entity_id}: {new_state['state']} ({sensor_temp})" - ) - fb.correct_offset(therm_name, sensor_temp) - except KeyError: - pass - - -async def init(ha: HomeAssistantAPI, fb: FritzBox): - if not await ha.connect(): - return - logging.debug("Subscribe") - state_changed_id = await ha.subscribe_event("state_changed") - logging.debug(state_changed_id) - asyncio.create_task(handle_event(state_changed_id)) - fb.login() - await ha.wait_for_close() - logging.info("Websocket closed, shutting down..") +logger = logging.getLogger(__name__) async def main(): config_path = sys.argv[1] - config = json.load(open(config_path)) - level = logging.INFO - if "log_level" in config: - print(f"Setting log_level {config['log_level']}") - if config["log_level"] == "DEBUG": - level = logging.DEBUG - logging.basicConfig( - level=level, format="[%(asctime)s] [%(levelname)s] %(message)s" - ) - logging.debug(config) + try: + config = load_config(config_path) + except ConfigError as exc: + logging.basicConfig(level=logging.ERROR) + logging.error("Invalid config: %s", exc) + return + configure_logging(config.log_level) + logger.debug("Loaded config") + if not config.log_ws_messages: + logging.getLogger("websockets").setLevel(logging.WARNING) + logging.getLogger("websockets.client").setLevel(logging.WARNING) + logging.getLogger("websockets.server").setLevel(logging.WARNING) + if not config.log_http_requests: + logging.getLogger("httpx").setLevel(logging.WARNING) + logging.getLogger("httpcore").setLevel(logging.WARNING) + + fritzbox = config.fritzbox - global fb fb = FritzBox( - url=config["fritzbox"]["url"], - user=config["fritzbox"]["username"], - password=config["fritzbox"]["password"], - update_timeout=config["update_timeout"], - dry_run=False, + url=fritzbox.url, + user=fritzbox.username, + password=fritzbox.password, + update_timeout=config.update_timeout, + dry_run=config.dry_run, + force_ipv4=config.force_ipv4, + request_timeout=config.request_timeout, + request_retries=config.request_retries, ) supervisor_url = "ws://supervisor/core/websocket" if "SUPERVISOR_URL" in os.environ: supervisor_url = os.environ["SUPERVISOR_URL"] - supervisor_token = os.environ["SUPERVISOR_TOKEN"] - global ha + supervisor_token = os.environ.get("SUPERVISOR_TOKEN") + if not supervisor_token: + logger.error("Missing SUPERVISOR_TOKEN; cannot connect to Home Assistant") + return ha = HomeAssistantAPI(supervisor_token, supervisor_url) - - for mapping in config["mappings"]: - if mapping["sensor"] not in sensor_mappings.keys(): - sensor_mappings[mapping["sensor"]] = [] - sensor_mappings[mapping["sensor"]].append(mapping["thermostate"]) - thermostate_mappings[mapping["thermostate"]] = mapping["sensor"] - logging.debug(f"Mappings: {sensor_mappings} {thermostate_mappings}") + controller = SyncController(ha, fb, config.offset_threshold) + controller.load_mappings(config.mappings) try: - await init(ha, fb) - except KeyboardInterrupt: - pass + logger.info("Starting sync controller") + await controller.run() + except (KeyboardInterrupt, asyncio.CancelledError): + logger.info("Shutdown requested") + await controller.shutdown() -asyncio.run(main()) +try: + asyncio.run(main()) +except KeyboardInterrupt: + pass diff --git a/fritz_temp_sync/tests/test_config.py b/fritz_temp_sync/tests/test_config.py new file mode 100644 index 0000000..1c73ec9 --- /dev/null +++ b/fritz_temp_sync/tests/test_config.py @@ -0,0 +1,38 @@ +# pyright: reportMissingImports=false +from __future__ import annotations + +import json +import sys +from pathlib import Path + +import pytest # type: ignore[import-not-found] + +ROOT = Path(__file__).resolve().parents[1] +sys.path.append(str(ROOT)) + +from config import _parse_mappings, load_config # type: ignore[import-not-found] +from errors import ConfigError # type: ignore[import-not-found] + + +def test_parse_mappings_rejects_duplicate_thermostat(): + with pytest.raises(ConfigError): + _parse_mappings( + [ + {"sensor": "sensor.a", "thermostate": "climate.x"}, + {"sensor": "sensor.b", "thermostate": "climate.x"}, + ] + ) + + +def test_load_config_parses_dry_run(tmp_path: Path): + payload = { + "fritzbox": {"url": "http://fritz.box", "password": "secret"}, + "mappings": [{"sensor": "sensor.a", "thermostate": "climate.x"}], + "update_timeout": 15, + "dry_run": True, + } + config_path = tmp_path / "config.json" + config_path.write_text(json.dumps(payload), encoding="utf-8") + + config = load_config(str(config_path)) + assert config.dry_run is True