Compare commits
20 Commits
Author | SHA1 | Date | |
---|---|---|---|
5aa94a3c97 | |||
23140f672f | |||
00b161945f | |||
89fc6ea73c | |||
f6f46fba8f | |||
e01842a118 | |||
077ad19255 | |||
c36c75ac48 | |||
431450010e | |||
076af5add5 | |||
ce59adccf0 | |||
52c0e90a52 | |||
f6600c7dda | |||
43c2f0284a | |||
084bc5ba1c | |||
a1d7dfa9bd | |||
1a561a45d7 | |||
8aeb8cb1a6 | |||
5a14d251b4 | |||
9bf21d04e3 |
5
.gitignore
vendored
5
.gitignore
vendored
@ -15,6 +15,7 @@ dist/
|
|||||||
downloads/
|
downloads/
|
||||||
eggs/
|
eggs/
|
||||||
.eggs/
|
.eggs/
|
||||||
|
.venv/
|
||||||
lib/
|
lib/
|
||||||
lib64/
|
lib64/
|
||||||
parts/
|
parts/
|
||||||
@ -168,6 +169,7 @@ pip-selfcheck.json
|
|||||||
# Reference: https://intellij-support.jetbrains.com/hc/en-us/articles/206544839
|
# Reference: https://intellij-support.jetbrains.com/hc/en-us/articles/206544839
|
||||||
|
|
||||||
# User-specific stuff
|
# User-specific stuff
|
||||||
|
.idea/
|
||||||
.idea/**/workspace.xml
|
.idea/**/workspace.xml
|
||||||
.idea/**/tasks.xml
|
.idea/**/tasks.xml
|
||||||
.idea/**/usage.statistics.xml
|
.idea/**/usage.statistics.xml
|
||||||
@ -239,3 +241,6 @@ fabric.properties
|
|||||||
# Android studio 3.1+ serialized cache file
|
# Android studio 3.1+ serialized cache file
|
||||||
.idea/caches/build_file_checksums.ser
|
.idea/caches/build_file_checksums.ser
|
||||||
|
|
||||||
|
# local config test
|
||||||
|
options.json
|
||||||
|
.DS_Store
|
||||||
|
@ -2,8 +2,7 @@ ARG BUILD_FROM
|
|||||||
FROM $BUILD_FROM
|
FROM $BUILD_FROM
|
||||||
|
|
||||||
# Install requirements for add-on
|
# Install requirements for add-on
|
||||||
RUN apk update && apk add --no-cache python3 py-pip
|
RUN apk update && apk add --no-cache python3 py3-pip py3-websockets py3-requests
|
||||||
RUN python3 -m pip install websocket-client requests
|
|
||||||
|
|
||||||
WORKDIR /data
|
WORKDIR /data
|
||||||
|
|
||||||
@ -11,6 +10,7 @@ WORKDIR /data
|
|||||||
COPY sync_ha_fb.py /srv
|
COPY sync_ha_fb.py /srv
|
||||||
COPY fritzbox.py /srv
|
COPY fritzbox.py /srv
|
||||||
COPY homeassistant.py /srv
|
COPY homeassistant.py /srv
|
||||||
|
COPY device.py /srv
|
||||||
COPY run.sh /
|
COPY run.sh /
|
||||||
RUN chmod a+x /run.sh
|
RUN chmod a+x /run.sh
|
||||||
|
|
||||||
|
@ -1,8 +1,11 @@
|
|||||||
name: "Fritz!Box Temperature Sync"
|
name: "Fritz!Box Temperature Sync Dev"
|
||||||
description: "Sync Fritz!DECT thermostate temperatures with other sensors in Home Assistant"
|
description: "Sync Fritz!DECT thermostat temperatures with other sensors in Home Assistant"
|
||||||
version: "0.1.0"
|
version: "0.4.3"
|
||||||
slug: "fritz_temp_sync"
|
startup: "application"
|
||||||
|
stage: "stable"
|
||||||
|
slug: "fritz_temp_sync_dev"
|
||||||
homeassistant_api: true
|
homeassistant_api: true
|
||||||
|
init: false
|
||||||
arch:
|
arch:
|
||||||
- aarch64
|
- aarch64
|
||||||
- amd64
|
- amd64
|
||||||
@ -16,10 +19,14 @@ options:
|
|||||||
mappings:
|
mappings:
|
||||||
- sensor: null
|
- sensor: null
|
||||||
thermostate: null
|
thermostate: null
|
||||||
|
update_timeout: 15
|
||||||
schema:
|
schema:
|
||||||
fritzbox:
|
fritzbox:
|
||||||
url: url
|
url: url
|
||||||
|
username: "str?"
|
||||||
password: str
|
password: str
|
||||||
mappings:
|
mappings:
|
||||||
- sensor: str
|
- sensor: str
|
||||||
thermostate: str
|
thermostate: str
|
||||||
|
update_timeout: int
|
||||||
|
log_level: "str?"
|
||||||
|
714
fritz_temp_sync/device.py
Executable file
714
fritz_temp_sync/device.py
Executable file
@ -0,0 +1,714 @@
|
|||||||
|
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
from abc import ABC, abstractmethod
|
||||||
|
from enum import Enum, IntFlag, auto
|
||||||
|
from typing import Dict, List, Optional, Union
|
||||||
|
import typing
|
||||||
|
|
||||||
|
|
||||||
|
class WeekDay(IntFlag):
|
||||||
|
MON = 0b1
|
||||||
|
TUE = 0b10
|
||||||
|
WED = 0b100
|
||||||
|
THU = 0b1000
|
||||||
|
FRI = 0b10000
|
||||||
|
SAT = 0b100000
|
||||||
|
SUN = 0b1000000
|
||||||
|
|
||||||
|
|
||||||
|
class Manufacturer:
|
||||||
|
def __init__(self, name: str):
|
||||||
|
self.name: str = name
|
||||||
|
|
||||||
|
def __repr__(self):
|
||||||
|
return f"name: {self.name}"
|
||||||
|
|
||||||
|
def to_json(self):
|
||||||
|
return {"name": self.name}
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def parse_dict(manufacturer: Dict):
|
||||||
|
return Manufacturer(manufacturer["name"])
|
||||||
|
|
||||||
|
|
||||||
|
class FirmwareVersion:
|
||||||
|
def __init__(self, search: bool, current: str, update: bool, running: bool):
|
||||||
|
self.search: bool = search
|
||||||
|
self.current: str = current
|
||||||
|
self.update: bool = update
|
||||||
|
self.running: bool = running
|
||||||
|
|
||||||
|
def __repr__(self):
|
||||||
|
return f"search: {self.search}; current: {self.current}; update: {self.update}; running: {self.running}"
|
||||||
|
|
||||||
|
def to_json(self):
|
||||||
|
return {"search": self.search, "current": self.current, "update": self.update, "running": self.running}
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def parse_dict(firmware_version: dict):
|
||||||
|
return FirmwareVersion(firmware_version["search"],
|
||||||
|
firmware_version["current"],
|
||||||
|
firmware_version["update"],
|
||||||
|
firmware_version["running"])
|
||||||
|
|
||||||
|
|
||||||
|
class PushService:
|
||||||
|
def __init__(self, mail_address: str, unit_settings: List, is_enabled: bool):
|
||||||
|
self.mail_address: str = mail_address
|
||||||
|
self.unit_settings: List = unit_settings
|
||||||
|
self.is_enabled: bool = is_enabled
|
||||||
|
|
||||||
|
def __repr__(self):
|
||||||
|
return f"<PushService MailAddress: {self.mail_address}; UnitSettings: {self.unit_settings}; " \
|
||||||
|
f"isEnabled: {self.is_enabled}>"
|
||||||
|
|
||||||
|
def to_json(self):
|
||||||
|
return {"mailAddress": self.mail_address, "unitSettings": self.unit_settings, "isEnabled": self.is_enabled}
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def parse_dict(push_service: Dict):
|
||||||
|
return PushService(push_service["mailAddress"], push_service["unitSettings"], push_service["isEnabled"])
|
||||||
|
|
||||||
|
|
||||||
|
class SkillType(Enum):
|
||||||
|
SmartHomeTemperatureSensor = auto()
|
||||||
|
SmartHomeThermostat = auto()
|
||||||
|
SmartHomeBattery = auto()
|
||||||
|
|
||||||
|
|
||||||
|
class Skill(ABC):
|
||||||
|
def __init__(self, skill_type: SkillType):
|
||||||
|
self.type: SkillType = skill_type
|
||||||
|
|
||||||
|
@abstractmethod
|
||||||
|
def to_json(self):
|
||||||
|
pass
|
||||||
|
|
||||||
|
@abstractmethod
|
||||||
|
def to_web_data(self, device_id: int):
|
||||||
|
pass
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def parse_dict(skill: Dict):
|
||||||
|
skill_type = SkillType[skill["type"]]
|
||||||
|
if skill_type == SkillType.SmartHomeTemperatureSensor:
|
||||||
|
return TemperatureSkill.parse_dict(skill)
|
||||||
|
elif skill_type == SkillType.SmartHomeThermostat:
|
||||||
|
return ThermostatSkill.parse_dict(skill)
|
||||||
|
elif skill_type == SkillType.SmartHomeBattery:
|
||||||
|
return BatterySkill.parse_dict(skill)
|
||||||
|
else:
|
||||||
|
raise NotImplementedError(skill_type)
|
||||||
|
|
||||||
|
|
||||||
|
class PresetName(Enum):
|
||||||
|
LOWER_TEMPERATURE = auto()
|
||||||
|
UPPER_TEMPERATURE = auto()
|
||||||
|
HOLIDAY_TEMPERATURE = auto()
|
||||||
|
|
||||||
|
|
||||||
|
class Preset:
|
||||||
|
def __init__(self, name: PresetName, temperature: Optional[int]):
|
||||||
|
self.name: PresetName = name
|
||||||
|
self.temperature: Optional[int] = temperature
|
||||||
|
|
||||||
|
def __repr__(self):
|
||||||
|
return f"<Preset name: {self.name.name}; temperature: {self.temperature}>"
|
||||||
|
|
||||||
|
def to_json(self):
|
||||||
|
return {"name": self.name.name, "temperature": self.temperature}
|
||||||
|
|
||||||
|
|
||||||
|
class Description:
|
||||||
|
def __init__(self, action: str, preset_temperature: Optional[Preset]):
|
||||||
|
self.action: str = action
|
||||||
|
self.preset_temperature: Optional[Preset] = preset_temperature
|
||||||
|
|
||||||
|
def __repr__(self):
|
||||||
|
desc = f"<Description action: {self.action}"
|
||||||
|
if self.preset_temperature is not None:
|
||||||
|
desc += f"; presetTemperaturet: {self.preset_temperature}"
|
||||||
|
desc += " >"
|
||||||
|
|
||||||
|
return desc
|
||||||
|
|
||||||
|
def to_json(self):
|
||||||
|
if self.preset_temperature is not None:
|
||||||
|
return {"action": self.action, "presetTemperature": self.preset_temperature.to_json()}
|
||||||
|
return {"action": self.action}
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def parse_dict(description: Dict):
|
||||||
|
preset: Optional[Preset] = None
|
||||||
|
if "presetTemperature" in description.keys():
|
||||||
|
preset = Preset(PresetName[description["presetTemperature"]["name"]],
|
||||||
|
description["presetTemperature"]["temperature"] if "temperature" in description["presetTemperature"] else None)
|
||||||
|
return Description(description["action"], preset)
|
||||||
|
|
||||||
|
|
||||||
|
class Repetition(Enum):
|
||||||
|
NONE = auto()
|
||||||
|
YEARLY = auto()
|
||||||
|
|
||||||
|
|
||||||
|
class TimeSetting:
|
||||||
|
def __init__(self, start_date: Optional[str], start_time: Optional[str],
|
||||||
|
end_date: Optional[str] = None, end_time: Optional[str] = None,
|
||||||
|
repetition: Repetition = Repetition.NONE, day_of_week: Optional[WeekDay] = None):
|
||||||
|
self.start_date: Optional[str] = start_date
|
||||||
|
self.start_time: Optional[str] = start_time
|
||||||
|
self.end_date: Optional[str] = end_date
|
||||||
|
self.end_time: Optional[str] = end_time
|
||||||
|
self.repetition: Repetition = repetition
|
||||||
|
self.day_of_week: Optional[WeekDay] = day_of_week
|
||||||
|
|
||||||
|
def __repr__(self):
|
||||||
|
desc = f"<TimeSettings "
|
||||||
|
if self.day_of_week is not None:
|
||||||
|
desc += f"dayOfWeek: {self.day_of_week.name}; "
|
||||||
|
if self.start_date is not None:
|
||||||
|
desc += f"startDate: {self.start_date} "
|
||||||
|
if self.start_time is not None:
|
||||||
|
desc += f"startTime: {self.start_time} "
|
||||||
|
if self.end_date is not None:
|
||||||
|
desc += f"endDate: {self.end_date} "
|
||||||
|
if self.end_time is not None:
|
||||||
|
desc += f"endTime: {self.end_time} "
|
||||||
|
if self.repetition != Repetition.NONE:
|
||||||
|
desc += f"repetition: {self.repetition.name} "
|
||||||
|
desc += ">"
|
||||||
|
|
||||||
|
return desc
|
||||||
|
|
||||||
|
def to_json(self): # ToDo: Return typehints for all to_json
|
||||||
|
state = {}
|
||||||
|
if self.start_date is not None:
|
||||||
|
state["startDate"] = self.start_date
|
||||||
|
if self.start_time is not None:
|
||||||
|
state["startTime"] = self.start_time
|
||||||
|
if self.end_date is not None:
|
||||||
|
state["endDate"] = self.end_date
|
||||||
|
if self.end_time is not None:
|
||||||
|
state["endTime"] = self.end_time
|
||||||
|
if self.repetition != Repetition.NONE:
|
||||||
|
state["repetition"] = self.repetition.name
|
||||||
|
if self.day_of_week is not None:
|
||||||
|
state = {"dayOfWeek": self.day_of_week.name,
|
||||||
|
"time": state}
|
||||||
|
|
||||||
|
return state
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def parse_dict(time_setting: Dict):
|
||||||
|
start_date: Optional[str] = None
|
||||||
|
start_time: Optional[str] = None
|
||||||
|
end_date: Optional[str] = None
|
||||||
|
end_time: Optional[str] = None
|
||||||
|
day_of_week: Optional[WeekDay] = None
|
||||||
|
repetition: Repetition = Repetition.NONE
|
||||||
|
if "dayOfWeek" in time_setting.keys():
|
||||||
|
day_of_week = WeekDay[time_setting["dayOfWeek"]]
|
||||||
|
time_setting = time_setting["time"]
|
||||||
|
if "startDate" in time_setting.keys():
|
||||||
|
start_date = time_setting["startDate"]
|
||||||
|
if "startTime" in time_setting.keys():
|
||||||
|
start_time = time_setting["startTime"]
|
||||||
|
if "endDate" in time_setting.keys():
|
||||||
|
end_date = time_setting["endDate"]
|
||||||
|
if "endTime" in time_setting.keys():
|
||||||
|
end_time = time_setting["endTime"]
|
||||||
|
if "repetition" in time_setting.keys():
|
||||||
|
repetition = Repetition[time_setting["repetition"]]
|
||||||
|
|
||||||
|
return TimeSetting(start_date, start_time, end_date, end_time, repetition, day_of_week)
|
||||||
|
|
||||||
|
|
||||||
|
class Change:
|
||||||
|
def __init__(self, description: Description, time_setting: TimeSetting):
|
||||||
|
self.description: Description = description
|
||||||
|
self.time_setting: TimeSetting = time_setting
|
||||||
|
|
||||||
|
def __repr__(self):
|
||||||
|
return f"<Change description: {self.description}; timeSetting: {self.time_setting}>"
|
||||||
|
|
||||||
|
def to_json(self):
|
||||||
|
return {"description": self.description.to_json(), "timeSetting": self.time_setting.to_json()}
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def parse_dict(change: Dict):
|
||||||
|
return Change(Description.parse_dict(change["description"]), TimeSetting.parse_dict(change["timeSetting"]))
|
||||||
|
|
||||||
|
|
||||||
|
class TemperatureDropDetection:
|
||||||
|
def __init__(self, do_not_heat_offset_in_minutes: int, sensitivity: int):
|
||||||
|
self.do_not_heat_offset_in_minutes: int = do_not_heat_offset_in_minutes
|
||||||
|
self.sensitivity: int = sensitivity
|
||||||
|
|
||||||
|
def __repr__(self):
|
||||||
|
# ToDo
|
||||||
|
pass
|
||||||
|
|
||||||
|
def to_json(self):
|
||||||
|
return {
|
||||||
|
"doNotHeatOffsetInMinutes": self.do_not_heat_offset_in_minutes,
|
||||||
|
"sensitivity": self.sensitivity
|
||||||
|
}
|
||||||
|
|
||||||
|
def to_web_data(self, device_id: int):
|
||||||
|
return {
|
||||||
|
"WindowOpenTrigger": self.sensitivity + 3,
|
||||||
|
"WindowOpenTimer": self.do_not_heat_offset_in_minutes
|
||||||
|
}
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def parse_dict(drop_detection: Dict):
|
||||||
|
return TemperatureDropDetection(drop_detection["doNotHeatOffsetInMinutes"],
|
||||||
|
drop_detection["sensitivity"])
|
||||||
|
|
||||||
|
|
||||||
|
class ScheduleKind(Enum):
|
||||||
|
REPETITIVE = auto()
|
||||||
|
WEEKLY_TIMETABLE = auto()
|
||||||
|
|
||||||
|
|
||||||
|
class ThermostatSkillMode(Enum):
|
||||||
|
TARGET_TEMPERATURE = auto()
|
||||||
|
|
||||||
|
|
||||||
|
class Action:
|
||||||
|
def __init__(self, is_enabled: bool, time_setting: TimeSetting, description: Description):
|
||||||
|
self.is_enabled: bool = is_enabled
|
||||||
|
self.time_setting: TimeSetting = time_setting
|
||||||
|
self.description: Description = description
|
||||||
|
|
||||||
|
def __repr__(self):
|
||||||
|
return f"<Action isEnabled: {self.is_enabled}; timeSetting: {self.time_setting}; " \
|
||||||
|
f"description: {self.description} >"
|
||||||
|
|
||||||
|
def to_json(self):
|
||||||
|
return {"isEnabled": self.is_enabled,
|
||||||
|
"timeSetting": self.time_setting.to_json(),
|
||||||
|
"description": self.description.to_json()}
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def parse_dict(action: Dict):
|
||||||
|
return Action(action["isEnabled"], TimeSetting.parse_dict(action["timeSetting"]),
|
||||||
|
Description.parse_dict(action["description"]))
|
||||||
|
|
||||||
|
|
||||||
|
class Schedule:
|
||||||
|
def __init__(self, is_enabled: bool, kind: ScheduleKind, name: str, actions: List[Action]):
|
||||||
|
self.is_enabled: bool = is_enabled
|
||||||
|
self.kind: ScheduleKind = kind
|
||||||
|
self.name: str = name
|
||||||
|
self.actions: List[Action] = actions
|
||||||
|
|
||||||
|
def __repr__(self):
|
||||||
|
return f"<Schedule isEnabled: {self.is_enabled}; kind: {self.kind.name}; " \
|
||||||
|
f"name: {self.name}; actions: {self.actions}>"
|
||||||
|
|
||||||
|
def to_json(self):
|
||||||
|
return {
|
||||||
|
"isEnabled": self.is_enabled,
|
||||||
|
"kind": self.kind.name,
|
||||||
|
"name": self.name,
|
||||||
|
"actions": [action.to_json() for action in self.actions]
|
||||||
|
}
|
||||||
|
|
||||||
|
def to_web_data(self, device_id: int):
|
||||||
|
data = {}
|
||||||
|
if self.kind == ScheduleKind.REPETITIVE and self.name == "HOLIDAYS": # ToDo: Enum for names?
|
||||||
|
enabled_count = 0
|
||||||
|
for num, holiday in enumerate(self.actions):
|
||||||
|
num += 1
|
||||||
|
_, start_month, start_day = holiday.time_setting.start_date.split("-")
|
||||||
|
_, end_month, end_day = holiday.time_setting.end_date.split("-")
|
||||||
|
data.update({
|
||||||
|
f"Holiday{num}StartDay": start_day,
|
||||||
|
f"Holiday{num}StartMonth": start_month,
|
||||||
|
f"Holiday{num}StartHour": holiday.time_setting.start_time.split(":")[0],
|
||||||
|
f"Holiday{num}EndDay": end_day,
|
||||||
|
f"Holiday{num}EndMonth": end_month,
|
||||||
|
f"Holiday{num}EndHour": holiday.time_setting.end_time.split(":")[0],
|
||||||
|
f"Holiday{num}Enabled": 1 if holiday.is_enabled else 0,
|
||||||
|
f"Holiday{num}ID": num
|
||||||
|
})
|
||||||
|
if holiday.is_enabled:
|
||||||
|
enabled_count += 1
|
||||||
|
data["HolidayEnabledCount"] = enabled_count
|
||||||
|
elif self.kind == ScheduleKind.REPETITIVE and self.name == "SUMMER_TIME":
|
||||||
|
_, start_month, start_day = self.actions[0].time_setting.start_date.split("-")
|
||||||
|
_, end_month, end_day = self.actions[0].time_setting.end_date.split("-")
|
||||||
|
data = {
|
||||||
|
"SummerStartDay": start_day,
|
||||||
|
"SummerStartMonth": start_month,
|
||||||
|
"SummerEndDay": end_day,
|
||||||
|
"SummerEndMonth": end_month,
|
||||||
|
"SummerEnabled": 1 if self.is_enabled else 0
|
||||||
|
}
|
||||||
|
elif self.kind == ScheduleKind.WEEKLY_TIMETABLE and self.name == "TEMPERATURE":
|
||||||
|
timer_items = {}
|
||||||
|
for action in self.actions:
|
||||||
|
if not action.is_enabled:
|
||||||
|
continue
|
||||||
|
time = "".join(action.time_setting.start_time.split(":")[:-1])
|
||||||
|
if time not in timer_items.keys():
|
||||||
|
timer_items[time] = [0, 0]
|
||||||
|
heat = 1 if action.description.preset_temperature.name == PresetName.UPPER_TEMPERATURE else 0
|
||||||
|
days = timer_items[time][heat]
|
||||||
|
days |= action.time_setting.day_of_week
|
||||||
|
timer_items[time][heat] = days
|
||||||
|
num = 0
|
||||||
|
for key in timer_items.keys():
|
||||||
|
if timer_items[key][0] != 0:
|
||||||
|
data.update({f"timer_item_{num}": f"{key};0;{timer_items[key][0].as_integer_ratio()[0]}"})
|
||||||
|
num += 1
|
||||||
|
if timer_items[key][1] != 0:
|
||||||
|
data.update({f"timer_item_{num}": f"{key};1;{timer_items[key][1].as_integer_ratio()[0]}"})
|
||||||
|
num += 1
|
||||||
|
else:
|
||||||
|
raise NotImplementedError(self.name)
|
||||||
|
|
||||||
|
return data
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def parse_dict(schedule: Dict): # ToDo: Make TypedDicts for all those dicts
|
||||||
|
return Schedule(schedule["isEnabled"],
|
||||||
|
ScheduleKind[schedule["kind"]],
|
||||||
|
schedule["name"],
|
||||||
|
[Action.parse_dict(action) for action in schedule["actions"]])
|
||||||
|
|
||||||
|
|
||||||
|
class TimeControl:
|
||||||
|
def __init__(self, is_enabled: bool, time_schedules: List[Schedule]):
|
||||||
|
self.is_enabled: bool = is_enabled
|
||||||
|
self.time_schedules: List[Schedule] = time_schedules
|
||||||
|
|
||||||
|
def __repr__(self):
|
||||||
|
return f"<TimeControl isEnabled: {self.is_enabled}; timeSchedules: {self.time_schedules}"
|
||||||
|
|
||||||
|
def to_json(self):
|
||||||
|
return {
|
||||||
|
"isEnabled": self.is_enabled,
|
||||||
|
"timeSchedules": [schedule.to_json() for schedule in self.time_schedules]
|
||||||
|
}
|
||||||
|
|
||||||
|
def to_web_data(self, device_id: int):
|
||||||
|
data = {}
|
||||||
|
for schedule in self.time_schedules:
|
||||||
|
data.update(schedule.to_web_data(device_id))
|
||||||
|
return data
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def parse_dict(time_control: Dict):
|
||||||
|
return TimeControl(time_control["isEnabled"],
|
||||||
|
[Schedule.parse_dict(schedule) for schedule in time_control["timeSchedules"]])
|
||||||
|
|
||||||
|
|
||||||
|
class ThermostatSkill(Skill):
|
||||||
|
def __init__(self, presets: List[Preset], next_change: Optional[Change],
|
||||||
|
temperature_drop_detection: TemperatureDropDetection,
|
||||||
|
target_temp: int, time_control: TimeControl,
|
||||||
|
mode: ThermostatSkillMode, used_temp_sensor: Unit):
|
||||||
|
super().__init__(SkillType.SmartHomeThermostat)
|
||||||
|
self.presets: List[Preset] = presets
|
||||||
|
self.next_change: Optional[Change] = next_change
|
||||||
|
self.temperature_drop_detection: TemperatureDropDetection = temperature_drop_detection
|
||||||
|
self.target_temp: int = target_temp
|
||||||
|
self.time_control: TimeControl = time_control
|
||||||
|
self.mode: ThermostatSkillMode = mode
|
||||||
|
self.used_temp_sensor: Unit = used_temp_sensor
|
||||||
|
|
||||||
|
def __repr__(self):
|
||||||
|
# ToDo
|
||||||
|
pass
|
||||||
|
|
||||||
|
def to_web_data(self, device_id: int):
|
||||||
|
upper = 0.0
|
||||||
|
lower = 0.0
|
||||||
|
for preset in self.presets:
|
||||||
|
if preset.name == PresetName.UPPER_TEMPERATURE:
|
||||||
|
upper = preset.temperature
|
||||||
|
elif preset.name == PresetName.LOWER_TEMPERATURE:
|
||||||
|
lower = preset.temperature
|
||||||
|
|
||||||
|
data = {
|
||||||
|
"Heiztemp": upper,
|
||||||
|
"Absenktemp": lower
|
||||||
|
}
|
||||||
|
|
||||||
|
if device_id == self.used_temp_sensor.id:
|
||||||
|
data.update({"ExtTempsensorID": "tochoose", "tempsensor": "own"})
|
||||||
|
else:
|
||||||
|
data.update({"ExtTempsensorID": self.used_temp_sensor.id, "tempsensor": "extern"})
|
||||||
|
|
||||||
|
data.update(self.time_control.to_web_data(device_id))
|
||||||
|
|
||||||
|
data.update(self.temperature_drop_detection.to_web_data(device_id))
|
||||||
|
|
||||||
|
return data
|
||||||
|
|
||||||
|
def to_json(self):
|
||||||
|
state = {
|
||||||
|
"type": self.type.name,
|
||||||
|
"presets": [preset.to_json() for preset in self.presets],
|
||||||
|
"temperatureDropDetection": self.temperature_drop_detection.to_json(),
|
||||||
|
"targetTemp": self.target_temp,
|
||||||
|
"timeControl": self.time_control.to_json(),
|
||||||
|
"mode": self.mode.name,
|
||||||
|
"usedTempSensor": self.used_temp_sensor.to_json()
|
||||||
|
}
|
||||||
|
if self.next_change is not None:
|
||||||
|
state["nextChange"] = self.next_change.to_json()
|
||||||
|
|
||||||
|
return state
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def parse_dict(skill: Dict):
|
||||||
|
return ThermostatSkill(
|
||||||
|
[Preset(PresetName[preset["name"]], preset["temperature"]) for preset in skill["presets"]],
|
||||||
|
Change.parse_dict(skill["nextChange"]) if "nextChange" in skill.keys() else None,
|
||||||
|
TemperatureDropDetection.parse_dict(skill["temperatureDropDetection"]),
|
||||||
|
skill["targetTemp"],
|
||||||
|
TimeControl.parse_dict(skill["timeControl"]),
|
||||||
|
ThermostatSkillMode[skill["mode"]],
|
||||||
|
Unit.parse_dict(skill["usedTempSensor"], None)
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
class TemperatureSkill(Skill):
|
||||||
|
def __init__(self, offset: float, current_in_celsius: int):
|
||||||
|
super().__init__(SkillType.SmartHomeTemperatureSensor)
|
||||||
|
self.offset: float = offset
|
||||||
|
self.current_in_celsius: int = current_in_celsius
|
||||||
|
|
||||||
|
def __repr__(self):
|
||||||
|
return f"{self.type.name}: " \
|
||||||
|
f"offset: {self.offset}; currentInCelsius: {self.current_in_celsius}"
|
||||||
|
|
||||||
|
def to_json(self):
|
||||||
|
return {
|
||||||
|
"type": self.type.name,
|
||||||
|
"offset": self.offset,
|
||||||
|
"currentInCelsius": self.current_in_celsius
|
||||||
|
}
|
||||||
|
|
||||||
|
def to_web_data(self, device_id: int):
|
||||||
|
data = {
|
||||||
|
"Roomtemp": self.current_in_celsius,
|
||||||
|
"Offset": self.offset
|
||||||
|
}
|
||||||
|
return data
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def parse_dict(skill: Dict):
|
||||||
|
return TemperatureSkill(skill["offset"], skill["currentInCelsius"])
|
||||||
|
|
||||||
|
|
||||||
|
class BatterySkill(Skill):
|
||||||
|
def __init__(self, charge_level_in_percent: int):
|
||||||
|
super().__init__(SkillType.SmartHomeBattery)
|
||||||
|
self.charge_level_in_percent: int = charge_level_in_percent
|
||||||
|
|
||||||
|
def __repr__(self):
|
||||||
|
return f"{self.type.name}: chargeLevelInPercent: {self.charge_level_in_percent}"
|
||||||
|
|
||||||
|
def to_json(self):
|
||||||
|
return {"type": self.type.name, "chargeLevelInPercent": self.charge_level_in_percent}
|
||||||
|
|
||||||
|
def to_web_data(self, device_id: int):
|
||||||
|
return {}
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def parse_dict(skill: Dict):
|
||||||
|
return BatterySkill(skill["chargeLevelInPercent"])
|
||||||
|
|
||||||
|
|
||||||
|
class UnitTypes(Enum):
|
||||||
|
BATTERY = auto()
|
||||||
|
TEMPERATURE_SENSOR = auto()
|
||||||
|
THERMOSTAT = auto()
|
||||||
|
|
||||||
|
|
||||||
|
class Unit:
|
||||||
|
def __init__(self, unit_type: UnitTypes, idx: int, display_name: str,
|
||||||
|
device: Optional[Device], skills: List[Union[Dict, Skill]],
|
||||||
|
interaction_controls: Optional[List] = None):
|
||||||
|
self.type: UnitTypes = unit_type
|
||||||
|
self.id: int = idx
|
||||||
|
self.display_name: str = display_name
|
||||||
|
self.device: Optional[Device] = device
|
||||||
|
self.skills: List[Skill] = skills
|
||||||
|
self.interaction_controls: Optional[List] = interaction_controls # ToDo: Type for this
|
||||||
|
|
||||||
|
def __repr__(self):
|
||||||
|
desc = f"type: {self.type.name}; id: {self.id}; displayName: {self.display_name};"
|
||||||
|
if self.device is not None:
|
||||||
|
desc += f"device: {self.device.short_description()}; "
|
||||||
|
desc += f"skills: {self.skills}; "
|
||||||
|
if self.interaction_controls is not None:
|
||||||
|
desc += f"interactionControls: {self.interaction_controls} ;"
|
||||||
|
return desc
|
||||||
|
|
||||||
|
def to_json(self):
|
||||||
|
state = {
|
||||||
|
"type": self.type.name,
|
||||||
|
"id": self.id,
|
||||||
|
"displayName": self.display_name,
|
||||||
|
"skills": [skill.to_json() for skill in self.skills]
|
||||||
|
}
|
||||||
|
if self.device is not None:
|
||||||
|
state["device"] = self.device.to_short_json()
|
||||||
|
if self.interaction_controls is not None:
|
||||||
|
state["interactionControls"] = self.interaction_controls
|
||||||
|
|
||||||
|
return state
|
||||||
|
|
||||||
|
def to_web_data(self, device_id: int):
|
||||||
|
data = {}
|
||||||
|
for skill in self.skills:
|
||||||
|
data.update(skill.to_web_data(device_id))
|
||||||
|
|
||||||
|
return data
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def parse_dict(unit: Dict, device: Optional[Device]):
|
||||||
|
return Unit(UnitTypes[unit["type"]],
|
||||||
|
unit["id"],
|
||||||
|
unit["displayName"],
|
||||||
|
device,
|
||||||
|
[Skill.parse_dict(skill) for skill in unit["skills"]],
|
||||||
|
unit["interactionControls"] if "interactionControls" in unit.keys() else None)
|
||||||
|
|
||||||
|
|
||||||
|
class DeviceType(Enum):
|
||||||
|
SmartHomeDevice = auto()
|
||||||
|
|
||||||
|
|
||||||
|
class DeviceCategory(Enum):
|
||||||
|
THERMOSTAT = auto()
|
||||||
|
|
||||||
|
|
||||||
|
class Device:
|
||||||
|
def __init__(self, state):
|
||||||
|
self.type: Optional[DeviceType] = None
|
||||||
|
self.is_deletable: bool = False
|
||||||
|
self.id: int = -1
|
||||||
|
self.master_connection_state = None # ToDo
|
||||||
|
self.display_name: Optional[str] = None
|
||||||
|
self.category: Optional[str] = None
|
||||||
|
self.units = None # ToDo
|
||||||
|
self.firmware_version: Optional[FirmwareVersion] = None
|
||||||
|
self.model: Optional[str] = None
|
||||||
|
self.is_editable: bool = False
|
||||||
|
self.manufacturer: Optional[Manufacturer] = None
|
||||||
|
self.push_service: Optional[PushService] = None
|
||||||
|
self.actor_identification_number: Optional[str] = None
|
||||||
|
|
||||||
|
if state is not None:
|
||||||
|
self.parse_state(state)
|
||||||
|
|
||||||
|
def __repr__(self):
|
||||||
|
# ToDo
|
||||||
|
pass
|
||||||
|
|
||||||
|
def short_description(self):
|
||||||
|
desc = f"masterConnectionState: {self.master_connection_state}; " \
|
||||||
|
f"type: {self.type}; model: {self.model}; id: {self.id}; " \
|
||||||
|
f"manufacturer: {self.manufacturer}; " \
|
||||||
|
f"actorIdentificationNumber: {self.actor_identification_number}; " \
|
||||||
|
f"displayName: {self.display_name}"
|
||||||
|
|
||||||
|
return desc
|
||||||
|
|
||||||
|
def parse_state(self, state):
|
||||||
|
self.type = DeviceType[state["type"]]
|
||||||
|
self.is_deletable = state["isDeletable"]
|
||||||
|
self.id = state["id"]
|
||||||
|
self.master_connection_state = state["masterConnectionState"]
|
||||||
|
self.display_name = state["displayName"]
|
||||||
|
self.category = state["category"]
|
||||||
|
self.units = [Unit.parse_dict(unit, self) for unit in state["units"]]
|
||||||
|
self.firmware_version = FirmwareVersion.parse_dict(state["firmwareVersion"])
|
||||||
|
self.model = state["model"]
|
||||||
|
self.is_editable = state["isEditable"]
|
||||||
|
self.manufacturer = Manufacturer.parse_dict(state["manufacturer"])
|
||||||
|
self.push_service = PushService.parse_dict(state["pushService"])
|
||||||
|
self.actor_identification_number = state["actorIdentificationNumber"]
|
||||||
|
|
||||||
|
def to_web_data(self):
|
||||||
|
data = {
|
||||||
|
"device": self.id,
|
||||||
|
"ule_device_name": self.display_name
|
||||||
|
}
|
||||||
|
for unit in self.units:
|
||||||
|
data.update(unit.to_web_data(self.id))
|
||||||
|
return data
|
||||||
|
|
||||||
|
def to_json(self):
|
||||||
|
state = {"type": self.type.name,
|
||||||
|
"isDeletable": self.is_deletable,
|
||||||
|
"id": self.id,
|
||||||
|
"masterConnectionState": self.master_connection_state,
|
||||||
|
"displayName": self.display_name,
|
||||||
|
"category": self.category,
|
||||||
|
"units": [unit.to_json() for unit in self.units],
|
||||||
|
"firmwareVersion": self.firmware_version.to_json(),
|
||||||
|
"model": self.model,
|
||||||
|
"isEditable": self.is_editable,
|
||||||
|
"manufacturer": self.manufacturer.to_json(),
|
||||||
|
"pushService": self.push_service.to_json(),
|
||||||
|
"actorIdentificationNumber": self.actor_identification_number}
|
||||||
|
|
||||||
|
return state
|
||||||
|
|
||||||
|
def set_offset(self, offset: float):
|
||||||
|
temp_sensor: Optional[Unit] = None
|
||||||
|
for unit in self.units:
|
||||||
|
if unit.type == UnitTypes.TEMPERATURE_SENSOR:
|
||||||
|
temp_sensor = unit
|
||||||
|
break
|
||||||
|
temp_skill: Optional[TemperatureSkill] = None
|
||||||
|
for skill in temp_sensor.skills:
|
||||||
|
if skill.type == SkillType.SmartHomeTemperatureSensor:
|
||||||
|
temp_skill = typing.cast(TemperatureSkill, skill)
|
||||||
|
break
|
||||||
|
temp_skill.offset = offset
|
||||||
|
|
||||||
|
def get_offset(self):
|
||||||
|
temp_sensor: Optional[Unit] = None
|
||||||
|
for unit in self.units:
|
||||||
|
if unit.type == UnitTypes.TEMPERATURE_SENSOR:
|
||||||
|
temp_sensor = unit
|
||||||
|
break
|
||||||
|
temp_skill: Optional[TemperatureSkill] = None
|
||||||
|
for skill in temp_sensor.skills:
|
||||||
|
if skill.type == SkillType.SmartHomeTemperatureSensor:
|
||||||
|
temp_skill = typing.cast(TemperatureSkill, skill)
|
||||||
|
break
|
||||||
|
return temp_skill.offset
|
||||||
|
|
||||||
|
def get_temperature(self):
|
||||||
|
temp_sensor: Optional[Unit] = None
|
||||||
|
for unit in self.units:
|
||||||
|
if unit.type == UnitTypes.TEMPERATURE_SENSOR:
|
||||||
|
temp_sensor = unit
|
||||||
|
break
|
||||||
|
temp_skill: Optional[TemperatureSkill] = None
|
||||||
|
for skill in temp_sensor.skills:
|
||||||
|
if skill.type == SkillType.SmartHomeTemperatureSensor:
|
||||||
|
temp_skill = typing.cast(TemperatureSkill, skill)
|
||||||
|
break
|
||||||
|
return temp_skill.current_in_celsius
|
||||||
|
|
||||||
|
def to_short_json(self):
|
||||||
|
return {
|
||||||
|
"masterConnectionState": self.master_connection_state,
|
||||||
|
"type": self.type.name,
|
||||||
|
"model": self.model,
|
||||||
|
"id": self.id,
|
||||||
|
"manufacturer": self.manufacturer.to_json(),
|
||||||
|
"actorIdentificationNumber": self.actor_identification_number,
|
||||||
|
"displayName": self.display_name
|
||||||
|
}
|
@ -1,28 +1,49 @@
|
|||||||
from typing import Optional, Tuple
|
from __future__ import annotations
|
||||||
import requests
|
|
||||||
import json
|
import asyncio
|
||||||
import re
|
|
||||||
import hashlib
|
import hashlib
|
||||||
import xml.etree.ElementTree as ET
|
import json
|
||||||
import logging
|
import logging
|
||||||
|
import re
|
||||||
|
import xml.etree.ElementTree as ET
|
||||||
|
from asyncio import Task
|
||||||
|
from datetime import datetime, timedelta
|
||||||
|
from typing import Optional, Dict, List
|
||||||
|
|
||||||
|
import requests
|
||||||
|
|
||||||
|
from device import Device
|
||||||
|
|
||||||
class FritzBox:
|
class FritzBox:
|
||||||
def __init__(self, url:str, password:str, user:str = None) -> None:
|
def __init__(self, url: str, password: str, update_timeout: int, user: str = None, dry_run: bool = False) -> None:
|
||||||
self._endpoints = {
|
self._endpoints = {
|
||||||
"login": "login_sid.lua?version=2",
|
"login": "login_sid.lua?version=2",
|
||||||
"logout": "index.lua",
|
"logout": "index.lua",
|
||||||
"data": "data.lua"
|
"data": "data.lua"
|
||||||
}
|
}
|
||||||
self.url = url
|
self.url: str = url
|
||||||
self.session = requests.Session()
|
self.dry_run: bool = dry_run
|
||||||
self.password = password
|
self.user: Optional[str] = user
|
||||||
self.sid = None
|
self.session: requests.Session = requests.Session()
|
||||||
|
self.password: str = password
|
||||||
|
self.sid: Optional[str] = None
|
||||||
|
self.update_timeout: int = update_timeout
|
||||||
|
self.update_time: Dict[str, datetime] = {}
|
||||||
|
self.hold_connection: Optional[Task] = None
|
||||||
|
|
||||||
|
async def hold_connection_alive(self) -> None:
|
||||||
|
while True:
|
||||||
|
# Session automatically destroyed after 20m of inactivity
|
||||||
|
# according to the manual
|
||||||
|
await asyncio.sleep(19 * 60)
|
||||||
|
self.check_session()
|
||||||
|
|
||||||
def _calc_challenge_v2(self, challenge: str) -> str:
|
def _calc_challenge_v2(self, challenge: str) -> str:
|
||||||
|
|
||||||
logging.debug(f"Calculate v2 challenge: {challenge}")
|
logging.debug(f"Calculate v2 challenge: {challenge}")
|
||||||
chall_regex = re.compile("2\$(?P<iter1>[0-9a-zA-Z]+)\$(?P<salt1>[0-9a-zA-Z]+)\$(?P<iter2>[0-9a-zA-Z]+)\$(?P<salt2>[0-9a-zA-Z]+)")
|
chall_regex = re.compile(
|
||||||
|
r"2\$(?P<iter1>[0-9a-zA-Z]+)\$(?P<salt1>[0-9a-zA-Z]+)\$(?P<iter2>[0-9a-zA-Z]+)\$(?P<salt2>[0-9a-zA-Z]+)")
|
||||||
|
|
||||||
chall_parts = chall_regex.match(challenge).groupdict()
|
chall_parts = chall_regex.match(challenge).groupdict()
|
||||||
salt1: bytes = bytes.fromhex(chall_parts["salt1"])
|
salt1: bytes = bytes.fromhex(chall_parts["salt1"])
|
||||||
iter1: int = int(chall_parts["iter1"])
|
iter1: int = int(chall_parts["iter1"])
|
||||||
@ -42,8 +63,24 @@ class FritzBox:
|
|||||||
response = challenge + "-" + hashlib.md5(response).hexdigest()
|
response = challenge + "-" + hashlib.md5(response).hexdigest()
|
||||||
return response
|
return response
|
||||||
|
|
||||||
def login(self, user:str = None) -> bool:
|
def check_session(self) -> None:
|
||||||
logging.debug(f"login user {user}")
|
data = {
|
||||||
|
"xhr": 1,
|
||||||
|
"sid": self.sid,
|
||||||
|
"lang": "de",
|
||||||
|
"page": "overview",
|
||||||
|
"xhrId": "first",
|
||||||
|
"noMenuRef": 1
|
||||||
|
}
|
||||||
|
r = self.session.post(f"{self.url}/{self._endpoints['data']}", data=data)
|
||||||
|
if len(r.history) > 0:
|
||||||
|
if not self.login():
|
||||||
|
logging.error("Failed to login to Fritz!Box")
|
||||||
|
else:
|
||||||
|
logging.debug("Already logged in")
|
||||||
|
|
||||||
|
def login(self) -> bool:
|
||||||
|
logging.info(f"Login user {self.user} to Fritz!Box")
|
||||||
challenge = None
|
challenge = None
|
||||||
r = self.session.get(f"{self.url}/{self._endpoints['login']}")
|
r = self.session.get(f"{self.url}/{self._endpoints['login']}")
|
||||||
xml = ET.fromstring(r.text)
|
xml = ET.fromstring(r.text)
|
||||||
@ -52,12 +89,12 @@ class FritzBox:
|
|||||||
self.sid = elem.text
|
self.sid = elem.text
|
||||||
elif elem.tag == "Challenge":
|
elif elem.tag == "Challenge":
|
||||||
challenge = elem.text
|
challenge = elem.text
|
||||||
elif user is None and elem.tag == "Users":
|
elif self.user is None and elem.tag == "Users":
|
||||||
for user_elem in elem:
|
for user_elem in elem:
|
||||||
if "fritz" in user_elem.text:
|
if "fritz" in user_elem.text:
|
||||||
user = user_elem.text
|
self.user = user_elem.text
|
||||||
|
|
||||||
assert challenge is not None and user is not None
|
assert challenge is not None and self.user is not None
|
||||||
|
|
||||||
if challenge.startswith("2$"):
|
if challenge.startswith("2$"):
|
||||||
response = self._calc_challenge_v2(challenge)
|
response = self._calc_challenge_v2(challenge)
|
||||||
@ -65,7 +102,7 @@ class FritzBox:
|
|||||||
response = self._calc_challenge_v1(challenge)
|
response = self._calc_challenge_v1(challenge)
|
||||||
|
|
||||||
data = {
|
data = {
|
||||||
"username": user,
|
"username": self.user,
|
||||||
"response": response
|
"response": response
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -75,27 +112,31 @@ class FritzBox:
|
|||||||
for elem in xml:
|
for elem in xml:
|
||||||
if elem.tag == "SID":
|
if elem.tag == "SID":
|
||||||
self.sid = elem.text
|
self.sid = elem.text
|
||||||
|
|
||||||
logging.debug(f"Authenticated fritzbox: {len(self.sid) != self.sid.count('0')}")
|
logging.info(f"Authenticated Fritz!Box: {len(self.sid) != self.sid.count('0')}")
|
||||||
|
if len(self.sid) != self.sid.count("0"):
|
||||||
|
self.hold_connection = asyncio.create_task(self.hold_connection_alive())
|
||||||
return len(self.sid) != self.sid.count("0")
|
return len(self.sid) != self.sid.count("0")
|
||||||
|
|
||||||
def logout(self) -> bool:
|
def logout(self) -> bool:
|
||||||
logging.debug("logout")
|
logging.info("logout")
|
||||||
data = {
|
data = {
|
||||||
"xhr":1,
|
"xhr": 1,
|
||||||
"sid": self.sid,
|
"sid": self.sid,
|
||||||
"logout": 1,
|
"logout": 1,
|
||||||
"no_sidrenew":""}
|
"no_sidrenew": ""}
|
||||||
r = self.session.post(f"{self.url}/{self._endpoints['logout']}", data=data)
|
r = self.session.post(f"{self.url}/{self._endpoints['logout']}", data=data)
|
||||||
|
if self.hold_connection is not None:
|
||||||
|
self.hold_connection.cancel()
|
||||||
|
|
||||||
return r.status_code == 200
|
return r.status_code == 200
|
||||||
|
|
||||||
def list_devices(self):
|
def list_devices(self) -> Optional[List[Device]]:
|
||||||
data = {
|
data = {
|
||||||
"xhr": 1,
|
"xhr": 1,
|
||||||
"sid": self.sid,
|
"sid": self.sid,
|
||||||
"lang": "de",
|
"lang": "de",
|
||||||
"page":"sh_dev",
|
"page": "sh_dev",
|
||||||
"xhrId": "all"
|
"xhrId": "all"
|
||||||
}
|
}
|
||||||
r = self.session.post(f"{self.url}/{self._endpoints['data']}", data=data)
|
r = self.session.post(f"{self.url}/{self._endpoints['data']}", data=data)
|
||||||
@ -105,62 +146,68 @@ class FritzBox:
|
|||||||
r = self.session.post(f"{self.url}/{self._endpoints['data']}", data=data)
|
r = self.session.post(f"{self.url}/{self._endpoints['data']}", data=data)
|
||||||
else:
|
else:
|
||||||
return None
|
return None
|
||||||
devices = json.loads(r.text)["data"]["devices"]
|
devices: List[Device] = []
|
||||||
|
for device in json.loads(r.text)["data"]["devices"]:
|
||||||
|
devices.append(Device(device))
|
||||||
|
|
||||||
return devices
|
return devices
|
||||||
|
|
||||||
def get_device_data(self, id: int = None, name: str = None) -> Optional[Tuple[float, float, int, str]]:
|
def get_device_data(self, idx: int = None, name: str = None) -> Optional[Device]:
|
||||||
if id is None and name is None:
|
if idx is None and name is None:
|
||||||
logging.debug("No id or name given")
|
logging.debug("No id or name given")
|
||||||
return None
|
return None
|
||||||
|
|
||||||
devices = self.list_devices()
|
devices = self.list_devices()
|
||||||
|
device = None
|
||||||
for device in devices:
|
for device in devices:
|
||||||
if device["id"] == id or device["displayName"] == name:
|
if device.id == idx or device.display_name == name:
|
||||||
break
|
break
|
||||||
device = None
|
device = None
|
||||||
|
|
||||||
if device is None:
|
if device is None:
|
||||||
logging.debug(f"Device {id} {name} not found")
|
logging.debug(f"Device {idx} {name} not found")
|
||||||
return None
|
return None
|
||||||
|
|
||||||
current_temp = None
|
return device
|
||||||
current_offset = None
|
|
||||||
for unit in device["units"]:
|
|
||||||
if unit["type"] == "TEMPERATURE_SENSOR":
|
|
||||||
for skill in unit["skills"]:
|
|
||||||
if skill["type"] == "SmartHomeTemperatureSensor":
|
|
||||||
current_temp = float(skill["currentInCelsius"])
|
|
||||||
current_offset = float(skill["offset"])
|
|
||||||
|
|
||||||
return current_temp, current_offset, device["id"], device["displayName"]
|
def set_offset(self, device: Device) -> None:
|
||||||
|
if self.dry_run:
|
||||||
def set_offset(self, current_temp: str, offset: float, device_id: int, device_name: str):
|
logging.warning("No updates in dry-run-mode")
|
||||||
|
return
|
||||||
data = {
|
data = {
|
||||||
"xhr": 1,
|
"xhr": 1,
|
||||||
"sid": self.sid,
|
"sid": self.sid,
|
||||||
"lang": "de",
|
"lang": "de",
|
||||||
"device": device_id,
|
"device": device.id,
|
||||||
"page": "home_auto_hkr_edit"
|
"page": "home_auto_hkr_edit"
|
||||||
}
|
}
|
||||||
r = self.session.post(f"{self.url}/{self._endpoints['data']}", data=data)
|
self.session.post(f"{self.url}/{self._endpoints['data']}", data=data)
|
||||||
|
|
||||||
data = {
|
data = {
|
||||||
"xhr":1,
|
"xhr": 1,
|
||||||
"sid": self.sid,
|
"sid": self.sid,
|
||||||
"lang": "de",
|
"lang": "de",
|
||||||
"device": device_id,
|
|
||||||
"view": "",
|
"view": "",
|
||||||
"back_to_page": "sh_dev",
|
"back_to_page": "sh_dev",
|
||||||
"ule_device_name": device_name,
|
"apply": "",
|
||||||
"WindowOpenTrigger":8,
|
"oldpage": "/net/home_auto_hkr_edit.lua"
|
||||||
"WindowOpenTimer":10,
|
|
||||||
"tempsensor": "own",
|
|
||||||
"Roomtemp": f"{current_temp}",
|
|
||||||
"ExtTempsensorID":"tochoose",
|
|
||||||
"Offset": f"{offset}",
|
|
||||||
"apply":"",
|
|
||||||
"oldpage":"/net/home_auto_hkr_edit.lua"
|
|
||||||
}
|
}
|
||||||
|
data.update(device.to_web_data())
|
||||||
|
|
||||||
r = self.session.post(f"{self.url}/{self._endpoints['data']}", data=data)
|
self.session.post(f"{self.url}/{self._endpoints['data']}", data=data)
|
||||||
|
|
||||||
|
def correct_offset(self, device_name: str, real_temp: float):
|
||||||
|
elapsed = None
|
||||||
|
if device_name in self.update_time.keys():
|
||||||
|
elapsed = datetime.now() - self.update_time[device_name]
|
||||||
|
logging.info(f"Last update for {device_name} {elapsed} ago")
|
||||||
|
delta = timedelta(minutes=self.update_timeout)
|
||||||
|
if device_name not in self.update_time.keys() or elapsed > delta:
|
||||||
|
device: Optional[Device] = self.get_device_data(name=device_name)
|
||||||
|
if device is None:
|
||||||
|
return
|
||||||
|
new_offset = device.get_offset() + real_temp - device.get_temperature()
|
||||||
|
logging.info(f"Update offset from {device.get_offset()} to {new_offset}")
|
||||||
|
device.set_offset(new_offset)
|
||||||
|
self.set_offset(device)
|
||||||
|
self.update_time[device.display_name] = datetime.now()
|
||||||
|
@ -1,66 +1,152 @@
|
|||||||
from __future__ import annotations
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import asyncio
|
||||||
import json
|
import json
|
||||||
import logging
|
import logging
|
||||||
from typing import Callable
|
from asyncio import Queue, Task, Event, Lock
|
||||||
import websocket
|
from typing import Callable, Dict, Optional
|
||||||
import time
|
import websockets
|
||||||
|
from websockets.exceptions import InvalidStatusCode
|
||||||
|
|
||||||
|
|
||||||
class HomeAssistantAPI:
|
class HomeAssistantAPI:
|
||||||
def __init__(self, token:str, initialize: Callable[[HomeAssistantAPI], None]) -> None:
|
def __init__(self, token: str, url: str) -> None:
|
||||||
self.token = token
|
self.token = token
|
||||||
self.msg_id = 1
|
self.msg_id = 1
|
||||||
self.ws = None
|
self.msg_id_lock = Lock()
|
||||||
self.subscriptions = {}
|
self.ws: websockets.WebSocketClientProtocol = None
|
||||||
self.init_callback = initialize
|
self.url = url
|
||||||
|
self.receiver: Optional[Task] = None
|
||||||
|
self.sender: Optional[Task] = None
|
||||||
|
self.sending_queue: Queue = Queue()
|
||||||
|
self.authenticated: Event = Event()
|
||||||
|
self.events: Dict[int, Queue] = {}
|
||||||
|
self.responses: Dict[int, Dict] = {}
|
||||||
|
self.response_events: Dict[int, Event] = {}
|
||||||
|
self.response_lock: Lock = Lock()
|
||||||
|
|
||||||
def handle_message(self, ws: websocket.WebSocket, msg: str) -> None:
|
async def connect(self):
|
||||||
if self.ws is None:
|
retries = 5
|
||||||
self.ws = ws
|
logging.info("Connect to home assistant...")
|
||||||
|
while True:
|
||||||
|
try:
|
||||||
|
self.ws = await websockets.connect(self.url)
|
||||||
|
self.sender = asyncio.create_task(self.sending())
|
||||||
|
await self.auth()
|
||||||
|
self.receiver = asyncio.create_task(self.receiving())
|
||||||
|
return True
|
||||||
|
except (InvalidStatusCode, TimeoutError):
|
||||||
|
if retries > 0:
|
||||||
|
retries -= 1
|
||||||
|
await asyncio.sleep(30)
|
||||||
|
logging.info(f"Retry home assistant connection... ({retries})")
|
||||||
|
continue
|
||||||
|
else:
|
||||||
|
logging.error("Invalid status code while connecting to Home Assistant")
|
||||||
|
await self.exit_loop()
|
||||||
|
return False
|
||||||
|
|
||||||
message: object = json.loads(msg)
|
async def wait_for_close(self):
|
||||||
|
await self.ws.wait_closed()
|
||||||
if message["type"] == "auth_required":
|
|
||||||
response = {
|
|
||||||
"type": "auth",
|
|
||||||
"access_token": self.token
|
|
||||||
}
|
|
||||||
logging.debug(response)
|
|
||||||
ws.send(json.dumps(response))
|
|
||||||
return
|
|
||||||
elif message["type"] == "auth_invalid":
|
|
||||||
logging.info("Auth failed")
|
|
||||||
ws.close()
|
|
||||||
return None
|
|
||||||
elif message["type"] == "auth_ok":
|
|
||||||
logging.debug("Authenticated")
|
|
||||||
self.init_callback(self)
|
|
||||||
self.init_callback = None
|
|
||||||
return
|
|
||||||
elif message["type"] == "event":
|
|
||||||
event = message["event"]
|
|
||||||
if event["event_type"] in self.subscriptions.keys():
|
|
||||||
self.subscriptions[event["event_type"]](event)
|
|
||||||
|
|
||||||
else:
|
|
||||||
print("Received", message)
|
|
||||||
|
|
||||||
def subscribe_event(self, event_type: str, callback: Callable[[object], None]):
|
|
||||||
|
|
||||||
if self.ws is None:
|
async def receiving(self):
|
||||||
logging.debug("Websocket not set")
|
logging.debug("Start receiving")
|
||||||
return
|
async for message in self.ws:
|
||||||
|
msg: Dict = json.loads(message)
|
||||||
|
if msg["type"] == "event":
|
||||||
|
if msg["id"] not in self.events.keys():
|
||||||
|
logging.error(f"Received event for not subscribted id: {msg['id']} {msg['event_type']}")
|
||||||
|
continue
|
||||||
|
await self.events[msg["id"]].put(msg["event"])
|
||||||
|
else:
|
||||||
|
async with self.response_lock:
|
||||||
|
self.responses[msg["id"]] = msg
|
||||||
|
if msg["id"] in self.response_events.keys():
|
||||||
|
self.response_events[msg["id"]].set()
|
||||||
|
|
||||||
if event_type in self.subscriptions.keys():
|
async def wait_for(self, idx):
|
||||||
logging.warning(f"Already subscribed to {event_type}")
|
async with self.response_lock:
|
||||||
return
|
if idx in self.responses.keys():
|
||||||
|
msg = self.responses[idx]
|
||||||
logging.info(f"Subscribe to {event_type}")
|
del self.responses[idx]
|
||||||
self.subscriptions[event_type] = callback
|
return msg
|
||||||
|
self.response_events[idx] = Event()
|
||||||
|
|
||||||
|
await self.response_events[idx].wait()
|
||||||
|
async with self.response_lock:
|
||||||
|
del self.response_events[idx]
|
||||||
|
if idx not in self.responses.keys():
|
||||||
|
logging.error("Response ID not found")
|
||||||
|
return None
|
||||||
|
msg = self.responses[idx]
|
||||||
|
del self.responses[idx]
|
||||||
|
return msg
|
||||||
|
|
||||||
|
async def exit_loop(self):
|
||||||
|
if self.sender is not None:
|
||||||
|
self.sender.cancel()
|
||||||
|
if self.receiver is not None:
|
||||||
|
self.receiver.cancel()
|
||||||
|
|
||||||
|
async def auth(self):
|
||||||
|
msg = json.loads(await self.ws.recv())
|
||||||
|
if msg["type"] != "auth_required":
|
||||||
|
logging.error("Authentication error: Not required")
|
||||||
|
await self.exit_loop()
|
||||||
response = {
|
response = {
|
||||||
"id": self.msg_id,
|
"type": "auth",
|
||||||
"type": "subscribe_events",
|
"access_token": self.token
|
||||||
"event_type": event_type
|
|
||||||
}
|
}
|
||||||
self.msg_id += 1
|
await self.sending_queue.put(response)
|
||||||
self.ws.send(json.dumps(response))
|
msg = json.loads(await self.ws.recv())
|
||||||
|
if msg["type"] == "auth_invalid":
|
||||||
|
logging.info("Auth failed")
|
||||||
|
await self.exit_loop()
|
||||||
|
elif msg["type"] == "auth_ok":
|
||||||
|
logging.debug("Authenticated")
|
||||||
|
self.authenticated.set()
|
||||||
|
else:
|
||||||
|
logging.error(f"Unknown answer for auth: {msg}")
|
||||||
|
await self.exit_loop()
|
||||||
|
|
||||||
|
async def sending(self):
|
||||||
|
while msg := await self.sending_queue.get():
|
||||||
|
await self.ws.send(json.dumps(msg))
|
||||||
|
|
||||||
|
async def subscribe_event(self, event_type: str):
|
||||||
|
await self.authenticated.wait()
|
||||||
|
|
||||||
|
logging.info(f"Subscribe to {event_type}")
|
||||||
|
async with self.msg_id_lock:
|
||||||
|
msg_id = self.msg_id
|
||||||
|
response = {
|
||||||
|
"id": msg_id,
|
||||||
|
"type": "subscribe_events",
|
||||||
|
"event_type": event_type
|
||||||
|
}
|
||||||
|
self.events[msg_id] = Queue()
|
||||||
|
self.msg_id += 1
|
||||||
|
await self.sending_queue.put(response)
|
||||||
|
return msg_id
|
||||||
|
|
||||||
|
async def get_states(self):
|
||||||
|
await self.authenticated.wait()
|
||||||
|
async with self.msg_id_lock:
|
||||||
|
message = {
|
||||||
|
"id": self.msg_id,
|
||||||
|
"type": "get_states"
|
||||||
|
}
|
||||||
|
self.msg_id += 1
|
||||||
|
await self.sending_queue.put(message)
|
||||||
|
|
||||||
|
response = await self.wait_for(message["id"])
|
||||||
|
# ToDo: Error handling
|
||||||
|
return response["result"]
|
||||||
|
|
||||||
|
async def get_device_state(self, entity_id: str):
|
||||||
|
device_states = await self.get_states()
|
||||||
|
for device_state in device_states:
|
||||||
|
if device_state["entity_id"] == entity_id:
|
||||||
|
return device_state
|
||||||
|
|
||||||
|
return None
|
||||||
|
@ -1,3 +1,3 @@
|
|||||||
#!/usr/bin/env bashio
|
#!/usr/bin/with-contenv bashio
|
||||||
|
|
||||||
python3 /srv/sync_ha_fb.py
|
python3 /srv/sync_ha_fb.py /data/options.json
|
@ -1,78 +1,125 @@
|
|||||||
#!/usr/bin/env python3
|
#!/usr/bin/env python3
|
||||||
"""
|
|
||||||
hier die verbindungen zu HA aufbauen etc
|
import asyncio
|
||||||
außerdem das vergleichen der werte und dass anstoßen der updates
|
import json
|
||||||
"""
|
import logging
|
||||||
import os
|
import os
|
||||||
|
import sys
|
||||||
|
|
||||||
from fritzbox import FritzBox
|
from fritzbox import FritzBox
|
||||||
from homeassistant import HomeAssistantAPI
|
from homeassistant import HomeAssistantAPI
|
||||||
import logging
|
|
||||||
import websocket
|
|
||||||
import json
|
|
||||||
|
|
||||||
mappings = {}
|
sensor_mappings = {}
|
||||||
|
thermostate_mappings = {}
|
||||||
|
|
||||||
def handle_event(event):
|
|
||||||
entity_id = event["data"]["entity_id"]
|
|
||||||
if entity_id in mappings.keys():
|
|
||||||
new_state = event["data"]["new_state"]
|
|
||||||
logging.debug(entity_id)
|
|
||||||
logging.debug(new_state["attributes"]["temperature"])
|
|
||||||
rounded = round(float(new_state["attributes"]["temperature"])*2)/2
|
|
||||||
logging.debug(rounded)
|
|
||||||
if new_state["attributes"]["device_class"] == "temperature":
|
|
||||||
if entity_id in mappings.keys():
|
|
||||||
fb.login()
|
|
||||||
logged = False
|
|
||||||
for thermostate in mappings[entity_id]:
|
|
||||||
current_temp, current_offset, id, name = fb.get_device_data(name=thermostate)
|
|
||||||
if not logged:
|
|
||||||
logging.info(f"Current measurement from {entity_id}: {new_state['attributes']['temperature']} ({rounded})")
|
|
||||||
logged = True
|
|
||||||
logging.info(f"Current measurement from {thermostate}: {current_temp}")
|
|
||||||
new_offset = current_offset + rounded - current_temp
|
|
||||||
if new_offset != current_offset:
|
|
||||||
old_offset = current_offset
|
|
||||||
logging.debug(f"Set offset for {thermostate} from {current_offset} to {new_offset}")
|
|
||||||
fb.set_offset(current_temp, new_offset, id, name)
|
|
||||||
current_temp, current_offset, id, name = fb.get_device_data(name=thermostate)
|
|
||||||
logging.debug(f"Target: {new_offset} ; Set: {current_offset}")
|
|
||||||
if new_offset == current_offset:
|
|
||||||
logging.info(f"Adjustet offset from {old_offset} to {new_offset}")
|
|
||||||
else:
|
|
||||||
logging.warning(f"Failed to adjust offset from {old_offset} to {new_offset}")
|
|
||||||
fb.logout()
|
|
||||||
|
|
||||||
def on_error(ws, error):
|
async def handle_event(idx: int):
|
||||||
print(error)
|
global ha, fb
|
||||||
|
logging.debug(f"Wait for events for {idx}")
|
||||||
|
|
||||||
def on_close(ws, close_status_code, close_msg):
|
while event := await ha.events[idx].get():
|
||||||
pass
|
try:
|
||||||
|
entity_id = event["data"]["entity_id"]
|
||||||
|
if (
|
||||||
|
entity_id in sensor_mappings.keys()
|
||||||
|
or entity_id in thermostate_mappings.keys()
|
||||||
|
):
|
||||||
|
state = await ha.get_device_state(entity_id)
|
||||||
|
new_state = event["data"]["new_state"]
|
||||||
|
logging.info(
|
||||||
|
f"received changed state from {entity_id} {entity_id in thermostate_mappings.keys()} {state['state']} {entity_id in sensor_mappings.keys()}"
|
||||||
|
)
|
||||||
|
if (
|
||||||
|
entity_id in thermostate_mappings.keys()
|
||||||
|
and state["state"] != "unavailable"
|
||||||
|
):
|
||||||
|
therm_temp = new_state["attributes"]["current_temperature"]
|
||||||
|
therm_name = new_state["attributes"]["friendly_name"]
|
||||||
|
sensor = thermostate_mappings[entity_id]
|
||||||
|
sensor_state = await ha.get_device_state(sensor)
|
||||||
|
sensor_temp = round(float(sensor_state["state"]) * 2) / 2
|
||||||
|
logging.info(f"temps: {therm_temp} {sensor_temp}")
|
||||||
|
if therm_temp != sensor_temp:
|
||||||
|
logging.info(
|
||||||
|
f"{therm_name}: {therm_temp}\n{sensor}: {sensor_state['state']} ({sensor_temp})"
|
||||||
|
)
|
||||||
|
fb.correct_offset(therm_name, sensor_temp)
|
||||||
|
|
||||||
def on_open(ws):
|
elif entity_id in sensor_mappings.keys():
|
||||||
pass
|
logging.info(f"here {sensor_mappings} {entity_id}")
|
||||||
|
logging.info(f"{new_state}")
|
||||||
|
sensor_temp = round(float(new_state["state"]) * 2) / 2
|
||||||
|
logging.info(f"entry: {sensor_mappings[entity_id]}")
|
||||||
|
for thermostate in sensor_mappings[entity_id]:
|
||||||
|
logging.info(thermostate)
|
||||||
|
therm_state = await ha.get_device_state(thermostate)
|
||||||
|
logging.info(f"{thermostate} {therm_state}")
|
||||||
|
if therm_state["state"] == "unavailable":
|
||||||
|
continue
|
||||||
|
therm_temp = float(
|
||||||
|
therm_state["attributes"]["current_temperature"]
|
||||||
|
)
|
||||||
|
therm_name = therm_state["attributes"]["friendly_name"]
|
||||||
|
logging.info(f"Temps: {therm_temp} {sensor_temp}")
|
||||||
|
if therm_temp != sensor_temp:
|
||||||
|
logging.info(
|
||||||
|
f"{therm_name}: {therm_temp}\n{entity_id}: {new_state['state']} ({sensor_temp})"
|
||||||
|
)
|
||||||
|
fb.correct_offset(therm_name, sensor_temp)
|
||||||
|
except KeyError:
|
||||||
|
pass
|
||||||
|
|
||||||
def init(ha: HomeAssistantAPI):
|
|
||||||
|
async def init(ha: HomeAssistantAPI, fb: FritzBox):
|
||||||
|
if not await ha.connect():
|
||||||
|
return
|
||||||
logging.debug("Subscribe")
|
logging.debug("Subscribe")
|
||||||
ha.subscribe_event("state_changed", handle_event)
|
state_changed_id = await ha.subscribe_event("state_changed")
|
||||||
|
logging.debug(state_changed_id)
|
||||||
|
asyncio.create_task(handle_event(state_changed_id))
|
||||||
|
fb.login()
|
||||||
|
await ha.wait_for_close()
|
||||||
|
logging.info("Websocket closed, shutting down..")
|
||||||
|
|
||||||
|
|
||||||
logging.basicConfig(level=logging.INFO, format="[%(asctime)s] [%(levelname)s] %(message)s")
|
async def main():
|
||||||
config = json.load(open("/data/options.json"))
|
config_path = sys.argv[1]
|
||||||
logging.debug(config)
|
config = json.load(open(config_path))
|
||||||
for mapping in config["mappings"]:
|
level = logging.INFO
|
||||||
if mapping["sensor"] not in mappings.keys():
|
if "log_level" in config:
|
||||||
mappings[mapping["sensor"]] = []
|
print(f"Setting log_level {config['log_level']}")
|
||||||
mappings[mapping["sensor"]].append(mapping["thermostate"])
|
if config["log_level"] == "DEBUG":
|
||||||
|
level = logging.DEBUG
|
||||||
|
logging.basicConfig(
|
||||||
|
level=level, format="[%(asctime)s] [%(levelname)s] %(message)s"
|
||||||
|
)
|
||||||
|
logging.debug(config)
|
||||||
|
|
||||||
fb = FritzBox(config["fritzbox"]["url"], config["fritzbox"]["password"])
|
global fb
|
||||||
ha = HomeAssistantAPI(os.environ["SUPERVISOR_TOKEN"], init)
|
fb = FritzBox(
|
||||||
|
url=config["fritzbox"]["url"],
|
||||||
|
user=config["fritzbox"]["username"],
|
||||||
|
password=config["fritzbox"]["password"],
|
||||||
|
update_timeout=config["update_timeout"],
|
||||||
|
dry_run=False,
|
||||||
|
)
|
||||||
|
supervisor_url = "ws://supervisor/core/websocket"
|
||||||
|
if "SUPERVISOR_URL" in os.environ:
|
||||||
|
supervisor_url = os.environ["SUPERVISOR_URL"]
|
||||||
|
supervisor_token = os.environ["SUPERVISOR_TOKEN"]
|
||||||
|
global ha
|
||||||
|
ha = HomeAssistantAPI(supervisor_token, supervisor_url)
|
||||||
|
|
||||||
websocket.enableTrace(False)
|
for mapping in config["mappings"]:
|
||||||
ws = websocket.WebSocketApp("ws://supervisor/core/websocket",
|
if mapping["sensor"] not in sensor_mappings.keys():
|
||||||
on_open=on_open,
|
sensor_mappings[mapping["sensor"]] = []
|
||||||
on_message=ha.handle_message,
|
sensor_mappings[mapping["sensor"]].append(mapping["thermostate"])
|
||||||
on_error=on_error,
|
thermostate_mappings[mapping["thermostate"]] = mapping["sensor"]
|
||||||
on_close=on_close)
|
logging.debug(f"Mappings: {sensor_mappings} {thermostate_mappings}")
|
||||||
|
try:
|
||||||
|
await init(ha, fb)
|
||||||
|
except KeyboardInterrupt:
|
||||||
|
pass
|
||||||
|
|
||||||
ws.run_forever()
|
|
||||||
|
asyncio.run(main())
|
||||||
|
Loading…
x
Reference in New Issue
Block a user