# File listing metadata (not part of the program):
# 2026-01-25 02:37:07 +01:00
# 340 lines
# 12 KiB
# Python
# Executable File

from __future__ import annotations
import asyncio
import hashlib
import json
import logging
import re
import xml.etree.ElementTree as ET
from asyncio import Task
from datetime import datetime, timedelta
from typing import Any, Dict, List, Optional
from urllib.parse import urlparse
import httpx # type: ignore[import-not-found]
from device import Device
from errors import FritzBoxError
# Module-level logger, named after this module via ``__name__``.
logger = logging.getLogger(__name__)
class FritzBox:
    """Async client for the web interface of a Fritz!Box.

    Logs in through the ``login_sid.lua`` challenge-response flow (PBKDF2
    "v2" challenges, falling back to the legacy MD5 scheme), keeps the
    session alive from a background task, and reads and updates device data
    through ``data.lua``.
    """

    # Seconds between keep-alive checks. The session is automatically
    # destroyed after 20 minutes of inactivity according to the manual.
    _KEEPALIVE_INTERVAL = 19 * 60

    # HTTP status codes handled as a redirect of the login endpoint.
    _REDIRECT_STATUS_CODES = frozenset({301, 302, 303, 307, 308})

    # "2$<iter1>$<salt1>$<iter2>$<salt2>": decimal iteration counts and
    # hexadecimal salts, so int() / bytes.fromhex() only see valid digits.
    _CHALLENGE_V2_RE = re.compile(
        r"2\$(?P<iter1>[0-9]+)\$(?P<salt1>[0-9a-fA-F]+)"
        r"\$(?P<iter2>[0-9]+)\$(?P<salt2>[0-9a-fA-F]+)"
    )

    def __init__(
        self,
        url: str,
        password: str,
        update_timeout: int,
        user: Optional[str] = None,
        dry_run: bool = False,
        force_ipv4: bool = False,
        request_timeout: int = 10,
        request_retries: int = 2,
    ) -> None:
        """Create the client; no network traffic happens here.

        Args:
            url: Base URL of the box; endpoint names are appended as
                ``f"{url}/{endpoint}"``.
            password: Password used to answer the login challenge.
            update_timeout: Minimum number of minutes between two offset
                updates of the same device (see ``correct_offset``).
            user: Login user name. When ``None``, ``login`` picks one from
                the user list in the login response.
            dry_run: When true, ``set_offset`` only logs and sends nothing.
            force_ipv4: Bind outgoing connections to ``0.0.0.0``.
            request_timeout: Timeout passed to every HTTP request.
            request_retries: Number of additional attempts after a failed
                request.
        """
        self._endpoints = {
            "login": "login_sid.lua?version=2",
            "logout": "index.lua",
            "data": "data.lua",
        }
        self.url: str = url
        self.dry_run: bool = dry_run
        self.user: Optional[str] = user
        if force_ipv4:
            transport = httpx.AsyncHTTPTransport(local_address="0.0.0.0")
        else:
            transport = None
        self.session: httpx.AsyncClient = httpx.AsyncClient(transport=transport)
        self.password: str = password
        self.sid: Optional[str] = None
        self.update_timeout: int = update_timeout
        # Time of the last offset update, keyed by device name.
        self.update_time: Dict[str, datetime] = {}
        # Background keep-alive task; None while no session is being held.
        self.hold_connection: Optional[Task[None]] = None
        self.request_timeout: int = request_timeout
        self.request_retries: int = request_retries

    async def hold_connection_alive(self) -> None:
        """Periodically re-validate the session until the task is cancelled."""
        while True:
            await asyncio.sleep(self._KEEPALIVE_INTERVAL)
            try:
                await self.check_session()
            except Exception:
                # Task boundary: one failed check (e.g. an unparsable login
                # response) must not silently end the keep-alive for good.
                logger.exception("Fritz!Box keep-alive check failed")

    def _ensure_keepalive(self) -> None:
        """Start the keep-alive task unless one is already running."""
        task = self.hold_connection
        if task is None or task.done():
            self.hold_connection = asyncio.create_task(self.hold_connection_alive())

    def _stop_keepalive(self) -> None:
        """Cancel the keep-alive task, if any, and forget it."""
        task = self.hold_connection
        if task is not None:
            task.cancel()
            # cancel() only requests cancellation; drop the reference so a
            # later login() starts a fresh task instead of trusting this one.
            self.hold_connection = None

    async def _request(
        self,
        method: str,
        url: str,
        data: Optional[Dict[str, Any]] = None,
        follow_redirects: bool = True,
    ) -> Optional[Any]:
        """Send one HTTP request, retrying with exponential backoff.

        Returns:
            The response object of the first successful attempt, or ``None``
            once all attempts have failed (the failure is logged).
        """
        # Always try at least once, even if request_retries is negative.
        attempts = max(self.request_retries, 0) + 1
        for attempt in range(attempts):
            try:
                return await self.session.request(
                    method,
                    url,
                    data=data,
                    timeout=self.request_timeout,
                    follow_redirects=follow_redirects,
                )
            except Exception as exc:
                if attempt + 1 >= attempts:
                    logger.error("Request failed (%s %s): %s", method, url, exc)
                    return None
                backoff = 2**attempt
                logger.warning(
                    "Request failed (%s %s), retry in %ss: %s",
                    method,
                    url,
                    backoff,
                    exc,
                )
                await asyncio.sleep(backoff)
        return None

    @staticmethod
    def _is_valid_sid(sid: Optional[str]) -> bool:
        """Return True when *sid* contains at least one character other than "0"."""
        return sid is not None and sid.strip("0") != ""

    @classmethod
    def _pbkdf2_response(cls, password: str, challenge: str) -> str:
        """Compute the response for a "2$..." PBKDF2 challenge.

        Raises:
            ValueError: If the challenge does not have the form
                ``2$<iter1>$<salt1>$<iter2>$<salt2>`` or carries values that
                ``bytes.fromhex`` / ``hashlib.pbkdf2_hmac`` reject.
        """
        match = cls._CHALLENGE_V2_RE.fullmatch(challenge.strip())
        if match is None:
            raise ValueError("challenge does not match the v2 format")
        salt1 = bytes.fromhex(match["salt1"])
        salt2 = bytes.fromhex(match["salt2"])
        # Two chained PBKDF2 rounds: password with salt1, then that hash
        # with salt2.
        hash1 = hashlib.pbkdf2_hmac(
            "sha256", password.encode(), salt1, int(match["iter1"])
        )
        hash2 = hashlib.pbkdf2_hmac("sha256", hash1, salt2, int(match["iter2"]))
        return salt2.hex() + "$" + hash2.hex()

    @staticmethod
    def _md5_response(password: str, challenge: str) -> str:
        """Compute the legacy response: MD5 over UTF-16LE "<challenge>-<password>"."""
        digest = hashlib.md5(f"{challenge}-{password}".encode("utf_16_le"))
        return challenge + "-" + digest.hexdigest()

    def _calc_challenge_v2(self, challenge: str) -> str:
        """Return the PBKDF2 response for *challenge*, or "" if it is invalid."""
        logger.debug("Calculate v2 challenge")
        try:
            return self._pbkdf2_response(self.password, challenge)
        except (ValueError, OverflowError):
            logger.error("Invalid Fritz!Box challenge format")
            return ""

    def _calc_challenge_v1(self, challenge: str) -> str:
        """Calculate the response for a challenge using legacy MD5"""
        logger.debug("Calculate v1 challenge")
        return self._md5_response(self.password, challenge)

    async def check_session(self) -> None:
        """Probe the session with an overview request and re-login if needed.

        A response that went through at least one redirect is taken to mean
        the session is no longer valid.
        """
        data: Dict[str, Any] = {
            "xhr": 1,
            "sid": self.sid,
            "lang": "de",
            "page": "overview",
            "xhrId": "first",
            "noMenuRef": 1,
        }
        r = await self._request("POST", f"{self.url}/{self._endpoints['data']}", data)
        if r is None:
            return
        if len(r.history) > 0:
            if not await self.login():
                logger.error("Failed to login to Fritz!Box")
        else:
            logger.debug("Already logged in")

    async def login(self) -> bool:
        """Run the challenge-response login and store the resulting SID.

        Returns:
            True when the box answered with a non-zero SID; False when a
            request failed or the credentials were rejected.

        Raises:
            FritzBoxError: If the login page yields no challenge or no user,
                or the challenge response cannot be computed.
        """
        logger.info("Login user %s to Fritz!Box", self.user)
        login_url = f"{self.url}/{self._endpoints['login']}"
        r = await self._request("GET", login_url, follow_redirects=False)
        if r is None:
            return False
        if getattr(r, "status_code", 200) in self._REDIRECT_STATUS_CODES:
            location = getattr(r, "headers", {}).get("Location")
            if location:
                # Follow the redirect by hand so the new base URL can be
                # remembered for all later requests.
                retry_url, new_base = self._normalize_login_redirect(location)
                logger.debug("Login redirect to %s, retry %s", location, retry_url)
                r = await self._request("GET", retry_url, follow_redirects=False)
                if r is None:
                    return False
                if new_base:
                    self.url = new_base

        challenge: Optional[str] = None
        for elem in ET.fromstring(r.text):
            if elem.tag == "SID":
                self.sid = elem.text
            elif elem.tag == "Challenge":
                challenge = elem.text
            elif self.user is None and elem.tag == "Users":
                # No user configured: take one from the offered list whose
                # name contains "fritz" (the last such entry wins).
                for user_elem in elem:
                    if user_elem.text and "fritz" in user_elem.text:
                        self.user = user_elem.text
        if challenge is None or self.user is None:
            raise FritzBoxError("Missing Fritz!Box login challenge or user")

        if challenge.startswith("2$"):
            response = self._calc_challenge_v2(challenge)
        else:
            response = self._calc_challenge_v1(challenge)
        if not response:
            raise FritzBoxError("Failed to compute Fritz!Box login response")

        data = {"username": self.user, "response": response}
        r = await self._request("POST", f"{self.url}/{self._endpoints['login']}", data)
        if r is None:
            return False
        logger.debug("Login response received")
        for elem in ET.fromstring(r.text):
            if elem.tag == "SID":
                self.sid = elem.text

        authenticated = self._is_valid_sid(self.sid)
        logger.info("Authenticated Fritz!Box: %s", authenticated)
        if authenticated:
            # login() is also reached from inside the keep-alive task (via
            # check_session), so never stack a second task on a running one.
            self._ensure_keepalive()
        return authenticated

    async def logout(self) -> bool:
        """Log out from the box and stop the keep-alive task.

        Returns:
            True when the logout request was answered with HTTP 200.
        """
        logger.info("Logout from Fritz!Box")
        # Stop the keep-alive first, even if the request below fails, so it
        # cannot log in again after the caller asked for a logout.
        self._stop_keepalive()
        data: Dict[str, Any] = {
            "xhr": 1,
            "sid": self.sid,
            "logout": 1,
            "no_sidrenew": "",
        }
        r = await self._request("POST", f"{self.url}/{self._endpoints['logout']}", data)
        return r is not None and r.status_code == 200

    async def list_devices(self) -> Optional[List[Device]]:
        """Fetch all devices listed on the ``sh_dev`` page.

        Returns:
            One ``Device`` per entry of ``data.devices`` in the response, or
            ``None`` when a request or the re-login failed or the response
            could not be decoded.
        """
        data: Dict[str, Any] = {
            "xhr": 1,
            "sid": self.sid,
            "lang": "de",
            "page": "sh_dev",
            "xhrId": "all",
        }
        endpoint = f"{self.url}/{self._endpoints['data']}"
        r = await self._request("POST", endpoint, data)
        if r is None:
            return None
        logger.debug("Devices response received")
        if len(r.history) > 0:
            # Redirected: the session is gone. Log in again and repeat the
            # request with the *new* SID (and the possibly updated base URL).
            if not await self.login():
                return None
            data["sid"] = self.sid
            r = await self._request(
                "POST", f"{self.url}/{self._endpoints['data']}", data
            )
            if r is None:
                return None
        try:
            raw_devices = json.loads(r.text)["data"]["devices"]
        except (ValueError, KeyError, TypeError) as exc:
            logger.error("Unexpected Fritz!Box device list response: %s", exc)
            return None
        return [Device(raw) for raw in raw_devices]

    async def get_device_data(
        self, idx: Optional[int] = None, name: Optional[str] = None
    ) -> Optional[Device]:
        """Return the first device whose id equals *idx* or whose display name equals *name*.

        Only criteria that were actually given take part in the comparison,
        so an omitted one never matches a device attribute that is ``None``.

        Returns:
            The matching device, or ``None`` when no criterion was given, the
            device list is unavailable, or nothing matches.
        """
        if idx is None and name is None:
            logger.debug("No id or name given")
            return None
        devices = await self.list_devices()
        if devices is None:
            return None
        for candidate in devices:
            id_matches = idx is not None and candidate.id == idx
            name_matches = name is not None and candidate.display_name == name
            if id_matches or name_matches:
                return candidate
        logger.debug("Device not found for id=%s name=%s", idx, name)
        return None

    async def set_offset(self, device: Device) -> None:
        """Send the device's current settings to the box (skipped in dry-run).

        Two POSTs are sent: one requesting the ``home_auto_hkr_edit`` page
        for the device, then one submitting ``device.to_web_data()`` with the
        ``apply`` field set.
        """
        if self.dry_run:
            logger.warning("Dry-run enabled, skipping update")
            return
        endpoint = f"{self.url}/{self._endpoints['data']}"
        # NOTE(review): presumably mirrors the web UI, which opens the edit
        # page before submitting it - confirm before dropping this request.
        open_page: Dict[str, Any] = {
            "xhr": 1,
            "sid": self.sid,
            "lang": "de",
            "device": device.id,
            "page": "home_auto_hkr_edit",
        }
        await self._request("POST", endpoint, open_page)
        apply_form: Dict[str, Any] = {
            "xhr": 1,
            "sid": self.sid,
            "lang": "de",
            "view": "",
            "back_to_page": "sh_dev",
            "apply": "",
            "oldpage": "/net/home_auto_hkr_edit.lua",
        }
        apply_form.update(device.to_web_data())
        await self._request("POST", endpoint, apply_form)

    async def correct_offset(self, device_name: str, real_temp: float) -> None:
        """Adjust a device's temperature offset so it reports *real_temp*.

        The new offset is ``current_offset + real_temp - current_temp``.
        Nothing is sent while the last update of this device is at most
        ``update_timeout`` minutes old.
        """
        delta = timedelta(minutes=self.update_timeout)
        elapsed = None
        last_update = self.update_time.get(device_name)
        if last_update is not None:
            elapsed = datetime.now() - last_update
            logger.debug("Last update for %s %s ago", device_name, elapsed)
        if elapsed is not None and elapsed <= delta:
            logger.debug(
                "Skip offset update for %s: last update %s ago (min %s)",
                device_name,
                elapsed,
                delta,
            )
            return

        device: Optional[Device] = await self.get_device_data(name=device_name)
        if device is None:
            return
        current_offset = device.get_offset()
        current_temp = device.get_temperature()
        if current_offset is None or current_temp is None:
            logger.warning(
                "Skipping offset update for %s: missing temperature data",
                device_name,
            )
            return
        new_offset = current_offset + real_temp - current_temp
        logger.info(
            "Update offset for %s from %.2f to %.2f",
            device_name,
            current_offset,
            new_offset,
        )
        device.set_offset(new_offset)
        await self.set_offset(device)
        update_key = device.display_name or device_name
        self.update_time[update_key] = datetime.now()

    async def close(self) -> None:
        """Stop the keep-alive task and close the HTTP session."""
        self._stop_keepalive()
        await self.session.aclose()

    def _normalize_login_redirect(self, location: str) -> tuple[str, Optional[str]]:
        """Turn a redirect ``Location`` into an absolute login URL.

        Returns:
            ``(login_url, base)``, where *base* is ``scheme://netloc`` taken
            from *location*, falling back to ``self.url`` for the parts a
            relative location lacks. A location that already ends with the
            login endpoint keeps its path and query; anything else is
            replaced by ``base`` plus the login endpoint.
        """
        endpoint = self._endpoints["login"]
        parsed_location = urlparse(location)
        parsed_base = urlparse(self.url)
        scheme = parsed_location.scheme or parsed_base.scheme
        netloc = parsed_location.netloc or parsed_base.netloc
        base = f"{scheme}://{netloc}"
        if not location.endswith(endpoint):
            return f"{base}/{endpoint}", base
        if parsed_location.scheme and parsed_location.netloc:
            return location, base
        # Relative redirect: graft it onto the base so the retry request gets
        # an absolute URL.
        absolute = parsed_location._replace(scheme=scheme, netloc=netloc).geturl()
        return absolute, base