Compare commits

...

2 Commits

Author SHA1 Message Date
962e2c84af More changes i guess 2022-12-04 13:59:02 +01:00
924eb57b34 now works with fb 7360 2022-12-04 13:58:53 +01:00
4 changed files with 174 additions and 54 deletions

View File

@ -3,7 +3,7 @@ FROM $BUILD_FROM
# Install requirements for add-on # Install requirements for add-on
RUN apk update && apk add --no-cache python3 py-pip RUN apk update && apk add --no-cache python3 py-pip
RUN python3 -m pip install websockets requests RUN python3 -m pip install websockets requests beautifulsoup4
WORKDIR /data WORKDIR /data

View File

@ -1,6 +1,6 @@
name: "Fritz!Box Temperature Sync Dev" name: "Fritz!Box Temperature Sync Dev"
description: "Sync Fritz!DECT thermostat temperatures with other sensors in Home Assistant" description: "Sync Fritz!DECT thermostat temperatures with other sensors in Home Assistant"
version: "0.4.3" version: "0.5.0"
startup: "application" startup: "application"
stage: "stable" stage: "stable"
slug: "fritz_temp_sync_dev" slug: "fritz_temp_sync_dev"
@ -16,6 +16,8 @@ options:
fritzbox: fritzbox:
url: "http://fritz.box" url: "http://fritz.box"
password: null password: null
verify_ssl: true
old_fb: false
mappings: mappings:
- sensor: null - sensor: null
thermostate: null thermostate: null
@ -25,6 +27,8 @@ schema:
url: url url: url
username: "str?" username: "str?"
password: str password: str
verify_ssl: bool
old_fb: bool
mappings: mappings:
- sensor: str - sensor: str
thermostate: str thermostate: str

View File

