Compare commits

..

2 Commits

Author SHA1 Message Date
962e2c84af More changes i guess 2022-12-04 13:59:02 +01:00
924eb57b34 now works with fb 7360 2022-12-04 13:58:53 +01:00
4 changed files with 181 additions and 92 deletions

View File

@ -2,7 +2,8 @@ ARG BUILD_FROM
FROM $BUILD_FROM FROM $BUILD_FROM
# Install requirements for add-on # Install requirements for add-on
RUN apk update && apk add --no-cache python3 py3-pip py3-websockets py3-requests RUN apk update && apk add --no-cache python3 py-pip
RUN python3 -m pip install websockets requests beautifulsoup4
WORKDIR /data WORKDIR /data

View File

@ -1,6 +1,6 @@
name: "Fritz!Box Temperature Sync Dev" name: "Fritz!Box Temperature Sync Dev"
description: "Sync Fritz!DECT thermostat temperatures with other sensors in Home Assistant" description: "Sync Fritz!DECT thermostat temperatures with other sensors in Home Assistant"
version: "0.4.3" version: "0.5.0"
startup: "application" startup: "application"
stage: "stable" stage: "stable"
slug: "fritz_temp_sync_dev" slug: "fritz_temp_sync_dev"
@ -16,6 +16,8 @@ options:
fritzbox: fritzbox:
url: "http://fritz.box" url: "http://fritz.box"
password: null password: null
verify_ssl: true
old_fb: false
mappings: mappings:
- sensor: null - sensor: null
thermostate: null thermostate: null
@ -25,8 +27,9 @@ schema:
url: url url: url
username: "str?" username: "str?"
password: str password: str
verify_ssl: bool
old_fb: bool
mappings: mappings:
- sensor: str - sensor: str
thermostate: str thermostate: str
update_timeout: int update_timeout: int
log_level: "str?"

View File

