Compare commits

..

No commits in common. "master" and "v0.1.0" have entirely different histories.

8 changed files with 203 additions and 1214 deletions

5
.gitignore vendored
View File

@ -15,7 +15,6 @@ dist/
downloads/
eggs/
.eggs/
.venv/
lib/
lib64/
parts/
@ -169,7 +168,6 @@ pip-selfcheck.json
# Reference: https://intellij-support.jetbrains.com/hc/en-us/articles/206544839
# User-specific stuff
.idea/
.idea/**/workspace.xml
.idea/**/tasks.xml
.idea/**/usage.statistics.xml
@ -241,6 +239,3 @@ fabric.properties
# Android studio 3.1+ serialized cache file
.idea/caches/build_file_checksums.ser
# local config test
options.json
.DS_Store

View File

@ -3,20 +3,14 @@ FROM $BUILD_FROM
# Install requirements for add-on
RUN apk update && apk add --no-cache python3 py-pip
RUN python3 -m pip install websocket-client requests
# Prepare venv
RUN python3 -m venv /opt/venv
RUN source /opt/venv/bin/activate && \
python3 -m pip install websockets requests beautifulsoup4
WORKDIR /srv
WORKDIR /data
# Copy data for add-on
COPY sync_ha_fb.py /srv
COPY fritzbox.py /srv
COPY homeassistant.py /srv
COPY device.py /srv
COPY run.sh /
RUN chmod a+x /run.sh

View File

@ -1,11 +1,8 @@
name: "Fritz!Box Temperature Sync Dev"
description: "Sync Fritz!DECT thermostat temperatures with other sensors in Home Assistant"
version: "0.5.4"
startup: "application"
stage: "stable"
slug: "fritz_temp_sync_dev"
name: "Fritz!Box Temperature Sync"
description: "Sync Fritz!DECT thermostate temperatures with other sensors in Home Assistant"
version: "0.1.0"
slug: "fritz_temp_sync"
homeassistant_api: true
init: false
arch:
- aarch64
- amd64
@ -16,20 +13,13 @@ options:
fritzbox:
url: "http://fritz.box"
password: null
verify_ssl: true
old_fb: false
mappings:
- sensor: null
thermostate: null
update_timeout: 15
schema:
fritzbox:
url: url
username: "str?"
password: str
verify_ssl: bool
old_fb: bool
mappings:
- sensor: str
thermostate: str
update_timeout: int

View File

