167 lines
5.7 KiB
Python
Executable File
167 lines
5.7 KiB
Python
Executable File
from typing import Optional, Tuple
|
|
import requests
|
|
import json
|
|
import re
|
|
import hashlib
|
|
import xml.etree.ElementTree as ET
|
|
import logging
|
|
|
|
class FritzBox:
    """Small client for the FRITZ!Box web interface.

    Supports the challenge-response login (PBKDF2 "version 2" challenges and
    the legacy MD5 scheme), logout, listing smart-home devices and posting a
    temperature offset for a device.

    Attributes:
        url: Base URL of the box (a trailing slash is tolerated).
        session: ``requests.Session`` shared by all calls.
        password: Password used to answer login challenges.
        user: Default user name for :meth:`login` (may be ``None``).
        sid: Session id returned by the box, or ``None`` before any login.
    """

    # Layout of a version 2 challenge: 2$<iter1>$<salt1>$<iter2>$<salt2>
    _CHALLENGE_V2_RE = re.compile(
        r"2\$(?P<iter1>[0-9a-zA-Z]+)\$(?P<salt1>[0-9a-zA-Z]+)"
        r"\$(?P<iter2>[0-9a-zA-Z]+)\$(?P<salt2>[0-9a-zA-Z]+)"
    )

    def __init__(self, url: str, password: str, user: Optional[str] = None) -> None:
        """Create a client; no network traffic happens until :meth:`login`.

        Args:
            url: Base URL of the box.
            password: Password used for the challenge-response login.
            user: Optional user name; remembered so that automatic re-logins
                use the same account.
        """
        self._endpoints = {
            "login": "login_sid.lua?version=2",
            "logout": "index.lua",
            "data": "data.lua",
        }
        self.url = url
        self.session = requests.Session()
        self.password = password
        self.user = user
        self.sid = None

    def _endpoint_url(self, name: str) -> str:
        """Return the absolute URL for the endpoint registered under *name*."""
        # Strip a trailing slash so "http://box/" does not produce "http://box//...".
        return f"{self.url.rstrip('/')}/{self._endpoints[name]}"

    def _post(self, endpoint: str, data: dict):
        """POST form *data* to the named endpoint and return the response."""
        return self.session.post(self._endpoint_url(endpoint), data=data)

    @staticmethod
    def _is_valid_sid(sid: Optional[str]) -> bool:
        """Return True when *sid* is non-empty and not made up of zeros only."""
        return bool(sid) and sid.strip("0") != ""

    def _calc_challenge_v2(self, challenge: str) -> str:
        """Calculate the response for a PBKDF2 (version 2) challenge.

        The password is hashed with PBKDF2-HMAC-SHA256 using ``salt1``/``iter1``;
        that hash is hashed again using ``salt2``/``iter2``.

        Args:
            challenge: Challenge of the form ``2$<iter1>$<salt1>$<iter2>$<salt2>``.

        Returns:
            ``"<salt2 hex>$<second hash hex>"``.

        Raises:
            ValueError: If the challenge does not have the expected layout, or
                its salts / iteration counts cannot be parsed.
        """
        logging.debug(f"Calculate v2 challenge: {challenge}")

        match = self._CHALLENGE_V2_RE.match(challenge)
        if match is None:
            raise ValueError(f"Malformed v2 challenge: {challenge!r}")

        salt1 = bytes.fromhex(match.group("salt1"))
        iter1 = int(match.group("iter1"))
        salt2 = bytes.fromhex(match.group("salt2"))
        iter2 = int(match.group("iter2"))

        hash1 = hashlib.pbkdf2_hmac("sha256", self.password.encode(), salt1, iter1)
        hash2 = hashlib.pbkdf2_hmac("sha256", hash1, salt2, iter2)
        return f"{salt2.hex()}${hash2.hex()}"

    def _calc_challenge_v1(self, challenge: str) -> str:
        """Calculate the response for a challenge using legacy MD5.

        Returns:
            ``"<challenge>-<md5 hex>"`` where the MD5 is taken over
            ``"<challenge>-<password>"`` encoded as UTF-16LE.
        """
        logging.debug(f"Calculate v1 challenge: {challenge}")
        secret = f"{challenge}-{self.password}".encode("utf_16_le")
        return f"{challenge}-{hashlib.md5(secret).hexdigest()}"

    def login(self, user: Optional[str] = None) -> bool:
        """Log in and store the session id in ``self.sid``.

        Args:
            user: User name to log in with. Falls back to the user given to
                the constructor; if that is also ``None``, a user whose name
                contains ``"fritz"`` is picked from the box's user list (the
                last such entry wins).

        Returns:
            True when the box answered with a session id that is not all zeros.

        Raises:
            RuntimeError: If the box's answer contains no challenge.
            ValueError: If no user name was given and none could be detected.
        """
        if user is None:
            user = getattr(self, "user", None)
        logging.debug(f"login user {user}")

        r = self.session.get(self._endpoint_url("login"))
        challenge = None
        for elem in ET.fromstring(r.text):
            if elem.tag == "SID":
                self.sid = elem.text
            elif elem.tag == "Challenge":
                challenge = elem.text
            elif user is None and elem.tag == "Users":
                for user_elem in elem:
                    # Entries without text cannot match; skip them instead of crashing.
                    if user_elem.text and "fritz" in user_elem.text:
                        user = user_elem.text

        # Explicit checks instead of ``assert`` so they survive ``python -O``.
        if challenge is None:
            raise RuntimeError("Login response did not contain a challenge")
        if user is None:
            raise ValueError("No user given and no user could be detected")

        if challenge.startswith("2$"):
            response = self._calc_challenge_v2(challenge)
        else:
            response = self._calc_challenge_v1(challenge)

        r = self._post("login", {"username": user, "response": response})
        logging.debug(r.text)
        for elem in ET.fromstring(r.text):
            if elem.tag == "SID":
                self.sid = elem.text

        authenticated = self._is_valid_sid(self.sid)
        logging.debug(f"Authenticated fritzbox: {authenticated}")
        return authenticated

    def logout(self) -> bool:
        """Ask the box to end the current session.

        Returns:
            True when the box answered with HTTP status 200.
        """
        logging.debug("logout")
        data = {
            "xhr": 1,
            "sid": self.sid,
            "logout": 1,
            "no_sidrenew": "",
        }
        r = self._post("logout", data)
        return r.status_code == 200

    def list_devices(self) -> Optional[list]:
        """Fetch the smart-home device list.

        If the request was redirected (taken here as a sign that the session
        is no longer valid), a new login is attempted and the request is
        repeated once with the fresh session id.

        Returns:
            The ``data.devices`` entry of the JSON answer, or ``None`` when
            the re-login failed.
        """
        data = {
            "xhr": 1,
            "sid": self.sid,
            "lang": "de",
            "page": "sh_dev",
            "xhrId": "all",
        }
        r = self._post("data", data)
        logging.debug(r.text[:100])

        if r.history:
            if not self.login():
                return None
            # login() replaced self.sid; the retry must not reuse the stale one.
            data["sid"] = self.sid
            r = self._post("data", data)

        return json.loads(r.text)["data"]["devices"]

    def get_device_data(self, id: int = None, name: str = None) -> Optional[Tuple[float, float, int, str]]:
        """Look up a device by id or display name and read its temperature data.

        Args:
            id: Device id to match (ignored when ``None``).
            name: Device display name to match (ignored when ``None``).

        Returns:
            ``(current_temp, current_offset, device_id, display_name)`` for the
            first matching device. Temperature and offset are ``None`` when
            the device exposes no ``SmartHomeTemperatureSensor`` skill.
            ``None`` is returned when neither criterion is given, the device
            list is unavailable, or no device matches.
        """
        if id is None and name is None:
            logging.debug("No id or name given")
            return None

        devices = self.list_devices() or []
        # Only compare against criteria that were actually supplied, so a
        # missing criterion can never match a device field that is None.
        device = next(
            (
                candidate
                for candidate in devices
                if (id is not None and candidate["id"] == id)
                or (name is not None and candidate["displayName"] == name)
            ),
            None,
        )
        if device is None:
            logging.debug(f"Device {id} {name} not found")
            return None

        current_temp = None
        current_offset = None
        for unit in device["units"]:
            if unit["type"] != "TEMPERATURE_SENSOR":
                continue
            for skill in unit["skills"]:
                if skill["type"] == "SmartHomeTemperatureSensor":
                    current_temp = float(skill["currentInCelsius"])
                    current_offset = float(skill["offset"])

        return current_temp, current_offset, device["id"], device["displayName"]

    def set_offset(self, current_temp: str, offset: float, device_id: int, device_name: str) -> None:
        """Post a new temperature offset for a device.

        Two requests are sent to the data endpoint: one for the
        ``home_auto_hkr_edit`` page of the device, then the form submission
        carrying the new offset. Neither response is inspected.

        Args:
            current_temp: Value sent as ``Roomtemp``.
            offset: Value sent as ``Offset``.
            device_id: Id of the device to modify.
            device_name: Value sent as ``ule_device_name``.
        """
        # NOTE(review): presumably the edit page must be requested before the
        # form is accepted - confirm against the box's behaviour.
        self._post(
            "data",
            {
                "xhr": 1,
                "sid": self.sid,
                "lang": "de",
                "device": device_id,
                "page": "home_auto_hkr_edit",
            },
        )

        form = {
            "xhr": 1,
            "sid": self.sid,
            "lang": "de",
            "device": device_id,
            "view": "",
            "back_to_page": "sh_dev",
            "ule_device_name": device_name,
            "WindowOpenTrigger": 8,
            "WindowOpenTimer": 10,
            "tempsensor": "own",
            "Roomtemp": f"{current_temp}",
            "ExtTempsensorID": "tochoose",
            "Offset": f"{offset}",
            "apply": "",
            "oldpage": "/net/home_auto_hkr_edit.lua",
        }
        self._post("data", form)
|