Improve stability etc
This commit is contained in:
parent
5aa94a3c97
commit
4aaddf23e0
2
.gitignore
vendored
2
.gitignore
vendored
@ -244,3 +244,5 @@ fabric.properties
|
||||
# local config test
|
||||
options.json
|
||||
.DS_Store
|
||||
|
||||
options.json
|
||||
@ -2,7 +2,7 @@ ARG BUILD_FROM
|
||||
FROM $BUILD_FROM
|
||||
|
||||
# Install requirements for add-on
|
||||
RUN apk update && apk add --no-cache python3 py3-pip py3-websockets py3-requests
|
||||
RUN apk update && apk add --no-cache python3 py3-pip py3-websockets py3-httpx
|
||||
|
||||
WORKDIR /data
|
||||
|
||||
|
||||
56
fritz_temp_sync/README.md
Normal file
56
fritz_temp_sync/README.md
Normal file
@ -0,0 +1,56 @@
|
||||
# Fritz!Box Temperature Sync
|
||||
|
||||
Dieses Add-on synchronisiert die gemessene Temperatur von Home-Assistant-Sensoren mit Fritz!DECT-Thermostaten, indem die Offset-Korrektur an der Fritz!Box angepasst wird.
|
||||
|
||||
## Voraussetzungen
|
||||
- Home Assistant Add-on Umgebung
|
||||
- Fritz!Box mit Fritz!DECT-Thermostaten
|
||||
- Home Assistant Entitäten für Thermostate und Temperatursensoren
|
||||
|
||||
## Konfiguration
|
||||
Die Optionen werden über das Add-on-Formular oder die `options.json` gesetzt.
|
||||
|
||||
### Optionen
|
||||
- `fritzbox.url` (URL, erforderlich): Basis-URL der Fritz!Box, z. B. `http://fritz.box`
|
||||
- `fritzbox.username` (string, optional): Benutzername
|
||||
- `fritzbox.password` (string, erforderlich): Passwort
|
||||
- `mappings` (Liste, erforderlich): Zuordnung von Sensor → Thermostat
|
||||
- `update_timeout` (int, erforderlich): Mindestabstand in Minuten zwischen Offset-Updates pro Thermostat
|
||||
- `log_level` (string, optional): z. B. `DEBUG`, `INFO`, `WARNING`
|
||||
- `offset_threshold` (float, optional): Mindestabweichung in °C, ab der ein Offset gesetzt wird (Default 0.5)
|
||||
- `dry_run` (bool, optional): Wenn `true`, werden keine Änderungen an der Fritz!Box vorgenommen
|
||||
|
||||
### Beispiel
|
||||
```json
|
||||
{
|
||||
"fritzbox": {
|
||||
"url": "http://fritz.box",
|
||||
"username": "ha",
|
||||
"password": "***"
|
||||
},
|
||||
"mappings": [
|
||||
{
|
||||
"sensor": "sensor.room_temperature",
|
||||
"thermostate": "climate.room"
|
||||
}
|
||||
],
|
||||
"update_timeout": 15,
|
||||
"log_level": "INFO",
|
||||
"offset_threshold": 0.5,
|
||||
"dry_run": false
|
||||
}
|
||||
```
|
||||
|
||||
## Verhalten
|
||||
- Bei jedem relevanten `state_changed`-Event wird die Differenz zwischen Thermostat- und Sensorwert geprüft.
|
||||
- Erst ab `offset_threshold` erfolgt eine Offset-Korrektur.
|
||||
- Pro Thermostat wird maximal alle `update_timeout` Minuten ein Update durchgeführt.
|
||||
|
||||
## Logging
|
||||
- `INFO`: Start/Stop, erfolgreiche Authentifizierung, Offset-Korrekturen
|
||||
- `WARNING`: Verbindungsprobleme, Retries, Dry-Run-Hinweise
|
||||
- `DEBUG`: Detailzustände und Entscheidungslogik
|
||||
|
||||
## Hinweise
|
||||
- Das Add-on schreibt keine Payload-Snapshots mehr auf die Platte.
|
||||
- Die Zuordnung erfolgt über die Entity-ID-Namen aus Home Assistant.
|
||||
145
fritz_temp_sync/config.py
Normal file
145
fritz_temp_sync/config.py
Normal file
@ -0,0 +1,145 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import json
|
||||
from dataclasses import dataclass
|
||||
from typing import Any, Iterable, Optional, cast
|
||||
|
||||
from errors import ConfigError
|
||||
|
||||
|
||||
@dataclass(frozen=True)
|
||||
class Mapping:
|
||||
sensor: str
|
||||
thermostate: str
|
||||
|
||||
|
||||
@dataclass(frozen=True)
|
||||
class FritzBoxConfig:
|
||||
url: str
|
||||
password: str
|
||||
username: Optional[str] = None
|
||||
|
||||
|
||||
@dataclass(frozen=True)
|
||||
class AppConfig:
|
||||
fritzbox: FritzBoxConfig
|
||||
mappings: list[Mapping]
|
||||
update_timeout: int
|
||||
log_level: str = "INFO"
|
||||
offset_threshold: float = 0.5
|
||||
dry_run: bool = False
|
||||
force_ipv4: bool = True
|
||||
log_ws_messages: bool = False
|
||||
log_http_requests: bool = False
|
||||
request_timeout: int = 10
|
||||
request_retries: int = 2
|
||||
|
||||
|
||||
def load_config(path: str) -> AppConfig:
|
||||
with open(path, "r", encoding="utf-8") as config_file:
|
||||
raw = json.load(config_file)
|
||||
|
||||
if not isinstance(raw, dict):
|
||||
raise ConfigError("Config must be a JSON object")
|
||||
raw = cast(dict[str, Any], raw)
|
||||
|
||||
fritzbox = raw.get("fritzbox")
|
||||
if not isinstance(fritzbox, dict):
|
||||
raise ConfigError("Missing or invalid 'fritzbox' config")
|
||||
fritzbox = cast(dict[str, Any], fritzbox)
|
||||
|
||||
url = fritzbox.get("url")
|
||||
password = fritzbox.get("password")
|
||||
username = fritzbox.get("username")
|
||||
|
||||
if not isinstance(url, str) or not url:
|
||||
raise ConfigError("Missing or invalid 'fritzbox.url' config")
|
||||
if not isinstance(password, str) or not password:
|
||||
raise ConfigError("Missing or invalid 'fritzbox.password' config")
|
||||
if username is not None and not isinstance(username, str):
|
||||
raise ConfigError("Invalid 'fritzbox.username' config")
|
||||
|
||||
mappings = _parse_mappings(raw.get("mappings"))
|
||||
|
||||
update_timeout = raw.get("update_timeout")
|
||||
if not isinstance(update_timeout, int):
|
||||
raise ConfigError("Missing or invalid 'update_timeout' config")
|
||||
|
||||
log_level = raw.get("log_level", "INFO")
|
||||
if not isinstance(log_level, str):
|
||||
raise ConfigError("Invalid 'log_level' config")
|
||||
|
||||
offset_threshold = raw.get("offset_threshold", 0.5)
|
||||
if not isinstance(offset_threshold, (int, float)):
|
||||
raise ConfigError("Invalid 'offset_threshold' config")
|
||||
|
||||
dry_run = raw.get("dry_run", False)
|
||||
if not isinstance(dry_run, bool):
|
||||
raise ConfigError("Invalid 'dry_run' config")
|
||||
|
||||
force_ipv4 = raw.get("force_ipv4", True)
|
||||
if not isinstance(force_ipv4, bool):
|
||||
raise ConfigError("Invalid 'force_ipv4' config")
|
||||
|
||||
log_ws_messages = raw.get("log_ws_messages", False)
|
||||
if not isinstance(log_ws_messages, bool):
|
||||
raise ConfigError("Invalid 'log_ws_messages' config")
|
||||
|
||||
log_http_requests = raw.get("log_http_requests", False)
|
||||
if not isinstance(log_http_requests, bool):
|
||||
raise ConfigError("Invalid 'log_http_requests' config")
|
||||
|
||||
request_timeout = raw.get("request_timeout", 10)
|
||||
if not isinstance(request_timeout, int):
|
||||
raise ConfigError("Invalid 'request_timeout' config")
|
||||
|
||||
request_retries = raw.get("request_retries", 2)
|
||||
if not isinstance(request_retries, int):
|
||||
raise ConfigError("Invalid 'request_retries' config")
|
||||
|
||||
return AppConfig(
|
||||
fritzbox=FritzBoxConfig(url=url, password=password, username=username),
|
||||
mappings=mappings,
|
||||
update_timeout=update_timeout,
|
||||
log_level=log_level,
|
||||
offset_threshold=float(offset_threshold),
|
||||
dry_run=dry_run,
|
||||
force_ipv4=force_ipv4,
|
||||
log_ws_messages=log_ws_messages,
|
||||
log_http_requests=log_http_requests,
|
||||
request_timeout=request_timeout,
|
||||
request_retries=request_retries,
|
||||
)
|
||||
|
||||
|
||||
def _parse_mappings(value: Any) -> list[Mapping]:
|
||||
if not isinstance(value, list):
|
||||
raise ConfigError("Missing or invalid 'mappings' config")
|
||||
value = cast(list[Any], value)
|
||||
mappings: list[Mapping] = []
|
||||
seen_pairs: set[tuple[str, str]] = set()
|
||||
seen_thermostats: set[str] = set()
|
||||
for idx, entry in enumerate(_iter_dicts(value)):
|
||||
sensor = entry.get("sensor")
|
||||
thermostate = entry.get("thermostate")
|
||||
if not isinstance(sensor, str) or not isinstance(thermostate, str):
|
||||
raise ConfigError(f"Invalid mapping at index {idx}")
|
||||
if thermostate in seen_thermostats:
|
||||
raise ConfigError(
|
||||
f"Duplicate thermostate mapping at index {idx}: {thermostate}"
|
||||
)
|
||||
pair = (sensor, thermostate)
|
||||
if pair in seen_pairs:
|
||||
raise ConfigError(f"Duplicate mapping at index {idx}: {pair}")
|
||||
seen_thermostats.add(thermostate)
|
||||
seen_pairs.add(pair)
|
||||
mappings.append(Mapping(sensor=sensor, thermostate=thermostate))
|
||||
if not mappings:
|
||||
raise ConfigError("'mappings' config must not be empty")
|
||||
return mappings
|
||||
|
||||
|
||||
def _iter_dicts(items: Iterable[Any]) -> Iterable[dict[str, Any]]:
|
||||
for item in items:
|
||||
if isinstance(item, dict):
|
||||
yield item
|
||||
@ -1,9 +1,9 @@
|
||||
name: "Fritz!Box Temperature Sync Dev"
|
||||
description: "Sync Fritz!DECT thermostat temperatures with other sensors in Home Assistant"
|
||||
version: "0.4.3"
|
||||
version: "0.5"
|
||||
startup: "application"
|
||||
stage: "stable"
|
||||
slug: "fritz_temp_sync_dev"
|
||||
slug: "fritz_temp_sync"
|
||||
homeassistant_api: true
|
||||
init: false
|
||||
arch:
|
||||
@ -20,6 +20,14 @@ options:
|
||||
- sensor: null
|
||||
thermostate: null
|
||||
update_timeout: 15
|
||||
log_level: "INFO"
|
||||
offset_threshold: 0.5
|
||||
dry_run: false
|
||||
force_ipv4: true
|
||||
log_ws_messages: false
|
||||
log_http_requests: false
|
||||
request_timeout: 10
|
||||
request_retries: 2
|
||||
schema:
|
||||
fritzbox:
|
||||
url: url
|
||||
@ -30,3 +38,10 @@ schema:
|
||||
thermostate: str
|
||||
update_timeout: int
|
||||
log_level: "str?"
|
||||
offset_threshold: float?
|
||||
dry_run: bool?
|
||||
force_ipv4: bool?
|
||||
log_ws_messages: bool?
|
||||
log_http_requests: bool?
|
||||
request_timeout: int?
|
||||
request_retries: int?
|
||||
|
||||
@ -1,714 +1,276 @@
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from abc import ABC, abstractmethod
|
||||
from enum import Enum, IntFlag, auto
|
||||
from typing import Dict, List, Optional, Union
|
||||
import typing
|
||||
from dataclasses import dataclass
|
||||
from typing import Any, Dict, List, Optional, cast
|
||||
|
||||
|
||||
class WeekDay(IntFlag):
|
||||
MON = 0b1
|
||||
TUE = 0b10
|
||||
WED = 0b100
|
||||
THU = 0b1000
|
||||
FRI = 0b10000
|
||||
SAT = 0b100000
|
||||
SUN = 0b1000000
|
||||
@dataclass
|
||||
class TemperatureSensor:
|
||||
current_celsius: Optional[float]
|
||||
offset: Optional[float]
|
||||
|
||||
|
||||
class Manufacturer:
|
||||
def __init__(self, name: str):
|
||||
self.name: str = name
|
||||
@dataclass
|
||||
class Device:
|
||||
id: int
|
||||
display_name: str
|
||||
temp_sensor: TemperatureSensor
|
||||
raw_state: Dict[str, Any]
|
||||
|
||||
def __repr__(self):
|
||||
return f"name: {self.name}"
|
||||
|
||||
def to_json(self):
|
||||
return {"name": self.name}
|
||||
|
||||
@staticmethod
|
||||
def parse_dict(manufacturer: Dict):
|
||||
return Manufacturer(manufacturer["name"])
|
||||
|
||||
|
||||
class FirmwareVersion:
|
||||
def __init__(self, search: bool, current: str, update: bool, running: bool):
|
||||
self.search: bool = search
|
||||
self.current: str = current
|
||||
self.update: bool = update
|
||||
self.running: bool = running
|
||||
|
||||
def __repr__(self):
|
||||
return f"search: {self.search}; current: {self.current}; update: {self.update}; running: {self.running}"
|
||||
|
||||
def to_json(self):
|
||||
return {"search": self.search, "current": self.current, "update": self.update, "running": self.running}
|
||||
|
||||
@staticmethod
|
||||
def parse_dict(firmware_version: dict):
|
||||
return FirmwareVersion(firmware_version["search"],
|
||||
firmware_version["current"],
|
||||
firmware_version["update"],
|
||||
firmware_version["running"])
|
||||
|
||||
|
||||
class PushService:
|
||||
def __init__(self, mail_address: str, unit_settings: List, is_enabled: bool):
|
||||
self.mail_address: str = mail_address
|
||||
self.unit_settings: List = unit_settings
|
||||
self.is_enabled: bool = is_enabled
|
||||
|
||||
def __repr__(self):
|
||||
return f"<PushService MailAddress: {self.mail_address}; UnitSettings: {self.unit_settings}; " \
|
||||
f"isEnabled: {self.is_enabled}>"
|
||||
|
||||
def to_json(self):
|
||||
return {"mailAddress": self.mail_address, "unitSettings": self.unit_settings, "isEnabled": self.is_enabled}
|
||||
|
||||
@staticmethod
|
||||
def parse_dict(push_service: Dict):
|
||||
return PushService(push_service["mailAddress"], push_service["unitSettings"], push_service["isEnabled"])
|
||||
|
||||
|
||||
class SkillType(Enum):
|
||||
SmartHomeTemperatureSensor = auto()
|
||||
SmartHomeThermostat = auto()
|
||||
SmartHomeBattery = auto()
|
||||
|
||||
|
||||
class Skill(ABC):
|
||||
def __init__(self, skill_type: SkillType):
|
||||
self.type: SkillType = skill_type
|
||||
|
||||
@abstractmethod
|
||||
def to_json(self):
|
||||
pass
|
||||
|
||||
@abstractmethod
|
||||
def to_web_data(self, device_id: int):
|
||||
pass
|
||||
|
||||
@staticmethod
|
||||
def parse_dict(skill: Dict):
|
||||
skill_type = SkillType[skill["type"]]
|
||||
if skill_type == SkillType.SmartHomeTemperatureSensor:
|
||||
return TemperatureSkill.parse_dict(skill)
|
||||
elif skill_type == SkillType.SmartHomeThermostat:
|
||||
return ThermostatSkill.parse_dict(skill)
|
||||
elif skill_type == SkillType.SmartHomeBattery:
|
||||
return BatterySkill.parse_dict(skill)
|
||||
def __init__(self, state: Dict[str, Any]):
|
||||
self.raw_state = state
|
||||
id_value = state.get("id")
|
||||
if isinstance(id_value, int):
|
||||
self.id = id_value
|
||||
else:
|
||||
raise NotImplementedError(skill_type)
|
||||
|
||||
|
||||
class PresetName(Enum):
|
||||
LOWER_TEMPERATURE = auto()
|
||||
UPPER_TEMPERATURE = auto()
|
||||
HOLIDAY_TEMPERATURE = auto()
|
||||
|
||||
|
||||
class Preset:
|
||||
def __init__(self, name: PresetName, temperature: Optional[int]):
|
||||
self.name: PresetName = name
|
||||
self.temperature: Optional[int] = temperature
|
||||
|
||||
def __repr__(self):
|
||||
return f"<Preset name: {self.name.name}; temperature: {self.temperature}>"
|
||||
|
||||
def to_json(self):
|
||||
return {"name": self.name.name, "temperature": self.temperature}
|
||||
|
||||
|
||||
class Description:
|
||||
def __init__(self, action: str, preset_temperature: Optional[Preset]):
|
||||
self.action: str = action
|
||||
self.preset_temperature: Optional[Preset] = preset_temperature
|
||||
|
||||
def __repr__(self):
|
||||
desc = f"<Description action: {self.action}"
|
||||
if self.preset_temperature is not None:
|
||||
desc += f"; presetTemperaturet: {self.preset_temperature}"
|
||||
desc += " >"
|
||||
|
||||
return desc
|
||||
|
||||
def to_json(self):
|
||||
if self.preset_temperature is not None:
|
||||
return {"action": self.action, "presetTemperature": self.preset_temperature.to_json()}
|
||||
return {"action": self.action}
|
||||
|
||||
@staticmethod
|
||||
def parse_dict(description: Dict):
|
||||
preset: Optional[Preset] = None
|
||||
if "presetTemperature" in description.keys():
|
||||
preset = Preset(PresetName[description["presetTemperature"]["name"]],
|
||||
description["presetTemperature"]["temperature"] if "temperature" in description["presetTemperature"] else None)
|
||||
return Description(description["action"], preset)
|
||||
|
||||
|
||||
class Repetition(Enum):
|
||||
NONE = auto()
|
||||
YEARLY = auto()
|
||||
|
||||
|
||||
class TimeSetting:
|
||||
def __init__(self, start_date: Optional[str], start_time: Optional[str],
|
||||
end_date: Optional[str] = None, end_time: Optional[str] = None,
|
||||
repetition: Repetition = Repetition.NONE, day_of_week: Optional[WeekDay] = None):
|
||||
self.start_date: Optional[str] = start_date
|
||||
self.start_time: Optional[str] = start_time
|
||||
self.end_date: Optional[str] = end_date
|
||||
self.end_time: Optional[str] = end_time
|
||||
self.repetition: Repetition = repetition
|
||||
self.day_of_week: Optional[WeekDay] = day_of_week
|
||||
|
||||
def __repr__(self):
|
||||
desc = f"<TimeSettings "
|
||||
if self.day_of_week is not None:
|
||||
desc += f"dayOfWeek: {self.day_of_week.name}; "
|
||||
if self.start_date is not None:
|
||||
desc += f"startDate: {self.start_date} "
|
||||
if self.start_time is not None:
|
||||
desc += f"startTime: {self.start_time} "
|
||||
if self.end_date is not None:
|
||||
desc += f"endDate: {self.end_date} "
|
||||
if self.end_time is not None:
|
||||
desc += f"endTime: {self.end_time} "
|
||||
if self.repetition != Repetition.NONE:
|
||||
desc += f"repetition: {self.repetition.name} "
|
||||
desc += ">"
|
||||
|
||||
return desc
|
||||
|
||||
def to_json(self): # ToDo: Return typehints for all to_json
|
||||
state = {}
|
||||
if self.start_date is not None:
|
||||
state["startDate"] = self.start_date
|
||||
if self.start_time is not None:
|
||||
state["startTime"] = self.start_time
|
||||
if self.end_date is not None:
|
||||
state["endDate"] = self.end_date
|
||||
if self.end_time is not None:
|
||||
state["endTime"] = self.end_time
|
||||
if self.repetition != Repetition.NONE:
|
||||
state["repetition"] = self.repetition.name
|
||||
if self.day_of_week is not None:
|
||||
state = {"dayOfWeek": self.day_of_week.name,
|
||||
"time": state}
|
||||
|
||||
return state
|
||||
|
||||
@staticmethod
|
||||
def parse_dict(time_setting: Dict):
|
||||
start_date: Optional[str] = None
|
||||
start_time: Optional[str] = None
|
||||
end_date: Optional[str] = None
|
||||
end_time: Optional[str] = None
|
||||
day_of_week: Optional[WeekDay] = None
|
||||
repetition: Repetition = Repetition.NONE
|
||||
if "dayOfWeek" in time_setting.keys():
|
||||
day_of_week = WeekDay[time_setting["dayOfWeek"]]
|
||||
time_setting = time_setting["time"]
|
||||
if "startDate" in time_setting.keys():
|
||||
start_date = time_setting["startDate"]
|
||||
if "startTime" in time_setting.keys():
|
||||
start_time = time_setting["startTime"]
|
||||
if "endDate" in time_setting.keys():
|
||||
end_date = time_setting["endDate"]
|
||||
if "endTime" in time_setting.keys():
|
||||
end_time = time_setting["endTime"]
|
||||
if "repetition" in time_setting.keys():
|
||||
repetition = Repetition[time_setting["repetition"]]
|
||||
|
||||
return TimeSetting(start_date, start_time, end_date, end_time, repetition, day_of_week)
|
||||
|
||||
|
||||
class Change:
|
||||
def __init__(self, description: Description, time_setting: TimeSetting):
|
||||
self.description: Description = description
|
||||
self.time_setting: TimeSetting = time_setting
|
||||
|
||||
def __repr__(self):
|
||||
return f"<Change description: {self.description}; timeSetting: {self.time_setting}>"
|
||||
|
||||
def to_json(self):
|
||||
return {"description": self.description.to_json(), "timeSetting": self.time_setting.to_json()}
|
||||
|
||||
@staticmethod
|
||||
def parse_dict(change: Dict):
|
||||
return Change(Description.parse_dict(change["description"]), TimeSetting.parse_dict(change["timeSetting"]))
|
||||
|
||||
|
||||
class TemperatureDropDetection:
|
||||
def __init__(self, do_not_heat_offset_in_minutes: int, sensitivity: int):
|
||||
self.do_not_heat_offset_in_minutes: int = do_not_heat_offset_in_minutes
|
||||
self.sensitivity: int = sensitivity
|
||||
|
||||
def __repr__(self):
|
||||
# ToDo
|
||||
pass
|
||||
|
||||
def to_json(self):
|
||||
return {
|
||||
"doNotHeatOffsetInMinutes": self.do_not_heat_offset_in_minutes,
|
||||
"sensitivity": self.sensitivity
|
||||
}
|
||||
|
||||
def to_web_data(self, device_id: int):
|
||||
return {
|
||||
"WindowOpenTrigger": self.sensitivity + 3,
|
||||
"WindowOpenTimer": self.do_not_heat_offset_in_minutes
|
||||
}
|
||||
|
||||
@staticmethod
|
||||
def parse_dict(drop_detection: Dict):
|
||||
return TemperatureDropDetection(drop_detection["doNotHeatOffsetInMinutes"],
|
||||
drop_detection["sensitivity"])
|
||||
|
||||
|
||||
class ScheduleKind(Enum):
|
||||
REPETITIVE = auto()
|
||||
WEEKLY_TIMETABLE = auto()
|
||||
|
||||
|
||||
class ThermostatSkillMode(Enum):
|
||||
TARGET_TEMPERATURE = auto()
|
||||
|
||||
|
||||
class Action:
|
||||
def __init__(self, is_enabled: bool, time_setting: TimeSetting, description: Description):
|
||||
self.is_enabled: bool = is_enabled
|
||||
self.time_setting: TimeSetting = time_setting
|
||||
self.description: Description = description
|
||||
|
||||
def __repr__(self):
|
||||
return f"<Action isEnabled: {self.is_enabled}; timeSetting: {self.time_setting}; " \
|
||||
f"description: {self.description} >"
|
||||
|
||||
def to_json(self):
|
||||
return {"isEnabled": self.is_enabled,
|
||||
"timeSetting": self.time_setting.to_json(),
|
||||
"description": self.description.to_json()}
|
||||
|
||||
@staticmethod
|
||||
def parse_dict(action: Dict):
|
||||
return Action(action["isEnabled"], TimeSetting.parse_dict(action["timeSetting"]),
|
||||
Description.parse_dict(action["description"]))
|
||||
|
||||
|
||||
class Schedule:
|
||||
def __init__(self, is_enabled: bool, kind: ScheduleKind, name: str, actions: List[Action]):
|
||||
self.is_enabled: bool = is_enabled
|
||||
self.kind: ScheduleKind = kind
|
||||
self.name: str = name
|
||||
self.actions: List[Action] = actions
|
||||
|
||||
def __repr__(self):
|
||||
return f"<Schedule isEnabled: {self.is_enabled}; kind: {self.kind.name}; " \
|
||||
f"name: {self.name}; actions: {self.actions}>"
|
||||
|
||||
def to_json(self):
|
||||
return {
|
||||
"isEnabled": self.is_enabled,
|
||||
"kind": self.kind.name,
|
||||
"name": self.name,
|
||||
"actions": [action.to_json() for action in self.actions]
|
||||
}
|
||||
|
||||
def to_web_data(self, device_id: int):
|
||||
data = {}
|
||||
if self.kind == ScheduleKind.REPETITIVE and self.name == "HOLIDAYS": # ToDo: Enum for names?
|
||||
enabled_count = 0
|
||||
for num, holiday in enumerate(self.actions):
|
||||
num += 1
|
||||
_, start_month, start_day = holiday.time_setting.start_date.split("-")
|
||||
_, end_month, end_day = holiday.time_setting.end_date.split("-")
|
||||
data.update({
|
||||
f"Holiday{num}StartDay": start_day,
|
||||
f"Holiday{num}StartMonth": start_month,
|
||||
f"Holiday{num}StartHour": holiday.time_setting.start_time.split(":")[0],
|
||||
f"Holiday{num}EndDay": end_day,
|
||||
f"Holiday{num}EndMonth": end_month,
|
||||
f"Holiday{num}EndHour": holiday.time_setting.end_time.split(":")[0],
|
||||
f"Holiday{num}Enabled": 1 if holiday.is_enabled else 0,
|
||||
f"Holiday{num}ID": num
|
||||
})
|
||||
if holiday.is_enabled:
|
||||
enabled_count += 1
|
||||
data["HolidayEnabledCount"] = enabled_count
|
||||
elif self.kind == ScheduleKind.REPETITIVE and self.name == "SUMMER_TIME":
|
||||
_, start_month, start_day = self.actions[0].time_setting.start_date.split("-")
|
||||
_, end_month, end_day = self.actions[0].time_setting.end_date.split("-")
|
||||
data = {
|
||||
"SummerStartDay": start_day,
|
||||
"SummerStartMonth": start_month,
|
||||
"SummerEndDay": end_day,
|
||||
"SummerEndMonth": end_month,
|
||||
"SummerEnabled": 1 if self.is_enabled else 0
|
||||
}
|
||||
elif self.kind == ScheduleKind.WEEKLY_TIMETABLE and self.name == "TEMPERATURE":
|
||||
timer_items = {}
|
||||
for action in self.actions:
|
||||
if not action.is_enabled:
|
||||
continue
|
||||
time = "".join(action.time_setting.start_time.split(":")[:-1])
|
||||
if time not in timer_items.keys():
|
||||
timer_items[time] = [0, 0]
|
||||
heat = 1 if action.description.preset_temperature.name == PresetName.UPPER_TEMPERATURE else 0
|
||||
days = timer_items[time][heat]
|
||||
days |= action.time_setting.day_of_week
|
||||
timer_items[time][heat] = days
|
||||
num = 0
|
||||
for key in timer_items.keys():
|
||||
if timer_items[key][0] != 0:
|
||||
data.update({f"timer_item_{num}": f"{key};0;{timer_items[key][0].as_integer_ratio()[0]}"})
|
||||
num += 1
|
||||
if timer_items[key][1] != 0:
|
||||
data.update({f"timer_item_{num}": f"{key};1;{timer_items[key][1].as_integer_ratio()[0]}"})
|
||||
num += 1
|
||||
self.id = -1
|
||||
name_value = state.get("displayName")
|
||||
if isinstance(name_value, str):
|
||||
self.display_name = name_value
|
||||
else:
|
||||
raise NotImplementedError(self.name)
|
||||
|
||||
return data
|
||||
|
||||
@staticmethod
|
||||
def parse_dict(schedule: Dict): # ToDo: Make TypedDicts for all those dicts
|
||||
return Schedule(schedule["isEnabled"],
|
||||
ScheduleKind[schedule["kind"]],
|
||||
schedule["name"],
|
||||
[Action.parse_dict(action) for action in schedule["actions"]])
|
||||
|
||||
|
||||
class TimeControl:
|
||||
def __init__(self, is_enabled: bool, time_schedules: List[Schedule]):
|
||||
self.is_enabled: bool = is_enabled
|
||||
self.time_schedules: List[Schedule] = time_schedules
|
||||
|
||||
def __repr__(self):
|
||||
return f"<TimeControl isEnabled: {self.is_enabled}; timeSchedules: {self.time_schedules}"
|
||||
|
||||
def to_json(self):
|
||||
return {
|
||||
"isEnabled": self.is_enabled,
|
||||
"timeSchedules": [schedule.to_json() for schedule in self.time_schedules]
|
||||
}
|
||||
|
||||
def to_web_data(self, device_id: int):
|
||||
data = {}
|
||||
for schedule in self.time_schedules:
|
||||
data.update(schedule.to_web_data(device_id))
|
||||
return data
|
||||
|
||||
@staticmethod
|
||||
def parse_dict(time_control: Dict):
|
||||
return TimeControl(time_control["isEnabled"],
|
||||
[Schedule.parse_dict(schedule) for schedule in time_control["timeSchedules"]])
|
||||
|
||||
|
||||
class ThermostatSkill(Skill):
|
||||
def __init__(self, presets: List[Preset], next_change: Optional[Change],
|
||||
temperature_drop_detection: TemperatureDropDetection,
|
||||
target_temp: int, time_control: TimeControl,
|
||||
mode: ThermostatSkillMode, used_temp_sensor: Unit):
|
||||
super().__init__(SkillType.SmartHomeThermostat)
|
||||
self.presets: List[Preset] = presets
|
||||
self.next_change: Optional[Change] = next_change
|
||||
self.temperature_drop_detection: TemperatureDropDetection = temperature_drop_detection
|
||||
self.target_temp: int = target_temp
|
||||
self.time_control: TimeControl = time_control
|
||||
self.mode: ThermostatSkillMode = mode
|
||||
self.used_temp_sensor: Unit = used_temp_sensor
|
||||
|
||||
def __repr__(self):
|
||||
# ToDo
|
||||
pass
|
||||
|
||||
def to_web_data(self, device_id: int):
|
||||
upper = 0.0
|
||||
lower = 0.0
|
||||
for preset in self.presets:
|
||||
if preset.name == PresetName.UPPER_TEMPERATURE:
|
||||
upper = preset.temperature
|
||||
elif preset.name == PresetName.LOWER_TEMPERATURE:
|
||||
lower = preset.temperature
|
||||
|
||||
data = {
|
||||
"Heiztemp": upper,
|
||||
"Absenktemp": lower
|
||||
}
|
||||
|
||||
if device_id == self.used_temp_sensor.id:
|
||||
data.update({"ExtTempsensorID": "tochoose", "tempsensor": "own"})
|
||||
else:
|
||||
data.update({"ExtTempsensorID": self.used_temp_sensor.id, "tempsensor": "extern"})
|
||||
|
||||
data.update(self.time_control.to_web_data(device_id))
|
||||
|
||||
data.update(self.temperature_drop_detection.to_web_data(device_id))
|
||||
|
||||
return data
|
||||
|
||||
def to_json(self):
|
||||
state = {
|
||||
"type": self.type.name,
|
||||
"presets": [preset.to_json() for preset in self.presets],
|
||||
"temperatureDropDetection": self.temperature_drop_detection.to_json(),
|
||||
"targetTemp": self.target_temp,
|
||||
"timeControl": self.time_control.to_json(),
|
||||
"mode": self.mode.name,
|
||||
"usedTempSensor": self.used_temp_sensor.to_json()
|
||||
}
|
||||
if self.next_change is not None:
|
||||
state["nextChange"] = self.next_change.to_json()
|
||||
|
||||
return state
|
||||
|
||||
@staticmethod
|
||||
def parse_dict(skill: Dict):
|
||||
return ThermostatSkill(
|
||||
[Preset(PresetName[preset["name"]], preset["temperature"]) for preset in skill["presets"]],
|
||||
Change.parse_dict(skill["nextChange"]) if "nextChange" in skill.keys() else None,
|
||||
TemperatureDropDetection.parse_dict(skill["temperatureDropDetection"]),
|
||||
skill["targetTemp"],
|
||||
TimeControl.parse_dict(skill["timeControl"]),
|
||||
ThermostatSkillMode[skill["mode"]],
|
||||
Unit.parse_dict(skill["usedTempSensor"], None)
|
||||
self.display_name = ""
|
||||
self.temp_sensor = TemperatureSensor(
|
||||
current_celsius=self._get_temp_value("currentInCelsius"),
|
||||
offset=self._get_temp_value("offset"),
|
||||
)
|
||||
|
||||
|
||||
class TemperatureSkill(Skill):
|
||||
def __init__(self, offset: float, current_in_celsius: int):
|
||||
super().__init__(SkillType.SmartHomeTemperatureSensor)
|
||||
self.offset: float = offset
|
||||
self.current_in_celsius: int = current_in_celsius
|
||||
|
||||
def __repr__(self):
|
||||
return f"{self.type.name}: " \
|
||||
f"offset: {self.offset}; currentInCelsius: {self.current_in_celsius}"
|
||||
|
||||
def to_json(self):
|
||||
return {
|
||||
"type": self.type.name,
|
||||
"offset": self.offset,
|
||||
"currentInCelsius": self.current_in_celsius
|
||||
}
|
||||
|
||||
def to_web_data(self, device_id: int):
|
||||
data = {
|
||||
"Roomtemp": self.current_in_celsius,
|
||||
"Offset": self.offset
|
||||
}
|
||||
return data
|
||||
|
||||
@staticmethod
|
||||
def parse_dict(skill: Dict):
|
||||
return TemperatureSkill(skill["offset"], skill["currentInCelsius"])
|
||||
|
||||
|
||||
class BatterySkill(Skill):
|
||||
def __init__(self, charge_level_in_percent: int):
|
||||
super().__init__(SkillType.SmartHomeBattery)
|
||||
self.charge_level_in_percent: int = charge_level_in_percent
|
||||
|
||||
def __repr__(self):
|
||||
return f"{self.type.name}: chargeLevelInPercent: {self.charge_level_in_percent}"
|
||||
|
||||
def to_json(self):
|
||||
return {"type": self.type.name, "chargeLevelInPercent": self.charge_level_in_percent}
|
||||
|
||||
def to_web_data(self, device_id: int):
|
||||
return {}
|
||||
|
||||
@staticmethod
|
||||
def parse_dict(skill: Dict):
|
||||
return BatterySkill(skill["chargeLevelInPercent"])
|
||||
|
||||
|
||||
class UnitTypes(Enum):
|
||||
BATTERY = auto()
|
||||
TEMPERATURE_SENSOR = auto()
|
||||
THERMOSTAT = auto()
|
||||
|
||||
|
||||
class Unit:
|
||||
def __init__(self, unit_type: UnitTypes, idx: int, display_name: str,
|
||||
device: Optional[Device], skills: List[Union[Dict, Skill]],
|
||||
interaction_controls: Optional[List] = None):
|
||||
self.type: UnitTypes = unit_type
|
||||
self.id: int = idx
|
||||
self.display_name: str = display_name
|
||||
self.device: Optional[Device] = device
|
||||
self.skills: List[Skill] = skills
|
||||
self.interaction_controls: Optional[List] = interaction_controls # ToDo: Type for this
|
||||
|
||||
def __repr__(self):
|
||||
desc = f"type: {self.type.name}; id: {self.id}; displayName: {self.display_name};"
|
||||
if self.device is not None:
|
||||
desc += f"device: {self.device.short_description()}; "
|
||||
desc += f"skills: {self.skills}; "
|
||||
if self.interaction_controls is not None:
|
||||
desc += f"interactionControls: {self.interaction_controls} ;"
|
||||
return desc
|
||||
|
||||
def to_json(self):
|
||||
state = {
|
||||
"type": self.type.name,
|
||||
"id": self.id,
|
||||
"displayName": self.display_name,
|
||||
"skills": [skill.to_json() for skill in self.skills]
|
||||
}
|
||||
if self.device is not None:
|
||||
state["device"] = self.device.to_short_json()
|
||||
if self.interaction_controls is not None:
|
||||
state["interactionControls"] = self.interaction_controls
|
||||
|
||||
return state
|
||||
|
||||
def to_web_data(self, device_id: int):
|
||||
data = {}
|
||||
for skill in self.skills:
|
||||
data.update(skill.to_web_data(device_id))
|
||||
|
||||
return data
|
||||
|
||||
@staticmethod
|
||||
def parse_dict(unit: Dict, device: Optional[Device]):
|
||||
return Unit(UnitTypes[unit["type"]],
|
||||
unit["id"],
|
||||
unit["displayName"],
|
||||
device,
|
||||
[Skill.parse_dict(skill) for skill in unit["skills"]],
|
||||
unit["interactionControls"] if "interactionControls" in unit.keys() else None)
|
||||
|
||||
|
||||
class DeviceType(Enum):
|
||||
SmartHomeDevice = auto()
|
||||
|
||||
|
||||
class DeviceCategory(Enum):
|
||||
THERMOSTAT = auto()
|
||||
|
||||
|
||||
class Device:
|
||||
def __init__(self, state):
|
||||
self.type: Optional[DeviceType] = None
|
||||
self.is_deletable: bool = False
|
||||
self.id: int = -1
|
||||
self.master_connection_state = None # ToDo
|
||||
self.display_name: Optional[str] = None
|
||||
self.category: Optional[str] = None
|
||||
self.units = None # ToDo
|
||||
self.firmware_version: Optional[FirmwareVersion] = None
|
||||
self.model: Optional[str] = None
|
||||
self.is_editable: bool = False
|
||||
self.manufacturer: Optional[Manufacturer] = None
|
||||
self.push_service: Optional[PushService] = None
|
||||
self.actor_identification_number: Optional[str] = None
|
||||
|
||||
if state is not None:
|
||||
self.parse_state(state)
|
||||
|
||||
def __repr__(self):
|
||||
# ToDo
|
||||
pass
|
||||
|
||||
def short_description(self):
|
||||
desc = f"masterConnectionState: {self.master_connection_state}; " \
|
||||
f"type: {self.type}; model: {self.model}; id: {self.id}; " \
|
||||
f"manufacturer: {self.manufacturer}; " \
|
||||
f"actorIdentificationNumber: {self.actor_identification_number}; " \
|
||||
f"displayName: {self.display_name}"
|
||||
|
||||
return desc
|
||||
|
||||
def parse_state(self, state):
|
||||
self.type = DeviceType[state["type"]]
|
||||
self.is_deletable = state["isDeletable"]
|
||||
self.id = state["id"]
|
||||
self.master_connection_state = state["masterConnectionState"]
|
||||
self.display_name = state["displayName"]
|
||||
self.category = state["category"]
|
||||
self.units = [Unit.parse_dict(unit, self) for unit in state["units"]]
|
||||
self.firmware_version = FirmwareVersion.parse_dict(state["firmwareVersion"])
|
||||
self.model = state["model"]
|
||||
self.is_editable = state["isEditable"]
|
||||
self.manufacturer = Manufacturer.parse_dict(state["manufacturer"])
|
||||
self.push_service = PushService.parse_dict(state["pushService"])
|
||||
self.actor_identification_number = state["actorIdentificationNumber"]
|
||||
|
||||
def to_web_data(self):
|
||||
data = {
|
||||
def to_web_data(self) -> Dict[str, Any]:
|
||||
data: Dict[str, Any] = {
|
||||
"device": self.id,
|
||||
"ule_device_name": self.display_name
|
||||
"ule_device_name": self.display_name,
|
||||
}
|
||||
for unit in self.units:
|
||||
data.update(unit.to_web_data(self.id))
|
||||
|
||||
units = self._get_units()
|
||||
for unit in units:
|
||||
data.update(self._unit_to_web_data(unit))
|
||||
return data
|
||||
|
||||
def to_json(self):
|
||||
state = {"type": self.type.name,
|
||||
"isDeletable": self.is_deletable,
|
||||
"id": self.id,
|
||||
"masterConnectionState": self.master_connection_state,
|
||||
"displayName": self.display_name,
|
||||
"category": self.category,
|
||||
"units": [unit.to_json() for unit in self.units],
|
||||
"firmwareVersion": self.firmware_version.to_json(),
|
||||
"model": self.model,
|
||||
"isEditable": self.is_editable,
|
||||
"manufacturer": self.manufacturer.to_json(),
|
||||
"pushService": self.push_service.to_json(),
|
||||
"actorIdentificationNumber": self.actor_identification_number}
|
||||
def set_offset(self, offset: float) -> None:
|
||||
self.temp_sensor.offset = offset
|
||||
self._set_temp_value("offset", offset)
|
||||
|
||||
return state
|
||||
def get_offset(self) -> Optional[float]:
|
||||
return self.temp_sensor.offset
|
||||
|
||||
def set_offset(self, offset: float):
|
||||
temp_sensor: Optional[Unit] = None
|
||||
for unit in self.units:
|
||||
if unit.type == UnitTypes.TEMPERATURE_SENSOR:
|
||||
temp_sensor = unit
|
||||
break
|
||||
temp_skill: Optional[TemperatureSkill] = None
|
||||
for skill in temp_sensor.skills:
|
||||
if skill.type == SkillType.SmartHomeTemperatureSensor:
|
||||
temp_skill = typing.cast(TemperatureSkill, skill)
|
||||
break
|
||||
temp_skill.offset = offset
|
||||
def get_temperature(self) -> Optional[float]:
|
||||
return self.temp_sensor.current_celsius
|
||||
|
||||
def get_offset(self):
|
||||
temp_sensor: Optional[Unit] = None
|
||||
for unit in self.units:
|
||||
if unit.type == UnitTypes.TEMPERATURE_SENSOR:
|
||||
temp_sensor = unit
|
||||
break
|
||||
temp_skill: Optional[TemperatureSkill] = None
|
||||
for skill in temp_sensor.skills:
|
||||
if skill.type == SkillType.SmartHomeTemperatureSensor:
|
||||
temp_skill = typing.cast(TemperatureSkill, skill)
|
||||
break
|
||||
return temp_skill.offset
|
||||
def _get_temp_value(self, key: str) -> Optional[float]:
|
||||
sensor_skill = self._find_device_temp_skill()
|
||||
if sensor_skill is None:
|
||||
return None
|
||||
value = sensor_skill.get(key)
|
||||
if isinstance(value, (int, float)):
|
||||
return float(value)
|
||||
return None
|
||||
|
||||
def get_temperature(self):
|
||||
temp_sensor: Optional[Unit] = None
|
||||
for unit in self.units:
|
||||
if unit.type == UnitTypes.TEMPERATURE_SENSOR:
|
||||
temp_sensor = unit
|
||||
break
|
||||
temp_skill: Optional[TemperatureSkill] = None
|
||||
for skill in temp_sensor.skills:
|
||||
if skill.type == SkillType.SmartHomeTemperatureSensor:
|
||||
temp_skill = typing.cast(TemperatureSkill, skill)
|
||||
break
|
||||
return temp_skill.current_in_celsius
|
||||
def _set_temp_value(self, key: str, value: float) -> None:
|
||||
sensor_skill = self._find_device_temp_skill()
|
||||
if sensor_skill is None:
|
||||
return
|
||||
sensor_skill[key] = value
|
||||
|
||||
def to_short_json(self):
|
||||
return {
|
||||
"masterConnectionState": self.master_connection_state,
|
||||
"type": self.type.name,
|
||||
"model": self.model,
|
||||
"id": self.id,
|
||||
"manufacturer": self.manufacturer.to_json(),
|
||||
"actorIdentificationNumber": self.actor_identification_number,
|
||||
"displayName": self.display_name
|
||||
}
|
||||
def _find_device_temp_skill(self) -> Optional[Dict[str, Any]]:
|
||||
for unit in self._as_dict_list(self.raw_state.get("units")):
|
||||
if unit.get("type") != "TEMPERATURE_SENSOR":
|
||||
continue
|
||||
if unit.get("id") != self.id:
|
||||
continue
|
||||
for skill in self._as_dict_list(unit.get("skills")):
|
||||
if skill.get("type") == "SmartHomeTemperatureSensor":
|
||||
return skill
|
||||
return None
|
||||
|
||||
def _get_units(self) -> List[Dict[str, Any]]:
|
||||
return self._as_dict_list(self.raw_state.get("units"))
|
||||
|
||||
def _unit_to_web_data(self, unit: Dict[str, Any]) -> Dict[str, Any]:
|
||||
data: Dict[str, Any] = {}
|
||||
for skill in self._as_dict_list(unit.get("skills")):
|
||||
skill_type = skill.get("type")
|
||||
if skill_type == "SmartHomeThermostat":
|
||||
data.update(self._thermostat_to_web_data(skill))
|
||||
elif skill_type == "SmartHomeTemperatureSensor":
|
||||
data.update(self._temperature_to_web_data(skill))
|
||||
return data
|
||||
|
||||
def _temperature_to_web_data(self, skill: Dict[str, Any]) -> Dict[str, Any]:
|
||||
data: Dict[str, Any] = {}
|
||||
current = skill.get("currentInCelsius")
|
||||
offset = skill.get("offset")
|
||||
if current is not None:
|
||||
data["Roomtemp"] = current
|
||||
if offset is not None:
|
||||
data["Offset"] = offset
|
||||
return data
|
||||
|
||||
def _thermostat_to_web_data(self, skill: Dict[str, Any]) -> Dict[str, Any]:
|
||||
data: Dict[str, Any] = {}
|
||||
|
||||
upper = 0.0
|
||||
lower = 0.0
|
||||
for preset in self._as_dict_list(skill.get("presets")):
|
||||
if preset.get("name") == "UPPER_TEMPERATURE":
|
||||
upper = self._as_float(preset.get("temperature"), upper)
|
||||
elif preset.get("name") == "LOWER_TEMPERATURE":
|
||||
lower = self._as_float(preset.get("temperature"), lower)
|
||||
data.update({"Heiztemp": upper, "Absenktemp": lower})
|
||||
|
||||
used_temp_sensor = self._as_dict(skill.get("usedTempSensor"))
|
||||
if used_temp_sensor is not None:
|
||||
used_id = used_temp_sensor.get("id")
|
||||
if used_id == self.id:
|
||||
data.update({"ExtTempsensorID": "tochoose", "tempsensor": "own"})
|
||||
else:
|
||||
data.update({"ExtTempsensorID": used_id, "tempsensor": "extern"})
|
||||
|
||||
time_control = self._as_dict(skill.get("timeControl"))
|
||||
if time_control is not None:
|
||||
data.update(self._time_control_to_web_data(time_control))
|
||||
|
||||
drop_detection = self._as_dict(skill.get("temperatureDropDetection"))
|
||||
if drop_detection is not None:
|
||||
data.update(self._drop_detection_to_web_data(drop_detection))
|
||||
|
||||
return data
|
||||
|
||||
def _drop_detection_to_web_data(self, drop: Dict[str, Any]) -> Dict[str, Any]:
|
||||
sensitivity = drop.get("sensitivity")
|
||||
do_not_heat = drop.get("doNotHeatOffsetInMinutes")
|
||||
data: Dict[str, Any] = {}
|
||||
if sensitivity is not None:
|
||||
data["WindowOpenTrigger"] = sensitivity + 3
|
||||
if do_not_heat is not None:
|
||||
data["WindowOpenTimer"] = do_not_heat
|
||||
return data
|
||||
|
||||
def _time_control_to_web_data(self, time_control: Dict[str, Any]) -> Dict[str, Any]:
|
||||
data: Dict[str, Any] = {}
|
||||
for schedule in self._as_dict_list(time_control.get("timeSchedules")):
|
||||
data.update(self._schedule_to_web_data(schedule))
|
||||
return data
|
||||
|
||||
def _schedule_to_web_data(self, schedule: Dict[str, Any]) -> Dict[str, Any]:
|
||||
data: Dict[str, Any] = {}
|
||||
kind = schedule.get("kind")
|
||||
name = schedule.get("name")
|
||||
actions = self._as_dict_list(schedule.get("actions"))
|
||||
|
||||
if kind == "REPETITIVE" and name == "HOLIDAYS":
|
||||
enabled_count = 0
|
||||
for num, holiday in enumerate(actions, start=1):
|
||||
time_setting = self._as_dict(holiday.get("timeSetting"))
|
||||
if time_setting is None:
|
||||
continue
|
||||
start_date = self._as_str(time_setting.get("startDate"), "0000-00-00")
|
||||
end_date = self._as_str(time_setting.get("endDate"), "0000-00-00")
|
||||
start_time = self._as_str(time_setting.get("startTime"), "00:00:00")
|
||||
end_time = self._as_str(time_setting.get("endTime"), "00:00:00")
|
||||
_, start_month, start_day = start_date.split("-")
|
||||
_, end_month, end_day = end_date.split("-")
|
||||
data.update(
|
||||
{
|
||||
f"Holiday{num}StartDay": start_day,
|
||||
f"Holiday{num}StartMonth": start_month,
|
||||
f"Holiday{num}StartHour": start_time.split(":")[0],
|
||||
f"Holiday{num}EndDay": end_day,
|
||||
f"Holiday{num}EndMonth": end_month,
|
||||
f"Holiday{num}EndHour": end_time.split(":")[0],
|
||||
f"Holiday{num}Enabled": 1 if holiday.get("isEnabled") else 0,
|
||||
f"Holiday{num}ID": num,
|
||||
}
|
||||
)
|
||||
if holiday.get("isEnabled"):
|
||||
enabled_count += 1
|
||||
data["HolidayEnabledCount"] = enabled_count
|
||||
elif kind == "REPETITIVE" and name == "SUMMER_TIME" and actions:
|
||||
action = actions[0]
|
||||
time_setting = self._as_dict(action.get("timeSetting"))
|
||||
if time_setting is not None:
|
||||
start_date = self._as_str(time_setting.get("startDate"), "0000-00-00")
|
||||
end_date = self._as_str(time_setting.get("endDate"), "0000-00-00")
|
||||
_, start_month, start_day = start_date.split("-")
|
||||
_, end_month, end_day = end_date.split("-")
|
||||
data = {
|
||||
"SummerStartDay": start_day,
|
||||
"SummerStartMonth": start_month,
|
||||
"SummerEndDay": end_day,
|
||||
"SummerEndMonth": end_month,
|
||||
"SummerEnabled": 1 if schedule.get("isEnabled") else 0,
|
||||
}
|
||||
elif kind == "WEEKLY_TIMETABLE" and name == "TEMPERATURE":
|
||||
timer_items: Dict[str, List[int]] = {}
|
||||
for action in actions:
|
||||
if not action.get("isEnabled"):
|
||||
continue
|
||||
time_setting = self._as_dict(action.get("timeSetting"))
|
||||
if time_setting is None:
|
||||
continue
|
||||
start_time = self._as_str(time_setting.get("startTime"), "")
|
||||
if not start_time:
|
||||
continue
|
||||
time = "".join(start_time.split(":")[:-1])
|
||||
if time not in timer_items:
|
||||
timer_items[time] = [0, 0]
|
||||
description = self._as_dict(action.get("description"))
|
||||
preset_temperature = None
|
||||
if description is not None:
|
||||
preset_temperature = self._as_dict(
|
||||
description.get("presetTemperature")
|
||||
)
|
||||
heat = 0
|
||||
if (
|
||||
preset_temperature is not None
|
||||
and preset_temperature.get("name") == "UPPER_TEMPERATURE"
|
||||
):
|
||||
heat = 1
|
||||
day_of_week = self._as_int(time_setting.get("dayOfWeek"), 0)
|
||||
timer_items[time][heat] |= day_of_week
|
||||
num = 0
|
||||
for key, value in timer_items.items():
|
||||
if value[0] != 0:
|
||||
data.update({f"timer_item_{num}": f"{key};0;{value[0]}"})
|
||||
num += 1
|
||||
if value[1] != 0:
|
||||
data.update({f"timer_item_{num}": f"{key};1;{value[1]}"})
|
||||
num += 1
|
||||
|
||||
return data
|
||||
|
||||
@staticmethod
|
||||
def _as_dict_list(value: Any) -> List[Dict[str, Any]]:
|
||||
result: List[Dict[str, Any]] = []
|
||||
if not isinstance(value, list):
|
||||
return result
|
||||
items = cast(List[Any], value)
|
||||
for item in items:
|
||||
if isinstance(item, dict):
|
||||
result.append(cast(Dict[str, Any], item))
|
||||
return result
|
||||
|
||||
@staticmethod
|
||||
def _as_dict(value: Any) -> Optional[Dict[str, Any]]:
|
||||
if isinstance(value, dict):
|
||||
return cast(Dict[str, Any], value)
|
||||
return None
|
||||
|
||||
@staticmethod
|
||||
def _as_str(value: Any, default: str) -> str:
|
||||
if isinstance(value, str):
|
||||
return value
|
||||
return default
|
||||
|
||||
@staticmethod
|
||||
def _as_int(value: Any, default: int) -> int:
|
||||
if isinstance(value, int):
|
||||
return value
|
||||
return default
|
||||
|
||||
@staticmethod
|
||||
def _as_float(value: Any, default: float) -> float:
|
||||
if isinstance(value, (int, float)):
|
||||
return float(value)
|
||||
return default
|
||||
|
||||
|
||||
__all__ = ["Device", "TemperatureSensor"]
|
||||
|
||||
10
fritz_temp_sync/errors.py
Normal file
10
fritz_temp_sync/errors.py
Normal file
@ -0,0 +1,10 @@
|
||||
class ConfigError(ValueError):
|
||||
pass
|
||||
|
||||
|
||||
class HomeAssistantError(RuntimeError):
|
||||
pass
|
||||
|
||||
|
||||
class FritzBoxError(RuntimeError):
|
||||
pass
|
||||
@ -8,81 +8,153 @@ import re
|
||||
import xml.etree.ElementTree as ET
|
||||
from asyncio import Task
|
||||
from datetime import datetime, timedelta
|
||||
from typing import Optional, Dict, List
|
||||
|
||||
import requests
|
||||
from typing import Any, Dict, List, Optional
|
||||
from urllib.parse import urlparse
|
||||
|
||||
import httpx # type: ignore[import-not-found]
|
||||
from device import Device
|
||||
from errors import FritzBoxError
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class FritzBox:
|
||||
def __init__(self, url: str, password: str, update_timeout: int, user: str = None, dry_run: bool = False) -> None:
|
||||
def __init__(
|
||||
self,
|
||||
url: str,
|
||||
password: str,
|
||||
update_timeout: int,
|
||||
user: Optional[str] = None,
|
||||
dry_run: bool = False,
|
||||
force_ipv4: bool = False,
|
||||
request_timeout: int = 10,
|
||||
request_retries: int = 2,
|
||||
) -> None:
|
||||
self._endpoints = {
|
||||
"login": "login_sid.lua?version=2",
|
||||
"logout": "index.lua",
|
||||
"data": "data.lua"
|
||||
"data": "data.lua",
|
||||
}
|
||||
self.url: str = url
|
||||
self.dry_run: bool = dry_run
|
||||
self.user: Optional[str] = user
|
||||
self.session: requests.Session = requests.Session()
|
||||
if force_ipv4:
|
||||
transport = httpx.AsyncHTTPTransport(local_address="0.0.0.0")
|
||||
else:
|
||||
transport = None
|
||||
self.session: httpx.AsyncClient = httpx.AsyncClient(transport=transport)
|
||||
self.password: str = password
|
||||
self.sid: Optional[str] = None
|
||||
self.update_timeout: int = update_timeout
|
||||
self.update_time: Dict[str, datetime] = {}
|
||||
self.hold_connection: Optional[Task] = None
|
||||
self.hold_connection: Optional[Task[None]] = None
|
||||
self.request_timeout: int = request_timeout
|
||||
self.request_retries: int = request_retries
|
||||
|
||||
async def hold_connection_alive(self) -> None:
|
||||
while True:
|
||||
# Session automatically destroyed after 20m of inactivity
|
||||
# according to the manual
|
||||
await asyncio.sleep(19 * 60)
|
||||
self.check_session()
|
||||
await self.check_session()
|
||||
|
||||
async def _request(
|
||||
self,
|
||||
method: str,
|
||||
url: str,
|
||||
data: Optional[Dict[str, Any]] = None,
|
||||
follow_redirects: bool = True,
|
||||
) -> Optional[Any]:
|
||||
for attempt in range(self.request_retries + 1):
|
||||
try:
|
||||
return await self.session.request(
|
||||
method,
|
||||
url,
|
||||
data=data,
|
||||
timeout=self.request_timeout,
|
||||
follow_redirects=follow_redirects,
|
||||
)
|
||||
except Exception as exc:
|
||||
if attempt >= self.request_retries:
|
||||
logger.error("Request failed (%s %s): %s", method, url, exc)
|
||||
return None
|
||||
backoff = 2**attempt
|
||||
logger.warning(
|
||||
"Request failed (%s %s), retry in %ss: %s",
|
||||
method,
|
||||
url,
|
||||
backoff,
|
||||
exc,
|
||||
)
|
||||
await asyncio.sleep(backoff)
|
||||
return None
|
||||
|
||||
def _calc_challenge_v2(self, challenge: str) -> str:
|
||||
|
||||
logging.debug(f"Calculate v2 challenge: {challenge}")
|
||||
logger.debug("Calculate v2 challenge")
|
||||
chall_regex = re.compile(
|
||||
r"2\$(?P<iter1>[0-9a-zA-Z]+)\$(?P<salt1>[0-9a-zA-Z]+)\$(?P<iter2>[0-9a-zA-Z]+)\$(?P<salt2>[0-9a-zA-Z]+)")
|
||||
r"2\$(?P<iter1>[0-9a-zA-Z]+)\$(?P<salt1>[0-9a-zA-Z]+)\$(?P<iter2>[0-9a-zA-Z]+)\$(?P<salt2>[0-9a-zA-Z]+)"
|
||||
)
|
||||
|
||||
chall_parts = chall_regex.match(challenge).groupdict()
|
||||
chall_match = chall_regex.match(challenge)
|
||||
if not chall_match:
|
||||
logger.error("Invalid Fritz!Box challenge format")
|
||||
return ""
|
||||
chall_parts = chall_match.groupdict()
|
||||
salt1: bytes = bytes.fromhex(chall_parts["salt1"])
|
||||
iter1: int = int(chall_parts["iter1"])
|
||||
salt2: bytes = bytes.fromhex(chall_parts["salt2"])
|
||||
iter2: int = int(chall_parts["iter2"])
|
||||
|
||||
hash1 = hashlib.pbkdf2_hmac('sha256', self.password.encode(), salt1, iter1)
|
||||
response = salt2.hex() + "$" + hashlib.pbkdf2_hmac('sha256', hash1, salt2, iter2).hex()
|
||||
hash1 = hashlib.pbkdf2_hmac("sha256", self.password.encode(), salt1, iter1)
|
||||
response = (
|
||||
salt2.hex() + "$" + hashlib.pbkdf2_hmac("sha256", hash1, salt2, iter2).hex()
|
||||
)
|
||||
|
||||
return response
|
||||
|
||||
def _calc_challenge_v1(self, challenge: str) -> str:
|
||||
""" Calculate the response for a challenge using legacy MD5 """
|
||||
logging.debug(f"Calculate v1 challenge: {challenge}")
|
||||
"""Calculate the response for a challenge using legacy MD5"""
|
||||
logger.debug("Calculate v1 challenge")
|
||||
response = f"{challenge}-{self.password}"
|
||||
response = response.encode("utf_16_le")
|
||||
response = challenge + "-" + hashlib.md5(response).hexdigest()
|
||||
return response
|
||||
response_bytes = response.encode("utf_16_le")
|
||||
return challenge + "-" + hashlib.md5(response_bytes).hexdigest()
|
||||
|
||||
def check_session(self) -> None:
|
||||
data = {
|
||||
async def check_session(self) -> None:
|
||||
data: Dict[str, Any] = {
|
||||
"xhr": 1,
|
||||
"sid": self.sid,
|
||||
"lang": "de",
|
||||
"page": "overview",
|
||||
"xhrId": "first",
|
||||
"noMenuRef": 1
|
||||
"noMenuRef": 1,
|
||||
}
|
||||
r = self.session.post(f"{self.url}/{self._endpoints['data']}", data=data)
|
||||
r = await self._request("POST", f"{self.url}/{self._endpoints['data']}", data)
|
||||
if r is None:
|
||||
return
|
||||
if len(r.history) > 0:
|
||||
if not self.login():
|
||||
logging.error("Failed to login to Fritz!Box")
|
||||
if not await self.login():
|
||||
logger.error("Failed to login to Fritz!Box")
|
||||
else:
|
||||
logging.debug("Already logged in")
|
||||
logger.debug("Already logged in")
|
||||
|
||||
def login(self) -> bool:
|
||||
logging.info(f"Login user {self.user} to Fritz!Box")
|
||||
async def login(self) -> bool:
|
||||
logger.info("Login user %s to Fritz!Box", self.user)
|
||||
challenge = None
|
||||
r = self.session.get(f"{self.url}/{self._endpoints['login']}")
|
||||
login_url = f"{self.url}/{self._endpoints['login']}"
|
||||
r = await self._request("GET", login_url, follow_redirects=False)
|
||||
if r is None:
|
||||
return False
|
||||
if getattr(r, "status_code", 200) in {301, 302, 303, 307, 308}:
|
||||
location = getattr(r, "headers", {}).get("Location")
|
||||
if location:
|
||||
retry_url, new_base = self._normalize_login_redirect(location)
|
||||
logger.debug("Login redirect to %s, retry %s", location, retry_url)
|
||||
r = await self._request("GET", retry_url, follow_redirects=False)
|
||||
if r is None:
|
||||
return False
|
||||
if new_base:
|
||||
self.url = new_base
|
||||
xml = ET.fromstring(r.text)
|
||||
for elem in xml:
|
||||
if elem.tag == "SID":
|
||||
@ -91,59 +163,70 @@ class FritzBox:
|
||||
challenge = elem.text
|
||||
elif self.user is None and elem.tag == "Users":
|
||||
for user_elem in elem:
|
||||
if "fritz" in user_elem.text:
|
||||
if user_elem.text and "fritz" in user_elem.text:
|
||||
self.user = user_elem.text
|
||||
|
||||
assert challenge is not None and self.user is not None
|
||||
if challenge is None or self.user is None:
|
||||
raise FritzBoxError("Missing Fritz!Box login challenge or user")
|
||||
|
||||
if challenge.startswith("2$"):
|
||||
response = self._calc_challenge_v2(challenge)
|
||||
else:
|
||||
response = self._calc_challenge_v1(challenge)
|
||||
if not response:
|
||||
raise FritzBoxError("Failed to compute Fritz!Box login response")
|
||||
|
||||
data = {
|
||||
"username": self.user,
|
||||
"response": response
|
||||
}
|
||||
|
||||
r = self.session.post(f"{self.url}/{self._endpoints['login']}", data=data)
|
||||
logging.debug(r.text)
|
||||
data = {"username": self.user, "response": response}
|
||||
r = await self._request("POST", f"{self.url}/{self._endpoints['login']}", data)
|
||||
if r is None:
|
||||
return False
|
||||
logger.debug("Login response received")
|
||||
xml = ET.fromstring(r.text)
|
||||
for elem in xml:
|
||||
if elem.tag == "SID":
|
||||
self.sid = elem.text
|
||||
|
||||
logging.info(f"Authenticated Fritz!Box: {len(self.sid) != self.sid.count('0')}")
|
||||
if len(self.sid) != self.sid.count("0"):
|
||||
sid = self.sid or ""
|
||||
logger.info("Authenticated Fritz!Box: %s", len(sid) != sid.count("0"))
|
||||
if len(sid) != sid.count("0"):
|
||||
self.hold_connection = asyncio.create_task(self.hold_connection_alive())
|
||||
return len(self.sid) != self.sid.count("0")
|
||||
return len(sid) != sid.count("0")
|
||||
|
||||
def logout(self) -> bool:
|
||||
logging.info("logout")
|
||||
data = {
|
||||
async def logout(self) -> bool:
|
||||
logger.info("Logout from Fritz!Box")
|
||||
data: Dict[str, Any] = {
|
||||
"xhr": 1,
|
||||
"sid": self.sid,
|
||||
"logout": 1,
|
||||
"no_sidrenew": ""}
|
||||
r = self.session.post(f"{self.url}/{self._endpoints['logout']}", data=data)
|
||||
"no_sidrenew": "",
|
||||
}
|
||||
r = await self._request("POST", f"{self.url}/{self._endpoints['logout']}", data)
|
||||
if r is None:
|
||||
return False
|
||||
if self.hold_connection is not None:
|
||||
self.hold_connection.cancel()
|
||||
|
||||
return r.status_code == 200
|
||||
|
||||
def list_devices(self) -> Optional[List[Device]]:
|
||||
data = {
|
||||
async def list_devices(self) -> Optional[List[Device]]:
|
||||
data: Dict[str, Any] = {
|
||||
"xhr": 1,
|
||||
"sid": self.sid,
|
||||
"lang": "de",
|
||||
"page": "sh_dev",
|
||||
"xhrId": "all"
|
||||
"xhrId": "all",
|
||||
}
|
||||
r = self.session.post(f"{self.url}/{self._endpoints['data']}", data=data)
|
||||
logging.debug(r.text[:100])
|
||||
r = await self._request("POST", f"{self.url}/{self._endpoints['data']}", data)
|
||||
if r is None:
|
||||
return None
|
||||
logger.debug("Devices response received")
|
||||
if len(r.history) > 0:
|
||||
if self.login():
|
||||
r = self.session.post(f"{self.url}/{self._endpoints['data']}", data=data)
|
||||
if await self.login():
|
||||
r = await self._request(
|
||||
"POST", f"{self.url}/{self._endpoints['data']}", data
|
||||
)
|
||||
if r is None:
|
||||
return None
|
||||
else:
|
||||
return None
|
||||
devices: List[Device] = []
|
||||
@ -152,36 +235,40 @@ class FritzBox:
|
||||
|
||||
return devices
|
||||
|
||||
def get_device_data(self, idx: int = None, name: str = None) -> Optional[Device]:
|
||||
async def get_device_data(
|
||||
self, idx: Optional[int] = None, name: Optional[str] = None
|
||||
) -> Optional[Device]:
|
||||
if idx is None and name is None:
|
||||
logging.debug("No id or name given")
|
||||
logger.debug("No id or name given")
|
||||
return None
|
||||
|
||||
devices = self.list_devices()
|
||||
device = None
|
||||
for device in devices:
|
||||
if device.id == idx or device.display_name == name:
|
||||
devices = await self.list_devices()
|
||||
if devices is None:
|
||||
return None
|
||||
found_device: Optional[Device] = None
|
||||
for candidate in devices:
|
||||
if candidate.id == idx or candidate.display_name == name:
|
||||
found_device = candidate
|
||||
break
|
||||
device = None
|
||||
|
||||
if device is None:
|
||||
logging.debug(f"Device {idx} {name} not found")
|
||||
if found_device is None:
|
||||
logger.debug("Device not found for id=%s name=%s", idx, name)
|
||||
return None
|
||||
|
||||
return device
|
||||
return found_device
|
||||
|
||||
def set_offset(self, device: Device) -> None:
|
||||
async def set_offset(self, device: Device) -> None:
|
||||
if self.dry_run:
|
||||
logging.warning("No updates in dry-run-mode")
|
||||
logger.warning("Dry-run enabled, skipping update")
|
||||
return
|
||||
data = {
|
||||
data: Dict[str, Any] = {
|
||||
"xhr": 1,
|
||||
"sid": self.sid,
|
||||
"lang": "de",
|
||||
"device": device.id,
|
||||
"page": "home_auto_hkr_edit"
|
||||
"page": "home_auto_hkr_edit",
|
||||
}
|
||||
self.session.post(f"{self.url}/{self._endpoints['data']}", data=data)
|
||||
await self._request("POST", f"{self.url}/{self._endpoints['data']}", data)
|
||||
|
||||
data = {
|
||||
"xhr": 1,
|
||||
@ -190,24 +277,63 @@ class FritzBox:
|
||||
"view": "",
|
||||
"back_to_page": "sh_dev",
|
||||
"apply": "",
|
||||
"oldpage": "/net/home_auto_hkr_edit.lua"
|
||||
"oldpage": "/net/home_auto_hkr_edit.lua",
|
||||
}
|
||||
data.update(device.to_web_data())
|
||||
|
||||
self.session.post(f"{self.url}/{self._endpoints['data']}", data=data)
|
||||
await self._request("POST", f"{self.url}/{self._endpoints['data']}", data)
|
||||
|
||||
def correct_offset(self, device_name: str, real_temp: float):
|
||||
async def correct_offset(self, device_name: str, real_temp: float) -> None:
|
||||
elapsed = None
|
||||
if device_name in self.update_time.keys():
|
||||
elapsed = datetime.now() - self.update_time[device_name]
|
||||
logging.info(f"Last update for {device_name} {elapsed} ago")
|
||||
logger.debug("Last update for %s %s ago", device_name, elapsed)
|
||||
delta = timedelta(minutes=self.update_timeout)
|
||||
if device_name not in self.update_time.keys() or elapsed > delta:
|
||||
device: Optional[Device] = self.get_device_data(name=device_name)
|
||||
if (
|
||||
device_name not in self.update_time.keys()
|
||||
or elapsed is None
|
||||
or elapsed > delta
|
||||
):
|
||||
device: Optional[Device] = await self.get_device_data(name=device_name)
|
||||
if device is None:
|
||||
return
|
||||
new_offset = device.get_offset() + real_temp - device.get_temperature()
|
||||
logging.info(f"Update offset from {device.get_offset()} to {new_offset}")
|
||||
current_offset = device.get_offset()
|
||||
current_temp = device.get_temperature()
|
||||
if current_offset is None or current_temp is None:
|
||||
logger.warning(
|
||||
"Skipping offset update for %s: missing temperature data",
|
||||
device_name,
|
||||
)
|
||||
return
|
||||
new_offset = current_offset + real_temp - current_temp
|
||||
logger.info(
|
||||
"Update offset for %s from %.2f to %.2f",
|
||||
device_name,
|
||||
current_offset,
|
||||
new_offset,
|
||||
)
|
||||
device.set_offset(new_offset)
|
||||
self.set_offset(device)
|
||||
self.update_time[device.display_name] = datetime.now()
|
||||
await self.set_offset(device)
|
||||
update_key = device.display_name or device_name
|
||||
self.update_time[update_key] = datetime.now()
|
||||
else:
|
||||
logger.debug(
|
||||
"Skip offset update for %s: last update %s ago (min %s)",
|
||||
device_name,
|
||||
elapsed,
|
||||
delta,
|
||||
)
|
||||
|
||||
async def close(self) -> None:
|
||||
await self.session.aclose()
|
||||
|
||||
def _normalize_login_redirect(self, location: str) -> tuple[str, Optional[str]]:
|
||||
parsed_location = urlparse(location)
|
||||
if parsed_location.scheme and parsed_location.netloc:
|
||||
base = f"{parsed_location.scheme}://{parsed_location.netloc}"
|
||||
else:
|
||||
parsed_base = urlparse(self.url)
|
||||
base = f"{parsed_base.scheme}://{parsed_base.netloc}"
|
||||
if location.endswith(self._endpoints["login"]):
|
||||
return location, base
|
||||
return f"{base}/{self._endpoints['login']}", base
|
||||
|
||||
23
fritz_temp_sync/ha_types.py
Normal file
23
fritz_temp_sync/ha_types.py
Normal file
@ -0,0 +1,23 @@
|
||||
from __future__ import annotations
|
||||
|
||||
from typing import TypedDict
|
||||
|
||||
|
||||
class HAAttributes(TypedDict, total=False):
|
||||
current_temperature: float
|
||||
friendly_name: str
|
||||
|
||||
|
||||
class HAState(TypedDict, total=False):
|
||||
entity_id: str
|
||||
state: str
|
||||
attributes: HAAttributes
|
||||
|
||||
|
||||
class HAEventData(TypedDict):
|
||||
entity_id: str
|
||||
new_state: HAState
|
||||
|
||||
|
||||
class HAEvent(TypedDict):
|
||||
data: HAEventData
|
||||
@ -3,34 +3,39 @@ from __future__ import annotations
|
||||
import asyncio
|
||||
import json
|
||||
import logging
|
||||
from asyncio import Queue, Task, Event, Lock
|
||||
from typing import Callable, Dict, Optional
|
||||
import websockets
|
||||
from asyncio import Event, Lock, Queue, Task
|
||||
from typing import Any, Dict, Optional, cast
|
||||
|
||||
from errors import HomeAssistantError
|
||||
from ha_types import HAEvent
|
||||
from websockets.client import connect as ws_connect
|
||||
from websockets.exceptions import InvalidStatusCode
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class HomeAssistantAPI:
|
||||
def __init__(self, token: str, url: str) -> None:
|
||||
self.token = token
|
||||
self.msg_id = 1
|
||||
self.msg_id_lock = Lock()
|
||||
self.ws: websockets.WebSocketClientProtocol = None
|
||||
self.ws: Any = None
|
||||
self.url = url
|
||||
self.receiver: Optional[Task] = None
|
||||
self.sender: Optional[Task] = None
|
||||
self.sending_queue: Queue = Queue()
|
||||
self.receiver: Optional[Task[Any]] = None
|
||||
self.sender: Optional[Task[Any]] = None
|
||||
self.sending_queue: Queue[Dict[str, Any]] = Queue()
|
||||
self.authenticated: Event = Event()
|
||||
self.events: Dict[int, Queue] = {}
|
||||
self.responses: Dict[int, Dict] = {}
|
||||
self.events: Dict[int, Queue[HAEvent]] = {}
|
||||
self.responses: Dict[int, Dict[str, Any]] = {}
|
||||
self.response_events: Dict[int, Event] = {}
|
||||
self.response_lock: Lock = Lock()
|
||||
|
||||
async def connect(self):
|
||||
retries = 5
|
||||
logging.info("Connect to home assistant...")
|
||||
logger.info("Connect to Home Assistant")
|
||||
while True:
|
||||
try:
|
||||
self.ws = await websockets.connect(self.url)
|
||||
self.ws = await ws_connect(self.url)
|
||||
self.sender = asyncio.create_task(self.sending())
|
||||
await self.auth()
|
||||
self.receiver = asyncio.create_task(self.receiving())
|
||||
@ -39,23 +44,30 @@ class HomeAssistantAPI:
|
||||
if retries > 0:
|
||||
retries -= 1
|
||||
await asyncio.sleep(30)
|
||||
logging.info(f"Retry home assistant connection... ({retries})")
|
||||
logger.warning(
|
||||
"Retry Home Assistant connection (%s retries left)", retries
|
||||
)
|
||||
continue
|
||||
else:
|
||||
logging.error("Invalid status code while connecting to Home Assistant")
|
||||
await self.exit_loop()
|
||||
return False
|
||||
logger.error("Invalid status code while connecting to Home Assistant")
|
||||
await self.exit_loop()
|
||||
raise HomeAssistantError(
|
||||
"Invalid status code while connecting to Home Assistant"
|
||||
)
|
||||
|
||||
async def wait_for_close(self):
|
||||
await self.ws.wait_closed()
|
||||
|
||||
async def receiving(self):
|
||||
logging.debug("Start receiving")
|
||||
logger.debug("Start receiving")
|
||||
async for message in self.ws:
|
||||
msg: Dict = json.loads(message)
|
||||
msg: Dict[str, Any] = json.loads(cast(str, message))
|
||||
if msg["type"] == "event":
|
||||
if msg["id"] not in self.events.keys():
|
||||
logging.error(f"Received event for not subscribted id: {msg['id']} {msg['event_type']}")
|
||||
logger.warning(
|
||||
"Received event for not subscribed id: %s %s",
|
||||
msg["id"],
|
||||
msg.get("event_type"),
|
||||
)
|
||||
continue
|
||||
await self.events[msg["id"]].put(msg["event"])
|
||||
else:
|
||||
@ -64,7 +76,7 @@ class HomeAssistantAPI:
|
||||
if msg["id"] in self.response_events.keys():
|
||||
self.response_events[msg["id"]].set()
|
||||
|
||||
async def wait_for(self, idx):
|
||||
async def wait_for(self, idx: int):
|
||||
async with self.response_lock:
|
||||
if idx in self.responses.keys():
|
||||
msg = self.responses[idx]
|
||||
@ -76,7 +88,7 @@ class HomeAssistantAPI:
|
||||
async with self.response_lock:
|
||||
del self.response_events[idx]
|
||||
if idx not in self.responses.keys():
|
||||
logging.error("Response ID not found")
|
||||
logger.warning("Response ID not found")
|
||||
return None
|
||||
msg = self.responses[idx]
|
||||
del self.responses[idx]
|
||||
@ -88,26 +100,31 @@ class HomeAssistantAPI:
|
||||
if self.receiver is not None:
|
||||
self.receiver.cancel()
|
||||
|
||||
async def close(self) -> None:
|
||||
await self.exit_loop()
|
||||
if self.ws is not None:
|
||||
try:
|
||||
await self.ws.close()
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
async def auth(self):
|
||||
msg = json.loads(await self.ws.recv())
|
||||
if msg["type"] != "auth_required":
|
||||
logging.error("Authentication error: Not required")
|
||||
await self.exit_loop()
|
||||
response = {
|
||||
"type": "auth",
|
||||
"access_token": self.token
|
||||
}
|
||||
raise HomeAssistantError("Authentication error: Not required")
|
||||
response: Dict[str, Any] = {"type": "auth", "access_token": self.token}
|
||||
await self.sending_queue.put(response)
|
||||
msg = json.loads(await self.ws.recv())
|
||||
if msg["type"] == "auth_invalid":
|
||||
logging.info("Auth failed")
|
||||
await self.exit_loop()
|
||||
raise HomeAssistantError("Auth failed")
|
||||
elif msg["type"] == "auth_ok":
|
||||
logging.debug("Authenticated")
|
||||
logger.info("Authenticated")
|
||||
self.authenticated.set()
|
||||
else:
|
||||
logging.error(f"Unknown answer for auth: {msg}")
|
||||
await self.exit_loop()
|
||||
raise HomeAssistantError(f"Unknown answer for auth: {msg}")
|
||||
|
||||
async def sending(self):
|
||||
while msg := await self.sending_queue.get():
|
||||
@ -116,34 +133,33 @@ class HomeAssistantAPI:
|
||||
async def subscribe_event(self, event_type: str):
|
||||
await self.authenticated.wait()
|
||||
|
||||
logging.info(f"Subscribe to {event_type}")
|
||||
logger.info("Subscribe to %s", event_type)
|
||||
async with self.msg_id_lock:
|
||||
msg_id = self.msg_id
|
||||
response = {
|
||||
response: Dict[str, Any] = {
|
||||
"id": msg_id,
|
||||
"type": "subscribe_events",
|
||||
"event_type": event_type
|
||||
"event_type": event_type,
|
||||
}
|
||||
self.events[msg_id] = Queue()
|
||||
self.msg_id += 1
|
||||
await self.sending_queue.put(response)
|
||||
return msg_id
|
||||
|
||||
async def get_states(self):
|
||||
async def get_states(self) -> list[Dict[str, Any]]:
|
||||
await self.authenticated.wait()
|
||||
async with self.msg_id_lock:
|
||||
message = {
|
||||
"id": self.msg_id,
|
||||
"type": "get_states"
|
||||
}
|
||||
message: Dict[str, Any] = {"id": self.msg_id, "type": "get_states"}
|
||||
self.msg_id += 1
|
||||
await self.sending_queue.put(message)
|
||||
|
||||
response = await self.wait_for(message["id"])
|
||||
response = await self.wait_for(cast(int, message["id"]))
|
||||
# ToDo: Error handling
|
||||
return response["result"]
|
||||
if response is None:
|
||||
return []
|
||||
return cast(list[Dict[str, Any]], response.get("result", []))
|
||||
|
||||
async def get_device_state(self, entity_id: str):
|
||||
async def get_device_state(self, entity_id: str) -> Optional[Dict[str, Any]]:
|
||||
device_states = await self.get_states()
|
||||
for device_state in device_states:
|
||||
if device_state["entity_id"] == entity_id:
|
||||
|
||||
16
fritz_temp_sync/logging_config.py
Normal file
16
fritz_temp_sync/logging_config.py
Normal file
@ -0,0 +1,16 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import logging
|
||||
from typing import Union
|
||||
|
||||
|
||||
def configure_logging(level: Union[str, int]) -> None:
|
||||
numeric_level = level
|
||||
if isinstance(level, str):
|
||||
numeric_level = logging.getLevelName(level.upper())
|
||||
if isinstance(numeric_level, str):
|
||||
numeric_level = logging.INFO
|
||||
logging.basicConfig(
|
||||
level=numeric_level,
|
||||
format="[%(asctime)s] [%(levelname)s] [%(name)s] %(message)s",
|
||||
)
|
||||
215
fritz_temp_sync/sync_controller.py
Normal file
215
fritz_temp_sync/sync_controller.py
Normal file
@ -0,0 +1,215 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import asyncio
|
||||
import logging
|
||||
from typing import Any, Optional, cast
|
||||
|
||||
from config import Mapping
|
||||
from errors import FritzBoxError, HomeAssistantError
|
||||
from fritzbox import FritzBox
|
||||
from ha_types import HAEvent
|
||||
from homeassistant import HomeAssistantAPI
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class SyncController:
|
||||
def __init__(
|
||||
self, ha: HomeAssistantAPI, fb: FritzBox, offset_threshold: float
|
||||
) -> None:
|
||||
self.ha = ha
|
||||
self.fb = fb
|
||||
self.offset_threshold = offset_threshold
|
||||
self.sensor_mappings: dict[str, list[str]] = {}
|
||||
self.thermostate_mappings: dict[str, str] = {}
|
||||
self._offset_queue: asyncio.Queue[Optional[tuple[str, float]]] = asyncio.Queue()
|
||||
self._worker_task: Optional[asyncio.Task[None]] = None
|
||||
self._event_task: Optional[asyncio.Task[None]] = None
|
||||
|
||||
def load_mappings(self, mappings: list[Mapping]) -> None:
|
||||
for mapping in mappings:
|
||||
sensor = mapping.sensor
|
||||
thermostate = mapping.thermostate
|
||||
if sensor not in self.sensor_mappings.keys():
|
||||
self.sensor_mappings[sensor] = []
|
||||
self.sensor_mappings[sensor].append(thermostate)
|
||||
self.thermostate_mappings[thermostate] = sensor
|
||||
logger.debug(
|
||||
"Loaded %s sensor mappings and %s thermostat mappings",
|
||||
len(self.sensor_mappings),
|
||||
len(self.thermostate_mappings),
|
||||
)
|
||||
|
||||
async def handle_event(self, idx: int):
|
||||
logger.debug("Wait for events for subscription %s", idx)
|
||||
|
||||
event_queue = cast(Any, self.ha).events[idx]
|
||||
while event := await event_queue.get():
|
||||
event = cast(HAEvent, event)
|
||||
try:
|
||||
entity_id = event["data"]["entity_id"]
|
||||
if not entity_id:
|
||||
continue
|
||||
if (
|
||||
entity_id in self.sensor_mappings.keys()
|
||||
or entity_id in self.thermostate_mappings.keys()
|
||||
):
|
||||
state = cast(Any, await self.ha.get_device_state(entity_id))
|
||||
if not isinstance(state, dict):
|
||||
continue
|
||||
state = cast(dict[str, Any], state)
|
||||
new_state = cast(dict[str, Any], event["data"]["new_state"])
|
||||
logger.debug(
|
||||
"state_changed for %s (is_thermostat=%s is_sensor=%s state=%s)",
|
||||
entity_id,
|
||||
entity_id in self.thermostate_mappings.keys(),
|
||||
entity_id in self.sensor_mappings.keys(),
|
||||
state.get("state"),
|
||||
)
|
||||
if (
|
||||
entity_id in self.thermostate_mappings.keys()
|
||||
and state.get("state") != "unavailable"
|
||||
):
|
||||
therm_temp = cast(
|
||||
float, new_state["attributes"]["current_temperature"]
|
||||
)
|
||||
therm_name = cast(str, new_state["attributes"]["friendly_name"])
|
||||
sensor = self.thermostate_mappings[entity_id]
|
||||
sensor_state = cast(Any, await self.ha.get_device_state(sensor))
|
||||
if not isinstance(sensor_state, dict):
|
||||
continue
|
||||
sensor_state = cast(dict[str, Any], sensor_state)
|
||||
sensor_temp = (
|
||||
round(float(cast(str, sensor_state["state"])) * 2) / 2
|
||||
)
|
||||
logger.debug(
|
||||
"Temps for %s: thermostat=%s sensor=%s",
|
||||
entity_id,
|
||||
therm_temp,
|
||||
sensor_temp,
|
||||
)
|
||||
diff = abs(therm_temp - sensor_temp)
|
||||
if diff < self.offset_threshold:
|
||||
logger.debug(
|
||||
"Offset diff %.2f below threshold %.2f for %s",
|
||||
diff,
|
||||
self.offset_threshold,
|
||||
therm_name,
|
||||
)
|
||||
continue
|
||||
if therm_temp != sensor_temp:
|
||||
logger.info(
|
||||
"Offset correction: %s thermostat=%s sensor=%s (from %s)",
|
||||
therm_name,
|
||||
therm_temp,
|
||||
sensor_temp,
|
||||
sensor,
|
||||
)
|
||||
await self._enqueue_offset(therm_name, sensor_temp)
|
||||
|
||||
elif entity_id in self.sensor_mappings.keys():
|
||||
logger.debug("sensor update for %s", entity_id)
|
||||
sensor_temp = (
|
||||
round(float(cast(str, new_state["state"])) * 2) / 2
|
||||
)
|
||||
logger.debug(
|
||||
"thermostats for %s: %s",
|
||||
entity_id,
|
||||
self.sensor_mappings[entity_id],
|
||||
)
|
||||
for thermostate in self.sensor_mappings[entity_id]:
|
||||
logger.debug("check thermostat %s", thermostate)
|
||||
therm_state = cast(
|
||||
Any, await self.ha.get_device_state(thermostate)
|
||||
)
|
||||
if not isinstance(therm_state, dict):
|
||||
continue
|
||||
therm_state = cast(dict[str, Any], therm_state)
|
||||
if therm_state.get("state") == "unavailable":
|
||||
logger.debug("thermostat %s unavailable", thermostate)
|
||||
continue
|
||||
therm_temp = float(
|
||||
cast(
|
||||
float,
|
||||
therm_state["attributes"]["current_temperature"],
|
||||
)
|
||||
)
|
||||
therm_name = cast(
|
||||
str, therm_state["attributes"]["friendly_name"]
|
||||
)
|
||||
logger.debug(
|
||||
"Temps for %s: thermostat=%s sensor=%s",
|
||||
thermostate,
|
||||
therm_temp,
|
||||
sensor_temp,
|
||||
)
|
||||
diff = abs(therm_temp - sensor_temp)
|
||||
if diff < self.offset_threshold:
|
||||
logger.debug(
|
||||
"Offset diff %.2f below threshold %.2f for %s",
|
||||
diff,
|
||||
self.offset_threshold,
|
||||
therm_name,
|
||||
)
|
||||
continue
|
||||
if therm_temp != sensor_temp:
|
||||
logger.info(
|
||||
"Offset correction: %s thermostat=%s sensor=%s (from %s)",
|
||||
therm_name,
|
||||
therm_temp,
|
||||
sensor_temp,
|
||||
entity_id,
|
||||
)
|
||||
await self._enqueue_offset(therm_name, sensor_temp)
|
||||
except KeyError:
|
||||
continue
|
||||
|
||||
async def _enqueue_offset(self, device_name: str, sensor_temp: float) -> None:
|
||||
await self._offset_queue.put((device_name, sensor_temp))
|
||||
|
||||
async def _offset_worker(self) -> None:
|
||||
while True:
|
||||
item = await self._offset_queue.get()
|
||||
if item is None:
|
||||
break
|
||||
device_name, sensor_temp = item
|
||||
try:
|
||||
await self.fb.correct_offset(device_name, sensor_temp)
|
||||
except FritzBoxError as exc:
|
||||
logger.error("Offset update failed for %s: %s", device_name, exc)
|
||||
|
||||
async def run(self) -> None:
|
||||
try:
|
||||
await self.ha.connect()
|
||||
except HomeAssistantError as exc:
|
||||
logger.error("Home Assistant error: %s", exc)
|
||||
return
|
||||
logger.debug("Subscribing to state_changed")
|
||||
state_changed_id = await self.ha.subscribe_event("state_changed")
|
||||
logger.debug(state_changed_id)
|
||||
self._event_task = asyncio.create_task(self.handle_event(state_changed_id))
|
||||
try:
|
||||
try:
|
||||
if not await self.fb.login():
|
||||
raise FritzBoxError("Failed to login to Fritz!Box")
|
||||
except FritzBoxError as exc:
|
||||
logger.error("Fritz!Box error: %s", exc)
|
||||
return
|
||||
logger.info("Health check OK")
|
||||
self._worker_task = asyncio.create_task(self._offset_worker())
|
||||
try:
|
||||
await self.ha.wait_for_close()
|
||||
except asyncio.CancelledError:
|
||||
pass
|
||||
finally:
|
||||
await self.shutdown()
|
||||
|
||||
async def shutdown(self) -> None:
|
||||
if self._event_task is not None:
|
||||
self._event_task.cancel()
|
||||
await self._offset_queue.put(None)
|
||||
if self._worker_task is not None:
|
||||
await self._worker_task
|
||||
await self.ha.close()
|
||||
await self.fb.close()
|
||||
logger.info("Shutdown complete")
|
||||
@ -1,125 +1,69 @@
|
||||
#!/usr/bin/env python3
|
||||
|
||||
import asyncio
|
||||
import json
|
||||
import logging
|
||||
import os
|
||||
import sys
|
||||
|
||||
from config import load_config
|
||||
from errors import ConfigError
|
||||
from fritzbox import FritzBox
|
||||
from homeassistant import HomeAssistantAPI
|
||||
from logging_config import configure_logging
|
||||
from sync_controller import SyncController
|
||||
|
||||
sensor_mappings = {}
|
||||
thermostate_mappings = {}
|
||||
|
||||
|
||||
async def handle_event(idx: int):
|
||||
global ha, fb
|
||||
logging.debug(f"Wait for events for {idx}")
|
||||
|
||||
while event := await ha.events[idx].get():
|
||||
try:
|
||||
entity_id = event["data"]["entity_id"]
|
||||
if (
|
||||
entity_id in sensor_mappings.keys()
|
||||
or entity_id in thermostate_mappings.keys()
|
||||
):
|
||||
state = await ha.get_device_state(entity_id)
|
||||
new_state = event["data"]["new_state"]
|
||||
logging.info(
|
||||
f"received changed state from {entity_id} {entity_id in thermostate_mappings.keys()} {state['state']} {entity_id in sensor_mappings.keys()}"
|
||||
)
|
||||
if (
|
||||
entity_id in thermostate_mappings.keys()
|
||||
and state["state"] != "unavailable"
|
||||
):
|
||||
therm_temp = new_state["attributes"]["current_temperature"]
|
||||
therm_name = new_state["attributes"]["friendly_name"]
|
||||
sensor = thermostate_mappings[entity_id]
|
||||
sensor_state = await ha.get_device_state(sensor)
|
||||
sensor_temp = round(float(sensor_state["state"]) * 2) / 2
|
||||
logging.info(f"temps: {therm_temp} {sensor_temp}")
|
||||
if therm_temp != sensor_temp:
|
||||
logging.info(
|
||||
f"{therm_name}: {therm_temp}\n{sensor}: {sensor_state['state']} ({sensor_temp})"
|
||||
)
|
||||
fb.correct_offset(therm_name, sensor_temp)
|
||||
|
||||
elif entity_id in sensor_mappings.keys():
|
||||
logging.info(f"here {sensor_mappings} {entity_id}")
|
||||
logging.info(f"{new_state}")
|
||||
sensor_temp = round(float(new_state["state"]) * 2) / 2
|
||||
logging.info(f"entry: {sensor_mappings[entity_id]}")
|
||||
for thermostate in sensor_mappings[entity_id]:
|
||||
logging.info(thermostate)
|
||||
therm_state = await ha.get_device_state(thermostate)
|
||||
logging.info(f"{thermostate} {therm_state}")
|
||||
if therm_state["state"] == "unavailable":
|
||||
continue
|
||||
therm_temp = float(
|
||||
therm_state["attributes"]["current_temperature"]
|
||||
)
|
||||
therm_name = therm_state["attributes"]["friendly_name"]
|
||||
logging.info(f"Temps: {therm_temp} {sensor_temp}")
|
||||
if therm_temp != sensor_temp:
|
||||
logging.info(
|
||||
f"{therm_name}: {therm_temp}\n{entity_id}: {new_state['state']} ({sensor_temp})"
|
||||
)
|
||||
fb.correct_offset(therm_name, sensor_temp)
|
||||
except KeyError:
|
||||
pass
|
||||
|
||||
|
||||
async def init(ha: HomeAssistantAPI, fb: FritzBox):
|
||||
if not await ha.connect():
|
||||
return
|
||||
logging.debug("Subscribe")
|
||||
state_changed_id = await ha.subscribe_event("state_changed")
|
||||
logging.debug(state_changed_id)
|
||||
asyncio.create_task(handle_event(state_changed_id))
|
||||
fb.login()
|
||||
await ha.wait_for_close()
|
||||
logging.info("Websocket closed, shutting down..")
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
async def main():
|
||||
config_path = sys.argv[1]
|
||||
config = json.load(open(config_path))
|
||||
level = logging.INFO
|
||||
if "log_level" in config:
|
||||
print(f"Setting log_level {config['log_level']}")
|
||||
if config["log_level"] == "DEBUG":
|
||||
level = logging.DEBUG
|
||||
logging.basicConfig(
|
||||
level=level, format="[%(asctime)s] [%(levelname)s] %(message)s"
|
||||
)
|
||||
logging.debug(config)
|
||||
try:
|
||||
config = load_config(config_path)
|
||||
except ConfigError as exc:
|
||||
logging.basicConfig(level=logging.ERROR)
|
||||
logging.error("Invalid config: %s", exc)
|
||||
return
|
||||
configure_logging(config.log_level)
|
||||
logger.debug("Loaded config")
|
||||
if not config.log_ws_messages:
|
||||
logging.getLogger("websockets").setLevel(logging.WARNING)
|
||||
logging.getLogger("websockets.client").setLevel(logging.WARNING)
|
||||
logging.getLogger("websockets.server").setLevel(logging.WARNING)
|
||||
if not config.log_http_requests:
|
||||
logging.getLogger("httpx").setLevel(logging.WARNING)
|
||||
logging.getLogger("httpcore").setLevel(logging.WARNING)
|
||||
|
||||
fritzbox = config.fritzbox
|
||||
|
||||
global fb
|
||||
fb = FritzBox(
|
||||
url=config["fritzbox"]["url"],
|
||||
user=config["fritzbox"]["username"],
|
||||
password=config["fritzbox"]["password"],
|
||||
update_timeout=config["update_timeout"],
|
||||
dry_run=False,
|
||||
url=fritzbox.url,
|
||||
user=fritzbox.username,
|
||||
password=fritzbox.password,
|
||||
update_timeout=config.update_timeout,
|
||||
dry_run=config.dry_run,
|
||||
force_ipv4=config.force_ipv4,
|
||||
request_timeout=config.request_timeout,
|
||||
request_retries=config.request_retries,
|
||||
)
|
||||
supervisor_url = "ws://supervisor/core/websocket"
|
||||
if "SUPERVISOR_URL" in os.environ:
|
||||
supervisor_url = os.environ["SUPERVISOR_URL"]
|
||||
supervisor_token = os.environ["SUPERVISOR_TOKEN"]
|
||||
global ha
|
||||
supervisor_token = os.environ.get("SUPERVISOR_TOKEN")
|
||||
if not supervisor_token:
|
||||
logger.error("Missing SUPERVISOR_TOKEN; cannot connect to Home Assistant")
|
||||
return
|
||||
ha = HomeAssistantAPI(supervisor_token, supervisor_url)
|
||||
|
||||
for mapping in config["mappings"]:
|
||||
if mapping["sensor"] not in sensor_mappings.keys():
|
||||
sensor_mappings[mapping["sensor"]] = []
|
||||
sensor_mappings[mapping["sensor"]].append(mapping["thermostate"])
|
||||
thermostate_mappings[mapping["thermostate"]] = mapping["sensor"]
|
||||
logging.debug(f"Mappings: {sensor_mappings} {thermostate_mappings}")
|
||||
controller = SyncController(ha, fb, config.offset_threshold)
|
||||
controller.load_mappings(config.mappings)
|
||||
try:
|
||||
await init(ha, fb)
|
||||
except KeyboardInterrupt:
|
||||
pass
|
||||
logger.info("Starting sync controller")
|
||||
await controller.run()
|
||||
except (KeyboardInterrupt, asyncio.CancelledError):
|
||||
logger.info("Shutdown requested")
|
||||
await controller.shutdown()
|
||||
|
||||
|
||||
asyncio.run(main())
|
||||
try:
|
||||
asyncio.run(main())
|
||||
except KeyboardInterrupt:
|
||||
pass
|
||||
|
||||
38
fritz_temp_sync/tests/test_config.py
Normal file
38
fritz_temp_sync/tests/test_config.py
Normal file
@ -0,0 +1,38 @@
|
||||
# pyright: reportMissingImports=false
|
||||
from __future__ import annotations
|
||||
|
||||
import json
|
||||
import sys
|
||||
from pathlib import Path
|
||||
|
||||
import pytest # type: ignore[import-not-found]
|
||||
|
||||
ROOT = Path(__file__).resolve().parents[1]
|
||||
sys.path.append(str(ROOT))
|
||||
|
||||
from config import _parse_mappings, load_config # type: ignore[import-not-found]
|
||||
from errors import ConfigError # type: ignore[import-not-found]
|
||||
|
||||
|
||||
def test_parse_mappings_rejects_duplicate_thermostat():
|
||||
with pytest.raises(ConfigError):
|
||||
_parse_mappings(
|
||||
[
|
||||
{"sensor": "sensor.a", "thermostate": "climate.x"},
|
||||
{"sensor": "sensor.b", "thermostate": "climate.x"},
|
||||
]
|
||||
)
|
||||
|
||||
|
||||
def test_load_config_parses_dry_run(tmp_path: Path):
|
||||
payload = {
|
||||
"fritzbox": {"url": "http://fritz.box", "password": "secret"},
|
||||
"mappings": [{"sensor": "sensor.a", "thermostate": "climate.x"}],
|
||||
"update_timeout": 15,
|
||||
"dry_run": True,
|
||||
}
|
||||
config_path = tmp_path / "config.json"
|
||||
config_path.write_text(json.dumps(payload), encoding="utf-8")
|
||||
|
||||
config = load_config(str(config_path))
|
||||
assert config.dry_run is True
|
||||
Loading…
x
Reference in New Issue
Block a user