@ -1,714 +0,0 @@
from __future__ import annotations
from abc import ABC, abstractmethod
from enum import Enum, IntFlag, auto
from typing import Dict, List, Optional, Union
import typing
class WeekDay(IntFlag):
MON = 0b1
TUE = 0b10
WED = 0b100
THU = 0b1000
FRI = 0b10000
SAT = 0b100000
SUN = 0b1000000
class Manufacturer:
def __init__(self, name: str):
self.name: str = name
def __repr__(self):
return f"name: {self.name}"
def to_json(self):
return {"name": self.name}
@staticmethod
def parse_dict(manufacturer: Dict):
return Manufacturer(manufacturer["name"])
class FirmwareVersion:
def __init__(self, search: bool, current: str, update: bool, running: bool):
self.search: bool = search
self.current: str = current
self.update: bool = update
self.running: bool = running
def __repr__(self):
return f"search: {self.search}; current: {self.current}; update: {self.update}; running: {self.running}"
def to_json(self):
return {"search": self.search, "current": self.current, "update": self.update, "running": self.running}
@staticmethod
def parse_dict(firmware_version: dict):
return FirmwareVersion(firmware_version["search"],
firmware_version["current"],
firmware_version["update"],
firmware_version["running"])
class PushService:
def __init__(self, mail_address: str, unit_settings: List, is_enabled: bool):
self.mail_address: str = mail_address
self.unit_settings: List = unit_settings
self.is_enabled: bool = is_enabled
def __repr__(self):
return f"<PushService MailAddress: {self.mail_address}; UnitSettings: {self.unit_settings}; " \
f"isEnabled: {self.is_enabled}>"
def to_json(self):
return {"mailAddress": self.mail_address, "unitSettings": self.unit_settings, "isEnabled": self.is_enabled}
@staticmethod
def parse_dict(push_service: Dict):
return PushService(push_service["mailAddress"], push_service["unitSettings"], push_service["isEnabled"])
class SkillType(Enum):
SmartHomeTemperatureSensor = auto()
SmartHomeThermostat = auto()
SmartHomeBattery = auto()
class Skill(ABC):
def __init__(self, skill_type: SkillType):
self.type: SkillType = skill_type
@abstractmethod
def to_json(self):
pass
@abstractmethod
def to_web_data(self, device_id: int):
pass
@staticmethod
def parse_dict(skill: Dict):
skill_type = SkillType[skill["type"]]
if skill_type == SkillType.SmartHomeTemperatureSensor:
return TemperatureSkill.parse_dict(skill)
elif skill_type == SkillType.SmartHomeThermostat:
return ThermostatSkill.parse_dict(skill)
elif skill_type == SkillType.SmartHomeBattery:
return BatterySkill.parse_dict(skill)
else:
raise NotImplementedError(skill_type)
class PresetName(Enum):
LOWER_TEMPERATURE = auto()
UPPER_TEMPERATURE = auto()
HOLIDAY_TEMPERATURE = auto()
class Preset:
def __init__(self, name: PresetName, temperature: Optional[int]):
self.name: PresetName = name
self.temperature: Optional[int] = temperature
def __repr__(self):
return f"<Preset name: {self.name.name}; temperature: {self.temperature}>"
def to_json(self):
return {"name": self.name.name, "temperature": self.temperature}
class Description:
def __init__(self, action: str, preset_temperature: Optional[Preset]):
self.action: str = action
self.preset_temperature: Optional[Preset] = preset_temperature
def __repr__(self):
desc = f"<Description action: {self.action}"
if self.preset_temperature is not None:
desc += f"; presetTemperaturet: {self.preset_temperature}"
desc += " >"
return desc
def to_json(self):
if self.preset_temperature is not None:
return {"action": self.action, "presetTemperature": self.preset_temperature.to_json()}
return {"action": self.action}
@staticmethod
def parse_dict(description: Dict):
preset: Optional[Preset] = None
if "presetTemperature" in description.keys():
preset = Preset(PresetName[description["presetTemperature"]["name"]],
description["presetTemperature"]["temperature"] if "temperature" in description["presetTemperature"] else None)
return Description(description["action"], preset)
class Repetition(Enum):
NONE = auto()
YEARLY = auto()
class TimeSetting:
def __init__(self, start_date: Optional[str], start_time: Optional[str],
end_date: Optional[str] = None, end_time: Optional[str] = None,
repetition: Repetition = Repetition.NONE, day_of_week: Optional[WeekDay] = None):
self.start_date: Optional[str] = start_date
self.start_time: Optional[str] = start_time
self.end_date: Optional[str] = end_date
self.end_time: Optional[str] = end_time
self.repetition: Repetition = repetition
self.day_of_week: Optional[WeekDay] = day_of_week
def __repr__(self):
desc = f"<TimeSettings "
if self.day_of_week is not None:
desc += f"dayOfWeek: {self.day_of_week.name}; "
if self.start_date is not None:
desc += f"startDate: {self.start_date} "
if self.start_time is not None:
desc += f"startTime: {self.start_time} "
if self.end_date is not None:
desc += f"endDate: {self.end_date} "
if self.end_time is not None:
desc += f"endTime: {self.end_time} "
if self.repetition != Repetition.NONE:
desc += f"repetition: {self.repetition.name} "
desc += ">"
return desc
def to_json(self): # ToDo: Return typehints for all to_json
state = {}
if self.start_date is not None:
state["startDate"] = self.start_date
if self.start_time is not None:
state["startTime"] = self.start_time
if self.end_date is not None:
state["endDate"] = self.end_date
if self.end_time is not None:
state["endTime"] = self.end_time
if self.repetition != Repetition.NONE:
state["repetition"] = self.repetition.name
if self.day_of_week is not None:
state = {"dayOfWeek": self.day_of_week.name,
"time": state}
return state
@staticmethod
def parse_dict(time_setting: Dict):
start_date: Optional[str] = None
start_time: Optional[str] = None
end_date: Optional[str] = None
end_time: Optional[str] = None
day_of_week: Optional[WeekDay] = None
repetition: Repetition = Repetition.NONE
if "dayOfWeek" in time_setting.keys():
day_of_week = WeekDay[time_setting["dayOfWeek"]]
time_setting = time_setting["time"]
if "startDate" in time_setting.keys():
start_date = time_setting["startDate"]
if "startTime" in time_setting.keys():
start_time = time_setting["startTime"]
if "endDate" in time_setting.keys():
end_date = time_setting["endDate"]
if "endTime" in time_setting.keys():
end_time = time_setting["endTime"]
if "repetition" in time_setting.keys():
repetition = Repetition[time_setting["repetition"]]
return TimeSetting(start_date, start_time, end_date, end_time, repetition, day_of_week)
class Change:
def __init__(self, description: Description, time_setting: TimeSetting):
self.description: Description = description
self.time_setting: TimeSetting = time_setting
def __repr__(self):
return f"<Change description: {self.description}; timeSetting: {self.time_setting}>"
def to_json(self):
return {"description": self.description.to_json(), "timeSetting": self.time_setting.to_json()}
@staticmethod
def parse_dict(change: Dict):
return Change(Description.parse_dict(change["description"]), TimeSetting.parse_dict(change["timeSetting"]))
class TemperatureDropDetection:
def __init__(self, do_not_heat_offset_in_minutes: int, sensitivity: int):
self.do_not_heat_offset_in_minutes: int = do_not_heat_offset_in_minutes
self.sensitivity: int = sensitivity
def __repr__(self):
# ToDo
pass
def to_json(self):
return {
"doNotHeatOffsetInMinutes": self.do_not_heat_offset_in_minutes,
"sensitivity": self.sensitivity
}
def to_web_data(self, device_id: int):
return {
"WindowOpenTrigger": self.sensitivity + 3,
"WindowOpenTimer": self.do_not_heat_offset_in_minutes
}
@staticmethod
def parse_dict(drop_detection: Dict):
return TemperatureDropDetection(drop_detection["doNotHeatOffsetInMinutes"],
drop_detection["sensitivity"])
class ScheduleKind(Enum):
REPETITIVE = auto()
WEEKLY_TIMETABLE = auto()
class ThermostatSkillMode(Enum):
TARGET_TEMPERATURE = auto()
class Action:
def __init__(self, is_enabled: bool, time_setting: TimeSetting, description: Description):
self.is_enabled: bool = is_enabled
self.time_setting: TimeSetting = time_setting
self.description: Description = description
def __repr__(self):
return f"<Action isEnabled: {self.is_enabled}; timeSetting: {self.time_setting}; " \
f"description: {self.description} >"
def to_json(self):
return {"isEnabled": self.is_enabled,
"timeSetting": self.time_setting.to_json(),
"description": self.description.to_json()}
@staticmethod
def parse_dict(action: Dict):
return Action(action["isEnabled"], TimeSetting.parse_dict(action["timeSetting"]),
Description.parse_dict(action["description"]))
class Schedule:
def __init__(self, is_enabled: bool, kind: ScheduleKind, name: str, actions: List[Action]):
self.is_enabled: bool = is_enabled
self.kind: ScheduleKind = kind
self.name: str = name
self.actions: List[Action] = actions
def __repr__(self):
return f"<Schedule isEnabled: {self.is_enabled}; kind: {self.kind.name}; " \
f"name: {self.name}; actions: {self.actions}>"
def to_json(self):
return {
"isEnabled": self.is_enabled,
"kind": self.kind.name,
"name": self.name,
"actions": [action.to_json() for action in self.actions]
}
def to_web_data(self, device_id: int):
data = {}
if self.kind == ScheduleKind.REPETITIVE and self.name == "HOLIDAYS": # ToDo: Enum for names?
enabled_count = 0
for num, holiday in enumerate(self.actions):
num += 1
_, start_month, start_day = holiday.time_setting.start_date.split("-")
_, end_month, end_day = holiday.time_setting.end_date.split("-")
data.update({
f"Holiday{num}StartDay": start_day,
f"Holiday{num}StartMonth": start_month,
f"Holiday{num}StartHour": holiday.time_setting.start_time.split(":")[0],
f"Holiday{num}EndDay": end_day,
f"Holiday{num}EndMonth": end_month,
f"Holiday{num}EndHour": holiday.time_setting.end_time.split(":")[0],
f"Holiday{num}Enabled": 1 if holiday.is_enabled else 0,
f"Holiday{num}ID": num
})
if holiday.is_enabled:
enabled_count += 1
data["HolidayEnabledCount"] = enabled_count
elif self.kind == ScheduleKind.REPETITIVE and self.name == "SUMMER_TIME":
_, start_month, start_day = self.actions[0].time_setting.start_date.split("-")
_, end_month, end_day = self.actions[0].time_setting.end_date.split("-")
data = {
"SummerStartDay": start_day,
"SummerStartMonth": start_month,
"SummerEndDay": end_day,
"SummerEndMonth": end_month,
"SummerEnabled": 1 if self.is_enabled else 0
}
elif self.kind == ScheduleKind.WEEKLY_TIMETABLE and self.name == "TEMPERATURE":
timer_items = {}
for action in self.actions:
if not action.is_enabled:
continue
time = "".join(action.time_setting.start_time.split(":")[:-1])
if time not in timer_items.keys():
timer_items[time] = [0, 0]
heat = 1 if action.description.preset_temperature.name == PresetName.UPPER_TEMPERATURE else 0
days = timer_items[time][heat]
days |= action.time_setting.day_of_week
timer_items[time][heat] = days
num = 0
for key in timer_items.keys():
if timer_items[key][0] != 0:
data.update({f"timer_item_{num}": f"{key};0;{timer_items[key][0].as_integer_ratio()[0]}"})
num += 1
if timer_items[key][1] != 0:
data.update({f"timer_item_{num}": f"{key};1;{timer_items[key][1].as_integer_ratio()[0]}"})
num += 1
else:
raise NotImplementedError(self.name)
return data
@staticmethod
def parse_dict(schedule: Dict): # ToDo: Make TypedDicts for all those dicts
return Schedule(schedule["isEnabled"],
ScheduleKind[schedule["kind"]],
schedule["name"],
[Action.parse_dict(action) for action in schedule["actions"]])
class TimeControl:
def __init__(self, is_enabled: bool, time_schedules: List[Schedule]):
self.is_enabled: bool = is_enabled
self.time_schedules: List[Schedule] = time_schedules
def __repr__(self):
return f"<TimeControl isEnabled: {self.is_enabled}; timeSchedules: {self.time_schedules}"
def to_json(self):
return {
"isEnabled": self.is_enabled,
"timeSchedules": [schedule.to_json() for schedule in self.time_schedules]
}
def to_web_data(self, device_id: int):
data = {}
for schedule in self.time_schedules:
data.update(schedule.to_web_data(device_id))
return data
@staticmethod
def parse_dict(time_control: Dict):
return TimeControl(time_control["isEnabled"],
[Schedule.parse_dict(schedule) for schedule in time_control["timeSchedules"]])
class ThermostatSkill(Skill):
def __init__(self, presets: List[Preset], next_change: Optional[Change],
temperature_drop_detection: TemperatureDropDetection,
target_temp: int, time_control: TimeControl,
mode: ThermostatSkillMode, used_temp_sensor: Unit):
super().__init__(SkillType.SmartHomeThermostat)
self.presets: List[Preset] = presets
self.next_change: Optional[Change] = next_change
self.temperature_drop_detection: TemperatureDropDetection = temperature_drop_detection
self.target_temp: int = target_temp
self.time_control: TimeControl = time_control
self.mode: ThermostatSkillMode = mode
self.used_temp_sensor: Unit = used_temp_sensor
def __repr__(self):
# ToDo
pass
def to_web_data(self, device_id: int):
upper = 0.0
lower = 0.0
for preset in self.presets:
if preset.name == PresetName.UPPER_TEMPERATURE:
upper = preset.temperature
elif preset.name == PresetName.LOWER_TEMPERATURE:
lower = preset.temperature
data = {
"Heiztemp": upper,
"Absenktemp": lower
}
if device_id == self.used_temp_sensor.id:
data.update({"ExtTempsensorID": "tochoose", "tempsensor": "own"})
else:
data.update({"ExtTempsensorID": self.used_temp_sensor.id, "tempsensor": "extern"})
data.update(self.time_control.to_web_data(device_id))
data.update(self.temperature_drop_detection.to_web_data(device_id))
return data
def to_json(self):
state = {
"type": self.type.name,
"presets": [preset.to_json() for preset in self.presets],
"temperatureDropDetection": self.temperature_drop_detection.to_json(),
"targetTemp": self.target_temp,
"timeControl": self.time_control.to_json(),
"mode": self.mode.name,
"usedTempSensor": self.used_temp_sensor.to_json()
}
if self.next_change is not None:
state["nextChange"] = self.next_change.to_json()
return state
@staticmethod
def parse_dict(skill: Dict):
return ThermostatSkill(
[Preset(PresetName[preset["name"]], preset["temperature"]) for preset in skill["presets"]],
Change.parse_dict(skill["nextChange"]) if "nextChange" in skill.keys() else None,
TemperatureDropDetection.parse_dict(skill["temperatureDropDetection"]),
skill["targetTemp"],
TimeControl.parse_dict(skill["timeControl"]),
ThermostatSkillMode[skill["mode"]],
Unit.parse_dict(skill["usedTempSensor"], None)
)
class TemperatureSkill(Skill):
def __init__(self, offset: float, current_in_celsius: int):
super().__init__(SkillType.SmartHomeTemperatureSensor)
self.offset: float = offset
self.current_in_celsius: int = current_in_celsius
def __repr__(self):
return f"{self.type.name}: " \
f"offset: {self.offset}; currentInCelsius: {self.current_in_celsius}"
def to_json(self):
return {
"type": self.type.name,
"offset": self.offset,
"currentInCelsius": self.current_in_celsius
}
def to_web_data(self, device_id: int):
data = {
"Roomtemp": self.current_in_celsius,
"Offset": self.offset
}
return data
@staticmethod
def parse_dict(skill: Dict):
return TemperatureSkill(skill["offset"], skill["currentInCelsius"])
class BatterySkill(Skill):
def __init__(self, charge_level_in_percent: int):
super().__init__(SkillType.SmartHomeBattery)
self.charge_level_in_percent: int = charge_level_in_percent
def __repr__(self):
return f"{self.type.name}: chargeLevelInPercent: {self.charge_level_in_percent}"
def to_json(self):
return {"type": self.type.name, "chargeLevelInPercent": self.charge_level_in_percent}
def to_web_data(self, device_id: int):
return {}
@staticmethod
def parse_dict(skill: Dict):
return BatterySkill(skill["chargeLevelInPercent"])
class UnitTypes(Enum):
BATTERY = auto()
TEMPERATURE_SENSOR = auto()
THERMOSTAT = auto()
class Unit:
def __init__(self, unit_type: UnitTypes, idx: int, display_name: str,
device: Optional[Device], skills: List[Union[Dict, Skill]],
interaction_controls: Optional[List] = None):
self.type: UnitTypes = unit_type
self.id: int = idx
self.display_name: str = display_name
self.device: Optional[Device] = device
self.skills: List[Skill] = skills
self.interaction_controls: Optional[List] = interaction_controls # ToDo: Type for this
def __repr__(self):
desc = f"type: {self.type.name}; id: {self.id}; displayName: {self.display_name};"
if self.device is not None:
desc += f"device: {self.device.short_description()}; "
desc += f"skills: {self.skills}; "
if self.interaction_controls is not None:
desc += f"interactionControls: {self.interaction_controls} ;"
return desc
def to_json(self):
state = {
"type": self.type.name,
"id": self.id,
"displayName": self.display_name,
"skills": [skill.to_json() for skill in self.skills]
}
if self.device is not None:
state["device"] = self.device.to_short_json()
if self.interaction_controls is not None:
state["interactionControls"] = self.interaction_controls
return state
def to_web_data(self, device_id: int):
data = {}
for skill in self.skills:
data.update(skill.to_web_data(device_id))
return data
@staticmethod
def parse_dict(unit: Dict, device: Optional[Device]):
return Unit(UnitTypes[unit["type"]],
unit["id"],
unit["displayName"],
device,
[Skill.parse_dict(skill) for skill in unit["skills"]],
unit["interactionControls"] if "interactionControls" in unit.keys() else None)
class DeviceType(Enum):
SmartHomeDevice = auto()
class DeviceCategory(Enum):
THERMOSTAT = auto()
class Device:
def __init__(self, state):
self.type: Optional[DeviceType] = None
self.is_deletable: bool = False
self.id: int = -1
self.master_connection_state = None # ToDo
self.display_name: Optional[str] = None
self.category: Optional[str] = None
self.units = None # ToDo
self.firmware_version: Optional[FirmwareVersion] = None
self.model: Optional[str] = None
self.is_editable: bool = False
self.manufacturer: Optional[Manufacturer] = None
self.push_service: Optional[PushService] = None
self.actor_identification_number: Optional[str] = None
if state is not None:
self.parse_state(state)
def __repr__(self):
# ToDo
pass
def short_description(self):
desc = f"masterConnectionState: {self.master_connection_state}; " \
f"type: {self.type}; model: {self.model}; id: {self.id}; " \
f"manufacturer: {self.manufacturer}; " \
f"actorIdentificationNumber: {self.actor_identification_number}; " \
f"displayName: {self.display_name}"
return desc
def parse_state(self, state):
self.type = DeviceType[state["type"]]
self.is_deletable = state["isDeletable"]
self.id = state["id"]
self.master_connection_state = state["masterConnectionState"]
self.display_name = state["displayName"]
self.category = state["category"]
self.units = [Unit.parse_dict(unit, self) for unit in state["units"]]
self.firmware_version = FirmwareVersion.parse_dict(state["firmwareVersion"])
self.model = state["model"]
self.is_editable = state["isEditable"]
self.manufacturer = Manufacturer.parse_dict(state["manufacturer"])
self.push_service = PushService.parse_dict(state["pushService"])
self.actor_identification_number = state["actorIdentificationNumber"]
def to_web_data(self):
data = {
"device": self.id,
"ule_device_name": self.display_name
}
for unit in self.units:
data.update(unit.to_web_data(self.id))
return data
def to_json(self):
state = {"type": self.type.name,
"isDeletable": self.is_deletable,
"id": self.id,
"masterConnectionState": self.master_connection_state,
"displayName": self.display_name,
"category": self.category,
"units": [unit.to_json() for unit in self.units],
"firmwareVersion": self.firmware_version.to_json(),
"model": self.model,
"isEditable": self.is_editable,
"manufacturer": self.manufacturer.to_json(),
"pushService": self.push_service.to_json(),
"actorIdentificationNumber": self.actor_identification_number}
return state
def set_offset(self, offset: float):
temp_sensor: Optional[Unit] = None
for unit in self.units:
if unit.type == UnitTypes.TEMPERATURE_SENSOR:
temp_sensor = unit
break
temp_skill: Optional[TemperatureSkill] = None
for skill in temp_sensor.skills:
if skill.type == SkillType.SmartHomeTemperatureSensor:
temp_skill = typing.cast(TemperatureSkill, skill)
break
temp_skill.offset = offset
def get_offset(self):
temp_sensor: Optional[Unit] = None
for unit in self.units:
if unit.type == UnitTypes.TEMPERATURE_SENSOR:
temp_sensor = unit
break
temp_skill: Optional[TemperatureSkill] = None
for skill in temp_sensor.skills:
if skill.type == SkillType.SmartHomeTemperatureSensor:
temp_skill = typing.cast(TemperatureSkill, skill)
break
return temp_skill.offset
def get_temperature(self):
temp_sensor: Optional[Unit] = None
for unit in self.units:
if unit.type == UnitTypes.TEMPERATURE_SENSOR:
temp_sensor = unit
break
temp_skill: Optional[TemperatureSkill] = None
for skill in temp_sensor.skills:
if skill.type == SkillType.SmartHomeTemperatureSensor:
temp_skill = typing.cast(TemperatureSkill, skill)
break
return temp_skill.current_in_celsius
def to_short_json(self):
return {
"masterConnectionState": self.master_connection_state,
"type": self.type.name,
"model": self.model,
"id": self.id,
"manufacturer": self.manufacturer.to_json(),
"actorIdentificationNumber": self.actor_identification_number,
"displayName": self.display_name
}

