Compare commits

..

No commits in common. "master" and "v0.1.0" have entirely different histories.

8 changed files with 187 additions and 1093 deletions

5
.gitignore vendored
View File

@ -15,7 +15,6 @@ dist/
downloads/ downloads/
eggs/ eggs/
.eggs/ .eggs/
.venv/
lib/ lib/
lib64/ lib64/
parts/ parts/
@ -169,7 +168,6 @@ pip-selfcheck.json
# Reference: https://intellij-support.jetbrains.com/hc/en-us/articles/206544839 # Reference: https://intellij-support.jetbrains.com/hc/en-us/articles/206544839
# User-specific stuff # User-specific stuff
.idea/
.idea/**/workspace.xml .idea/**/workspace.xml
.idea/**/tasks.xml .idea/**/tasks.xml
.idea/**/usage.statistics.xml .idea/**/usage.statistics.xml
@ -241,6 +239,3 @@ fabric.properties
# Android studio 3.1+ serialized cache file # Android studio 3.1+ serialized cache file
.idea/caches/build_file_checksums.ser .idea/caches/build_file_checksums.ser
# local config test
options.json
.DS_Store

View File

@ -2,7 +2,8 @@ ARG BUILD_FROM
FROM $BUILD_FROM FROM $BUILD_FROM
# Install requirements for add-on # Install requirements for add-on
RUN apk update && apk add --no-cache python3 py3-pip py3-websockets py3-requests RUN apk update && apk add --no-cache python3 py-pip
RUN python3 -m pip install websocket-client requests
WORKDIR /data WORKDIR /data
@ -10,7 +11,6 @@ WORKDIR /data
COPY sync_ha_fb.py /srv COPY sync_ha_fb.py /srv
COPY fritzbox.py /srv COPY fritzbox.py /srv
COPY homeassistant.py /srv COPY homeassistant.py /srv
COPY device.py /srv
COPY run.sh / COPY run.sh /
RUN chmod a+x /run.sh RUN chmod a+x /run.sh

View File

@ -1,11 +1,8 @@
name: "Fritz!Box Temperature Sync Dev" name: "Fritz!Box Temperature Sync"
description: "Sync Fritz!DECT thermostat temperatures with other sensors in Home Assistant" description: "Sync Fritz!DECT thermostate temperatures with other sensors in Home Assistant"
version: "0.4.3" version: "0.1.0"
startup: "application" slug: "fritz_temp_sync"
stage: "stable"
slug: "fritz_temp_sync_dev"
homeassistant_api: true homeassistant_api: true
init: false
arch: arch:
- aarch64 - aarch64
- amd64 - amd64
@ -19,14 +16,10 @@ options:
mappings: mappings:
- sensor: null - sensor: null
thermostate: null thermostate: null
update_timeout: 15
schema: schema:
fritzbox: fritzbox:
url: url url: url
username: "str?"
password: str password: str
mappings: mappings:
- sensor: str - sensor: str
thermostate: str thermostate: str
update_timeout: int
log_level: "str?"

View File

@ -1,714 +0,0 @@
from __future__ import annotations
from abc import ABC, abstractmethod
from enum import Enum, IntFlag, auto
from typing import Dict, List, Optional, Union
import typing
class WeekDay(IntFlag):
MON = 0b1
TUE = 0b10
WED = 0b100
THU = 0b1000
FRI = 0b10000
SAT = 0b100000
SUN = 0b1000000
class Manufacturer:
def __init__(self, name: str):
self.name: str = name
def __repr__(self):
return f"name: {self.name}"
def to_json(self):
return {"name": self.name}
@staticmethod
def parse_dict(manufacturer: Dict):
return Manufacturer(manufacturer["name"])
class FirmwareVersion:
def __init__(self, search: bool, current: str, update: bool, running: bool):
self.search: bool = search
self.current: str = current
self.update: bool = update
self.running: bool = running
def __repr__(self):
return f"search: {self.search}; current: {self.current}; update: {self.update}; running: {self.running}"
def to_json(self):
return {"search": self.search, "current": self.current, "update": self.update, "running": self.running}
@staticmethod
def parse_dict(firmware_version: dict):
return FirmwareVersion(firmware_version["search"],
firmware_version["current"],
firmware_version["update"],
firmware_version["running"])
class PushService:
def __init__(self, mail_address: str, unit_settings: List, is_enabled: bool):
self.mail_address: str = mail_address
self.unit_settings: List = unit_settings
self.is_enabled: bool = is_enabled
def __repr__(self):
return f"<PushService MailAddress: {self.mail_address}; UnitSettings: {self.unit_settings}; " \
f"isEnabled: {self.is_enabled}>"
def to_json(self):
return {"mailAddress": self.mail_address, "unitSettings": self.unit_settings, "isEnabled": self.is_enabled}
@staticmethod
def parse_dict(push_service: Dict):
return PushService(push_service["mailAddress"], push_service["unitSettings"], push_service["isEnabled"])
class SkillType(Enum):
SmartHomeTemperatureSensor = auto()
SmartHomeThermostat = auto()
SmartHomeBattery = auto()
class Skill(ABC):
def __init__(self, skill_type: SkillType):
self.type: SkillType = skill_type
@abstractmethod
def to_json(self):
pass
@abstractmethod
def to_web_data(self, device_id: int):
pass
@staticmethod
def parse_dict(skill: Dict):
skill_type = SkillType[skill["type"]]
if skill_type == SkillType.SmartHomeTemperatureSensor:
return TemperatureSkill.parse_dict(skill)
elif skill_type == SkillType.SmartHomeThermostat:
return ThermostatSkill.parse_dict(skill)
elif skill_type == SkillType.SmartHomeBattery:
return BatterySkill.parse_dict(skill)
else:
raise NotImplementedError(skill_type)
class PresetName(Enum):
LOWER_TEMPERATURE = auto()
UPPER_TEMPERATURE = auto()
HOLIDAY_TEMPERATURE = auto()
class Preset:
def __init__(self, name: PresetName, temperature: Optional[int]):
self.name: PresetName = name
self.temperature: Optional[int] = temperature
def __repr__(self):
return f"<Preset name: {self.name.name}; temperature: {self.temperature}>"
def to_json(self):
return {"name": self.name.name, "temperature": self.temperature}
class Description:
def __init__(self, action: str, preset_temperature: Optional[Preset]):
self.action: str = action
self.preset_temperature: Optional[Preset] = preset_temperature
def __repr__(self):
desc = f"<Description action: {self.action}"
if self.preset_temperature is not None:
desc += f"; presetTemperaturet: {self.preset_temperature}"
desc += " >"
return desc
def to_json(self):
if self.preset_temperature is not None:
return {"action": self.action, "presetTemperature": self.preset_temperature.to_json()}
return {"action": self.action}
@staticmethod
def parse_dict(description: Dict):
preset: Optional[Preset] = None
if "presetTemperature" in description.keys():
preset = Preset(PresetName[description["presetTemperature"]["name"]],
description["presetTemperature"]["temperature"] if "temperature" in description["presetTemperature"] else None)
return Description(description["action"], preset)
class Repetition(Enum):
NONE = auto()
YEARLY = auto()
class TimeSetting:
def __init__(self, start_date: Optional[str], start_time: Optional[str],
end_date: Optional[str] = None, end_time: Optional[str] = None,
repetition: Repetition = Repetition.NONE, day_of_week: Optional[WeekDay] = None):
self.start_date: Optional[str] = start_date
self.start_time: Optional[str] = start_time
self.end_date: Optional[str] = end_date
self.end_time: Optional[str] = end_time
self.repetition: Repetition = repetition
self.day_of_week: Optional[WeekDay] = day_of_week
def __repr__(self):
desc = f"<TimeSettings "
if self.day_of_week is not None:
desc += f"dayOfWeek: {self.day_of_week.name}; "
if self.start_date is not None:
desc += f"startDate: {self.start_date} "
if self.start_time is not None:
desc += f"startTime: {self.start_time} "
if self.end_date is not None:
desc += f"endDate: {self.end_date} "
if self.end_time is not None:
desc += f"endTime: {self.end_time} "
if self.repetition != Repetition.NONE:
desc += f"repetition: {self.repetition.name} "
desc += ">"
return desc
def to_json(self): # ToDo: Return typehints for all to_json
state = {}
if self.start_date is not None:
state["startDate"] = self.start_date
if self.start_time is not None:
state["startTime"] = self.start_time
if self.end_date is not None:
state["endDate"] = self.end_date
if self.end_time is not None:
state["endTime"] = self.end_time
if self.repetition != Repetition.NONE:
state["repetition"] = self.repetition.name
if self.day_of_week is not None:
state = {"dayOfWeek": self.day_of_week.name,
"time": state}
return state
@staticmethod
def parse_dict(time_setting: Dict):
start_date: Optional[str] = None
start_time: Optional[str] = None
end_date: Optional[str] = None
end_time: Optional[str] = None
day_of_week: Optional[WeekDay] = None
repetition: Repetition = Repetition.NONE
if "dayOfWeek" in time_setting.keys():
day_of_week = WeekDay[time_setting["dayOfWeek"]]
time_setting = time_setting["time"]
if "startDate" in time_setting.keys():
start_date = time_setting["startDate"]
if "startTime" in time_setting.keys():
start_time = time_setting["startTime"]
if "endDate" in time_setting.keys():
end_date = time_setting["endDate"]
if "endTime" in time_setting.keys():
end_time = time_setting["endTime"]
if "repetition" in time_setting.keys():
repetition = Repetition[time_setting["repetition"]]
return TimeSetting(start_date, start_time, end_date, end_time, repetition, day_of_week)
class Change:
def __init__(self, description: Description, time_setting: TimeSetting):
self.description: Description = description
self.time_setting: TimeSetting = time_setting
def __repr__(self):
return f"<Change description: {self.description}; timeSetting: {self.time_setting}>"
def to_json(self):
return {"description": self.description.to_json(), "timeSetting": self.time_setting.to_json()}
@staticmethod
def parse_dict(change: Dict):
return Change(Description.parse_dict(change["description"]), TimeSetting.parse_dict(change["timeSetting"]))
class TemperatureDropDetection:
def __init__(self, do_not_heat_offset_in_minutes: int, sensitivity: int):
self.do_not_heat_offset_in_minutes: int = do_not_heat_offset_in_minutes
self.sensitivity: int = sensitivity
def __repr__(self):
# ToDo
pass
def to_json(self):
return {
"doNotHeatOffsetInMinutes": self.do_not_heat_offset_in_minutes,
"sensitivity": self.sensitivity
}
def to_web_data(self, device_id: int):
return {
"WindowOpenTrigger": self.sensitivity + 3,
"WindowOpenTimer": self.do_not_heat_offset_in_minutes
}
@staticmethod
def parse_dict(drop_detection: Dict):
return TemperatureDropDetection(drop_detection["doNotHeatOffsetInMinutes"],
drop_detection["sensitivity"])
class ScheduleKind(Enum):
REPETITIVE = auto()
WEEKLY_TIMETABLE = auto()
class ThermostatSkillMode(Enum):
TARGET_TEMPERATURE = auto()
class Action:
def __init__(self, is_enabled: bool, time_setting: TimeSetting, description: Description):
self.is_enabled: bool = is_enabled
self.time_setting: TimeSetting = time_setting
self.description: Description = description
def __repr__(self):
return f"<Action isEnabled: {self.is_enabled}; timeSetting: {self.time_setting}; " \
f"description: {self.description} >"
def to_json(self):
return {"isEnabled": self.is_enabled,
"timeSetting": self.time_setting.to_json(),
"description": self.description.to_json()}
@staticmethod
def parse_dict(action: Dict):
return Action(action["isEnabled"], TimeSetting.parse_dict(action["timeSetting"]),
Description.parse_dict(action["description"]))
class Schedule:
def __init__(self, is_enabled: bool, kind: ScheduleKind, name: str, actions: List[Action]):
self.is_enabled: bool = is_enabled
self.kind: ScheduleKind = kind
self.name: str = name
self.actions: List[Action] = actions
def __repr__(self):
return f"<Schedule isEnabled: {self.is_enabled}; kind: {self.kind.name}; " \
f"name: {self.name}; actions: {self.actions}>"
def to_json(self):
return {
"isEnabled": self.is_enabled,
"kind": self.kind.name,
"name": self.name,
"actions": [action.to_json() for action in self.actions]
}
def to_web_data(self, device_id: int):
data = {}
if self.kind == ScheduleKind.REPETITIVE and self.name == "HOLIDAYS": # ToDo: Enum for names?
enabled_count = 0
for num, holiday in enumerate(self.actions):
num += 1
_, start_month, start_day = holiday.time_setting.start_date.split("-")
_, end_month, end_day = holiday.time_setting.end_date.split("-")
data.update({
f"Holiday{num}StartDay": start_day,
f"Holiday{num}StartMonth": start_month,
f"Holiday{num}StartHour": holiday.time_setting.start_time.split(":")[0],
f"Holiday{num}EndDay": end_day,
f"Holiday{num}EndMonth": end_month,
f"Holiday{num}EndHour": holiday.time_setting.end_time.split(":")[0],
f"Holiday{num}Enabled": 1 if holiday.is_enabled else 0,
f"Holiday{num}ID": num
})
if holiday.is_enabled:
enabled_count += 1
data["HolidayEnabledCount"] = enabled_count
elif self.kind == ScheduleKind.REPETITIVE and self.name == "SUMMER_TIME":
_, start_month, start_day = self.actions[0].time_setting.start_date.split("-")
_, end_month, end_day = self.actions[0].time_setting.end_date.split("-")
data = {
"SummerStartDay": start_day,
"SummerStartMonth": start_month,
"SummerEndDay": end_day,
"SummerEndMonth": end_month,
"SummerEnabled": 1 if self.is_enabled else 0
}
elif self.kind == ScheduleKind.WEEKLY_TIMETABLE and self.name == "TEMPERATURE":
timer_items = {}
for action in self.actions:
if not action.is_enabled:
continue
time = "".join(action.time_setting.start_time.split(":")[:-1])
if time not in timer_items.keys():
timer_items[time] = [0, 0]
heat = 1 if action.description.preset_temperature.name == PresetName.UPPER_TEMPERATURE else 0
days = timer_items[time][heat]
days |= action.time_setting.day_of_week
timer_items[time][heat] = days
num = 0
for key in timer_items.keys():
if timer_items[key][0] != 0:
data.update({f"timer_item_{num}": f"{key};0;{timer_items[key][0].as_integer_ratio()[0]}"})
num += 1
if timer_items[key][1] != 0:
data.update({f"timer_item_{num}": f"{key};1;{timer_items[key][1].as_integer_ratio()[0]}"})
num += 1
else:
raise NotImplementedError(self.name)
return data
@staticmethod
def parse_dict(schedule: Dict): # ToDo: Make TypedDicts for all those dicts
return Schedule(schedule["isEnabled"],
ScheduleKind[schedule["kind"]],
schedule["name"],
[Action.parse_dict(action) for action in schedule["actions"]])
class TimeControl:
def __init__(self, is_enabled: bool, time_schedules: List[Schedule]):
self.is_enabled: bool = is_enabled
self.time_schedules: List[Schedule] = time_schedules
def __repr__(self):
return f"<TimeControl isEnabled: {self.is_enabled}; timeSchedules: {self.time_schedules}"
def to_json(self):
return {
"isEnabled": self.is_enabled,
"timeSchedules": [schedule.to_json() for schedule in self.time_schedules]
}
def to_web_data(self, device_id: int):
data = {}
for schedule in self.time_schedules:
data.update(schedule.to_web_data(device_id))
return data
@staticmethod
def parse_dict(time_control: Dict):
return TimeControl(time_control["isEnabled"],
[Schedule.parse_dict(schedule) for schedule in time_control["timeSchedules"]])
class ThermostatSkill(Skill):
def __init__(self, presets: List[Preset], next_change: Optional[Change],
temperature_drop_detection: TemperatureDropDetection,
target_temp: int, time_control: TimeControl,
mode: ThermostatSkillMode, used_temp_sensor: Unit):
super().__init__(SkillType.SmartHomeThermostat)
self.presets: List[Preset] = presets
self.next_change: Optional[Change] = next_change
self.temperature_drop_detection: TemperatureDropDetection = temperature_drop_detection
self.target_temp: int = target_temp
self.time_control: TimeControl = time_control
self.mode: ThermostatSkillMode = mode
self.used_temp_sensor: Unit = used_temp_sensor
def __repr__(self):
# ToDo
pass
def to_web_data(self, device_id: int):
upper = 0.0
lower = 0.0
for preset in self.presets:
if preset.name == PresetName.UPPER_TEMPERATURE:
upper = preset.temperature
elif preset.name == PresetName.LOWER_TEMPERATURE:
lower = preset.temperature
data = {
"Heiztemp": upper,
"Absenktemp": lower
}
if device_id == self.used_temp_sensor.id:
data.update({"ExtTempsensorID": "tochoose", "tempsensor": "own"})
else:
data.update({"ExtTempsensorID": self.used_temp_sensor.id, "tempsensor": "extern"})
data.update(self.time_control.to_web_data(device_id))
data.update(self.temperature_drop_detection.to_web_data(device_id))
return data
def to_json(self):
state = {
"type": self.type.name,
"presets": [preset.to_json() for preset in self.presets],
"temperatureDropDetection": self.temperature_drop_detection.to_json(),
"targetTemp": self.target_temp,
"timeControl": self.time_control.to_json(),
"mode": self.mode.name,
"usedTempSensor": self.used_temp_sensor.to_json()
}
if self.next_change is not None:
state["nextChange"] = self.next_change.to_json()
return state
@staticmethod
def parse_dict(skill: Dict):
return ThermostatSkill(
[Preset(PresetName[preset["name"]], preset["temperature"]) for preset in skill["presets"]],
Change.parse_dict(skill["nextChange"]) if "nextChange" in skill.keys() else None,
TemperatureDropDetection.parse_dict(skill["temperatureDropDetection"]),
skill["targetTemp"],
TimeControl.parse_dict(skill["timeControl"]),
ThermostatSkillMode[skill["mode"]],
Unit.parse_dict(skill["usedTempSensor"], None)
)
class TemperatureSkill(Skill):
def __init__(self, offset: float, current_in_celsius: int):
super().__init__(SkillType.SmartHomeTemperatureSensor)
self.offset: float = offset
self.current_in_celsius: int = current_in_celsius
def __repr__(self):
return f"{self.type.name}: " \
f"offset: {self.offset}; currentInCelsius: {self.current_in_celsius}"
def to_json(self):
return {
"type": self.type.name,
"offset": self.offset,
"currentInCelsius": self.current_in_celsius
}
def to_web_data(self, device_id: int):
data = {
"Roomtemp": self.current_in_celsius,
"Offset": self.offset
}
return data
@staticmethod
def parse_dict(skill: Dict):
return TemperatureSkill(skill["offset"], skill["currentInCelsius"])
class BatterySkill(Skill):
def __init__(self, charge_level_in_percent: int):
super().__init__(SkillType.SmartHomeBattery)
self.charge_level_in_percent: int = charge_level_in_percent
def __repr__(self):
return f"{self.type.name}: chargeLevelInPercent: {self.charge_level_in_percent}"
def to_json(self):
return {"type": self.type.name, "chargeLevelInPercent": self.charge_level_in_percent}
def to_web_data(self, device_id: int):
return {}
@staticmethod
def parse_dict(skill: Dict):
return BatterySkill(skill["chargeLevelInPercent"])
class UnitTypes(Enum):
BATTERY = auto()
TEMPERATURE_SENSOR = auto()
THERMOSTAT = auto()
class Unit:
def __init__(self, unit_type: UnitTypes, idx: int, display_name: str,
device: Optional[Device], skills: List[Union[Dict, Skill]],
interaction_controls: Optional[List] = None):
self.type: UnitTypes = unit_type
self.id: int = idx
self.display_name: str = display_name
self.device: Optional[Device] = device
self.skills: List[Skill] = skills
self.interaction_controls: Optional[List] = interaction_controls # ToDo: Type for this
def __repr__(self):
desc = f"type: {self.type.name}; id: {self.id}; displayName: {self.display_name};"
if self.device is not None:
desc += f"device: {self.device.short_description()}; "
desc += f"skills: {self.skills}; "
if self.interaction_controls is not None:
desc += f"interactionControls: {self.interaction_controls} ;"
return desc
def to_json(self):
state = {
"type": self.type.name,
"id": self.id,
"displayName": self.display_name,
"skills": [skill.to_json() for skill in self.skills]
}
if self.device is not None:
state["device"] = self.device.to_short_json()
if self.interaction_controls is not None:
state["interactionControls"] = self.interaction_controls
return state
def to_web_data(self, device_id: int):
data = {}
for skill in self.skills:
data.update(skill.to_web_data(device_id))
return data
@staticmethod
def parse_dict(unit: Dict, device: Optional[Device]):
return Unit(UnitTypes[unit["type"]],
unit["id"],
unit["displayName"],
device,
[Skill.parse_dict(skill) for skill in unit["skills"]],
unit["interactionControls"] if "interactionControls" in unit.keys() else None)
class DeviceType(Enum):
SmartHomeDevice = auto()
class DeviceCategory(Enum):
THERMOSTAT = auto()
class Device:
def __init__(self, state):
self.type: Optional[DeviceType] = None
self.is_deletable: bool = False
self.id: int = -1
self.master_connection_state = None # ToDo
self.display_name: Optional[str] = None
self.category: Optional[str] = None
self.units = None # ToDo
self.firmware_version: Optional[FirmwareVersion] = None
self.model: Optional[str] = None
self.is_editable: bool = False
self.manufacturer: Optional[Manufacturer] = None
self.push_service: Optional[PushService] = None
self.actor_identification_number: Optional[str] = None
if state is not None:
self.parse_state(state)
def __repr__(self):
# ToDo
pass
def short_description(self):
desc = f"masterConnectionState: {self.master_connection_state}; " \
f"type: {self.type}; model: {self.model}; id: {self.id}; " \
f"manufacturer: {self.manufacturer}; " \
f"actorIdentificationNumber: {self.actor_identification_number}; " \
f"displayName: {self.display_name}"
return desc
def parse_state(self, state):
self.type = DeviceType[state["type"]]
self.is_deletable = state["isDeletable"]
self.id = state["id"]
self.master_connection_state = state["masterConnectionState"]
self.display_name = state["displayName"]
self.category = state["category"]
self.units = [Unit.parse_dict(unit, self) for unit in state["units"]]
self.firmware_version = FirmwareVersion.parse_dict(state["firmwareVersion"])
self.model = state["model"]
self.is_editable = state["isEditable"]
self.manufacturer = Manufacturer.parse_dict(state["manufacturer"])
self.push_service = PushService.parse_dict(state["pushService"])
self.actor_identification_number = state["actorIdentificationNumber"]
def to_web_data(self):
data = {
"device": self.id,
"ule_device_name": self.display_name
}
for unit in self.units:
data.update(unit.to_web_data(self.id))
return data
def to_json(self):
state = {"type": self.type.name,
"isDeletable": self.is_deletable,
"id": self.id,
"masterConnectionState": self.master_connection_state,
"displayName": self.display_name,
"category": self.category,
"units": [unit.to_json() for unit in self.units],
"firmwareVersion": self.firmware_version.to_json(),
"model": self.model,
"isEditable": self.is_editable,
"manufacturer": self.manufacturer.to_json(),
"pushService": self.push_service.to_json(),
"actorIdentificationNumber": self.actor_identification_number}
return state
def set_offset(self, offset: float):
temp_sensor: Optional[Unit] = None
for unit in self.units:
if unit.type == UnitTypes.TEMPERATURE_SENSOR:
temp_sensor = unit
break
temp_skill: Optional[TemperatureSkill] = None
for skill in temp_sensor.skills:
if skill.type == SkillType.SmartHomeTemperatureSensor:
temp_skill = typing.cast(TemperatureSkill, skill)
break
temp_skill.offset = offset
def get_offset(self):
temp_sensor: Optional[Unit] = None
for unit in self.units:
if unit.type == UnitTypes.TEMPERATURE_SENSOR:
temp_sensor = unit
break
temp_skill: Optional[TemperatureSkill] = None
for skill in temp_sensor.skills:
if skill.type == SkillType.SmartHomeTemperatureSensor:
temp_skill = typing.cast(TemperatureSkill, skill)
break
return temp_skill.offset
def get_temperature(self):
temp_sensor: Optional[Unit] = None
for unit in self.units:
if unit.type == UnitTypes.TEMPERATURE_SENSOR:
temp_sensor = unit
break
temp_skill: Optional[TemperatureSkill] = None
for skill in temp_sensor.skills:
if skill.type == SkillType.SmartHomeTemperatureSensor:
temp_skill = typing.cast(TemperatureSkill, skill)
break
return temp_skill.current_in_celsius
def to_short_json(self):
return {
"masterConnectionState": self.master_connection_state,
"type": self.type.name,
"model": self.model,
"id": self.id,
"manufacturer": self.manufacturer.to_json(),
"actorIdentificationNumber": self.actor_identification_number,
"displayName": self.display_name
}

View File

@ -1,49 +1,28 @@
from __future__ import annotations from typing import Optional, Tuple
import asyncio
import hashlib
import json
import logging
import re
import xml.etree.ElementTree as ET
from asyncio import Task
from datetime import datetime, timedelta
from typing import Optional, Dict, List
import requests import requests
import json
from device import Device import re
import hashlib
import xml.etree.ElementTree as ET
import logging
class FritzBox: class FritzBox:
def __init__(self, url: str, password: str, update_timeout: int, user: str = None, dry_run: bool = False) -> None: def __init__(self, url:str, password:str, user:str = None) -> None:
self._endpoints = { self._endpoints = {
"login": "login_sid.lua?version=2", "login": "login_sid.lua?version=2",
"logout": "index.lua", "logout": "index.lua",
"data": "data.lua" "data": "data.lua"
} }
self.url: str = url self.url = url
self.dry_run: bool = dry_run self.session = requests.Session()
self.user: Optional[str] = user self.password = password
self.session: requests.Session = requests.Session() self.sid = None
self.password: str = password
self.sid: Optional[str] = None
self.update_timeout: int = update_timeout
self.update_time: Dict[str, datetime] = {}
self.hold_connection: Optional[Task] = None
async def hold_connection_alive(self) -> None:
while True:
# Session automatically destroyed after 20m of inactivity
# according to the manual
await asyncio.sleep(19 * 60)
self.check_session()
def _calc_challenge_v2(self, challenge: str) -> str: def _calc_challenge_v2(self, challenge: str) -> str:
logging.debug(f"Calculate v2 challenge: {challenge}") logging.debug(f"Calculate v2 challenge: {challenge}")
chall_regex = re.compile( chall_regex = re.compile("2\$(?P<iter1>[0-9a-zA-Z]+)\$(?P<salt1>[0-9a-zA-Z]+)\$(?P<iter2>[0-9a-zA-Z]+)\$(?P<salt2>[0-9a-zA-Z]+)")
r"2\$(?P<iter1>[0-9a-zA-Z]+)\$(?P<salt1>[0-9a-zA-Z]+)\$(?P<iter2>[0-9a-zA-Z]+)\$(?P<salt2>[0-9a-zA-Z]+)")
chall_parts = chall_regex.match(challenge).groupdict() chall_parts = chall_regex.match(challenge).groupdict()
salt1: bytes = bytes.fromhex(chall_parts["salt1"]) salt1: bytes = bytes.fromhex(chall_parts["salt1"])
iter1: int = int(chall_parts["iter1"]) iter1: int = int(chall_parts["iter1"])
@ -63,24 +42,8 @@ class FritzBox:
response = challenge + "-" + hashlib.md5(response).hexdigest() response = challenge + "-" + hashlib.md5(response).hexdigest()
return response return response
def check_session(self) -> None: def login(self, user:str = None) -> bool:
data = { logging.debug(f"login user {user}")
"xhr": 1,
"sid": self.sid,
"lang": "de",
"page": "overview",
"xhrId": "first",
"noMenuRef": 1
}
r = self.session.post(f"{self.url}/{self._endpoints['data']}", data=data)
if len(r.history) > 0:
if not self.login():
logging.error("Failed to login to Fritz!Box")
else:
logging.debug("Already logged in")
def login(self) -> bool:
logging.info(f"Login user {self.user} to Fritz!Box")
challenge = None challenge = None
r = self.session.get(f"{self.url}/{self._endpoints['login']}") r = self.session.get(f"{self.url}/{self._endpoints['login']}")
xml = ET.fromstring(r.text) xml = ET.fromstring(r.text)
@ -89,12 +52,12 @@ class FritzBox:
self.sid = elem.text self.sid = elem.text
elif elem.tag == "Challenge": elif elem.tag == "Challenge":
challenge = elem.text challenge = elem.text
elif self.user is None and elem.tag == "Users": elif user is None and elem.tag == "Users":
for user_elem in elem: for user_elem in elem:
if "fritz" in user_elem.text: if "fritz" in user_elem.text:
self.user = user_elem.text user = user_elem.text
assert challenge is not None and self.user is not None assert challenge is not None and user is not None
if challenge.startswith("2$"): if challenge.startswith("2$"):
response = self._calc_challenge_v2(challenge) response = self._calc_challenge_v2(challenge)
@ -102,7 +65,7 @@ class FritzBox:
response = self._calc_challenge_v1(challenge) response = self._calc_challenge_v1(challenge)
data = { data = {
"username": self.user, "username": user,
"response": response "response": response
} }
@ -112,31 +75,27 @@ class FritzBox:
for elem in xml: for elem in xml:
if elem.tag == "SID": if elem.tag == "SID":
self.sid = elem.text self.sid = elem.text
logging.info(f"Authenticated Fritz!Box: {len(self.sid) != self.sid.count('0')}") logging.debug(f"Authenticated fritzbox: {len(self.sid) != self.sid.count('0')}")
if len(self.sid) != self.sid.count("0"):
self.hold_connection = asyncio.create_task(self.hold_connection_alive())
return len(self.sid) != self.sid.count("0") return len(self.sid) != self.sid.count("0")
def logout(self) -> bool: def logout(self) -> bool:
logging.info("logout") logging.debug("logout")
data = { data = {
"xhr": 1, "xhr":1,
"sid": self.sid, "sid": self.sid,
"logout": 1, "logout": 1,
"no_sidrenew": ""} "no_sidrenew":""}
r = self.session.post(f"{self.url}/{self._endpoints['logout']}", data=data) r = self.session.post(f"{self.url}/{self._endpoints['logout']}", data=data)
if self.hold_connection is not None:
self.hold_connection.cancel()
return r.status_code == 200 return r.status_code == 200
def list_devices(self) -> Optional[List[Device]]: def list_devices(self):
data = { data = {
"xhr": 1, "xhr": 1,
"sid": self.sid, "sid": self.sid,
"lang": "de", "lang": "de",
"page": "sh_dev", "page":"sh_dev",
"xhrId": "all" "xhrId": "all"
} }
r = self.session.post(f"{self.url}/{self._endpoints['data']}", data=data) r = self.session.post(f"{self.url}/{self._endpoints['data']}", data=data)
@ -146,68 +105,62 @@ class FritzBox:
r = self.session.post(f"{self.url}/{self._endpoints['data']}", data=data) r = self.session.post(f"{self.url}/{self._endpoints['data']}", data=data)
else: else:
return None return None
devices: List[Device] = [] devices = json.loads(r.text)["data"]["devices"]
for device in json.loads(r.text)["data"]["devices"]:
devices.append(Device(device))
return devices return devices
def get_device_data(self, idx: int = None, name: str = None) -> Optional[Device]: def get_device_data(self, id: int = None, name: str = None) -> Optional[Tuple[float, float, int, str]]:
if idx is None and name is None: if id is None and name is None:
logging.debug("No id or name given") logging.debug("No id or name given")
return None return None
devices = self.list_devices() devices = self.list_devices()
device = None
for device in devices: for device in devices:
if device.id == idx or device.display_name == name: if device["id"] == id or device["displayName"] == name:
break break
device = None device = None
if device is None: if device is None:
logging.debug(f"Device {idx} {name} not found") logging.debug(f"Device {id} {name} not found")
return None return None
return device current_temp = None
current_offset = None
for unit in device["units"]:
if unit["type"] == "TEMPERATURE_SENSOR":
for skill in unit["skills"]:
if skill["type"] == "SmartHomeTemperatureSensor":
current_temp = float(skill["currentInCelsius"])
current_offset = float(skill["offset"])
def set_offset(self, device: Device) -> None: return current_temp, current_offset, device["id"], device["displayName"]
if self.dry_run:
logging.warning("No updates in dry-run-mode") def set_offset(self, current_temp: str, offset: float, device_id: int, device_name: str):
return
data = { data = {
"xhr": 1, "xhr": 1,
"sid": self.sid, "sid": self.sid,
"lang": "de", "lang": "de",
"device": device.id, "device": device_id,
"page": "home_auto_hkr_edit" "page": "home_auto_hkr_edit"
} }
self.session.post(f"{self.url}/{self._endpoints['data']}", data=data) r = self.session.post(f"{self.url}/{self._endpoints['data']}", data=data)
data = { data = {
"xhr": 1, "xhr":1,
"sid": self.sid, "sid": self.sid,
"lang": "de", "lang": "de",
"device": device_id,
"view": "", "view": "",
"back_to_page": "sh_dev", "back_to_page": "sh_dev",
"apply": "", "ule_device_name": device_name,
"oldpage": "/net/home_auto_hkr_edit.lua" "WindowOpenTrigger":8,
"WindowOpenTimer":10,
"tempsensor": "own",
"Roomtemp": f"{current_temp}",
"ExtTempsensorID":"tochoose",
"Offset": f"{offset}",
"apply":"",
"oldpage":"/net/home_auto_hkr_edit.lua"
} }
data.update(device.to_web_data())
self.session.post(f"{self.url}/{self._endpoints['data']}", data=data) r = self.session.post(f"{self.url}/{self._endpoints['data']}", data=data)
def correct_offset(self, device_name: str, real_temp: float):
elapsed = None
if device_name in self.update_time.keys():
elapsed = datetime.now() - self.update_time[device_name]
logging.info(f"Last update for {device_name} {elapsed} ago")
delta = timedelta(minutes=self.update_timeout)
if device_name not in self.update_time.keys() or elapsed > delta:
device: Optional[Device] = self.get_device_data(name=device_name)
if device is None:
return
new_offset = device.get_offset() + real_temp - device.get_temperature()
logging.info(f"Update offset from {device.get_offset()} to {new_offset}")
device.set_offset(new_offset)
self.set_offset(device)
self.update_time[device.display_name] = datetime.now()

