from __future__ import annotations import asyncio import hashlib import json import logging import re import xml.etree.ElementTree as ET from asyncio import Task from datetime import datetime, timedelta from typing import Any, Dict, List, Optional from urllib.parse import urlparse import httpx # type: ignore[import-not-found] from device import Device from errors import FritzBoxError logger = logging.getLogger(__name__) class FritzBox: def __init__( self, url: str, password: str, update_timeout: int, user: Optional[str] = None, dry_run: bool = False, force_ipv4: bool = False, request_timeout: int = 10, request_retries: int = 2, ) -> None: self._endpoints = { "login": "login_sid.lua?version=2", "logout": "index.lua", "data": "data.lua", } self.url: str = url self.dry_run: bool = dry_run self.user: Optional[str] = user if force_ipv4: transport = httpx.AsyncHTTPTransport(local_address="0.0.0.0") else: transport = None self.session: httpx.AsyncClient = httpx.AsyncClient(transport=transport) self.password: str = password self.sid: Optional[str] = None self.update_timeout: int = update_timeout self.update_time: Dict[str, datetime] = {} self.hold_connection: Optional[Task[None]] = None self.request_timeout: int = request_timeout self.request_retries: int = request_retries async def hold_connection_alive(self) -> None: while True: # Session automatically destroyed after 20m of inactivity # according to the manual await asyncio.sleep(19 * 60) await self.check_session() async def _request( self, method: str, url: str, data: Optional[Dict[str, Any]] = None, follow_redirects: bool = True, ) -> Optional[Any]: for attempt in range(self.request_retries + 1): try: return await self.session.request( method, url, data=data, timeout=self.request_timeout, follow_redirects=follow_redirects, ) except Exception as exc: if attempt >= self.request_retries: logger.error("Request failed (%s %s): %s", method, url, exc) return None backoff = 2**attempt logger.warning( "Request failed (%s %s), retry in %ss: %s", method, url, backoff, exc, ) await asyncio.sleep(backoff) return None def _calc_challenge_v2(self, challenge: str) -> str: logger.debug("Calculate v2 challenge") chall_regex = re.compile( r"2\$(?P[0-9a-zA-Z]+)\$(?P[0-9a-zA-Z]+)\$(?P[0-9a-zA-Z]+)\$(?P[0-9a-zA-Z]+)" ) chall_match = chall_regex.match(challenge) if not chall_match: logger.error("Invalid Fritz!Box challenge format") return "" chall_parts = chall_match.groupdict() salt1: bytes = bytes.fromhex(chall_parts["salt1"]) iter1: int = int(chall_parts["iter1"]) salt2: bytes = bytes.fromhex(chall_parts["salt2"]) iter2: int = int(chall_parts["iter2"]) hash1 = hashlib.pbkdf2_hmac("sha256", self.password.encode(), salt1, iter1) response = ( salt2.hex() + "$" + hashlib.pbkdf2_hmac("sha256", hash1, salt2, iter2).hex() ) return response def _calc_challenge_v1(self, challenge: str) -> str: """Calculate the response for a challenge using legacy MD5""" logger.debug("Calculate v1 challenge") response = f"{challenge}-{self.password}" response_bytes = response.encode("utf_16_le") return challenge + "-" + hashlib.md5(response_bytes).hexdigest() async def check_session(self) -> None: data: Dict[str, Any] = { "xhr": 1, "sid": self.sid, "lang": "de", "page": "overview", "xhrId": "first", "noMenuRef": 1, } r = await self._request("POST", f"{self.url}/{self._endpoints['data']}", data) if r is None: return if len(r.history) > 0: if not await self.login(): logger.error("Failed to login to Fritz!Box") else: logger.debug("Already logged in") async def login(self) -> bool: logger.info("Login user %s to Fritz!Box", self.user) challenge = None login_url = f"{self.url}/{self._endpoints['login']}" r = await self._request("GET", login_url, follow_redirects=False) if r is None: return False if getattr(r, "status_code", 200) in {301, 302, 303, 307, 308}: location = getattr(r, "headers", {}).get("Location") if location: retry_url, new_base = self._normalize_login_redirect(location) logger.debug("Login redirect to %s, retry %s", location, retry_url) r = await self._request("GET", retry_url, follow_redirects=False) if r is None: return False if new_base: self.url = new_base xml = ET.fromstring(r.text) for elem in xml: if elem.tag == "SID": self.sid = elem.text elif elem.tag == "Challenge": challenge = elem.text elif self.user is None and elem.tag == "Users": for user_elem in elem: if user_elem.text and "fritz" in user_elem.text: self.user = user_elem.text if challenge is None or self.user is None: raise FritzBoxError("Missing Fritz!Box login challenge or user") if challenge.startswith("2$"): response = self._calc_challenge_v2(challenge) else: response = self._calc_challenge_v1(challenge) if not response: raise FritzBoxError("Failed to compute Fritz!Box login response") data = {"username": self.user, "response": response} r = await self._request("POST", f"{self.url}/{self._endpoints['login']}", data) if r is None: return False logger.debug("Login response received") xml = ET.fromstring(r.text) for elem in xml: if elem.tag == "SID": self.sid = elem.text sid = self.sid or "" logger.info("Authenticated Fritz!Box: %s", len(sid) != sid.count("0")) if len(sid) != sid.count("0"): self.hold_connection = asyncio.create_task(self.hold_connection_alive()) return len(sid) != sid.count("0") async def logout(self) -> bool: logger.info("Logout from Fritz!Box") data: Dict[str, Any] = { "xhr": 1, "sid": self.sid, "logout": 1, "no_sidrenew": "", } r = await self._request("POST", f"{self.url}/{self._endpoints['logout']}", data) if r is None: return False if self.hold_connection is not None: self.hold_connection.cancel() return r.status_code == 200 async def list_devices(self) -> Optional[List[Device]]: data: Dict[str, Any] = { "xhr": 1, "sid": self.sid, "lang": "de", "page": "sh_dev", "xhrId": "all", } r = await self._request("POST", f"{self.url}/{self._endpoints['data']}", data) if r is None: return None logger.debug("Devices response received") if len(r.history) > 0: if await self.login(): r = await self._request( "POST", f"{self.url}/{self._endpoints['data']}", data ) if r is None: return None else: return None devices: List[Device] = [] for device in json.loads(r.text)["data"]["devices"]: devices.append(Device(device)) return devices async def get_device_data( self, idx: Optional[int] = None, name: Optional[str] = None ) -> Optional[Device]: if idx is None and name is None: logger.debug("No id or name given") return None devices = await self.list_devices() if devices is None: return None found_device: Optional[Device] = None for candidate in devices: if candidate.id == idx or candidate.display_name == name: found_device = candidate break if found_device is None: logger.debug("Device not found for id=%s name=%s", idx, name) return None return found_device async def set_offset(self, device: Device) -> None: if self.dry_run: logger.warning("Dry-run enabled, skipping update") return data: Dict[str, Any] = { "xhr": 1, "sid": self.sid, "lang": "de", "device": device.id, "page": "home_auto_hkr_edit", } await self._request("POST", f"{self.url}/{self._endpoints['data']}", data) data = { "xhr": 1, "sid": self.sid, "lang": "de", "view": "", "back_to_page": "sh_dev", "apply": "", "oldpage": "/net/home_auto_hkr_edit.lua", } data.update(device.to_web_data()) await self._request("POST", f"{self.url}/{self._endpoints['data']}", data) async def correct_offset(self, device_name: str, real_temp: float) -> None: elapsed = None if device_name in self.update_time.keys(): elapsed = datetime.now() - self.update_time[device_name] logger.debug("Last update for %s %s ago", device_name, elapsed) delta = timedelta(minutes=self.update_timeout) if ( device_name not in self.update_time.keys() or elapsed is None or elapsed > delta ): device: Optional[Device] = await self.get_device_data(name=device_name) if device is None: return current_offset = device.get_offset() current_temp = device.get_temperature() if current_offset is None or current_temp is None: logger.warning( "Skipping offset update for %s: missing temperature data", device_name, ) return new_offset = current_offset + real_temp - current_temp logger.info( "Update offset for %s from %.2f to %.2f", device_name, current_offset, new_offset, ) device.set_offset(new_offset) await self.set_offset(device) update_key = device.display_name or device_name self.update_time[update_key] = datetime.now() else: logger.debug( "Skip offset update for %s: last update %s ago (min %s)", device_name, elapsed, delta, ) async def close(self) -> None: await self.session.aclose() def _normalize_login_redirect(self, location: str) -> tuple[str, Optional[str]]: parsed_location = urlparse(location) if parsed_location.scheme and parsed_location.netloc: base = f"{parsed_location.scheme}://{parsed_location.netloc}" else: parsed_base = urlparse(self.url) base = f"{parsed_base.scheme}://{parsed_base.netloc}" if location.endswith(self._endpoints["login"]): return location, base return f"{base}/{self._endpoints['login']}", base