View File

@ -1,58 +1,27 @@
from __future__ import annotations
import asyncio
import hashlib
import json
import logging
import re
import xml.etree.ElementTree as ET
from asyncio import Task
from datetime import datetime, timedelta
from typing import Optional, Dict, List, Union
from typing import Optional, Tuple
import requests
from bs4 import BeautifulSoup
from device import Device
import json
import re
import hashlib
import xml.etree.ElementTree as ET
import logging
class FritzBox:
def __init__(self, url: str, password: str, update_timeout: int, user: str = None, dry_run: bool = False,
verify_ssl: bool = True, old_fb: bool = False) -> None:
def __init__(self, url:str, password:str, user:str = None) -> None:
self._endpoints = {
"login": "login_sid.lua?version=2",
"logout": "index.lua",
"data": "data.lua",
"device_details": "net/home_auto_hkr_edit.lua"
"data": "data.lua"
}
self.url: str = url
self.dry_run: bool = dry_run
self.user: Optional[str] = user
self.session: requests.Session = requests.Session()
self.password: str = password
self.sid: Optional[str] = None
self.update_timeout: int = update_timeout
self.update_time: Dict[str, datetime] = {}
self.hold_connection: Optional[Task] = None
self.verify_ssl = verify_ssl
self.old_fb = old_fb
if not verify_ssl:
import urllib3
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
async def hold_connection_alive(self) -> None:
while True:
# Session automatically destroyed after 20m of inactivity
# according to the manual
await asyncio.sleep(19 * 60)
self.check_session()
self.url = url
self.session = requests.Session()
self.password = password
self.sid = None
def _calc_challenge_v2(self, challenge: str) -> str:
logging.debug(f"Calculate v2 challenge: {challenge}")
chall_regex = re.compile(
"2\$(?P<iter1>[0-9a-zA-Z]+)\$(?P<salt1>[0-9a-zA-Z]+)\$(?P<iter2>[0-9a-zA-Z]+)\$(?P<salt2>[0-9a-zA-Z]+)")
chall_regex = re.compile("2\$(?P<iter1>[0-9a-zA-Z]+)\$(?P<salt1>[0-9a-zA-Z]+)\$(?P<iter2>[0-9a-zA-Z]+)\$(?P<salt2>[0-9a-zA-Z]+)")
chall_parts = chall_regex.match(challenge).groupdict()
salt1: bytes = bytes.fromhex(chall_parts["salt1"])
@ -73,38 +42,22 @@ class FritzBox:
response = challenge + "-" + hashlib.md5(response).hexdigest()
return response
def check_session(self) -> None:
data = {
"xhr": 1,
"sid": self.sid,
"lang": "de",
"page": "overview",
"xhrId": "first",
"noMenuRef": 1
}
r = self.session.post(f"{self.url}/{self._endpoints['data']}", data=data, verify=self.verify_ssl)
if len(r.history) > 0:
if not self.login():
logging.error("Failed to login to Fritz!Box")
else:
logging.debug("Already logged in")
def login(self) -> bool:
logging.info(f"Login user {self.user} to Fritz!Box")
def login(self, user:str = None) -> bool:
logging.debug(f"login user {user}")
challenge = None
r = self.session.get(f"{self.url}/{self._endpoints['login']}", verify=self.verify_ssl)
r = self.session.get(f"{self.url}/{self._endpoints['login']}")
xml = ET.fromstring(r.text)
for elem in xml:
if elem.tag == "SID":
self.sid = elem.text
elif elem.tag == "Challenge":
challenge = elem.text
elif self.user is None and elem.tag == "Users":
elif user is None and elem.tag == "Users":
for user_elem in elem:
if "fritz" in user_elem.text:
self.user = user_elem.text
user = user_elem.text
assert challenge is not None and self.user is not None
assert challenge is not None and user is not None
if challenge.startswith("2$"):
response = self._calc_challenge_v2(challenge)
@ -112,217 +65,102 @@ class FritzBox:
response = self._calc_challenge_v1(challenge)
data = {
"username": self.user,
"username": user,
"response": response
}
r = self.session.post(f"{self.url}/{self._endpoints['login']}", data=data, verify=self.verify_ssl)
r = self.session.post(f"{self.url}/{self._endpoints['login']}", data=data)
logging.debug(r.text)
xml = ET.fromstring(r.text)
for elem in xml:
if elem.tag == "SID":
self.sid = elem.text
logging.info(f"Authenticated Fritz!Box: {len(self.sid) != self.sid.count('0')}")
if len(self.sid) != self.sid.count("0"):
self.hold_connection = asyncio.create_task(self.hold_connection_alive())
logging.debug(f"Authenticated fritzbox: {len(self.sid) != self.sid.count('0')}")
return len(self.sid) != self.sid.count("0")
def logout(self) -> bool:
logging.info("logout")
logging.debug("logout")
data = {
"xhr": 1,
"xhr":1,
"sid": self.sid,
"logout": 1,
"no_sidrenew": ""}
r = self.session.post(f"{self.url}/{self._endpoints['logout']}", data=data, verify=self.verify_ssl)
if self.hold_connection is not None:
self.hold_connection.cancel()
"no_sidrenew":""}
r = self.session.post(f"{self.url}/{self._endpoints['logout']}", data=data)
return r.status_code == 200
def list_devices_old(self) -> Optional[List[Union[Device, Dict]]]:
def list_devices(self):
data = {
"xhr": 1,
"sid": self.sid,
"lang": "de",
"page": "sh",
"page":"sh_dev",
"xhrId": "all"
}
r = self.session.post(f"{self.url}/{self._endpoints['data']}", data=data, verify=self.verify_ssl)
r = self.session.post(f"{self.url}/{self._endpoints['data']}", data=data)
logging.debug(r.text[:100])
if len(r.history) > 0:
if self.login():
r = self.session.post(f"{self.url}/{self._endpoints['data']}", data=data, verify=self.verify_ssl)
r = self.session.post(f"{self.url}/{self._endpoints['data']}", data=data)
else:
return None
devices_overview_raw = BeautifulSoup(r.text, "html.parser")
table_rows = devices_overview_raw.find(id="uiSmarthomeTables").table
devices_raw = []
for r in table_rows.find_all("tr"):
name = r.findAll("td", {'class': ['name', 'cut_overflow']})
button = r.findAll("td", {'class': 'btncolumn'})
temperature = r.findAll("td", {'class': 'temperature'})
if name is None or len(name) < 1:
continue
if button is None or len(button) < 1:
continue
if temperature is None or len(temperature) < 1:
continue
name = name[0].string
id = int(button[0].button["value"])
print("Temperature raw:", temperature)
try:
temperature = float(temperature[0].text.split(" ")[0].replace(",", "."))
except:
print("Error parsing temperature.")
return []
request_data = {
"device": id,
"sid": self.sid,
"xhr": 1
}
r = self.session.get(f"{self.url}/{self._endpoints['device_details']}", params=request_data,
verify=self.verify_ssl)
device_content_raw = BeautifulSoup(r.text, "html.parser")
offset = float(device_content_raw.find("input", {"type": "hidden", "name": "Offset"})["value"])
devices_raw.append({
"name": name,
"display_name": name,
"id": id,
"temperature": temperature,
"offset": offset
})
return devices_raw
def list_devices(self) -> Optional[List[Device]]:
data = {
"xhr": 1,
"sid": self.sid,
"lang": "de",
"page": "sh_dev",
"xhrId": "all"
}
r = self.session.post(f"{self.url}/{self._endpoints['data']}", data=data, verify=self.verify_ssl)
logging.debug(r.text[:100])
if len(r.history) > 0:
if self.login():
r = self.session.post(f"{self.url}/{self._endpoints['data']}", data=data, verify=self.verify_ssl)
else:
return None
devices: List[Device] = []
for device in json.loads(r.text)["data"]["devices"]:
devices.append(Device(device))
devices = json.loads(r.text)["data"]["devices"]
return devices
def get_device_data(self, idx: int = None, name: str = None) -> Optional[Device]:
if idx is None and name is None:
def get_device_data(self, id: int = None, name: str = None) -> Optional[Tuple[float, float, int, str]]:
if id is None and name is None:
logging.debug("No id or name given")
return None
if self.old_fb:
devices = self.list_devices_old()
else:
devices = self.list_devices()
device = None
devices = self.list_devices()
for device in devices:
if isinstance(device, dict):
if device["id"] == idx or device["display_name"] == name:
break
else:
if device.id == idx or device.display_name == name:
break
if device["id"] == id or device["displayName"] == name:
break
device = None
if device is None:
logging.debug(f"Device {idx} {name} not found")
logging.debug(f"Device {id} {name} not found")
return None
return device
current_temp = None
current_offset = None
for unit in device["units"]:
if unit["type"] == "TEMPERATURE_SENSOR":
for skill in unit["skills"]:
if skill["type"] == "SmartHomeTemperatureSensor":
current_temp = float(skill["currentInCelsius"])
current_offset = float(skill["offset"])
def set_offset(self, device: Union[Device, Dict]) -> None:
if self.dry_run:
logging.warning("No updates in dry-run-mode")
return
return current_temp, current_offset, device["id"], device["displayName"]
old_type = isinstance(device, dict)
if old_type:
device_id = device["id"]
else:
device_id = device.id
def set_offset(self, current_temp: str, offset: float, device_id: int, device_name: str):
data = {
"xhr": 1,
"sid": self.sid,
"lang": "de",
"device": device_id,
"page": "home_auto_hkr_edit"
}
r = self.session.post(f"{self.url}/{self._endpoints['data']}", data=data)
if not old_type:
data = {
"xhr": 1,
"sid": self.sid,
"lang": "de",
"device": device_id,
"page": "home_auto_hkr_edit"
}
self.session.post(f"{self.url}/{self._endpoints['data']}", data=data, verify=self.verify_ssl)
data = {
"xhr":1,
"sid": self.sid,
"lang": "de",
"device": device_id,
"view": "",
"back_to_page": "sh_dev",
"ule_device_name": device_name,
"WindowOpenTrigger":8,
"WindowOpenTimer":10,
"tempsensor": "own",
"Roomtemp": f"{current_temp}",
"ExtTempsensorID":"tochoose",
"Offset": f"{offset}",
"apply":"",
"oldpage":"/net/home_auto_hkr_edit.lua"
}
data = {
"xhr": 1,
"sid": self.sid,
"lang": "de",
"view": "",
"back_to_page": "sh_dev",
"apply": "",
"oldpage": "/net/home_auto_hkr_edit.lua"
}
data.update(device.to_web_data())
else:
data = {
"xhr": 1,
"sid": self.sid,
"lang": "de",
"no_sidrenew": "",
"device": device_id,
"view": "",
"back_to_page": "/net/home_auto_overview.lua",
"ule_device_name": device["name"],
"Offset": device["offset"],
"apply": "",
"oldpage": "/net/home_auto_hkr_edit.lua",
}
self.session.post(f"{self.url}/{self._endpoints['data']}", data=data, verify=self.verify_ssl)
def correct_offset(self, device_name: str, real_temp: float):
elapsed = None
if device_name in self.update_time.keys():
elapsed = datetime.now() - self.update_time[device_name]
logging.info(f"Last update for {device_name} {elapsed} ago")
delta = timedelta(minutes=self.update_timeout)
if device_name not in self.update_time.keys() or elapsed > delta:
device: Optional[Union[Dict, Device]] = self.get_device_data(name=device_name)
logging.info(f"device: {device}")
if device is None:
return
offset_changed = False
if self.old_fb:
new_offset = device["offset"] + real_temp - device["temperature"]
offset_changed = new_offset != device["offset"]
logging.info(f"Update offset from {device['offset']} to {new_offset} (changed: {offset_changed})")
else:
new_offset = device.get_offset() + real_temp - device.get_temperature()
offset_changed = new_offset != device.get_offset()
logging.info(f"Update offset from {device.get_offset()} to {new_offset} (changed: {offset_changed})")
if offset_changed:
if self.old_fb:
device["offset"] = new_offset
else:
device.set_offset(new_offset)
self.set_offset(device)
self.update_time[device_name] = datetime.now()
r = self.session.post(f"{self.url}/{self._endpoints['data']}", data=data)

