import asyncio from asyncio import Task from datetime import datetime, timedelta from typing import Optional, Tuple, Dict import requests import json import re import hashlib import xml.etree.ElementTree as ET import logging class FritzBox: def __init__(self, url: str, password: str, user: str = None, dry_run: bool = False) -> None: self._endpoints = { "login": "login_sid.lua?version=2", "logout": "index.lua", "data": "data.lua" } self.url: str = url self.dry_run: bool = dry_run self.user: Optional[str] = user self.session: requests.Session = requests.Session() self.password: str = password self.sid: Optional[str] = None self.update_time: Dict[str, datetime] = {} self.hold_connection: Optional[Task] = None async def hold_connection_alive(self) -> None: while True: # Session automatically destroyed after 20m of inactivity await asyncio.sleep(19*60) self.check_session() def _calc_challenge_v2(self, challenge: str) -> str: logging.debug(f"Calculate v2 challenge: {challenge}") chall_regex = re.compile( "2\$(?P[0-9a-zA-Z]+)\$(?P[0-9a-zA-Z]+)\$(?P[0-9a-zA-Z]+)\$(?P[0-9a-zA-Z]+)") chall_parts = chall_regex.match(challenge).groupdict() salt1: bytes = bytes.fromhex(chall_parts["salt1"]) iter1: int = int(chall_parts["iter1"]) salt2: bytes = bytes.fromhex(chall_parts["salt2"]) iter2: int = int(chall_parts["iter2"]) hash1 = hashlib.pbkdf2_hmac('sha256', self.password.encode(), salt1, iter1) response = salt2.hex() + "$" + hashlib.pbkdf2_hmac('sha256', hash1, salt2, iter2).hex() return response def _calc_challenge_v1(self, challenge: str) -> str: """ Calculate the response for a challenge using legacy MD5 """ logging.debug(f"Calculate v1 challenge: {challenge}") response = f"{challenge}-{self.password}" response = response.encode("utf_16_le") response = challenge + "-" + hashlib.md5(response).hexdigest() return response def check_session(self) -> None: data = { "xhr": 1, "sid": self.sid, "lang": "de", "page": "overview", "xhrId": "first", "noMenuRef": 1 } r = self.session.post(f"{self.url}/{self._endpoints['data']}", data=data) if len(r.history) > 0: if not self.login(): logging.error("Failed to login to Fritz!Box") else: logging.info("Already logged in") def login(self, user: str = None) -> bool: logging.info(f"Login user {user} to Fritz!Box") challenge = None r = self.session.get(f"{self.url}/{self._endpoints['login']}") xml = ET.fromstring(r.text) for elem in xml: if elem.tag == "SID": self.sid = elem.text elif elem.tag == "Challenge": challenge = elem.text elif user is None and elem.tag == "Users": for user_elem in elem: if "fritz" in user_elem.text: user = user_elem.text assert challenge is not None and user is not None if challenge.startswith("2$"): response = self._calc_challenge_v2(challenge) else: response = self._calc_challenge_v1(challenge) data = { "username": user, "response": response } r = self.session.post(f"{self.url}/{self._endpoints['login']}", data=data) logging.debug(r.text) xml = ET.fromstring(r.text) for elem in xml: if elem.tag == "SID": self.sid = elem.text logging.info(f"Authenticated Fritz!Box: {len(self.sid) != self.sid.count('0')}") if len(self.sid) != self.sid.count("0"): self.hold_connection = asyncio.create_task(self.hold_connection_alive()) return len(self.sid) != self.sid.count("0") def logout(self) -> bool: logging.info("logout") data = { "xhr": 1, "sid": self.sid, "logout": 1, "no_sidrenew": ""} r = self.session.post(f"{self.url}/{self._endpoints['logout']}", data=data) if self.hold_connection is not None: self.hold_connection.cancel() return r.status_code == 200 def list_devices(self) -> Optional[Dict]: data = { "xhr": 1, "sid": self.sid, "lang": "de", "page": "sh_dev", "xhrId": "all" } r = self.session.post(f"{self.url}/{self._endpoints['data']}", data=data) logging.debug(r.text[:100]) if len(r.history) > 0: if self.login(): r = self.session.post(f"{self.url}/{self._endpoints['data']}", data=data) else: return None devices = json.loads(r.text)["data"]["devices"] return devices def get_device_data(self, id: int = None, name: str = None) -> Optional[Tuple[float, float, int, str]]: if id is None and name is None: logging.debug("No id or name given") return None devices = self.list_devices() device = None for device in devices: if device["id"] == id or device["displayName"] == name: break device = None if device is None: logging.debug(f"Device {id} {name} not found") return None current_temp = None current_offset = None for unit in device["units"]: if unit["type"] == "TEMPERATURE_SENSOR": for skill in unit["skills"]: if skill["type"] == "SmartHomeTemperatureSensor": current_temp = float(skill["currentInCelsius"]) current_offset = float(skill["offset"]) return current_temp, current_offset, device["id"], device["displayName"] def set_offset(self, current_temp: str, offset: float, device_id: int, device_name: str) -> None: if self.dry_run: logging.warning("No updates in dry-run-mode") return data = { "xhr": 1, "sid": self.sid, "lang": "de", "device": device_id, "page": "home_auto_hkr_edit" } self.session.post(f"{self.url}/{self._endpoints['data']}", data=data) data = { "xhr": 1, "sid": self.sid, "lang": "de", "device": device_id, "view": "", "back_to_page": "sh_dev", "ule_device_name": device_name, "WindowOpenTrigger": 8, "WindowOpenTimer": 10, "tempsensor": "own", "Roomtemp": f"{current_temp}", "ExtTempsensorID": "tochoose", "Offset": f"{offset}", "apply": "", "oldpage": "/net/home_auto_hkr_edit.lua" } self.session.post(f"{self.url}/{self._endpoints['data']}", data=data) def correct_offset(self, device_name: str, real_temp: float): if device_name in self.update_time.keys(): logging.info(f"Last update for {device_name} {datetime.now() - self.update_time[device_name]} ago") delta = timedelta(minutes=5) if device_name not in self.update_time.keys() or (datetime.now() - self.update_time[device_name]) > delta: current_temp, current_offset, idx, name = self.get_device_data(name=device_name) new_offset = current_offset + real_temp - current_temp logging.info(f"Update offset from {current_offset} to {new_offset}") self.set_offset(current_temp, new_offset, idx, device_name) self.update_time[device_name] = datetime.now()