Compare commits

...

22 Commits

Author SHA1 Message Date
23455e593c fix: venv 2024-02-09 16:58:07 +01:00
5fc0754dec chore: bump version 2024-02-09 16:55:24 +01:00
ec9da6f042 fix: workdir pathh 2024-02-09 16:54:00 +01:00
e3442c0153 chore: update version 2024-02-09 16:49:59 +01:00
bc191a32c8 fix: add venv 2024-02-09 16:49:45 +01:00
251b12a78d fix: switch to text attribute instead of string 2024-02-09 16:36:35 +01:00
b74207f5bb just a little typo 2022-12-04 17:45:57 +01:00
9387dfbea9 more failure catching and prints 2022-12-04 16:45:29 +01:00
4fdf91ece7 More changes i guess 2022-12-04 14:08:41 +01:00
76fcec8112 Merge pull request 'old_fb' (#1) from old_fb into master
Reviewed-on: SecretMineDE/HOHA-Addons#1
2022-12-04 14:02:06 +01:00
962e2c84af More changes i guess 2022-12-04 13:59:02 +01:00
924eb57b34 now works with fb 7360 2022-12-04 13:58:53 +01:00
ce59adccf0 Update for FritzOS 7.50 2022-12-03 21:51:26 +01:00
52c0e90a52 Add delay to retry 2022-02-21 22:52:06 +01:00
f6600c7dda Add username to config #3
retry connection to HA
2022-02-21 22:31:24 +01:00
43c2f0284a Fix config 2022-02-05 23:58:21 +01:00
084bc5ba1c Fix link to HA 2022-02-05 15:45:53 +01:00
a1d7dfa9bd Fix communication with fb/device as class 2022-02-05 15:43:52 +01:00
1a561a45d7 Update offset in fb 2022-01-29 00:21:54 +01:00
8aeb8cb1a6 Update requirements 2022-01-28 22:55:29 +01:00
5a14d251b4 Fix 2022-01-28 22:54:12 +01:00
9bf21d04e3 Change to asyncio, keep connection to fb alive, use data from ha for comparing 2022-01-28 22:37:28 +01:00
8 changed files with 1215 additions and 204 deletions

5
.gitignore vendored
View File

@ -15,6 +15,7 @@ dist/
downloads/ downloads/
eggs/ eggs/
.eggs/ .eggs/
.venv/
lib/ lib/
lib64/ lib64/
parts/ parts/
@ -168,6 +169,7 @@ pip-selfcheck.json
# Reference: https://intellij-support.jetbrains.com/hc/en-us/articles/206544839 # Reference: https://intellij-support.jetbrains.com/hc/en-us/articles/206544839
# User-specific stuff # User-specific stuff
.idea/
.idea/**/workspace.xml .idea/**/workspace.xml
.idea/**/tasks.xml .idea/**/tasks.xml
.idea/**/usage.statistics.xml .idea/**/usage.statistics.xml
@ -239,3 +241,6 @@ fabric.properties
# Android studio 3.1+ serialized cache file # Android studio 3.1+ serialized cache file
.idea/caches/build_file_checksums.ser .idea/caches/build_file_checksums.ser
# local config test
options.json
.DS_Store

View File

@ -3,14 +3,20 @@ FROM $BUILD_FROM
# Install requirements for add-on # Install requirements for add-on
RUN apk update && apk add --no-cache python3 py-pip RUN apk update && apk add --no-cache python3 py-pip
RUN python3 -m pip install websocket-client requests
WORKDIR /data # Prepare venv
RUN python3 -m venv /opt/venv
RUN source /opt/venv/bin/activate && \
python3 -m pip install websockets requests beautifulsoup4
WORKDIR /srv
# Copy data for add-on # Copy data for add-on
COPY sync_ha_fb.py /srv COPY sync_ha_fb.py /srv
COPY fritzbox.py /srv COPY fritzbox.py /srv
COPY homeassistant.py /srv COPY homeassistant.py /srv
COPY device.py /srv
COPY run.sh / COPY run.sh /
RUN chmod a+x /run.sh RUN chmod a+x /run.sh

View File

@ -1,8 +1,11 @@
name: "Fritz!Box Temperature Sync" name: "Fritz!Box Temperature Sync Dev"
description: "Sync Fritz!DECT thermostate temperatures with other sensors in Home Assistant" description: "Sync Fritz!DECT thermostat temperatures with other sensors in Home Assistant"
version: "0.1.0" version: "0.5.4"
slug: "fritz_temp_sync" startup: "application"
stage: "stable"
slug: "fritz_temp_sync_dev"
homeassistant_api: true homeassistant_api: true
init: false
arch: arch:
- aarch64 - aarch64
- amd64 - amd64
@ -13,13 +16,20 @@ options:
fritzbox: fritzbox:
url: "http://fritz.box" url: "http://fritz.box"
password: null password: null
verify_ssl: true
old_fb: false
mappings: mappings:
- sensor: null - sensor: null
thermostate: null thermostate: null
update_timeout: 15
schema: schema:
fritzbox: fritzbox:
url: url url: url
username: "str?"
password: str password: str
verify_ssl: bool
old_fb: bool
mappings: mappings:
- sensor: str - sensor: str
thermostate: str thermostate: str
update_timeout: int

714
fritz_temp_sync/device.py Executable file
View File

@ -0,0 +1,714 @@
from __future__ import annotations
from abc import ABC, abstractmethod
from enum import Enum, IntFlag, auto
from typing import Dict, List, Optional, Union
import typing
class WeekDay(IntFlag):
MON = 0b1
TUE = 0b10
WED = 0b100
THU = 0b1000
FRI = 0b10000
SAT = 0b100000
SUN = 0b1000000
class Manufacturer:
def __init__(self, name: str):
self.name: str = name
def __repr__(self):
return f"name: {self.name}"
def to_json(self):
return {"name": self.name}
@staticmethod
def parse_dict(manufacturer: Dict):
return Manufacturer(manufacturer["name"])
class FirmwareVersion:
def __init__(self, search: bool, current: str, update: bool, running: bool):
self.search: bool = search
self.current: str = current
self.update: bool = update
self.running: bool = running
def __repr__(self):
return f"search: {self.search}; current: {self.current}; update: {self.update}; running: {self.running}"
def to_json(self):
return {"search": self.search, "current": self.current, "update": self.update, "running": self.running}
@staticmethod
def parse_dict(firmware_version: dict):
return FirmwareVersion(firmware_version["search"],
firmware_version["current"],
firmware_version["update"],
firmware_version["running"])
class PushService:
def __init__(self, mail_address: str, unit_settings: List, is_enabled: bool):
self.mail_address: str = mail_address
self.unit_settings: List = unit_settings
self.is_enabled: bool = is_enabled
def __repr__(self):
return f"<PushService MailAddress: {self.mail_address}; UnitSettings: {self.unit_settings}; " \
f"isEnabled: {self.is_enabled}>"
def to_json(self):
return {"mailAddress": self.mail_address, "unitSettings": self.unit_settings, "isEnabled": self.is_enabled}
@staticmethod
def parse_dict(push_service: Dict):
return PushService(push_service["mailAddress"], push_service["unitSettings"], push_service["isEnabled"])
class SkillType(Enum):
SmartHomeTemperatureSensor = auto()
SmartHomeThermostat = auto()
SmartHomeBattery = auto()
class Skill(ABC):
def __init__(self, skill_type: SkillType):
self.type: SkillType = skill_type
@abstractmethod
def to_json(self):
pass
@abstractmethod
def to_web_data(self, device_id: int):
pass
@staticmethod
def parse_dict(skill: Dict):
skill_type = SkillType[skill["type"]]
if skill_type == SkillType.SmartHomeTemperatureSensor:
return TemperatureSkill.parse_dict(skill)
elif skill_type == SkillType.SmartHomeThermostat:
return ThermostatSkill.parse_dict(skill)
elif skill_type == SkillType.SmartHomeBattery:
return BatterySkill.parse_dict(skill)
else:
raise NotImplementedError(skill_type)
class PresetName(Enum):
LOWER_TEMPERATURE = auto()
UPPER_TEMPERATURE = auto()
HOLIDAY_TEMPERATURE = auto()
class Preset:
def __init__(self, name: PresetName, temperature: Optional[int]):
self.name: PresetName = name
self.temperature: Optional[int] = temperature
def __repr__(self):
return f"<Preset name: {self.name.name}; temperature: {self.temperature}>"
def to_json(self):
return {"name": self.name.name, "temperature": self.temperature}
class Description:
def __init__(self, action: str, preset_temperature: Optional[Preset]):
self.action: str = action
self.preset_temperature: Optional[Preset] = preset_temperature
def __repr__(self):
desc = f"<Description action: {self.action}"
if self.preset_temperature is not None:
desc += f"; presetTemperaturet: {self.preset_temperature}"
desc += " >"
return desc
def to_json(self):
if self.preset_temperature is not None:
return {"action": self.action, "presetTemperature": self.preset_temperature.to_json()}
return {"action": self.action}
@staticmethod
def parse_dict(description: Dict):
preset: Optional[Preset] = None
if "presetTemperature" in description.keys():
preset = Preset(PresetName[description["presetTemperature"]["name"]],
description["presetTemperature"]["temperature"] if "temperature" in description["presetTemperature"] else None)
return Description(description["action"], preset)
class Repetition(Enum):
NONE = auto()
YEARLY = auto()
class TimeSetting:
def __init__(self, start_date: Optional[str], start_time: Optional[str],
end_date: Optional[str] = None, end_time: Optional[str] = None,
repetition: Repetition = Repetition.NONE, day_of_week: Optional[WeekDay] = None):
self.start_date: Optional[str] = start_date
self.start_time: Optional[str] = start_time
self.end_date: Optional[str] = end_date
self.end_time: Optional[str] = end_time
self.repetition: Repetition = repetition
self.day_of_week: Optional[WeekDay] = day_of_week
def __repr__(self):
desc = f"<TimeSettings "
if self.day_of_week is not None:
desc += f"dayOfWeek: {self.day_of_week.name}; "
if self.start_date is not None:
desc += f"startDate: {self.start_date} "
if self.start_time is not None:
desc += f"startTime: {self.start_time} "
if self.end_date is not None:
desc += f"endDate: {self.end_date} "
if self.end_time is not None:
desc += f"endTime: {self.end_time} "
if self.repetition != Repetition.NONE:
desc += f"repetition: {self.repetition.name} "
desc += ">"
return desc
def to_json(self): # ToDo: Return typehints for all to_json
state = {}
if self.start_date is not None:
state["startDate"] = self.start_date
if self.start_time is not None:
state["startTime"] = self.start_time
if self.end_date is not None:
state["endDate"] = self.end_date
if self.end_time is not None:
state["endTime"] = self.end_time
if self.repetition != Repetition.NONE:
state["repetition"] = self.repetition.name
if self.day_of_week is not None:
state = {"dayOfWeek": self.day_of_week.name,
"time": state}
return state
@staticmethod
def parse_dict(time_setting: Dict):
start_date: Optional[str] = None
start_time: Optional[str] = None
end_date: Optional[str] = None
end_time: Optional[str] = None
day_of_week: Optional[WeekDay] = None
repetition: Repetition = Repetition.NONE
if "dayOfWeek" in time_setting.keys():
day_of_week = WeekDay[time_setting["dayOfWeek"]]
time_setting = time_setting["time"]
if "startDate" in time_setting.keys():
start_date = time_setting["startDate"]
if "startTime" in time_setting.keys():
start_time = time_setting["startTime"]
if "endDate" in time_setting.keys():
end_date = time_setting["endDate"]
if "endTime" in time_setting.keys():
end_time = time_setting["endTime"]
if "repetition" in time_setting.keys():
repetition = Repetition[time_setting["repetition"]]
return TimeSetting(start_date, start_time, end_date, end_time, repetition, day_of_week)
class Change:
def __init__(self, description: Description, time_setting: TimeSetting):
self.description: Description = description
self.time_setting: TimeSetting = time_setting
def __repr__(self):
return f"<Change description: {self.description}; timeSetting: {self.time_setting}>"
def to_json(self):
return {"description": self.description.to_json(), "timeSetting": self.time_setting.to_json()}
@staticmethod
def parse_dict(change: Dict):
return Change(Description.parse_dict(change["description"]), TimeSetting.parse_dict(change["timeSetting"]))
class TemperatureDropDetection:
def __init__(self, do_not_heat_offset_in_minutes: int, sensitivity: int):
self.do_not_heat_offset_in_minutes: int = do_not_heat_offset_in_minutes
self.sensitivity: int = sensitivity
def __repr__(self):
# ToDo
pass
def to_json(self):
return {
"doNotHeatOffsetInMinutes": self.do_not_heat_offset_in_minutes,
"sensitivity": self.sensitivity
}
def to_web_data(self, device_id: int):
return {
"WindowOpenTrigger": self.sensitivity + 3,
"WindowOpenTimer": self.do_not_heat_offset_in_minutes
}
@staticmethod
def parse_dict(drop_detection: Dict):
return TemperatureDropDetection(drop_detection["doNotHeatOffsetInMinutes"],
drop_detection["sensitivity"])
class ScheduleKind(Enum):
REPETITIVE = auto()
WEEKLY_TIMETABLE = auto()
class ThermostatSkillMode(Enum):
TARGET_TEMPERATURE = auto()
class Action:
def __init__(self, is_enabled: bool, time_setting: TimeSetting, description: Description):
self.is_enabled: bool = is_enabled
self.time_setting: TimeSetting = time_setting
self.description: Description = description
def __repr__(self):
return f"<Action isEnabled: {self.is_enabled}; timeSetting: {self.time_setting}; " \
f"description: {self.description} >"
def to_json(self):
return {"isEnabled": self.is_enabled,
"timeSetting": self.time_setting.to_json(),
"description": self.description.to_json()}
@staticmethod
def parse_dict(action: Dict):
return Action(action["isEnabled"], TimeSetting.parse_dict(action["timeSetting"]),
Description.parse_dict(action["description"]))
class Schedule:
def __init__(self, is_enabled: bool, kind: ScheduleKind, name: str, actions: List[Action]):
self.is_enabled: bool = is_enabled
self.kind: ScheduleKind = kind
self.name: str = name
self.actions: List[Action] = actions
def __repr__(self):
return f"<Schedule isEnabled: {self.is_enabled}; kind: {self.kind.name}; " \
f"name: {self.name}; actions: {self.actions}>"
def to_json(self):
return {
"isEnabled": self.is_enabled,
"kind": self.kind.name,
"name": self.name,
"actions": [action.to_json() for action in self.actions]
}
def to_web_data(self, device_id: int):
data = {}
if self.kind == ScheduleKind.REPETITIVE and self.name == "HOLIDAYS": # ToDo: Enum for names?
enabled_count = 0
for num, holiday in enumerate(self.actions):
num += 1
_, start_month, start_day = holiday.time_setting.start_date.split("-")
_, end_month, end_day = holiday.time_setting.end_date.split("-")
data.update({
f"Holiday{num}StartDay": start_day,
f"Holiday{num}StartMonth": start_month,
f"Holiday{num}StartHour": holiday.time_setting.start_time.split(":")[0],
f"Holiday{num}EndDay": end_day,
f"Holiday{num}EndMonth": end_month,
f"Holiday{num}EndHour": holiday.time_setting.end_time.split(":")[0],
f"Holiday{num}Enabled": 1 if holiday.is_enabled else 0,
f"Holiday{num}ID": num
})
if holiday.is_enabled:
enabled_count += 1
data["HolidayEnabledCount"] = enabled_count
elif self.kind == ScheduleKind.REPETITIVE and self.name == "SUMMER_TIME":
_, start_month, start_day = self.actions[0].time_setting.start_date.split("-")
_, end_month, end_day = self.actions[0].time_setting.end_date.split("-")
data = {
"SummerStartDay": start_day,
"SummerStartMonth": start_month,
"SummerEndDay": end_day,
"SummerEndMonth": end_month,
"SummerEnabled": 1 if self.is_enabled else 0
}
elif self.kind == ScheduleKind.WEEKLY_TIMETABLE and self.name == "TEMPERATURE":
timer_items = {}
for action in self.actions:
if not action.is_enabled:
continue
time = "".join(action.time_setting.start_time.split(":")[:-1])
if time not in timer_items.keys():
timer_items[time] = [0, 0]
heat = 1 if action.description.preset_temperature.name == PresetName.UPPER_TEMPERATURE else 0
days = timer_items[time][heat]
days |= action.time_setting.day_of_week
timer_items[time][heat] = days
num = 0
for key in timer_items.keys():
if timer_items[key][0] != 0:
data.update({f"timer_item_{num}": f"{key};0;{timer_items[key][0].as_integer_ratio()[0]}"})
num += 1
if timer_items[key][1] != 0:
data.update({f"timer_item_{num}": f"{key};1;{timer_items[key][1].as_integer_ratio()[0]}"})
num += 1
else:
raise NotImplementedError(self.name)
return data
@staticmethod
def parse_dict(schedule: Dict): # ToDo: Make TypedDicts for all those dicts
return Schedule(schedule["isEnabled"],
ScheduleKind[schedule["kind"]],
schedule["name"],
[Action.parse_dict(action) for action in schedule["actions"]])
class TimeControl:
def __init__(self, is_enabled: bool, time_schedules: List[Schedule]):
self.is_enabled: bool = is_enabled
self.time_schedules: List[Schedule] = time_schedules
def __repr__(self):
return f"<TimeControl isEnabled: {self.is_enabled}; timeSchedules: {self.time_schedules}"
def to_json(self):
return {
"isEnabled": self.is_enabled,
"timeSchedules": [schedule.to_json() for schedule in self.time_schedules]
}
def to_web_data(self, device_id: int):
data = {}
for schedule in self.time_schedules:
data.update(schedule.to_web_data(device_id))
return data
@staticmethod
def parse_dict(time_control: Dict):
return TimeControl(time_control["isEnabled"],
[Schedule.parse_dict(schedule) for schedule in time_control["timeSchedules"]])
class ThermostatSkill(Skill):
def __init__(self, presets: List[Preset], next_change: Optional[Change],
temperature_drop_detection: TemperatureDropDetection,
target_temp: int, time_control: TimeControl,
mode: ThermostatSkillMode, used_temp_sensor: Unit):
super().__init__(SkillType.SmartHomeThermostat)
self.presets: List[Preset] = presets
self.next_change: Optional[Change] = next_change
self.temperature_drop_detection: TemperatureDropDetection = temperature_drop_detection
self.target_temp: int = target_temp
self.time_control: TimeControl = time_control
self.mode: ThermostatSkillMode = mode
self.used_temp_sensor: Unit = used_temp_sensor
def __repr__(self):
# ToDo
pass
def to_web_data(self, device_id: int):
upper = 0.0
lower = 0.0
for preset in self.presets:
if preset.name == PresetName.UPPER_TEMPERATURE:
upper = preset.temperature
elif preset.name == PresetName.LOWER_TEMPERATURE:
lower = preset.temperature
data = {
"Heiztemp": upper,
"Absenktemp": lower
}
if device_id == self.used_temp_sensor.id:
data.update({"ExtTempsensorID": "tochoose", "tempsensor": "own"})
else:
data.update({"ExtTempsensorID": self.used_temp_sensor.id, "tempsensor": "extern"})
data.update(self.time_control.to_web_data(device_id))
data.update(self.temperature_drop_detection.to_web_data(device_id))
return data
def to_json(self):
state = {
"type": self.type.name,
"presets": [preset.to_json() for preset in self.presets],
"temperatureDropDetection": self.temperature_drop_detection.to_json(),
"targetTemp": self.target_temp,
"timeControl": self.time_control.to_json(),
"mode": self.mode.name,
"usedTempSensor": self.used_temp_sensor.to_json()
}
if self.next_change is not None:
state["nextChange"] = self.next_change.to_json()
return state
@staticmethod
def parse_dict(skill: Dict):
return ThermostatSkill(
[Preset(PresetName[preset["name"]], preset["temperature"]) for preset in skill["presets"]],
Change.parse_dict(skill["nextChange"]) if "nextChange" in skill.keys() else None,
TemperatureDropDetection.parse_dict(skill["temperatureDropDetection"]),
skill["targetTemp"],
TimeControl.parse_dict(skill["timeControl"]),
ThermostatSkillMode[skill["mode"]],
Unit.parse_dict(skill["usedTempSensor"], None)
)
class TemperatureSkill(Skill):
def __init__(self, offset: float, current_in_celsius: int):
super().__init__(SkillType.SmartHomeTemperatureSensor)
self.offset: float = offset
self.current_in_celsius: int = current_in_celsius
def __repr__(self):
return f"{self.type.name}: " \
f"offset: {self.offset}; currentInCelsius: {self.current_in_celsius}"
def to_json(self):
return {
"type": self.type.name,
"offset": self.offset,
"currentInCelsius": self.current_in_celsius
}
def to_web_data(self, device_id: int):
data = {
"Roomtemp": self.current_in_celsius,
"Offset": self.offset
}
return data
@staticmethod
def parse_dict(skill: Dict):
return TemperatureSkill(skill["offset"], skill["currentInCelsius"])
class BatterySkill(Skill):
def __init__(self, charge_level_in_percent: int):
super().__init__(SkillType.SmartHomeBattery)
self.charge_level_in_percent: int = charge_level_in_percent
def __repr__(self):
return f"{self.type.name}: chargeLevelInPercent: {self.charge_level_in_percent}"
def to_json(self):
return {"type": self.type.name, "chargeLevelInPercent": self.charge_level_in_percent}
def to_web_data(self, device_id: int):
return {}
@staticmethod
def parse_dict(skill: Dict):
return BatterySkill(skill["chargeLevelInPercent"])
class UnitTypes(Enum):
BATTERY = auto()
TEMPERATURE_SENSOR = auto()
THERMOSTAT = auto()
class Unit:
def __init__(self, unit_type: UnitTypes, idx: int, display_name: str,
device: Optional[Device], skills: List[Union[Dict, Skill]],
interaction_controls: Optional[List] = None):
self.type: UnitTypes = unit_type
self.id: int = idx
self.display_name: str = display_name
self.device: Optional[Device] = device
self.skills: List[Skill] = skills
self.interaction_controls: Optional[List] = interaction_controls # ToDo: Type for this
def __repr__(self):
desc = f"type: {self.type.name}; id: {self.id}; displayName: {self.display_name};"
if self.device is not None:
desc += f"device: {self.device.short_description()}; "
desc += f"skills: {self.skills}; "
if self.interaction_controls is not None:
desc += f"interactionControls: {self.interaction_controls} ;"
return desc
def to_json(self):
state = {
"type": self.type.name,
"id": self.id,
"displayName": self.display_name,
"skills": [skill.to_json() for skill in self.skills]
}
if self.device is not None:
state["device"] = self.device.to_short_json()
if self.interaction_controls is not None:
state["interactionControls"] = self.interaction_controls
return state
def to_web_data(self, device_id: int):
data = {}
for skill in self.skills:
data.update(skill.to_web_data(device_id))
return data
@staticmethod
def parse_dict(unit: Dict, device: Optional[Device]):
return Unit(UnitTypes[unit["type"]],
unit["id"],
unit["displayName"],
device,
[Skill.parse_dict(skill) for skill in unit["skills"]],
unit["interactionControls"] if "interactionControls" in unit.keys() else None)
class DeviceType(Enum):
SmartHomeDevice = auto()
class DeviceCategory(Enum):
THERMOSTAT = auto()
class Device:
def __init__(self, state):
self.type: Optional[DeviceType] = None
self.is_deletable: bool = False
self.id: int = -1
self.master_connection_state = None # ToDo
self.display_name: Optional[str] = None
self.category: Optional[str] = None
self.units = None # ToDo
self.firmware_version: Optional[FirmwareVersion] = None
self.model: Optional[str] = None
self.is_editable: bool = False
self.manufacturer: Optional[Manufacturer] = None
self.push_service: Optional[PushService] = None
self.actor_identification_number: Optional[str] = None
if state is not None:
self.parse_state(state)
def __repr__(self):
# ToDo
pass
def short_description(self):
desc = f"masterConnectionState: {self.master_connection_state}; " \
f"type: {self.type}; model: {self.model}; id: {self.id}; " \
f"manufacturer: {self.manufacturer}; " \
f"actorIdentificationNumber: {self.actor_identification_number}; " \
f"displayName: {self.display_name}"
return desc
def parse_state(self, state):
self.type = DeviceType[state["type"]]
self.is_deletable = state["isDeletable"]
self.id = state["id"]
self.master_connection_state = state["masterConnectionState"]
self.display_name = state["displayName"]
self.category = state["category"]
self.units = [Unit.parse_dict(unit, self) for unit in state["units"]]
self.firmware_version = FirmwareVersion.parse_dict(state["firmwareVersion"])
self.model = state["model"]
self.is_editable = state["isEditable"]
self.manufacturer = Manufacturer.parse_dict(state["manufacturer"])
self.push_service = PushService.parse_dict(state["pushService"])
self.actor_identification_number = state["actorIdentificationNumber"]
def to_web_data(self):
data = {
"device": self.id,
"ule_device_name": self.display_name
}
for unit in self.units:
data.update(unit.to_web_data(self.id))
return data
def to_json(self):
state = {"type": self.type.name,
"isDeletable": self.is_deletable,
"id": self.id,
"masterConnectionState": self.master_connection_state,
"displayName": self.display_name,
"category": self.category,
"units": [unit.to_json() for unit in self.units],
"firmwareVersion": self.firmware_version.to_json(),
"model": self.model,
"isEditable": self.is_editable,
"manufacturer": self.manufacturer.to_json(),
"pushService": self.push_service.to_json(),
"actorIdentificationNumber": self.actor_identification_number}
return state
def set_offset(self, offset: float):
temp_sensor: Optional[Unit] = None
for unit in self.units:
if unit.type == UnitTypes.TEMPERATURE_SENSOR:
temp_sensor = unit
break
temp_skill: Optional[TemperatureSkill] = None
for skill in temp_sensor.skills:
if skill.type == SkillType.SmartHomeTemperatureSensor:
temp_skill = typing.cast(TemperatureSkill, skill)
break
temp_skill.offset = offset
def get_offset(self):
temp_sensor: Optional[Unit] = None
for unit in self.units:
if unit.type == UnitTypes.TEMPERATURE_SENSOR:
temp_sensor = unit
break
temp_skill: Optional[TemperatureSkill] = None
for skill in temp_sensor.skills:
if skill.type == SkillType.SmartHomeTemperatureSensor:
temp_skill = typing.cast(TemperatureSkill, skill)
break
return temp_skill.offset
def get_temperature(self):
temp_sensor: Optional[Unit] = None
for unit in self.units:
if unit.type == UnitTypes.TEMPERATURE_SENSOR:
temp_sensor = unit
break
temp_skill: Optional[TemperatureSkill] = None
for skill in temp_sensor.skills:
if skill.type == SkillType.SmartHomeTemperatureSensor:
temp_skill = typing.cast(TemperatureSkill, skill)
break
return temp_skill.current_in_celsius
def to_short_json(self):
return {
"masterConnectionState": self.master_connection_state,
"type": self.type.name,
"model": self.model,
"id": self.id,
"manufacturer": self.manufacturer.to_json(),
"actorIdentificationNumber": self.actor_identification_number,
"displayName": self.display_name
}

View File

@ -1,27 +1,58 @@
from typing import Optional, Tuple from __future__ import annotations
import requests
import json import asyncio
import re
import hashlib import hashlib
import xml.etree.ElementTree as ET import json
import logging import logging
import re
import xml.etree.ElementTree as ET
from asyncio import Task
from datetime import datetime, timedelta
from typing import Optional, Dict, List, Union
import requests
from bs4 import BeautifulSoup
from device import Device
class FritzBox: class FritzBox:
def __init__(self, url:str, password:str, user:str = None) -> None: def __init__(self, url: str, password: str, update_timeout: int, user: str = None, dry_run: bool = False,
verify_ssl: bool = True, old_fb: bool = False) -> None:
self._endpoints = { self._endpoints = {
"login": "login_sid.lua?version=2", "login": "login_sid.lua?version=2",
"logout": "index.lua", "logout": "index.lua",
"data": "data.lua" "data": "data.lua",
"device_details": "net/home_auto_hkr_edit.lua"
} }
self.url = url self.url: str = url
self.session = requests.Session() self.dry_run: bool = dry_run
self.password = password self.user: Optional[str] = user
self.sid = None self.session: requests.Session = requests.Session()
self.password: str = password
self.sid: Optional[str] = None
self.update_timeout: int = update_timeout
self.update_time: Dict[str, datetime] = {}
self.hold_connection: Optional[Task] = None
self.verify_ssl = verify_ssl
self.old_fb = old_fb
if not verify_ssl:
import urllib3
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
async def hold_connection_alive(self) -> None:
while True:
# Session automatically destroyed after 20m of inactivity
# according to the manual
await asyncio.sleep(19 * 60)
self.check_session()
def _calc_challenge_v2(self, challenge: str) -> str: def _calc_challenge_v2(self, challenge: str) -> str:
logging.debug(f"Calculate v2 challenge: {challenge}") logging.debug(f"Calculate v2 challenge: {challenge}")
chall_regex = re.compile("2\$(?P<iter1>[0-9a-zA-Z]+)\$(?P<salt1>[0-9a-zA-Z]+)\$(?P<iter2>[0-9a-zA-Z]+)\$(?P<salt2>[0-9a-zA-Z]+)") chall_regex = re.compile(
"2\$(?P<iter1>[0-9a-zA-Z]+)\$(?P<salt1>[0-9a-zA-Z]+)\$(?P<iter2>[0-9a-zA-Z]+)\$(?P<salt2>[0-9a-zA-Z]+)")
chall_parts = chall_regex.match(challenge).groupdict() chall_parts = chall_regex.match(challenge).groupdict()
salt1: bytes = bytes.fromhex(chall_parts["salt1"]) salt1: bytes = bytes.fromhex(chall_parts["salt1"])
@ -42,22 +73,38 @@ class FritzBox:
response = challenge + "-" + hashlib.md5(response).hexdigest() response = challenge + "-" + hashlib.md5(response).hexdigest()
return response return response
def login(self, user:str = None) -> bool: def check_session(self) -> None:
logging.debug(f"login user {user}") data = {
"xhr": 1,
"sid": self.sid,
"lang": "de",
"page": "overview",
"xhrId": "first",
"noMenuRef": 1
}
r = self.session.post(f"{self.url}/{self._endpoints['data']}", data=data, verify=self.verify_ssl)
if len(r.history) > 0:
if not self.login():
logging.error("Failed to login to Fritz!Box")
else:
logging.debug("Already logged in")
def login(self) -> bool:
logging.info(f"Login user {self.user} to Fritz!Box")
challenge = None challenge = None
r = self.session.get(f"{self.url}/{self._endpoints['login']}") r = self.session.get(f"{self.url}/{self._endpoints['login']}", verify=self.verify_ssl)
xml = ET.fromstring(r.text) xml = ET.fromstring(r.text)
for elem in xml: for elem in xml:
if elem.tag == "SID": if elem.tag == "SID":
self.sid = elem.text self.sid = elem.text
elif elem.tag == "Challenge": elif elem.tag == "Challenge":
challenge = elem.text challenge = elem.text
elif user is None and elem.tag == "Users": elif self.user is None and elem.tag == "Users":
for user_elem in elem: for user_elem in elem:
if "fritz" in user_elem.text: if "fritz" in user_elem.text:
user = user_elem.text self.user = user_elem.text
assert challenge is not None and user is not None assert challenge is not None and self.user is not None
if challenge.startswith("2$"): if challenge.startswith("2$"):
response = self._calc_challenge_v2(challenge) response = self._calc_challenge_v2(challenge)
@ -65,32 +112,99 @@ class FritzBox:
response = self._calc_challenge_v1(challenge) response = self._calc_challenge_v1(challenge)
data = { data = {
"username": user, "username": self.user,
"response": response "response": response
} }
r = self.session.post(f"{self.url}/{self._endpoints['login']}", data=data) r = self.session.post(f"{self.url}/{self._endpoints['login']}", data=data, verify=self.verify_ssl)
logging.debug(r.text) logging.debug(r.text)
xml = ET.fromstring(r.text) xml = ET.fromstring(r.text)
for elem in xml: for elem in xml:
if elem.tag == "SID": if elem.tag == "SID":
self.sid = elem.text self.sid = elem.text
logging.debug(f"Authenticated fritzbox: {len(self.sid) != self.sid.count('0')}") logging.info(f"Authenticated Fritz!Box: {len(self.sid) != self.sid.count('0')}")
if len(self.sid) != self.sid.count("0"):
self.hold_connection = asyncio.create_task(self.hold_connection_alive())
return len(self.sid) != self.sid.count("0") return len(self.sid) != self.sid.count("0")
def logout(self) -> bool: def logout(self) -> bool:
logging.debug("logout") logging.info("logout")
data = { data = {
"xhr": 1, "xhr": 1,
"sid": self.sid, "sid": self.sid,
"logout": 1, "logout": 1,
"no_sidrenew": ""} "no_sidrenew": ""}
r = self.session.post(f"{self.url}/{self._endpoints['logout']}", data=data) r = self.session.post(f"{self.url}/{self._endpoints['logout']}", data=data, verify=self.verify_ssl)
if self.hold_connection is not None:
self.hold_connection.cancel()
return r.status_code == 200 return r.status_code == 200
def list_devices(self): def list_devices_old(self) -> Optional[List[Union[Device, Dict]]]:
data = {
"xhr": 1,
"sid": self.sid,
"lang": "de",
"page": "sh",
"xhrId": "all"
}
r = self.session.post(f"{self.url}/{self._endpoints['data']}", data=data, verify=self.verify_ssl)
logging.debug(r.text[:100])
if len(r.history) > 0:
if self.login():
r = self.session.post(f"{self.url}/{self._endpoints['data']}", data=data, verify=self.verify_ssl)
else:
return None
devices_overview_raw = BeautifulSoup(r.text, "html.parser")
table_rows = devices_overview_raw.find(id="uiSmarthomeTables").table
devices_raw = []
for r in table_rows.find_all("tr"):
name = r.findAll("td", {'class': ['name', 'cut_overflow']})
button = r.findAll("td", {'class': 'btncolumn'})
temperature = r.findAll("td", {'class': 'temperature'})
if name is None or len(name) < 1:
continue
if button is None or len(button) < 1:
continue
if temperature is None or len(temperature) < 1:
continue
name = name[0].string
id = int(button[0].button["value"])
print("Temperature raw:", temperature)
try:
temperature = float(temperature[0].text.split(" ")[0].replace(",", "."))
except:
print("Error parsing temperature.")
return []
request_data = {
"device": id,
"sid": self.sid,
"xhr": 1
}
r = self.session.get(f"{self.url}/{self._endpoints['device_details']}", params=request_data,
verify=self.verify_ssl)
device_content_raw = BeautifulSoup(r.text, "html.parser")
offset = float(device_content_raw.find("input", {"type": "hidden", "name": "Offset"})["value"])
devices_raw.append({
"name": name,
"display_name": name,
"id": id,
"temperature": temperature,
"offset": offset
})
return devices_raw
def list_devices(self) -> Optional[List[Device]]:
data = { data = {
"xhr": 1, "xhr": 1,
"sid": self.sid, "sid": self.sid,
@ -98,44 +212,55 @@ class FritzBox:
"page": "sh_dev", "page": "sh_dev",
"xhrId": "all" "xhrId": "all"
} }
r = self.session.post(f"{self.url}/{self._endpoints['data']}", data=data) r = self.session.post(f"{self.url}/{self._endpoints['data']}", data=data, verify=self.verify_ssl)
logging.debug(r.text[:100]) logging.debug(r.text[:100])
if len(r.history) > 0: if len(r.history) > 0:
if self.login(): if self.login():
r = self.session.post(f"{self.url}/{self._endpoints['data']}", data=data) r = self.session.post(f"{self.url}/{self._endpoints['data']}", data=data, verify=self.verify_ssl)
else: else:
return None return None
devices = json.loads(r.text)["data"]["devices"] devices: List[Device] = []
for device in json.loads(r.text)["data"]["devices"]:
devices.append(Device(device))
return devices return devices
def get_device_data(self, id: int = None, name: str = None) -> Optional[Tuple[float, float, int, str]]: def get_device_data(self, idx: int = None, name: str = None) -> Optional[Device]:
if id is None and name is None: if idx is None and name is None:
logging.debug("No id or name given") logging.debug("No id or name given")
return None return None
if self.old_fb:
devices = self.list_devices_old()
else:
devices = self.list_devices() devices = self.list_devices()
device = None
for device in devices: for device in devices:
if device["id"] == id or device["displayName"] == name: if isinstance(device, dict):
if device["id"] == idx or device["display_name"] == name:
break
else:
if device.id == idx or device.display_name == name:
break break
device = None device = None
if device is None: if device is None:
logging.debug(f"Device {id} {name} not found") logging.debug(f"Device {idx} {name} not found")
return None return None
current_temp = None return device
current_offset = None
for unit in device["units"]:
if unit["type"] == "TEMPERATURE_SENSOR":
for skill in unit["skills"]:
if skill["type"] == "SmartHomeTemperatureSensor":
current_temp = float(skill["currentInCelsius"])
current_offset = float(skill["offset"])
return current_temp, current_offset, device["id"], device["displayName"] def set_offset(self, device: Union[Device, Dict]) -> None:
if self.dry_run:
logging.warning("No updates in dry-run-mode")
return
def set_offset(self, current_temp: str, offset: float, device_id: int, device_name: str): old_type = isinstance(device, dict)
if old_type:
device_id = device["id"]
else:
device_id = device.id
if not old_type:
data = { data = {
"xhr": 1, "xhr": 1,
"sid": self.sid, "sid": self.sid,
@ -143,24 +268,61 @@ class FritzBox:
"device": device_id, "device": device_id,
"page": "home_auto_hkr_edit" "page": "home_auto_hkr_edit"
} }
r = self.session.post(f"{self.url}/{self._endpoints['data']}", data=data) self.session.post(f"{self.url}/{self._endpoints['data']}", data=data, verify=self.verify_ssl)
data = { data = {
"xhr": 1, "xhr": 1,
"sid": self.sid, "sid": self.sid,
"lang": "de", "lang": "de",
"device": device_id,
"view": "", "view": "",
"back_to_page": "sh_dev", "back_to_page": "sh_dev",
"ule_device_name": device_name,
"WindowOpenTrigger":8,
"WindowOpenTimer":10,
"tempsensor": "own",
"Roomtemp": f"{current_temp}",
"ExtTempsensorID":"tochoose",
"Offset": f"{offset}",
"apply": "", "apply": "",
"oldpage": "/net/home_auto_hkr_edit.lua" "oldpage": "/net/home_auto_hkr_edit.lua"
} }
data.update(device.to_web_data())
r = self.session.post(f"{self.url}/{self._endpoints['data']}", data=data) else:
data = {
"xhr": 1,
"sid": self.sid,
"lang": "de",
"no_sidrenew": "",
"device": device_id,
"view": "",
"back_to_page": "/net/home_auto_overview.lua",
"ule_device_name": device["name"],
"Offset": device["offset"],
"apply": "",
"oldpage": "/net/home_auto_hkr_edit.lua",
}
self.session.post(f"{self.url}/{self._endpoints['data']}", data=data, verify=self.verify_ssl)
def correct_offset(self, device_name: str, real_temp: float):
elapsed = None
if device_name in self.update_time.keys():
elapsed = datetime.now() - self.update_time[device_name]
logging.info(f"Last update for {device_name} {elapsed} ago")
delta = timedelta(minutes=self.update_timeout)
if device_name not in self.update_time.keys() or elapsed > delta:
device: Optional[Union[Dict, Device]] = self.get_device_data(name=device_name)
logging.info(f"device: {device}")
if device is None:
return
offset_changed = False
if self.old_fb:
new_offset = device["offset"] + real_temp - device["temperature"]
offset_changed = new_offset != device["offset"]
logging.info(f"Update offset from {device['offset']} to {new_offset} (changed: {offset_changed})")
else:
new_offset = device.get_offset() + real_temp - device.get_temperature()
offset_changed = new_offset != device.get_offset()
logging.info(f"Update offset from {device.get_offset()} to {new_offset} (changed: {offset_changed})")
if offset_changed:
if self.old_fb:
device["offset"] = new_offset
else:
device.set_offset(new_offset)
self.set_offset(device)
self.update_time[device_name] = datetime.now()

View File

@ -1,66 +1,152 @@
from __future__ import annotations from __future__ import annotations
import asyncio
import json import json
import logging import logging
from typing import Callable from asyncio import Queue, Task, Event, Lock
import websocket from typing import Callable, Dict, Optional
import time import websockets
from websockets.exceptions import InvalidStatusCode
class HomeAssistantAPI: class HomeAssistantAPI:
def __init__(self, token:str, initialize: Callable[[HomeAssistantAPI], None]) -> None: def __init__(self, token: str, url: str) -> None:
self.token = token self.token = token
self.msg_id = 1 self.msg_id = 1
self.ws = None self.msg_id_lock = Lock()
self.subscriptions = {} self.ws: websockets.WebSocketClientProtocol = None
self.init_callback = initialize self.url = url
self.receiver: Optional[Task] = None
self.sender: Optional[Task] = None
self.sending_queue: Queue = Queue()
self.authenticated: Event = Event()
self.events: Dict[int, Queue] = {}
self.responses: Dict[int, Dict] = {}
self.response_events: Dict[int, Event] = {}
self.response_lock: Lock = Lock()
def handle_message(self, ws: websocket.WebSocket, msg: str) -> None: async def connect(self):
if self.ws is None: retries = 5
self.ws = ws logging.info("Connect to home assistant...")
while True:
try:
self.ws = await websockets.connect(self.url)
self.sender = asyncio.create_task(self.sending())
await self.auth()
self.receiver = asyncio.create_task(self.receiving())
return True
except (InvalidStatusCode, TimeoutError):
if retries > 0:
retries -= 1
await asyncio.sleep(30)
logging.info(f"Retry home assistant connection... ({retries})")
continue
else:
logging.error("Invalid status code while connecting to Home Assistant")
await self.exit_loop()
return False
message: object = json.loads(msg) async def wait_for_close(self):
await self.ws.wait_closed()
if message["type"] == "auth_required": async def receiving(self):
logging.debug("Start receiving")
async for message in self.ws:
msg: Dict = json.loads(message)
if msg["type"] == "event":
if msg["id"] not in self.events.keys():
logging.error(f"Received event for not subscribted id: {msg['id']} {msg['event_type']}")
continue
await self.events[msg["id"]].put(msg["event"])
else:
async with self.response_lock:
self.responses[msg["id"]] = msg
if msg["id"] in self.response_events.keys():
self.response_events[msg["id"]].set()
async def wait_for(self, idx):
async with self.response_lock:
if idx in self.responses.keys():
msg = self.responses[idx]
del self.responses[idx]
return msg
self.response_events[idx] = Event()
await self.response_events[idx].wait()
async with self.response_lock:
del self.response_events[idx]
if idx not in self.responses.keys():
logging.error("Response ID not found")
return None
msg = self.responses[idx]
del self.responses[idx]
return msg
async def exit_loop(self):
if self.sender is not None:
self.sender.cancel()
if self.receiver is not None:
self.receiver.cancel()
async def auth(self):
msg = json.loads(await self.ws.recv())
if msg["type"] != "auth_required":
logging.error("Authentication error: Not required")
await self.exit_loop()
response = { response = {
"type": "auth", "type": "auth",
"access_token": self.token "access_token": self.token
} }
logging.debug(response) await self.sending_queue.put(response)
ws.send(json.dumps(response)) msg = json.loads(await self.ws.recv())
return if msg["type"] == "auth_invalid":
elif message["type"] == "auth_invalid":
logging.info("Auth failed") logging.info("Auth failed")
ws.close() await self.exit_loop()
return None elif msg["type"] == "auth_ok":
elif message["type"] == "auth_ok":
logging.debug("Authenticated") logging.debug("Authenticated")
self.init_callback(self) self.authenticated.set()
self.init_callback = None
return
elif message["type"] == "event":
event = message["event"]
if event["event_type"] in self.subscriptions.keys():
self.subscriptions[event["event_type"]](event)
else: else:
print("Received", message) logging.error(f"Unknown answer for auth: {msg}")
await self.exit_loop()
def subscribe_event(self, event_type: str, callback: Callable[[object], None]): async def sending(self):
while msg := await self.sending_queue.get():
await self.ws.send(json.dumps(msg))
if self.ws is None: async def subscribe_event(self, event_type: str):
logging.debug("Websocket not set") await self.authenticated.wait()
return
if event_type in self.subscriptions.keys():
logging.warning(f"Already subscribed to {event_type}")
return
logging.info(f"Subscribe to {event_type}") logging.info(f"Subscribe to {event_type}")
self.subscriptions[event_type] = callback async with self.msg_id_lock:
msg_id = self.msg_id
response = { response = {
"id": self.msg_id, "id": msg_id,
"type": "subscribe_events", "type": "subscribe_events",
"event_type": event_type "event_type": event_type
} }
self.events[msg_id] = Queue()
self.msg_id += 1 self.msg_id += 1
self.ws.send(json.dumps(response)) await self.sending_queue.put(response)
return msg_id
async def get_states(self):
await self.authenticated.wait()
async with self.msg_id_lock:
message = {
"id": self.msg_id,
"type": "get_states"
}
self.msg_id += 1
await self.sending_queue.put(message)
response = await self.wait_for(message["id"])
# ToDo: Error handling
return response["result"]
async def get_device_state(self, entity_id: str):
device_states = await self.get_states()
for device_state in device_states:
if device_state["entity_id"] == entity_id:
return device_state
return None

View File

@ -1,3 +1,4 @@
#!/usr/bin/env bashio #!/usr/bin/with-contenv bashio
python3 /srv/sync_ha_fb.py source /opt/venv/bin/activate
python3 /srv/sync_ha_fb.py /data/options.json

View File

@ -1,78 +1,105 @@
#!/usr/bin/env python3 #!/usr/bin/env python3
"""
hier die verbindungen zu HA aufbauen etc import asyncio
außerdem das vergleichen der werte und dass anstoßen der updates
"""
import os import os
import sys
from fritzbox import FritzBox from fritzbox import FritzBox
from homeassistant import HomeAssistantAPI from homeassistant import HomeAssistantAPI
import logging import logging
import websocket
import json import json
mappings = {} sensor_mappings = {}
thermostate_mappings = {}
def handle_event(event):
async def handle_event(idx: int):
logging.info(f"Wait for events for {idx}")
while event := await ha.events[idx].get():
try:
entity_id = event["data"]["entity_id"] entity_id = event["data"]["entity_id"]
if entity_id in mappings.keys(): if entity_id in sensor_mappings.keys() or entity_id in thermostate_mappings.keys():
state = await ha.get_device_state(entity_id)
new_state = event["data"]["new_state"] new_state = event["data"]["new_state"]
logging.debug(entity_id) logging.debug(f"received changed state from {entity_id}")
logging.debug(new_state["attributes"]["temperature"]) if entity_id in thermostate_mappings.keys() and state["state"] != "unavailable":
rounded = round(float(new_state["attributes"]["temperature"])*2)/2 therm_temp = new_state["attributes"]["current_temperature"]
logging.debug(rounded) therm_name = new_state["attributes"]["friendly_name"]
if new_state["attributes"]["device_class"] == "temperature": sensor = thermostate_mappings[entity_id]
if entity_id in mappings.keys(): sensor_state = await ha.get_device_state(sensor)
fb.login() sensor_state_temp = sensor_state["attributes"]["temperature"] if "temperatur" in sensor_state[
logged = False "attributes"] else sensor_state["state"]
for thermostate in mappings[entity_id]: sensor_temp = round(float(sensor_state_temp) * 2) / 2
current_temp, current_offset, id, name = fb.get_device_data(name=thermostate) if therm_temp != sensor_temp:
if not logged: logging.info(
logging.info(f"Current measurement from {entity_id}: {new_state['attributes']['temperature']} ({rounded})") f"{therm_name}: {therm_temp}\n{sensor}: {sensor_state_temp} ({sensor_temp})")
logged = True fb.correct_offset(therm_name, sensor_temp)
logging.info(f"Current measurement from {thermostate}: {current_temp}")
new_offset = current_offset + rounded - current_temp
if new_offset != current_offset:
old_offset = current_offset
logging.debug(f"Set offset for {thermostate} from {current_offset} to {new_offset}")
fb.set_offset(current_temp, new_offset, id, name)
current_temp, current_offset, id, name = fb.get_device_data(name=thermostate)
logging.debug(f"Target: {new_offset} ; Set: {current_offset}")
if new_offset == current_offset:
logging.info(f"Adjustet offset from {old_offset} to {new_offset}")
else:
logging.warning(f"Failed to adjust offset from {old_offset} to {new_offset}")
fb.logout()
def on_error(ws, error): elif entity_id in sensor_mappings.keys():
print(error) new_state_temp = new_state["attributes"]["temperature"] if "temperatur" in new_state[
"attributes"] else new_state["state"]
sensor_temp = round(float(new_state_temp) * 2) / 2
logging.info(sensor_temp)
for thermostate in sensor_mappings[entity_id]:
therm_state = await ha.get_device_state(thermostate)
if therm_state["state"] == "unavailable":
continue
therm_temp = float(therm_state["attributes"]["current_temperature"])
therm_name = therm_state["attributes"]["friendly_name"]
if therm_temp != sensor_temp or True:
logging.info(
f"{therm_name}: {therm_temp}\n{entity_id}: {new_state_temp} ({sensor_temp})")
fb.correct_offset(therm_name, sensor_temp)
except KeyError as e:
logging.error(e)
def on_close(ws, close_status_code, close_msg):
pass
def on_open(ws): async def init(ha: HomeAssistantAPI, fb: FritzBox):
pass if not await ha.connect():
return
def init(ha: HomeAssistantAPI):
logging.debug("Subscribe") logging.debug("Subscribe")
ha.subscribe_event("state_changed", handle_event) state_changed_id = await ha.subscribe_event("state_changed")
logging.debug(state_changed_id)
asyncio.create_task(handle_event(state_changed_id))
fb.login()
await ha.wait_for_close()
logging.info("Websocket closed, shutting down..")
logging.basicConfig(level=logging.INFO, format="[%(asctime)s] [%(levelname)s] %(message)s") async def main():
config = json.load(open("/data/options.json")) config_path = sys.argv[1]
config = json.load(open(config_path))
level = logging.INFO
if "log_level" in config:
if config["log_level"] == "DEBUG":
level = logging.DEBUG
logging.basicConfig(level=level, format="[%(asctime)s] [%(levelname)s] %(message)s")
logging.debug(config) logging.debug(config)
global fb
fb = FritzBox(url=config["fritzbox"]["url"],
user=config["fritzbox"]["username"],
password=config["fritzbox"]["password"],
update_timeout=config["update_timeout"],
dry_run=False,
verify_ssl=config["fritzbox"]["verify_ssl"],
old_fb=config["fritzbox"]["old_fb"])
supervisor_url = "ws://supervisor/core/websocket"
supervisor_token = os.environ["SUPERVISOR_TOKEN"]
global ha
ha = HomeAssistantAPI(supervisor_token, supervisor_url)
for mapping in config["mappings"]: for mapping in config["mappings"]:
if mapping["sensor"] not in mappings.keys(): if mapping["sensor"] not in sensor_mappings.keys():
mappings[mapping["sensor"]] = [] sensor_mappings[mapping["sensor"]] = []
mappings[mapping["sensor"]].append(mapping["thermostate"]) sensor_mappings[mapping["sensor"]].append(mapping["thermostate"])
thermostate_mappings[mapping["thermostate"]] = mapping["sensor"]
fb = FritzBox(config["fritzbox"]["url"], config["fritzbox"]["password"]) try:
ha = HomeAssistantAPI(os.environ["SUPERVISOR_TOKEN"], init) await init(ha, fb)
except KeyboardInterrupt:
pass
websocket.enableTrace(False)
ws = websocket.WebSocketApp("ws://supervisor/core/websocket",
on_open=on_open,
on_message=ha.handle_message,
on_error=on_error,
on_close=on_close)
ws.run_forever() asyncio.run(main())