From 9bf21d04e30ab08d5faa632acff9314530b2aa02 Mon Sep 17 00:00:00 2001 From: Horlabs Date: Fri, 28 Jan 2022 22:37:28 +0100 Subject: [PATCH] Change to asyncio, keep connection to fb alive, use data from ha for comparing --- .gitignore | 1 + fritz_temp_sync/fritzbox.py | 115 +++++++++++++------- fritz_temp_sync/homeassistant.py | 174 ++++++++++++++++++++++--------- fritz_temp_sync/sync_ha_fb.py | 130 ++++++++++++++++------- 4 files changed, 301 insertions(+), 119 deletions(-) diff --git a/.gitignore b/.gitignore index 432bc96..def54fa 100644 --- a/.gitignore +++ b/.gitignore @@ -15,6 +15,7 @@ dist/ downloads/ eggs/ .eggs/ +.venv/ lib/ lib64/ parts/ diff --git a/fritz_temp_sync/fritzbox.py b/fritz_temp_sync/fritzbox.py index df5ea0c..b1d1b15 100755 --- a/fritz_temp_sync/fritzbox.py +++ b/fritz_temp_sync/fritzbox.py @@ -1,4 +1,7 @@ -from typing import Optional, Tuple +import asyncio +from asyncio import Task +from datetime import datetime, timedelta +from typing import Optional, Tuple, Dict import requests import json import re @@ -6,23 +9,34 @@ import hashlib import xml.etree.ElementTree as ET import logging + class FritzBox: - def __init__(self, url:str, password:str, user:str = None) -> None: + def __init__(self, url: str, password: str, user: str = None) -> None: self._endpoints = { "login": "login_sid.lua?version=2", "logout": "index.lua", "data": "data.lua" } - self.url = url - self.session = requests.Session() - self.password = password - self.sid = None + self.url: str = url + self.user: Optional[str] = user + self.session: requests.Session = requests.Session() + self.password: str = password + self.sid: Optional[str] = None + self.update_time: Dict[str, datetime] = {} + self.hold_connection: Optional[Task] = None + + async def hold_connection_alive(self) -> None: + while True: + # Session automatically destroyed after 20m of inactivity + await asyncio.sleep(19*60) + self.check_session() def _calc_challenge_v2(self, challenge: str) -> str: logging.debug(f"Calculate v2 challenge: {challenge}") - chall_regex = re.compile("2\$(?P[0-9a-zA-Z]+)\$(?P[0-9a-zA-Z]+)\$(?P[0-9a-zA-Z]+)\$(?P[0-9a-zA-Z]+)") - + chall_regex = re.compile( + "2\$(?P[0-9a-zA-Z]+)\$(?P[0-9a-zA-Z]+)\$(?P[0-9a-zA-Z]+)\$(?P[0-9a-zA-Z]+)") + chall_parts = chall_regex.match(challenge).groupdict() salt1: bytes = bytes.fromhex(chall_parts["salt1"]) iter1: int = int(chall_parts["iter1"]) @@ -42,8 +56,24 @@ class FritzBox: response = challenge + "-" + hashlib.md5(response).hexdigest() return response - def login(self, user:str = None) -> bool: - logging.debug(f"login user {user}") + def check_session(self) -> None: + data = { + "xhr": 1, + "sid": self.sid, + "lang": "de", + "page": "overview", + "xhrId": "first", + "noMenuRef": 1 + } + r = self.session.post(f"{self.url}/{self._endpoints['data']}", data=data) + if len(r.history) > 0: + if not self.login(): + logging.error("Failed to login to Fritz!Box") + else: + logging.info("Already logged in") + + def login(self, user: str = None) -> bool: + logging.info(f"Login user {user} to Fritz!Box") challenge = None r = self.session.get(f"{self.url}/{self._endpoints['login']}") xml = ET.fromstring(r.text) @@ -65,7 +95,7 @@ class FritzBox: response = self._calc_challenge_v1(challenge) data = { - "username": user, + "username": user, "response": response } @@ -75,27 +105,31 @@ class FritzBox: for elem in xml: if elem.tag == "SID": self.sid = elem.text - - logging.debug(f"Authenticated fritzbox: {len(self.sid) != self.sid.count('0')}") + + logging.info(f"Authenticated Fritz!Box: {len(self.sid) != self.sid.count('0')}") + if len(self.sid) != self.sid.count("0"): + self.hold_connection = asyncio.create_task(self.hold_connection_alive()) return len(self.sid) != self.sid.count("0") def logout(self) -> bool: - logging.debug("logout") + logging.info("logout") data = { - "xhr":1, + "xhr": 1, "sid": self.sid, "logout": 1, - "no_sidrenew":""} + "no_sidrenew": ""} r = self.session.post(f"{self.url}/{self._endpoints['logout']}", data=data) + if self.hold_connection is not None: + self.hold_connection.cancel() return r.status_code == 200 - def list_devices(self): + def list_devices(self) -> Optional[Dict]: data = { - "xhr": 1, - "sid": self.sid, - "lang": "de", - "page":"sh_dev", + "xhr": 1, + "sid": self.sid, + "lang": "de", + "page": "sh_dev", "xhrId": "all" } r = self.session.post(f"{self.url}/{self._endpoints['data']}", data=data) @@ -106,7 +140,7 @@ class FritzBox: else: return None devices = json.loads(r.text)["data"]["devices"] - + return devices def get_device_data(self, id: int = None, name: str = None) -> Optional[Tuple[float, float, int, str]]: @@ -115,11 +149,12 @@ class FritzBox: return None devices = self.list_devices() + device = None for device in devices: if device["id"] == id or device["displayName"] == name: break device = None - + if device is None: logging.debug(f"Device {id} {name} not found") return None @@ -135,32 +170,42 @@ class FritzBox: return current_temp, current_offset, device["id"], device["displayName"] - def set_offset(self, current_temp: str, offset: float, device_id: int, device_name: str): + def set_offset(self, current_temp: str, offset: float, device_id: int, device_name: str) -> None: data = { - "xhr": 1, - "sid": self.sid, - "lang": "de", - "device": device_id, + "xhr": 1, + "sid": self.sid, + "lang": "de", + "device": device_id, "page": "home_auto_hkr_edit" } r = self.session.post(f"{self.url}/{self._endpoints['data']}", data=data) data = { - "xhr":1, + "xhr": 1, "sid": self.sid, "lang": "de", "device": device_id, "view": "", "back_to_page": "sh_dev", "ule_device_name": device_name, - "WindowOpenTrigger":8, - "WindowOpenTimer":10, + "WindowOpenTrigger": 8, + "WindowOpenTimer": 10, "tempsensor": "own", "Roomtemp": f"{current_temp}", - "ExtTempsensorID":"tochoose", + "ExtTempsensorID": "tochoose", "Offset": f"{offset}", - "apply":"", - "oldpage":"/net/home_auto_hkr_edit.lua" + "apply": "", + "oldpage": "/net/home_auto_hkr_edit.lua" } - r = self.session.post(f"{self.url}/{self._endpoints['data']}", data=data) + self.session.post(f"{self.url}/{self._endpoints['data']}", data=data) + + def correct_offset(self, device_name: str, real_temp: float): + if device_name in self.update_time.keys(): + logging.info(f"Last update for {device_name} {datetime.now() - self.update_time[device_name]} ago") + delta = timedelta(minutes=5) + if device_name not in self.update_time.keys() or (datetime.now() - self.update_time[device_name]) > delta: + current_temp, current_offset, idx, name = self.get_device_data(name=device_name) + new_offset = current_offset + real_temp - current_temp + logging.info(f"Should update offset from {current_offset} to {new_offset}") + self.update_time[device_name] = datetime.now() diff --git a/fritz_temp_sync/homeassistant.py b/fritz_temp_sync/homeassistant.py index d694979..61e8b4b 100755 --- a/fritz_temp_sync/homeassistant.py +++ b/fritz_temp_sync/homeassistant.py @@ -1,66 +1,146 @@ from __future__ import annotations +import asyncio import json import logging -from typing import Callable -import websocket -import time +from asyncio import Queue, Task, Event, Lock +from typing import Callable, Dict, Optional +import websockets + +""" +- sender fun, bekommt packete per queue +- receiver fun, schreibt packete in map id -> msg bzw events in queue(?) +- blockieren beim auf antwort warten per "pseudo-queue", in die sich alle wartenden eintragen und warten, dass sie leer ist, receiver leert queue wenn irgendeine nachricht rein kommt +- andere (auch außerhalb) können dann auf neue daten warten und ggf verarbeiten + +""" + class HomeAssistantAPI: - def __init__(self, token:str, initialize: Callable[[HomeAssistantAPI], None]) -> None: + def __init__(self, token: str, url: str) -> None: self.token = token self.msg_id = 1 - self.ws = None - self.subscriptions = {} - self.init_callback = initialize + self.msg_id_lock = Lock() + self.ws: websockets.WebSocketClientProtocol = None + self.url = url + self.receiver: Optional[Task] = None + self.sender: Optional[Task] = None + self.sending_queue: Queue = Queue() + self.authenticated: Event = Event() + self.events: Dict[int, Queue] = {} + self.responses: Dict[int, Dict] = {} + self.response_events: Dict[int, Event] = {} + self.response_lock: Lock = Lock() - def handle_message(self, ws: websocket.WebSocket, msg: str) -> None: - if self.ws is None: - self.ws = ws + async def connect(self): + self.ws = await websockets.connect(self.url) + self.sender = asyncio.create_task(self.sending()) + await self.auth() + self.receiver = asyncio.create_task(self.receiving()) - message: object = json.loads(msg) - - if message["type"] == "auth_required": - response = { + async def wait_for_close(self): + await self.ws.wait_closed() + + async def receiving(self): + logging.debug("Start receiving") + async for message in self.ws: + msg: Dict = json.loads(message) + if msg["type"] == "event": + if msg["id"] not in self.events.keys(): + logging.error(f"Received event for not subscribted id: {msg['id']} {msg['event_type']}") + continue + await self.events[msg["id"]].put(msg["event"]) + else: + async with self.response_lock: + self.responses[msg["id"]] = msg + if msg["id"] in self.response_events.keys(): + self.response_events[msg["id"]].set() + + async def wait_for(self, idx): + async with self.response_lock: + if idx in self.responses.keys(): + msg = self.responses[idx] + del self.responses[idx] + return msg + self.response_events[idx] = Event() + + await self.response_events[idx].wait() + async with self.response_lock: + del self.response_events[idx] + if idx not in self.responses.keys(): + logging.error("Response ID not found") + return None + msg = self.responses[idx] + del self.responses[idx] + return msg + + async def exit_loop(self): + if self.sender is not None: + self.sender.cancel() + if self.receiver is not None: + self.receiver.cancel() + asyncio.get_running_loop().stop() + + async def auth(self): + msg = json.loads(await self.ws.recv()) + if msg["type"] != "auth_required": + logging.error("Authentication error: Not required") + await self.exit_loop() + response = { "type": "auth", "access_token": self.token } - logging.debug(response) - ws.send(json.dumps(response)) - return - elif message["type"] == "auth_invalid": + await self.sending_queue.put(response) + msg = json.loads(await self.ws.recv()) + if msg["type"] == "auth_invalid": logging.info("Auth failed") - ws.close() - return None - elif message["type"] == "auth_ok": + await self.exit_loop() + elif msg["type"] == "auth_ok": logging.debug("Authenticated") - self.init_callback(self) - self.init_callback = None - return - elif message["type"] == "event": - event = message["event"] - if event["event_type"] in self.subscriptions.keys(): - self.subscriptions[event["event_type"]](event) - + self.authenticated.set() else: - print("Received", message) - - def subscribe_event(self, event_type: str, callback: Callable[[object], None]): + logging.error(f"Unknown answer for auth: {msg}") + await self.exit_loop() - if self.ws is None: - logging.debug("Websocket not set") - return + async def sending(self): + while msg := await self.sending_queue.get(): + await self.ws.send(json.dumps(msg)) + + async def subscribe_event(self, event_type: str): + await self.authenticated.wait() - if event_type in self.subscriptions.keys(): - logging.warning(f"Already subscribed to {event_type}") - return - logging.info(f"Subscribe to {event_type}") - self.subscriptions[event_type] = callback - response = { - "id": self.msg_id, - "type": "subscribe_events", - "event_type": event_type - } - self.msg_id += 1 - self.ws.send(json.dumps(response)) + async with self.msg_id_lock: + msg_id = self.msg_id + response = { + "id": msg_id, + "type": "subscribe_events", + "event_type": event_type + } + self.events[msg_id] = Queue() + self.msg_id += 1 + await self.sending_queue.put(response) + return msg_id + + async def get_states(self): + await self.authenticated.wait() + async with self.msg_id_lock: + message = { + "id": self.msg_id, + "type": "get_states" + } + self.msg_id += 1 + await self.sending_queue.put(message) + + response = await self.wait_for(message["id"]) + # ToDo: Error handling + return response["result"] + + async def get_device_state(self, entity_id: str): + device_states = await self.get_states() + for device_state in device_states: + if device_state["entity_id"] == entity_id: + return device_state + + return None + diff --git a/fritz_temp_sync/sync_ha_fb.py b/fritz_temp_sync/sync_ha_fb.py index 8ca2a44..d2ec8f2 100755 --- a/fritz_temp_sync/sync_ha_fb.py +++ b/fritz_temp_sync/sync_ha_fb.py @@ -3,31 +3,59 @@ hier die verbindungen zu HA aufbauen etc außerdem das vergleichen der werte und dass anstoßen der updates """ +import asyncio import os +from typing import Dict + from fritzbox import FritzBox from homeassistant import HomeAssistantAPI import logging -import websocket import json -mappings = {} +sensor_mappings = {} +thermostate_mappings = {} -def handle_event(event): - entity_id = event["data"]["entity_id"] - if entity_id in mappings.keys(): - new_state = event["data"]["new_state"] - logging.debug(entity_id) - logging.debug(new_state["attributes"]["temperature"]) - rounded = round(float(new_state["attributes"]["temperature"])*2)/2 - logging.debug(rounded) - if new_state["attributes"]["device_class"] == "temperature": - if entity_id in mappings.keys(): +async def handle_event(idx: int): + logging.debug(f"Wait for events for {idx}") + + while event := await ha.events[idx].get(): + entity_id = event["data"]["entity_id"] + if entity_id in sensor_mappings.keys() or entity_id in thermostate_mappings.keys(): + state = await ha.get_device_state(entity_id) + new_state = event["data"]["new_state"] + logging.info(f"received changed state from {entity_id}") + if entity_id in thermostate_mappings.keys() and state["state"] != "unavailable": + therm_temp = new_state["attributes"]["current_temperature"] + therm_name = new_state["attributes"]["friendly_name"] + sensor = thermostate_mappings[entity_id] + sensor_state = await ha.get_device_state(sensor) + sensor_temp = round(float(sensor_state["attributes"]["temperature"]) * 2) / 2 + if therm_temp != sensor_temp: + logging.info(f"{therm_name}: {therm_temp}") + logging.info(f"{sensor}: {sensor_state['attributes']['temperature']} ({sensor_temp})") + fb.correct_offset(therm_name, sensor_temp) + + elif entity_id in sensor_mappings.keys(): + sensor_temp = round(float(new_state["attributes"]["temperature"]) * 2) / 2 + """ fb.login() logged = False - for thermostate in mappings[entity_id]: + """ + for thermostate in sensor_mappings[entity_id]: + therm_state = await ha.get_device_state(thermostate) + if therm_state["state"] == "unavailable": + continue + therm_temp = float(therm_state["attributes"]["current_temperature"]) + therm_name = therm_state["attributes"]["friendly_name"] + if therm_temp != sensor_temp: + logging.info(f"{therm_name}: {therm_temp}") + logging.info(f"{entity_id}: {new_state['attributes']['temperature']} ({sensor_temp})") + fb.correct_offset(therm_name, sensor_temp) + """ current_temp, current_offset, id, name = fb.get_device_data(name=thermostate) if not logged: - logging.info(f"Current measurement from {entity_id}: {new_state['attributes']['temperature']} ({rounded})") + logging.info( + f"Current measurement from {entity_id}: {new_state['attributes']['temperature']} ({rounded})") logged = True logging.info(f"Current measurement from {thermostate}: {current_temp}") new_offset = current_offset + rounded - current_temp @@ -42,37 +70,65 @@ def handle_event(event): else: logging.warning(f"Failed to adjust offset from {old_offset} to {new_offset}") fb.logout() + """ -def on_error(ws, error): - print(error) -def on_close(ws, close_status_code, close_msg): - pass - -def on_open(ws): - pass - -def init(ha: HomeAssistantAPI): +async def init(ha: HomeAssistantAPI): + await ha.connect() logging.debug("Subscribe") - ha.subscribe_event("state_changed", handle_event) + state_changed_id = await ha.subscribe_event("state_changed") + logging.debug(state_changed_id) + asyncio.create_task(handle_event(state_changed_id)) + fb.login() + await ha.wait_for_close() + logging.info("Websocket closed, shutting down..") + asyncio.get_running_loop().stop() + + +async def migrate_config(config_path: str, ha: HomeAssistantAPI): + config = json.load(open(config_path)) + therm_ids = {} + for state in await ha.get_states(): + if state["entity_id"].startswith("climate.") and "friendly_name" in state["attributes"].keys(): + therm_ids[state["attributes"]["friendly_name"]] = state["entity_id"] + + mappings = [] + for mapping in config["mappings"]: + if not mapping["thermostate"].startswith("climate."): + mapping["thermostate"] = therm_ids[mapping["thermostate"]] + mappings.append(mapping) + config["mappings"] = mappings + json.dump(open(config_path), config) + + return config + logging.basicConfig(level=logging.INFO, format="[%(asctime)s] [%(levelname)s] %(message)s") -config = json.load(open("/data/options.json")) +config_path = "/data/options.json" +config_path = "options.json" +config = json.load(open(config_path)) logging.debug(config) -for mapping in config["mappings"]: - if mapping["sensor"] not in mappings.keys(): - mappings[mapping["sensor"]] = [] - mappings[mapping["sensor"]].append(mapping["thermostate"]) +loop = asyncio.get_event_loop() fb = FritzBox(config["fritzbox"]["url"], config["fritzbox"]["password"]) -ha = HomeAssistantAPI(os.environ["SUPERVISOR_TOKEN"], init) +supervisor_url = "ws://supervisor/core/websocket" +supervisor_url = "ws://192.168.124.187:8123/api/websocket" +ha = HomeAssistantAPI(os.environ["SUPERVISOR_TOKEN"], supervisor_url) -websocket.enableTrace(False) -ws = websocket.WebSocketApp("ws://supervisor/core/websocket", - on_open=on_open, - on_message=ha.handle_message, - on_error=on_error, - on_close=on_close) +if '"thermostate": "climate.' not in open(config_path).read(): + config = loop.run_until_complete(migrate_config(config_path, ha)) + logging.info(config) + exit() -ws.run_forever() +for mapping in config["mappings"]: + if mapping["sensor"] not in sensor_mappings.keys(): + sensor_mappings[mapping["sensor"]] = [] + sensor_mappings[mapping["sensor"]].append(mapping["thermostate"]) + thermostate_mappings[mapping["thermostate"]] = mapping["sensor"] + +loop.create_task(init(ha)) +try: + loop.run_forever() +except KeyboardInterrupt: + pass