Compare commits

..

5 Commits

Author SHA1 Message Date
Lukas Svoboda 5731827224
Merge a3dc3d0d53 into 9f36ab5d4c 2026-01-18 20:48:41 +00:00
SchiZzA a3dc3d0d53
Improve data validation and error logging in utils.py 2026-01-18 21:48:24 +01:00
SchiZzA 234840e115
Rename battery_level_to_text to battery_level and update docstring 2026-01-18 19:36:33 +01:00
SchiZzA a20369bab3
Fix logging of unregistered route to include path 2026-01-18 19:35:51 +01:00
SchiZzA 08b812e558
Improve type safety, add data validation, and refactor route handling
- Add typing and runtime checks with py_typecheck for config options and
  incoming data
- Enhance authentication validation and error handling in data
  coordinator
- Refactor Routes class with better dispatch logic and route
  enabling/disabling
- Clean up async functions and improve logging consistency
- Add type hints and annotations throughout the integration
- Update manifest to include typecheck-runtime dependency
2026-01-18 17:53:28 +01:00
7 changed files with 296 additions and 369 deletions

View File

@ -1,14 +1,20 @@
"""The Sencor SWS 12500 Weather Station integration."""
import logging
from typing import Any
import aiohttp.web
from aiohttp.web_exceptions import HTTPUnauthorized
from py_typecheck import checked, checked_or
from homeassistant.config_entries import ConfigEntry
from homeassistant.const import Platform
from homeassistant.core import HomeAssistant
from homeassistant.exceptions import InvalidStateError, PlatformNotReady
from homeassistant.exceptions import (
ConfigEntryNotReady,
InvalidStateError,
PlatformNotReady,
)
from homeassistant.helpers.update_coordinator import DataUpdateCoordinator
from .const import (
@ -24,7 +30,7 @@ from .const import (
WSLINK_URL,
)
from .pocasti_cz import PocasiPush
from .routes import Routes, unregistred
from .routes import Routes
from .utils import (
anonymize,
check_disabled,
@ -50,19 +56,20 @@ class WeatherDataUpdateCoordinator(DataUpdateCoordinator):
def __init__(self, hass: HomeAssistant, config: ConfigEntry) -> None:
"""Init global updater."""
self.hass = hass
self.config = config
self.windy = WindyPush(hass, config)
self.hass: HomeAssistant = hass
self.config: ConfigEntry = config
self.windy: WindyPush = WindyPush(hass, config)
self.pocasi: PocasiPush = PocasiPush(hass, config)
super().__init__(hass, _LOGGER, name=DOMAIN)
async def recieved_data(self, webdata):
async def recieved_data(self, webdata: aiohttp.web.Request) -> aiohttp.web.Response:
"""Handle incoming data query."""
_wslink = self.config_entry.options.get(WSLINK)
data = webdata.query
response = None
_wslink: bool = checked_or(self.config.options.get(WSLINK), bool, False)
data: dict[str, Any] = dict(webdata.query)
# Check if station is sending auth data
if not _wslink and ("ID" not in data or "PASSWORD" not in data):
_LOGGER.error("Invalid request. No security data provided!")
raise HTTPUnauthorized
@ -71,44 +78,67 @@ class WeatherDataUpdateCoordinator(DataUpdateCoordinator):
_LOGGER.error("Invalid request. No security data provided!")
raise HTTPUnauthorized
if _wslink:
id_data = data["wsid"]
key_data = data["wspw"]
else:
id_data = data["ID"]
key_data = data["PASSWORD"]
id_data: str = ""
key_data: str = ""
_id = self.config_entry.options.get(API_ID)
_key = self.config_entry.options.get(API_KEY)
if _wslink:
id_data = data.get("wsid", "")
key_data = data.get("wspw", "")
else:
id_data = data.get("ID", "")
key_data = data.get("PASSWORD", "")
# Check if we have valid auth data in the integration
if (_id := checked(self.config.options.get(API_ID), str)) is None:
_LOGGER.error("We don't have API ID set! Update your config!")
raise IncorrectDataError
if (_key := checked(self.config.options.get(API_KEY), str)) is None:
_LOGGER.error("We don't have API KEY set! Update your config!")
raise IncorrectDataError
if id_data != _id or key_data != _key:
_LOGGER.error("Unauthorised access!")
raise HTTPUnauthorized
if self.config_entry.options.get(WINDY_ENABLED):
response = await self.windy.push_data_to_windy(data)
if self.config.options.get(WINDY_ENABLED, False):
await self.windy.push_data_to_windy(data)
if self.config.options.get(POCASI_CZ_ENABLED):
if self.config.options.get(POCASI_CZ_ENABLED, False):
await self.pocasi.push_data_to_server(data, "WSLINK" if _wslink else "WU")
remaped_items = (
remap_wslink_items(data)
if self.config_entry.options.get(WSLINK)
else remap_items(data)
remaped_items: dict[str, str] = (
remap_wslink_items(data) if _wslink else remap_items(data)
)
if sensors := check_disabled(self.hass, remaped_items, self.config):
translate_sensors = [
await translations(
self.hass, DOMAIN, f"sensor.{t_key}", key="name", category="entity"
if sensors := check_disabled(remaped_items, self.config):
if (
translate_sensors := checked(
[
await translations(
self.hass,
DOMAIN,
f"sensor.{t_key}",
key="name",
category="entity",
)
for t_key in sensors
if await translations(
self.hass,
DOMAIN,
f"sensor.{t_key}",
key="name",
category="entity",
)
is not None
],
list[str],
)
for t_key in sensors
if await translations(
self.hass, DOMAIN, f"sensor.{t_key}", key="name", category="entity"
)
is not None
]
human_readable = "\n".join(translate_sensors)
) is not None:
human_readable: str = "\n".join(translate_sensors)
else:
human_readable = ""
await translated_notification(
self.hass,
@ -126,82 +156,42 @@ class WeatherDataUpdateCoordinator(DataUpdateCoordinator):
if self.config_entry.options.get(DEV_DBG):
_LOGGER.info("Dev log: %s", anonymize(data))
response = response or "OK"
return aiohttp.web.Response(body=f"{response or 'OK'}", status=200)
return aiohttp.web.Response(body="OK", status=200)
def register_path(
hass: HomeAssistant,
url_path: str,
coordinator: WeatherDataUpdateCoordinator,
config: ConfigEntry,
):
"""Register path to handle incoming data."""
) -> bool:
"""Register paths to webhook."""
hass_data = hass.data.setdefault(DOMAIN, {})
debug = config.options.get(DEV_DBG)
_wslink = config.options.get(WSLINK, False)
hass.data.setdefault(DOMAIN, {})
if (hass_data := checked(hass.data[DOMAIN], dict[str, Any])) is None:
raise ConfigEntryNotReady
routes: Routes = hass_data.get("routes", Routes())
_wslink: bool = checked_or(config.options.get(WSLINK), bool, False)
if not routes.routes:
routes = Routes()
_LOGGER.info("Routes not found, creating new routes")
# Create internal route dispatcher with provided urls
routes: Routes = Routes()
routes.add_route(DEFAULT_URL, coordinator.recieved_data, enabled=not _wslink)
routes.add_route(WSLINK_URL, coordinator.recieved_data, enabled=_wslink)
if debug:
_LOGGER.debug("Enabled route is: %s, WSLink is %s", url_path, _wslink)
# Register webhooks in HomeAssistant with dispatcher
try:
_ = hass.http.app.router.add_get(DEFAULT_URL, routes.dispatch)
_ = hass.http.app.router.add_get(WSLINK_URL, routes.dispatch)
try:
default_route = hass.http.app.router.add_get(
DEFAULT_URL,
coordinator.recieved_data if not _wslink else unregistred,
name="weather_default_url",
)
if debug:
_LOGGER.debug("Default route: %s", default_route)
# Save initialised routes
hass_data["routes"] = routes
wslink_route = hass.http.app.router.add_get(
WSLINK_URL,
coordinator.recieved_data if _wslink else unregistred,
name="weather_wslink_url",
)
if debug:
_LOGGER.debug("WSLink route: %s", wslink_route)
routes.add_route(
DEFAULT_URL,
default_route,
coordinator.recieved_data if not _wslink else unregistred,
not _wslink,
)
routes.add_route(
WSLINK_URL, wslink_route, coordinator.recieved_data, _wslink
)
hass_data["routes"] = routes
except RuntimeError as Ex: # pylint: disable=(broad-except)
if (
"Added route will never be executed, method GET is already registered"
in Ex.args
):
_LOGGER.info("Handler to URL (%s) already registred", url_path)
return False
_LOGGER.error("Unable to register URL handler! (%s)", Ex.args)
return False
_LOGGER.info(
"Registered path to handle weather data: %s",
routes.get_enabled(), # pylint: disable=used-before-assignment
except RuntimeError as Ex:
_LOGGER.critical(
"Routes cannot be added. Integration will not work as expected. %s", Ex
)
if _wslink:
routes.switch_route(coordinator.recieved_data, WSLINK_URL)
raise ConfigEntryNotReady from Ex
else:
routes.switch_route(coordinator.recieved_data, DEFAULT_URL)
return routes
return True
async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
@ -212,21 +202,22 @@ async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
hass_data = hass.data.setdefault(DOMAIN, {})
hass_data[entry.entry_id] = coordinator
_wslink = entry.options.get(WSLINK)
debug = entry.options.get(DEV_DBG)
routes: Routes | None = hass_data.get("routes", None)
if debug:
_LOGGER.debug("WS Link is %s", "enbled" if _wslink else "disabled")
_wslink = checked_or(entry.options.get(WSLINK), bool, False)
route = register_path(
hass, DEFAULT_URL if not _wslink else WSLINK_URL, coordinator, entry
)
_LOGGER.debug("WS Link is %s", "enbled" if _wslink else "disabled")
if not route:
_LOGGER.error("Fatal: path not registered!")
raise PlatformNotReady
if routes:
_LOGGER.debug("We have routes registered, will try to switch dispatcher.")
routes.switch_route(DEFAULT_URL if not _wslink else WSLINK_URL)
_LOGGER.debug("%s", routes.show_enabled())
else:
routes_enabled = register_path(hass, coordinator, entry)
hass_data["route"] = route
if not routes_enabled:
_LOGGER.error("Fatal: path not registered!")
raise PlatformNotReady
await hass.config_entries.async_forward_entry_setups(entry, PLATFORMS)
@ -238,7 +229,7 @@ async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
async def update_listener(hass: HomeAssistant, entry: ConfigEntry):
"""Update setup listener."""
await hass.config_entries.async_reload(entry.entry_id)
_ = await hass.config_entries.async_reload(entry.entry_id)
_LOGGER.info("Settings updated")

View File

@ -6,7 +6,12 @@ from typing import Any
import voluptuous as vol
from yarl import URL
from homeassistant.config_entries import ConfigFlow, ConfigFlowResult, OptionsFlow
from homeassistant.config_entries import (
ConfigEntry,
ConfigFlow,
ConfigFlowResult,
OptionsFlow,
)
from homeassistant.core import callback
from homeassistant.exceptions import HomeAssistantError
from homeassistant.helpers.network import get_url
@ -59,9 +64,9 @@ class ConfigOptionsFlowHandler(OptionsFlow):
self.ecowitt: dict[str, Any] = {}
self.ecowitt_schema = {}
@property
def config_entry(self):
return self.hass.config_entries.async_get_entry(self.handler)
# @property
# def config_entry(self) -> ConfigEntry:
# return self.hass.config_entries.async_get_entry(self.handler)
async def _get_entry_data(self):
"""Get entry data."""
@ -151,9 +156,9 @@ class ConfigOptionsFlowHandler(OptionsFlow):
step_id="init", menu_options=["basic", "ecowitt", "windy", "pocasi"]
)
async def async_step_basic(self, user_input=None):
async def async_step_basic(self, user_input: Any = None):
"""Manage basic options - credentials."""
errors = {}
errors: dict[str, str] = {}
await self._get_entry_data()
@ -184,9 +189,9 @@ class ConfigOptionsFlowHandler(OptionsFlow):
errors=errors,
)
async def async_step_windy(self, user_input=None):
async def async_step_windy(self, user_input: Any = None):
"""Manage windy options."""
errors = {}
errors: dict[str, str] = {}
await self._get_entry_data()
@ -212,7 +217,7 @@ class ConfigOptionsFlowHandler(OptionsFlow):
async def async_step_pocasi(self, user_input: Any = None) -> ConfigFlowResult:
"""Handle the pocasi step."""
errors = {}
errors: dict[str, str] = {}
await self._get_entry_data()
@ -246,7 +251,7 @@ class ConfigOptionsFlowHandler(OptionsFlow):
async def async_step_ecowitt(self, user_input: Any = None) -> ConfigFlowResult:
"""Ecowitt stations setup."""
errors = {}
errors: dict[str, str] = {}
await self._get_entry_data()
if not (webhook := self.ecowitt.get(ECOWITT_WEBHOOK_ID)):
@ -308,7 +313,7 @@ class ConfigFlowHandler(ConfigFlow, domain=DOMAIN):
VERSION = 1
async def async_step_user(self, user_input=None):
async def async_step_user(self, user_input: Any = None):
"""Handle the initial step."""
if user_input is None:
await self.async_set_unique_id(DOMAIN)
@ -319,7 +324,7 @@ class ConfigFlowHandler(ConfigFlow, domain=DOMAIN):
data_schema=vol.Schema(self.data_schema),
)
errors = {}
errors: dict[str, str] = {}
if user_input[API_ID] in INVALID_CREDENTIALS:
errors[API_ID] = "valid_credentials_api"
@ -340,6 +345,6 @@ class ConfigFlowHandler(ConfigFlow, domain=DOMAIN):
@staticmethod
@callback
def async_get_options_flow(config_entry) -> ConfigOptionsFlowHandler:
def async_get_options_flow(config_entry: ConfigEntry) -> ConfigOptionsFlowHandler:
"""Get the options flow for this handler."""
return ConfigOptionsFlowHandler()

View File

@ -8,7 +8,7 @@
"homekit": {},
"iot_class": "local_push",
"issue_tracker": "https://github.com/schizza/SWS-12500-custom-component/issues",
"requirements": [],
"requirements": ["typecheck-runtime==0.2.0"],
"ssdp": [],
"version": "1.6.9",
"zeroconf": []

View File

@ -5,6 +5,7 @@ import logging
from typing import Any, Literal
from aiohttp import ClientError
from py_typecheck.core import checked
from homeassistant.config_entries import ConfigEntry
from homeassistant.core import HomeAssistant
@ -75,8 +76,20 @@ class PocasiPush:
"""Pushes weather data to server."""
_data = data.copy()
_api_id = self.config.options.get(POCASI_CZ_API_ID)
_api_key = self.config.options.get(POCASI_CZ_API_KEY)
if (_api_id := checked(self.config.options.get(POCASI_CZ_API_ID), str)) is None:
_LOGGER.error(
"No API ID is provided for Pocasi Meteo. Check your configuration."
)
return
if (
_api_key := checked(self.config.options.get(POCASI_CZ_API_KEY), str)
) is None:
_LOGGER.error(
"No API Key is provided for Pocasi Meteo. Check your configuration."
)
return
if self.log:
_LOGGER.info(
@ -91,7 +104,7 @@ class PocasiPush:
self._interval,
self.next_update,
)
return False
return
request_url: str = ""
if mode == "WSLINK":
@ -139,5 +152,3 @@ class PocasiPush:
if self.log:
_LOGGER.info("Next update: %s", str(self.next_update))
return None

View File

@ -1,77 +1,67 @@
"""Store routes info."""
"""Routes implementation."""
from collections.abc import Callable
from dataclasses import dataclass
from logging import getLogger
from collections.abc import Awaitable, Callable
from dataclasses import dataclass, field
import logging
from aiohttp.web import AbstractRoute, Response
from aiohttp.web import Request, Response
_LOGGER = getLogger(__name__)
_LOGGER = logging.getLogger(__name__)
Handler = Callable[[Request], Awaitable[Response]]
@dataclass
class Route:
"""Store route info."""
class RouteInfo:
"""Route struct."""
url_path: str
route: AbstractRoute
handler: Callable
handler: Handler
enabled: bool = False
def __str__(self):
"""Return string representation."""
return f"{self.url_path} -> {self.handler}"
fallback: Handler = field(default_factory=lambda: unregistred)
class Routes:
"""Store routes info."""
"""Routes class."""
def __init__(self) -> None:
"""Initialize routes."""
self.routes = {}
"""Init."""
self.routes: dict[str, RouteInfo] = {}
def switch_route(self, coordinator: Callable, url_path: str):
"""Switch route."""
async def dispatch(self, request: Request) -> Response:
"""Dispatch."""
info = self.routes.get(request.path)
if not info:
_LOGGER.debug("Route %s is not registered!", request.path)
return await unregistred(request)
handler = info.handler if info.enabled else info.fallback
return await handler(request)
for url, route in self.routes.items():
if url == url_path:
_LOGGER.info("New coordinator to route: %s", route.url_path)
route.enabled = True
route.handler = coordinator
route.route._handler = coordinator # noqa: SLF001
else:
route.enabled = False
route.handler = unregistred
route.route._handler = unregistred # noqa: SLF001
def switch_route(self, url_path: str) -> None:
"""Switch route to new handler."""
for path, info in self.routes.items():
info.enabled = path == url_path
def add_route(
self,
url_path: str,
route: AbstractRoute,
handler: Callable,
enabled: bool = False,
):
"""Add route."""
self.routes[url_path] = Route(url_path, route, handler, enabled)
self, url_path: str, handler: Handler, *, enabled: bool = False
) -> None:
"""Add route to dispatcher."""
def get_route(self, url_path: str) -> Route:
"""Get route."""
return self.routes.get(url_path, Route)
self.routes[url_path] = RouteInfo(url_path, handler, enabled=enabled)
_LOGGER.debug("Registered dispatcher for route %s", url_path)
def get_enabled(self) -> str:
"""Get enabled routes."""
enabled_routes = [
route.url_path for route in self.routes.values() if route.enabled
]
return "".join(enabled_routes) if enabled_routes else "None"
def __str__(self):
"""Return string representation."""
return "\n".join([str(route) for route in self.routes.values()])
def show_enabled(self) -> str:
"""Show info of enabled route."""
for url, route in self.routes.items():
if route.enabled:
return (
f"Dispatcher enabled for URL: {url}, with handler: {route.handler}"
)
return "No routes is enabled."
async def unregistred(*args, **kwargs):
"""Unregister path to handle incoming data."""
_LOGGER.error("Recieved data to unregistred webhook. Check your settings")
return Response(body=f"{'Unregistred webhook.'}", status=404)
async def unregistred(request: Request) -> Response:
"""Return unregistred error."""
_ = request
_LOGGER.debug("Received data to unregistred or disabled webhook.")
return Response(text="Unregistred webhook. Check your settings.", status=400)

View File

@ -2,11 +2,12 @@
import logging
import math
from pathlib import Path
import sqlite3
from typing import Any
from multiprocessing import Value
from typing import Any, cast
import numpy as np
from py_typecheck import checked
from py_typecheck.core import checked_or
from homeassistant.components import persistent_notification
from homeassistant.config_entries import ConfigEntry
@ -15,7 +16,6 @@ from homeassistant.helpers.translation import async_get_translations
from .const import (
AZIMUT,
DATABASE_PATH,
DEV_DBG,
OUTSIDE_HUMIDITY,
OUTSIDE_TEMP,
@ -37,19 +37,19 @@ async def translations(
*,
key: str = "message",
category: str = "notify",
) -> str:
) -> str | None:
"""Get translated keys for domain."""
localize_key = f"component.{translation_domain}.{category}.{translation_key}.{key}"
language = hass.config.language
language: str = hass.config.language
_translations = await async_get_translations(
hass, language, category, [translation_domain]
)
if localize_key in _translations:
return _translations[localize_key]
return ""
return None
async def translated_notification(
@ -70,7 +70,7 @@ async def translated_notification(
f"component.{translation_domain}.{category}.{translation_key}.title"
)
language = hass.config.language
language: str = cast("str", hass.config.language)
_translations = await async_get_translations(
hass, language, category, [translation_domain]
@ -91,7 +91,10 @@ async def translated_notification(
async def update_options(
hass: HomeAssistant, entry: ConfigEntry, update_key, update_value
hass: HomeAssistant,
entry: ConfigEntry,
update_key: str,
update_value: str | list[str] | bool,
) -> bool:
"""Update config.options entry."""
conf = {**entry.options}
@ -100,46 +103,43 @@ async def update_options(
return hass.config_entries.async_update_entry(entry, options=conf)
def anonymize(data):
def anonymize(
data: dict[str, str | int | float | bool],
) -> dict[str, str | int | float | bool]:
"""Anoynimize recieved data."""
anonym = {}
for k in data:
if k not in {"ID", "PASSWORD", "wsid", "wspw"}:
anonym[k] = data[k]
return anonym
anonym: dict[str, str] = {}
return {
anonym[key]: value
for key, value in data.items()
if key not in {"ID", "PASSWORD", "wsid", "wspw"}
}
def remap_items(entities):
def remap_items(entities: dict[str, str]) -> dict[str, str]:
"""Remap items in query."""
items = {}
for item in entities:
if item in REMAP_ITEMS:
items[REMAP_ITEMS[item]] = entities[item]
return items
return {
REMAP_ITEMS[key]: value for key, value in entities.items() if key in REMAP_ITEMS
}
def remap_wslink_items(entities):
def remap_wslink_items(entities: dict[str, str]) -> dict[str, str]:
"""Remap items in query for WSLink API."""
items = {}
for item in entities:
if item in REMAP_WSLINK_ITEMS:
items[REMAP_WSLINK_ITEMS[item]] = entities[item]
return items
return {
REMAP_WSLINK_ITEMS[key]: value
for key, value in entities.items()
if key in REMAP_WSLINK_ITEMS
}
def loaded_sensors(config_entry: ConfigEntry) -> list | None:
def loaded_sensors(config_entry: ConfigEntry) -> list[str]:
"""Get loaded sensors."""
return config_entry.options.get(SENSORS_TO_LOAD) or []
def check_disabled(
hass: HomeAssistant, items, config_entry: ConfigEntry
) -> list | None:
items: dict[str, str], config_entry: ConfigEntry
) -> list[str] | None:
"""Check if we have data for unloaded sensors.
If so, then add sensor to load queue.
@ -147,10 +147,11 @@ def check_disabled(
Returns list of found sensors or None
"""
log: bool = config_entry.options.get(DEV_DBG, False)
log = checked_or(config_entry.options.get(DEV_DBG), bool, False)
entityFound: bool = False
_loaded_sensors = loaded_sensors(config_entry)
missing_sensors: list = []
_loaded_sensors: list[str] = loaded_sensors(config_entry)
missing_sensors: list[str] = []
for item in items:
if log:
@ -177,8 +178,8 @@ def wind_dir_to_text(deg: float) -> UnitOfDir | None:
return None
def battery_level_to_text(battery: int) -> UnitOfBat:
"""Return battery level in text representation.
def battery_level(battery: int) -> UnitOfBat:
"""Return battery level.
Returns UnitOfBat
"""
@ -188,10 +189,10 @@ def battery_level_to_text(battery: int) -> UnitOfBat:
1: UnitOfBat.NORMAL,
}
if battery is None:
if (v := checked(battery, int)) is None:
return UnitOfBat.UNKNOWN
return level_map.get(int(battery), UnitOfBat.UNKNOWN)
return level_map.get(v, UnitOfBat.UNKNOWN)
def battery_level_to_icon(battery: UnitOfBat) -> str:
@ -218,21 +219,40 @@ def celsius_to_fahrenheit(celsius: float) -> float:
return celsius * 9.0 / 5.0 + 32
def heat_index(data: Any, convert: bool = False) -> float | None:
def _to_float(val: Any) -> float | None:
"""Convert int or string to float."""
if not val:
return None
try:
v = float(val)
except (TypeError, ValueError):
return None
else:
return v
def heat_index(
data: dict[str, int | float | str], convert: bool = False
) -> float | None:
"""Calculate heat index from temperature.
data: dict with temperature and humidity
convert: bool, convert recieved data from Celsius to Fahrenheit
"""
temp = data.get(OUTSIDE_TEMP, None)
rh = data.get(OUTSIDE_HUMIDITY, None)
if not temp or not rh:
if (temp := _to_float(data.get(OUTSIDE_TEMP))) is None:
_LOGGER.error(
"We are missing/invalid OUTSIDE TEMP (%s), cannot calculate wind chill index.",
temp,
)
return None
temp = float(temp)
rh = float(rh)
if (rh := _to_float(data.get(OUTSIDE_HUMIDITY))) is None:
_LOGGER.error(
"We are missing/invalid OUTSIDE HUMIDITY (%s), cannot calculate wind chill index.",
rh,
)
return None
adjustment = None
@ -263,21 +283,30 @@ def heat_index(data: Any, convert: bool = False) -> float | None:
return simple
def chill_index(data: Any, convert: bool = False) -> float | None:
def chill_index(
data: dict[str, str | float | int], convert: bool = False
) -> float | None:
"""Calculate wind chill index from temperature and wind speed.
data: dict with temperature and wind speed
convert: bool, convert recieved data from Celsius to Fahrenheit
"""
temp = _to_float(data.get(OUTSIDE_TEMP))
wind = _to_float(data.get(WIND_SPEED))
temp = data.get(OUTSIDE_TEMP, None)
wind = data.get(WIND_SPEED, None)
if not temp or not wind:
if temp is None:
_LOGGER.error(
"We are missing/invalid OUTSIDE TEMP (%s), cannot calculate wind chill index.",
temp,
)
return None
temp = float(temp)
wind = float(wind)
if wind is None:
_LOGGER.error(
"We are missing/invalid WIND SPEED (%s), cannot calculate wind chill index.",
wind,
)
return None
if convert:
temp = celsius_to_fahrenheit(temp)
@ -294,109 +323,3 @@ def chill_index(data: Any, convert: bool = False) -> float | None:
if temp < 50 and wind > 3
else temp
)
def long_term_units_in_statistics_meta():
"""Get units in long term statitstics."""
sensor_units = []
if not Path(DATABASE_PATH).exists():
_LOGGER.error("Database file not found: %s", DATABASE_PATH)
return False
conn = sqlite3.connect(DATABASE_PATH)
db = conn.cursor()
try:
db.execute(
"""
SELECT statistic_id, unit_of_measurement from statistics_meta
WHERE statistic_id LIKE 'sensor.weather_station_sws%'
"""
)
rows = db.fetchall()
sensor_units = {
statistic_id: f"{statistic_id} ({unit})" for statistic_id, unit in rows
}
except sqlite3.Error as e:
_LOGGER.error("Error during data migration: %s", e)
finally:
conn.close()
return sensor_units
async def migrate_data(hass: HomeAssistant, sensor_id: str | None = None) -> int | bool:
"""Migrate data from mm/d to mm."""
_LOGGER.debug("Sensor %s is required for data migration", sensor_id)
updated_rows = 0
if not Path(DATABASE_PATH).exists():
_LOGGER.error("Database file not found: %s", DATABASE_PATH)
return False
conn = sqlite3.connect(DATABASE_PATH)
db = conn.cursor()
try:
_LOGGER.info(sensor_id)
db.execute(
"""
UPDATE statistics_meta
SET unit_of_measurement = 'mm'
WHERE statistic_id = ?
AND unit_of_measurement = 'mm/d';
""",
(sensor_id,),
)
updated_rows = db.rowcount
conn.commit()
_LOGGER.info(
"Data migration completed successfully. Updated rows: %s for %s",
updated_rows,
sensor_id,
)
except sqlite3.Error as e:
_LOGGER.error("Error during data migration: %s", e)
finally:
conn.close()
return updated_rows
def migrate_data_old(sensor_id: str | None = None):
"""Migrate data from mm/d to mm."""
updated_rows = 0
if not Path(DATABASE_PATH).exists():
_LOGGER.error("Database file not found: %s", DATABASE_PATH)
return False
conn = sqlite3.connect(DATABASE_PATH)
db = conn.cursor()
try:
_LOGGER.info(sensor_id)
db.execute(
"""
UPDATE statistics_meta
SET unit_of_measurement = 'mm'
WHERE statistic_id = ?
AND unit_of_measurement = 'mm/d';
""",
(sensor_id,),
)
updated_rows = db.rowcount
conn.commit()
_LOGGER.info(
"Data migration completed successfully. Updated rows: %s for %s",
updated_rows,
sensor_id,
)
except sqlite3.Error as e:
_LOGGER.error("Error during data migration: %s", e)
finally:
conn.close()
return updated_rows

View File

@ -2,8 +2,10 @@
from datetime import datetime, timedelta
import logging
from typing import Final
from aiohttp.client_exceptions import ClientError
from py_typecheck.core import checked
from homeassistant.config_entries import ConfigEntry
from homeassistant.core import HomeAssistant
@ -52,22 +54,22 @@ class WindyPush:
def __init__(self, hass: HomeAssistant, config: ConfigEntry) -> None:
"""Init."""
self.hass = hass
self.config = config
self.hass: Final = hass
self.config: Final = config
""" lets wait for 1 minute to get initial data from station
and then try to push first data to Windy
"""
self.last_update = datetime.now()
self.next_update = datetime.now() + timed(minutes=1)
self.last_update: datetime = datetime.now()
self.next_update: datetime = datetime.now() + timed(minutes=1)
self.log = self.config.options.get(WINDY_LOGGER_ENABLED)
self.invalid_response_count = 0
self.log: bool = self.config.options.get(WINDY_LOGGER_ENABLED, False)
self.invalid_response_count: int = 0
def verify_windy_response( # pylint: disable=useless-return
self,
response: str,
) -> WindyNotInserted | WindySuccess | WindyApiKeyError | None:
):
"""Verify answer form Windy."""
if self.log:
@ -85,9 +87,7 @@ class WindyPush:
if "Unauthorized" in response:
raise WindyApiKeyError
return None
async def push_data_to_windy(self, data):
async def push_data_to_windy(self, data: dict[str, str]) -> bool:
"""Pushes weather data do Windy stations.
Interval is 5 minutes, otherwise Windy would not accepts data.
@ -96,8 +96,6 @@ class WindyPush:
from station. But we need to do some clean up.
"""
text_for_test = None
if self.log:
_LOGGER.info(
"Windy last update = %s, next update at: %s",
@ -112,13 +110,18 @@ class WindyPush:
for purge in PURGE_DATA:
if purge in purged_data:
purged_data.pop(purge)
_ = purged_data.pop(purge)
if "dewptf" in purged_data:
dewpoint = round(((float(purged_data.pop("dewptf")) - 32) / 1.8), 1)
purged_data["dewpoint"] = str(dewpoint)
windy_api_key = self.config.options.get(WINDY_API_KEY)
if (
windy_api_key := checked(self.config.options.get(WINDY_API_KEY), str)
) is None:
_LOGGER.error("Windy API key is not provided! Check your configuration.")
return False
request_url = f"{WINDY_URL}{windy_api_key}"
if self.log:
@ -133,27 +136,33 @@ class WindyPush:
# log despite of settings
_LOGGER.error(WINDY_NOT_INSERTED)
text_for_test = WINDY_NOT_INSERTED
except WindyApiKeyError:
# log despite of settings
_LOGGER.critical(WINDY_INVALID_KEY)
text_for_test = WINDY_INVALID_KEY
await update_options(self.hass, self.config, WINDY_ENABLED, False)
if not (
await update_options(
self.hass, self.config, WINDY_ENABLED, False
)
):
_LOGGER.debug("Failed to set Windy option to false.")
except WindySuccess:
if self.log:
_LOGGER.info(WINDY_SUCCESS)
text_for_test = WINDY_SUCCESS
else:
if self.log:
_LOGGER.debug(WINDY_NOT_INSERTED)
except ClientError as ex:
_LOGGER.critical("Invalid response from Windy: %s", str(ex))
self.invalid_response_count += 1
if self.invalid_response_count > 3:
_LOGGER.critical(WINDY_UNEXPECTED)
text_for_test = WINDY_UNEXPECTED
await update_options(self.hass, self.config, WINDY_ENABLED, False)
if not await update_options(
self.hass, self.config, WINDY_ENABLED, False
):
_LOGGER.debug("Failed to set Windy options to false.")
self.last_update = datetime.now()
self.next_update = self.last_update + timed(minutes=5)
@ -161,6 +170,4 @@ class WindyPush:
if self.log:
_LOGGER.info("Next update: %s", str(self.next_update))
if RESPONSE_FOR_TEST and text_for_test:
return text_for_test
return None
return True