from __future__ import annotations import asyncio import hashlib import json import logging import re import xml.etree.ElementTree as ET from asyncio import Task from datetime import datetime, timedelta from typing import Optional, Dict, List import requests from device import Device class FritzBox: def __init__(self, url: str, password: str, update_timeout: int, user: str = None, dry_run: bool = False) -> None: self._endpoints = { "login": "login_sid.lua?version=2", "logout": "index.lua", "data": "data.lua" } self.url: str = url self.dry_run: bool = dry_run self.user: Optional[str] = user self.session: requests.Session = requests.Session() self.password: str = password self.sid: Optional[str] = None self.update_timeout: int = update_timeout self.update_time: Dict[str, datetime] = {} self.hold_connection: Optional[Task] = None async def hold_connection_alive(self) -> None: while True: # Session automatically destroyed after 20m of inactivity # according to the manual await asyncio.sleep(19 * 60) self.check_session() def _calc_challenge_v2(self, challenge: str) -> str: logging.debug(f"Calculate v2 challenge: {challenge}") chall_regex = re.compile( "2\$(?P[0-9a-zA-Z]+)\$(?P[0-9a-zA-Z]+)\$(?P[0-9a-zA-Z]+)\$(?P[0-9a-zA-Z]+)") chall_parts = chall_regex.match(challenge).groupdict() salt1: bytes = bytes.fromhex(chall_parts["salt1"]) iter1: int = int(chall_parts["iter1"]) salt2: bytes = bytes.fromhex(chall_parts["salt2"]) iter2: int = int(chall_parts["iter2"]) hash1 = hashlib.pbkdf2_hmac('sha256', self.password.encode(), salt1, iter1) response = salt2.hex() + "$" + hashlib.pbkdf2_hmac('sha256', hash1, salt2, iter2).hex() return response def _calc_challenge_v1(self, challenge: str) -> str: """ Calculate the response for a challenge using legacy MD5 """ logging.debug(f"Calculate v1 challenge: {challenge}") response = f"{challenge}-{self.password}" response = response.encode("utf_16_le") response = challenge + "-" + hashlib.md5(response).hexdigest() return response def check_session(self) -> None: data = { "xhr": 1, "sid": self.sid, "lang": "de", "page": "overview", "xhrId": "first", "noMenuRef": 1 } r = self.session.post(f"{self.url}/{self._endpoints['data']}", data=data) if len(r.history) > 0: if not self.login(): logging.error("Failed to login to Fritz!Box") else: logging.debug("Already logged in") def login(self) -> bool: logging.info(f"Login user {self.user} to Fritz!Box") challenge = None r = self.session.get(f"{self.url}/{self._endpoints['login']}") xml = ET.fromstring(r.text) for elem in xml: if elem.tag == "SID": self.sid = elem.text elif elem.tag == "Challenge": challenge = elem.text elif self.user is None and elem.tag == "Users": for user_elem in elem: if "fritz" in user_elem.text: self.user = user_elem.text assert challenge is not None and self.user is not None if challenge.startswith("2$"): response = self._calc_challenge_v2(challenge) else: response = self._calc_challenge_v1(challenge) data = { "username": self.user, "response": response } r = self.session.post(f"{self.url}/{self._endpoints['login']}", data=data) logging.debug(r.text) xml = ET.fromstring(r.text) for elem in xml: if elem.tag == "SID": self.sid = elem.text logging.info(f"Authenticated Fritz!Box: {len(self.sid) != self.sid.count('0')}") if len(self.sid) != self.sid.count("0"): self.hold_connection = asyncio.create_task(self.hold_connection_alive()) return len(self.sid) != self.sid.count("0") def logout(self) -> bool: logging.info("logout") data = { "xhr": 1, "sid": self.sid, "logout": 1, "no_sidrenew": ""} r = self.session.post(f"{self.url}/{self._endpoints['logout']}", data=data) if self.hold_connection is not None: self.hold_connection.cancel() return r.status_code == 200 def list_devices(self) -> Optional[List[Device]]: data = { "xhr": 1, "sid": self.sid, "lang": "de", "page": "sh_dev", "xhrId": "all" } r = self.session.post(f"{self.url}/{self._endpoints['data']}", data=data) logging.debug(r.text[:100]) if len(r.history) > 0: if self.login(): r = self.session.post(f"{self.url}/{self._endpoints['data']}", data=data) else: return None devices: List[Device] = [] for device in json.loads(r.text)["data"]["devices"]: devices.append(Device(device)) return devices def get_device_data(self, idx: int = None, name: str = None) -> Optional[Device]: if idx is None and name is None: logging.debug("No id or name given") return None devices = self.list_devices() device = None for device in devices: if device.id == idx or device.display_name == name: break device = None if device is None: logging.debug(f"Device {idx} {name} not found") return None return device def set_offset(self, device: Device) -> None: if self.dry_run: logging.warning("No updates in dry-run-mode") return data = { "xhr": 1, "sid": self.sid, "lang": "de", "device": device.id, "page": "home_auto_hkr_edit" } self.session.post(f"{self.url}/{self._endpoints['data']}", data=data) data = { "xhr": 1, "sid": self.sid, "lang": "de", "view": "", "back_to_page": "sh_dev", "apply": "", "oldpage": "/net/home_auto_hkr_edit.lua" } data.update(device.to_web_data()) self.session.post(f"{self.url}/{self._endpoints['data']}", data=data) def correct_offset(self, device_name: str, real_temp: float): elapsed = None if device_name in self.update_time.keys(): elapsed = datetime.now() - self.update_time[device_name] logging.info(f"Last update for {device_name} {elapsed} ago") delta = timedelta(minutes=self.update_timeout) if device_name not in self.update_time.keys() or elapsed > delta: device: Optional[Device] = self.get_device_data(name=device_name) if device is None: return new_offset = device.get_offset() + real_temp - device.get_temperature() logging.info(f"Update offset from {device.get_offset()} to {new_offset}") device.set_offset(new_offset) self.set_offset(device) self.update_time[device.display_name] = datetime.now()