329 lines
12 KiB
Python
Executable File

from __future__ import annotations
import asyncio
import hashlib
import json
import logging
import re
import xml.etree.ElementTree as ET
from asyncio import Task
from datetime import datetime, timedelta
from typing import Optional, Dict, List, Union
import requests
from bs4 import BeautifulSoup
from device import Device
class FritzBox:
    """Small client for the Fritz!Box web interface that reads thermostats and writes their offset.

    Two listing strategies exist: ``list_devices`` parses the JSON answer of
    ``data.lua`` into ``Device`` objects, while ``list_devices_old`` scrapes the
    HTML overview and yields plain dicts (selected with ``old_fb=True``).
    """

    # Layout of a PBKDF2 login challenge: 2$<iter1>$<salt1>$<iter2>$<salt2>
    _CHALLENGE_V2_RE = re.compile(
        r"2\$(?P<iter1>[0-9a-zA-Z]+)\$(?P<salt1>[0-9a-zA-Z]+)\$(?P<iter2>[0-9a-zA-Z]+)\$(?P<salt2>[0-9a-zA-Z]+)")

    # Session automatically destroyed after 20m of inactivity according to the
    # manual, so the keep-alive probes one minute earlier.
    _KEEPALIVE_INTERVAL = 19 * 60

    def __init__(self, url: str, password: str, update_timeout: int, user: Optional[str] = None,
                 dry_run: bool = False, verify_ssl: bool = True, old_fb: bool = False) -> None:
        """Store the connection settings; no network traffic happens here.

        Args:
            url: Base URL of the box; endpoint paths are appended after a ``/``.
            password: Password used to answer the login challenge.
            update_timeout: Minimum number of minutes between two offset updates of one device.
            user: Login name. When ``None``, ``login`` picks one from the box's user list.
            dry_run: When true, ``set_offset`` logs a warning and sends nothing.
            verify_ssl: Passed as ``verify`` to every request.
            old_fb: Use the HTML-scraping code path (``list_devices_old``).
        """
        self._endpoints = {
            "login": "login_sid.lua?version=2",
            "logout": "index.lua",
            "data": "data.lua",
            "device_details": "net/home_auto_hkr_edit.lua"
        }
        self.url: str = url
        self.dry_run: bool = dry_run
        self.user: Optional[str] = user
        self.session: requests.Session = requests.Session()
        self.password: str = password
        self.sid: Optional[str] = None
        self.update_timeout: int = update_timeout
        # Device name -> time of the last offset update that was pushed.
        self.update_time: Dict[str, datetime] = {}
        # Background keep-alive task, if one is running.
        self.hold_connection: Optional[Task] = None
        self.verify_ssl = verify_ssl
        self.old_fb = old_fb
        if not verify_ssl:
            # Unverified HTTPS would otherwise emit a warning on every request.
            import urllib3
            urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)

    def _endpoint_url(self, name: str) -> str:
        """Return the absolute URL of the endpoint registered under *name*."""
        return f"{self.url}/{self._endpoints[name]}"

    def _post_data(self, data: Dict) -> Optional[requests.Response]:
        """POST *data* to the data endpoint, logging in again once if the box redirected.

        Returns:
            The response, or ``None`` when a re-login was needed and failed.
        """
        data_url = self._endpoint_url("data")
        response = self.session.post(data_url, data=data, verify=self.verify_ssl)
        logging.debug(response.text[:100])
        if response.history:
            # A redirect is treated as "session expired".
            if not self.login():
                return None
            # The payload still carries the expired SID; replace it before retrying.
            data = {**data, "sid": self.sid}
            response = self.session.post(data_url, data=data, verify=self.verify_ssl)
        return response

    async def hold_connection_alive(self) -> None:
        """Probe the session periodically so the box does not expire it.

        Runs until cancelled. NOTE(review): ``check_session`` performs a blocking
        HTTP request, so the event loop is stalled for its duration.
        """
        while True:
            await asyncio.sleep(self._KEEPALIVE_INTERVAL)
            try:
                self.check_session()
            except Exception:
                # A transient network error must not end the keep-alive for good.
                logging.exception("Fritz!Box keep-alive check failed")

    def _ensure_keepalive(self) -> None:
        """Start the keep-alive task unless one is already running."""
        task = self.hold_connection
        if task is not None and not task.done():
            return
        try:
            loop = asyncio.get_running_loop()
        except RuntimeError:
            # login() was called outside an event loop; authentication itself succeeded.
            logging.warning("No running event loop, Fritz!Box session keep-alive not started")
            return
        self.hold_connection = loop.create_task(self.hold_connection_alive())

    def _calc_challenge_v2(self, challenge: str) -> str:
        """Calculate the response for a PBKDF2 challenge.

        Args:
            challenge: String of the form ``2$<iter1>$<salt1>$<iter2>$<salt2>`` with hex salts.

        Returns:
            ``<salt2>$<hash>`` where the hash is a two-round PBKDF2-HMAC-SHA256 of the password.

        Raises:
            ValueError: If the challenge does not have the expected layout.
        """
        logging.debug(f"Calculate v2 challenge: {challenge}")
        match = self._CHALLENGE_V2_RE.match(challenge)
        if match is None:
            raise ValueError(f"Malformed v2 challenge: {challenge!r}")
        salt1: bytes = bytes.fromhex(match["salt1"])
        iter1: int = int(match["iter1"])
        salt2: bytes = bytes.fromhex(match["salt2"])
        iter2: int = int(match["iter2"])
        hash1 = hashlib.pbkdf2_hmac('sha256', self.password.encode(), salt1, iter1)
        hash2 = hashlib.pbkdf2_hmac('sha256', hash1, salt2, iter2)
        return salt2.hex() + "$" + hash2.hex()

    def _calc_challenge_v1(self, challenge: str) -> str:
        """ Calculate the response for a challenge using legacy MD5 """
        logging.debug(f"Calculate v1 challenge: {challenge}")
        # The digest is taken over "<challenge>-<password>" encoded as UTF-16LE.
        digest_input = f"{challenge}-{self.password}".encode("utf_16_le")
        return challenge + "-" + hashlib.md5(digest_input).hexdigest()

    def check_session(self) -> None:
        """Request the overview page and log in again if the box redirected the request."""
        data = {
            "xhr": 1,
            "sid": self.sid,
            "lang": "de",
            "page": "overview",
            "xhrId": "first",
            "noMenuRef": 1
        }
        r = self.session.post(self._endpoint_url("data"), data=data, verify=self.verify_ssl)
        if r.history:
            if not self.login():
                logging.error("Failed to login to Fritz!Box")
        else:
            logging.debug("Already logged in")

    def login(self) -> bool:
        """Authenticate against the box and remember the session id in ``self.sid``.

        When no user was configured, the last entry of the box's user list whose
        name contains ``"fritz"`` is used. On success the keep-alive task is
        started if it is not already running.

        Returns:
            ``True`` when the box issued a session id that is not all zeros,
            ``False`` otherwise (including a response without challenge or user).
        """
        logging.info(f"Login user {self.user} to Fritz!Box")
        login_url = self._endpoint_url("login")
        challenge = None
        r = self.session.get(login_url, verify=self.verify_ssl)
        for elem in ET.fromstring(r.text):
            if elem.tag == "SID":
                self.sid = elem.text
            elif elem.tag == "Challenge":
                challenge = elem.text
            elif self.user is None and elem.tag == "Users":
                for user_elem in elem:
                    if user_elem.text and "fritz" in user_elem.text:
                        self.user = user_elem.text
        if challenge is None or self.user is None:
            logging.error("Fritz!Box login response contained no challenge or no usable user")
            return False
        if challenge.startswith("2$"):
            response = self._calc_challenge_v2(challenge)
        else:
            response = self._calc_challenge_v1(challenge)
        data = {
            "username": self.user,
            "response": response
        }
        r = self.session.post(login_url, data=data, verify=self.verify_ssl)
        logging.debug(r.text)
        for elem in ET.fromstring(r.text):
            if elem.tag == "SID":
                self.sid = elem.text
        # A session id consisting only of zeros means "not authenticated".
        authenticated = bool(self.sid and self.sid.strip("0"))
        logging.info(f"Authenticated Fritz!Box: {authenticated}")
        if authenticated:
            self._ensure_keepalive()
        return authenticated

    def logout(self) -> bool:
        """End the session on the box and stop the keep-alive task.

        Returns:
            ``True`` when the logout request was answered with HTTP 200.
        """
        logging.info("logout")
        data = {
            "xhr": 1,
            "sid": self.sid,
            "logout": 1,
            "no_sidrenew": ""}
        r = self.session.post(self._endpoint_url("logout"), data=data, verify=self.verify_ssl)
        if self.hold_connection is not None:
            self.hold_connection.cancel()
            # Forget the task so that a later login starts a fresh keep-alive.
            self.hold_connection = None
        return r.status_code == 200

    def list_devices_old(self) -> Optional[List[Union[Device, Dict]]]:
        """List thermostats by scraping the HTML smart-home overview.

        For every table row that has a name, a button and a temperature cell,
        the device's edit page is fetched to read its current offset.

        Returns:
            A list of dicts with the keys ``name``, ``display_name``, ``id``,
            ``temperature`` and ``offset``; an empty list when the device table
            is missing; ``None`` when a required re-login failed. Rows whose
            temperature cannot be parsed are skipped.
        """
        data = {
            "xhr": 1,
            "sid": self.sid,
            "lang": "de",
            "page": "sh",
            "xhrId": "all"
        }
        response = self._post_data(data)
        if response is None:
            return None
        overview = BeautifulSoup(response.text, "html.parser")
        container = overview.find(id="uiSmarthomeTables")
        if container is None or container.table is None:
            logging.error("Smart-home device table not found in Fritz!Box response")
            return []
        devices_raw = []
        for row in container.table.find_all("tr"):
            name_cells = row.find_all("td", {'class': ['name', 'cut_overflow']})
            button_cells = row.find_all("td", {'class': 'btncolumn'})
            temperature_cells = row.find_all("td", {'class': 'temperature'})
            if not (name_cells and button_cells and temperature_cells):
                continue
            name = name_cells[0].string
            device_id = int(button_cells[0].button["value"])
            logging.debug(f"Temperature raw: {temperature_cells}")
            try:
                # Text before the first space is the number, written with a decimal comma.
                temperature = float(temperature_cells[0].text.split(" ")[0].replace(",", "."))
            except ValueError:
                # Skip only this row; one unreadable value must not hide every other device.
                logging.warning(f"Error parsing temperature of device {name}.")
                continue
            request_data = {
                "device": device_id,
                "sid": self.sid,
                "xhr": 1
            }
            details = self.session.get(self._endpoint_url("device_details"), params=request_data,
                                       verify=self.verify_ssl)
            detail_page = BeautifulSoup(details.text, "html.parser")
            offset = float(detail_page.find("input", {"type": "hidden", "name": "Offset"})["value"])
            devices_raw.append({
                "name": name,
                "display_name": name,
                "id": device_id,
                "temperature": temperature,
                "offset": offset
            })
        return devices_raw

    def list_devices(self) -> Optional[List[Device]]:
        """List smart-home devices from the JSON answer of the data endpoint.

        Returns:
            One ``Device`` per entry of ``data.devices``, or ``None`` when a
            required re-login failed.
        """
        data = {
            "xhr": 1,
            "sid": self.sid,
            "lang": "de",
            "page": "sh_dev",
            "xhrId": "all"
        }
        response = self._post_data(data)
        if response is None:
            return None
        return [Device(entry) for entry in json.loads(response.text)["data"]["devices"]]

    def get_device_data(self, idx: int = None, name: str = None) -> Optional[Union[Device, Dict]]:
        """Return the first listed device whose id equals *idx* or whose display name equals *name*.

        Only the criteria that were actually supplied take part in the match.

        Returns:
            The matching device (a dict in ``old_fb`` mode, a ``Device`` otherwise),
            or ``None`` when no criterion was given, the device list could not be
            fetched, or nothing matched.
        """
        if idx is None and name is None:
            logging.debug("No id or name given")
            return None
        devices = self.list_devices_old() if self.old_fb else self.list_devices()
        if devices is None:
            logging.debug("Device list could not be retrieved")
            return None
        for candidate in devices:
            if isinstance(candidate, dict):
                candidate_id, candidate_name = candidate["id"], candidate["display_name"]
            else:
                candidate_id, candidate_name = candidate.id, candidate.display_name
            if (idx is not None and candidate_id == idx) or (name is not None and candidate_name == name):
                return candidate
        logging.debug(f"Device {idx} {name} not found")
        return None

    def set_offset(self, device: Union[Device, Dict]) -> None:
        """Send the device's current offset to the box; does nothing in dry-run mode.

        Args:
            device: A dict as produced by ``list_devices_old`` or a ``Device``
                providing ``id`` and ``to_web_data()``.
        """
        if self.dry_run:
            logging.warning("No updates in dry-run-mode")
            return
        data_url = self._endpoint_url("data")
        if isinstance(device, dict):
            data = {
                "xhr": 1,
                "sid": self.sid,
                "lang": "de",
                "no_sidrenew": "",
                "device": device["id"],
                "view": "",
                "back_to_page": "/net/home_auto_overview.lua",
                "ule_device_name": device["name"],
                "Offset": device["offset"],
                "apply": "",
                "oldpage": "/net/home_auto_hkr_edit.lua",
            }
        else:
            # Request the device's edit page first, then submit the form below.
            open_page = {
                "xhr": 1,
                "sid": self.sid,
                "lang": "de",
                "device": device.id,
                "page": "home_auto_hkr_edit"
            }
            self.session.post(data_url, data=open_page, verify=self.verify_ssl)
            data = {
                "xhr": 1,
                "sid": self.sid,
                "lang": "de",
                "view": "",
                "back_to_page": "sh_dev",
                "apply": "",
                "oldpage": "/net/home_auto_hkr_edit.lua"
            }
            data.update(device.to_web_data())
        self.session.post(data_url, data=data, verify=self.verify_ssl)

    def correct_offset(self, device_name: str, real_temp: float) -> None:
        """Adjust a thermostat's offset so that its reading matches *real_temp*.

        Nothing happens while the last pushed update for *device_name* is not
        older than ``update_timeout`` minutes, or when the device is not found.
        The update time is recorded only when the offset actually changed.

        Args:
            device_name: Display name of the thermostat.
            real_temp: Reference temperature the thermostat should report.
        """
        last_update = self.update_time.get(device_name)
        if last_update is not None:
            elapsed = datetime.now() - last_update
            logging.info(f"Last update for {device_name} {elapsed} ago")
            if elapsed <= timedelta(minutes=self.update_timeout):
                return
        device: Optional[Union[Dict, Device]] = self.get_device_data(name=device_name)
        logging.info(f"device: {device}")
        if device is None:
            return
        legacy = isinstance(device, dict)
        if legacy:
            old_offset, measured = device["offset"], device["temperature"]
        else:
            old_offset, measured = device.get_offset(), device.get_temperature()
        new_offset = old_offset + real_temp - measured
        offset_changed = new_offset != old_offset
        logging.info(f"Update offset from {old_offset} to {new_offset} (changed: {offset_changed})")
        if not offset_changed:
            return
        if legacy:
            device["offset"] = new_offset
        else:
            device.set_offset(new_offset)
        self.set_offset(device)
        self.update_time[device_name] = datetime.now()