View File

@ -1,152 +1,66 @@
from __future__ import annotations
import asyncio
import json
import logging
from asyncio import Queue, Task, Event, Lock
from typing import Callable, Dict, Optional
import websockets
from websockets.exceptions import InvalidStatusCode
from typing import Callable
import websocket
import time
class HomeAssistantAPI:
def __init__(self, token: str, url: str) -> None:
def __init__(self, token:str, initialize: Callable[[HomeAssistantAPI], None]) -> None:
self.token = token
self.msg_id = 1
self.msg_id_lock = Lock()
self.ws: websockets.WebSocketClientProtocol = None
self.url = url
self.receiver: Optional[Task] = None
self.sender: Optional[Task] = None
self.sending_queue: Queue = Queue()
self.authenticated: Event = Event()
self.events: Dict[int, Queue] = {}
self.responses: Dict[int, Dict] = {}
self.response_events: Dict[int, Event] = {}
self.response_lock: Lock = Lock()
self.ws = None
self.subscriptions = {}
self.init_callback = initialize
async def connect(self):
retries = 5
logging.info("Connect to home assistant...")
while True:
try:
self.ws = await websockets.connect(self.url)
self.sender = asyncio.create_task(self.sending())
await self.auth()
self.receiver = asyncio.create_task(self.receiving())
return True
except (InvalidStatusCode, TimeoutError):
if retries > 0:
retries -= 1
await asyncio.sleep(30)
logging.info(f"Retry home assistant connection... ({retries})")
continue
else:
logging.error("Invalid status code while connecting to Home Assistant")
await self.exit_loop()
return False
def handle_message(self, ws: websocket.WebSocket, msg: str) -> None:
if self.ws is None:
self.ws = ws
async def wait_for_close(self):
await self.ws.wait_closed()
message: object = json.loads(msg)
async def receiving(self):
logging.debug("Start receiving")
async for message in self.ws:
msg: Dict = json.loads(message)
if msg["type"] == "event":
if msg["id"] not in self.events.keys():
logging.error(f"Received event for not subscribted id: {msg['id']} {msg['event_type']}")
continue
await self.events[msg["id"]].put(msg["event"])
else:
async with self.response_lock:
self.responses[msg["id"]] = msg
if msg["id"] in self.response_events.keys():
self.response_events[msg["id"]].set()
async def wait_for(self, idx):
async with self.response_lock:
if idx in self.responses.keys():
msg = self.responses[idx]
del self.responses[idx]
return msg
self.response_events[idx] = Event()
await self.response_events[idx].wait()
async with self.response_lock:
del self.response_events[idx]
if idx not in self.responses.keys():
logging.error("Response ID not found")
return None
msg = self.responses[idx]
del self.responses[idx]
return msg
async def exit_loop(self):
if self.sender is not None:
self.sender.cancel()
if self.receiver is not None:
self.receiver.cancel()
async def auth(self):
msg = json.loads(await self.ws.recv())
if msg["type"] != "auth_required":
logging.error("Authentication error: Not required")
await self.exit_loop()
response = {
"type": "auth",
"access_token": self.token
}
await self.sending_queue.put(response)
msg = json.loads(await self.ws.recv())
if msg["type"] == "auth_invalid":
if message["type"] == "auth_required":
response = {
"type": "auth",
"access_token": self.token
}
logging.debug(response)
ws.send(json.dumps(response))
return
elif message["type"] == "auth_invalid":
logging.info("Auth failed")
await self.exit_loop()
elif msg["type"] == "auth_ok":
ws.close()
return None
elif message["type"] == "auth_ok":
logging.debug("Authenticated")
self.authenticated.set()
self.init_callback(self)
self.init_callback = None
return
elif message["type"] == "event":
event = message["event"]
if event["event_type"] in self.subscriptions.keys():
self.subscriptions[event["event_type"]](event)
else:
logging.error(f"Unknown answer for auth: {msg}")
await self.exit_loop()
print("Received", message)
async def sending(self):
while msg := await self.sending_queue.get():
await self.ws.send(json.dumps(msg))
def subscribe_event(self, event_type: str, callback: Callable[[object], None]):
async def subscribe_event(self, event_type: str):
await self.authenticated.wait()
if self.ws is None:
logging.debug("Websocket not set")
return
if event_type in self.subscriptions.keys():
logging.warning(f"Already subscribed to {event_type}")
return
logging.info(f"Subscribe to {event_type}")
async with self.msg_id_lock:
msg_id = self.msg_id
response = {
"id": msg_id,
"type": "subscribe_events",
"event_type": event_type
}
self.events[msg_id] = Queue()
self.msg_id += 1
await self.sending_queue.put(response)
return msg_id
async def get_states(self):
await self.authenticated.wait()
async with self.msg_id_lock:
message = {
"id": self.msg_id,
"type": "get_states"
}
self.msg_id += 1
await self.sending_queue.put(message)
response = await self.wait_for(message["id"])
# ToDo: Error handling
return response["result"]
async def get_device_state(self, entity_id: str):
device_states = await self.get_states()
for device_state in device_states:
if device_state["entity_id"] == entity_id:
return device_state
return None
self.subscriptions[event_type] = callback
response = {
"id": self.msg_id,
"type": "subscribe_events",
"event_type": event_type
}
self.msg_id += 1
self.ws.send(json.dumps(response))

