From a1d7dfa9bda4785c900e8d512518117f88c97719 Mon Sep 17 00:00:00 2001 From: Horlabs Date: Sat, 5 Feb 2022 15:43:52 +0100 Subject: [PATCH] Fix communication with fb/device as class --- fritz_temp_sync/config.yaml | 6 +- fritz_temp_sync/fritzbox.py | 803 ++++++++++++++++++++++++++++++++-- fritz_temp_sync/sync_ha_fb.py | 11 +- 3 files changed, 771 insertions(+), 49 deletions(-) diff --git a/fritz_temp_sync/config.yaml b/fritz_temp_sync/config.yaml index 31bed60..fadf507 100755 --- a/fritz_temp_sync/config.yaml +++ b/fritz_temp_sync/config.yaml @@ -1,5 +1,5 @@ name: "Fritz!Box Temperature Sync" -description: "Sync Fritz!DECT thermostate temperatures with other sensors in Home Assistant" +description: "Sync Fritz!DECT thermostat temperatures with other sensors in Home Assistant" version: "0.2.0" slug: "fritz_temp_sync" homeassistant_api: true @@ -16,10 +16,12 @@ options: mappings: - sensor: null thermostate: null + update_timeout: int schema: fritzbox: url: url password: str mappings: - sensor: str - thermostate: str \ No newline at end of file + thermostate: str + update_timeout: 15 \ No newline at end of file diff --git a/fritz_temp_sync/fritzbox.py b/fritz_temp_sync/fritzbox.py index fb93355..21bffae 100755 --- a/fritz_temp_sync/fritzbox.py +++ b/fritz_temp_sync/fritzbox.py @@ -1,17 +1,744 @@ +from __future__ import annotations + import asyncio +import hashlib +import json +import logging +import re +import typing +import xml.etree.ElementTree as ET +from abc import ABC, abstractmethod from asyncio import Task from datetime import datetime, timedelta -from typing import Optional, Tuple, Dict +from enum import IntFlag, Enum, auto +from typing import Optional, Tuple, Dict, List, Union, TypedDict +from urllib.parse import urlencode + import requests -import json -import re -import hashlib -import xml.etree.ElementTree as ET -import logging + + +class WeekDay(IntFlag): + MON = 0b1 + TUE = 0b10 + WED = 0b100 + THU = 0b1000 + FRI = 0b10000 + SAT = 0b100000 + SUN = 0b1000000 + + +class Manufacturer: + def __init__(self, name: str): + self.name: str = name + + def __repr__(self): + return f"name: {self.name}" + + def to_json(self): + return {"name": self.name} + + @staticmethod + def parse_dict(manufacturer: Dict): + return Manufacturer(manufacturer["name"]) + + +class FirmwareVersion: + def __init__(self, search: bool, current: str, update: bool, running: bool): + self.search: bool = search + self.current: str = current + self.update: bool = update + self.running: bool = running + + def __repr__(self): + return f"search: {self.search}; current: {self.current}; update: {self.update}; running: {self.running}" + + def to_json(self): + return {"search": self.search, "current": self.current, "update": self.update, "running": self.running} + + @staticmethod + def parse_dict(firmware_version: dict): + return FirmwareVersion(firmware_version["search"], + firmware_version["current"], + firmware_version["update"], + firmware_version["running"]) + + +class PushService: + def __init__(self, mail_address: str, unit_settings: List, is_enabled: bool): + self.mail_address: str = mail_address + self.unit_settings: List = unit_settings + self.is_enabled: bool = is_enabled + + def __repr__(self): + return f"" + + def to_json(self): + return {"mailAddress": self.mail_address, "unitSettings": self.unit_settings, "isEnabled": self.is_enabled} + + @staticmethod + def parse_dict(push_service: Dict): + return PushService(push_service["mailAddress"], push_service["unitSettings"], push_service["isEnabled"]) + + +class SkillType(Enum): + SmartHomeTemperatureSensor = auto() + SmartHomeThermostat = auto() + SmartHomeBattery = auto() + + +class Skill(ABC): + def __init__(self, skill_type: SkillType): + self.type: SkillType = skill_type + + @abstractmethod + def to_json(self): + pass + + @abstractmethod + def to_web_data(self, device_id: int): + pass + + @staticmethod + def parse_dict(skill: Dict): + skill_type = SkillType[skill["type"]] + if skill_type == SkillType.SmartHomeTemperatureSensor: + return TemperatureSkill.parse_dict(skill) + elif skill_type == SkillType.SmartHomeThermostat: + return ThermostatSkill.parse_dict(skill) + elif skill_type == SkillType.SmartHomeBattery: + return BatterySkill.parse_dict(skill) + else: + raise NotImplementedError(skill_type) + + +class PresetName(Enum): + LOWER_TEMPERATURE = auto() + UPPER_TEMPERATURE = auto() + HOLIDAY_TEMPERATURE = auto() + + +class Preset: + def __init__(self, name: PresetName, temperature: int): + self.name: PresetName = name + self.temperature: int = temperature + + def __repr__(self): + return f"" + + def to_json(self): + return {"name": self.name.name, "temperature": self.temperature} + + +class Description: + def __init__(self, action: str, preset_temperature: Optional[Preset]): + self.action: str = action + self.preset_temperature: Optional[Preset] = preset_temperature + + def __repr__(self): + desc = f"" + + def to_json(self): + return {"description": self.description.to_json(), "timeSetting": self.time_setting.to_json()} + + @staticmethod + def parse_dict(change: Dict): + return Change(Description.parse_dict(change["description"]), TimeSetting.parse_dict(change["timeSetting"])) + + +class TemperatureDropDetection: + def __init__(self, do_not_heat_offset_in_minutes: int, sensitivity: int, is_window_open: bool): + self.do_not_heat_offset_in_minutes: int = do_not_heat_offset_in_minutes + self.sensitivity: int = sensitivity + self.is_window_open: bool = is_window_open + + def __repr__(self): + # ToDo + pass + + def to_json(self): + return { + "doNotHeatOffsetInMinutes": self.do_not_heat_offset_in_minutes, + "sensitivity": self.sensitivity, + "isWindowOpen": self.is_window_open + } + + def to_web_data(self, device_id: int): + return { + "WindowOpenTrigger": self.sensitivity + 3, + "WindowOpenTimer": self.do_not_heat_offset_in_minutes + } + + @staticmethod + def parse_dict(drop_detection: Dict): + return TemperatureDropDetection(drop_detection["doNotHeatOffsetInMinutes"], + drop_detection["sensitivity"], + drop_detection["isWindowOpen"]) + + +class ScheduleKind(Enum): + REPETITIVE = auto() + WEEKLY_TIMETABLE = auto() + + +class ThermostatSkillMode(Enum): + TARGET_TEMPERATURE = auto() + + +class Action: + def __init__(self, is_enabled: bool, time_setting: TimeSetting, description: Description): + self.is_enabled: bool = is_enabled + self.time_setting: TimeSetting = time_setting + self.description: Description = description + + def __repr__(self): + return f"" + + def to_json(self): + return {"isEnabled": self.is_enabled, + "timeSetting": self.time_setting.to_json(), + "description": self.description.to_json()} + + @staticmethod + def parse_dict(action: Dict): + return Action(action["isEnabled"], TimeSetting.parse_dict(action["timeSetting"]), + Description.parse_dict(action["description"])) + + +class Schedule: + def __init__(self, is_enabled: bool, kind: ScheduleKind, name: str, actions: List[Action]): + self.is_enabled: bool = is_enabled + self.kind: ScheduleKind = kind + self.name: str = name + self.actions: List[Action] = actions + + def __repr__(self): + return f"" + + def to_json(self): + return { + "isEnabled": self.is_enabled, + "kind": self.kind.name, + "name": self.name, + "actions": [action.to_json() for action in self.actions] + } + + def to_web_data(self, device_id: int): + data = {} + if self.kind == ScheduleKind.REPETITIVE and self.name == "HOLIDAYS": # ToDo: Enum for names? + enabled_count = 0 + for num, holiday in enumerate(self.actions): + num += 1 + _, start_month, start_day = holiday.time_setting.start_date.split("-") + _, end_month, end_day = holiday.time_setting.end_date.split("-") + data.update({ + f"Holiday{num}StartDay": start_day, + f"Holiday{num}StartMonth": start_month, + f"Holiday{num}StartHour": holiday.time_setting.start_time.split(":")[0], + f"Holiday{num}EndDay": end_day, + f"Holiday{num}EndMonth": end_month, + f"Holiday{num}EndHour": holiday.time_setting.end_time.split(":")[0], + f"Holiday{num}Enabled": 1 if holiday.is_enabled else 0, + f"Holiday{num}ID": num + }) + if holiday.is_enabled: + enabled_count += 1 + data["HolidayEnabledCount"] = enabled_count + elif self.kind == ScheduleKind.REPETITIVE and self.name == "SUMMER_TIME": + _, start_month, start_day = self.actions[0].time_setting.start_date.split("-") + _, end_month, end_day = self.actions[0].time_setting.end_date.split("-") + data = { + "SummerStartDay": start_day, + "SummerStartMonth": start_month, + "SummerEndDay": end_day, + "SummerEndMonth": end_month, + "SummerEnabled": 1 if self.is_enabled else 0 + } + elif self.kind == ScheduleKind.WEEKLY_TIMETABLE and self.name == "TEMPERATURE": + timer_items = {} + for action in self.actions: + if not action.is_enabled: + continue + time = "".join(action.time_setting.start_time.split(":")[:-1]) + if time not in timer_items.keys(): + timer_items[time] = [0, 0] + heat = 1 if action.description.preset_temperature.name == PresetName.UPPER_TEMPERATURE else 0 + days = timer_items[time][heat] + days |= action.time_setting.day_of_week + timer_items[time][heat] = days + num = 0 + for key in timer_items.keys(): + if timer_items[key][0] != 0: + data.update({f"timer_item_{num}": f"{key};0;{timer_items[key][0].as_integer_ratio()[0]}"}) + num += 1 + if timer_items[key][1] != 0: + data.update({f"timer_item_{num}": f"{key};1;{timer_items[key][1].as_integer_ratio()[0]}"}) + num += 1 + else: + raise NotImplementedError(self.name) + + return data + + @staticmethod + def parse_dict(schedule: Dict): # ToDo: Make TypedDicts for all those dicts + return Schedule(schedule["isEnabled"], + ScheduleKind[schedule["kind"]], + schedule["name"], + [Action.parse_dict(action) for action in schedule["actions"]]) + + +class TimeControl: + def __init__(self, is_enabled: bool, time_schedules: List[Schedule]): + self.is_enabled: bool = is_enabled + self.time_schedules: List[Schedule] = time_schedules + + def __repr__(self): + return f" None: + def __init__(self, url: str, password: str, update_timeout: int, user: str = None, dry_run: bool = False) -> None: self._endpoints = { "login": "login_sid.lua?version=2", "logout": "index.lua", @@ -23,13 +750,14 @@ class FritzBox: self.session: requests.Session = requests.Session() self.password: str = password self.sid: Optional[str] = None + self.update_timeout: int = update_timeout self.update_time: Dict[str, datetime] = {} self.hold_connection: Optional[Task] = None async def hold_connection_alive(self) -> None: while True: # Session automatically destroyed after 20m of inactivity - await asyncio.sleep(19*60) + await asyncio.sleep(19 * 60) self.check_session() def _calc_challenge_v2(self, challenge: str) -> str: @@ -125,7 +853,7 @@ class FritzBox: return r.status_code == 200 - def list_devices(self) -> Optional[Dict]: + def list_devices(self) -> Optional[List[Device]]: data = { "xhr": 1, "sid": self.sid, @@ -140,38 +868,31 @@ class FritzBox: r = self.session.post(f"{self.url}/{self._endpoints['data']}", data=data) else: return None - devices = json.loads(r.text)["data"]["devices"] + devices: List[Device] = [] + for device in json.loads(r.text)["data"]["devices"]: + devices.append(Device(device)) return devices - def get_device_data(self, id: int = None, name: str = None) -> Optional[Tuple[float, float, int, str]]: - if id is None and name is None: + def get_device_data(self, idx: int = None, name: str = None) -> Optional[Device]: + if idx is None and name is None: logging.debug("No id or name given") return None devices = self.list_devices() device = None for device in devices: - if device["id"] == id or device["displayName"] == name: + if device.id == idx or device.display_name == name: break device = None if device is None: - logging.debug(f"Device {id} {name} not found") + logging.debug(f"Device {idx} {name} not found") return None - current_temp = None - current_offset = None - for unit in device["units"]: - if unit["type"] == "TEMPERATURE_SENSOR": - for skill in unit["skills"]: - if skill["type"] == "SmartHomeTemperatureSensor": - current_temp = float(skill["currentInCelsius"]) - current_offset = float(skill["offset"]) + return device - return current_temp, current_offset, device["id"], device["displayName"] - - def set_offset(self, current_temp: str, offset: float, device_id: int, device_name: str) -> None: + def set_offset(self, device: Device) -> None: if self.dry_run: logging.warning("No updates in dry-run-mode") return @@ -179,7 +900,7 @@ class FritzBox: "xhr": 1, "sid": self.sid, "lang": "de", - "device": device_id, + "device": device.id, "page": "home_auto_hkr_edit" } self.session.post(f"{self.url}/{self._endpoints['data']}", data=data) @@ -188,29 +909,27 @@ class FritzBox: "xhr": 1, "sid": self.sid, "lang": "de", - "device": device_id, "view": "", "back_to_page": "sh_dev", - "ule_device_name": device_name, - "WindowOpenTrigger": 8, - "WindowOpenTimer": 10, - "tempsensor": "own", - "Roomtemp": f"{current_temp}", - "ExtTempsensorID": "tochoose", - "Offset": f"{offset}", "apply": "", "oldpage": "/net/home_auto_hkr_edit.lua" } + data.update(device.to_web_data()) self.session.post(f"{self.url}/{self._endpoints['data']}", data=data) def correct_offset(self, device_name: str, real_temp: float): + elapsed = None if device_name in self.update_time.keys(): - logging.info(f"Last update for {device_name} {datetime.now() - self.update_time[device_name]} ago") - delta = timedelta(minutes=5) - if device_name not in self.update_time.keys() or (datetime.now() - self.update_time[device_name]) > delta: - current_temp, current_offset, idx, name = self.get_device_data(name=device_name) - new_offset = current_offset + real_temp - current_temp - logging.info(f"Update offset from {current_offset} to {new_offset}") - self.set_offset(current_temp, new_offset, idx, device_name) - self.update_time[device_name] = datetime.now() + elapsed = datetime.now() - self.update_time[device_name] + logging.info(f"Last update for {device_name} {elapsed} ago") + delta = timedelta(minutes=self.update_timeout) + if device_name not in self.update_time.keys() or elapsed > delta: + device: Optional[Device] = self.get_device_data(name=device_name) + if device is None: + return + new_offset = device.get_offset() + real_temp - device.get_temperature() + logging.info(f"Update offset from {device.get_offset()} to {new_offset}") + device.set_offset(new_offset) + self.set_offset(device) + self.update_time[device.display_name] = datetime.now() diff --git a/fritz_temp_sync/sync_ha_fb.py b/fritz_temp_sync/sync_ha_fb.py index ad586d7..17e6474 100755 --- a/fritz_temp_sync/sync_ha_fb.py +++ b/fritz_temp_sync/sync_ha_fb.py @@ -3,8 +3,9 @@ import asyncio import os import sys +from typing import Dict -from fritzbox import FritzBox +from fritzbox import FritzBox, Device from homeassistant import HomeAssistantAPI import logging import json @@ -71,7 +72,7 @@ async def handle_event(idx: int): """ -async def init(ha: HomeAssistantAPI): +async def init(ha: HomeAssistantAPI, fb: FritzBox): await ha.connect() logging.debug("Subscribe") state_changed_id = await ha.subscribe_event("state_changed") @@ -89,8 +90,8 @@ config = json.load(open(config_path)) logging.debug(config) loop = asyncio.get_event_loop() -fb = FritzBox(config["fritzbox"]["url"], config["fritzbox"]["password"]) -supervisor_url = "ws://supervisor/core/websocket" +fb = FritzBox(config["fritzbox"]["url"], config["fritzbox"]["password"], config["update_timeout"], dry_run=False) +supervisor_url = "ws://192.168.124.187:8123/api/websocket" ha = HomeAssistantAPI(os.environ["SUPERVISOR_TOKEN"], supervisor_url) for mapping in config["mappings"]: @@ -99,7 +100,7 @@ for mapping in config["mappings"]: sensor_mappings[mapping["sensor"]].append(mapping["thermostate"]) thermostate_mappings[mapping["thermostate"]] = mapping["sensor"] -loop.create_task(init(ha)) +loop.create_task(init(ha, fb)) try: loop.run_forever() except KeyboardInterrupt: