Compare commits

...

20 Commits

Author SHA1 Message Date
5aa94a3c97 Fix readout new state 2024-11-12 00:01:19 +01:00
23140f672f fritz_temp_sync/sync_ha_fb.py aktualisiert 2024-11-11 23:25:03 +01:00
00b161945f fritz_temp_sync/sync_ha_fb.py aktualisiert 2024-11-11 23:21:27 +01:00
89fc6ea73c fritz_temp_sync/sync_ha_fb.py aktualisiert 2024-11-11 23:04:39 +01:00
f6f46fba8f fritz_temp_sync/sync_ha_fb.py aktualisiert 2024-11-11 22:51:47 +01:00
e01842a118 fritz_temp_sync/config.yaml aktualisiert 2024-11-11 22:49:35 +01:00
077ad19255 Use r string for regex 2024-11-11 22:36:51 +01:00
c36c75ac48 Slightly improved logging 2024-11-11 22:35:44 +01:00
431450010e fritz_temp_sync/Dockerfile aktualisiert 2024-11-09 00:54:23 +01:00
076af5add5 fritz_temp_sync/Dockerfile aktualisiert 2024-11-09 00:47:54 +01:00
ce59adccf0 Update for FritzOS 7.50 2022-12-03 21:51:26 +01:00
52c0e90a52 Add delay to retry 2022-02-21 22:52:06 +01:00
f6600c7dda Add username to config #3
retry connection to HA
2022-02-21 22:31:24 +01:00
43c2f0284a Fix config 2022-02-05 23:58:21 +01:00
084bc5ba1c Fix link to HA 2022-02-05 15:45:53 +01:00
a1d7dfa9bd Fix communication with fb/device as class 2022-02-05 15:43:52 +01:00
1a561a45d7 Update offset in fb 2022-01-29 00:21:54 +01:00
8aeb8cb1a6 Update requirements 2022-01-28 22:55:29 +01:00
5a14d251b4 Fix 2022-01-28 22:54:12 +01:00
9bf21d04e3 Change to asyncio, keep connection to fb alive, use data from ha for comparing 2022-01-28 22:37:28 +01:00
8 changed files with 1096 additions and 190 deletions

5
.gitignore vendored
View File

@ -15,6 +15,7 @@ dist/
downloads/
eggs/
.eggs/
.venv/
lib/
lib64/
parts/
@ -168,6 +169,7 @@ pip-selfcheck.json
# Reference: https://intellij-support.jetbrains.com/hc/en-us/articles/206544839
# User-specific stuff
.idea/
.idea/**/workspace.xml
.idea/**/tasks.xml
.idea/**/usage.statistics.xml
@ -239,3 +241,6 @@ fabric.properties
# Android studio 3.1+ serialized cache file
.idea/caches/build_file_checksums.ser
# local config test
options.json
.DS_Store

View File

@ -2,8 +2,7 @@ ARG BUILD_FROM
FROM $BUILD_FROM
# Install requirements for add-on
RUN apk update && apk add --no-cache python3 py-pip
RUN python3 -m pip install websocket-client requests
RUN apk update && apk add --no-cache python3 py3-pip py3-websockets py3-requests
WORKDIR /data
@ -11,6 +10,7 @@ WORKDIR /data
COPY sync_ha_fb.py /srv
COPY fritzbox.py /srv
COPY homeassistant.py /srv
COPY device.py /srv
COPY run.sh /
RUN chmod a+x /run.sh

View File

@ -1,8 +1,11 @@
name: "Fritz!Box Temperature Sync"
description: "Sync Fritz!DECT thermostate temperatures with other sensors in Home Assistant"
version: "0.1.0"
slug: "fritz_temp_sync"
name: "Fritz!Box Temperature Sync Dev"
description: "Sync Fritz!DECT thermostat temperatures with other sensors in Home Assistant"
version: "0.4.3"
startup: "application"
stage: "stable"
slug: "fritz_temp_sync_dev"
homeassistant_api: true
init: false
arch:
- aarch64
- amd64
@ -16,10 +19,14 @@ options:
mappings:
- sensor: null
thermostate: null
update_timeout: 15
schema:
fritzbox:
url: url
username: "str?"
password: str
mappings:
- sensor: str
thermostate: str
thermostate: str
update_timeout: int
log_level: "str?"

714
fritz_temp_sync/device.py Executable file
View File

@ -0,0 +1,714 @@
from __future__ import annotations
from abc import ABC, abstractmethod
from enum import Enum, IntFlag, auto
from typing import Dict, List, Optional, Union
import typing
class WeekDay(IntFlag):
MON = 0b1
TUE = 0b10
WED = 0b100
THU = 0b1000
FRI = 0b10000
SAT = 0b100000
SUN = 0b1000000
class Manufacturer:
def __init__(self, name: str):
self.name: str = name
def __repr__(self):
return f"name: {self.name}"
def to_json(self):
return {"name": self.name}
@staticmethod
def parse_dict(manufacturer: Dict):
return Manufacturer(manufacturer["name"])
class FirmwareVersion:
def __init__(self, search: bool, current: str, update: bool, running: bool):
self.search: bool = search
self.current: str = current
self.update: bool = update
self.running: bool = running
def __repr__(self):
return f"search: {self.search}; current: {self.current}; update: {self.update}; running: {self.running}"
def to_json(self):
return {"search": self.search, "current": self.current, "update": self.update, "running": self.running}
@staticmethod
def parse_dict(firmware_version: dict):
return FirmwareVersion(firmware_version["search"],
firmware_version["current"],
firmware_version["update"],
firmware_version["running"])
class PushService:
def __init__(self, mail_address: str, unit_settings: List, is_enabled: bool):
self.mail_address: str = mail_address
self.unit_settings: List = unit_settings
self.is_enabled: bool = is_enabled
def __repr__(self):
return f"<PushService MailAddress: {self.mail_address}; UnitSettings: {self.unit_settings}; " \
f"isEnabled: {self.is_enabled}>"
def to_json(self):
return {"mailAddress": self.mail_address, "unitSettings": self.unit_settings, "isEnabled": self.is_enabled}
@staticmethod
def parse_dict(push_service: Dict):
return PushService(push_service["mailAddress"], push_service["unitSettings"], push_service["isEnabled"])
class SkillType(Enum):
SmartHomeTemperatureSensor = auto()
SmartHomeThermostat = auto()
SmartHomeBattery = auto()
class Skill(ABC):
def __init__(self, skill_type: SkillType):
self.type: SkillType = skill_type
@abstractmethod
def to_json(self):
pass
@abstractmethod
def to_web_data(self, device_id: int):
pass
@staticmethod
def parse_dict(skill: Dict):
skill_type = SkillType[skill["type"]]
if skill_type == SkillType.SmartHomeTemperatureSensor:
return TemperatureSkill.parse_dict(skill)
elif skill_type == SkillType.SmartHomeThermostat:
return ThermostatSkill.parse_dict(skill)
elif skill_type == SkillType.SmartHomeBattery:
return BatterySkill.parse_dict(skill)
else:
raise NotImplementedError(skill_type)
class PresetName(Enum):
LOWER_TEMPERATURE = auto()
UPPER_TEMPERATURE = auto()
HOLIDAY_TEMPERATURE = auto()
class Preset:
def __init__(self, name: PresetName, temperature: Optional[int]):
self.name: PresetName = name
self.temperature: Optional[int] = temperature
def __repr__(self):
return f"<Preset name: {self.name.name}; temperature: {self.temperature}>"
def to_json(self):
return {"name": self.name.name, "temperature": self.temperature}
class Description:
def __init__(self, action: str, preset_temperature: Optional[Preset]):
self.action: str = action
self.preset_temperature: Optional[Preset] = preset_temperature
def __repr__(self):
desc = f"<Description action: {self.action}"
if self.preset_temperature is not None:
desc += f"; presetTemperaturet: {self.preset_temperature}"
desc += " >"
return desc
def to_json(self):
if self.preset_temperature is not None:
return {"action": self.action, "presetTemperature": self.preset_temperature.to_json()}
return {"action": self.action}
@staticmethod
def parse_dict(description: Dict):
preset: Optional[Preset] = None
if "presetTemperature" in description.keys():
preset = Preset(PresetName[description["presetTemperature"]["name"]],
description["presetTemperature"]["temperature"] if "temperature" in description["presetTemperature"] else None)
return Description(description["action"], preset)
class Repetition(Enum):
NONE = auto()
YEARLY = auto()
class TimeSetting:
def __init__(self, start_date: Optional[str], start_time: Optional[str],
end_date: Optional[str] = None, end_time: Optional[str] = None,
repetition: Repetition = Repetition.NONE, day_of_week: Optional[WeekDay] = None):
self.start_date: Optional[str] = start_date
self.start_time: Optional[str] = start_time
self.end_date: Optional[str] = end_date
self.end_time: Optional[str] = end_time
self.repetition: Repetition = repetition
self.day_of_week: Optional[WeekDay] = day_of_week
def __repr__(self):
desc = f"<TimeSettings "
if self.day_of_week is not None:
desc += f"dayOfWeek: {self.day_of_week.name}; "
if self.start_date is not None:
desc += f"startDate: {self.start_date} "
if self.start_time is not None:
desc += f"startTime: {self.start_time} "
if self.end_date is not None:
desc += f"endDate: {self.end_date} "
if self.end_time is not None:
desc += f"endTime: {self.end_time} "
if self.repetition != Repetition.NONE:
desc += f"repetition: {self.repetition.name} "
desc += ">"
return desc
def to_json(self): # ToDo: Return typehints for all to_json
state = {}
if self.start_date is not None:
state["startDate"] = self.start_date
if self.start_time is not None:
state["startTime"] = self.start_time
if self.end_date is not None:
state["endDate"] = self.end_date
if self.end_time is not None:
state["endTime"] = self.end_time
if self.repetition != Repetition.NONE:
state["repetition"] = self.repetition.name
if self.day_of_week is not None:
state = {"dayOfWeek": self.day_of_week.name,
"time": state}
return state
@staticmethod
def parse_dict(time_setting: Dict):
start_date: Optional[str] = None
start_time: Optional[str] = None
end_date: Optional[str] = None
end_time: Optional[str] = None
day_of_week: Optional[WeekDay] = None
repetition: Repetition = Repetition.NONE
if "dayOfWeek" in time_setting.keys():
day_of_week = WeekDay[time_setting["dayOfWeek"]]
time_setting = time_setting["time"]
if "startDate" in time_setting.keys():
start_date = time_setting["startDate"]
if "startTime" in time_setting.keys():
start_time = time_setting["startTime"]
if "endDate" in time_setting.keys():
end_date = time_setting["endDate"]
if "endTime" in time_setting.keys():
end_time = time_setting["endTime"]
if "repetition" in time_setting.keys():
repetition = Repetition[time_setting["repetition"]]
return TimeSetting(start_date, start_time, end_date, end_time, repetition, day_of_week)
class Change:
def __init__(self, description: Description, time_setting: TimeSetting):
self.description: Description = description
self.time_setting: TimeSetting = time_setting
def __repr__(self):
return f"<Change description: {self.description}; timeSetting: {self.time_setting}>"
def to_json(self):
return {"description": self.description.to_json(), "timeSetting": self.time_setting.to_json()}
@staticmethod
def parse_dict(change: Dict):
return Change(Description.parse_dict(change["description"]), TimeSetting.parse_dict(change["timeSetting"]))
class TemperatureDropDetection:
def __init__(self, do_not_heat_offset_in_minutes: int, sensitivity: int):
self.do_not_heat_offset_in_minutes: int = do_not_heat_offset_in_minutes
self.sensitivity: int = sensitivity
def __repr__(self):
# ToDo
pass
def to_json(self):
return {
"doNotHeatOffsetInMinutes": self.do_not_heat_offset_in_minutes,
"sensitivity": self.sensitivity
}
def to_web_data(self, device_id: int):
return {
"WindowOpenTrigger": self.sensitivity + 3,
"WindowOpenTimer": self.do_not_heat_offset_in_minutes
}
@staticmethod
def parse_dict(drop_detection: Dict):
return TemperatureDropDetection(drop_detection["doNotHeatOffsetInMinutes"],
drop_detection["sensitivity"])
class ScheduleKind(Enum):
REPETITIVE = auto()
WEEKLY_TIMETABLE = auto()
class ThermostatSkillMode(Enum):
TARGET_TEMPERATURE = auto()
class Action:
def __init__(self, is_enabled: bool, time_setting: TimeSetting, description: Description):
self.is_enabled: bool = is_enabled
self.time_setting: TimeSetting = time_setting
self.description: Description = description
def __repr__(self):
return f"<Action isEnabled: {self.is_enabled}; timeSetting: {self.time_setting}; " \
f"description: {self.description} >"
def to_json(self):
return {"isEnabled": self.is_enabled,
"timeSetting": self.time_setting.to_json(),
"description": self.description.to_json()}
@staticmethod
def parse_dict(action: Dict):
return Action(action["isEnabled"], TimeSetting.parse_dict(action["timeSetting"]),
Description.parse_dict(action["description"]))
class Schedule:
def __init__(self, is_enabled: bool, kind: ScheduleKind, name: str, actions: List[Action]):
self.is_enabled: bool = is_enabled
self.kind: ScheduleKind = kind
self.name: str = name
self.actions: List[Action] = actions
def __repr__(self):
return f"<Schedule isEnabled: {self.is_enabled}; kind: {self.kind.name}; " \
f"name: {self.name}; actions: {self.actions}>"
def to_json(self):
return {
"isEnabled": self.is_enabled,
"kind": self.kind.name,
"name": self.name,
"actions": [action.to_json() for action in self.actions]
}
def to_web_data(self, device_id: int):
data = {}
if self.kind == ScheduleKind.REPETITIVE and self.name == "HOLIDAYS": # ToDo: Enum for names?
enabled_count = 0
for num, holiday in enumerate(self.actions):
num += 1
_, start_month, start_day = holiday.time_setting.start_date.split("-")
_, end_month, end_day = holiday.time_setting.end_date.split("-")
data.update({
f"Holiday{num}StartDay": start_day,
f"Holiday{num}StartMonth": start_month,
f"Holiday{num}StartHour": holiday.time_setting.start_time.split(":")[0],
f"Holiday{num}EndDay": end_day,
f"Holiday{num}EndMonth": end_month,
f"Holiday{num}EndHour": holiday.time_setting.end_time.split(":")[0],
f"Holiday{num}Enabled": 1 if holiday.is_enabled else 0,
f"Holiday{num}ID": num
})
if holiday.is_enabled:
enabled_count += 1
data["HolidayEnabledCount"] = enabled_count
elif self.kind == ScheduleKind.REPETITIVE and self.name == "SUMMER_TIME":
_, start_month, start_day = self.actions[0].time_setting.start_date.split("-")
_, end_month, end_day = self.actions[0].time_setting.end_date.split("-")
data = {
"SummerStartDay": start_day,
"SummerStartMonth": start_month,
"SummerEndDay": end_day,
"SummerEndMonth": end_month,
"SummerEnabled": 1 if self.is_enabled else 0
}
elif self.kind == ScheduleKind.WEEKLY_TIMETABLE and self.name == "TEMPERATURE":
timer_items = {}
for action in self.actions:
if not action.is_enabled:
continue
time = "".join(action.time_setting.start_time.split(":")[:-1])
if time not in timer_items.keys():
timer_items[time] = [0, 0]
heat = 1 if action.description.preset_temperature.name == PresetName.UPPER_TEMPERATURE else 0
days = timer_items[time][heat]
days |= action.time_setting.day_of_week
timer_items[time][heat] = days
num = 0
for key in timer_items.keys():
if timer_items[key][0] != 0:
data.update({f"timer_item_{num}": f"{key};0;{timer_items[key][0].as_integer_ratio()[0]}"})
num += 1
if timer_items[key][1] != 0:
data.update({f"timer_item_{num}": f"{key};1;{timer_items[key][1].as_integer_ratio()[0]}"})
num += 1
else:
raise NotImplementedError(self.name)
return data
@staticmethod
def parse_dict(schedule: Dict): # ToDo: Make TypedDicts for all those dicts
return Schedule(schedule["isEnabled"],
ScheduleKind[schedule["kind"]],
schedule["name"],
[Action.parse_dict(action) for action in schedule["actions"]])
class TimeControl:
def __init__(self, is_enabled: bool, time_schedules: List[Schedule]):
self.is_enabled: bool = is_enabled
self.time_schedules: List[Schedule] = time_schedules
def __repr__(self):
return f"<TimeControl isEnabled: {self.is_enabled}; timeSchedules: {self.time_schedules}"
def to_json(self):
return {
"isEnabled": self.is_enabled,
"timeSchedules": [schedule.to_json() for schedule in self.time_schedules]
}
def to_web_data(self, device_id: int):
data = {}
for schedule in self.time_schedules:
data.update(schedule.to_web_data(device_id))
return data
@staticmethod
def parse_dict(time_control: Dict):
return TimeControl(time_control["isEnabled"],
[Schedule.parse_dict(schedule) for schedule in time_control["timeSchedules"]])
class ThermostatSkill(Skill):
def __init__(self, presets: List[Preset], next_change: Optional[Change],
temperature_drop_detection: TemperatureDropDetection,
target_temp: int, time_control: TimeControl,
mode: ThermostatSkillMode, used_temp_sensor: Unit):
super().__init__(SkillType.SmartHomeThermostat)
self.presets: List[Preset] = presets
self.next_change: Optional[Change] = next_change
self.temperature_drop_detection: TemperatureDropDetection = temperature_drop_detection
self.target_temp: int = target_temp
self.time_control: TimeControl = time_control
self.mode: ThermostatSkillMode = mode
self.used_temp_sensor: Unit = used_temp_sensor
def __repr__(self):
# ToDo
pass
def to_web_data(self, device_id: int):
upper = 0.0
lower = 0.0
for preset in self.presets:
if preset.name == PresetName.UPPER_TEMPERATURE:
upper = preset.temperature
elif preset.name == PresetName.LOWER_TEMPERATURE:
lower = preset.temperature
data = {
"Heiztemp": upper,
"Absenktemp": lower
}
if device_id == self.used_temp_sensor.id:
data.update({"ExtTempsensorID": "tochoose", "tempsensor": "own"})
else:
data.update({"ExtTempsensorID": self.used_temp_sensor.id, "tempsensor": "extern"})
data.update(self.time_control.to_web_data(device_id))
data.update(self.temperature_drop_detection.to_web_data(device_id))
return data
def to_json(self):
state = {
"type": self.type.name,
"presets": [preset.to_json() for preset in self.presets],
"temperatureDropDetection": self.temperature_drop_detection.to_json(),
"targetTemp": self.target_temp,
"timeControl": self.time_control.to_json(),
"mode": self.mode.name,
"usedTempSensor": self.used_temp_sensor.to_json()
}
if self.next_change is not None:
state["nextChange"] = self.next_change.to_json()
return state
@staticmethod
def parse_dict(skill: Dict):
return ThermostatSkill(
[Preset(PresetName[preset["name"]], preset["temperature"]) for preset in skill["presets"]],
Change.parse_dict(skill["nextChange"]) if "nextChange" in skill.keys() else None,
TemperatureDropDetection.parse_dict(skill["temperatureDropDetection"]),
skill["targetTemp"],
TimeControl.parse_dict(skill["timeControl"]),
ThermostatSkillMode[skill["mode"]],
Unit.parse_dict(skill["usedTempSensor"], None)
)
class TemperatureSkill(Skill):
def __init__(self, offset: float, current_in_celsius: int):
super().__init__(SkillType.SmartHomeTemperatureSensor)
self.offset: float = offset
self.current_in_celsius: int = current_in_celsius
def __repr__(self):
return f"{self.type.name}: " \
f"offset: {self.offset}; currentInCelsius: {self.current_in_celsius}"
def to_json(self):
return {
"type": self.type.name,
"offset": self.offset,
"currentInCelsius": self.current_in_celsius
}
def to_web_data(self, device_id: int):
data = {
"Roomtemp": self.current_in_celsius,
"Offset": self.offset
}
return data
@staticmethod
def parse_dict(skill: Dict):
return TemperatureSkill(skill["offset"], skill["currentInCelsius"])
class BatterySkill(Skill):
def __init__(self, charge_level_in_percent: int):
super().__init__(SkillType.SmartHomeBattery)
self.charge_level_in_percent: int = charge_level_in_percent
def __repr__(self):
return f"{self.type.name}: chargeLevelInPercent: {self.charge_level_in_percent}"
def to_json(self):
return {"type": self.type.name, "chargeLevelInPercent": self.charge_level_in_percent}
def to_web_data(self, device_id: int):
return {}
@staticmethod
def parse_dict(skill: Dict):
return BatterySkill(skill["chargeLevelInPercent"])
class UnitTypes(Enum):
BATTERY = auto()
TEMPERATURE_SENSOR = auto()
THERMOSTAT = auto()
class Unit:
def __init__(self, unit_type: UnitTypes, idx: int, display_name: str,
device: Optional[Device], skills: List[Union[Dict, Skill]],
interaction_controls: Optional[List] = None):
self.type: UnitTypes = unit_type
self.id: int = idx
self.display_name: str = display_name
self.device: Optional[Device] = device
self.skills: List[Skill] = skills
self.interaction_controls: Optional[List] = interaction_controls # ToDo: Type for this
def __repr__(self):
desc = f"type: {self.type.name}; id: {self.id}; displayName: {self.display_name};"
if self.device is not None:
desc += f"device: {self.device.short_description()}; "
desc += f"skills: {self.skills}; "
if self.interaction_controls is not None:
desc += f"interactionControls: {self.interaction_controls} ;"
return desc
def to_json(self):
state = {
"type": self.type.name,
"id": self.id,
"displayName": self.display_name,
"skills": [skill.to_json() for skill in self.skills]
}
if self.device is not None:
state["device"] = self.device.to_short_json()
if self.interaction_controls is not None:
state["interactionControls"] = self.interaction_controls
return state
def to_web_data(self, device_id: int):
data = {}
for skill in self.skills:
data.update(skill.to_web_data(device_id))
return data
@staticmethod
def parse_dict(unit: Dict, device: Optional[Device]):
return Unit(UnitTypes[unit["type"]],
unit["id"],
unit["displayName"],
device,
[Skill.parse_dict(skill) for skill in unit["skills"]],
unit["interactionControls"] if "interactionControls" in unit.keys() else None)
class DeviceType(Enum):
SmartHomeDevice = auto()
class DeviceCategory(Enum):
THERMOSTAT = auto()
class Device:
def __init__(self, state):
self.type: Optional[DeviceType] = None
self.is_deletable: bool = False
self.id: int = -1
self.master_connection_state = None # ToDo
self.display_name: Optional[str] = None
self.category: Optional[str] = None
self.units = None # ToDo
self.firmware_version: Optional[FirmwareVersion] = None
self.model: Optional[str] = None
self.is_editable: bool = False
self.manufacturer: Optional[Manufacturer] = None
self.push_service: Optional[PushService] = None
self.actor_identification_number: Optional[str] = None
if state is not None:
self.parse_state(state)
def __repr__(self):
# ToDo
pass
def short_description(self):
desc = f"masterConnectionState: {self.master_connection_state}; " \
f"type: {self.type}; model: {self.model}; id: {self.id}; " \
f"manufacturer: {self.manufacturer}; " \
f"actorIdentificationNumber: {self.actor_identification_number}; " \
f"displayName: {self.display_name}"
return desc
def parse_state(self, state):
self.type = DeviceType[state["type"]]
self.is_deletable = state["isDeletable"]
self.id = state["id"]
self.master_connection_state = state["masterConnectionState"]
self.display_name = state["displayName"]
self.category = state["category"]
self.units = [Unit.parse_dict(unit, self) for unit in state["units"]]
self.firmware_version = FirmwareVersion.parse_dict(state["firmwareVersion"])
self.model = state["model"]
self.is_editable = state["isEditable"]
self.manufacturer = Manufacturer.parse_dict(state["manufacturer"])
self.push_service = PushService.parse_dict(state["pushService"])
self.actor_identification_number = state["actorIdentificationNumber"]
def to_web_data(self):
data = {
"device": self.id,
"ule_device_name": self.display_name
}
for unit in self.units:
data.update(unit.to_web_data(self.id))
return data
def to_json(self):
state = {"type": self.type.name,
"isDeletable": self.is_deletable,
"id": self.id,
"masterConnectionState": self.master_connection_state,
"displayName": self.display_name,
"category": self.category,
"units": [unit.to_json() for unit in self.units],
"firmwareVersion": self.firmware_version.to_json(),
"model": self.model,
"isEditable": self.is_editable,
"manufacturer": self.manufacturer.to_json(),
"pushService": self.push_service.to_json(),
"actorIdentificationNumber": self.actor_identification_number}
return state
def set_offset(self, offset: float):
temp_sensor: Optional[Unit] = None
for unit in self.units:
if unit.type == UnitTypes.TEMPERATURE_SENSOR:
temp_sensor = unit
break
temp_skill: Optional[TemperatureSkill] = None
for skill in temp_sensor.skills:
if skill.type == SkillType.SmartHomeTemperatureSensor:
temp_skill = typing.cast(TemperatureSkill, skill)
break
temp_skill.offset = offset
def get_offset(self):
temp_sensor: Optional[Unit] = None
for unit in self.units:
if unit.type == UnitTypes.TEMPERATURE_SENSOR:
temp_sensor = unit
break
temp_skill: Optional[TemperatureSkill] = None
for skill in temp_sensor.skills:
if skill.type == SkillType.SmartHomeTemperatureSensor:
temp_skill = typing.cast(TemperatureSkill, skill)
break
return temp_skill.offset
def get_temperature(self):
temp_sensor: Optional[Unit] = None
for unit in self.units:
if unit.type == UnitTypes.TEMPERATURE_SENSOR:
temp_sensor = unit
break
temp_skill: Optional[TemperatureSkill] = None
for skill in temp_sensor.skills:
if skill.type == SkillType.SmartHomeTemperatureSensor:
temp_skill = typing.cast(TemperatureSkill, skill)
break
return temp_skill.current_in_celsius
def to_short_json(self):
return {
"masterConnectionState": self.master_connection_state,
"type": self.type.name,
"model": self.model,
"id": self.id,
"manufacturer": self.manufacturer.to_json(),
"actorIdentificationNumber": self.actor_identification_number,
"displayName": self.display_name
}

