from __future__ import annotations import asyncio import hashlib import json import logging import re import xml.etree.ElementTree as ET from asyncio import Task from datetime import datetime, timedelta from typing import Optional, Dict, List, Union import requests from bs4 import BeautifulSoup from device import Device class FritzBox: def __init__(self, url: str, password: str, update_timeout: int, user: str = None, dry_run: bool = False, verify_ssl: bool = True, old_fb: bool = False) -> None: self._endpoints = { "login": "login_sid.lua?version=2", "logout": "index.lua", "data": "data.lua", "device_details": "net/home_auto_hkr_edit.lua" } self.url: str = url self.dry_run: bool = dry_run self.user: Optional[str] = user self.session: requests.Session = requests.Session() self.password: str = password self.sid: Optional[str] = None self.update_timeout: int = update_timeout self.update_time: Dict[str, datetime] = {} self.hold_connection: Optional[Task] = None self.verify_ssl = verify_ssl self.old_fb = old_fb if not verify_ssl: import urllib3 urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning) async def hold_connection_alive(self) -> None: while True: # Session automatically destroyed after 20m of inactivity # according to the manual await asyncio.sleep(19 * 60) self.check_session() def _calc_challenge_v2(self, challenge: str) -> str: logging.debug(f"Calculate v2 challenge: {challenge}") chall_regex = re.compile( "2\$(?P[0-9a-zA-Z]+)\$(?P[0-9a-zA-Z]+)\$(?P[0-9a-zA-Z]+)\$(?P[0-9a-zA-Z]+)") chall_parts = chall_regex.match(challenge).groupdict() salt1: bytes = bytes.fromhex(chall_parts["salt1"]) iter1: int = int(chall_parts["iter1"]) salt2: bytes = bytes.fromhex(chall_parts["salt2"]) iter2: int = int(chall_parts["iter2"]) hash1 = hashlib.pbkdf2_hmac('sha256', self.password.encode(), salt1, iter1) response = salt2.hex() + "$" + hashlib.pbkdf2_hmac('sha256', hash1, salt2, iter2).hex() return response def _calc_challenge_v1(self, challenge: str) -> str: """ Calculate the response for a challenge using legacy MD5 """ logging.debug(f"Calculate v1 challenge: {challenge}") response = f"{challenge}-{self.password}" response = response.encode("utf_16_le") response = challenge + "-" + hashlib.md5(response).hexdigest() return response def check_session(self) -> None: data = { "xhr": 1, "sid": self.sid, "lang": "de", "page": "overview", "xhrId": "first", "noMenuRef": 1 } r = self.session.post(f"{self.url}/{self._endpoints['data']}", data=data, verify=self.verify_ssl) if len(r.history) > 0: if not self.login(): logging.error("Failed to login to Fritz!Box") else: logging.debug("Already logged in") def login(self) -> bool: logging.info(f"Login user {self.user} to Fritz!Box") challenge = None r = self.session.get(f"{self.url}/{self._endpoints['login']}", verify=self.verify_ssl) xml = ET.fromstring(r.text) for elem in xml: if elem.tag == "SID": self.sid = elem.text elif elem.tag == "Challenge": challenge = elem.text elif self.user is None and elem.tag == "Users": for user_elem in elem: if "fritz" in user_elem.text: self.user = user_elem.text assert challenge is not None and self.user is not None if challenge.startswith("2$"): response = self._calc_challenge_v2(challenge) else: response = self._calc_challenge_v1(challenge) data = { "username": self.user, "response": response } r = self.session.post(f"{self.url}/{self._endpoints['login']}", data=data, verify=self.verify_ssl) logging.debug(r.text) xml = ET.fromstring(r.text) for elem in xml: if elem.tag == "SID": self.sid = elem.text logging.info(f"Authenticated Fritz!Box: {len(self.sid) != self.sid.count('0')}") if len(self.sid) != self.sid.count("0"): self.hold_connection = asyncio.create_task(self.hold_connection_alive()) return len(self.sid) != self.sid.count("0") def logout(self) -> bool: logging.info("logout") data = { "xhr": 1, "sid": self.sid, "logout": 1, "no_sidrenew": ""} r = self.session.post(f"{self.url}/{self._endpoints['logout']}", data=data, verify=self.verify_ssl) if self.hold_connection is not None: self.hold_connection.cancel() return r.status_code == 200 def list_devices_old(self) -> Optional[List[Union[Device, Dict]]]: data = { "xhr": 1, "sid": self.sid, "lang": "de", "page": "sh", "xhrId": "all" } r = self.session.post(f"{self.url}/{self._endpoints['data']}", data=data, verify=self.verify_ssl) logging.debug(r.text[:100]) if len(r.history) > 0: if self.login(): r = self.session.post(f"{self.url}/{self._endpoints['data']}", data=data, verify=self.verify_ssl) else: return None devices_overview_raw = BeautifulSoup(r.text, "html.parser") table_rows = devices_overview_raw.find(id="uiSmarthomeTables").table devices_raw = [] for r in table_rows.find_all("tr"): name = r.findAll("td", {'class': ['name', 'cut_overflow']}) button = r.findAll("td", {'class': 'btncolumn'}) temperature = r.findAll("td", {'class': 'temperature'}) if name is None or len(name) < 1: continue if button is None or len(button) < 1: continue if temperature is None or len(temperature) < 1: continue name = name[0].string id = int(button[0].button["value"]) print("Temperature raw:", temperature) try: temperature = float(temperature[0].text.split(" ")[0].replace(",", ".")) except: print("Error parsing temperature.") return [] request_data = { "device": id, "sid": self.sid, "xhr": 1 } r = self.session.get(f"{self.url}/{self._endpoints['device_details']}", params=request_data, verify=self.verify_ssl) device_content_raw = BeautifulSoup(r.text, "html.parser") offset = float(device_content_raw.find("input", {"type": "hidden", "name": "Offset"})["value"]) devices_raw.append({ "name": name, "display_name": name, "id": id, "temperature": temperature, "offset": offset }) return devices_raw def list_devices(self) -> Optional[List[Device]]: data = { "xhr": 1, "sid": self.sid, "lang": "de", "page": "sh_dev", "xhrId": "all" } r = self.session.post(f"{self.url}/{self._endpoints['data']}", data=data, verify=self.verify_ssl) logging.debug(r.text[:100]) if len(r.history) > 0: if self.login(): r = self.session.post(f"{self.url}/{self._endpoints['data']}", data=data, verify=self.verify_ssl) else: return None devices: List[Device] = [] for device in json.loads(r.text)["data"]["devices"]: devices.append(Device(device)) return devices def get_device_data(self, idx: int = None, name: str = None) -> Optional[Device]: if idx is None and name is None: logging.debug("No id or name given") return None if self.old_fb: devices = self.list_devices_old() else: devices = self.list_devices() device = None for device in devices: if isinstance(device, dict): if device["id"] == idx or device["display_name"] == name: break else: if device.id == idx or device.display_name == name: break device = None if device is None: logging.debug(f"Device {idx} {name} not found") return None return device def set_offset(self, device: Union[Device, Dict]) -> None: if self.dry_run: logging.warning("No updates in dry-run-mode") return old_type = isinstance(device, dict) if old_type: device_id = device["id"] else: device_id = device.id if not old_type: data = { "xhr": 1, "sid": self.sid, "lang": "de", "device": device_id, "page": "home_auto_hkr_edit" } self.session.post(f"{self.url}/{self._endpoints['data']}", data=data, verify=self.verify_ssl) data = { "xhr": 1, "sid": self.sid, "lang": "de", "view": "", "back_to_page": "sh_dev", "apply": "", "oldpage": "/net/home_auto_hkr_edit.lua" } data.update(device.to_web_data()) else: data = { "xhr": 1, "sid": self.sid, "lang": "de", "no_sidrenew": "", "device": device_id, "view": "", "back_to_page": "/net/home_auto_overview.lua", "ule_device_name": device["name"], "Offset": device["offset"], "apply": "", "oldpage": "/net/home_auto_hkr_edit.lua", } self.session.post(f"{self.url}/{self._endpoints['data']}", data=data, verify=self.verify_ssl) def correct_offset(self, device_name: str, real_temp: float): elapsed = None if device_name in self.update_time.keys(): elapsed = datetime.now() - self.update_time[device_name] logging.info(f"Last update for {device_name} {elapsed} ago") delta = timedelta(minutes=self.update_timeout) if device_name not in self.update_time.keys() or elapsed > delta: device: Optional[Union[Dict, Device]] = self.get_device_data(name=device_name) logging.info(f"device: {device}") if device is None: return offset_changed = False if self.old_fb: new_offset = device["offset"] + real_temp - device["temperature"] offset_changed = new_offset != device["offset"] logging.info(f"Update offset from {device['offset']} to {new_offset} (changed: {offset_changed})") else: new_offset = device.get_offset() + real_temp - device.get_temperature() offset_changed = new_offset != device.get_offset() logging.info(f"Update offset from {device.get_offset()} to {new_offset} (changed: {offset_changed})") if offset_changed: if self.old_fb: device["offset"] = new_offset else: device.set_offset(new_offset) self.set_offset(device) self.update_time[device_name] = datetime.now()