SWS-12500-custom-component/custom_components/sws12500/utils.py

496 lines
14 KiB
Python

"""Utils for SWS12500.
This module contains small helpers used across the integration.
Notable responsibilities:
- Payload remapping: convert raw station/webhook field names into stable internal keys.
- Auto-discovery helpers: detect new payload fields that are not enabled yet and persist them
to config entry options so sensors can be created dynamically.
- Formatting/conversion helpers (wind direction text, battery mapping, temperature conversions).
Keeping these concerns in one place avoids duplicating logic in the webhook handler and entity code.
"""
import logging
import math
from typing import Any, cast
from py_typecheck.core import checked_or
from homeassistant.components import persistent_notification
from homeassistant.config_entries import ConfigEntry
from homeassistant.core import HomeAssistant
from homeassistant.helpers.translation import async_get_translations
from .const import (
AZIMUT,
CONNECTION_GATED_SENSORS,
# DATABASE_PATH,
DEV_DBG,
OUTSIDE_HUMIDITY,
OUTSIDE_TEMP,
REMAP_ITEMS,
REMAP_WSLINK_ITEMS,
SENSORS_TO_LOAD,
VOC_LEVEL_MAP,
WIND_SPEED,
UnitOfBat,
UnitOfDir,
VOCLevel,
)
_LOGGER = logging.getLogger(__name__)
async def translations(
hass: HomeAssistant,
translation_domain: str,
translation_key: str,
*,
key: str = "message",
category: str = "notify",
) -> str | None:
"""Get translated keys for domain."""
localize_key = f"component.{translation_domain}.{category}.{translation_key}.{key}"
language: str = hass.config.language
_translations = await async_get_translations(hass, language, category, [translation_domain])
if localize_key in _translations:
return _translations[localize_key]
return None
async def translated_notification(
hass: HomeAssistant,
translation_domain: str,
translation_key: str,
translation_placeholders: dict[str, str] | None = None,
notification_id: str | None = None,
*,
key: str = "message",
category: str = "notify",
):
"""Translate notification."""
localize_key = f"component.{translation_domain}.{category}.{translation_key}.{key}"
localize_title = f"component.{translation_domain}.{category}.{translation_key}.title"
language: str = cast("str", hass.config.language)
_translations = await async_get_translations(hass, language, category, [translation_domain])
if localize_key in _translations:
if not translation_placeholders:
persistent_notification.async_create(
hass,
_translations[localize_key],
_translations[localize_title],
notification_id,
)
else:
message = _translations[localize_key].format(**translation_placeholders)
persistent_notification.async_create(hass, message, _translations[localize_title], notification_id)
async def update_options(hass: HomeAssistant, entry: ConfigEntry, update_key, update_value) -> bool:
"""Update config.options entry."""
conf = {**entry.options}
conf[update_key] = update_value
return hass.config_entries.async_update_entry(entry, options=conf)
def anonymize(
data: dict[str, str | int | float | bool],
) -> dict[str, str | int | float | bool]:
"""Anonymize received data for safe logging.
- Keep all keys, but mask sensitive values.
- Do not raise on unexpected/missing keys.
"""
secrets = {"ID", "PASSWORD", "wsid", "wspw"}
return {k: ("***" if k in secrets else v) for k, v in data.items()}
def remap_items(entities: dict[str, str]) -> dict[str, str]:
"""Remap legacy (WU-style) payload field names into internal sensor keys.
The station sends short/legacy field names (e.g. "tempf", "humidity"). Internally we use
stable keys from `const.py` (e.g. "outside_temp", "outside_humidity"). This function produces
a normalized dict that the rest of the integration can work with.
"""
return {REMAP_ITEMS[key]: value for key, value in entities.items() if key in REMAP_ITEMS}
def remap_wslink_items(entities: dict[str, str]) -> dict[str, str]:
"""Remap items in query for WSLink API."""
items: dict[str, str] = {}
for item, value in entities.items():
if item in REMAP_WSLINK_ITEMS:
items[REMAP_WSLINK_ITEMS[item]] = value
for conn_key, gated in CONNECTION_GATED_SENSORS.items():
if str(entities.get(conn_key, "0")) != "1":
for key in gated:
items.pop(key, None)
return items
def loaded_sensors(config_entry: ConfigEntry) -> list[str]:
"""Return sensor keys currently enabled for this config entry.
Auto-discovery persists new keys into `config_entry.options[SENSORS_TO_LOAD]`. The sensor
platform uses this list to decide which entities to create.
"""
return config_entry.options.get(SENSORS_TO_LOAD) or []
def check_disabled(items: dict[str, str], config_entry: ConfigEntry) -> list[str] | None:
"""Detect payload fields that are not enabled yet (auto-discovery).
The integration supports "auto-discovery" of sensors: when the station starts sending a new
field, we can automatically enable and create the corresponding entity.
This helper compares the normalized payload keys (`items`) with the currently enabled sensor
keys stored in options (`SENSORS_TO_LOAD`) and returns the missing keys.
Returns:
- list[str] of newly discovered sensor keys (to be added/enabled), or
- None if no new keys were found.
Notes:
- Logging is controlled via `DEV_DBG` because payloads can arrive frequently.
"""
log = checked_or(config_entry.options.get(DEV_DBG), bool, False)
entityFound: bool = False
_loaded_sensors: list[str] = loaded_sensors(config_entry)
missing_sensors: list[str] = []
for item in items:
if log:
_LOGGER.info("Checking %s", item)
if item not in _loaded_sensors:
missing_sensors.append(item)
entityFound = True
if log:
_LOGGER.info("Add sensor (%s) to loading queue", item)
return missing_sensors if entityFound else None
def wind_dir_to_text(deg: float) -> UnitOfDir | None:
"""Return wind direction in text representation.
Returns UnitOfDir or None
"""
_deg = to_float(deg)
if _deg is not None:
_LOGGER.debug("wind_dir: %s", AZIMUT[int(abs((_deg - 11.25) % 360) / 22.5)])
return AZIMUT[int(abs((_deg - 11.25) % 360) / 22.5)]
return None
def battery_level(battery: int | str | None) -> UnitOfBat:
"""Return battery level.
WSLink payload values often arrive as strings (e.g. "0"/"1"), so we accept
both ints and strings and coerce to int before mapping.
Returns UnitOfBat
"""
level_map: dict[int, UnitOfBat] = {
0: UnitOfBat.LOW,
1: UnitOfBat.NORMAL,
}
if (battery is None) or (battery == ""):
return UnitOfBat.UNKNOWN
vi: int
if isinstance(battery, int):
vi = battery
else:
try:
vi = int(battery)
except ValueError:
return UnitOfBat.UNKNOWN
return level_map.get(vi, UnitOfBat.UNKNOWN)
def battery_level_to_icon(battery: UnitOfBat) -> str:
"""Return battery level in icon representation.
Returns str
"""
icons = {
UnitOfBat.LOW: "mdi:battery-low",
UnitOfBat.NORMAL: "mdi:battery",
}
return icons.get(battery, "mdi:battery-unknown")
def fahrenheit_to_celsius(fahrenheit: float) -> float:
"""Convert Fahrenheit to Celsius."""
return (fahrenheit - 32) * 5.0 / 9.0
def celsius_to_fahrenheit(celsius: float) -> float:
"""Convert Celsius to Fahrenheit."""
return celsius * 9.0 / 5.0 + 32
def to_int(val: Any) -> int | None:
"""Convert int or string to int."""
if val is None:
return None
if isinstance(val, str) and val.strip() == "":
return None
try:
v = int(val)
except (TypeError, ValueError):
return None
else:
return v
def to_float(val: Any) -> float | None:
"""Convert int or string to float."""
if val is None:
return None
if isinstance(val, str) and val.strip() == "":
return None
try:
v = float(val)
except (TypeError, ValueError):
return None
else:
return v
def heat_index(data: dict[str, int | float | str], convert: bool = False) -> float | None:
"""Calculate heat index from temperature.
data: dict with temperature and humidity
convert: bool, convert recieved data from Celsius to Fahrenheit
"""
if (temp := to_float(data.get(OUTSIDE_TEMP))) is None:
_LOGGER.error(
"We are missing/invalid OUTSIDE TEMP (%s), cannot calculate wind chill index.",
temp,
)
return None
if (rh := to_float(data.get(OUTSIDE_HUMIDITY))) is None:
_LOGGER.error(
"We are missing/invalid OUTSIDE HUMIDITY (%s), cannot calculate wind chill index.",
rh,
)
return None
adjustment = None
if convert:
temp = celsius_to_fahrenheit(temp)
simple = 0.5 * (temp + 61.0 + ((temp - 68.0) * 1.2) + (rh * 0.094))
if ((simple + temp) / 2) > 80:
full_index = (
-42.379
+ 2.04901523 * temp
+ 10.14333127 * rh
- 0.22475541 * temp * rh
- 0.00683783 * temp * temp
- 0.05481717 * rh * rh
+ 0.00122874 * temp * temp * rh
+ 0.00085282 * temp * rh * rh
- 0.00000199 * temp * temp * rh * rh
)
if rh < 13 and (80 <= temp <= 112):
adjustment = ((13 - rh) / 4) * math.sqrt((17 - abs(temp - 95)) / 17)
if rh > 80 and (80 <= temp <= 87):
adjustment = ((rh - 85) / 10) * ((87 - temp) / 5)
return round((full_index + adjustment if adjustment else full_index), 2)
return simple
def chill_index(data: dict[str, str | float | int], convert: bool = False) -> float | None:
"""Calculate wind chill index from temperature and wind speed.
data: dict with temperature and wind speed
convert: bool, convert recieved data from Celsius to Fahrenheit
"""
temp = to_float(data.get(OUTSIDE_TEMP))
wind = to_float(data.get(WIND_SPEED))
if temp is None:
_LOGGER.error(
"We are missing/invalid OUTSIDE TEMP (%s), cannot calculate wind chill index.",
temp,
)
return None
if wind is None:
_LOGGER.error(
"We are missing/invalid WIND SPEED (%s), cannot calculate wind chill index.",
wind,
)
return None
if convert:
temp = celsius_to_fahrenheit(temp)
return (
round(
((35.7 + (0.6215 * temp)) - (35.75 * (wind**0.16)) + (0.4275 * (temp * (wind**0.16)))),
2,
)
if temp < 50 and wind > 3
else temp
)
def voc_level_to_text(value: str) -> VOCLevel | None:
"""Map 1-5 VOC level to text state."""
if value in (None, ""):
return None
return VOC_LEVEL_MAP.get(int(value))
def battery_5step_to_pct(value: str) -> int | None:
"""Convert 0-5 battery steps to percentage."""
if value in (None, ""):
return None
return round(int(value) / 5 * 100)
#
# def long_term_units_in_statistics_meta():
# """Get units in long term statitstics."""
# sensor_units = []
# if not Path(DATABASE_PATH).exists():
# _LOGGER.error("Database file not found: %s", DATABASE_PATH)
# return False
#
# conn = sqlite3.connect(DATABASE_PATH)
# db = conn.cursor()
#
# try:
# db.execute(
# """
# SELECT statistic_id, unit_of_measurement from statistics_meta
# WHERE statistic_id LIKE 'sensor.weather_station_sws%'
# """
# )
# rows = db.fetchall()
# sensor_units = {
# statistic_id: f"{statistic_id} ({unit})" for statistic_id, unit in rows
# }
#
# except sqlite3.Error as e:
# _LOGGER.error("Error during data migration: %s", e)
# finally:
# conn.close()
#
# return sensor_units
#
#
# async def migrate_data(hass: HomeAssistant, sensor_id: str | None = None) -> int | bool:
# """Migrate data from mm/d to mm."""
#
# _LOGGER.debug("Sensor %s is required for data migration", sensor_id)
# updated_rows = 0
#
# if not Path(DATABASE_PATH).exists():
# _LOGGER.error("Database file not found: %s", DATABASE_PATH)
# return False
#
# conn = sqlite3.connect(DATABASE_PATH)
# db = conn.cursor()
#
# try:
# _LOGGER.info(sensor_id)
# db.execute(
# """
# UPDATE statistics_meta
# SET unit_of_measurement = 'mm'
# WHERE statistic_id = ?
# AND unit_of_measurement = 'mm/d';
# """,
# (sensor_id,),
# )
# updated_rows = db.rowcount
# conn.commit()
# _LOGGER.info(
# "Data migration completed successfully. Updated rows: %s for %s",
# updated_rows,
# sensor_id,
# )
#
# except sqlite3.Error as e:
# _LOGGER.error("Error during data migration: %s", e)
# finally:
# conn.close()
# return updated_rows
#
#
# def migrate_data_old(sensor_id: str | None = None):
# """Migrate data from mm/d to mm."""
# updated_rows = 0
#
# if not Path(DATABASE_PATH).exists():
# _LOGGER.error("Database file not found: %s", DATABASE_PATH)
# return False
#
# conn = sqlite3.connect(DATABASE_PATH)
# db = conn.cursor()
#
# try:
# _LOGGER.info(sensor_id)
# db.execute(
# """
# UPDATE statistics_meta
# SET unit_of_measurement = 'mm'
# WHERE statistic_id = ?
# AND unit_of_measurement = 'mm/d';
# """,
# (sensor_id,),
# )
# updated_rows = db.rowcount
# conn.commit()
# _LOGGER.info(
# "Data migration completed successfully. Updated rows: %s for %s",
# updated_rows,
# sensor_id,
# )
#
# except sqlite3.Error as e:
# _LOGGER.error("Error during data migration: %s", e)
# finally:
# conn.close()
# return updated_rows