View File

@ -1,28 +1,49 @@
from typing import Optional, Tuple
import requests
import json
import re
from __future__ import annotations
import asyncio
import hashlib
import xml.etree.ElementTree as ET
import json
import logging
import re
import xml.etree.ElementTree as ET
from asyncio import Task
from datetime import datetime, timedelta
from typing import Optional, Dict, List
import requests
from device import Device
class FritzBox:
def __init__(self, url:str, password:str, user:str = None) -> None:
def __init__(self, url: str, password: str, update_timeout: int, user: str = None, dry_run: bool = False) -> None:
self._endpoints = {
"login": "login_sid.lua?version=2",
"logout": "index.lua",
"data": "data.lua"
}
self.url = url
self.session = requests.Session()
self.password = password
self.sid = None
self.url: str = url
self.dry_run: bool = dry_run
self.user: Optional[str] = user
self.session: requests.Session = requests.Session()
self.password: str = password
self.sid: Optional[str] = None
self.update_timeout: int = update_timeout
self.update_time: Dict[str, datetime] = {}
self.hold_connection: Optional[Task] = None
async def hold_connection_alive(self) -> None:
while True:
# Session automatically destroyed after 20m of inactivity
# according to the manual
await asyncio.sleep(19 * 60)
self.check_session()
def _calc_challenge_v2(self, challenge: str) -> str:
logging.debug(f"Calculate v2 challenge: {challenge}")
chall_regex = re.compile("2\$(?P<iter1>[0-9a-zA-Z]+)\$(?P<salt1>[0-9a-zA-Z]+)\$(?P<iter2>[0-9a-zA-Z]+)\$(?P<salt2>[0-9a-zA-Z]+)")
chall_regex = re.compile(
r"2\$(?P<iter1>[0-9a-zA-Z]+)\$(?P<salt1>[0-9a-zA-Z]+)\$(?P<iter2>[0-9a-zA-Z]+)\$(?P<salt2>[0-9a-zA-Z]+)")
chall_parts = chall_regex.match(challenge).groupdict()
salt1: bytes = bytes.fromhex(chall_parts["salt1"])
iter1: int = int(chall_parts["iter1"])
@ -42,8 +63,24 @@ class FritzBox:
response = challenge + "-" + hashlib.md5(response).hexdigest()
return response
def login(self, user:str = None) -> bool:
logging.debug(f"login user {user}")
def check_session(self) -> None:
data = {
"xhr": 1,
"sid": self.sid,
"lang": "de",
"page": "overview",
"xhrId": "first",
"noMenuRef": 1
}
r = self.session.post(f"{self.url}/{self._endpoints['data']}", data=data)
if len(r.history) > 0:
if not self.login():
logging.error("Failed to login to Fritz!Box")
else:
logging.debug("Already logged in")
def login(self) -> bool:
logging.info(f"Login user {self.user} to Fritz!Box")
challenge = None
r = self.session.get(f"{self.url}/{self._endpoints['login']}")
xml = ET.fromstring(r.text)
@ -52,12 +89,12 @@ class FritzBox:
self.sid = elem.text
elif elem.tag == "Challenge":
challenge = elem.text
elif user is None and elem.tag == "Users":
elif self.user is None and elem.tag == "Users":
for user_elem in elem:
if "fritz" in user_elem.text:
user = user_elem.text
self.user = user_elem.text
assert challenge is not None and user is not None
assert challenge is not None and self.user is not None
if challenge.startswith("2$"):
response = self._calc_challenge_v2(challenge)
@ -65,7 +102,7 @@ class FritzBox:
response = self._calc_challenge_v1(challenge)
data = {
"username": user,
"username": self.user,
"response": response
}
@ -75,27 +112,31 @@ class FritzBox:
for elem in xml:
if elem.tag == "SID":
self.sid = elem.text
logging.debug(f"Authenticated fritzbox: {len(self.sid) != self.sid.count('0')}")
logging.info(f"Authenticated Fritz!Box: {len(self.sid) != self.sid.count('0')}")
if len(self.sid) != self.sid.count("0"):
self.hold_connection = asyncio.create_task(self.hold_connection_alive())
return len(self.sid) != self.sid.count("0")
def logout(self) -> bool:
logging.debug("logout")
logging.info("logout")
data = {
"xhr":1,
"xhr": 1,
"sid": self.sid,
"logout": 1,
"no_sidrenew":""}
"no_sidrenew": ""}
r = self.session.post(f"{self.url}/{self._endpoints['logout']}", data=data)
if self.hold_connection is not None:
self.hold_connection.cancel()
return r.status_code == 200
def list_devices(self):
def list_devices(self) -> Optional[List[Device]]:
data = {
"xhr": 1,
"sid": self.sid,
"lang": "de",
"page":"sh_dev",
"xhr": 1,
"sid": self.sid,
"lang": "de",
"page": "sh_dev",
"xhrId": "all"
}
r = self.session.post(f"{self.url}/{self._endpoints['data']}", data=data)
@ -105,62 +146,68 @@ class FritzBox:
r = self.session.post(f"{self.url}/{self._endpoints['data']}", data=data)
else:
return None
devices = json.loads(r.text)["data"]["devices"]
devices: List[Device] = []
for device in json.loads(r.text)["data"]["devices"]:
devices.append(Device(device))
return devices
def get_device_data(self, id: int = None, name: str = None) -> Optional[Tuple[float, float, int, str]]:
if id is None and name is None:
def get_device_data(self, idx: int = None, name: str = None) -> Optional[Device]:
if idx is None and name is None:
logging.debug("No id or name given")
return None
devices = self.list_devices()
device = None
for device in devices:
if device["id"] == id or device["displayName"] == name:
if device.id == idx or device.display_name == name:
break
device = None
if device is None:
logging.debug(f"Device {id} {name} not found")
logging.debug(f"Device {idx} {name} not found")
return None
current_temp = None
current_offset = None
for unit in device["units"]:
if unit["type"] == "TEMPERATURE_SENSOR":
for skill in unit["skills"]:
if skill["type"] == "SmartHomeTemperatureSensor":
current_temp = float(skill["currentInCelsius"])
current_offset = float(skill["offset"])
return device
return current_temp, current_offset, device["id"], device["displayName"]
def set_offset(self, current_temp: str, offset: float, device_id: int, device_name: str):
def set_offset(self, device: Device) -> None:
if self.dry_run:
logging.warning("No updates in dry-run-mode")
return
data = {
"xhr": 1,
"sid": self.sid,
"lang": "de",
"device": device_id,
"page": "home_auto_hkr_edit"
}
r = self.session.post(f"{self.url}/{self._endpoints['data']}", data=data)
data = {
"xhr":1,
"xhr": 1,
"sid": self.sid,
"lang": "de",
"device": device.id,
"page": "home_auto_hkr_edit"
}
self.session.post(f"{self.url}/{self._endpoints['data']}", data=data)
data = {
"xhr": 1,
"sid": self.sid,
"lang": "de",
"device": device_id,
"view": "",
"back_to_page": "sh_dev",
"ule_device_name": device_name,
"WindowOpenTrigger":8,
"WindowOpenTimer":10,
"tempsensor": "own",
"Roomtemp": f"{current_temp}",
"ExtTempsensorID":"tochoose",
"Offset": f"{offset}",
"apply":"",
"oldpage":"/net/home_auto_hkr_edit.lua"
"apply": "",
"oldpage": "/net/home_auto_hkr_edit.lua"
}
data.update(device.to_web_data())
r = self.session.post(f"{self.url}/{self._endpoints['data']}", data=data)
self.session.post(f"{self.url}/{self._endpoints['data']}", data=data)
def correct_offset(self, device_name: str, real_temp: float):
elapsed = None
if device_name in self.update_time.keys():
elapsed = datetime.now() - self.update_time[device_name]
logging.info(f"Last update for {device_name} {elapsed} ago")
delta = timedelta(minutes=self.update_timeout)
if device_name not in self.update_time.keys() or elapsed > delta:
device: Optional[Device] = self.get_device_data(name=device_name)
if device is None:
return
new_offset = device.get_offset() + real_temp - device.get_temperature()
logging.info(f"Update offset from {device.get_offset()} to {new_offset}")
device.set_offset(new_offset)
self.set_offset(device)
self.update_time[device.display_name] = datetime.now()