View File

@ -1,152 +1,66 @@
from __future__ import annotations from __future__ import annotations
import asyncio
import json import json
import logging import logging
from asyncio import Queue, Task, Event, Lock from typing import Callable
from typing import Callable, Dict, Optional import websocket
import websockets import time
from websockets.exceptions import InvalidStatusCode
class HomeAssistantAPI: class HomeAssistantAPI:
def __init__(self, token: str, url: str) -> None: def __init__(self, token:str, initialize: Callable[[HomeAssistantAPI], None]) -> None:
self.token = token self.token = token
self.msg_id = 1 self.msg_id = 1
self.msg_id_lock = Lock() self.ws = None
self.ws: websockets.WebSocketClientProtocol = None self.subscriptions = {}
self.url = url self.init_callback = initialize
self.receiver: Optional[Task] = None
self.sender: Optional[Task] = None
self.sending_queue: Queue = Queue()
self.authenticated: Event = Event()
self.events: Dict[int, Queue] = {}
self.responses: Dict[int, Dict] = {}
self.response_events: Dict[int, Event] = {}
self.response_lock: Lock = Lock()
async def connect(self): def handle_message(self, ws: websocket.WebSocket, msg: str) -> None:
retries = 5 if self.ws is None:
logging.info("Connect to home assistant...") self.ws = ws
while True:
try:
self.ws = await websockets.connect(self.url)
self.sender = asyncio.create_task(self.sending())
await self.auth()
self.receiver = asyncio.create_task(self.receiving())
return True
except (InvalidStatusCode, TimeoutError):
if retries > 0:
retries -= 1
await asyncio.sleep(30)
logging.info(f"Retry home assistant connection... ({retries})")
continue
else:
logging.error("Invalid status code while connecting to Home Assistant")
await self.exit_loop()
return False
async def wait_for_close(self): message: object = json.loads(msg)
await self.ws.wait_closed()
if message["type"] == "auth_required":
async def receiving(self):
logging.debug("Start receiving")
async for message in self.ws:
msg: Dict = json.loads(message)
if msg["type"] == "event":
if msg["id"] not in self.events.keys():
logging.error(f"Received event for not subscribted id: {msg['id']} {msg['event_type']}")
continue
await self.events[msg["id"]].put(msg["event"])
else:
async with self.response_lock:
self.responses[msg["id"]] = msg
if msg["id"] in self.response_events.keys():
self.response_events[msg["id"]].set()
async def wait_for(self, idx):
async with self.response_lock:
if idx in self.responses.keys():
msg = self.responses[idx]
del self.responses[idx]
return msg
self.response_events[idx] = Event()
await self.response_events[idx].wait()
async with self.response_lock:
del self.response_events[idx]
if idx not in self.responses.keys():
logging.error("Response ID not found")
return None
msg = self.responses[idx]
del self.responses[idx]
return msg
async def exit_loop(self):
if self.sender is not None:
self.sender.cancel()
if self.receiver is not None:
self.receiver.cancel()
async def auth(self):
msg = json.loads(await self.ws.recv())
if msg["type"] != "auth_required":
logging.error("Authentication error: Not required")
await self.exit_loop()
response = {
"type": "auth",
"access_token": self.token
}
await self.sending_queue.put(response)
msg = json.loads(await self.ws.recv())
if msg["type"] == "auth_invalid":
logging.info("Auth failed")
await self.exit_loop()
elif msg["type"] == "auth_ok":
logging.debug("Authenticated")
self.authenticated.set()
else:
logging.error(f"Unknown answer for auth: {msg}")
await self.exit_loop()
async def sending(self):
while msg := await self.sending_queue.get():
await self.ws.send(json.dumps(msg))
async def subscribe_event(self, event_type: str):
await self.authenticated.wait()
logging.info(f"Subscribe to {event_type}")
async with self.msg_id_lock:
msg_id = self.msg_id
response = { response = {
"id": msg_id, "type": "auth",
"type": "subscribe_events", "access_token": self.token
"event_type": event_type
} }
self.events[msg_id] = Queue() logging.debug(response)
self.msg_id += 1 ws.send(json.dumps(response))
await self.sending_queue.put(response) return
return msg_id elif message["type"] == "auth_invalid":
logging.info("Auth failed")
ws.close()
return None
elif message["type"] == "auth_ok":
logging.debug("Authenticated")
self.init_callback(self)
self.init_callback = None
return
elif message["type"] == "event":
event = message["event"]
if event["event_type"] in self.subscriptions.keys():
self.subscriptions[event["event_type"]](event)
else:
print("Received", message)
def subscribe_event(self, event_type: str, callback: Callable[[object], None]):
async def get_states(self): if self.ws is None:
await self.authenticated.wait() logging.debug("Websocket not set")
async with self.msg_id_lock: return
message = {
"id": self.msg_id,
"type": "get_states"
}
self.msg_id += 1
await self.sending_queue.put(message)
response = await self.wait_for(message["id"]) if event_type in self.subscriptions.keys():
# ToDo: Error handling logging.warning(f"Already subscribed to {event_type}")
return response["result"] return
async def get_device_state(self, entity_id: str): logging.info(f"Subscribe to {event_type}")
device_states = await self.get_states() self.subscriptions[event_type] = callback
for device_state in device_states: response = {
if device_state["entity_id"] == entity_id: "id": self.msg_id,
return device_state "type": "subscribe_events",
"event_type": event_type
return None }
self.msg_id += 1
self.ws.send(json.dumps(response))

