SWS-12500-custom-component/custom_components/sws12500/utils.py

326 lines
8.4 KiB
Python

"""Utils for SWS12500."""
import logging
import math
from multiprocessing import Value
from typing import Any, cast
import numpy as np
from py_typecheck import checked
from py_typecheck.core import checked_or
from homeassistant.components import persistent_notification
from homeassistant.config_entries import ConfigEntry
from homeassistant.core import HomeAssistant
from homeassistant.helpers.translation import async_get_translations
from .const import (
AZIMUT,
DEV_DBG,
OUTSIDE_HUMIDITY,
OUTSIDE_TEMP,
REMAP_ITEMS,
REMAP_WSLINK_ITEMS,
SENSORS_TO_LOAD,
WIND_SPEED,
UnitOfBat,
UnitOfDir,
)
_LOGGER = logging.getLogger(__name__)
async def translations(
hass: HomeAssistant,
translation_domain: str,
translation_key: str,
*,
key: str = "message",
category: str = "notify",
) -> str | None:
"""Get translated keys for domain."""
localize_key = f"component.{translation_domain}.{category}.{translation_key}.{key}"
language: str = hass.config.language
_translations = await async_get_translations(
hass, language, category, [translation_domain]
)
if localize_key in _translations:
return _translations[localize_key]
return None
async def translated_notification(
hass: HomeAssistant,
translation_domain: str,
translation_key: str,
translation_placeholders: dict[str, str] | None = None,
notification_id: str | None = None,
*,
key: str = "message",
category: str = "notify",
):
"""Translate notification."""
localize_key = f"component.{translation_domain}.{category}.{translation_key}.{key}"
localize_title = (
f"component.{translation_domain}.{category}.{translation_key}.title"
)
language: str = cast("str", hass.config.language)
_translations = await async_get_translations(
hass, language, category, [translation_domain]
)
if localize_key in _translations:
if not translation_placeholders:
persistent_notification.async_create(
hass,
_translations[localize_key],
_translations[localize_title],
notification_id,
)
else:
message = _translations[localize_key].format(**translation_placeholders)
persistent_notification.async_create(
hass, message, _translations[localize_title], notification_id
)
async def update_options(
hass: HomeAssistant,
entry: ConfigEntry,
update_key: str,
update_value: str | list[str] | bool,
) -> bool:
"""Update config.options entry."""
conf = {**entry.options}
conf[update_key] = update_value
return hass.config_entries.async_update_entry(entry, options=conf)
def anonymize(
data: dict[str, str | int | float | bool],
) -> dict[str, str | int | float | bool]:
"""Anoynimize recieved data."""
anonym: dict[str, str] = {}
return {
anonym[key]: value
for key, value in data.items()
if key not in {"ID", "PASSWORD", "wsid", "wspw"}
}
def remap_items(entities: dict[str, str]) -> dict[str, str]:
"""Remap items in query."""
return {
REMAP_ITEMS[key]: value for key, value in entities.items() if key in REMAP_ITEMS
}
def remap_wslink_items(entities: dict[str, str]) -> dict[str, str]:
"""Remap items in query for WSLink API."""
return {
REMAP_WSLINK_ITEMS[key]: value
for key, value in entities.items()
if key in REMAP_WSLINK_ITEMS
}
def loaded_sensors(config_entry: ConfigEntry) -> list[str]:
"""Get loaded sensors."""
return config_entry.options.get(SENSORS_TO_LOAD) or []
def check_disabled(
items: dict[str, str], config_entry: ConfigEntry
) -> list[str] | None:
"""Check if we have data for unloaded sensors.
If so, then add sensor to load queue.
Returns list of found sensors or None
"""
log = checked_or(config_entry.options.get(DEV_DBG), bool, False)
entityFound: bool = False
_loaded_sensors: list[str] = loaded_sensors(config_entry)
missing_sensors: list[str] = []
for item in items:
if log:
_LOGGER.info("Checking %s", item)
if item not in _loaded_sensors:
missing_sensors.append(item)
entityFound = True
if log:
_LOGGER.info("Add sensor (%s) to loading queue", item)
return missing_sensors if entityFound else None
def wind_dir_to_text(deg: float) -> UnitOfDir | None:
"""Return wind direction in text representation.
Returns UnitOfDir or None
"""
if deg:
return AZIMUT[int(abs((float(deg) - 11.25) % 360) / 22.5)]
return None
def battery_level(battery: int) -> UnitOfBat:
"""Return battery level.
Returns UnitOfBat
"""
level_map: dict[int, UnitOfBat] = {
0: UnitOfBat.LOW,
1: UnitOfBat.NORMAL,
}
if (v := checked(battery, int)) is None:
return UnitOfBat.UNKNOWN
return level_map.get(v, UnitOfBat.UNKNOWN)
def battery_level_to_icon(battery: UnitOfBat) -> str:
"""Return battery level in icon representation.
Returns str
"""
icons = {
UnitOfBat.LOW: "mdi:battery-low",
UnitOfBat.NORMAL: "mdi:battery",
}
return icons.get(battery, "mdi:battery-unknown")
def fahrenheit_to_celsius(fahrenheit: float) -> float:
"""Convert Fahrenheit to Celsius."""
return (fahrenheit - 32) * 5.0 / 9.0
def celsius_to_fahrenheit(celsius: float) -> float:
"""Convert Celsius to Fahrenheit."""
return celsius * 9.0 / 5.0 + 32
def _to_float(val: Any) -> float | None:
"""Convert int or string to float."""
if not val:
return None
try:
v = float(val)
except (TypeError, ValueError):
return None
else:
return v
def heat_index(
data: dict[str, int | float | str], convert: bool = False
) -> float | None:
"""Calculate heat index from temperature.
data: dict with temperature and humidity
convert: bool, convert recieved data from Celsius to Fahrenheit
"""
if (temp := _to_float(data.get(OUTSIDE_TEMP))) is None:
_LOGGER.error(
"We are missing/invalid OUTSIDE TEMP (%s), cannot calculate wind chill index.",
temp,
)
return None
if (rh := _to_float(data.get(OUTSIDE_HUMIDITY))) is None:
_LOGGER.error(
"We are missing/invalid OUTSIDE HUMIDITY (%s), cannot calculate wind chill index.",
rh,
)
return None
adjustment = None
if convert:
temp = celsius_to_fahrenheit(temp)
simple = 0.5 * (temp + 61.0 + ((temp - 68.0) * 1.2) + (rh * 0.094))
if ((simple + temp) / 2) > 80:
full_index = (
-42.379
+ 2.04901523 * temp
+ 10.14333127 * rh
- 0.22475541 * temp * rh
- 0.00683783 * temp * temp
- 0.05481717 * rh * rh
+ 0.00122874 * temp * temp * rh
+ 0.00085282 * temp * rh * rh
- 0.00000199 * temp * temp * rh * rh
)
if rh < 13 and (temp in np.arange(80, 112, 0.1)):
adjustment = ((13 - rh) / 4) * math.sqrt((17 - abs(temp - 95)) / 17)
if rh > 80 and (temp in np.arange(80, 87, 0.1)):
adjustment = ((rh - 85) / 10) * ((87 - temp) / 5)
return round((full_index + adjustment if adjustment else full_index), 2)
return simple
def chill_index(
data: dict[str, str | float | int], convert: bool = False
) -> float | None:
"""Calculate wind chill index from temperature and wind speed.
data: dict with temperature and wind speed
convert: bool, convert recieved data from Celsius to Fahrenheit
"""
temp = _to_float(data.get(OUTSIDE_TEMP))
wind = _to_float(data.get(WIND_SPEED))
if temp is None:
_LOGGER.error(
"We are missing/invalid OUTSIDE TEMP (%s), cannot calculate wind chill index.",
temp,
)
return None
if wind is None:
_LOGGER.error(
"We are missing/invalid WIND SPEED (%s), cannot calculate wind chill index.",
wind,
)
return None
if convert:
temp = celsius_to_fahrenheit(temp)
return (
round(
(
(35.7 + (0.6215 * temp))
- (35.75 * (wind**0.16))
+ (0.4275 * (temp * (wind**0.16)))
),
2,
)
if temp < 50 and wind > 3
else temp
)