@ -8,18 +8,22 @@ import re
import xml.etree.ElementTree as ET import xml.etree.ElementTree as ET
from asyncio import Task from asyncio import Task
from datetime import datetime, timedelta from datetime import datetime, timedelta
from typing import Optional, Dict, List from typing import Optional, Dict, List, Union
import requests import requests
from bs4 import BeautifulSoup
from device import Device from device import Device
class FritzBox: class FritzBox:
def __init__(self, url: str, password: str, update_timeout: int, user: str = None, dry_run: bool = False) -> None: def __init__(self, url: str, password: str, update_timeout: int, user: str = None, dry_run: bool = False,
verify_ssl: bool = True, old_fb: bool = False) -> None:
self._endpoints = { self._endpoints = {
"login": "login_sid.lua?version=2", "login": "login_sid.lua?version=2",
"logout": "index.lua", "logout": "index.lua",
"data": "data.lua" "data": "data.lua",
"device_details": "net/home_auto_hkr_edit.lua"
} }
self.url: str = url self.url: str = url
self.dry_run: bool = dry_run self.dry_run: bool = dry_run
@ -30,6 +34,12 @@ class FritzBox:
self.update_timeout: int = update_timeout self.update_timeout: int = update_timeout
self.update_time: Dict[str, datetime] = {} self.update_time: Dict[str, datetime] = {}
self.hold_connection: Optional[Task] = None self.hold_connection: Optional[Task] = None
self.verify_ssl = verify_ssl
self.old_fb = old_fb
if not verify_ssl:
import urllib3
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
async def hold_connection_alive(self) -> None: async def hold_connection_alive(self) -> None:
while True: while True:
@ -42,7 +52,7 @@ class FritzBox:
logging.debug(f"Calculate v2 challenge: {challenge}") logging.debug(f"Calculate v2 challenge: {challenge}")
chall_regex = re.compile( chall_regex = re.compile(
r"2\$(?P<iter1>[0-9a-zA-Z]+)\$(?P<salt1>[0-9a-zA-Z]+)\$(?P<iter2>[0-9a-zA-Z]+)\$(?P<salt2>[0-9a-zA-Z]+)") "2\$(?P<iter1>[0-9a-zA-Z]+)\$(?P<salt1>[0-9a-zA-Z]+)\$(?P<iter2>[0-9a-zA-Z]+)\$(?P<salt2>[0-9a-zA-Z]+)")
chall_parts = chall_regex.match(challenge).groupdict() chall_parts = chall_regex.match(challenge).groupdict()
salt1: bytes = bytes.fromhex(chall_parts["salt1"]) salt1: bytes = bytes.fromhex(chall_parts["salt1"])
@ -72,7 +82,7 @@ class FritzBox:
"xhrId": "first", "xhrId": "first",
"noMenuRef": 1 "noMenuRef": 1
} }
r = self.session.post(f"{self.url}/{self._endpoints['data']}", data=data) r = self.session.post(f"{self.url}/{self._endpoints['data']}", data=data, verify=self.verify_ssl)
if len(r.history) > 0: if len(r.history) > 0:
if not self.login(): if not self.login():
logging.error("Failed to login to Fritz!Box") logging.error("Failed to login to Fritz!Box")
@ -82,7 +92,7 @@ class FritzBox:
def login(self) -> bool: def login(self) -> bool:
logging.info(f"Login user {self.user} to Fritz!Box") logging.info(f"Login user {self.user} to Fritz!Box")
challenge = None challenge = None
r = self.session.get(f"{self.url}/{self._endpoints['login']}") r = self.session.get(f"{self.url}/{self._endpoints['login']}", verify=self.verify_ssl)
xml = ET.fromstring(r.text) xml = ET.fromstring(r.text)
for elem in xml: for elem in xml:
if elem.tag == "SID": if elem.tag == "SID":
@ -106,7 +116,7 @@ class FritzBox:
"response": response "response": response
} }
r = self.session.post(f"{self.url}/{self._endpoints['login']}", data=data) r = self.session.post(f"{self.url}/{self._endpoints['login']}", data=data, verify=self.verify_ssl)
logging.debug(r.text) logging.debug(r.text)
xml = ET.fromstring(r.text) xml = ET.fromstring(r.text)
for elem in xml: for elem in xml:
@ -125,12 +135,69 @@ class FritzBox:
"sid": self.sid, "sid": self.sid,
"logout": 1, "logout": 1,
"no_sidrenew": ""} "no_sidrenew": ""}
r = self.session.post(f"{self.url}/{self._endpoints['logout']}", data=data) r = self.session.post(f"{self.url}/{self._endpoints['logout']}", data=data, verify=self.verify_ssl)
if self.hold_connection is not None: if self.hold_connection is not None:
self.hold_connection.cancel() self.hold_connection.cancel()
return r.status_code == 200 return r.status_code == 200
def list_devices_old(self) -> Optional[List[Union[Device, Dict]]]:
data = {
"xhr": 1,
"sid": self.sid,
"lang": "de",
"page": "sh",
"xhrId": "all"
}
r = self.session.post(f"{self.url}/{self._endpoints['data']}", data=data, verify=self.verify_ssl)
logging.debug(r.text[:100])
if len(r.history) > 0:
if self.login():
r = self.session.post(f"{self.url}/{self._endpoints['data']}", data=data, verify=self.verify_ssl)
else:
return None
devices_overview_raw = BeautifulSoup(r.text, "html.parser")
table_rows = devices_overview_raw.find(id="uiSmarthomeTables").table
devices_raw = []
for r in table_rows.find_all("tr"):
name = r.findAll("td", {'class': ['name', 'cut_overflow']})
button = r.findAll("td", {'class': 'btncolumn'})
temperature = r.findAll("td", {'class': 'temperature'})
if name is None or len(name) < 1:
continue
if button is None or len(button) < 1:
continue
if temperature is None or len(temperature) < 1:
continue
name = name[0].string
id = int(button[0].button["value"])
temperature = float(temperature[0].string.split(" ")[0].replace(",", "."))
request_data = {
"device": id,
"sid": self.sid,
"xhr": 1
}
r = self.session.get(f"{self.url}/{self._endpoints['device_details']}", params=request_data,
verify=self.verify_ssl)
device_content_raw = BeautifulSoup(r.text, "html.parser")
offset = float(device_content_raw.find("input", {"type": "hidden", "name": "Offset"})["value"])
devices_raw.append({
"name": name,
"display_name": name,
"id": id,
"temperature": temperature,
"offset": offset
})
return devices_raw
def list_devices(self) -> Optional[List[Device]]: def list_devices(self) -> Optional[List[Device]]:
data = { data = {
"xhr": 1, "xhr": 1,
@ -139,11 +206,11 @@ class FritzBox:
"page": "sh_dev", "page": "sh_dev",
"xhrId": "all" "xhrId": "all"
} }
r = self.session.post(f"{self.url}/{self._endpoints['data']}", data=data) r = self.session.post(f"{self.url}/{self._endpoints['data']}", data=data, verify=self.verify_ssl)
logging.debug(r.text[:100]) logging.debug(r.text[:100])
if len(r.history) > 0: if len(r.history) > 0:
if self.login(): if self.login():
r = self.session.post(f"{self.url}/{self._endpoints['data']}", data=data) r = self.session.post(f"{self.url}/{self._endpoints['data']}", data=data, verify=self.verify_ssl)
else: else:
return None return None
devices: List[Device] = [] devices: List[Device] = []
@ -156,12 +223,18 @@ class FritzBox:
if idx is None and name is None: if idx is None and name is None:
logging.debug("No id or name given") logging.debug("No id or name given")
return None return None
if self.old_fb:
devices = self.list_devices() devices = self.list_devices_old()
else:
devices = self.list_devices()
device = None device = None
for device in devices: for device in devices:
if device.id == idx or device.display_name == name: if isinstance(device, dict):
break if device["id"] == idx or device["display_name"] == name:
break
else:
if device.id == idx or device.display_name == name:
break
device = None device = None
if device is None: if device is None:
@ -170,31 +243,53 @@ class FritzBox:
return device return device
def set_offset(self, device: Device) -> None: def set_offset(self, device: Union[Device, Dict]) -> None:
if self.dry_run: if self.dry_run:
logging.warning("No updates in dry-run-mode") logging.warning("No updates in dry-run-mode")
return return
data = {
"xhr": 1,
"sid": self.sid,
"lang": "de",
"device": device.id,
"page": "home_auto_hkr_edit"
}
self.session.post(f"{self.url}/{self._endpoints['data']}", data=data)
data = { old_type = isinstance(device, dict)
"xhr": 1, if old_type:
"sid": self.sid, device_id = device["id"]
"lang": "de", else:
"view": "", device_id = device.id
"back_to_page": "sh_dev",
"apply": "",
"oldpage": "/net/home_auto_hkr_edit.lua"
}
data.update(device.to_web_data())
self.session.post(f"{self.url}/{self._endpoints['data']}", data=data) if not old_type:
data = {
"xhr": 1,
"sid": self.sid,
"lang": "de",
"device": device_id,
"page": "home_auto_hkr_edit"
}
self.session.post(f"{self.url}/{self._endpoints['data']}", data=data, verify=self.verify_ssl)
data = {
"xhr": 1,
"sid": self.sid,
"lang": "de",
"view": "",
"back_to_page": "sh_dev",
"apply": "",
"oldpage": "/net/home_auto_hkr_edit.lua"
}
data.update(device.to_web_data())
else:
data = {
"xhr": 1,
"sid": self.sid,
"lang": "de",
"no_sidrenew": "",
"device": device_id,
"view": "",
"back_to_page": "/net/home_auto_overview.lua",
"ule_device_name": device["name"],
"Offset": device["offset"],
"apply": "",
"oldpage": "/net/home_auto_hkr_edit.lua",
}
self.session.post(f"{self.url}/{self._endpoints['data']}", data=data, verify=self.verify_ssl)
def correct_offset(self, device_name: str, real_temp: float): def correct_offset(self, device_name: str, real_temp: float):
elapsed = None elapsed = None
@ -203,11 +298,21 @@ class FritzBox:
logging.info(f"Last update for {device_name} {elapsed} ago") logging.info(f"Last update for {device_name} {elapsed} ago")
delta = timedelta(minutes=self.update_timeout) delta = timedelta(minutes=self.update_timeout)
if device_name not in self.update_time.keys() or elapsed > delta: if device_name not in self.update_time.keys() or elapsed > delta:
device: Optional[Device] = self.get_device_data(name=device_name) device: Optional[Union[Dict, Device]] = self.get_device_data(name=device_name)
logging.info(f"device: {device}")
if device is None: if device is None:
return return
new_offset = device.get_offset() + real_temp - device.get_temperature()
logging.info(f"Update offset from {device.get_offset()} to {new_offset}") if self.old_fb:
device.set_offset(new_offset) new_offset = device["offset"] + real_temp - device["temperature"]
logging.info(f"Update offset from {device['offset']} to {new_offset}")
else:
new_offset = device.get_offset() + real_temp - device.get_temperature()
logging.info(f"Update offset from {device.get_offset()} to {new_offset}")
if self.old_fb:
device["offset"] = new_offset
else:
device.set_offset(new_offset)
self.set_offset(device) self.set_offset(device)
self.update_time[device.display_name] = datetime.now() self.update_time[device_name] = datetime.now()