@ -8,18 +8,22 @@ import re
import xml.etree.ElementTree as ET import xml.etree.ElementTree as ET
from asyncio import Task from asyncio import Task
from datetime import datetime, timedelta from datetime import datetime, timedelta
from typing import Optional, Dict, List from typing import Optional, Dict, List, Union
import requests import requests
from bs4 import BeautifulSoup
from device import Device from device import Device
class FritzBox: class FritzBox:
def __init__(self, url: str, password: str, update_timeout: int, user: str = None, dry_run: bool = False) -> None: def __init__(self, url: str, password: str, update_timeout: int, user: str = None, dry_run: bool = False,
verify_ssl: bool = True, old_fb: bool = False) -> None:
self._endpoints = { self._endpoints = {
"login": "login_sid.lua?version=2", "login": "login_sid.lua?version=2",
"logout": "index.lua", "logout": "index.lua",
"data": "data.lua" "data": "data.lua",
"device_details": "net/home_auto_hkr_edit.lua"
} }
self.url: str = url self.url: str = url
self.dry_run: bool = dry_run self.dry_run: bool = dry_run
@ -30,6 +34,12 @@ class FritzBox:
self.update_timeout: int = update_timeout self.update_timeout: int = update_timeout
self.update_time: Dict[str, datetime] = {} self.update_time: Dict[str, datetime] = {}
self.hold_connection: Optional[Task] = None self.hold_connection: Optional[Task] = None
self.verify_ssl = verify_ssl
self.old_fb = old_fb
if not verify_ssl:
import urllib3
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
async def hold_connection_alive(self) -> None: async def hold_connection_alive(self) -> None:
while True: while True:
@ -72,7 +82,7 @@ class FritzBox:
"xhrId": "first", "xhrId": "first",
"noMenuRef": 1 "noMenuRef": 1
} }
r = self.session.post(f"{self.url}/{self._endpoints['data']}", data=data) r = self.session.post(f"{self.url}/{self._endpoints['data']}", data=data, verify=self.verify_ssl)
if len(r.history) > 0: if len(r.history) > 0:
if not self.login(): if not self.login():
logging.error("Failed to login to Fritz!Box") logging.error("Failed to login to Fritz!Box")
@ -82,7 +92,7 @@ class FritzBox:
def login(self) -> bool: def login(self) -> bool:
logging.info(f"Login user {self.user} to Fritz!Box") logging.info(f"Login user {self.user} to Fritz!Box")
challenge = None challenge = None
r = self.session.get(f"{self.url}/{self._endpoints['login']}") r = self.session.get(f"{self.url}/{self._endpoints['login']}", verify=self.verify_ssl)
xml = ET.fromstring(r.text) xml = ET.fromstring(r.text)
for elem in xml: for elem in xml:
if elem.tag == "SID": if elem.tag == "SID":
@ -106,7 +116,7 @@ class FritzBox:
"response": response "response": response
} }
r = self.session.post(f"{self.url}/{self._endpoints['login']}", data=data) r = self.session.post(f"{self.url}/{self._endpoints['login']}", data=data, verify=self.verify_ssl)
logging.debug(r.text) logging.debug(r.text)
xml = ET.fromstring(r.text) xml = ET.fromstring(r.text)
for elem in xml: for elem in xml:
@ -125,12 +135,69 @@ class FritzBox:
"sid": self.sid, "sid": self.sid,
"logout": 1, "logout": 1,
"no_sidrenew": ""} "no_sidrenew": ""}
r = self.session.post(f"{self.url}/{self._endpoints['logout']}", data=data) r = self.session.post(f"{self.url}/{self._endpoints['logout']}", data=data, verify=self.verify_ssl)
if self.hold_connection is not None: if self.hold_connection is not None:
self.hold_connection.cancel() self.hold_connection.cancel()
return r.status_code == 200 return r.status_code == 200
def list_devices_old(self) -> Optional[List[Union[Device, Dict]]]:
data = {
"xhr": 1,
"sid": self.sid,
"lang": "de",
"page": "sh",
"xhrId": "all"
}
r = self.session.post(f"{self.url}/{self._endpoints['data']}", data=data, verify=self.verify_ssl)
logging.debug(r.text[:100])
if len(r.history) > 0:
if self.login():
r = self.session.post(f"{self.url}/{self._endpoints['data']}", data=data, verify=self.verify_ssl)
else:
return None
devices_overview_raw = BeautifulSoup(r.text, "html.parser")
table_rows = devices_overview_raw.find(id="uiSmarthomeTables").table
devices_raw = []
for r in table_rows.find_all("tr"):
name = r.findAll("td", {'class': ['name', 'cut_overflow']})
button = r.findAll("td", {'class': 'btncolumn'})
temperature = r.findAll("td", {'class': 'temperature'})
if name is None or len(name) < 1:
continue
if button is None or len(button) < 1:
continue
if temperature is None or len(temperature) < 1:
continue
name = name[0].string
id = int(button[0].button["value"])
temperature = float(temperature[0].string.split(" ")[0].replace(",", "."))
request_data = {
"device": id,
"sid": self.sid,
"xhr": 1
}
r = self.session.get(f"{self.url}/{self._endpoints['device_details']}", params=request_data,
verify=self.verify_ssl)
device_content_raw = BeautifulSoup(r.text, "html.parser")
offset = float(device_content_raw.find("input", {"type": "hidden", "name": "Offset"})["value"])
devices_raw.append({
"name": name,
"display_name": name,
"id": id,
"temperature": temperature,
"offset": offset
})
return devices_raw
def list_devices(self) -> Optional[List[Device]]: def list_devices(self) -> Optional[List[Device]]:
data = { data = {
"xhr": 1, "xhr": 1,
@ -139,11 +206,11 @@ class FritzBox:
"page": "sh_dev", "page": "sh_dev",
"xhrId": "all" "xhrId": "all"
} }
r = self.session.post(f"{self.url}/{self._endpoints['data']}", data=data) r = self.session.post(f"{self.url}/{self._endpoints['data']}", data=data, verify=self.verify_ssl)
logging.debug(r.text[:100]) logging.debug(r.text[:100])
if len(r.history) > 0: if len(r.history) > 0:
if self.login(): if self.login():
r = self.session.post(f"{self.url}/{self._endpoints['data']}", data=data) r = self.session.post(f"{self.url}/{self._endpoints['data']}", data=data, verify=self.verify_ssl)
else: else:
return None return None
devices: List[Device] = [] devices: List[Device] = []
@ -156,12 +223,18 @@ class FritzBox:
if idx is None and name is None: if idx is None and name is None:
logging.debug("No id or name given") logging.debug("No id or name given")
return None return None
if self.old_fb:
devices = self.list_devices() devices = self.list_devices_old()
else:
devices = self.list_devices()
device = None device = None
for device in devices: for device in devices:
if device.id == idx or device.display_name == name: if isinstance(device, dict):
break if device["id"] == idx or device["display_name"] == name:
break
else:
if device.id == idx or device.display_name == name:
break
device = None device = None
if device is None: if device is None:
@ -170,31 +243,53 @@ class FritzBox:
return device return device
def set_offset(self, device: Device) -> None: def set_offset(self, device: Union[Device, Dict]) -> None:
if self.dry_run: if self.dry_run:
logging.warning("No updates in dry-run-mode") logging.warning("No updates in dry-run-mode")
return return
data = {
"xhr": 1,
"sid": self.sid,
"lang": "de",
"device": device.id,
"page": "home_auto_hkr_edit"
}
self.session.post(f"{self.url}/{self._endpoints['data']}", data=data)
data = { old_type = isinstance(device, dict)
"xhr": 1, if old_type:
"sid": self.sid, device_id = device["id"]
"lang": "de", else:
"view": "", device_id = device.id
"back_to_page": "sh_dev",
"apply": "",
"oldpage": "/net/home_auto_hkr_edit.lua"
}
data.update(device.to_web_data())
self.session.post(f"{self.url}/{self._endpoints['data']}", data=data) if not old_type:
data = {
"xhr": 1,
"sid": self.sid,
"lang": "de",
"device": device_id,
"page": "home_auto_hkr_edit"
}
self.session.post(f"{self.url}/{self._endpoints['data']}", data=data, verify=self.verify_ssl)
data = {
"xhr": 1,
"sid": self.sid,
"lang": "de",
"view": "",
"back_to_page": "sh_dev",
"apply": "",
"oldpage": "/net/home_auto_hkr_edit.lua"
}
data.update(device.to_web_data())
else:
data = {
"xhr": 1,
"sid": self.sid,
"lang": "de",
"no_sidrenew": "",
"device": device_id,
"view": "",
"back_to_page": "/net/home_auto_overview.lua",
"ule_device_name": device["name"],
"Offset": device["offset"],
"apply": "",
"oldpage": "/net/home_auto_hkr_edit.lua",
}
self.session.post(f"{self.url}/{self._endpoints['data']}", data=data, verify=self.verify_ssl)
def correct_offset(self, device_name: str, real_temp: float): def correct_offset(self, device_name: str, real_temp: float):
elapsed = None elapsed = None
@ -203,11 +298,21 @@ class FritzBox:
logging.info(f"Last update for {device_name} {elapsed} ago") logging.info(f"Last update for {device_name} {elapsed} ago")
delta = timedelta(minutes=self.update_timeout) delta = timedelta(minutes=self.update_timeout)
if device_name not in self.update_time.keys() or elapsed > delta: if device_name not in self.update_time.keys() or elapsed > delta:
device: Optional[Device] = self.get_device_data(name=device_name) device: Optional[Union[Dict, Device]] = self.get_device_data(name=device_name)
logging.info(f"device: {device}")
if device is None: if device is None:
return return
new_offset = device.get_offset() + real_temp - device.get_temperature()
logging.info(f"Update offset from {device.get_offset()} to {new_offset}") if self.old_fb:
device.set_offset(new_offset) new_offset = device["offset"] + real_temp - device["temperature"]
logging.info(f"Update offset from {device['offset']} to {new_offset}")
else:
new_offset = device.get_offset() + real_temp - device.get_temperature()
logging.info(f"Update offset from {device.get_offset()} to {new_offset}")
if self.old_fb:
device["offset"] = new_offset
else:
device.set_offset(new_offset)
self.set_offset(device) self.set_offset(device)
self.update_time[device.display_name] = datetime.now() self.update_time[device_name] = datetime.now()

