Compare commits
20 Commits
Author | SHA1 | Date | |
---|---|---|---|
5aa94a3c97 | |||
23140f672f | |||
00b161945f | |||
89fc6ea73c | |||
f6f46fba8f | |||
e01842a118 | |||
077ad19255 | |||
c36c75ac48 | |||
431450010e | |||
076af5add5 | |||
ce59adccf0 | |||
52c0e90a52 | |||
f6600c7dda | |||
43c2f0284a | |||
084bc5ba1c | |||
a1d7dfa9bd | |||
1a561a45d7 | |||
8aeb8cb1a6 | |||
5a14d251b4 | |||
9bf21d04e3 |
5
.gitignore
vendored
5
.gitignore
vendored
@ -15,6 +15,7 @@ dist/
|
||||
downloads/
|
||||
eggs/
|
||||
.eggs/
|
||||
.venv/
|
||||
lib/
|
||||
lib64/
|
||||
parts/
|
||||
@ -168,6 +169,7 @@ pip-selfcheck.json
|
||||
# Reference: https://intellij-support.jetbrains.com/hc/en-us/articles/206544839
|
||||
|
||||
# User-specific stuff
|
||||
.idea/
|
||||
.idea/**/workspace.xml
|
||||
.idea/**/tasks.xml
|
||||
.idea/**/usage.statistics.xml
|
||||
@ -239,3 +241,6 @@ fabric.properties
|
||||
# Android studio 3.1+ serialized cache file
|
||||
.idea/caches/build_file_checksums.ser
|
||||
|
||||
# local config test
|
||||
options.json
|
||||
.DS_Store
|
||||
|
@ -2,8 +2,7 @@ ARG BUILD_FROM
|
||||
FROM $BUILD_FROM
|
||||
|
||||
# Install requirements for add-on
|
||||
RUN apk update && apk add --no-cache python3 py-pip
|
||||
RUN python3 -m pip install websocket-client requests
|
||||
RUN apk update && apk add --no-cache python3 py3-pip py3-websockets py3-requests
|
||||
|
||||
WORKDIR /data
|
||||
|
||||
@ -11,6 +10,7 @@ WORKDIR /data
|
||||
COPY sync_ha_fb.py /srv
|
||||
COPY fritzbox.py /srv
|
||||
COPY homeassistant.py /srv
|
||||
COPY device.py /srv
|
||||
COPY run.sh /
|
||||
RUN chmod a+x /run.sh
|
||||
|
||||
|
@ -1,8 +1,11 @@
|
||||
name: "Fritz!Box Temperature Sync"
|
||||
description: "Sync Fritz!DECT thermostate temperatures with other sensors in Home Assistant"
|
||||
version: "0.1.0"
|
||||
slug: "fritz_temp_sync"
|
||||
name: "Fritz!Box Temperature Sync Dev"
|
||||
description: "Sync Fritz!DECT thermostat temperatures with other sensors in Home Assistant"
|
||||
version: "0.4.3"
|
||||
startup: "application"
|
||||
stage: "stable"
|
||||
slug: "fritz_temp_sync_dev"
|
||||
homeassistant_api: true
|
||||
init: false
|
||||
arch:
|
||||
- aarch64
|
||||
- amd64
|
||||
@ -16,10 +19,14 @@ options:
|
||||
mappings:
|
||||
- sensor: null
|
||||
thermostate: null
|
||||
update_timeout: 15
|
||||
schema:
|
||||
fritzbox:
|
||||
url: url
|
||||
username: "str?"
|
||||
password: str
|
||||
mappings:
|
||||
- sensor: str
|
||||
thermostate: str
|
||||
thermostate: str
|
||||
update_timeout: int
|
||||
log_level: "str?"
|
||||
|
714
fritz_temp_sync/device.py
Executable file
714
fritz_temp_sync/device.py
Executable file
@ -0,0 +1,714 @@
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from abc import ABC, abstractmethod
|
||||
from enum import Enum, IntFlag, auto
|
||||
from typing import Dict, List, Optional, Union
|
||||
import typing
|
||||
|
||||
|
||||
class WeekDay(IntFlag):
|
||||
MON = 0b1
|
||||
TUE = 0b10
|
||||
WED = 0b100
|
||||
THU = 0b1000
|
||||
FRI = 0b10000
|
||||
SAT = 0b100000
|
||||
SUN = 0b1000000
|
||||
|
||||
|
||||
class Manufacturer:
|
||||
def __init__(self, name: str):
|
||||
self.name: str = name
|
||||
|
||||
def __repr__(self):
|
||||
return f"name: {self.name}"
|
||||
|
||||
def to_json(self):
|
||||
return {"name": self.name}
|
||||
|
||||
@staticmethod
|
||||
def parse_dict(manufacturer: Dict):
|
||||
return Manufacturer(manufacturer["name"])
|
||||
|
||||
|
||||
class FirmwareVersion:
|
||||
def __init__(self, search: bool, current: str, update: bool, running: bool):
|
||||
self.search: bool = search
|
||||
self.current: str = current
|
||||
self.update: bool = update
|
||||
self.running: bool = running
|
||||
|
||||
def __repr__(self):
|
||||
return f"search: {self.search}; current: {self.current}; update: {self.update}; running: {self.running}"
|
||||
|
||||
def to_json(self):
|
||||
return {"search": self.search, "current": self.current, "update": self.update, "running": self.running}
|
||||
|
||||
@staticmethod
|
||||
def parse_dict(firmware_version: dict):
|
||||
return FirmwareVersion(firmware_version["search"],
|
||||
firmware_version["current"],
|
||||
firmware_version["update"],
|
||||
firmware_version["running"])
|
||||
|
||||
|
||||
class PushService:
|
||||
def __init__(self, mail_address: str, unit_settings: List, is_enabled: bool):
|
||||
self.mail_address: str = mail_address
|
||||
self.unit_settings: List = unit_settings
|
||||
self.is_enabled: bool = is_enabled
|
||||
|
||||
def __repr__(self):
|
||||
return f"<PushService MailAddress: {self.mail_address}; UnitSettings: {self.unit_settings}; " \
|
||||
f"isEnabled: {self.is_enabled}>"
|
||||
|
||||
def to_json(self):
|
||||
return {"mailAddress": self.mail_address, "unitSettings": self.unit_settings, "isEnabled": self.is_enabled}
|
||||
|
||||
@staticmethod
|
||||
def parse_dict(push_service: Dict):
|
||||
return PushService(push_service["mailAddress"], push_service["unitSettings"], push_service["isEnabled"])
|
||||
|
||||
|
||||
class SkillType(Enum):
|
||||
SmartHomeTemperatureSensor = auto()
|
||||
SmartHomeThermostat = auto()
|
||||
SmartHomeBattery = auto()
|
||||
|
||||
|
||||
class Skill(ABC):
|
||||
def __init__(self, skill_type: SkillType):
|
||||
self.type: SkillType = skill_type
|
||||
|
||||
@abstractmethod
|
||||
def to_json(self):
|
||||
pass
|
||||
|
||||
@abstractmethod
|
||||
def to_web_data(self, device_id: int):
|
||||
pass
|
||||
|
||||
@staticmethod
|
||||
def parse_dict(skill: Dict):
|
||||
skill_type = SkillType[skill["type"]]
|
||||
if skill_type == SkillType.SmartHomeTemperatureSensor:
|
||||
return TemperatureSkill.parse_dict(skill)
|
||||
elif skill_type == SkillType.SmartHomeThermostat:
|
||||
return ThermostatSkill.parse_dict(skill)
|
||||
elif skill_type == SkillType.SmartHomeBattery:
|
||||
return BatterySkill.parse_dict(skill)
|
||||
else:
|
||||
raise NotImplementedError(skill_type)
|
||||
|
||||
|
||||
class PresetName(Enum):
|
||||
LOWER_TEMPERATURE = auto()
|
||||
UPPER_TEMPERATURE = auto()
|
||||
HOLIDAY_TEMPERATURE = auto()
|
||||
|
||||
|
||||
class Preset:
|
||||
def __init__(self, name: PresetName, temperature: Optional[int]):
|
||||
self.name: PresetName = name
|
||||
self.temperature: Optional[int] = temperature
|
||||
|
||||
def __repr__(self):
|
||||
return f"<Preset name: {self.name.name}; temperature: {self.temperature}>"
|
||||
|
||||
def to_json(self):
|
||||
return {"name": self.name.name, "temperature": self.temperature}
|
||||
|
||||
|
||||
class Description:
|
||||
def __init__(self, action: str, preset_temperature: Optional[Preset]):
|
||||
self.action: str = action
|
||||
self.preset_temperature: Optional[Preset] = preset_temperature
|
||||
|
||||
def __repr__(self):
|
||||
desc = f"<Description action: {self.action}"
|
||||
if self.preset_temperature is not None:
|
||||
desc += f"; presetTemperaturet: {self.preset_temperature}"
|
||||
desc += " >"
|
||||
|
||||
return desc
|
||||
|
||||
def to_json(self):
|
||||
if self.preset_temperature is not None:
|
||||
return {"action": self.action, "presetTemperature": self.preset_temperature.to_json()}
|
||||
return {"action": self.action}
|
||||
|
||||
@staticmethod
|
||||
def parse_dict(description: Dict):
|
||||
preset: Optional[Preset] = None
|
||||
if "presetTemperature" in description.keys():
|
||||
preset = Preset(PresetName[description["presetTemperature"]["name"]],
|
||||
description["presetTemperature"]["temperature"] if "temperature" in description["presetTemperature"] else None)
|
||||
return Description(description["action"], preset)
|
||||
|
||||
|
||||
class Repetition(Enum):
|
||||
NONE = auto()
|
||||
YEARLY = auto()
|
||||
|
||||
|
||||
class TimeSetting:
|
||||
def __init__(self, start_date: Optional[str], start_time: Optional[str],
|
||||
end_date: Optional[str] = None, end_time: Optional[str] = None,
|
||||
repetition: Repetition = Repetition.NONE, day_of_week: Optional[WeekDay] = None):
|
||||
self.start_date: Optional[str] = start_date
|
||||
self.start_time: Optional[str] = start_time
|
||||
self.end_date: Optional[str] = end_date
|
||||
self.end_time: Optional[str] = end_time
|
||||
self.repetition: Repetition = repetition
|
||||
self.day_of_week: Optional[WeekDay] = day_of_week
|
||||
|
||||
def __repr__(self):
|
||||
desc = f"<TimeSettings "
|
||||
if self.day_of_week is not None:
|
||||
desc += f"dayOfWeek: {self.day_of_week.name}; "
|
||||
if self.start_date is not None:
|
||||
desc += f"startDate: {self.start_date} "
|
||||
if self.start_time is not None:
|
||||
desc += f"startTime: {self.start_time} "
|
||||
if self.end_date is not None:
|
||||
desc += f"endDate: {self.end_date} "
|
||||
if self.end_time is not None:
|
||||
desc += f"endTime: {self.end_time} "
|
||||
if self.repetition != Repetition.NONE:
|
||||
desc += f"repetition: {self.repetition.name} "
|
||||
desc += ">"
|
||||
|
||||
return desc
|
||||
|
||||
def to_json(self): # ToDo: Return typehints for all to_json
|
||||
state = {}
|
||||
if self.start_date is not None:
|
||||
state["startDate"] = self.start_date
|
||||
if self.start_time is not None:
|
||||
state["startTime"] = self.start_time
|
||||
if self.end_date is not None:
|
||||
state["endDate"] = self.end_date
|
||||
if self.end_time is not None:
|
||||
state["endTime"] = self.end_time
|
||||
if self.repetition != Repetition.NONE:
|
||||
state["repetition"] = self.repetition.name
|
||||
if self.day_of_week is not None:
|
||||
state = {"dayOfWeek": self.day_of_week.name,
|
||||
"time": state}
|
||||
|
||||
return state
|
||||
|
||||
@staticmethod
|
||||
def parse_dict(time_setting: Dict):
|
||||
start_date: Optional[str] = None
|
||||
start_time: Optional[str] = None
|
||||
end_date: Optional[str] = None
|
||||
end_time: Optional[str] = None
|
||||
day_of_week: Optional[WeekDay] = None
|
||||
repetition: Repetition = Repetition.NONE
|
||||
if "dayOfWeek" in time_setting.keys():
|
||||
day_of_week = WeekDay[time_setting["dayOfWeek"]]
|
||||
time_setting = time_setting["time"]
|
||||
if "startDate" in time_setting.keys():
|
||||
start_date = time_setting["startDate"]
|
||||
if "startTime" in time_setting.keys():
|
||||
start_time = time_setting["startTime"]
|
||||
if "endDate" in time_setting.keys():
|
||||
end_date = time_setting["endDate"]
|
||||
if "endTime" in time_setting.keys():
|
||||
end_time = time_setting["endTime"]
|
||||
if "repetition" in time_setting.keys():
|
||||
repetition = Repetition[time_setting["repetition"]]
|
||||
|
||||
return TimeSetting(start_date, start_time, end_date, end_time, repetition, day_of_week)
|
||||
|
||||
|
||||
class Change:
|
||||
def __init__(self, description: Description, time_setting: TimeSetting):
|
||||
self.description: Description = description
|
||||
self.time_setting: TimeSetting = time_setting
|
||||
|
||||
def __repr__(self):
|
||||
return f"<Change description: {self.description}; timeSetting: {self.time_setting}>"
|
||||
|
||||
def to_json(self):
|
||||
return {"description": self.description.to_json(), "timeSetting": self.time_setting.to_json()}
|
||||
|
||||
@staticmethod
|
||||
def parse_dict(change: Dict):
|
||||
return Change(Description.parse_dict(change["description"]), TimeSetting.parse_dict(change["timeSetting"]))
|
||||
|
||||
|
||||
class TemperatureDropDetection:
|
||||
def __init__(self, do_not_heat_offset_in_minutes: int, sensitivity: int):
|
||||
self.do_not_heat_offset_in_minutes: int = do_not_heat_offset_in_minutes
|
||||
self.sensitivity: int = sensitivity
|
||||
|
||||
def __repr__(self):
|
||||
# ToDo
|
||||
pass
|
||||
|
||||
def to_json(self):
|
||||
return {
|
||||
"doNotHeatOffsetInMinutes": self.do_not_heat_offset_in_minutes,
|
||||
"sensitivity": self.sensitivity
|
||||
}
|
||||
|
||||
def to_web_data(self, device_id: int):
|
||||
return {
|
||||
"WindowOpenTrigger": self.sensitivity + 3,
|
||||
"WindowOpenTimer": self.do_not_heat_offset_in_minutes
|
||||
}
|
||||
|
||||
@staticmethod
|
||||
def parse_dict(drop_detection: Dict):
|
||||
return TemperatureDropDetection(drop_detection["doNotHeatOffsetInMinutes"],
|
||||
drop_detection["sensitivity"])
|
||||
|
||||
|
||||
class ScheduleKind(Enum):
|
||||
REPETITIVE = auto()
|
||||
WEEKLY_TIMETABLE = auto()
|
||||
|
||||
|
||||
class ThermostatSkillMode(Enum):
|
||||
TARGET_TEMPERATURE = auto()
|
||||
|
||||
|
||||
class Action:
|
||||
def __init__(self, is_enabled: bool, time_setting: TimeSetting, description: Description):
|
||||
self.is_enabled: bool = is_enabled
|
||||
self.time_setting: TimeSetting = time_setting
|
||||
self.description: Description = description
|
||||
|
||||
def __repr__(self):
|
||||
return f"<Action isEnabled: {self.is_enabled}; timeSetting: {self.time_setting}; " \
|
||||
f"description: {self.description} >"
|
||||
|
||||
def to_json(self):
|
||||
return {"isEnabled": self.is_enabled,
|
||||
"timeSetting": self.time_setting.to_json(),
|
||||
"description": self.description.to_json()}
|
||||
|
||||
@staticmethod
|
||||
def parse_dict(action: Dict):
|
||||
return Action(action["isEnabled"], TimeSetting.parse_dict(action["timeSetting"]),
|
||||
Description.parse_dict(action["description"]))
|
||||
|
||||
|
||||
class Schedule:
|
||||
def __init__(self, is_enabled: bool, kind: ScheduleKind, name: str, actions: List[Action]):
|
||||
self.is_enabled: bool = is_enabled
|
||||
self.kind: ScheduleKind = kind
|
||||
self.name: str = name
|
||||
self.actions: List[Action] = actions
|
||||
|
||||
def __repr__(self):
|
||||
return f"<Schedule isEnabled: {self.is_enabled}; kind: {self.kind.name}; " \
|
||||
f"name: {self.name}; actions: {self.actions}>"
|
||||
|
||||
def to_json(self):
|
||||
return {
|
||||
"isEnabled": self.is_enabled,
|
||||
"kind": self.kind.name,
|
||||
"name": self.name,
|
||||
"actions": [action.to_json() for action in self.actions]
|
||||
}
|
||||
|
||||
def to_web_data(self, device_id: int):
|
||||
data = {}
|
||||
if self.kind == ScheduleKind.REPETITIVE and self.name == "HOLIDAYS": # ToDo: Enum for names?
|
||||
enabled_count = 0
|
||||
for num, holiday in enumerate(self.actions):
|
||||
num += 1
|
||||
_, start_month, start_day = holiday.time_setting.start_date.split("-")
|
||||
_, end_month, end_day = holiday.time_setting.end_date.split("-")
|
||||
data.update({
|
||||
f"Holiday{num}StartDay": start_day,
|
||||
f"Holiday{num}StartMonth": start_month,
|
||||
f"Holiday{num}StartHour": holiday.time_setting.start_time.split(":")[0],
|
||||
f"Holiday{num}EndDay": end_day,
|
||||
f"Holiday{num}EndMonth": end_month,
|
||||
f"Holiday{num}EndHour": holiday.time_setting.end_time.split(":")[0],
|
||||
f"Holiday{num}Enabled": 1 if holiday.is_enabled else 0,
|
||||
f"Holiday{num}ID": num
|
||||
})
|
||||
if holiday.is_enabled:
|
||||
enabled_count += 1
|
||||
data["HolidayEnabledCount"] = enabled_count
|
||||
elif self.kind == ScheduleKind.REPETITIVE and self.name == "SUMMER_TIME":
|
||||
_, start_month, start_day = self.actions[0].time_setting.start_date.split("-")
|
||||
_, end_month, end_day = self.actions[0].time_setting.end_date.split("-")
|
||||
data = {
|
||||
"SummerStartDay": start_day,
|
||||
"SummerStartMonth": start_month,
|
||||
"SummerEndDay": end_day,
|
||||
"SummerEndMonth": end_month,
|
||||
"SummerEnabled": 1 if self.is_enabled else 0
|
||||
}
|
||||
elif self.kind == ScheduleKind.WEEKLY_TIMETABLE and self.name == "TEMPERATURE":
|
||||
timer_items = {}
|
||||
for action in self.actions:
|
||||
if not action.is_enabled:
|
||||
continue
|
||||
time = "".join(action.time_setting.start_time.split(":")[:-1])
|
||||
if time not in timer_items.keys():
|
||||
timer_items[time] = [0, 0]
|
||||
heat = 1 if action.description.preset_temperature.name == PresetName.UPPER_TEMPERATURE else 0
|
||||
days = timer_items[time][heat]
|
||||
days |= action.time_setting.day_of_week
|
||||
timer_items[time][heat] = days
|
||||
num = 0
|
||||
for key in timer_items.keys():
|
||||
if timer_items[key][0] != 0:
|
||||
data.update({f"timer_item_{num}": f"{key};0;{timer_items[key][0].as_integer_ratio()[0]}"})
|
||||
num += 1
|
||||
if timer_items[key][1] != 0:
|
||||
data.update({f"timer_item_{num}": f"{key};1;{timer_items[key][1].as_integer_ratio()[0]}"})
|
||||
num += 1
|
||||
else:
|
||||
raise NotImplementedError(self.name)
|
||||
|
||||
return data
|
||||
|
||||
@staticmethod
|
||||
def parse_dict(schedule: Dict): # ToDo: Make TypedDicts for all those dicts
|
||||
return Schedule(schedule["isEnabled"],
|
||||
ScheduleKind[schedule["kind"]],
|
||||
schedule["name"],
|
||||
[Action.parse_dict(action) for action in schedule["actions"]])
|
||||
|
||||
|
||||
class TimeControl:
|
||||
def __init__(self, is_enabled: bool, time_schedules: List[Schedule]):
|
||||
self.is_enabled: bool = is_enabled
|
||||
self.time_schedules: List[Schedule] = time_schedules
|
||||
|
||||
def __repr__(self):
|
||||
return f"<TimeControl isEnabled: {self.is_enabled}; timeSchedules: {self.time_schedules}"
|
||||
|
||||
def to_json(self):
|
||||
return {
|
||||
"isEnabled": self.is_enabled,
|
||||
"timeSchedules": [schedule.to_json() for schedule in self.time_schedules]
|
||||
}
|
||||
|
||||
def to_web_data(self, device_id: int):
|
||||
data = {}
|
||||
for schedule in self.time_schedules:
|
||||
data.update(schedule.to_web_data(device_id))
|
||||
return data
|
||||
|
||||
@staticmethod
|
||||
def parse_dict(time_control: Dict):
|
||||
return TimeControl(time_control["isEnabled"],
|
||||
[Schedule.parse_dict(schedule) for schedule in time_control["timeSchedules"]])
|
||||
|
||||
|
||||
class ThermostatSkill(Skill):
|
||||
def __init__(self, presets: List[Preset], next_change: Optional[Change],
|
||||
temperature_drop_detection: TemperatureDropDetection,
|
||||
target_temp: int, time_control: TimeControl,
|
||||
mode: ThermostatSkillMode, used_temp_sensor: Unit):
|
||||
super().__init__(SkillType.SmartHomeThermostat)
|
||||
self.presets: List[Preset] = presets
|
||||
self.next_change: Optional[Change] = next_change
|
||||
self.temperature_drop_detection: TemperatureDropDetection = temperature_drop_detection
|
||||
self.target_temp: int = target_temp
|
||||
self.time_control: TimeControl = time_control
|
||||
self.mode: ThermostatSkillMode = mode
|
||||
self.used_temp_sensor: Unit = used_temp_sensor
|
||||
|
||||
def __repr__(self):
|
||||
# ToDo
|
||||
pass
|
||||
|
||||
def to_web_data(self, device_id: int):
|
||||
upper = 0.0
|
||||
lower = 0.0
|
||||
for preset in self.presets:
|
||||
if preset.name == PresetName.UPPER_TEMPERATURE:
|
||||
upper = preset.temperature
|
||||
elif preset.name == PresetName.LOWER_TEMPERATURE:
|
||||
lower = preset.temperature
|
||||
|
||||
data = {
|
||||
"Heiztemp": upper,
|
||||
"Absenktemp": lower
|
||||
}
|
||||
|
||||
if device_id == self.used_temp_sensor.id:
|
||||
data.update({"ExtTempsensorID": "tochoose", "tempsensor": "own"})
|
||||
else:
|
||||
data.update({"ExtTempsensorID": self.used_temp_sensor.id, "tempsensor": "extern"})
|
||||
|
||||
data.update(self.time_control.to_web_data(device_id))
|
||||
|
||||
data.update(self.temperature_drop_detection.to_web_data(device_id))
|
||||
|
||||
return data
|
||||
|
||||
def to_json(self):
|
||||
state = {
|
||||
"type": self.type.name,
|
||||
"presets": [preset.to_json() for preset in self.presets],
|
||||
"temperatureDropDetection": self.temperature_drop_detection.to_json(),
|
||||
"targetTemp": self.target_temp,
|
||||
"timeControl": self.time_control.to_json(),
|
||||
"mode": self.mode.name,
|
||||
"usedTempSensor": self.used_temp_sensor.to_json()
|
||||
}
|
||||
if self.next_change is not None:
|
||||
state["nextChange"] = self.next_change.to_json()
|
||||
|
||||
return state
|
||||
|
||||
@staticmethod
|
||||
def parse_dict(skill: Dict):
|
||||
return ThermostatSkill(
|
||||
[Preset(PresetName[preset["name"]], preset["temperature"]) for preset in skill["presets"]],
|
||||
Change.parse_dict(skill["nextChange"]) if "nextChange" in skill.keys() else None,
|
||||
TemperatureDropDetection.parse_dict(skill["temperatureDropDetection"]),
|
||||
skill["targetTemp"],
|
||||
TimeControl.parse_dict(skill["timeControl"]),
|
||||
ThermostatSkillMode[skill["mode"]],
|
||||
Unit.parse_dict(skill["usedTempSensor"], None)
|
||||
)
|
||||
|
||||
|
||||
class TemperatureSkill(Skill):
|
||||
def __init__(self, offset: float, current_in_celsius: int):
|
||||
super().__init__(SkillType.SmartHomeTemperatureSensor)
|
||||
self.offset: float = offset
|
||||
self.current_in_celsius: int = current_in_celsius
|
||||
|
||||
def __repr__(self):
|
||||
return f"{self.type.name}: " \
|
||||
f"offset: {self.offset}; currentInCelsius: {self.current_in_celsius}"
|
||||
|
||||
def to_json(self):
|
||||
return {
|
||||
"type": self.type.name,
|
||||
"offset": self.offset,
|
||||
"currentInCelsius": self.current_in_celsius
|
||||
}
|
||||
|
||||
def to_web_data(self, device_id: int):
|
||||
data = {
|
||||
"Roomtemp": self.current_in_celsius,
|
||||
"Offset": self.offset
|
||||
}
|
||||
return data
|
||||
|
||||
@staticmethod
|
||||
def parse_dict(skill: Dict):
|
||||
return TemperatureSkill(skill["offset"], skill["currentInCelsius"])
|
||||
|
||||
|
||||
class BatterySkill(Skill):
|
||||
def __init__(self, charge_level_in_percent: int):
|
||||
super().__init__(SkillType.SmartHomeBattery)
|
||||
self.charge_level_in_percent: int = charge_level_in_percent
|
||||
|
||||
def __repr__(self):
|
||||
return f"{self.type.name}: chargeLevelInPercent: {self.charge_level_in_percent}"
|
||||
|
||||
def to_json(self):
|
||||
return {"type": self.type.name, "chargeLevelInPercent": self.charge_level_in_percent}
|
||||
|
||||
def to_web_data(self, device_id: int):
|
||||
return {}
|
||||
|
||||
@staticmethod
|
||||
def parse_dict(skill: Dict):
|
||||
return BatterySkill(skill["chargeLevelInPercent"])
|
||||
|
||||
|
||||
class UnitTypes(Enum):
|
||||
BATTERY = auto()
|
||||
TEMPERATURE_SENSOR = auto()
|
||||
THERMOSTAT = auto()
|
||||
|
||||
|
||||
class Unit:
|
||||
def __init__(self, unit_type: UnitTypes, idx: int, display_name: str,
|
||||
device: Optional[Device], skills: List[Union[Dict, Skill]],
|
||||
interaction_controls: Optional[List] = None):
|
||||
self.type: UnitTypes = unit_type
|
||||
self.id: int = idx
|
||||
self.display_name: str = display_name
|
||||
self.device: Optional[Device] = device
|
||||
self.skills: List[Skill] = skills
|
||||
self.interaction_controls: Optional[List] = interaction_controls # ToDo: Type for this
|
||||
|
||||
def __repr__(self):
|
||||
desc = f"type: {self.type.name}; id: {self.id}; displayName: {self.display_name};"
|
||||
if self.device is not None:
|
||||
desc += f"device: {self.device.short_description()}; "
|
||||
desc += f"skills: {self.skills}; "
|
||||
if self.interaction_controls is not None:
|
||||
desc += f"interactionControls: {self.interaction_controls} ;"
|
||||
return desc
|
||||
|
||||
def to_json(self):
|
||||
state = {
|
||||
"type": self.type.name,
|
||||
"id": self.id,
|
||||
"displayName": self.display_name,
|
||||
"skills": [skill.to_json() for skill in self.skills]
|
||||
}
|
||||
if self.device is not None:
|
||||
state["device"] = self.device.to_short_json()
|
||||
if self.interaction_controls is not None:
|
||||
state["interactionControls"] = self.interaction_controls
|
||||
|
||||
return state
|
||||
|
||||
def to_web_data(self, device_id: int):
|
||||
data = {}
|
||||
for skill in self.skills:
|
||||
data.update(skill.to_web_data(device_id))
|
||||
|
||||
return data
|
||||
|
||||
@staticmethod
|
||||
def parse_dict(unit: Dict, device: Optional[Device]):
|
||||
return Unit(UnitTypes[unit["type"]],
|
||||
unit["id"],
|
||||
unit["displayName"],
|
||||
device,
|
||||
[Skill.parse_dict(skill) for skill in unit["skills"]],
|
||||
unit["interactionControls"] if "interactionControls" in unit.keys() else None)
|
||||
|
||||
|
||||
class DeviceType(Enum):
|
||||
SmartHomeDevice = auto()
|
||||
|
||||
|
||||
class DeviceCategory(Enum):
|
||||
THERMOSTAT = auto()
|
||||
|
||||
|
||||
class Device:
|
||||
def __init__(self, state):
|
||||
self.type: Optional[DeviceType] = None
|
||||
self.is_deletable: bool = False
|
||||
self.id: int = -1
|
||||
self.master_connection_state = None # ToDo
|
||||
self.display_name: Optional[str] = None
|
||||
self.category: Optional[str] = None
|
||||
self.units = None # ToDo
|
||||
self.firmware_version: Optional[FirmwareVersion] = None
|
||||
self.model: Optional[str] = None
|
||||
self.is_editable: bool = False
|
||||
self.manufacturer: Optional[Manufacturer] = None
|
||||
self.push_service: Optional[PushService] = None
|
||||
self.actor_identification_number: Optional[str] = None
|
||||
|
||||
if state is not None:
|
||||
self.parse_state(state)
|
||||
|
||||
def __repr__(self):
|
||||
# ToDo
|
||||
pass
|
||||
|
||||
def short_description(self):
|
||||
desc = f"masterConnectionState: {self.master_connection_state}; " \
|
||||
f"type: {self.type}; model: {self.model}; id: {self.id}; " \
|
||||
f"manufacturer: {self.manufacturer}; " \
|
||||
f"actorIdentificationNumber: {self.actor_identification_number}; " \
|
||||
f"displayName: {self.display_name}"
|
||||
|
||||
return desc
|
||||
|
||||
def parse_state(self, state):
|
||||
self.type = DeviceType[state["type"]]
|
||||
self.is_deletable = state["isDeletable"]
|
||||
self.id = state["id"]
|
||||
self.master_connection_state = state["masterConnectionState"]
|
||||
self.display_name = state["displayName"]
|
||||
self.category = state["category"]
|
||||
self.units = [Unit.parse_dict(unit, self) for unit in state["units"]]
|
||||
self.firmware_version = FirmwareVersion.parse_dict(state["firmwareVersion"])
|
||||
self.model = state["model"]
|
||||
self.is_editable = state["isEditable"]
|
||||
self.manufacturer = Manufacturer.parse_dict(state["manufacturer"])
|
||||
self.push_service = PushService.parse_dict(state["pushService"])
|
||||
self.actor_identification_number = state["actorIdentificationNumber"]
|
||||
|
||||
def to_web_data(self):
|
||||
data = {
|
||||
"device": self.id,
|
||||
"ule_device_name": self.display_name
|
||||
}
|
||||
for unit in self.units:
|
||||
data.update(unit.to_web_data(self.id))
|
||||
return data
|
||||
|
||||
def to_json(self):
|
||||
state = {"type": self.type.name,
|
||||
"isDeletable": self.is_deletable,
|
||||
"id": self.id,
|
||||
"masterConnectionState": self.master_connection_state,
|
||||
"displayName": self.display_name,
|
||||
"category": self.category,
|
||||
"units": [unit.to_json() for unit in self.units],
|
||||
"firmwareVersion": self.firmware_version.to_json(),
|
||||
"model": self.model,
|
||||
"isEditable": self.is_editable,
|
||||
"manufacturer": self.manufacturer.to_json(),
|
||||
"pushService": self.push_service.to_json(),
|
||||
"actorIdentificationNumber": self.actor_identification_number}
|
||||
|
||||
return state
|
||||
|
||||
def set_offset(self, offset: float):
|
||||
temp_sensor: Optional[Unit] = None
|
||||
for unit in self.units:
|
||||
if unit.type == UnitTypes.TEMPERATURE_SENSOR:
|
||||
temp_sensor = unit
|
||||
break
|
||||
temp_skill: Optional[TemperatureSkill] = None
|
||||
for skill in temp_sensor.skills:
|
||||
if skill.type == SkillType.SmartHomeTemperatureSensor:
|
||||
temp_skill = typing.cast(TemperatureSkill, skill)
|
||||
break
|
||||
temp_skill.offset = offset
|
||||
|
||||
def get_offset(self):
|
||||
temp_sensor: Optional[Unit] = None
|
||||
for unit in self.units:
|
||||
if unit.type == UnitTypes.TEMPERATURE_SENSOR:
|
||||
temp_sensor = unit
|
||||
break
|
||||
temp_skill: Optional[TemperatureSkill] = None
|
||||
for skill in temp_sensor.skills:
|
||||
if skill.type == SkillType.SmartHomeTemperatureSensor:
|
||||
temp_skill = typing.cast(TemperatureSkill, skill)
|
||||
break
|
||||
return temp_skill.offset
|
||||
|
||||
def get_temperature(self):
|
||||
temp_sensor: Optional[Unit] = None
|
||||
for unit in self.units:
|
||||
if unit.type == UnitTypes.TEMPERATURE_SENSOR:
|
||||
temp_sensor = unit
|
||||
break
|
||||
temp_skill: Optional[TemperatureSkill] = None
|
||||
for skill in temp_sensor.skills:
|
||||
if skill.type == SkillType.SmartHomeTemperatureSensor:
|
||||
temp_skill = typing.cast(TemperatureSkill, skill)
|
||||
break
|
||||
return temp_skill.current_in_celsius
|
||||
|
||||
def to_short_json(self):
|
||||
return {
|
||||
"masterConnectionState": self.master_connection_state,
|
||||
"type": self.type.name,
|
||||
"model": self.model,
|
||||
"id": self.id,
|
||||
"manufacturer": self.manufacturer.to_json(),
|
||||
"actorIdentificationNumber": self.actor_identification_number,
|
||||
"displayName": self.display_name
|
||||
}
|
@ -1,28 +1,49 @@
|
||||
from typing import Optional, Tuple
|
||||
import requests
|
||||
import json
|
||||
import re
|
||||
from __future__ import annotations
|
||||
|
||||
import asyncio
|
||||
import hashlib
|
||||
import xml.etree.ElementTree as ET
|
||||
import json
|
||||
import logging
|
||||
import re
|
||||
import xml.etree.ElementTree as ET
|
||||
from asyncio import Task
|
||||
from datetime import datetime, timedelta
|
||||
from typing import Optional, Dict, List
|
||||
|
||||
import requests
|
||||
|
||||
from device import Device
|
||||
|
||||
class FritzBox:
|
||||
def __init__(self, url:str, password:str, user:str = None) -> None:
|
||||
def __init__(self, url: str, password: str, update_timeout: int, user: str = None, dry_run: bool = False) -> None:
|
||||
self._endpoints = {
|
||||
"login": "login_sid.lua?version=2",
|
||||
"logout": "index.lua",
|
||||
"data": "data.lua"
|
||||
}
|
||||
self.url = url
|
||||
self.session = requests.Session()
|
||||
self.password = password
|
||||
self.sid = None
|
||||
self.url: str = url
|
||||
self.dry_run: bool = dry_run
|
||||
self.user: Optional[str] = user
|
||||
self.session: requests.Session = requests.Session()
|
||||
self.password: str = password
|
||||
self.sid: Optional[str] = None
|
||||
self.update_timeout: int = update_timeout
|
||||
self.update_time: Dict[str, datetime] = {}
|
||||
self.hold_connection: Optional[Task] = None
|
||||
|
||||
async def hold_connection_alive(self) -> None:
|
||||
while True:
|
||||
# Session automatically destroyed after 20m of inactivity
|
||||
# according to the manual
|
||||
await asyncio.sleep(19 * 60)
|
||||
self.check_session()
|
||||
|
||||
def _calc_challenge_v2(self, challenge: str) -> str:
|
||||
|
||||
logging.debug(f"Calculate v2 challenge: {challenge}")
|
||||
chall_regex = re.compile("2\$(?P<iter1>[0-9a-zA-Z]+)\$(?P<salt1>[0-9a-zA-Z]+)\$(?P<iter2>[0-9a-zA-Z]+)\$(?P<salt2>[0-9a-zA-Z]+)")
|
||||
|
||||
chall_regex = re.compile(
|
||||
r"2\$(?P<iter1>[0-9a-zA-Z]+)\$(?P<salt1>[0-9a-zA-Z]+)\$(?P<iter2>[0-9a-zA-Z]+)\$(?P<salt2>[0-9a-zA-Z]+)")
|
||||
|
||||
chall_parts = chall_regex.match(challenge).groupdict()
|
||||
salt1: bytes = bytes.fromhex(chall_parts["salt1"])
|
||||
iter1: int = int(chall_parts["iter1"])
|
||||
@ -42,8 +63,24 @@ class FritzBox:
|
||||
response = challenge + "-" + hashlib.md5(response).hexdigest()
|
||||
return response
|
||||
|
||||
def login(self, user:str = None) -> bool:
|
||||
logging.debug(f"login user {user}")
|
||||
def check_session(self) -> None:
|
||||
data = {
|
||||
"xhr": 1,
|
||||
"sid": self.sid,
|
||||
"lang": "de",
|
||||
"page": "overview",
|
||||
"xhrId": "first",
|
||||
"noMenuRef": 1
|
||||
}
|
||||
r = self.session.post(f"{self.url}/{self._endpoints['data']}", data=data)
|
||||
if len(r.history) > 0:
|
||||
if not self.login():
|
||||
logging.error("Failed to login to Fritz!Box")
|
||||
else:
|
||||
logging.debug("Already logged in")
|
||||
|
||||
def login(self) -> bool:
|
||||
logging.info(f"Login user {self.user} to Fritz!Box")
|
||||
challenge = None
|
||||
r = self.session.get(f"{self.url}/{self._endpoints['login']}")
|
||||
xml = ET.fromstring(r.text)
|
||||
@ -52,12 +89,12 @@ class FritzBox:
|
||||
self.sid = elem.text
|
||||
elif elem.tag == "Challenge":
|
||||
challenge = elem.text
|
||||
elif user is None and elem.tag == "Users":
|
||||
elif self.user is None and elem.tag == "Users":
|
||||
for user_elem in elem:
|
||||
if "fritz" in user_elem.text:
|
||||
user = user_elem.text
|
||||
self.user = user_elem.text
|
||||
|
||||
assert challenge is not None and user is not None
|
||||
assert challenge is not None and self.user is not None
|
||||
|
||||
if challenge.startswith("2$"):
|
||||
response = self._calc_challenge_v2(challenge)
|
||||
@ -65,7 +102,7 @@ class FritzBox:
|
||||
response = self._calc_challenge_v1(challenge)
|
||||
|
||||
data = {
|
||||
"username": user,
|
||||
"username": self.user,
|
||||
"response": response
|
||||
}
|
||||
|
||||
@ -75,27 +112,31 @@ class FritzBox:
|
||||
for elem in xml:
|
||||
if elem.tag == "SID":
|
||||
self.sid = elem.text
|
||||
|
||||
logging.debug(f"Authenticated fritzbox: {len(self.sid) != self.sid.count('0')}")
|
||||
|
||||
logging.info(f"Authenticated Fritz!Box: {len(self.sid) != self.sid.count('0')}")
|
||||
if len(self.sid) != self.sid.count("0"):
|
||||
self.hold_connection = asyncio.create_task(self.hold_connection_alive())
|
||||
return len(self.sid) != self.sid.count("0")
|
||||
|
||||
def logout(self) -> bool:
|
||||
logging.debug("logout")
|
||||
logging.info("logout")
|
||||
data = {
|
||||
"xhr":1,
|
||||
"xhr": 1,
|
||||
"sid": self.sid,
|
||||
"logout": 1,
|
||||
"no_sidrenew":""}
|
||||
"no_sidrenew": ""}
|
||||
r = self.session.post(f"{self.url}/{self._endpoints['logout']}", data=data)
|
||||
if self.hold_connection is not None:
|
||||
self.hold_connection.cancel()
|
||||
|
||||
return r.status_code == 200
|
||||
|
||||
def list_devices(self):
|
||||
def list_devices(self) -> Optional[List[Device]]:
|
||||
data = {
|
||||
"xhr": 1,
|
||||
"sid": self.sid,
|
||||
"lang": "de",
|
||||
"page":"sh_dev",
|
||||
"xhr": 1,
|
||||
"sid": self.sid,
|
||||
"lang": "de",
|
||||
"page": "sh_dev",
|
||||
"xhrId": "all"
|
||||
}
|
||||
r = self.session.post(f"{self.url}/{self._endpoints['data']}", data=data)
|
||||
@ -105,62 +146,68 @@ class FritzBox:
|
||||
r = self.session.post(f"{self.url}/{self._endpoints['data']}", data=data)
|
||||
else:
|
||||
return None
|
||||
devices = json.loads(r.text)["data"]["devices"]
|
||||
|
||||
devices: List[Device] = []
|
||||
for device in json.loads(r.text)["data"]["devices"]:
|
||||
devices.append(Device(device))
|
||||
|
||||
return devices
|
||||
|
||||
def get_device_data(self, id: int = None, name: str = None) -> Optional[Tuple[float, float, int, str]]:
|
||||
if id is None and name is None:
|
||||
def get_device_data(self, idx: int = None, name: str = None) -> Optional[Device]:
|
||||
if idx is None and name is None:
|
||||
logging.debug("No id or name given")
|
||||
return None
|
||||
|
||||
devices = self.list_devices()
|
||||
device = None
|
||||
for device in devices:
|
||||
if device["id"] == id or device["displayName"] == name:
|
||||
if device.id == idx or device.display_name == name:
|
||||
break
|
||||
device = None
|
||||
|
||||
|
||||
if device is None:
|
||||
logging.debug(f"Device {id} {name} not found")
|
||||
logging.debug(f"Device {idx} {name} not found")
|
||||
return None
|
||||
|
||||
current_temp = None
|
||||
current_offset = None
|
||||
for unit in device["units"]:
|
||||
if unit["type"] == "TEMPERATURE_SENSOR":
|
||||
for skill in unit["skills"]:
|
||||
if skill["type"] == "SmartHomeTemperatureSensor":
|
||||
current_temp = float(skill["currentInCelsius"])
|
||||
current_offset = float(skill["offset"])
|
||||
return device
|
||||
|
||||
return current_temp, current_offset, device["id"], device["displayName"]
|
||||
|
||||
def set_offset(self, current_temp: str, offset: float, device_id: int, device_name: str):
|
||||
def set_offset(self, device: Device) -> None:
|
||||
if self.dry_run:
|
||||
logging.warning("No updates in dry-run-mode")
|
||||
return
|
||||
data = {
|
||||
"xhr": 1,
|
||||
"sid": self.sid,
|
||||
"lang": "de",
|
||||
"device": device_id,
|
||||
"page": "home_auto_hkr_edit"
|
||||
}
|
||||
r = self.session.post(f"{self.url}/{self._endpoints['data']}", data=data)
|
||||
|
||||
data = {
|
||||
"xhr":1,
|
||||
"xhr": 1,
|
||||
"sid": self.sid,
|
||||
"lang": "de",
|
||||
"device": device.id,
|
||||
"page": "home_auto_hkr_edit"
|
||||
}
|
||||
self.session.post(f"{self.url}/{self._endpoints['data']}", data=data)
|
||||
|
||||
data = {
|
||||
"xhr": 1,
|
||||
"sid": self.sid,
|
||||
"lang": "de",
|
||||
"device": device_id,
|
||||
"view": "",
|
||||
"back_to_page": "sh_dev",
|
||||
"ule_device_name": device_name,
|
||||
"WindowOpenTrigger":8,
|
||||
"WindowOpenTimer":10,
|
||||
"tempsensor": "own",
|
||||
"Roomtemp": f"{current_temp}",
|
||||
"ExtTempsensorID":"tochoose",
|
||||
"Offset": f"{offset}",
|
||||
"apply":"",
|
||||
"oldpage":"/net/home_auto_hkr_edit.lua"
|
||||
"apply": "",
|
||||
"oldpage": "/net/home_auto_hkr_edit.lua"
|
||||
}
|
||||
data.update(device.to_web_data())
|
||||
|
||||
r = self.session.post(f"{self.url}/{self._endpoints['data']}", data=data)
|
||||
self.session.post(f"{self.url}/{self._endpoints['data']}", data=data)
|
||||
|
||||
def correct_offset(self, device_name: str, real_temp: float):
|
||||
elapsed = None
|
||||
if device_name in self.update_time.keys():
|
||||
elapsed = datetime.now() - self.update_time[device_name]
|
||||
logging.info(f"Last update for {device_name} {elapsed} ago")
|
||||
delta = timedelta(minutes=self.update_timeout)
|
||||
if device_name not in self.update_time.keys() or elapsed > delta:
|
||||
device: Optional[Device] = self.get_device_data(name=device_name)
|
||||
if device is None:
|
||||
return
|
||||
new_offset = device.get_offset() + real_temp - device.get_temperature()
|
||||
logging.info(f"Update offset from {device.get_offset()} to {new_offset}")
|
||||
device.set_offset(new_offset)
|
||||
self.set_offset(device)
|
||||
self.update_time[device.display_name] = datetime.now()
|
||||
|
@ -1,66 +1,152 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import asyncio
|
||||
import json
|
||||
import logging
|
||||
from typing import Callable
|
||||
import websocket
|
||||
import time
|
||||
from asyncio import Queue, Task, Event, Lock
|
||||
from typing import Callable, Dict, Optional
|
||||
import websockets
|
||||
from websockets.exceptions import InvalidStatusCode
|
||||
|
||||
|
||||
class HomeAssistantAPI:
|
||||
def __init__(self, token:str, initialize: Callable[[HomeAssistantAPI], None]) -> None:
|
||||
def __init__(self, token: str, url: str) -> None:
|
||||
self.token = token
|
||||
self.msg_id = 1
|
||||
self.ws = None
|
||||
self.subscriptions = {}
|
||||
self.init_callback = initialize
|
||||
self.msg_id_lock = Lock()
|
||||
self.ws: websockets.WebSocketClientProtocol = None
|
||||
self.url = url
|
||||
self.receiver: Optional[Task] = None
|
||||
self.sender: Optional[Task] = None
|
||||
self.sending_queue: Queue = Queue()
|
||||
self.authenticated: Event = Event()
|
||||
self.events: Dict[int, Queue] = {}
|
||||
self.responses: Dict[int, Dict] = {}
|
||||
self.response_events: Dict[int, Event] = {}
|
||||
self.response_lock: Lock = Lock()
|
||||
|
||||
def handle_message(self, ws: websocket.WebSocket, msg: str) -> None:
|
||||
if self.ws is None:
|
||||
self.ws = ws
|
||||
async def connect(self):
|
||||
retries = 5
|
||||
logging.info("Connect to home assistant...")
|
||||
while True:
|
||||
try:
|
||||
self.ws = await websockets.connect(self.url)
|
||||
self.sender = asyncio.create_task(self.sending())
|
||||
await self.auth()
|
||||
self.receiver = asyncio.create_task(self.receiving())
|
||||
return True
|
||||
except (InvalidStatusCode, TimeoutError):
|
||||
if retries > 0:
|
||||
retries -= 1
|
||||
await asyncio.sleep(30)
|
||||
logging.info(f"Retry home assistant connection... ({retries})")
|
||||
continue
|
||||
else:
|
||||
logging.error("Invalid status code while connecting to Home Assistant")
|
||||
await self.exit_loop()
|
||||
return False
|
||||
|
||||
message: object = json.loads(msg)
|
||||
|
||||
if message["type"] == "auth_required":
|
||||
response = {
|
||||
"type": "auth",
|
||||
"access_token": self.token
|
||||
}
|
||||
logging.debug(response)
|
||||
ws.send(json.dumps(response))
|
||||
return
|
||||
elif message["type"] == "auth_invalid":
|
||||
logging.info("Auth failed")
|
||||
ws.close()
|
||||
return None
|
||||
elif message["type"] == "auth_ok":
|
||||
logging.debug("Authenticated")
|
||||
self.init_callback(self)
|
||||
self.init_callback = None
|
||||
return
|
||||
elif message["type"] == "event":
|
||||
event = message["event"]
|
||||
if event["event_type"] in self.subscriptions.keys():
|
||||
self.subscriptions[event["event_type"]](event)
|
||||
|
||||
else:
|
||||
print("Received", message)
|
||||
|
||||
def subscribe_event(self, event_type: str, callback: Callable[[object], None]):
|
||||
async def wait_for_close(self):
|
||||
await self.ws.wait_closed()
|
||||
|
||||
if self.ws is None:
|
||||
logging.debug("Websocket not set")
|
||||
return
|
||||
async def receiving(self):
|
||||
logging.debug("Start receiving")
|
||||
async for message in self.ws:
|
||||
msg: Dict = json.loads(message)
|
||||
if msg["type"] == "event":
|
||||
if msg["id"] not in self.events.keys():
|
||||
logging.error(f"Received event for not subscribted id: {msg['id']} {msg['event_type']}")
|
||||
continue
|
||||
await self.events[msg["id"]].put(msg["event"])
|
||||
else:
|
||||
async with self.response_lock:
|
||||
self.responses[msg["id"]] = msg
|
||||
if msg["id"] in self.response_events.keys():
|
||||
self.response_events[msg["id"]].set()
|
||||
|
||||
if event_type in self.subscriptions.keys():
|
||||
logging.warning(f"Already subscribed to {event_type}")
|
||||
return
|
||||
|
||||
logging.info(f"Subscribe to {event_type}")
|
||||
self.subscriptions[event_type] = callback
|
||||
async def wait_for(self, idx):
|
||||
async with self.response_lock:
|
||||
if idx in self.responses.keys():
|
||||
msg = self.responses[idx]
|
||||
del self.responses[idx]
|
||||
return msg
|
||||
self.response_events[idx] = Event()
|
||||
|
||||
await self.response_events[idx].wait()
|
||||
async with self.response_lock:
|
||||
del self.response_events[idx]
|
||||
if idx not in self.responses.keys():
|
||||
logging.error("Response ID not found")
|
||||
return None
|
||||
msg = self.responses[idx]
|
||||
del self.responses[idx]
|
||||
return msg
|
||||
|
||||
async def exit_loop(self):
|
||||
if self.sender is not None:
|
||||
self.sender.cancel()
|
||||
if self.receiver is not None:
|
||||
self.receiver.cancel()
|
||||
|
||||
async def auth(self):
|
||||
msg = json.loads(await self.ws.recv())
|
||||
if msg["type"] != "auth_required":
|
||||
logging.error("Authentication error: Not required")
|
||||
await self.exit_loop()
|
||||
response = {
|
||||
"id": self.msg_id,
|
||||
"type": "subscribe_events",
|
||||
"event_type": event_type
|
||||
"type": "auth",
|
||||
"access_token": self.token
|
||||
}
|
||||
self.msg_id += 1
|
||||
self.ws.send(json.dumps(response))
|
||||
await self.sending_queue.put(response)
|
||||
msg = json.loads(await self.ws.recv())
|
||||
if msg["type"] == "auth_invalid":
|
||||
logging.info("Auth failed")
|
||||
await self.exit_loop()
|
||||
elif msg["type"] == "auth_ok":
|
||||
logging.debug("Authenticated")
|
||||
self.authenticated.set()
|
||||
else:
|
||||
logging.error(f"Unknown answer for auth: {msg}")
|
||||
await self.exit_loop()
|
||||
|
||||
async def sending(self):
|
||||
while msg := await self.sending_queue.get():
|
||||
await self.ws.send(json.dumps(msg))
|
||||
|
||||
async def subscribe_event(self, event_type: str):
|
||||
await self.authenticated.wait()
|
||||
|
||||
logging.info(f"Subscribe to {event_type}")
|
||||
async with self.msg_id_lock:
|
||||
msg_id = self.msg_id
|
||||
response = {
|
||||
"id": msg_id,
|
||||
"type": "subscribe_events",
|
||||
"event_type": event_type
|
||||
}
|
||||
self.events[msg_id] = Queue()
|
||||
self.msg_id += 1
|
||||
await self.sending_queue.put(response)
|
||||
return msg_id
|
||||
|
||||
async def get_states(self):
|
||||
await self.authenticated.wait()
|
||||
async with self.msg_id_lock:
|
||||
message = {
|
||||
"id": self.msg_id,
|
||||
"type": "get_states"
|
||||
}
|
||||
self.msg_id += 1
|
||||
await self.sending_queue.put(message)
|
||||
|
||||
response = await self.wait_for(message["id"])
|
||||
# ToDo: Error handling
|
||||
return response["result"]
|
||||
|
||||
async def get_device_state(self, entity_id: str):
|
||||
device_states = await self.get_states()
|
||||
for device_state in device_states:
|
||||
if device_state["entity_id"] == entity_id:
|
||||
return device_state
|
||||
|
||||
return None
|
||||
|
@ -1,3 +1,3 @@
|
||||
#!/usr/bin/env bashio
|
||||
#!/usr/bin/with-contenv bashio
|
||||
|
||||
python3 /srv/sync_ha_fb.py
|
||||
python3 /srv/sync_ha_fb.py /data/options.json
|
@ -1,78 +1,125 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
hier die verbindungen zu HA aufbauen etc
|
||||
außerdem das vergleichen der werte und dass anstoßen der updates
|
||||
"""
|
||||
|
||||
import asyncio
|
||||
import json
|
||||
import logging
|
||||
import os
|
||||
import sys
|
||||
|
||||
from fritzbox import FritzBox
|
||||
from homeassistant import HomeAssistantAPI
|
||||
import logging
|
||||
import websocket
|
||||
import json
|
||||
|
||||
mappings = {}
|
||||
sensor_mappings = {}
|
||||
thermostate_mappings = {}
|
||||
|
||||
def handle_event(event):
|
||||
entity_id = event["data"]["entity_id"]
|
||||
if entity_id in mappings.keys():
|
||||
new_state = event["data"]["new_state"]
|
||||
logging.debug(entity_id)
|
||||
logging.debug(new_state["attributes"]["temperature"])
|
||||
rounded = round(float(new_state["attributes"]["temperature"])*2)/2
|
||||
logging.debug(rounded)
|
||||
if new_state["attributes"]["device_class"] == "temperature":
|
||||
if entity_id in mappings.keys():
|
||||
fb.login()
|
||||
logged = False
|
||||
for thermostate in mappings[entity_id]:
|
||||
current_temp, current_offset, id, name = fb.get_device_data(name=thermostate)
|
||||
if not logged:
|
||||
logging.info(f"Current measurement from {entity_id}: {new_state['attributes']['temperature']} ({rounded})")
|
||||
logged = True
|
||||
logging.info(f"Current measurement from {thermostate}: {current_temp}")
|
||||
new_offset = current_offset + rounded - current_temp
|
||||
if new_offset != current_offset:
|
||||
old_offset = current_offset
|
||||
logging.debug(f"Set offset for {thermostate} from {current_offset} to {new_offset}")
|
||||
fb.set_offset(current_temp, new_offset, id, name)
|
||||
current_temp, current_offset, id, name = fb.get_device_data(name=thermostate)
|
||||
logging.debug(f"Target: {new_offset} ; Set: {current_offset}")
|
||||
if new_offset == current_offset:
|
||||
logging.info(f"Adjustet offset from {old_offset} to {new_offset}")
|
||||
else:
|
||||
logging.warning(f"Failed to adjust offset from {old_offset} to {new_offset}")
|
||||
fb.logout()
|
||||
|
||||
def on_error(ws, error):
|
||||
print(error)
|
||||
async def handle_event(idx: int):
|
||||
global ha, fb
|
||||
logging.debug(f"Wait for events for {idx}")
|
||||
|
||||
def on_close(ws, close_status_code, close_msg):
|
||||
pass
|
||||
while event := await ha.events[idx].get():
|
||||
try:
|
||||
entity_id = event["data"]["entity_id"]
|
||||
if (
|
||||
entity_id in sensor_mappings.keys()
|
||||
or entity_id in thermostate_mappings.keys()
|
||||
):
|
||||
state = await ha.get_device_state(entity_id)
|
||||
new_state = event["data"]["new_state"]
|
||||
logging.info(
|
||||
f"received changed state from {entity_id} {entity_id in thermostate_mappings.keys()} {state['state']} {entity_id in sensor_mappings.keys()}"
|
||||
)
|
||||
if (
|
||||
entity_id in thermostate_mappings.keys()
|
||||
and state["state"] != "unavailable"
|
||||
):
|
||||
therm_temp = new_state["attributes"]["current_temperature"]
|
||||
therm_name = new_state["attributes"]["friendly_name"]
|
||||
sensor = thermostate_mappings[entity_id]
|
||||
sensor_state = await ha.get_device_state(sensor)
|
||||
sensor_temp = round(float(sensor_state["state"]) * 2) / 2
|
||||
logging.info(f"temps: {therm_temp} {sensor_temp}")
|
||||
if therm_temp != sensor_temp:
|
||||
logging.info(
|
||||
f"{therm_name}: {therm_temp}\n{sensor}: {sensor_state['state']} ({sensor_temp})"
|
||||
)
|
||||
fb.correct_offset(therm_name, sensor_temp)
|
||||
|
||||
def on_open(ws):
|
||||
pass
|
||||
elif entity_id in sensor_mappings.keys():
|
||||
logging.info(f"here {sensor_mappings} {entity_id}")
|
||||
logging.info(f"{new_state}")
|
||||
sensor_temp = round(float(new_state["state"]) * 2) / 2
|
||||
logging.info(f"entry: {sensor_mappings[entity_id]}")
|
||||
for thermostate in sensor_mappings[entity_id]:
|
||||
logging.info(thermostate)
|
||||
therm_state = await ha.get_device_state(thermostate)
|
||||
logging.info(f"{thermostate} {therm_state}")
|
||||
if therm_state["state"] == "unavailable":
|
||||
continue
|
||||
therm_temp = float(
|
||||
therm_state["attributes"]["current_temperature"]
|
||||
)
|
||||
therm_name = therm_state["attributes"]["friendly_name"]
|
||||
logging.info(f"Temps: {therm_temp} {sensor_temp}")
|
||||
if therm_temp != sensor_temp:
|
||||
logging.info(
|
||||
f"{therm_name}: {therm_temp}\n{entity_id}: {new_state['state']} ({sensor_temp})"
|
||||
)
|
||||
fb.correct_offset(therm_name, sensor_temp)
|
||||
except KeyError:
|
||||
pass
|
||||
|
||||
def init(ha: HomeAssistantAPI):
|
||||
|
||||
async def init(ha: HomeAssistantAPI, fb: FritzBox):
|
||||
if not await ha.connect():
|
||||
return
|
||||
logging.debug("Subscribe")
|
||||
ha.subscribe_event("state_changed", handle_event)
|
||||
state_changed_id = await ha.subscribe_event("state_changed")
|
||||
logging.debug(state_changed_id)
|
||||
asyncio.create_task(handle_event(state_changed_id))
|
||||
fb.login()
|
||||
await ha.wait_for_close()
|
||||
logging.info("Websocket closed, shutting down..")
|
||||
|
||||
|
||||
logging.basicConfig(level=logging.INFO, format="[%(asctime)s] [%(levelname)s] %(message)s")
|
||||
config = json.load(open("/data/options.json"))
|
||||
logging.debug(config)
|
||||
for mapping in config["mappings"]:
|
||||
if mapping["sensor"] not in mappings.keys():
|
||||
mappings[mapping["sensor"]] = []
|
||||
mappings[mapping["sensor"]].append(mapping["thermostate"])
|
||||
async def main():
|
||||
config_path = sys.argv[1]
|
||||
config = json.load(open(config_path))
|
||||
level = logging.INFO
|
||||
if "log_level" in config:
|
||||
print(f"Setting log_level {config['log_level']}")
|
||||
if config["log_level"] == "DEBUG":
|
||||
level = logging.DEBUG
|
||||
logging.basicConfig(
|
||||
level=level, format="[%(asctime)s] [%(levelname)s] %(message)s"
|
||||
)
|
||||
logging.debug(config)
|
||||
|
||||
fb = FritzBox(config["fritzbox"]["url"], config["fritzbox"]["password"])
|
||||
ha = HomeAssistantAPI(os.environ["SUPERVISOR_TOKEN"], init)
|
||||
global fb
|
||||
fb = FritzBox(
|
||||
url=config["fritzbox"]["url"],
|
||||
user=config["fritzbox"]["username"],
|
||||
password=config["fritzbox"]["password"],
|
||||
update_timeout=config["update_timeout"],
|
||||
dry_run=False,
|
||||
)
|
||||
supervisor_url = "ws://supervisor/core/websocket"
|
||||
if "SUPERVISOR_URL" in os.environ:
|
||||
supervisor_url = os.environ["SUPERVISOR_URL"]
|
||||
supervisor_token = os.environ["SUPERVISOR_TOKEN"]
|
||||
global ha
|
||||
ha = HomeAssistantAPI(supervisor_token, supervisor_url)
|
||||
|
||||
websocket.enableTrace(False)
|
||||
ws = websocket.WebSocketApp("ws://supervisor/core/websocket",
|
||||
on_open=on_open,
|
||||
on_message=ha.handle_message,
|
||||
on_error=on_error,
|
||||
on_close=on_close)
|
||||
for mapping in config["mappings"]:
|
||||
if mapping["sensor"] not in sensor_mappings.keys():
|
||||
sensor_mappings[mapping["sensor"]] = []
|
||||
sensor_mappings[mapping["sensor"]].append(mapping["thermostate"])
|
||||
thermostate_mappings[mapping["thermostate"]] = mapping["sensor"]
|
||||
logging.debug(f"Mappings: {sensor_mappings} {thermostate_mappings}")
|
||||
try:
|
||||
await init(ha, fb)
|
||||
except KeyboardInterrupt:
|
||||
pass
|
||||
|
||||
ws.run_forever()
|
||||
|
||||
asyncio.run(main())
|
||||
|
Loading…
x
Reference in New Issue
Block a user