Compare commits

..

1 Commits

Author SHA1 Message Date
Lukas Svoboda 35c1eb9572
Merge 39cd852b36 into 9f36ab5d4c 2026-01-17 10:27:02 +01:00
7 changed files with 367 additions and 294 deletions

View File

@ -1,20 +1,14 @@
"""The Sencor SWS 12500 Weather Station integration."""
import logging
from typing import Any
import aiohttp.web
from aiohttp.web_exceptions import HTTPUnauthorized
from py_typecheck import checked, checked_or
from homeassistant.config_entries import ConfigEntry
from homeassistant.const import Platform
from homeassistant.core import HomeAssistant
from homeassistant.exceptions import (
ConfigEntryNotReady,
InvalidStateError,
PlatformNotReady,
)
from homeassistant.exceptions import InvalidStateError, PlatformNotReady
from homeassistant.helpers.update_coordinator import DataUpdateCoordinator
from .const import (
@ -30,7 +24,7 @@ from .const import (
WSLINK_URL,
)
from .pocasti_cz import PocasiPush
from .routes import Routes
from .routes import Routes, unregistred
from .utils import (
anonymize,
check_disabled,
@ -56,20 +50,19 @@ class WeatherDataUpdateCoordinator(DataUpdateCoordinator):
def __init__(self, hass: HomeAssistant, config: ConfigEntry) -> None:
"""Init global updater."""
self.hass: HomeAssistant = hass
self.config: ConfigEntry = config
self.windy: WindyPush = WindyPush(hass, config)
self.hass = hass
self.config = config
self.windy = WindyPush(hass, config)
self.pocasi: PocasiPush = PocasiPush(hass, config)
super().__init__(hass, _LOGGER, name=DOMAIN)
async def recieved_data(self, webdata: aiohttp.web.Request) -> aiohttp.web.Response:
async def recieved_data(self, webdata):
"""Handle incoming data query."""
_wslink = self.config_entry.options.get(WSLINK)
data = webdata.query
_wslink: bool = checked_or(self.config.options.get(WSLINK), bool, False)
response = None
data: dict[str, Any] = dict(webdata.query)
# Check if station is sending auth data
if not _wslink and ("ID" not in data or "PASSWORD" not in data):
_LOGGER.error("Invalid request. No security data provided!")
raise HTTPUnauthorized
@ -78,67 +71,44 @@ class WeatherDataUpdateCoordinator(DataUpdateCoordinator):
_LOGGER.error("Invalid request. No security data provided!")
raise HTTPUnauthorized
id_data: str = ""
key_data: str = ""
if _wslink:
id_data = data.get("wsid", "")
key_data = data.get("wspw", "")
id_data = data["wsid"]
key_data = data["wspw"]
else:
id_data = data.get("ID", "")
key_data = data.get("PASSWORD", "")
id_data = data["ID"]
key_data = data["PASSWORD"]
# Check if we have valid auth data in the integration
if (_id := checked(self.config.options.get(API_ID), str)) is None:
_LOGGER.error("We don't have API ID set! Update your config!")
raise IncorrectDataError
if (_key := checked(self.config.options.get(API_KEY), str)) is None:
_LOGGER.error("We don't have API KEY set! Update your config!")
raise IncorrectDataError
_id = self.config_entry.options.get(API_ID)
_key = self.config_entry.options.get(API_KEY)
if id_data != _id or key_data != _key:
_LOGGER.error("Unauthorised access!")
raise HTTPUnauthorized
if self.config.options.get(WINDY_ENABLED, False):
await self.windy.push_data_to_windy(data)
if self.config_entry.options.get(WINDY_ENABLED):
response = await self.windy.push_data_to_windy(data)
if self.config.options.get(POCASI_CZ_ENABLED, False):
if self.config.options.get(POCASI_CZ_ENABLED):
await self.pocasi.push_data_to_server(data, "WSLINK" if _wslink else "WU")
remaped_items: dict[str, str] = (
remap_wslink_items(data) if _wslink else remap_items(data)
remaped_items = (
remap_wslink_items(data)
if self.config_entry.options.get(WSLINK)
else remap_items(data)
)
if sensors := check_disabled(remaped_items, self.config):
if (
translate_sensors := checked(
[
await translations(
self.hass,
DOMAIN,
f"sensor.{t_key}",
key="name",
category="entity",
)
for t_key in sensors
if await translations(
self.hass,
DOMAIN,
f"sensor.{t_key}",
key="name",
category="entity",
)
is not None
],
list[str],
if sensors := check_disabled(self.hass, remaped_items, self.config):
translate_sensors = [
await translations(
self.hass, DOMAIN, f"sensor.{t_key}", key="name", category="entity"
)
) is not None:
human_readable: str = "\n".join(translate_sensors)
else:
human_readable = ""
for t_key in sensors
if await translations(
self.hass, DOMAIN, f"sensor.{t_key}", key="name", category="entity"
)
is not None
]
human_readable = "\n".join(translate_sensors)
await translated_notification(
self.hass,
@ -156,42 +126,82 @@ class WeatherDataUpdateCoordinator(DataUpdateCoordinator):
if self.config_entry.options.get(DEV_DBG):
_LOGGER.info("Dev log: %s", anonymize(data))
return aiohttp.web.Response(body="OK", status=200)
response = response or "OK"
return aiohttp.web.Response(body=f"{response or 'OK'}", status=200)
def register_path(
hass: HomeAssistant,
url_path: str,
coordinator: WeatherDataUpdateCoordinator,
config: ConfigEntry,
) -> bool:
"""Register paths to webhook."""
):
"""Register path to handle incoming data."""
hass.data.setdefault(DOMAIN, {})
if (hass_data := checked(hass.data[DOMAIN], dict[str, Any])) is None:
raise ConfigEntryNotReady
hass_data = hass.data.setdefault(DOMAIN, {})
debug = config.options.get(DEV_DBG)
_wslink = config.options.get(WSLINK, False)
_wslink: bool = checked_or(config.options.get(WSLINK), bool, False)
routes: Routes = hass_data.get("routes", Routes())
# Create internal route dispatcher with provided urls
routes: Routes = Routes()
routes.add_route(DEFAULT_URL, coordinator.recieved_data, enabled=not _wslink)
routes.add_route(WSLINK_URL, coordinator.recieved_data, enabled=_wslink)
if not routes.routes:
routes = Routes()
_LOGGER.info("Routes not found, creating new routes")
# Register webhooks in HomeAssistant with dispatcher
try:
_ = hass.http.app.router.add_get(DEFAULT_URL, routes.dispatch)
_ = hass.http.app.router.add_get(WSLINK_URL, routes.dispatch)
if debug:
_LOGGER.debug("Enabled route is: %s, WSLink is %s", url_path, _wslink)
# Save initialised routes
hass_data["routes"] = routes
try:
default_route = hass.http.app.router.add_get(
DEFAULT_URL,
coordinator.recieved_data if not _wslink else unregistred,
name="weather_default_url",
)
if debug:
_LOGGER.debug("Default route: %s", default_route)
except RuntimeError as Ex:
_LOGGER.critical(
"Routes cannot be added. Integration will not work as expected. %s", Ex
wslink_route = hass.http.app.router.add_get(
WSLINK_URL,
coordinator.recieved_data if _wslink else unregistred,
name="weather_wslink_url",
)
if debug:
_LOGGER.debug("WSLink route: %s", wslink_route)
routes.add_route(
DEFAULT_URL,
default_route,
coordinator.recieved_data if not _wslink else unregistred,
not _wslink,
)
routes.add_route(
WSLINK_URL, wslink_route, coordinator.recieved_data, _wslink
)
hass_data["routes"] = routes
except RuntimeError as Ex: # pylint: disable=(broad-except)
if (
"Added route will never be executed, method GET is already registered"
in Ex.args
):
_LOGGER.info("Handler to URL (%s) already registred", url_path)
return False
_LOGGER.error("Unable to register URL handler! (%s)", Ex.args)
return False
_LOGGER.info(
"Registered path to handle weather data: %s",
routes.get_enabled(), # pylint: disable=used-before-assignment
)
raise ConfigEntryNotReady from Ex
if _wslink:
routes.switch_route(coordinator.recieved_data, WSLINK_URL)
else:
return True
routes.switch_route(coordinator.recieved_data, DEFAULT_URL)
return routes
async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
@ -202,22 +212,21 @@ async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
hass_data = hass.data.setdefault(DOMAIN, {})
hass_data[entry.entry_id] = coordinator
routes: Routes | None = hass_data.get("routes", None)
_wslink = entry.options.get(WSLINK)
debug = entry.options.get(DEV_DBG)
_wslink = checked_or(entry.options.get(WSLINK), bool, False)
if debug:
_LOGGER.debug("WS Link is %s", "enbled" if _wslink else "disabled")
_LOGGER.debug("WS Link is %s", "enbled" if _wslink else "disabled")
route = register_path(
hass, DEFAULT_URL if not _wslink else WSLINK_URL, coordinator, entry
)
if routes:
_LOGGER.debug("We have routes registered, will try to switch dispatcher.")
routes.switch_route(DEFAULT_URL if not _wslink else WSLINK_URL)
_LOGGER.debug("%s", routes.show_enabled())
else:
routes_enabled = register_path(hass, coordinator, entry)
if not route:
_LOGGER.error("Fatal: path not registered!")
raise PlatformNotReady
if not routes_enabled:
_LOGGER.error("Fatal: path not registered!")
raise PlatformNotReady
hass_data["route"] = route
await hass.config_entries.async_forward_entry_setups(entry, PLATFORMS)
@ -229,7 +238,7 @@ async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
async def update_listener(hass: HomeAssistant, entry: ConfigEntry):
"""Update setup listener."""
_ = await hass.config_entries.async_reload(entry.entry_id)
await hass.config_entries.async_reload(entry.entry_id)
_LOGGER.info("Settings updated")

View File

@ -6,12 +6,7 @@ from typing import Any
import voluptuous as vol
from yarl import URL
from homeassistant.config_entries import (
ConfigEntry,
ConfigFlow,
ConfigFlowResult,
OptionsFlow,
)
from homeassistant.config_entries import ConfigFlow, ConfigFlowResult, OptionsFlow
from homeassistant.core import callback
from homeassistant.exceptions import HomeAssistantError
from homeassistant.helpers.network import get_url
@ -64,9 +59,9 @@ class ConfigOptionsFlowHandler(OptionsFlow):
self.ecowitt: dict[str, Any] = {}
self.ecowitt_schema = {}
# @property
# def config_entry(self) -> ConfigEntry:
# return self.hass.config_entries.async_get_entry(self.handler)
@property
def config_entry(self):
return self.hass.config_entries.async_get_entry(self.handler)
async def _get_entry_data(self):
"""Get entry data."""
@ -156,9 +151,9 @@ class ConfigOptionsFlowHandler(OptionsFlow):
step_id="init", menu_options=["basic", "ecowitt", "windy", "pocasi"]
)
async def async_step_basic(self, user_input: Any = None):
async def async_step_basic(self, user_input=None):
"""Manage basic options - credentials."""
errors: dict[str, str] = {}
errors = {}
await self._get_entry_data()
@ -189,9 +184,9 @@ class ConfigOptionsFlowHandler(OptionsFlow):
errors=errors,
)
async def async_step_windy(self, user_input: Any = None):
async def async_step_windy(self, user_input=None):
"""Manage windy options."""
errors: dict[str, str] = {}
errors = {}
await self._get_entry_data()
@ -217,7 +212,7 @@ class ConfigOptionsFlowHandler(OptionsFlow):
async def async_step_pocasi(self, user_input: Any = None) -> ConfigFlowResult:
"""Handle the pocasi step."""
errors: dict[str, str] = {}
errors = {}
await self._get_entry_data()
@ -251,7 +246,7 @@ class ConfigOptionsFlowHandler(OptionsFlow):
async def async_step_ecowitt(self, user_input: Any = None) -> ConfigFlowResult:
"""Ecowitt stations setup."""
errors: dict[str, str] = {}
errors = {}
await self._get_entry_data()
if not (webhook := self.ecowitt.get(ECOWITT_WEBHOOK_ID)):
@ -313,7 +308,7 @@ class ConfigFlowHandler(ConfigFlow, domain=DOMAIN):
VERSION = 1
async def async_step_user(self, user_input: Any = None):
async def async_step_user(self, user_input=None):
"""Handle the initial step."""
if user_input is None:
await self.async_set_unique_id(DOMAIN)
@ -324,7 +319,7 @@ class ConfigFlowHandler(ConfigFlow, domain=DOMAIN):
data_schema=vol.Schema(self.data_schema),
)
errors: dict[str, str] = {}
errors = {}
if user_input[API_ID] in INVALID_CREDENTIALS:
errors[API_ID] = "valid_credentials_api"
@ -345,6 +340,6 @@ class ConfigFlowHandler(ConfigFlow, domain=DOMAIN):
@staticmethod
@callback
def async_get_options_flow(config_entry: ConfigEntry) -> ConfigOptionsFlowHandler:
def async_get_options_flow(config_entry) -> ConfigOptionsFlowHandler:
"""Get the options flow for this handler."""
return ConfigOptionsFlowHandler()

View File

@ -8,7 +8,7 @@
"homekit": {},
"iot_class": "local_push",
"issue_tracker": "https://github.com/schizza/SWS-12500-custom-component/issues",
"requirements": ["typecheck-runtime==0.2.0"],
"requirements": [],
"ssdp": [],
"version": "1.6.9",
"zeroconf": []

View File

@ -5,7 +5,6 @@ import logging
from typing import Any, Literal
from aiohttp import ClientError
from py_typecheck.core import checked
from homeassistant.config_entries import ConfigEntry
from homeassistant.core import HomeAssistant
@ -76,20 +75,8 @@ class PocasiPush:
"""Pushes weather data to server."""
_data = data.copy()
if (_api_id := checked(self.config.options.get(POCASI_CZ_API_ID), str)) is None:
_LOGGER.error(
"No API ID is provided for Pocasi Meteo. Check your configuration."
)
return
if (
_api_key := checked(self.config.options.get(POCASI_CZ_API_KEY), str)
) is None:
_LOGGER.error(
"No API Key is provided for Pocasi Meteo. Check your configuration."
)
return
_api_id = self.config.options.get(POCASI_CZ_API_ID)
_api_key = self.config.options.get(POCASI_CZ_API_KEY)
if self.log:
_LOGGER.info(
@ -104,7 +91,7 @@ class PocasiPush:
self._interval,
self.next_update,
)
return
return False
request_url: str = ""
if mode == "WSLINK":
@ -152,3 +139,5 @@ class PocasiPush:
if self.log:
_LOGGER.info("Next update: %s", str(self.next_update))
return None

View File

@ -1,67 +1,77 @@
"""Routes implementation."""
"""Store routes info."""
from collections.abc import Awaitable, Callable
from dataclasses import dataclass, field
import logging
from collections.abc import Callable
from dataclasses import dataclass
from logging import getLogger
from aiohttp.web import Request, Response
from aiohttp.web import AbstractRoute, Response
_LOGGER = logging.getLogger(__name__)
Handler = Callable[[Request], Awaitable[Response]]
_LOGGER = getLogger(__name__)
@dataclass
class RouteInfo:
"""Route struct."""
class Route:
"""Store route info."""
url_path: str
handler: Handler
route: AbstractRoute
handler: Callable
enabled: bool = False
fallback: Handler = field(default_factory=lambda: unregistred)
def __str__(self):
"""Return string representation."""
return f"{self.url_path} -> {self.handler}"
class Routes:
"""Routes class."""
"""Store routes info."""
def __init__(self) -> None:
"""Init."""
self.routes: dict[str, RouteInfo] = {}
"""Initialize routes."""
self.routes = {}
async def dispatch(self, request: Request) -> Response:
"""Dispatch."""
info = self.routes.get(request.path)
if not info:
_LOGGER.debug("Route %s is not registered!", request.path)
return await unregistred(request)
handler = info.handler if info.enabled else info.fallback
return await handler(request)
def switch_route(self, coordinator: Callable, url_path: str):
"""Switch route."""
def switch_route(self, url_path: str) -> None:
"""Switch route to new handler."""
for path, info in self.routes.items():
info.enabled = path == url_path
for url, route in self.routes.items():
if url == url_path:
_LOGGER.info("New coordinator to route: %s", route.url_path)
route.enabled = True
route.handler = coordinator
route.route._handler = coordinator # noqa: SLF001
else:
route.enabled = False
route.handler = unregistred
route.route._handler = unregistred # noqa: SLF001
def add_route(
self, url_path: str, handler: Handler, *, enabled: bool = False
) -> None:
"""Add route to dispatcher."""
self,
url_path: str,
route: AbstractRoute,
handler: Callable,
enabled: bool = False,
):
"""Add route."""
self.routes[url_path] = Route(url_path, route, handler, enabled)
self.routes[url_path] = RouteInfo(url_path, handler, enabled=enabled)
_LOGGER.debug("Registered dispatcher for route %s", url_path)
def get_route(self, url_path: str) -> Route:
"""Get route."""
return self.routes.get(url_path, Route)
def show_enabled(self) -> str:
"""Show info of enabled route."""
for url, route in self.routes.items():
if route.enabled:
return (
f"Dispatcher enabled for URL: {url}, with handler: {route.handler}"
)
return "No routes is enabled."
def get_enabled(self) -> str:
"""Get enabled routes."""
enabled_routes = [
route.url_path for route in self.routes.values() if route.enabled
]
return "".join(enabled_routes) if enabled_routes else "None"
def __str__(self):
"""Return string representation."""
return "\n".join([str(route) for route in self.routes.values()])
async def unregistred(request: Request) -> Response:
"""Return unregistred error."""
_ = request
_LOGGER.debug("Received data to unregistred or disabled webhook.")
return Response(text="Unregistred webhook. Check your settings.", status=400)
async def unregistred(*args, **kwargs):
"""Unregister path to handle incoming data."""
_LOGGER.error("Recieved data to unregistred webhook. Check your settings")
return Response(body=f"{'Unregistred webhook.'}", status=404)

View File

@ -2,12 +2,11 @@
import logging
import math
from multiprocessing import Value
from typing import Any, cast
from pathlib import Path
import sqlite3
from typing import Any
import numpy as np
from py_typecheck import checked
from py_typecheck.core import checked_or
from homeassistant.components import persistent_notification
from homeassistant.config_entries import ConfigEntry
@ -16,6 +15,7 @@ from homeassistant.helpers.translation import async_get_translations
from .const import (
AZIMUT,
DATABASE_PATH,
DEV_DBG,
OUTSIDE_HUMIDITY,
OUTSIDE_TEMP,
@ -37,19 +37,19 @@ async def translations(
*,
key: str = "message",
category: str = "notify",
) -> str | None:
) -> str:
"""Get translated keys for domain."""
localize_key = f"component.{translation_domain}.{category}.{translation_key}.{key}"
language: str = hass.config.language
language = hass.config.language
_translations = await async_get_translations(
hass, language, category, [translation_domain]
)
if localize_key in _translations:
return _translations[localize_key]
return None
return ""
async def translated_notification(
@ -70,7 +70,7 @@ async def translated_notification(
f"component.{translation_domain}.{category}.{translation_key}.title"
)
language: str = cast("str", hass.config.language)
language = hass.config.language
_translations = await async_get_translations(
hass, language, category, [translation_domain]
@ -91,10 +91,7 @@ async def translated_notification(
async def update_options(
hass: HomeAssistant,
entry: ConfigEntry,
update_key: str,
update_value: str | list[str] | bool,
hass: HomeAssistant, entry: ConfigEntry, update_key, update_value
) -> bool:
"""Update config.options entry."""
conf = {**entry.options}
@ -103,43 +100,46 @@ async def update_options(
return hass.config_entries.async_update_entry(entry, options=conf)
def anonymize(
data: dict[str, str | int | float | bool],
) -> dict[str, str | int | float | bool]:
def anonymize(data):
"""Anoynimize recieved data."""
anonym: dict[str, str] = {}
return {
anonym[key]: value
for key, value in data.items()
if key not in {"ID", "PASSWORD", "wsid", "wspw"}
}
anonym = {}
for k in data:
if k not in {"ID", "PASSWORD", "wsid", "wspw"}:
anonym[k] = data[k]
return anonym
def remap_items(entities: dict[str, str]) -> dict[str, str]:
def remap_items(entities):
"""Remap items in query."""
return {
REMAP_ITEMS[key]: value for key, value in entities.items() if key in REMAP_ITEMS
}
items = {}
for item in entities:
if item in REMAP_ITEMS:
items[REMAP_ITEMS[item]] = entities[item]
return items
def remap_wslink_items(entities: dict[str, str]) -> dict[str, str]:
def remap_wslink_items(entities):
"""Remap items in query for WSLink API."""
return {
REMAP_WSLINK_ITEMS[key]: value
for key, value in entities.items()
if key in REMAP_WSLINK_ITEMS
}
items = {}
for item in entities:
if item in REMAP_WSLINK_ITEMS:
items[REMAP_WSLINK_ITEMS[item]] = entities[item]
return items
def loaded_sensors(config_entry: ConfigEntry) -> list[str]:
def loaded_sensors(config_entry: ConfigEntry) -> list | None:
"""Get loaded sensors."""
return config_entry.options.get(SENSORS_TO_LOAD) or []
def check_disabled(
items: dict[str, str], config_entry: ConfigEntry
) -> list[str] | None:
hass: HomeAssistant, items, config_entry: ConfigEntry
) -> list | None:
"""Check if we have data for unloaded sensors.
If so, then add sensor to load queue.
@ -147,11 +147,10 @@ def check_disabled(
Returns list of found sensors or None
"""
log = checked_or(config_entry.options.get(DEV_DBG), bool, False)
log: bool = config_entry.options.get(DEV_DBG, False)
entityFound: bool = False
_loaded_sensors: list[str] = loaded_sensors(config_entry)
missing_sensors: list[str] = []
_loaded_sensors = loaded_sensors(config_entry)
missing_sensors: list = []
for item in items:
if log:
@ -178,8 +177,8 @@ def wind_dir_to_text(deg: float) -> UnitOfDir | None:
return None
def battery_level(battery: int) -> UnitOfBat:
"""Return battery level.
def battery_level_to_text(battery: int) -> UnitOfBat:
"""Return battery level in text representation.
Returns UnitOfBat
"""
@ -189,10 +188,10 @@ def battery_level(battery: int) -> UnitOfBat:
1: UnitOfBat.NORMAL,
}
if (v := checked(battery, int)) is None:
if battery is None:
return UnitOfBat.UNKNOWN
return level_map.get(v, UnitOfBat.UNKNOWN)
return level_map.get(int(battery), UnitOfBat.UNKNOWN)
def battery_level_to_icon(battery: UnitOfBat) -> str:
@ -219,40 +218,21 @@ def celsius_to_fahrenheit(celsius: float) -> float:
return celsius * 9.0 / 5.0 + 32
def _to_float(val: Any) -> float | None:
"""Convert int or string to float."""
if not val:
return None
try:
v = float(val)
except (TypeError, ValueError):
return None
else:
return v
def heat_index(
data: dict[str, int | float | str], convert: bool = False
) -> float | None:
def heat_index(data: Any, convert: bool = False) -> float | None:
"""Calculate heat index from temperature.
data: dict with temperature and humidity
convert: bool, convert recieved data from Celsius to Fahrenheit
"""
if (temp := _to_float(data.get(OUTSIDE_TEMP))) is None:
_LOGGER.error(
"We are missing/invalid OUTSIDE TEMP (%s), cannot calculate wind chill index.",
temp,
)
temp = data.get(OUTSIDE_TEMP, None)
rh = data.get(OUTSIDE_HUMIDITY, None)
if not temp or not rh:
return None
if (rh := _to_float(data.get(OUTSIDE_HUMIDITY))) is None:
_LOGGER.error(
"We are missing/invalid OUTSIDE HUMIDITY (%s), cannot calculate wind chill index.",
rh,
)
return None
temp = float(temp)
rh = float(rh)
adjustment = None
@ -283,30 +263,21 @@ def heat_index(
return simple
def chill_index(
data: dict[str, str | float | int], convert: bool = False
) -> float | None:
def chill_index(data: Any, convert: bool = False) -> float | None:
"""Calculate wind chill index from temperature and wind speed.
data: dict with temperature and wind speed
convert: bool, convert recieved data from Celsius to Fahrenheit
"""
temp = _to_float(data.get(OUTSIDE_TEMP))
wind = _to_float(data.get(WIND_SPEED))
if temp is None:
_LOGGER.error(
"We are missing/invalid OUTSIDE TEMP (%s), cannot calculate wind chill index.",
temp,
)
temp = data.get(OUTSIDE_TEMP, None)
wind = data.get(WIND_SPEED, None)
if not temp or not wind:
return None
if wind is None:
_LOGGER.error(
"We are missing/invalid WIND SPEED (%s), cannot calculate wind chill index.",
wind,
)
return None
temp = float(temp)
wind = float(wind)
if convert:
temp = celsius_to_fahrenheit(temp)
@ -323,3 +294,109 @@ def chill_index(
if temp < 50 and wind > 3
else temp
)
def long_term_units_in_statistics_meta():
"""Get units in long term statitstics."""
sensor_units = []
if not Path(DATABASE_PATH).exists():
_LOGGER.error("Database file not found: %s", DATABASE_PATH)
return False
conn = sqlite3.connect(DATABASE_PATH)
db = conn.cursor()
try:
db.execute(
"""
SELECT statistic_id, unit_of_measurement from statistics_meta
WHERE statistic_id LIKE 'sensor.weather_station_sws%'
"""
)
rows = db.fetchall()
sensor_units = {
statistic_id: f"{statistic_id} ({unit})" for statistic_id, unit in rows
}
except sqlite3.Error as e:
_LOGGER.error("Error during data migration: %s", e)
finally:
conn.close()
return sensor_units
async def migrate_data(hass: HomeAssistant, sensor_id: str | None = None) -> int | bool:
"""Migrate data from mm/d to mm."""
_LOGGER.debug("Sensor %s is required for data migration", sensor_id)
updated_rows = 0
if not Path(DATABASE_PATH).exists():
_LOGGER.error("Database file not found: %s", DATABASE_PATH)
return False
conn = sqlite3.connect(DATABASE_PATH)
db = conn.cursor()
try:
_LOGGER.info(sensor_id)
db.execute(
"""
UPDATE statistics_meta
SET unit_of_measurement = 'mm'
WHERE statistic_id = ?
AND unit_of_measurement = 'mm/d';
""",
(sensor_id,),
)
updated_rows = db.rowcount
conn.commit()
_LOGGER.info(
"Data migration completed successfully. Updated rows: %s for %s",
updated_rows,
sensor_id,
)
except sqlite3.Error as e:
_LOGGER.error("Error during data migration: %s", e)
finally:
conn.close()
return updated_rows
def migrate_data_old(sensor_id: str | None = None):
"""Migrate data from mm/d to mm."""
updated_rows = 0
if not Path(DATABASE_PATH).exists():
_LOGGER.error("Database file not found: %s", DATABASE_PATH)
return False
conn = sqlite3.connect(DATABASE_PATH)
db = conn.cursor()
try:
_LOGGER.info(sensor_id)
db.execute(
"""
UPDATE statistics_meta
SET unit_of_measurement = 'mm'
WHERE statistic_id = ?
AND unit_of_measurement = 'mm/d';
""",
(sensor_id,),
)
updated_rows = db.rowcount
conn.commit()
_LOGGER.info(
"Data migration completed successfully. Updated rows: %s for %s",
updated_rows,
sensor_id,
)
except sqlite3.Error as e:
_LOGGER.error("Error during data migration: %s", e)
finally:
conn.close()
return updated_rows

View File

@ -2,10 +2,8 @@
from datetime import datetime, timedelta
import logging
from typing import Final
from aiohttp.client_exceptions import ClientError
from py_typecheck.core import checked
from homeassistant.config_entries import ConfigEntry
from homeassistant.core import HomeAssistant
@ -54,22 +52,22 @@ class WindyPush:
def __init__(self, hass: HomeAssistant, config: ConfigEntry) -> None:
"""Init."""
self.hass: Final = hass
self.config: Final = config
self.hass = hass
self.config = config
""" lets wait for 1 minute to get initial data from station
and then try to push first data to Windy
"""
self.last_update: datetime = datetime.now()
self.next_update: datetime = datetime.now() + timed(minutes=1)
self.last_update = datetime.now()
self.next_update = datetime.now() + timed(minutes=1)
self.log: bool = self.config.options.get(WINDY_LOGGER_ENABLED, False)
self.invalid_response_count: int = 0
self.log = self.config.options.get(WINDY_LOGGER_ENABLED)
self.invalid_response_count = 0
def verify_windy_response( # pylint: disable=useless-return
self,
response: str,
):
) -> WindyNotInserted | WindySuccess | WindyApiKeyError | None:
"""Verify answer form Windy."""
if self.log:
@ -87,7 +85,9 @@ class WindyPush:
if "Unauthorized" in response:
raise WindyApiKeyError
async def push_data_to_windy(self, data: dict[str, str]) -> bool:
return None
async def push_data_to_windy(self, data):
"""Pushes weather data do Windy stations.
Interval is 5 minutes, otherwise Windy would not accepts data.
@ -96,6 +96,8 @@ class WindyPush:
from station. But we need to do some clean up.
"""
text_for_test = None
if self.log:
_LOGGER.info(
"Windy last update = %s, next update at: %s",
@ -110,18 +112,13 @@ class WindyPush:
for purge in PURGE_DATA:
if purge in purged_data:
_ = purged_data.pop(purge)
purged_data.pop(purge)
if "dewptf" in purged_data:
dewpoint = round(((float(purged_data.pop("dewptf")) - 32) / 1.8), 1)
purged_data["dewpoint"] = str(dewpoint)
if (
windy_api_key := checked(self.config.options.get(WINDY_API_KEY), str)
) is None:
_LOGGER.error("Windy API key is not provided! Check your configuration.")
return False
windy_api_key = self.config.options.get(WINDY_API_KEY)
request_url = f"{WINDY_URL}{windy_api_key}"
if self.log:
@ -136,33 +133,27 @@ class WindyPush:
# log despite of settings
_LOGGER.error(WINDY_NOT_INSERTED)
text_for_test = WINDY_NOT_INSERTED
except WindyApiKeyError:
# log despite of settings
_LOGGER.critical(WINDY_INVALID_KEY)
text_for_test = WINDY_INVALID_KEY
if not (
await update_options(
self.hass, self.config, WINDY_ENABLED, False
)
):
_LOGGER.debug("Failed to set Windy option to false.")
await update_options(self.hass, self.config, WINDY_ENABLED, False)
except WindySuccess:
if self.log:
_LOGGER.info(WINDY_SUCCESS)
else:
if self.log:
_LOGGER.debug(WINDY_NOT_INSERTED)
text_for_test = WINDY_SUCCESS
except ClientError as ex:
_LOGGER.critical("Invalid response from Windy: %s", str(ex))
self.invalid_response_count += 1
if self.invalid_response_count > 3:
_LOGGER.critical(WINDY_UNEXPECTED)
if not await update_options(
self.hass, self.config, WINDY_ENABLED, False
):
_LOGGER.debug("Failed to set Windy options to false.")
text_for_test = WINDY_UNEXPECTED
await update_options(self.hass, self.config, WINDY_ENABLED, False)
self.last_update = datetime.now()
self.next_update = self.last_update + timed(minutes=5)
@ -170,4 +161,6 @@ class WindyPush:
if self.log:
_LOGGER.info("Next update: %s", str(self.next_update))
return True
if RESPONSE_FOR_TEST and text_for_test:
return text_for_test
return None