View File

@ -1,3 +1,3 @@
#!/usr/bin/with-contenv bashio #!/usr/bin/env bashio
python3 /srv/sync_ha_fb.py /data/options.json python3 /srv/sync_ha_fb.py

View File

@ -1,125 +1,78 @@
#!/usr/bin/env python3 #!/usr/bin/env python3
"""
import asyncio hier die verbindungen zu HA aufbauen etc
import json außerdem das vergleichen der werte und dass anstoßen der updates
import logging """
import os import os
import sys
from fritzbox import FritzBox from fritzbox import FritzBox
from homeassistant import HomeAssistantAPI from homeassistant import HomeAssistantAPI
import logging
import websocket
import json
sensor_mappings = {} mappings = {}
thermostate_mappings = {}
def handle_event(event):
entity_id = event["data"]["entity_id"]
if entity_id in mappings.keys():
new_state = event["data"]["new_state"]
logging.debug(entity_id)
logging.debug(new_state["attributes"]["temperature"])
rounded = round(float(new_state["attributes"]["temperature"])*2)/2
logging.debug(rounded)
if new_state["attributes"]["device_class"] == "temperature":
if entity_id in mappings.keys():
fb.login()
logged = False
for thermostate in mappings[entity_id]:
current_temp, current_offset, id, name = fb.get_device_data(name=thermostate)
if not logged:
logging.info(f"Current measurement from {entity_id}: {new_state['attributes']['temperature']} ({rounded})")
logged = True
logging.info(f"Current measurement from {thermostate}: {current_temp}")
new_offset = current_offset + rounded - current_temp
if new_offset != current_offset:
old_offset = current_offset
logging.debug(f"Set offset for {thermostate} from {current_offset} to {new_offset}")
fb.set_offset(current_temp, new_offset, id, name)
current_temp, current_offset, id, name = fb.get_device_data(name=thermostate)
logging.debug(f"Target: {new_offset} ; Set: {current_offset}")
if new_offset == current_offset:
logging.info(f"Adjustet offset from {old_offset} to {new_offset}")
else:
logging.warning(f"Failed to adjust offset from {old_offset} to {new_offset}")
fb.logout()
async def handle_event(idx: int): def on_error(ws, error):
global ha, fb print(error)
logging.debug(f"Wait for events for {idx}")
while event := await ha.events[idx].get(): def on_close(ws, close_status_code, close_msg):
try: pass
entity_id = event["data"]["entity_id"]
if (
entity_id in sensor_mappings.keys()
or entity_id in thermostate_mappings.keys()
):
state = await ha.get_device_state(entity_id)
new_state = event["data"]["new_state"]
logging.info(
f"received changed state from {entity_id} {entity_id in thermostate_mappings.keys()} {state['state']} {entity_id in sensor_mappings.keys()}"
)
if (
entity_id in thermostate_mappings.keys()
and state["state"] != "unavailable"
):
therm_temp = new_state["attributes"]["current_temperature"]
therm_name = new_state["attributes"]["friendly_name"]
sensor = thermostate_mappings[entity_id]
sensor_state = await ha.get_device_state(sensor)
sensor_temp = round(float(sensor_state["state"]) * 2) / 2
logging.info(f"temps: {therm_temp} {sensor_temp}")
if therm_temp != sensor_temp:
logging.info(
f"{therm_name}: {therm_temp}\n{sensor}: {sensor_state['state']} ({sensor_temp})"
)
fb.correct_offset(therm_name, sensor_temp)
elif entity_id in sensor_mappings.keys(): def on_open(ws):
logging.info(f"here {sensor_mappings} {entity_id}") pass
logging.info(f"{new_state}")
sensor_temp = round(float(new_state["state"]) * 2) / 2
logging.info(f"entry: {sensor_mappings[entity_id]}")
for thermostate in sensor_mappings[entity_id]:
logging.info(thermostate)
therm_state = await ha.get_device_state(thermostate)
logging.info(f"{thermostate} {therm_state}")
if therm_state["state"] == "unavailable":
continue
therm_temp = float(
therm_state["attributes"]["current_temperature"]
)
therm_name = therm_state["attributes"]["friendly_name"]
logging.info(f"Temps: {therm_temp} {sensor_temp}")
if therm_temp != sensor_temp:
logging.info(
f"{therm_name}: {therm_temp}\n{entity_id}: {new_state['state']} ({sensor_temp})"
)
fb.correct_offset(therm_name, sensor_temp)
except KeyError:
pass
def init(ha: HomeAssistantAPI):
async def init(ha: HomeAssistantAPI, fb: FritzBox):
if not await ha.connect():
return
logging.debug("Subscribe") logging.debug("Subscribe")
state_changed_id = await ha.subscribe_event("state_changed") ha.subscribe_event("state_changed", handle_event)
logging.debug(state_changed_id)
asyncio.create_task(handle_event(state_changed_id))
fb.login()
await ha.wait_for_close()
logging.info("Websocket closed, shutting down..")
async def main(): logging.basicConfig(level=logging.INFO, format="[%(asctime)s] [%(levelname)s] %(message)s")
config_path = sys.argv[1] config = json.load(open("/data/options.json"))
config = json.load(open(config_path)) logging.debug(config)
level = logging.INFO for mapping in config["mappings"]:
if "log_level" in config: if mapping["sensor"] not in mappings.keys():
print(f"Setting log_level {config['log_level']}") mappings[mapping["sensor"]] = []
if config["log_level"] == "DEBUG": mappings[mapping["sensor"]].append(mapping["thermostate"])
level = logging.DEBUG
logging.basicConfig(
level=level, format="[%(asctime)s] [%(levelname)s] %(message)s"
)
logging.debug(config)
global fb fb = FritzBox(config["fritzbox"]["url"], config["fritzbox"]["password"])
fb = FritzBox( ha = HomeAssistantAPI(os.environ["SUPERVISOR_TOKEN"], init)
url=config["fritzbox"]["url"],
user=config["fritzbox"]["username"],
password=config["fritzbox"]["password"],
update_timeout=config["update_timeout"],
dry_run=False,
)
supervisor_url = "ws://supervisor/core/websocket"
if "SUPERVISOR_URL" in os.environ:
supervisor_url = os.environ["SUPERVISOR_URL"]
supervisor_token = os.environ["SUPERVISOR_TOKEN"]
global ha
ha = HomeAssistantAPI(supervisor_token, supervisor_url)
for mapping in config["mappings"]: websocket.enableTrace(False)
if mapping["sensor"] not in sensor_mappings.keys(): ws = websocket.WebSocketApp("ws://supervisor/core/websocket",
sensor_mappings[mapping["sensor"]] = [] on_open=on_open,
sensor_mappings[mapping["sensor"]].append(mapping["thermostate"]) on_message=ha.handle_message,
thermostate_mappings[mapping["thermostate"]] = mapping["sensor"] on_error=on_error,
logging.debug(f"Mappings: {sensor_mappings} {thermostate_mappings}") on_close=on_close)
try:
await init(ha, fb)
except KeyboardInterrupt:
pass
ws.run_forever()
asyncio.run(main())