from __future__ import annotations import asyncio import hashlib import json import logging import re import typing import xml.etree.ElementTree as ET from abc import ABC, abstractmethod from asyncio import Task from datetime import datetime, timedelta from enum import IntFlag, Enum, auto from typing import Optional, Tuple, Dict, List, Union, TypedDict from urllib.parse import urlencode import requests class WeekDay(IntFlag): MON = 0b1 TUE = 0b10 WED = 0b100 THU = 0b1000 FRI = 0b10000 SAT = 0b100000 SUN = 0b1000000 class Manufacturer: def __init__(self, name: str): self.name: str = name def __repr__(self): return f"name: {self.name}" def to_json(self): return {"name": self.name} @staticmethod def parse_dict(manufacturer: Dict): return Manufacturer(manufacturer["name"]) class FirmwareVersion: def __init__(self, search: bool, current: str, update: bool, running: bool): self.search: bool = search self.current: str = current self.update: bool = update self.running: bool = running def __repr__(self): return f"search: {self.search}; current: {self.current}; update: {self.update}; running: {self.running}" def to_json(self): return {"search": self.search, "current": self.current, "update": self.update, "running": self.running} @staticmethod def parse_dict(firmware_version: dict): return FirmwareVersion(firmware_version["search"], firmware_version["current"], firmware_version["update"], firmware_version["running"]) class PushService: def __init__(self, mail_address: str, unit_settings: List, is_enabled: bool): self.mail_address: str = mail_address self.unit_settings: List = unit_settings self.is_enabled: bool = is_enabled def __repr__(self): return f"" def to_json(self): return {"mailAddress": self.mail_address, "unitSettings": self.unit_settings, "isEnabled": self.is_enabled} @staticmethod def parse_dict(push_service: Dict): return PushService(push_service["mailAddress"], push_service["unitSettings"], push_service["isEnabled"]) class SkillType(Enum): SmartHomeTemperatureSensor = auto() SmartHomeThermostat = auto() SmartHomeBattery = auto() class Skill(ABC): def __init__(self, skill_type: SkillType): self.type: SkillType = skill_type @abstractmethod def to_json(self): pass @abstractmethod def to_web_data(self, device_id: int): pass @staticmethod def parse_dict(skill: Dict): skill_type = SkillType[skill["type"]] if skill_type == SkillType.SmartHomeTemperatureSensor: return TemperatureSkill.parse_dict(skill) elif skill_type == SkillType.SmartHomeThermostat: return ThermostatSkill.parse_dict(skill) elif skill_type == SkillType.SmartHomeBattery: return BatterySkill.parse_dict(skill) else: raise NotImplementedError(skill_type) class PresetName(Enum): LOWER_TEMPERATURE = auto() UPPER_TEMPERATURE = auto() HOLIDAY_TEMPERATURE = auto() class Preset: def __init__(self, name: PresetName, temperature: int): self.name: PresetName = name self.temperature: int = temperature def __repr__(self): return f"" def to_json(self): return {"name": self.name.name, "temperature": self.temperature} class Description: def __init__(self, action: str, preset_temperature: Optional[Preset]): self.action: str = action self.preset_temperature: Optional[Preset] = preset_temperature def __repr__(self): desc = f"" def to_json(self): return {"description": self.description.to_json(), "timeSetting": self.time_setting.to_json()} @staticmethod def parse_dict(change: Dict): return Change(Description.parse_dict(change["description"]), TimeSetting.parse_dict(change["timeSetting"])) class TemperatureDropDetection: def __init__(self, do_not_heat_offset_in_minutes: int, sensitivity: int, is_window_open: bool): self.do_not_heat_offset_in_minutes: int = do_not_heat_offset_in_minutes self.sensitivity: int = sensitivity self.is_window_open: bool = is_window_open def __repr__(self): # ToDo pass def to_json(self): return { "doNotHeatOffsetInMinutes": self.do_not_heat_offset_in_minutes, "sensitivity": self.sensitivity, "isWindowOpen": self.is_window_open } def to_web_data(self, device_id: int): return { "WindowOpenTrigger": self.sensitivity + 3, "WindowOpenTimer": self.do_not_heat_offset_in_minutes } @staticmethod def parse_dict(drop_detection: Dict): return TemperatureDropDetection(drop_detection["doNotHeatOffsetInMinutes"], drop_detection["sensitivity"], drop_detection["isWindowOpen"]) class ScheduleKind(Enum): REPETITIVE = auto() WEEKLY_TIMETABLE = auto() class ThermostatSkillMode(Enum): TARGET_TEMPERATURE = auto() class Action: def __init__(self, is_enabled: bool, time_setting: TimeSetting, description: Description): self.is_enabled: bool = is_enabled self.time_setting: TimeSetting = time_setting self.description: Description = description def __repr__(self): return f"" def to_json(self): return {"isEnabled": self.is_enabled, "timeSetting": self.time_setting.to_json(), "description": self.description.to_json()} @staticmethod def parse_dict(action: Dict): return Action(action["isEnabled"], TimeSetting.parse_dict(action["timeSetting"]), Description.parse_dict(action["description"])) class Schedule: def __init__(self, is_enabled: bool, kind: ScheduleKind, name: str, actions: List[Action]): self.is_enabled: bool = is_enabled self.kind: ScheduleKind = kind self.name: str = name self.actions: List[Action] = actions def __repr__(self): return f"" def to_json(self): return { "isEnabled": self.is_enabled, "kind": self.kind.name, "name": self.name, "actions": [action.to_json() for action in self.actions] } def to_web_data(self, device_id: int): data = {} if self.kind == ScheduleKind.REPETITIVE and self.name == "HOLIDAYS": # ToDo: Enum for names? enabled_count = 0 for num, holiday in enumerate(self.actions): num += 1 _, start_month, start_day = holiday.time_setting.start_date.split("-") _, end_month, end_day = holiday.time_setting.end_date.split("-") data.update({ f"Holiday{num}StartDay": start_day, f"Holiday{num}StartMonth": start_month, f"Holiday{num}StartHour": holiday.time_setting.start_time.split(":")[0], f"Holiday{num}EndDay": end_day, f"Holiday{num}EndMonth": end_month, f"Holiday{num}EndHour": holiday.time_setting.end_time.split(":")[0], f"Holiday{num}Enabled": 1 if holiday.is_enabled else 0, f"Holiday{num}ID": num }) if holiday.is_enabled: enabled_count += 1 data["HolidayEnabledCount"] = enabled_count elif self.kind == ScheduleKind.REPETITIVE and self.name == "SUMMER_TIME": _, start_month, start_day = self.actions[0].time_setting.start_date.split("-") _, end_month, end_day = self.actions[0].time_setting.end_date.split("-") data = { "SummerStartDay": start_day, "SummerStartMonth": start_month, "SummerEndDay": end_day, "SummerEndMonth": end_month, "SummerEnabled": 1 if self.is_enabled else 0 } elif self.kind == ScheduleKind.WEEKLY_TIMETABLE and self.name == "TEMPERATURE": timer_items = {} for action in self.actions: if not action.is_enabled: continue time = "".join(action.time_setting.start_time.split(":")[:-1]) if time not in timer_items.keys(): timer_items[time] = [0, 0] heat = 1 if action.description.preset_temperature.name == PresetName.UPPER_TEMPERATURE else 0 days = timer_items[time][heat] days |= action.time_setting.day_of_week timer_items[time][heat] = days num = 0 for key in timer_items.keys(): if timer_items[key][0] != 0: data.update({f"timer_item_{num}": f"{key};0;{timer_items[key][0].as_integer_ratio()[0]}"}) num += 1 if timer_items[key][1] != 0: data.update({f"timer_item_{num}": f"{key};1;{timer_items[key][1].as_integer_ratio()[0]}"}) num += 1 else: raise NotImplementedError(self.name) return data @staticmethod def parse_dict(schedule: Dict): # ToDo: Make TypedDicts for all those dicts return Schedule(schedule["isEnabled"], ScheduleKind[schedule["kind"]], schedule["name"], [Action.parse_dict(action) for action in schedule["actions"]]) class TimeControl: def __init__(self, is_enabled: bool, time_schedules: List[Schedule]): self.is_enabled: bool = is_enabled self.time_schedules: List[Schedule] = time_schedules def __repr__(self): return f" None: self._endpoints = { "login": "login_sid.lua?version=2", "logout": "index.lua", "data": "data.lua" } self.url: str = url self.dry_run: bool = dry_run self.user: Optional[str] = user self.session: requests.Session = requests.Session() self.password: str = password self.sid: Optional[str] = None self.update_timeout: int = update_timeout self.update_time: Dict[str, datetime] = {} self.hold_connection: Optional[Task] = None async def hold_connection_alive(self) -> None: while True: # Session automatically destroyed after 20m of inactivity await asyncio.sleep(19 * 60) self.check_session() def _calc_challenge_v2(self, challenge: str) -> str: logging.debug(f"Calculate v2 challenge: {challenge}") chall_regex = re.compile( "2\$(?P[0-9a-zA-Z]+)\$(?P[0-9a-zA-Z]+)\$(?P[0-9a-zA-Z]+)\$(?P[0-9a-zA-Z]+)") chall_parts = chall_regex.match(challenge).groupdict() salt1: bytes = bytes.fromhex(chall_parts["salt1"]) iter1: int = int(chall_parts["iter1"]) salt2: bytes = bytes.fromhex(chall_parts["salt2"]) iter2: int = int(chall_parts["iter2"]) hash1 = hashlib.pbkdf2_hmac('sha256', self.password.encode(), salt1, iter1) response = salt2.hex() + "$" + hashlib.pbkdf2_hmac('sha256', hash1, salt2, iter2).hex() return response def _calc_challenge_v1(self, challenge: str) -> str: """ Calculate the response for a challenge using legacy MD5 """ logging.debug(f"Calculate v1 challenge: {challenge}") response = f"{challenge}-{self.password}" response = response.encode("utf_16_le") response = challenge + "-" + hashlib.md5(response).hexdigest() return response def check_session(self) -> None: data = { "xhr": 1, "sid": self.sid, "lang": "de", "page": "overview", "xhrId": "first", "noMenuRef": 1 } r = self.session.post(f"{self.url}/{self._endpoints['data']}", data=data) if len(r.history) > 0: if not self.login(): logging.error("Failed to login to Fritz!Box") else: logging.info("Already logged in") def login(self, user: str = None) -> bool: logging.info(f"Login user {user} to Fritz!Box") challenge = None r = self.session.get(f"{self.url}/{self._endpoints['login']}") xml = ET.fromstring(r.text) for elem in xml: if elem.tag == "SID": self.sid = elem.text elif elem.tag == "Challenge": challenge = elem.text elif user is None and elem.tag == "Users": for user_elem in elem: if "fritz" in user_elem.text: user = user_elem.text assert challenge is not None and user is not None if challenge.startswith("2$"): response = self._calc_challenge_v2(challenge) else: response = self._calc_challenge_v1(challenge) data = { "username": user, "response": response } r = self.session.post(f"{self.url}/{self._endpoints['login']}", data=data) logging.debug(r.text) xml = ET.fromstring(r.text) for elem in xml: if elem.tag == "SID": self.sid = elem.text logging.info(f"Authenticated Fritz!Box: {len(self.sid) != self.sid.count('0')}") if len(self.sid) != self.sid.count("0"): self.hold_connection = asyncio.create_task(self.hold_connection_alive()) return len(self.sid) != self.sid.count("0") def logout(self) -> bool: logging.info("logout") data = { "xhr": 1, "sid": self.sid, "logout": 1, "no_sidrenew": ""} r = self.session.post(f"{self.url}/{self._endpoints['logout']}", data=data) if self.hold_connection is not None: self.hold_connection.cancel() return r.status_code == 200 def list_devices(self) -> Optional[List[Device]]: data = { "xhr": 1, "sid": self.sid, "lang": "de", "page": "sh_dev", "xhrId": "all" } r = self.session.post(f"{self.url}/{self._endpoints['data']}", data=data) logging.debug(r.text[:100]) if len(r.history) > 0: if self.login(): r = self.session.post(f"{self.url}/{self._endpoints['data']}", data=data) else: return None devices: List[Device] = [] for device in json.loads(r.text)["data"]["devices"]: devices.append(Device(device)) return devices def get_device_data(self, idx: int = None, name: str = None) -> Optional[Device]: if idx is None and name is None: logging.debug("No id or name given") return None devices = self.list_devices() device = None for device in devices: if device.id == idx or device.display_name == name: break device = None if device is None: logging.debug(f"Device {idx} {name} not found") return None return device def set_offset(self, device: Device) -> None: if self.dry_run: logging.warning("No updates in dry-run-mode") return data = { "xhr": 1, "sid": self.sid, "lang": "de", "device": device.id, "page": "home_auto_hkr_edit" } self.session.post(f"{self.url}/{self._endpoints['data']}", data=data) data = { "xhr": 1, "sid": self.sid, "lang": "de", "view": "", "back_to_page": "sh_dev", "apply": "", "oldpage": "/net/home_auto_hkr_edit.lua" } data.update(device.to_web_data()) self.session.post(f"{self.url}/{self._endpoints['data']}", data=data) def correct_offset(self, device_name: str, real_temp: float): elapsed = None if device_name in self.update_time.keys(): elapsed = datetime.now() - self.update_time[device_name] logging.info(f"Last update for {device_name} {elapsed} ago") delta = timedelta(minutes=self.update_timeout) if device_name not in self.update_time.keys() or elapsed > delta: device: Optional[Device] = self.get_device_data(name=device_name) if device is None: return new_offset = device.get_offset() + real_temp - device.get_temperature() logging.info(f"Update offset from {device.get_offset()} to {new_offset}") device.set_offset(new_offset) self.set_offset(device) self.update_time[device.display_name] = datetime.now()