View File

@ -1,66 +1,152 @@
from __future__ import annotations
import asyncio
import json
import logging
from typing import Callable
import websocket
import time
from asyncio import Queue, Task, Event, Lock
from typing import Callable, Dict, Optional
import websockets
from websockets.exceptions import InvalidStatusCode
class HomeAssistantAPI:
def __init__(self, token:str, initialize: Callable[[HomeAssistantAPI], None]) -> None:
def __init__(self, token: str, url: str) -> None:
self.token = token
self.msg_id = 1
self.ws = None
self.subscriptions = {}
self.init_callback = initialize
self.msg_id_lock = Lock()
self.ws: websockets.WebSocketClientProtocol = None
self.url = url
self.receiver: Optional[Task] = None
self.sender: Optional[Task] = None
self.sending_queue: Queue = Queue()
self.authenticated: Event = Event()
self.events: Dict[int, Queue] = {}
self.responses: Dict[int, Dict] = {}
self.response_events: Dict[int, Event] = {}
self.response_lock: Lock = Lock()
def handle_message(self, ws: websocket.WebSocket, msg: str) -> None:
if self.ws is None:
self.ws = ws
async def connect(self):
retries = 5
logging.info("Connect to home assistant...")
while True:
try:
self.ws = await websockets.connect(self.url)
self.sender = asyncio.create_task(self.sending())
await self.auth()
self.receiver = asyncio.create_task(self.receiving())
return True
except (InvalidStatusCode, TimeoutError):
if retries > 0:
retries -= 1
await asyncio.sleep(30)
logging.info(f"Retry home assistant connection... ({retries})")
continue
else:
logging.error("Invalid status code while connecting to Home Assistant")
await self.exit_loop()
return False
message: object = json.loads(msg)
if message["type"] == "auth_required":
response = {
"type": "auth",
"access_token": self.token
}
logging.debug(response)
ws.send(json.dumps(response))
return
elif message["type"] == "auth_invalid":
logging.info("Auth failed")
ws.close()
return None
elif message["type"] == "auth_ok":
logging.debug("Authenticated")
self.init_callback(self)
self.init_callback = None
return
elif message["type"] == "event":
event = message["event"]
if event["event_type"] in self.subscriptions.keys():
self.subscriptions[event["event_type"]](event)
else:
print("Received", message)
def subscribe_event(self, event_type: str, callback: Callable[[object], None]):
async def wait_for_close(self):
await self.ws.wait_closed()
if self.ws is None:
logging.debug("Websocket not set")
return
async def receiving(self):
logging.debug("Start receiving")
async for message in self.ws:
msg: Dict = json.loads(message)
if msg["type"] == "event":
if msg["id"] not in self.events.keys():
logging.error(f"Received event for not subscribted id: {msg['id']} {msg['event_type']}")
continue
await self.events[msg["id"]].put(msg["event"])
else:
async with self.response_lock:
self.responses[msg["id"]] = msg
if msg["id"] in self.response_events.keys():
self.response_events[msg["id"]].set()
if event_type in self.subscriptions.keys():
logging.warning(f"Already subscribed to {event_type}")
return
logging.info(f"Subscribe to {event_type}")
self.subscriptions[event_type] = callback
async def wait_for(self, idx):
async with self.response_lock:
if idx in self.responses.keys():
msg = self.responses[idx]
del self.responses[idx]
return msg
self.response_events[idx] = Event()
await self.response_events[idx].wait()
async with self.response_lock:
del self.response_events[idx]
if idx not in self.responses.keys():
logging.error("Response ID not found")
return None
msg = self.responses[idx]
del self.responses[idx]
return msg
async def exit_loop(self):
if self.sender is not None:
self.sender.cancel()
if self.receiver is not None:
self.receiver.cancel()
async def auth(self):
msg = json.loads(await self.ws.recv())
if msg["type"] != "auth_required":
logging.error("Authentication error: Not required")
await self.exit_loop()
response = {
"id": self.msg_id,
"type": "subscribe_events",
"event_type": event_type
"type": "auth",
"access_token": self.token
}
self.msg_id += 1
self.ws.send(json.dumps(response))
await self.sending_queue.put(response)
msg = json.loads(await self.ws.recv())
if msg["type"] == "auth_invalid":
logging.info("Auth failed")
await self.exit_loop()
elif msg["type"] == "auth_ok":
logging.debug("Authenticated")
self.authenticated.set()
else:
logging.error(f"Unknown answer for auth: {msg}")
await self.exit_loop()
async def sending(self):
while msg := await self.sending_queue.get():
await self.ws.send(json.dumps(msg))
async def subscribe_event(self, event_type: str):
await self.authenticated.wait()
logging.info(f"Subscribe to {event_type}")
async with self.msg_id_lock:
msg_id = self.msg_id
response = {
"id": msg_id,
"type": "subscribe_events",
"event_type": event_type
}
self.events[msg_id] = Queue()
self.msg_id += 1
await self.sending_queue.put(response)
return msg_id
async def get_states(self):
await self.authenticated.wait()
async with self.msg_id_lock:
message = {
"id": self.msg_id,
"type": "get_states"
}
self.msg_id += 1
await self.sending_queue.put(message)
response = await self.wait_for(message["id"])
# ToDo: Error handling
return response["result"]
async def get_device_state(self, entity_id: str):
device_states = await self.get_states()
for device_state in device_states:
if device_state["entity_id"] == entity_id:
return device_state
return None

