329 lines
12 KiB
Python
Executable File
329 lines
12 KiB
Python
Executable File
from __future__ import annotations
|
|
|
|
import asyncio
|
|
import hashlib
|
|
import json
|
|
import logging
|
|
import re
|
|
import xml.etree.ElementTree as ET
|
|
from asyncio import Task
|
|
from datetime import datetime, timedelta
|
|
from typing import Optional, Dict, List, Union
|
|
|
|
import requests
|
|
from bs4 import BeautifulSoup
|
|
|
|
from device import Device
|
|
|
|
|
|
class FritzBox:
|
|
def __init__(self, url: str, password: str, update_timeout: int, user: str = None, dry_run: bool = False,
|
|
verify_ssl: bool = True, old_fb: bool = False) -> None:
|
|
self._endpoints = {
|
|
"login": "login_sid.lua?version=2",
|
|
"logout": "index.lua",
|
|
"data": "data.lua",
|
|
"device_details": "net/home_auto_hkr_edit.lua"
|
|
}
|
|
self.url: str = url
|
|
self.dry_run: bool = dry_run
|
|
self.user: Optional[str] = user
|
|
self.session: requests.Session = requests.Session()
|
|
self.password: str = password
|
|
self.sid: Optional[str] = None
|
|
self.update_timeout: int = update_timeout
|
|
self.update_time: Dict[str, datetime] = {}
|
|
self.hold_connection: Optional[Task] = None
|
|
self.verify_ssl = verify_ssl
|
|
self.old_fb = old_fb
|
|
|
|
if not verify_ssl:
|
|
import urllib3
|
|
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
|
|
|
|
async def hold_connection_alive(self) -> None:
|
|
while True:
|
|
# Session automatically destroyed after 20m of inactivity
|
|
# according to the manual
|
|
await asyncio.sleep(19 * 60)
|
|
self.check_session()
|
|
|
|
def _calc_challenge_v2(self, challenge: str) -> str:
|
|
|
|
logging.debug(f"Calculate v2 challenge: {challenge}")
|
|
chall_regex = re.compile(
|
|
"2\$(?P<iter1>[0-9a-zA-Z]+)\$(?P<salt1>[0-9a-zA-Z]+)\$(?P<iter2>[0-9a-zA-Z]+)\$(?P<salt2>[0-9a-zA-Z]+)")
|
|
|
|
chall_parts = chall_regex.match(challenge).groupdict()
|
|
salt1: bytes = bytes.fromhex(chall_parts["salt1"])
|
|
iter1: int = int(chall_parts["iter1"])
|
|
salt2: bytes = bytes.fromhex(chall_parts["salt2"])
|
|
iter2: int = int(chall_parts["iter2"])
|
|
|
|
hash1 = hashlib.pbkdf2_hmac('sha256', self.password.encode(), salt1, iter1)
|
|
response = salt2.hex() + "$" + hashlib.pbkdf2_hmac('sha256', hash1, salt2, iter2).hex()
|
|
|
|
return response
|
|
|
|
def _calc_challenge_v1(self, challenge: str) -> str:
|
|
""" Calculate the response for a challenge using legacy MD5 """
|
|
logging.debug(f"Calculate v1 challenge: {challenge}")
|
|
response = f"{challenge}-{self.password}"
|
|
response = response.encode("utf_16_le")
|
|
response = challenge + "-" + hashlib.md5(response).hexdigest()
|
|
return response
|
|
|
|
def check_session(self) -> None:
|
|
data = {
|
|
"xhr": 1,
|
|
"sid": self.sid,
|
|
"lang": "de",
|
|
"page": "overview",
|
|
"xhrId": "first",
|
|
"noMenuRef": 1
|
|
}
|
|
r = self.session.post(f"{self.url}/{self._endpoints['data']}", data=data, verify=self.verify_ssl)
|
|
if len(r.history) > 0:
|
|
if not self.login():
|
|
logging.error("Failed to login to Fritz!Box")
|
|
else:
|
|
logging.debug("Already logged in")
|
|
|
|
def login(self) -> bool:
|
|
logging.info(f"Login user {self.user} to Fritz!Box")
|
|
challenge = None
|
|
r = self.session.get(f"{self.url}/{self._endpoints['login']}", verify=self.verify_ssl)
|
|
xml = ET.fromstring(r.text)
|
|
for elem in xml:
|
|
if elem.tag == "SID":
|
|
self.sid = elem.text
|
|
elif elem.tag == "Challenge":
|
|
challenge = elem.text
|
|
elif self.user is None and elem.tag == "Users":
|
|
for user_elem in elem:
|
|
if "fritz" in user_elem.text:
|
|
self.user = user_elem.text
|
|
|
|
assert challenge is not None and self.user is not None
|
|
|
|
if challenge.startswith("2$"):
|
|
response = self._calc_challenge_v2(challenge)
|
|
else:
|
|
response = self._calc_challenge_v1(challenge)
|
|
|
|
data = {
|
|
"username": self.user,
|
|
"response": response
|
|
}
|
|
|
|
r = self.session.post(f"{self.url}/{self._endpoints['login']}", data=data, verify=self.verify_ssl)
|
|
logging.debug(r.text)
|
|
xml = ET.fromstring(r.text)
|
|
for elem in xml:
|
|
if elem.tag == "SID":
|
|
self.sid = elem.text
|
|
|
|
logging.info(f"Authenticated Fritz!Box: {len(self.sid) != self.sid.count('0')}")
|
|
if len(self.sid) != self.sid.count("0"):
|
|
self.hold_connection = asyncio.create_task(self.hold_connection_alive())
|
|
return len(self.sid) != self.sid.count("0")
|
|
|
|
def logout(self) -> bool:
|
|
logging.info("logout")
|
|
data = {
|
|
"xhr": 1,
|
|
"sid": self.sid,
|
|
"logout": 1,
|
|
"no_sidrenew": ""}
|
|
r = self.session.post(f"{self.url}/{self._endpoints['logout']}", data=data, verify=self.verify_ssl)
|
|
if self.hold_connection is not None:
|
|
self.hold_connection.cancel()
|
|
|
|
return r.status_code == 200
|
|
|
|
def list_devices_old(self) -> Optional[List[Union[Device, Dict]]]:
|
|
data = {
|
|
"xhr": 1,
|
|
"sid": self.sid,
|
|
"lang": "de",
|
|
"page": "sh",
|
|
"xhrId": "all"
|
|
}
|
|
r = self.session.post(f"{self.url}/{self._endpoints['data']}", data=data, verify=self.verify_ssl)
|
|
logging.debug(r.text[:100])
|
|
if len(r.history) > 0:
|
|
if self.login():
|
|
r = self.session.post(f"{self.url}/{self._endpoints['data']}", data=data, verify=self.verify_ssl)
|
|
else:
|
|
return None
|
|
|
|
devices_overview_raw = BeautifulSoup(r.text, "html.parser")
|
|
table_rows = devices_overview_raw.find(id="uiSmarthomeTables").table
|
|
|
|
devices_raw = []
|
|
|
|
for r in table_rows.find_all("tr"):
|
|
name = r.findAll("td", {'class': ['name', 'cut_overflow']})
|
|
button = r.findAll("td", {'class': 'btncolumn'})
|
|
temperature = r.findAll("td", {'class': 'temperature'})
|
|
|
|
if name is None or len(name) < 1:
|
|
continue
|
|
if button is None or len(button) < 1:
|
|
continue
|
|
if temperature is None or len(temperature) < 1:
|
|
continue
|
|
|
|
name = name[0].string
|
|
id = int(button[0].button["value"])
|
|
print("Temperature raw:", temperature)
|
|
try:
|
|
temperature = float(temperature[0].text.split(" ")[0].replace(",", "."))
|
|
except:
|
|
print("Error parsing temperature.")
|
|
return []
|
|
|
|
request_data = {
|
|
"device": id,
|
|
"sid": self.sid,
|
|
"xhr": 1
|
|
}
|
|
|
|
r = self.session.get(f"{self.url}/{self._endpoints['device_details']}", params=request_data,
|
|
verify=self.verify_ssl)
|
|
|
|
device_content_raw = BeautifulSoup(r.text, "html.parser")
|
|
offset = float(device_content_raw.find("input", {"type": "hidden", "name": "Offset"})["value"])
|
|
devices_raw.append({
|
|
"name": name,
|
|
"display_name": name,
|
|
"id": id,
|
|
"temperature": temperature,
|
|
"offset": offset
|
|
})
|
|
|
|
return devices_raw
|
|
|
|
def list_devices(self) -> Optional[List[Device]]:
|
|
data = {
|
|
"xhr": 1,
|
|
"sid": self.sid,
|
|
"lang": "de",
|
|
"page": "sh_dev",
|
|
"xhrId": "all"
|
|
}
|
|
r = self.session.post(f"{self.url}/{self._endpoints['data']}", data=data, verify=self.verify_ssl)
|
|
logging.debug(r.text[:100])
|
|
if len(r.history) > 0:
|
|
if self.login():
|
|
r = self.session.post(f"{self.url}/{self._endpoints['data']}", data=data, verify=self.verify_ssl)
|
|
else:
|
|
return None
|
|
devices: List[Device] = []
|
|
for device in json.loads(r.text)["data"]["devices"]:
|
|
devices.append(Device(device))
|
|
|
|
return devices
|
|
|
|
def get_device_data(self, idx: int = None, name: str = None) -> Optional[Device]:
|
|
if idx is None and name is None:
|
|
logging.debug("No id or name given")
|
|
return None
|
|
if self.old_fb:
|
|
devices = self.list_devices_old()
|
|
else:
|
|
devices = self.list_devices()
|
|
device = None
|
|
for device in devices:
|
|
if isinstance(device, dict):
|
|
if device["id"] == idx or device["display_name"] == name:
|
|
break
|
|
else:
|
|
if device.id == idx or device.display_name == name:
|
|
break
|
|
device = None
|
|
|
|
if device is None:
|
|
logging.debug(f"Device {idx} {name} not found")
|
|
return None
|
|
|
|
return device
|
|
|
|
def set_offset(self, device: Union[Device, Dict]) -> None:
|
|
if self.dry_run:
|
|
logging.warning("No updates in dry-run-mode")
|
|
return
|
|
|
|
old_type = isinstance(device, dict)
|
|
if old_type:
|
|
device_id = device["id"]
|
|
else:
|
|
device_id = device.id
|
|
|
|
if not old_type:
|
|
data = {
|
|
"xhr": 1,
|
|
"sid": self.sid,
|
|
"lang": "de",
|
|
"device": device_id,
|
|
"page": "home_auto_hkr_edit"
|
|
}
|
|
self.session.post(f"{self.url}/{self._endpoints['data']}", data=data, verify=self.verify_ssl)
|
|
|
|
data = {
|
|
"xhr": 1,
|
|
"sid": self.sid,
|
|
"lang": "de",
|
|
"view": "",
|
|
"back_to_page": "sh_dev",
|
|
"apply": "",
|
|
"oldpage": "/net/home_auto_hkr_edit.lua"
|
|
}
|
|
data.update(device.to_web_data())
|
|
|
|
else:
|
|
data = {
|
|
"xhr": 1,
|
|
"sid": self.sid,
|
|
"lang": "de",
|
|
"no_sidrenew": "",
|
|
"device": device_id,
|
|
"view": "",
|
|
"back_to_page": "/net/home_auto_overview.lua",
|
|
"ule_device_name": device["name"],
|
|
"Offset": device["offset"],
|
|
"apply": "",
|
|
"oldpage": "/net/home_auto_hkr_edit.lua",
|
|
}
|
|
self.session.post(f"{self.url}/{self._endpoints['data']}", data=data, verify=self.verify_ssl)
|
|
|
|
def correct_offset(self, device_name: str, real_temp: float):
|
|
elapsed = None
|
|
if device_name in self.update_time.keys():
|
|
elapsed = datetime.now() - self.update_time[device_name]
|
|
logging.info(f"Last update for {device_name} {elapsed} ago")
|
|
delta = timedelta(minutes=self.update_timeout)
|
|
if device_name not in self.update_time.keys() or elapsed > delta:
|
|
device: Optional[Union[Dict, Device]] = self.get_device_data(name=device_name)
|
|
logging.info(f"device: {device}")
|
|
if device is None:
|
|
return
|
|
|
|
offset_changed = False
|
|
if self.old_fb:
|
|
new_offset = device["offset"] + real_temp - device["temperature"]
|
|
offset_changed = new_offset != device["offset"]
|
|
logging.info(f"Update offset from {device['offset']} to {new_offset} (changed: {offset_changed})")
|
|
else:
|
|
new_offset = device.get_offset() + real_temp - device.get_temperature()
|
|
offset_changed = new_offset != device.get_offset()
|
|
logging.info(f"Update offset from {device.get_offset()} to {new_offset} (changed: {offset_changed})")
|
|
|
|
if offset_changed:
|
|
if self.old_fb:
|
|
device["offset"] = new_offset
|
|
else:
|
|
device.set_offset(new_offset)
|
|
self.set_offset(device)
|
|
self.update_time[device_name] = datetime.now()
|