HOHA-Addons/fritz_temp_sync/sync_controller.py
2026-01-25 02:37:07 +01:00

216 lines
9.4 KiB
Python

from __future__ import annotations
import asyncio
import logging
from typing import Any, Optional, cast
from config import Mapping
from errors import FritzBoxError, HomeAssistantError
from fritzbox import FritzBox
from ha_types import HAEvent
from homeassistant import HomeAssistantAPI
# Module-level logger named after this module; used by SyncController below.
logger = logging.getLogger(__name__)
class SyncController:
    """Push Home Assistant sensor temperatures to Fritz!Box thermostats.

    The controller listens to ``state_changed`` events. Whenever a mapped
    sensor or thermostat changes, it compares the thermostat's
    ``current_temperature`` with the sensor reading rounded to the nearest
    0.5 and, if they differ by at least ``offset_threshold``, queues an
    offset correction that a background worker applies through the
    Fritz!Box client.
    """

    def __init__(
        self, ha: HomeAssistantAPI, fb: FritzBox, offset_threshold: float
    ) -> None:
        """Store the collaborators and create the empty work queue.

        Args:
            ha: Home Assistant client used for events and state lookups.
            fb: Fritz!Box client that applies offset corrections.
            offset_threshold: Temperature differences below this value are
                ignored.
        """
        self.ha = ha
        self.fb = fb
        self.offset_threshold = offset_threshold
        # sensor entity id -> thermostat entity ids that follow that sensor
        self.sensor_mappings: dict[str, list[str]] = {}
        # thermostat entity id -> sensor entity id it follows
        self.thermostate_mappings: dict[str, str] = {}
        # (device name, sensor temperature) jobs; None is the stop sentinel.
        self._offset_queue: asyncio.Queue[Optional[tuple[str, float]]] = (
            asyncio.Queue()
        )
        self._worker_task: Optional[asyncio.Task[None]] = None
        self._event_task: Optional[asyncio.Task[None]] = None

    def load_mappings(self, mappings: list[Mapping]) -> None:
        """Register sensor/thermostat pairs in both lookup tables.

        A pair that is already registered is not added a second time, so a
        repeated entry cannot trigger duplicate corrections.

        Args:
            mappings: Objects exposing ``sensor`` and ``thermostate``.
        """
        for mapping in mappings:
            thermostats = self.sensor_mappings.setdefault(mapping.sensor, [])
            if mapping.thermostate not in thermostats:
                thermostats.append(mapping.thermostate)
            self.thermostate_mappings[mapping.thermostate] = mapping.sensor
        logger.debug(
            "Loaded %s sensor mappings and %s thermostat mappings",
            len(self.sensor_mappings),
            len(self.thermostate_mappings),
        )

    async def handle_event(self, idx: int) -> None:
        """Consume events of subscription ``idx`` until a falsy item arrives.

        Events that cannot be processed (missing keys, ``None`` where a
        mapping is expected, non-numeric values) are skipped so that a single
        bad event never terminates the loop.

        Args:
            idx: Key of the event queue in ``self.ha.events``.
        """
        logger.debug("Wait for events for subscription %s", idx)
        event_queue = cast(Any, self.ha).events[idx]
        while event := await event_queue.get():
            try:
                await self._process_event(event)
            except (KeyError, TypeError, ValueError) as exc:
                logger.debug("Skipping unusable state_changed event: %r", exc)

    async def _process_event(self, event: HAEvent) -> None:
        """Dispatch one event to the thermostat or the sensor handler."""
        entity_id = event["data"]["entity_id"]
        if not entity_id:
            return
        is_thermostat = entity_id in self.thermostate_mappings
        is_sensor = entity_id in self.sensor_mappings
        if not (is_thermostat or is_sensor):
            return

        state = cast(Any, await self.ha.get_device_state(entity_id))
        if not isinstance(state, dict):
            return
        new_state: dict[str, Any] = cast(Any, event["data"]["new_state"])
        logger.debug(
            "state_changed for %s (is_thermostat=%s is_sensor=%s state=%s)",
            entity_id,
            is_thermostat,
            is_sensor,
            state.get("state"),
        )
        if is_thermostat and state.get("state") != "unavailable":
            await self._sync_from_thermostat(entity_id, new_state)
        elif is_sensor:
            await self._sync_from_sensor(entity_id, new_state)

    async def _sync_from_thermostat(
        self, entity_id: str, new_state: dict[str, Any]
    ) -> None:
        """Compare a changed thermostat with the sensor it is mapped to."""
        attributes = new_state["attributes"]
        therm_temp = self._to_float(attributes["current_temperature"])
        therm_name = cast(str, attributes["friendly_name"])
        sensor = self.thermostate_mappings[entity_id]
        sensor_state = cast(Any, await self.ha.get_device_state(sensor))
        if not isinstance(sensor_state, dict):
            return
        sensor_temp = self._round_to_half(sensor_state["state"])
        if therm_temp is None or sensor_temp is None:
            logger.debug("No numeric temperature available for %s", entity_id)
            return
        logger.debug(
            "Temps for %s: thermostat=%s sensor=%s",
            entity_id,
            therm_temp,
            sensor_temp,
        )
        await self._correct_if_needed(therm_name, therm_temp, sensor_temp, sensor)

    async def _sync_from_sensor(
        self, entity_id: str, new_state: dict[str, Any]
    ) -> None:
        """Compare a changed sensor with every thermostat mapped to it."""
        logger.debug("sensor update for %s", entity_id)
        sensor_temp = self._round_to_half(new_state["state"])
        if sensor_temp is None:
            # E.g. the state string is "unavailable" instead of a number.
            logger.debug("No numeric temperature available for %s", entity_id)
            return
        thermostats = self.sensor_mappings[entity_id]
        logger.debug("thermostats for %s: %s", entity_id, thermostats)
        for thermostate in thermostats:
            logger.debug("check thermostat %s", thermostate)
            try:
                await self._check_thermostat(thermostate, sensor_temp, entity_id)
            except (KeyError, TypeError) as exc:
                # One malformed thermostat must not block the remaining ones.
                logger.debug("Skipping thermostat %s: %r", thermostate, exc)

    async def _check_thermostat(
        self, thermostate: str, sensor_temp: float, sensor: str
    ) -> None:
        """Fetch one thermostat's state and correct it against ``sensor_temp``."""
        therm_state = cast(Any, await self.ha.get_device_state(thermostate))
        if not isinstance(therm_state, dict):
            return
        if therm_state.get("state") == "unavailable":
            logger.debug("thermostat %s unavailable", thermostate)
            return
        attributes = therm_state["attributes"]
        therm_temp = self._to_float(attributes["current_temperature"])
        therm_name = cast(str, attributes["friendly_name"])
        if therm_temp is None:
            logger.debug("No numeric temperature available for %s", thermostate)
            return
        logger.debug(
            "Temps for %s: thermostat=%s sensor=%s",
            thermostate,
            therm_temp,
            sensor_temp,
        )
        await self._correct_if_needed(therm_name, therm_temp, sensor_temp, sensor)

    async def _correct_if_needed(
        self, therm_name: str, therm_temp: float, sensor_temp: float, source: str
    ) -> None:
        """Queue a correction when the difference reaches the threshold."""
        diff = abs(therm_temp - sensor_temp)
        if diff < self.offset_threshold:
            logger.debug(
                "Offset diff %.2f below threshold %.2f for %s",
                diff,
                self.offset_threshold,
                therm_name,
            )
            return
        # Still needed when the threshold is 0: equal values must not enqueue.
        if therm_temp != sensor_temp:
            logger.info(
                "Offset correction: %s thermostat=%s sensor=%s (from %s)",
                therm_name,
                therm_temp,
                sensor_temp,
                source,
            )
            await self._enqueue_offset(therm_name, sensor_temp)

    @staticmethod
    def _to_float(raw: Any) -> Optional[float]:
        """Return ``raw`` as a float, or None when it is not numeric."""
        try:
            return float(raw)
        except (TypeError, ValueError):
            return None

    @staticmethod
    def _round_to_half(raw: Any) -> Optional[float]:
        """Return ``raw`` rounded to the nearest 0.5, or None if unusable."""
        try:
            return round(float(raw) * 2) / 2
        except (TypeError, ValueError, OverflowError):
            # round() rejects NaN (ValueError) and infinity (OverflowError).
            return None

    async def _enqueue_offset(self, device_name: str, sensor_temp: float) -> None:
        """Queue an offset correction for the background worker."""
        await self._offset_queue.put((device_name, sensor_temp))

    async def _offset_worker(self) -> None:
        """Apply queued corrections until the ``None`` sentinel is received."""
        while (item := await self._offset_queue.get()) is not None:
            device_name, sensor_temp = item
            try:
                await self.fb.correct_offset(device_name, sensor_temp)
            except FritzBoxError as exc:
                logger.error("Offset update failed for %s: %s", device_name, exc)
            except Exception:
                # Keep the worker alive; otherwise the queue fills up silently.
                logger.exception(
                    "Unexpected error during offset update for %s", device_name
                )

    async def run(self) -> None:
        """Connect, start the background tasks and block until HA closes.

        Connection and login failures are logged and end the run. Once the
        Home Assistant connection is established, ``shutdown`` is always
        executed, including when subscribing fails.
        """
        try:
            await self.ha.connect()
        except HomeAssistantError as exc:
            logger.error("Home Assistant error: %s", exc)
            return

        try:
            logger.debug("Subscribing to state_changed")
            state_changed_id = await self.ha.subscribe_event("state_changed")
            logger.debug(state_changed_id)
            self._event_task = asyncio.create_task(
                self.handle_event(state_changed_id)
            )
            try:
                if not await self.fb.login():
                    raise FritzBoxError("Failed to login to Fritz!Box")
            except FritzBoxError as exc:
                logger.error("Fritz!Box error: %s", exc)
                return
            logger.info("Health check OK")
            self._worker_task = asyncio.create_task(self._offset_worker())
            try:
                await self.ha.wait_for_close()
            except asyncio.CancelledError:
                # Cancellation is treated as a request to shut down.
                pass
        finally:
            await self.shutdown()

    async def shutdown(self) -> None:
        """Stop the background tasks and close both connections.

        The event task is cancelled and awaited first so nothing is queued
        after the stop sentinel. The worker then drains the queue and exits.
        Both clients are closed even if stopping a task fails.
        """
        event_task, self._event_task = self._event_task, None
        worker_task, self._worker_task = self._worker_task, None
        try:
            if event_task is not None:
                event_task.cancel()
                await self._join_task(event_task, "Event")
            if worker_task is not None:
                if not worker_task.done():
                    await self._offset_queue.put(None)
                await self._join_task(worker_task, "Offset worker")
        finally:
            try:
                await self.ha.close()
            finally:
                await self.fb.close()
        logger.info("Shutdown complete")

    @staticmethod
    async def _join_task(task: asyncio.Task[None], label: str) -> None:
        """Wait for ``task`` to finish and log, rather than raise, its failure."""
        (outcome,) = await asyncio.gather(task, return_exceptions=True)
        # A cancelled task yields CancelledError, which is not an Exception.
        if isinstance(outcome, Exception):
            logger.error("%s task failed: %r", label, outcome)