View File

@ -14,7 +14,7 @@ thermostate_mappings = {}
async def handle_event(idx: int): async def handle_event(idx: int):
logging.debug(f"Wait for events for {idx}") logging.info(f"Wait for events for {idx}")
while event := await ha.events[idx].get(): while event := await ha.events[idx].get():
try: try:
@ -28,24 +28,31 @@ async def handle_event(idx: int):
therm_name = new_state["attributes"]["friendly_name"] therm_name = new_state["attributes"]["friendly_name"]
sensor = thermostate_mappings[entity_id] sensor = thermostate_mappings[entity_id]
sensor_state = await ha.get_device_state(sensor) sensor_state = await ha.get_device_state(sensor)
sensor_temp = round(float(sensor_state["attributes"]["temperature"]) * 2) / 2 sensor_state_temp = sensor_state["attributes"]["temperature"] if "temperatur" in sensor_state[
"attributes"] else sensor_state["state"]
sensor_temp = round(float(sensor_state_temp) * 2) / 2
if therm_temp != sensor_temp: if therm_temp != sensor_temp:
logging.info(f"{therm_name}: {therm_temp}\n{sensor}: {sensor_state['attributes']['temperature']} ({sensor_temp})") logging.info(
f"{therm_name}: {therm_temp}\n{sensor}: {sensor_state_temp} ({sensor_temp})")
fb.correct_offset(therm_name, sensor_temp) fb.correct_offset(therm_name, sensor_temp)
elif entity_id in sensor_mappings.keys(): elif entity_id in sensor_mappings.keys():
sensor_temp = round(float(new_state["attributes"]["temperature"]) * 2) / 2 new_state_temp = new_state["attributes"]["temperature"] if "temperatur" in new_state[
"attributes"] else new_state["state"]
sensor_temp = round(float(new_state_temp) * 2) / 2
logging.info(sensor_temp)
for thermostate in sensor_mappings[entity_id]: for thermostate in sensor_mappings[entity_id]:
therm_state = await ha.get_device_state(thermostate) therm_state = await ha.get_device_state(thermostate)
if therm_state["state"] == "unavailable": if therm_state["state"] == "unavailable":
continue continue
therm_temp = float(therm_state["attributes"]["current_temperature"]) therm_temp = float(therm_state["attributes"]["current_temperature"])
therm_name = therm_state["attributes"]["friendly_name"] therm_name = therm_state["attributes"]["friendly_name"]
if therm_temp != sensor_temp: if therm_temp != sensor_temp or True:
logging.info(f"{therm_name}: {therm_temp}\n{entity_id}: {new_state['attributes']['temperature']} ({sensor_temp})") logging.info(
f"{therm_name}: {therm_temp}\n{entity_id}: {new_state_temp} ({sensor_temp})")
fb.correct_offset(therm_name, sensor_temp) fb.correct_offset(therm_name, sensor_temp)
except KeyError: except KeyError as e:
pass logging.error(e)
async def init(ha: HomeAssistantAPI, fb: FritzBox): async def init(ha: HomeAssistantAPI, fb: FritzBox):
@ -59,6 +66,7 @@ async def init(ha: HomeAssistantAPI, fb: FritzBox):
await ha.wait_for_close() await ha.wait_for_close()
logging.info("Websocket closed, shutting down..") logging.info("Websocket closed, shutting down..")
async def main(): async def main():
config_path = sys.argv[1] config_path = sys.argv[1]
config = json.load(open(config_path)) config = json.load(open(config_path))
@ -71,12 +79,14 @@ async def main():
global fb global fb
fb = FritzBox(url=config["fritzbox"]["url"], fb = FritzBox(url=config["fritzbox"]["url"],
user=config["fritzbox"]["username"], user=config["fritzbox"]["username"],
password=config["fritzbox"]["password"], password=config["fritzbox"]["password"],
update_timeout=config["update_timeout"], update_timeout=config["update_timeout"],
dry_run=False) dry_run=False,
supervisor_url = "ws://supervisor/core/websocket" verify_ssl=config["fritzbox"]["verify_ssl"],
supervisor_token = os.environ["SUPERVISOR_TOKEN"] old_fb=config["fritzbox"]["old_fb"])
supervisor_url = "ws://192.168.176.2:8123/api/websocket"
supervisor_token = "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJpc3MiOiIzZWZhZThhYjBhNTE0MDBhYmNjNzE4Yzc3YjE5MzNkNiIsImlhdCI6MTY3MDE1MTQxMCwiZXhwIjoxOTg1NTExNDEwfQ.0_PlrqiUtEjKhsKzID7xpyQGunWlbO5cr1EPtnzO5tE"
global ha global ha
ha = HomeAssistantAPI(supervisor_token, supervisor_url) ha = HomeAssistantAPI(supervisor_token, supervisor_url)
@ -91,4 +101,5 @@ async def main():
except KeyboardInterrupt: except KeyboardInterrupt:
pass pass
asyncio.run(main()) asyncio.run(main())