View File

@ -1,3 +1,3 @@
#!/usr/bin/env bashio
#!/usr/bin/with-contenv bashio
python3 /srv/sync_ha_fb.py
python3 /srv/sync_ha_fb.py /data/options.json

View File

@ -1,78 +1,125 @@
#!/usr/bin/env python3
"""
hier die verbindungen zu HA aufbauen etc
außerdem das vergleichen der werte und dass anstoßen der updates
"""
import asyncio
import json
import logging
import os
import sys
from fritzbox import FritzBox
from homeassistant import HomeAssistantAPI
import logging
import websocket
import json
mappings = {}
sensor_mappings = {}
thermostate_mappings = {}
def handle_event(event):
entity_id = event["data"]["entity_id"]
if entity_id in mappings.keys():
new_state = event["data"]["new_state"]
logging.debug(entity_id)
logging.debug(new_state["attributes"]["temperature"])
rounded = round(float(new_state["attributes"]["temperature"])*2)/2
logging.debug(rounded)
if new_state["attributes"]["device_class"] == "temperature":
if entity_id in mappings.keys():
fb.login()
logged = False
for thermostate in mappings[entity_id]:
current_temp, current_offset, id, name = fb.get_device_data(name=thermostate)
if not logged:
logging.info(f"Current measurement from {entity_id}: {new_state['attributes']['temperature']} ({rounded})")
logged = True
logging.info(f"Current measurement from {thermostate}: {current_temp}")
new_offset = current_offset + rounded - current_temp
if new_offset != current_offset:
old_offset = current_offset
logging.debug(f"Set offset for {thermostate} from {current_offset} to {new_offset}")
fb.set_offset(current_temp, new_offset, id, name)
current_temp, current_offset, id, name = fb.get_device_data(name=thermostate)
logging.debug(f"Target: {new_offset} ; Set: {current_offset}")
if new_offset == current_offset:
logging.info(f"Adjustet offset from {old_offset} to {new_offset}")
else:
logging.warning(f"Failed to adjust offset from {old_offset} to {new_offset}")
fb.logout()
def on_error(ws, error):
print(error)
async def handle_event(idx: int):
global ha, fb
logging.debug(f"Wait for events for {idx}")
def on_close(ws, close_status_code, close_msg):
pass
while event := await ha.events[idx].get():
try:
entity_id = event["data"]["entity_id"]
if (
entity_id in sensor_mappings.keys()
or entity_id in thermostate_mappings.keys()
):
state = await ha.get_device_state(entity_id)
new_state = event["data"]["new_state"]
logging.info(
f"received changed state from {entity_id} {entity_id in thermostate_mappings.keys()} {state['state']} {entity_id in sensor_mappings.keys()}"
)
if (
entity_id in thermostate_mappings.keys()
and state["state"] != "unavailable"
):
therm_temp = new_state["attributes"]["current_temperature"]
therm_name = new_state["attributes"]["friendly_name"]
sensor = thermostate_mappings[entity_id]
sensor_state = await ha.get_device_state(sensor)
sensor_temp = round(float(sensor_state["state"]) * 2) / 2
logging.info(f"temps: {therm_temp} {sensor_temp}")
if therm_temp != sensor_temp:
logging.info(
f"{therm_name}: {therm_temp}\n{sensor}: {sensor_state['state']} ({sensor_temp})"
)
fb.correct_offset(therm_name, sensor_temp)
def on_open(ws):
pass
elif entity_id in sensor_mappings.keys():
logging.info(f"here {sensor_mappings} {entity_id}")
logging.info(f"{new_state}")
sensor_temp = round(float(new_state["state"]) * 2) / 2
logging.info(f"entry: {sensor_mappings[entity_id]}")
for thermostate in sensor_mappings[entity_id]:
logging.info(thermostate)
therm_state = await ha.get_device_state(thermostate)
logging.info(f"{thermostate} {therm_state}")
if therm_state["state"] == "unavailable":
continue
therm_temp = float(
therm_state["attributes"]["current_temperature"]
)
therm_name = therm_state["attributes"]["friendly_name"]
logging.info(f"Temps: {therm_temp} {sensor_temp}")
if therm_temp != sensor_temp:
logging.info(
f"{therm_name}: {therm_temp}\n{entity_id}: {new_state['state']} ({sensor_temp})"
)
fb.correct_offset(therm_name, sensor_temp)
except KeyError:
pass
def init(ha: HomeAssistantAPI):
async def init(ha: HomeAssistantAPI, fb: FritzBox):
if not await ha.connect():
return
logging.debug("Subscribe")
ha.subscribe_event("state_changed", handle_event)
state_changed_id = await ha.subscribe_event("state_changed")
logging.debug(state_changed_id)
asyncio.create_task(handle_event(state_changed_id))
fb.login()
await ha.wait_for_close()
logging.info("Websocket closed, shutting down..")
logging.basicConfig(level=logging.INFO, format="[%(asctime)s] [%(levelname)s] %(message)s")
config = json.load(open("/data/options.json"))
logging.debug(config)
for mapping in config["mappings"]:
if mapping["sensor"] not in mappings.keys():
mappings[mapping["sensor"]] = []
mappings[mapping["sensor"]].append(mapping["thermostate"])
async def main():
config_path = sys.argv[1]
config = json.load(open(config_path))
level = logging.INFO
if "log_level" in config:
print(f"Setting log_level {config['log_level']}")
if config["log_level"] == "DEBUG":
level = logging.DEBUG
logging.basicConfig(
level=level, format="[%(asctime)s] [%(levelname)s] %(message)s"
)
logging.debug(config)
fb = FritzBox(config["fritzbox"]["url"], config["fritzbox"]["password"])
ha = HomeAssistantAPI(os.environ["SUPERVISOR_TOKEN"], init)
global fb
fb = FritzBox(
url=config["fritzbox"]["url"],
user=config["fritzbox"]["username"],
password=config["fritzbox"]["password"],
update_timeout=config["update_timeout"],
dry_run=False,
)
supervisor_url = "ws://supervisor/core/websocket"
if "SUPERVISOR_URL" in os.environ:
supervisor_url = os.environ["SUPERVISOR_URL"]
supervisor_token = os.environ["SUPERVISOR_TOKEN"]
global ha
ha = HomeAssistantAPI(supervisor_token, supervisor_url)
websocket.enableTrace(False)
ws = websocket.WebSocketApp("ws://supervisor/core/websocket",
on_open=on_open,
on_message=ha.handle_message,
on_error=on_error,
on_close=on_close)
for mapping in config["mappings"]:
if mapping["sensor"] not in sensor_mappings.keys():
sensor_mappings[mapping["sensor"]] = []
sensor_mappings[mapping["sensor"]].append(mapping["thermostate"])
thermostate_mappings[mapping["thermostate"]] = mapping["sensor"]
logging.debug(f"Mappings: {sensor_mappings} {thermostate_mappings}")
try:
await init(ha, fb)
except KeyboardInterrupt:
pass
ws.run_forever()
asyncio.run(main())