diff --git a/custom_components/sws12500/__init__.py b/custom_components/sws12500/__init__.py index 811ac75..c710ceb 100644 --- a/custom_components/sws12500/__init__.py +++ b/custom_components/sws12500/__init__.py @@ -1,14 +1,20 @@ """The Sencor SWS 12500 Weather Station integration.""" import logging +from typing import Any import aiohttp.web from aiohttp.web_exceptions import HTTPUnauthorized +from py_typecheck import checked, checked_or from homeassistant.config_entries import ConfigEntry from homeassistant.const import Platform from homeassistant.core import HomeAssistant -from homeassistant.exceptions import InvalidStateError, PlatformNotReady +from homeassistant.exceptions import ( + ConfigEntryNotReady, + InvalidStateError, + PlatformNotReady, +) from homeassistant.helpers.update_coordinator import DataUpdateCoordinator from .const import ( @@ -24,7 +30,7 @@ from .const import ( WSLINK_URL, ) from .pocasti_cz import PocasiPush -from .routes import Routes, unregistred +from .routes import Routes from .utils import ( anonymize, check_disabled, @@ -50,19 +56,20 @@ class WeatherDataUpdateCoordinator(DataUpdateCoordinator): def __init__(self, hass: HomeAssistant, config: ConfigEntry) -> None: """Init global updater.""" - self.hass = hass - self.config = config - self.windy = WindyPush(hass, config) + self.hass: HomeAssistant = hass + self.config: ConfigEntry = config + self.windy: WindyPush = WindyPush(hass, config) self.pocasi: PocasiPush = PocasiPush(hass, config) super().__init__(hass, _LOGGER, name=DOMAIN) - async def recieved_data(self, webdata): + async def recieved_data(self, webdata: aiohttp.web.Request) -> aiohttp.web.Response: """Handle incoming data query.""" - _wslink = self.config_entry.options.get(WSLINK) - data = webdata.query - response = None + _wslink: bool = checked_or(self.config.options.get(WSLINK), bool, False) + data: dict[str, Any] = dict(webdata.query) + + # Check if station is sending auth data if not _wslink and ("ID" not in data or "PASSWORD" not in data): _LOGGER.error("Invalid request. No security data provided!") raise HTTPUnauthorized @@ -71,44 +78,67 @@ class WeatherDataUpdateCoordinator(DataUpdateCoordinator): _LOGGER.error("Invalid request. No security data provided!") raise HTTPUnauthorized - if _wslink: - id_data = data["wsid"] - key_data = data["wspw"] - else: - id_data = data["ID"] - key_data = data["PASSWORD"] + id_data: str = "" + key_data: str = "" - _id = self.config_entry.options.get(API_ID) - _key = self.config_entry.options.get(API_KEY) + if _wslink: + id_data = data.get("wsid", "") + key_data = data.get("wspw", "") + else: + id_data = data.get("ID", "") + key_data = data.get("PASSWORD", "") + + # Check if we have valid auth data in the integration + + if (_id := checked(self.config.options.get(API_ID), str)) is None: + _LOGGER.error("We don't have API ID set! Update your config!") + raise IncorrectDataError + + if (_key := checked(self.config.options.get(API_KEY), str)) is None: + _LOGGER.error("We don't have API KEY set! Update your config!") + raise IncorrectDataError if id_data != _id or key_data != _key: _LOGGER.error("Unauthorised access!") raise HTTPUnauthorized - if self.config_entry.options.get(WINDY_ENABLED): - response = await self.windy.push_data_to_windy(data) + if self.config.options.get(WINDY_ENABLED, False): + await self.windy.push_data_to_windy(data) - if self.config.options.get(POCASI_CZ_ENABLED): + if self.config.options.get(POCASI_CZ_ENABLED, False): await self.pocasi.push_data_to_server(data, "WSLINK" if _wslink else "WU") - remaped_items = ( - remap_wslink_items(data) - if self.config_entry.options.get(WSLINK) - else remap_items(data) + remaped_items: dict[str, str] = ( + remap_wslink_items(data) if _wslink else remap_items(data) ) - if sensors := check_disabled(self.hass, remaped_items, self.config): - translate_sensors = [ - await translations( - self.hass, DOMAIN, f"sensor.{t_key}", key="name", category="entity" + if sensors := check_disabled(remaped_items, self.config): + if ( + translate_sensors := checked( + [ + await translations( + self.hass, + DOMAIN, + f"sensor.{t_key}", + key="name", + category="entity", + ) + for t_key in sensors + if await translations( + self.hass, + DOMAIN, + f"sensor.{t_key}", + key="name", + category="entity", + ) + is not None + ], + list[str], ) - for t_key in sensors - if await translations( - self.hass, DOMAIN, f"sensor.{t_key}", key="name", category="entity" - ) - is not None - ] - human_readable = "\n".join(translate_sensors) + ) is not None: + human_readable: str = "\n".join(translate_sensors) + else: + human_readable = "" await translated_notification( self.hass, @@ -126,82 +156,42 @@ class WeatherDataUpdateCoordinator(DataUpdateCoordinator): if self.config_entry.options.get(DEV_DBG): _LOGGER.info("Dev log: %s", anonymize(data)) - response = response or "OK" - return aiohttp.web.Response(body=f"{response or 'OK'}", status=200) + return aiohttp.web.Response(body="OK", status=200) def register_path( hass: HomeAssistant, - url_path: str, coordinator: WeatherDataUpdateCoordinator, config: ConfigEntry, -): - """Register path to handle incoming data.""" +) -> bool: + """Register paths to webhook.""" - hass_data = hass.data.setdefault(DOMAIN, {}) - debug = config.options.get(DEV_DBG) - _wslink = config.options.get(WSLINK, False) + hass.data.setdefault(DOMAIN, {}) + if (hass_data := checked(hass.data[DOMAIN], dict[str, Any])) is None: + raise ConfigEntryNotReady - routes: Routes = hass_data.get("routes", Routes()) + _wslink: bool = checked_or(config.options.get(WSLINK), bool, False) - if not routes.routes: - routes = Routes() - _LOGGER.info("Routes not found, creating new routes") + # Create internal route dispatcher with provided urls + routes: Routes = Routes() + routes.add_route(DEFAULT_URL, coordinator.recieved_data, enabled=not _wslink) + routes.add_route(WSLINK_URL, coordinator.recieved_data, enabled=_wslink) - if debug: - _LOGGER.debug("Enabled route is: %s, WSLink is %s", url_path, _wslink) + # Register webhooks in HomeAssistant with dispatcher + try: + _ = hass.http.app.router.add_get(DEFAULT_URL, routes.dispatch) + _ = hass.http.app.router.add_get(WSLINK_URL, routes.dispatch) - try: - default_route = hass.http.app.router.add_get( - DEFAULT_URL, - coordinator.recieved_data if not _wslink else unregistred, - name="weather_default_url", - ) - if debug: - _LOGGER.debug("Default route: %s", default_route) + # Save initialised routes + hass_data["routes"] = routes - wslink_route = hass.http.app.router.add_get( - WSLINK_URL, - coordinator.recieved_data if _wslink else unregistred, - name="weather_wslink_url", - ) - if debug: - _LOGGER.debug("WSLink route: %s", wslink_route) - - routes.add_route( - DEFAULT_URL, - default_route, - coordinator.recieved_data if not _wslink else unregistred, - not _wslink, - ) - routes.add_route( - WSLINK_URL, wslink_route, coordinator.recieved_data, _wslink - ) - - hass_data["routes"] = routes - - except RuntimeError as Ex: # pylint: disable=(broad-except) - if ( - "Added route will never be executed, method GET is already registered" - in Ex.args - ): - _LOGGER.info("Handler to URL (%s) already registred", url_path) - return False - - _LOGGER.error("Unable to register URL handler! (%s)", Ex.args) - return False - - _LOGGER.info( - "Registered path to handle weather data: %s", - routes.get_enabled(), # pylint: disable=used-before-assignment + except RuntimeError as Ex: + _LOGGER.critical( + "Routes cannot be added. Integration will not work as expected. %s", Ex ) - - if _wslink: - routes.switch_route(coordinator.recieved_data, WSLINK_URL) + raise ConfigEntryNotReady from Ex else: - routes.switch_route(coordinator.recieved_data, DEFAULT_URL) - - return routes + return True async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool: @@ -212,21 +202,22 @@ async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool: hass_data = hass.data.setdefault(DOMAIN, {}) hass_data[entry.entry_id] = coordinator - _wslink = entry.options.get(WSLINK) - debug = entry.options.get(DEV_DBG) + routes: Routes | None = hass_data.get("routes", None) - if debug: - _LOGGER.debug("WS Link is %s", "enbled" if _wslink else "disabled") + _wslink = checked_or(entry.options.get(WSLINK), bool, False) - route = register_path( - hass, DEFAULT_URL if not _wslink else WSLINK_URL, coordinator, entry - ) + _LOGGER.debug("WS Link is %s", "enbled" if _wslink else "disabled") - if not route: - _LOGGER.error("Fatal: path not registered!") - raise PlatformNotReady + if routes: + _LOGGER.debug("We have routes registered, will try to switch dispatcher.") + routes.switch_route(DEFAULT_URL if not _wslink else WSLINK_URL) + _LOGGER.debug("%s", routes.show_enabled()) + else: + routes_enabled = register_path(hass, coordinator, entry) - hass_data["route"] = route + if not routes_enabled: + _LOGGER.error("Fatal: path not registered!") + raise PlatformNotReady await hass.config_entries.async_forward_entry_setups(entry, PLATFORMS) @@ -238,7 +229,7 @@ async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool: async def update_listener(hass: HomeAssistant, entry: ConfigEntry): """Update setup listener.""" - await hass.config_entries.async_reload(entry.entry_id) + _ = await hass.config_entries.async_reload(entry.entry_id) _LOGGER.info("Settings updated") diff --git a/custom_components/sws12500/config_flow.py b/custom_components/sws12500/config_flow.py index c72f02a..8fbaec2 100644 --- a/custom_components/sws12500/config_flow.py +++ b/custom_components/sws12500/config_flow.py @@ -6,7 +6,12 @@ from typing import Any import voluptuous as vol from yarl import URL -from homeassistant.config_entries import ConfigFlow, ConfigFlowResult, OptionsFlow +from homeassistant.config_entries import ( + ConfigEntry, + ConfigFlow, + ConfigFlowResult, + OptionsFlow, +) from homeassistant.core import callback from homeassistant.exceptions import HomeAssistantError from homeassistant.helpers.network import get_url @@ -59,9 +64,9 @@ class ConfigOptionsFlowHandler(OptionsFlow): self.ecowitt: dict[str, Any] = {} self.ecowitt_schema = {} - @property - def config_entry(self): - return self.hass.config_entries.async_get_entry(self.handler) + # @property + # def config_entry(self) -> ConfigEntry: + # return self.hass.config_entries.async_get_entry(self.handler) async def _get_entry_data(self): """Get entry data.""" @@ -151,9 +156,9 @@ class ConfigOptionsFlowHandler(OptionsFlow): step_id="init", menu_options=["basic", "ecowitt", "windy", "pocasi"] ) - async def async_step_basic(self, user_input=None): + async def async_step_basic(self, user_input: Any = None): """Manage basic options - credentials.""" - errors = {} + errors: dict[str, str] = {} await self._get_entry_data() @@ -184,9 +189,9 @@ class ConfigOptionsFlowHandler(OptionsFlow): errors=errors, ) - async def async_step_windy(self, user_input=None): + async def async_step_windy(self, user_input: Any = None): """Manage windy options.""" - errors = {} + errors: dict[str, str] = {} await self._get_entry_data() @@ -212,7 +217,7 @@ class ConfigOptionsFlowHandler(OptionsFlow): async def async_step_pocasi(self, user_input: Any = None) -> ConfigFlowResult: """Handle the pocasi step.""" - errors = {} + errors: dict[str, str] = {} await self._get_entry_data() @@ -246,7 +251,7 @@ class ConfigOptionsFlowHandler(OptionsFlow): async def async_step_ecowitt(self, user_input: Any = None) -> ConfigFlowResult: """Ecowitt stations setup.""" - errors = {} + errors: dict[str, str] = {} await self._get_entry_data() if not (webhook := self.ecowitt.get(ECOWITT_WEBHOOK_ID)): @@ -308,7 +313,7 @@ class ConfigFlowHandler(ConfigFlow, domain=DOMAIN): VERSION = 1 - async def async_step_user(self, user_input=None): + async def async_step_user(self, user_input: Any = None): """Handle the initial step.""" if user_input is None: await self.async_set_unique_id(DOMAIN) @@ -319,7 +324,7 @@ class ConfigFlowHandler(ConfigFlow, domain=DOMAIN): data_schema=vol.Schema(self.data_schema), ) - errors = {} + errors: dict[str, str] = {} if user_input[API_ID] in INVALID_CREDENTIALS: errors[API_ID] = "valid_credentials_api" @@ -340,6 +345,6 @@ class ConfigFlowHandler(ConfigFlow, domain=DOMAIN): @staticmethod @callback - def async_get_options_flow(config_entry) -> ConfigOptionsFlowHandler: + def async_get_options_flow(config_entry: ConfigEntry) -> ConfigOptionsFlowHandler: """Get the options flow for this handler.""" return ConfigOptionsFlowHandler() diff --git a/custom_components/sws12500/manifest.json b/custom_components/sws12500/manifest.json index 0fff70b..9b55655 100644 --- a/custom_components/sws12500/manifest.json +++ b/custom_components/sws12500/manifest.json @@ -8,7 +8,7 @@ "homekit": {}, "iot_class": "local_push", "issue_tracker": "https://github.com/schizza/SWS-12500-custom-component/issues", - "requirements": [], + "requirements": ["typecheck-runtime==0.2.0"], "ssdp": [], "version": "1.6.9", "zeroconf": [] diff --git a/custom_components/sws12500/pocasti_cz.py b/custom_components/sws12500/pocasti_cz.py index 1df93ee..859de40 100644 --- a/custom_components/sws12500/pocasti_cz.py +++ b/custom_components/sws12500/pocasti_cz.py @@ -5,6 +5,7 @@ import logging from typing import Any, Literal from aiohttp import ClientError +from py_typecheck.core import checked from homeassistant.config_entries import ConfigEntry from homeassistant.core import HomeAssistant @@ -75,8 +76,20 @@ class PocasiPush: """Pushes weather data to server.""" _data = data.copy() - _api_id = self.config.options.get(POCASI_CZ_API_ID) - _api_key = self.config.options.get(POCASI_CZ_API_KEY) + + if (_api_id := checked(self.config.options.get(POCASI_CZ_API_ID), str)) is None: + _LOGGER.error( + "No API ID is provided for Pocasi Meteo. Check your configuration." + ) + return + + if ( + _api_key := checked(self.config.options.get(POCASI_CZ_API_KEY), str) + ) is None: + _LOGGER.error( + "No API Key is provided for Pocasi Meteo. Check your configuration." + ) + return if self.log: _LOGGER.info( @@ -91,7 +104,7 @@ class PocasiPush: self._interval, self.next_update, ) - return False + return request_url: str = "" if mode == "WSLINK": @@ -139,5 +152,3 @@ class PocasiPush: if self.log: _LOGGER.info("Next update: %s", str(self.next_update)) - - return None diff --git a/custom_components/sws12500/routes.py b/custom_components/sws12500/routes.py index 0a562e7..f110f8b 100644 --- a/custom_components/sws12500/routes.py +++ b/custom_components/sws12500/routes.py @@ -1,77 +1,67 @@ -"""Store routes info.""" +"""Routes implementation.""" -from collections.abc import Callable -from dataclasses import dataclass -from logging import getLogger +from collections.abc import Awaitable, Callable +from dataclasses import dataclass, field +import logging -from aiohttp.web import AbstractRoute, Response +from aiohttp.web import Request, Response -_LOGGER = getLogger(__name__) +_LOGGER = logging.getLogger(__name__) + +Handler = Callable[[Request], Awaitable[Response]] @dataclass -class Route: - """Store route info.""" +class RouteInfo: + """Route struct.""" url_path: str - route: AbstractRoute - handler: Callable + handler: Handler enabled: bool = False - - def __str__(self): - """Return string representation.""" - return f"{self.url_path} -> {self.handler}" + fallback: Handler = field(default_factory=lambda: unregistred) class Routes: - """Store routes info.""" + """Routes class.""" def __init__(self) -> None: - """Initialize routes.""" - self.routes = {} + """Init.""" + self.routes: dict[str, RouteInfo] = {} - def switch_route(self, coordinator: Callable, url_path: str): - """Switch route.""" + async def dispatch(self, request: Request) -> Response: + """Dispatch.""" + info = self.routes.get(request.path) + if not info: + _LOGGER.debug("Route %s is not registered!") + return await unregistred(request) + handler = info.handler if info.enabled else info.fallback + return await handler(request) - for url, route in self.routes.items(): - if url == url_path: - _LOGGER.info("New coordinator to route: %s", route.url_path) - route.enabled = True - route.handler = coordinator - route.route._handler = coordinator # noqa: SLF001 - else: - route.enabled = False - route.handler = unregistred - route.route._handler = unregistred # noqa: SLF001 + def switch_route(self, url_path: str) -> None: + """Switch route to new handler.""" + for path, info in self.routes.items(): + info.enabled = path == url_path def add_route( - self, - url_path: str, - route: AbstractRoute, - handler: Callable, - enabled: bool = False, - ): - """Add route.""" - self.routes[url_path] = Route(url_path, route, handler, enabled) + self, url_path: str, handler: Handler, *, enabled: bool = False + ) -> None: + """Add route to dispatcher.""" - def get_route(self, url_path: str) -> Route: - """Get route.""" - return self.routes.get(url_path, Route) + self.routes[url_path] = RouteInfo(url_path, handler, enabled=enabled) + _LOGGER.debug("Registered dispatcher for route %s", url_path) - def get_enabled(self) -> str: - """Get enabled routes.""" - enabled_routes = [ - route.url_path for route in self.routes.values() if route.enabled - ] - return "".join(enabled_routes) if enabled_routes else "None" - - def __str__(self): - """Return string representation.""" - return "\n".join([str(route) for route in self.routes.values()]) + def show_enabled(self) -> str: + """Show info of enabled route.""" + for url, route in self.routes.items(): + if route.enabled: + return ( + f"Dispatcher enabled for URL: {url}, with handler: {route.handler}" + ) + return "No routes is enabled." -async def unregistred(*args, **kwargs): - """Unregister path to handle incoming data.""" - - _LOGGER.error("Recieved data to unregistred webhook. Check your settings") - return Response(body=f"{'Unregistred webhook.'}", status=404) +async def unregistred(request: Request) -> Response: + """Return unregistred error.""" + _ = request + _LOGGER.debug("Received data to unregistred or disabled webhook.") + return Response(text="Unregistred webhook. Check your settings.", status=400) diff --git a/custom_components/sws12500/utils.py b/custom_components/sws12500/utils.py index b6f54ac..46c507e 100644 --- a/custom_components/sws12500/utils.py +++ b/custom_components/sws12500/utils.py @@ -2,11 +2,11 @@ import logging import math -from pathlib import Path -import sqlite3 -from typing import Any +from typing import cast import numpy as np +from py_typecheck import checked +from py_typecheck.core import checked_or from homeassistant.components import persistent_notification from homeassistant.config_entries import ConfigEntry @@ -15,7 +15,6 @@ from homeassistant.helpers.translation import async_get_translations from .const import ( AZIMUT, - DATABASE_PATH, DEV_DBG, OUTSIDE_HUMIDITY, OUTSIDE_TEMP, @@ -37,19 +36,19 @@ async def translations( *, key: str = "message", category: str = "notify", -) -> str: +) -> str | None: """Get translated keys for domain.""" localize_key = f"component.{translation_domain}.{category}.{translation_key}.{key}" - language = hass.config.language + language: str = hass.config.language _translations = await async_get_translations( hass, language, category, [translation_domain] ) if localize_key in _translations: return _translations[localize_key] - return "" + return None async def translated_notification( @@ -70,7 +69,7 @@ async def translated_notification( f"component.{translation_domain}.{category}.{translation_key}.title" ) - language = hass.config.language + language: str = cast("str", hass.config.language) _translations = await async_get_translations( hass, language, category, [translation_domain] @@ -91,7 +90,10 @@ async def translated_notification( async def update_options( - hass: HomeAssistant, entry: ConfigEntry, update_key, update_value + hass: HomeAssistant, + entry: ConfigEntry, + update_key: str, + update_value: str | list[str] | bool, ) -> bool: """Update config.options entry.""" conf = {**entry.options} @@ -100,46 +102,43 @@ async def update_options( return hass.config_entries.async_update_entry(entry, options=conf) -def anonymize(data): +def anonymize( + data: dict[str, str | int | float | bool], +) -> dict[str, str | int | float | bool]: """Anoynimize recieved data.""" - - anonym = {} - for k in data: - if k not in {"ID", "PASSWORD", "wsid", "wspw"}: - anonym[k] = data[k] - - return anonym + anonym: dict[str, str] = {} + return { + anonym[key]: value + for key, value in data.items() + if key not in {"ID", "PASSWORD", "wsid", "wspw"} + } -def remap_items(entities): +def remap_items(entities: dict[str, str]) -> dict[str, str]: """Remap items in query.""" - items = {} - for item in entities: - if item in REMAP_ITEMS: - items[REMAP_ITEMS[item]] = entities[item] - - return items + return { + REMAP_ITEMS[key]: value for key, value in entities.items() if key in REMAP_ITEMS + } -def remap_wslink_items(entities): +def remap_wslink_items(entities: dict[str, str]) -> dict[str, str]: """Remap items in query for WSLink API.""" - items = {} - for item in entities: - if item in REMAP_WSLINK_ITEMS: - items[REMAP_WSLINK_ITEMS[item]] = entities[item] - - return items + return { + REMAP_WSLINK_ITEMS[key]: value + for key, value in entities.items() + if key in REMAP_WSLINK_ITEMS + } -def loaded_sensors(config_entry: ConfigEntry) -> list | None: +def loaded_sensors(config_entry: ConfigEntry) -> list[str]: """Get loaded sensors.""" return config_entry.options.get(SENSORS_TO_LOAD) or [] def check_disabled( - hass: HomeAssistant, items, config_entry: ConfigEntry -) -> list | None: + items: dict[str, str], config_entry: ConfigEntry +) -> list[str] | None: """Check if we have data for unloaded sensors. If so, then add sensor to load queue. @@ -147,10 +146,11 @@ def check_disabled( Returns list of found sensors or None """ - log: bool = config_entry.options.get(DEV_DBG, False) + log = checked_or(config_entry.options.get(DEV_DBG), bool, False) + entityFound: bool = False - _loaded_sensors = loaded_sensors(config_entry) - missing_sensors: list = [] + _loaded_sensors: list[str] = loaded_sensors(config_entry) + missing_sensors: list[str] = [] for item in items: if log: @@ -188,10 +188,10 @@ def battery_level_to_text(battery: int) -> UnitOfBat: 1: UnitOfBat.NORMAL, } - if battery is None: + if (v := checked(battery, int)) is None: return UnitOfBat.UNKNOWN - return level_map.get(int(battery), UnitOfBat.UNKNOWN) + return level_map.get(v, UnitOfBat.UNKNOWN) def battery_level_to_icon(battery: UnitOfBat) -> str: @@ -218,21 +218,21 @@ def celsius_to_fahrenheit(celsius: float) -> float: return celsius * 9.0 / 5.0 + 32 -def heat_index(data: Any, convert: bool = False) -> float | None: +def heat_index( + data: dict[str, int | float | str], convert: bool = False +) -> float | None: """Calculate heat index from temperature. data: dict with temperature and humidity convert: bool, convert recieved data from Celsius to Fahrenheit """ - - temp = data.get(OUTSIDE_TEMP, None) - rh = data.get(OUTSIDE_HUMIDITY, None) - - if not temp or not rh: + if (temp := checked(data.get(OUTSIDE_TEMP), float)) is None: + _LOGGER.error("We are missing OUTSIDE TEMP, cannot calculate heat index.") return None - temp = float(temp) - rh = float(rh) + if (rh := checked(data.get(OUTSIDE_HUMIDITY), float)) is None: + _LOGGER.error("We are missing OUTSIDE HUMIDITY, cannot calculate heat index.") + return None adjustment = None @@ -263,21 +263,20 @@ def heat_index(data: Any, convert: bool = False) -> float | None: return simple -def chill_index(data: Any, convert: bool = False) -> float | None: +def chill_index( + data: dict[str, str | float | int], convert: bool = False +) -> float | None: """Calculate wind chill index from temperature and wind speed. data: dict with temperature and wind speed convert: bool, convert recieved data from Celsius to Fahrenheit """ - - temp = data.get(OUTSIDE_TEMP, None) - wind = data.get(WIND_SPEED, None) - - if not temp or not wind: + if (temp := checked(data.get(OUTSIDE_TEMP), float)) is None: + _LOGGER.error("We are missing OUTSIDE TEMP, cannot calculate wind chill index.") + return None + if (wind := checked(data.get(WIND_SPEED), float)) is None: + _LOGGER.error("We are missing WIND SPEED, cannot calculate wind chill index.") return None - - temp = float(temp) - wind = float(wind) if convert: temp = celsius_to_fahrenheit(temp) @@ -294,109 +293,3 @@ def chill_index(data: Any, convert: bool = False) -> float | None: if temp < 50 and wind > 3 else temp ) - - -def long_term_units_in_statistics_meta(): - """Get units in long term statitstics.""" - sensor_units = [] - if not Path(DATABASE_PATH).exists(): - _LOGGER.error("Database file not found: %s", DATABASE_PATH) - return False - - conn = sqlite3.connect(DATABASE_PATH) - db = conn.cursor() - - try: - db.execute( - """ - SELECT statistic_id, unit_of_measurement from statistics_meta - WHERE statistic_id LIKE 'sensor.weather_station_sws%' - """ - ) - rows = db.fetchall() - sensor_units = { - statistic_id: f"{statistic_id} ({unit})" for statistic_id, unit in rows - } - - except sqlite3.Error as e: - _LOGGER.error("Error during data migration: %s", e) - finally: - conn.close() - - return sensor_units - - -async def migrate_data(hass: HomeAssistant, sensor_id: str | None = None) -> int | bool: - """Migrate data from mm/d to mm.""" - - _LOGGER.debug("Sensor %s is required for data migration", sensor_id) - updated_rows = 0 - - if not Path(DATABASE_PATH).exists(): - _LOGGER.error("Database file not found: %s", DATABASE_PATH) - return False - - conn = sqlite3.connect(DATABASE_PATH) - db = conn.cursor() - - try: - _LOGGER.info(sensor_id) - db.execute( - """ - UPDATE statistics_meta - SET unit_of_measurement = 'mm' - WHERE statistic_id = ? - AND unit_of_measurement = 'mm/d'; - """, - (sensor_id,), - ) - updated_rows = db.rowcount - conn.commit() - _LOGGER.info( - "Data migration completed successfully. Updated rows: %s for %s", - updated_rows, - sensor_id, - ) - - except sqlite3.Error as e: - _LOGGER.error("Error during data migration: %s", e) - finally: - conn.close() - return updated_rows - - -def migrate_data_old(sensor_id: str | None = None): - """Migrate data from mm/d to mm.""" - updated_rows = 0 - - if not Path(DATABASE_PATH).exists(): - _LOGGER.error("Database file not found: %s", DATABASE_PATH) - return False - - conn = sqlite3.connect(DATABASE_PATH) - db = conn.cursor() - - try: - _LOGGER.info(sensor_id) - db.execute( - """ - UPDATE statistics_meta - SET unit_of_measurement = 'mm' - WHERE statistic_id = ? - AND unit_of_measurement = 'mm/d'; - """, - (sensor_id,), - ) - updated_rows = db.rowcount - conn.commit() - _LOGGER.info( - "Data migration completed successfully. Updated rows: %s for %s", - updated_rows, - sensor_id, - ) - - except sqlite3.Error as e: - _LOGGER.error("Error during data migration: %s", e) - finally: - conn.close() - return updated_rows diff --git a/custom_components/sws12500/windy_func.py b/custom_components/sws12500/windy_func.py index 1bbb0e2..37a98b0 100644 --- a/custom_components/sws12500/windy_func.py +++ b/custom_components/sws12500/windy_func.py @@ -2,8 +2,10 @@ from datetime import datetime, timedelta import logging +from typing import Final from aiohttp.client_exceptions import ClientError +from py_typecheck.core import checked from homeassistant.config_entries import ConfigEntry from homeassistant.core import HomeAssistant @@ -52,22 +54,22 @@ class WindyPush: def __init__(self, hass: HomeAssistant, config: ConfigEntry) -> None: """Init.""" - self.hass = hass - self.config = config + self.hass: Final = hass + self.config: Final = config """ lets wait for 1 minute to get initial data from station and then try to push first data to Windy """ - self.last_update = datetime.now() - self.next_update = datetime.now() + timed(minutes=1) + self.last_update: datetime = datetime.now() + self.next_update: datetime = datetime.now() + timed(minutes=1) - self.log = self.config.options.get(WINDY_LOGGER_ENABLED) - self.invalid_response_count = 0 + self.log: bool = self.config.options.get(WINDY_LOGGER_ENABLED, False) + self.invalid_response_count: int = 0 def verify_windy_response( # pylint: disable=useless-return self, response: str, - ) -> WindyNotInserted | WindySuccess | WindyApiKeyError | None: + ): """Verify answer form Windy.""" if self.log: @@ -85,9 +87,7 @@ class WindyPush: if "Unauthorized" in response: raise WindyApiKeyError - return None - - async def push_data_to_windy(self, data): + async def push_data_to_windy(self, data: dict[str, str]) -> bool: """Pushes weather data do Windy stations. Interval is 5 minutes, otherwise Windy would not accepts data. @@ -96,8 +96,6 @@ class WindyPush: from station. But we need to do some clean up. """ - text_for_test = None - if self.log: _LOGGER.info( "Windy last update = %s, next update at: %s", @@ -112,13 +110,18 @@ class WindyPush: for purge in PURGE_DATA: if purge in purged_data: - purged_data.pop(purge) + _ = purged_data.pop(purge) if "dewptf" in purged_data: dewpoint = round(((float(purged_data.pop("dewptf")) - 32) / 1.8), 1) purged_data["dewpoint"] = str(dewpoint) - windy_api_key = self.config.options.get(WINDY_API_KEY) + if ( + windy_api_key := checked(self.config.options.get(WINDY_API_KEY), str) + ) is None: + _LOGGER.error("Windy API key is not provided! Check your configuration.") + return False + request_url = f"{WINDY_URL}{windy_api_key}" if self.log: @@ -133,27 +136,33 @@ class WindyPush: # log despite of settings _LOGGER.error(WINDY_NOT_INSERTED) - text_for_test = WINDY_NOT_INSERTED - except WindyApiKeyError: # log despite of settings _LOGGER.critical(WINDY_INVALID_KEY) - text_for_test = WINDY_INVALID_KEY - await update_options(self.hass, self.config, WINDY_ENABLED, False) + if not ( + await update_options( + self.hass, self.config, WINDY_ENABLED, False + ) + ): + _LOGGER.debug("Failed to set Windy option to false.") except WindySuccess: if self.log: _LOGGER.info(WINDY_SUCCESS) - text_for_test = WINDY_SUCCESS + else: + if self.log: + _LOGGER.debug(WINDY_NOT_INSERTED) except ClientError as ex: _LOGGER.critical("Invalid response from Windy: %s", str(ex)) self.invalid_response_count += 1 if self.invalid_response_count > 3: _LOGGER.critical(WINDY_UNEXPECTED) - text_for_test = WINDY_UNEXPECTED - await update_options(self.hass, self.config, WINDY_ENABLED, False) + if not await update_options( + self.hass, self.config, WINDY_ENABLED, False + ): + _LOGGER.debug("Failed to set Windy options to false.") self.last_update = datetime.now() self.next_update = self.last_update + timed(minutes=5) @@ -161,6 +170,4 @@ class WindyPush: if self.log: _LOGGER.info("Next update: %s", str(self.next_update)) - if RESPONSE_FOR_TEST and text_for_test: - return text_for_test - return None + return True