212 lines
7.6 KiB
Python
Executable File

import asyncio
from asyncio import Task
from datetime import datetime, timedelta
from typing import Optional, Tuple, Dict
import requests
import json
import re
import hashlib
import xml.etree.ElementTree as ET
import logging
class FritzBox:
    """Small client for the Fritz!Box web interface.

    Supports challenge-response login, a keep-alive task, listing smart-home
    devices and posting a temperature offset for a thermostat.

    All HTTP calls go through a blocking ``requests.Session``.
    """

    # Shape of a "version 2" (PBKDF2) challenge: 2$<iter1>$<salt1>$<iter2>$<salt2>
    _CHALLENGE_V2_PATTERN = re.compile(
        r"2\$(?P<iter1>[0-9a-zA-Z]+)\$(?P<salt1>[0-9a-zA-Z]+)"
        r"\$(?P<iter2>[0-9a-zA-Z]+)\$(?P<salt2>[0-9a-zA-Z]+)"
    )

    def __init__(self, url: str, password: str, user: Optional[str] = None) -> None:
        """Store connection settings; no network traffic happens here.

        Args:
            url: Base URL of the box; endpoint paths are appended after a "/".
            password: Password used to answer the login challenge.
            user: Optional user name. When given, ``login()`` uses it unless
                an explicit user is passed to ``login()`` itself.
        """
        self._endpoints = {
            "login": "login_sid.lua?version=2",
            "logout": "index.lua",
            "data": "data.lua"
        }
        self.url: str = url
        self.user: Optional[str] = user
        self.session: requests.Session = requests.Session()
        self.password: str = password
        self.sid: Optional[str] = None
        # Timestamp of the last offset correction, keyed by device name.
        self.update_time: Dict[str, datetime] = {}
        # Handle of the running keep-alive task, if any.
        self.hold_connection: Optional[Task] = None

    async def hold_connection_alive(self) -> None:
        """Re-validate the session every 19 minutes, forever (until cancelled)."""
        while True:
            # Session automatically destroyed after 20m of inactivity
            await asyncio.sleep(19*60)
            # NOTE(review): check_session() performs blocking HTTP calls, so
            # the event loop is stalled for the duration of the request.
            self.check_session()

    def _calc_challenge_v2(self, challenge: str) -> str:
        """Calculate the response for a PBKDF2 ("2$...") challenge.

        Args:
            challenge: Challenge of the form
                ``2$<iter1>$<salt1>$<iter2>$<salt2>`` with hex-encoded salts.

        Returns:
            ``<salt2 as lowercase hex>$<hex of the second PBKDF2 round>``.

        Raises:
            ValueError: If the challenge does not have the expected shape, or
                its salts / iteration counts cannot be parsed.
        """
        logging.debug(f"Calculate v2 challenge: {challenge}")
        match = self._CHALLENGE_V2_PATTERN.match(challenge)
        if match is None:
            raise ValueError(f"Malformed v2 challenge: {challenge!r}")
        parts = match.groupdict()
        salt1: bytes = bytes.fromhex(parts["salt1"])
        iter1: int = int(parts["iter1"])
        salt2: bytes = bytes.fromhex(parts["salt2"])
        iter2: int = int(parts["iter2"])
        # Two chained rounds: the password is hashed with salt1, and that
        # digest is hashed again with salt2.
        hash1 = hashlib.pbkdf2_hmac('sha256', self.password.encode(), salt1, iter1)
        hash2 = hashlib.pbkdf2_hmac('sha256', hash1, salt2, iter2)
        return salt2.hex() + "$" + hash2.hex()

    def _calc_challenge_v1(self, challenge: str) -> str:
        """Calculate the response for a challenge using legacy MD5.

        The digest is taken over ``"<challenge>-<password>"`` encoded as
        UTF-16LE; the result is ``"<challenge>-<md5 hex digest>"``.
        """
        logging.debug(f"Calculate v1 challenge: {challenge}")
        payload = f"{challenge}-{self.password}".encode("utf_16_le")
        return challenge + "-" + hashlib.md5(payload).hexdigest()

    @staticmethod
    def _is_valid_sid(sid: Optional[str]) -> bool:
        """Return True when *sid* is set and does not consist solely of "0" characters."""
        return sid is not None and len(sid) != sid.count("0")

    def _ensure_keepalive(self) -> None:
        """Start the keep-alive task unless one is already active.

        Does nothing when called outside a running event loop, so ``login()``
        also works from plain synchronous code.
        """
        if self.hold_connection is not None and not self.hold_connection.done():
            return
        try:
            loop = asyncio.get_running_loop()
        except RuntimeError:
            logging.debug("No running event loop; keep-alive task not started")
            return
        self.hold_connection = loop.create_task(self.hold_connection_alive())

    def check_session(self) -> None:
        """Probe the data endpoint and log in again if the session is gone.

        A response that arrived via at least one redirect is treated as
        "session no longer valid".
        """
        data = {
            "xhr": 1,
            "sid": self.sid,
            "lang": "de",
            "page": "overview",
            "xhrId": "first",
            "noMenuRef": 1
        }
        r = self.session.post(f"{self.url}/{self._endpoints['data']}", data=data)
        if r.history:
            if not self.login():
                logging.error("Failed to login to Fritz!Box")
        else:
            logging.info("Already logged in")

    def login(self, user: str = None) -> bool:
        """Authenticate against the box and remember the session id.

        The user name is resolved in this order: the *user* argument, the
        user given to the constructor, and finally a user from the box's own
        user list whose name contains "fritz" (the last such entry wins).

        On success a keep-alive task is started if an event loop is running
        and no keep-alive task is active yet.

        Args:
            user: Optional user name overriding the one from the constructor.

        Returns:
            True if the box handed out a session id that is not all zeros,
            False otherwise (including when no challenge or user is found).
        """
        if user is None:
            user = self.user
        logging.info(f"Login user {user} to Fritz!Box")
        challenge = None
        r = self.session.get(f"{self.url}/{self._endpoints['login']}")
        for elem in ET.fromstring(r.text):
            if elem.tag == "SID":
                self.sid = elem.text
            elif elem.tag == "Challenge":
                challenge = elem.text
            elif user is None and elem.tag == "Users":
                for user_elem in elem:
                    # An empty <User/> element has text None.
                    if "fritz" in (user_elem.text or ""):
                        user = user_elem.text
        if challenge is None or user is None:
            # An assert here would vanish under "python -O"; report instead.
            logging.error("Fritz!Box login response contained no challenge or no usable user")
            return False
        if challenge.startswith("2$"):
            response = self._calc_challenge_v2(challenge)
        else:
            response = self._calc_challenge_v1(challenge)
        data = {
            "username": user,
            "response": response
        }
        r = self.session.post(f"{self.url}/{self._endpoints['login']}", data=data)
        logging.debug(r.text)
        for elem in ET.fromstring(r.text):
            if elem.tag == "SID":
                self.sid = elem.text
        authenticated = self._is_valid_sid(self.sid)
        logging.info(f"Authenticated Fritz!Box: {authenticated}")
        if authenticated:
            self._ensure_keepalive()
        return authenticated

    def logout(self) -> bool:
        """End the session and stop the keep-alive task.

        Returns:
            True if the logout request came back with HTTP status 200.
        """
        logging.info("logout")
        data = {
            "xhr": 1,
            "sid": self.sid,
            "logout": 1,
            "no_sidrenew": ""}
        r = self.session.post(f"{self.url}/{self._endpoints['logout']}", data=data)
        if self.hold_connection is not None:
            self.hold_connection.cancel()
            # Drop the handle so that a later login() starts a fresh task
            # instead of mistaking the cancelled one for an active one.
            self.hold_connection = None
        return r.status_code == 200

    def list_devices(self) -> Optional[Dict]:
        """Fetch the smart-home device list.

        If the request was redirected, one re-login is attempted and the
        request is repeated.

        Returns:
            The value found under ``["data"]["devices"]`` in the JSON reply,
            or None when the re-login failed.
        """
        data = {
            "xhr": 1,
            "sid": self.sid,
            "lang": "de",
            "page": "sh_dev",
            "xhrId": "all"
        }
        r = self.session.post(f"{self.url}/{self._endpoints['data']}", data=data)
        logging.debug(r.text[:100])
        if r.history:
            if not self.login():
                return None
            # NOTE(review): the retry reuses the payload built above, so it
            # still carries the sid from before the re-login — confirm the
            # box accepts this.
            r = self.session.post(f"{self.url}/{self._endpoints['data']}", data=data)
        return json.loads(r.text)["data"]["devices"]

    def get_device_data(
        self, id: int = None, name: str = None
    ) -> Optional[Tuple[Optional[float], Optional[float], int, str]]:
        """Look up one device by id or display name and read its temperature.

        Args:
            id: Device id to match (ignored when None).
            name: Display name to match (ignored when None).

        Returns:
            ``(current_temp, current_offset, device_id, display_name)`` for
            the first matching device, or None if neither criterion was
            given, the device list could not be fetched, or nothing matched.
            The two temperature values are None when the device exposes no
            "SmartHomeTemperatureSensor" skill.
        """
        if id is None and name is None:
            logging.debug("No id or name given")
            return None
        devices = self.list_devices()
        if devices is None:
            logging.debug("Device list unavailable")
            return None
        # Compare only the criteria that were actually supplied.
        device = next(
            (
                candidate for candidate in devices
                if (id is not None and candidate["id"] == id)
                or (name is not None and candidate["displayName"] == name)
            ),
            None,
        )
        if device is None:
            logging.debug(f"Device {id} {name} not found")
            return None
        current_temp = None
        current_offset = None
        for unit in device["units"]:
            if unit["type"] != "TEMPERATURE_SENSOR":
                continue
            for skill in unit["skills"]:
                if skill["type"] == "SmartHomeTemperatureSensor":
                    current_temp = float(skill["currentInCelsius"])
                    current_offset = float(skill["offset"])
        return current_temp, current_offset, device["id"], device["displayName"]

    def set_offset(self, current_temp: str, offset: float, device_id: int, device_name: str) -> None:
        """Post a new temperature offset for a thermostat.

        Two requests are sent: one for the device's "home_auto_hkr_edit"
        page, then the form submission itself.

        NOTE(review): "WindowOpenTrigger" and "WindowOpenTimer" are submitted
        with fixed values (8 and 10); presumably this overwrites whatever is
        configured on the device — verify.

        Args:
            current_temp: Value submitted as "Roomtemp".
            offset: Value submitted as "Offset".
            device_id: Id of the device to edit.
            device_name: Value submitted as "ule_device_name".
        """
        data = {
            "xhr": 1,
            "sid": self.sid,
            "lang": "de",
            "device": device_id,
            "page": "home_auto_hkr_edit"
        }
        # Response body is not needed; the request is kept from the original flow.
        self.session.post(f"{self.url}/{self._endpoints['data']}", data=data)
        data = {
            "xhr": 1,
            "sid": self.sid,
            "lang": "de",
            "device": device_id,
            "view": "",
            "back_to_page": "sh_dev",
            "ule_device_name": device_name,
            "WindowOpenTrigger": 8,
            "WindowOpenTimer": 10,
            "tempsensor": "own",
            "Roomtemp": f"{current_temp}",
            "ExtTempsensorID": "tochoose",
            "Offset": f"{offset}",
            "apply": "",
            "oldpage": "/net/home_auto_hkr_edit.lua"
        }
        self.session.post(f"{self.url}/{self._endpoints['data']}", data=data)

    def correct_offset(self, device_name: str, real_temp: float) -> None:
        """Work out the offset that makes the device report *real_temp*.

        Runs at most once every five minutes per device name. The new offset
        is ``current_offset + real_temp - current_temp``. As written, this
        method only logs the new offset and records the time; it does not
        call ``set_offset``.

        Args:
            device_name: Display name of the device.
            real_temp: Reference temperature measured independently.
        """
        last_update = self.update_time.get(device_name)
        if last_update is not None:
            logging.info(f"Last update for {device_name} {datetime.now() - last_update} ago")
            if (datetime.now() - last_update) <= timedelta(minutes=5):
                return
        device_data = self.get_device_data(name=device_name)
        if device_data is None:
            logging.warning(f"Cannot correct offset: no data for device {device_name}")
            return
        current_temp, current_offset, idx, name = device_data
        if current_temp is None or current_offset is None:
            logging.warning(f"Cannot correct offset: device {device_name} reports no temperature")
            return
        new_offset = current_offset + real_temp - current_temp
        logging.info(f"Should update offset from {current_offset} to {new_offset}")
        self.update_time[device_name] = datetime.now()