View File

@ -1,4 +1,3 @@
#!/usr/bin/with-contenv bashio
#!/usr/bin/env bashio
source /opt/venv/bin/activate
python3 /srv/sync_ha_fb.py /data/options.json
python3 /srv/sync_ha_fb.py

View File

@ -1,105 +1,78 @@
#!/usr/bin/env python3
import asyncio
"""
hier die verbindungen zu HA aufbauen etc
außerdem das vergleichen der werte und dass anstoßen der updates
"""
import os
import sys
from fritzbox import FritzBox
from homeassistant import HomeAssistantAPI
import logging
import websocket
import json
sensor_mappings = {}
thermostate_mappings = {}
mappings = {}
def handle_event(event):
entity_id = event["data"]["entity_id"]
if entity_id in mappings.keys():
new_state = event["data"]["new_state"]
logging.debug(entity_id)
logging.debug(new_state["attributes"]["temperature"])
rounded = round(float(new_state["attributes"]["temperature"])*2)/2
logging.debug(rounded)
if new_state["attributes"]["device_class"] == "temperature":
if entity_id in mappings.keys():
fb.login()
logged = False
for thermostate in mappings[entity_id]:
current_temp, current_offset, id, name = fb.get_device_data(name=thermostate)
if not logged:
logging.info(f"Current measurement from {entity_id}: {new_state['attributes']['temperature']} ({rounded})")
logged = True
logging.info(f"Current measurement from {thermostate}: {current_temp}")
new_offset = current_offset + rounded - current_temp
if new_offset != current_offset:
old_offset = current_offset
logging.debug(f"Set offset for {thermostate} from {current_offset} to {new_offset}")
fb.set_offset(current_temp, new_offset, id, name)
current_temp, current_offset, id, name = fb.get_device_data(name=thermostate)
logging.debug(f"Target: {new_offset} ; Set: {current_offset}")
if new_offset == current_offset:
logging.info(f"Adjustet offset from {old_offset} to {new_offset}")
else:
logging.warning(f"Failed to adjust offset from {old_offset} to {new_offset}")
fb.logout()
async def handle_event(idx: int):
logging.info(f"Wait for events for {idx}")
def on_error(ws, error):
print(error)
while event := await ha.events[idx].get():
try:
entity_id = event["data"]["entity_id"]
if entity_id in sensor_mappings.keys() or entity_id in thermostate_mappings.keys():
state = await ha.get_device_state(entity_id)
new_state = event["data"]["new_state"]
logging.debug(f"received changed state from {entity_id}")
if entity_id in thermostate_mappings.keys() and state["state"] != "unavailable":
therm_temp = new_state["attributes"]["current_temperature"]
therm_name = new_state["attributes"]["friendly_name"]
sensor = thermostate_mappings[entity_id]
sensor_state = await ha.get_device_state(sensor)
sensor_state_temp = sensor_state["attributes"]["temperature"] if "temperatur" in sensor_state[
"attributes"] else sensor_state["state"]
sensor_temp = round(float(sensor_state_temp) * 2) / 2
if therm_temp != sensor_temp:
logging.info(
f"{therm_name}: {therm_temp}\n{sensor}: {sensor_state_temp} ({sensor_temp})")
fb.correct_offset(therm_name, sensor_temp)
def on_close(ws, close_status_code, close_msg):
pass
elif entity_id in sensor_mappings.keys():
new_state_temp = new_state["attributes"]["temperature"] if "temperatur" in new_state[
"attributes"] else new_state["state"]
sensor_temp = round(float(new_state_temp) * 2) / 2
logging.info(sensor_temp)
for thermostate in sensor_mappings[entity_id]:
therm_state = await ha.get_device_state(thermostate)
if therm_state["state"] == "unavailable":
continue
therm_temp = float(therm_state["attributes"]["current_temperature"])
therm_name = therm_state["attributes"]["friendly_name"]
if therm_temp != sensor_temp or True:
logging.info(
f"{therm_name}: {therm_temp}\n{entity_id}: {new_state_temp} ({sensor_temp})")
fb.correct_offset(therm_name, sensor_temp)
except KeyError as e:
logging.error(e)
def on_open(ws):
pass
async def init(ha: HomeAssistantAPI, fb: FritzBox):
if not await ha.connect():
return
def init(ha: HomeAssistantAPI):
logging.debug("Subscribe")
state_changed_id = await ha.subscribe_event("state_changed")
logging.debug(state_changed_id)
asyncio.create_task(handle_event(state_changed_id))
fb.login()
await ha.wait_for_close()
logging.info("Websocket closed, shutting down..")
ha.subscribe_event("state_changed", handle_event)
async def main():
config_path = sys.argv[1]
config = json.load(open(config_path))
level = logging.INFO
if "log_level" in config:
if config["log_level"] == "DEBUG":
level = logging.DEBUG
logging.basicConfig(level=level, format="[%(asctime)s] [%(levelname)s] %(message)s")
logging.debug(config)
logging.basicConfig(level=logging.INFO, format="[%(asctime)s] [%(levelname)s] %(message)s")
config = json.load(open("/data/options.json"))
logging.debug(config)
for mapping in config["mappings"]:
if mapping["sensor"] not in mappings.keys():
mappings[mapping["sensor"]] = []
mappings[mapping["sensor"]].append(mapping["thermostate"])
global fb
fb = FritzBox(url=config["fritzbox"]["url"],
user=config["fritzbox"]["username"],
password=config["fritzbox"]["password"],
update_timeout=config["update_timeout"],
dry_run=False,
verify_ssl=config["fritzbox"]["verify_ssl"],
old_fb=config["fritzbox"]["old_fb"])
supervisor_url = "ws://supervisor/core/websocket"
supervisor_token = os.environ["SUPERVISOR_TOKEN"]
global ha
ha = HomeAssistantAPI(supervisor_token, supervisor_url)
fb = FritzBox(config["fritzbox"]["url"], config["fritzbox"]["password"])
ha = HomeAssistantAPI(os.environ["SUPERVISOR_TOKEN"], init)
for mapping in config["mappings"]:
if mapping["sensor"] not in sensor_mappings.keys():
sensor_mappings[mapping["sensor"]] = []
sensor_mappings[mapping["sensor"]].append(mapping["thermostate"])
thermostate_mappings[mapping["thermostate"]] = mapping["sensor"]
websocket.enableTrace(False)
ws = websocket.WebSocketApp("ws://supervisor/core/websocket",
on_open=on_open,
on_message=ha.handle_message,
on_error=on_error,
on_close=on_close)
try:
await init(ha, fb)
except KeyboardInterrupt:
pass
asyncio.run(main())
ws.run_forever()