View File

@ -1,73 +1,58 @@
#!/usr/bin/env python3 #!/usr/bin/env python3
import asyncio import asyncio
import json
import logging
import os import os
import sys import sys
from fritzbox import FritzBox from fritzbox import FritzBox
from homeassistant import HomeAssistantAPI from homeassistant import HomeAssistantAPI
import logging
import json
sensor_mappings = {} sensor_mappings = {}
thermostate_mappings = {} thermostate_mappings = {}
async def handle_event(idx: int): async def handle_event(idx: int):
global ha, fb logging.info(f"Wait for events for {idx}")
logging.debug(f"Wait for events for {idx}")
while event := await ha.events[idx].get(): while event := await ha.events[idx].get():
try: try:
entity_id = event["data"]["entity_id"] entity_id = event["data"]["entity_id"]
if ( if entity_id in sensor_mappings.keys() or entity_id in thermostate_mappings.keys():
entity_id in sensor_mappings.keys()
or entity_id in thermostate_mappings.keys()
):
state = await ha.get_device_state(entity_id) state = await ha.get_device_state(entity_id)
new_state = event["data"]["new_state"] new_state = event["data"]["new_state"]
logging.info( logging.debug(f"received changed state from {entity_id}")
f"received changed state from {entity_id} {entity_id in thermostate_mappings.keys()} {state['state']} {entity_id in sensor_mappings.keys()}" if entity_id in thermostate_mappings.keys() and state["state"] != "unavailable":
)
if (
entity_id in thermostate_mappings.keys()
and state["state"] != "unavailable"
):
therm_temp = new_state["attributes"]["current_temperature"] therm_temp = new_state["attributes"]["current_temperature"]
therm_name = new_state["attributes"]["friendly_name"] therm_name = new_state["attributes"]["friendly_name"]
sensor = thermostate_mappings[entity_id] sensor = thermostate_mappings[entity_id]
sensor_state = await ha.get_device_state(sensor) sensor_state = await ha.get_device_state(sensor)
sensor_temp = round(float(sensor_state["state"]) * 2) / 2 sensor_state_temp = sensor_state["attributes"]["temperature"] if "temperatur" in sensor_state[
logging.info(f"temps: {therm_temp} {sensor_temp}") "attributes"] else sensor_state["state"]
sensor_temp = round(float(sensor_state_temp) * 2) / 2
if therm_temp != sensor_temp: if therm_temp != sensor_temp:
logging.info( logging.info(
f"{therm_name}: {therm_temp}\n{sensor}: {sensor_state['state']} ({sensor_temp})" f"{therm_name}: {therm_temp}\n{sensor}: {sensor_state_temp} ({sensor_temp})")
)
fb.correct_offset(therm_name, sensor_temp) fb.correct_offset(therm_name, sensor_temp)
elif entity_id in sensor_mappings.keys(): elif entity_id in sensor_mappings.keys():
logging.info(f"here {sensor_mappings} {entity_id}") new_state_temp = new_state["attributes"]["temperature"] if "temperatur" in new_state[
logging.info(f"{new_state}") "attributes"] else new_state["state"]
sensor_temp = round(float(new_state["state"]) * 2) / 2 sensor_temp = round(float(new_state_temp) * 2) / 2
logging.info(f"entry: {sensor_mappings[entity_id]}") logging.info(sensor_temp)
for thermostate in sensor_mappings[entity_id]: for thermostate in sensor_mappings[entity_id]:
logging.info(thermostate)
therm_state = await ha.get_device_state(thermostate) therm_state = await ha.get_device_state(thermostate)
logging.info(f"{thermostate} {therm_state}")
if therm_state["state"] == "unavailable": if therm_state["state"] == "unavailable":
continue continue
therm_temp = float( therm_temp = float(therm_state["attributes"]["current_temperature"])
therm_state["attributes"]["current_temperature"]
)
therm_name = therm_state["attributes"]["friendly_name"] therm_name = therm_state["attributes"]["friendly_name"]
logging.info(f"Temps: {therm_temp} {sensor_temp}") if therm_temp != sensor_temp or True:
if therm_temp != sensor_temp:
logging.info( logging.info(
f"{therm_name}: {therm_temp}\n{entity_id}: {new_state['state']} ({sensor_temp})" f"{therm_name}: {therm_temp}\n{entity_id}: {new_state_temp} ({sensor_temp})")
)
fb.correct_offset(therm_name, sensor_temp) fb.correct_offset(therm_name, sensor_temp)
except KeyError: except KeyError as e:
pass logging.error(e)
async def init(ha: HomeAssistantAPI, fb: FritzBox): async def init(ha: HomeAssistantAPI, fb: FritzBox):
@ -87,26 +72,21 @@ async def main():
config = json.load(open(config_path)) config = json.load(open(config_path))
level = logging.INFO level = logging.INFO
if "log_level" in config: if "log_level" in config:
print(f"Setting log_level {config['log_level']}")
if config["log_level"] == "DEBUG": if config["log_level"] == "DEBUG":
level = logging.DEBUG level = logging.DEBUG
logging.basicConfig( logging.basicConfig(level=level, format="[%(asctime)s] [%(levelname)s] %(message)s")
level=level, format="[%(asctime)s] [%(levelname)s] %(message)s"
)
logging.debug(config) logging.debug(config)
global fb global fb
fb = FritzBox( fb = FritzBox(url=config["fritzbox"]["url"],
url=config["fritzbox"]["url"], user=config["fritzbox"]["username"],
user=config["fritzbox"]["username"], password=config["fritzbox"]["password"],
password=config["fritzbox"]["password"], update_timeout=config["update_timeout"],
update_timeout=config["update_timeout"], dry_run=False,
dry_run=False, verify_ssl=config["fritzbox"]["verify_ssl"],
) old_fb=config["fritzbox"]["old_fb"])
supervisor_url = "ws://supervisor/core/websocket" supervisor_url = "ws://192.168.176.2:8123/api/websocket"
if "SUPERVISOR_URL" in os.environ: supervisor_token = "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJpc3MiOiIzZWZhZThhYjBhNTE0MDBhYmNjNzE4Yzc3YjE5MzNkNiIsImlhdCI6MTY3MDE1MTQxMCwiZXhwIjoxOTg1NTExNDEwfQ.0_PlrqiUtEjKhsKzID7xpyQGunWlbO5cr1EPtnzO5tE"
supervisor_url = os.environ["SUPERVISOR_URL"]
supervisor_token = os.environ["SUPERVISOR_TOKEN"]
global ha global ha
ha = HomeAssistantAPI(supervisor_token, supervisor_url) ha = HomeAssistantAPI(supervisor_token, supervisor_url)
@ -115,7 +95,7 @@ async def main():
sensor_mappings[mapping["sensor"]] = [] sensor_mappings[mapping["sensor"]] = []
sensor_mappings[mapping["sensor"]].append(mapping["thermostate"]) sensor_mappings[mapping["sensor"]].append(mapping["thermostate"])
thermostate_mappings[mapping["thermostate"]] = mapping["sensor"] thermostate_mappings[mapping["thermostate"]] = mapping["sensor"]
logging.debug(f"Mappings: {sensor_mappings} {thermostate_mappings}")
try: try:
await init(ha, fb) await init(ha, fb)
except KeyboardInterrupt: except KeyboardInterrupt: