Compare commits


1 Commit

Author: Lukas Svoboda
SHA1: 5731827224
Message: Merge a3dc3d0d53 into 9f36ab5d4c
Date: 2026-01-18 20:48:41 +00:00
13 changed files with 181 additions and 585 deletions

View File

@ -1,33 +1,7 @@
"""Sencor SWS 12500 Weather Station integration (push/webhook based). """The Sencor SWS 12500 Weather Station integration."""
Architecture overview
---------------------
This integration is *push-based*: the weather station calls our HTTP endpoint and we
receive a query payload. We do not poll the station.
Key building blocks:
- `WeatherDataUpdateCoordinator` acts as an in-memory "data bus" for the latest payload.
On each webhook request we call `async_set_updated_data(...)` and all `CoordinatorEntity`
sensors get notified and update their states.
- `hass.data[DOMAIN][entry_id]` is a per-entry *dict* that stores runtime state
(coordinator instance, options snapshot, and sensor platform callbacks). Keeping this
structure consistent is critical; mixing different value types under the same key can
break listener wiring and make the UI appear "frozen".
Auto-discovery
--------------
When the station starts sending a new field, we:
1) persist the new sensor key into options (`SENSORS_TO_LOAD`)
2) dynamically add the new entity through the sensor platform (without reloading)
Why avoid reload?
Reloading a config entry unloads platforms temporarily, which removes coordinator listeners.
With a high-frequency push source (webhook), a reload at the wrong moment can lead to a
period where no entities are subscribed, causing stale states until another full reload/restart.
"""
import logging import logging
from typing import Any, cast from typing import Any
import aiohttp.web import aiohttp.web
from aiohttp.web_exceptions import HTTPUnauthorized from aiohttp.web_exceptions import HTTPUnauthorized
@ -47,6 +21,7 @@ from .const import (
API_ID, API_ID,
API_KEY, API_KEY,
DEFAULT_URL, DEFAULT_URL,
DEV_DBG,
DOMAIN, DOMAIN,
POCASI_CZ_ENABLED, POCASI_CZ_ENABLED,
SENSORS_TO_LOAD, SENSORS_TO_LOAD,
@ -54,7 +29,6 @@ from .const import (
WSLINK, WSLINK,
WSLINK_URL, WSLINK_URL,
) )
from .data import ENTRY_COORDINATOR, ENTRY_LAST_OPTIONS
from .pocasti_cz import PocasiPush from .pocasti_cz import PocasiPush
from .routes import Routes from .routes import Routes
from .utils import ( from .utils import (
@ -77,54 +51,25 @@ class IncorrectDataError(InvalidStateError):
"""Invalid exception.""" """Invalid exception."""
# NOTE:
# We intentionally avoid importing the sensor platform module at import-time here.
# Home Assistant can import modules in different orders; keeping imports acyclic
# prevents "partially initialized module" failures (circular imports / partially initialized modules).
#
# When we need to dynamically add sensors, we do a local import inside the webhook handler.
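The note above is the rationale for the deferred import used later in the webhook handler. The fragment below mirrors that shape; it is not standalone and assumes the integration's package layout with a sibling sensor module exporting add_new_sensors, as shown further down in this diff:

    async def _on_new_sensor_keys(hass, config_entry, new_keys: list[str]) -> None:
        # Imported only when discovery actually fires, never at module import time,
        # so `sensor` importing this module back cannot yield a half-initialized module.
        from .sensor import add_new_sensors  # noqa: PLC0415 (intentional local import)

        add_new_sensors(hass, config_entry, new_keys)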
class WeatherDataUpdateCoordinator(DataUpdateCoordinator): class WeatherDataUpdateCoordinator(DataUpdateCoordinator):
"""Coordinator for push updates. """Manage fetched data."""
Even though Home Assistant's `DataUpdateCoordinator` is often used for polling,
it also works well as a "fan-out" mechanism for push integrations:
- webhook handler updates `self.data` via `async_set_updated_data`
- all `CoordinatorEntity` instances subscribed to this coordinator update themselves
"""
def __init__(self, hass: HomeAssistant, config: ConfigEntry) -> None: def __init__(self, hass: HomeAssistant, config: ConfigEntry) -> None:
"""Initialize the coordinator. """Init global updater."""
`config` is the config entry for this integration instance. We store it because
the webhook handler needs access to options (auth data, enabled features, etc.).
"""
self.hass: HomeAssistant = hass self.hass: HomeAssistant = hass
self.config: ConfigEntry = config self.config: ConfigEntry = config
self.windy: WindyPush = WindyPush(hass, config) self.windy: WindyPush = WindyPush(hass, config)
self.pocasi: PocasiPush = PocasiPush(hass, config) self.pocasi: PocasiPush = PocasiPush(hass, config)
super().__init__(hass, _LOGGER, name=DOMAIN) super().__init__(hass, _LOGGER, name=DOMAIN)
async def received_data(self, webdata: aiohttp.web.Request) -> aiohttp.web.Response: async def recieved_data(self, webdata: aiohttp.web.Request) -> aiohttp.web.Response:
"""Handle incoming webhook payload from the station. """Handle incoming data query."""
This method:
- validates authentication (different keys for WU vs WSLink)
- optionally forwards data to third-party services (Windy / Pocasi)
- remaps payload keys to internal sensor keys
- auto-discovers new sensor fields and adds entities dynamically
- updates coordinator data so existing entities refresh immediately
"""
# WSLink uses different auth and payload field naming than the legacy endpoint.
_wslink: bool = checked_or(self.config.options.get(WSLINK), bool, False) _wslink: bool = checked_or(self.config.options.get(WSLINK), bool, False)
# Incoming station payload is delivered as query params.
# We copy it to a plain dict so it can be passed around safely.
data: dict[str, Any] = dict(webdata.query) data: dict[str, Any] = dict(webdata.query)
# Validate auth keys (different parameter names depending on endpoint mode). # Check if station is sending auth data
if not _wslink and ("ID" not in data or "PASSWORD" not in data): if not _wslink and ("ID" not in data or "PASSWORD" not in data):
_LOGGER.error("Invalid request. No security data provided!") _LOGGER.error("Invalid request. No security data provided!")
raise HTTPUnauthorized raise HTTPUnauthorized
@ -143,8 +88,7 @@ class WeatherDataUpdateCoordinator(DataUpdateCoordinator):
id_data = data.get("ID", "") id_data = data.get("ID", "")
key_data = data.get("PASSWORD", "") key_data = data.get("PASSWORD", "")
# Validate credentials against the integration's configured options. # Check if we have valid auth data in the integration
# If auth doesn't match, we reject the request (prevents random pushes from the LAN/Internet).
if (_id := checked(self.config.options.get(API_ID), str)) is None: if (_id := checked(self.config.options.get(API_ID), str)) is None:
_LOGGER.error("We don't have API ID set! Update your config!") _LOGGER.error("We don't have API ID set! Update your config!")
@ -158,21 +102,16 @@ class WeatherDataUpdateCoordinator(DataUpdateCoordinator):
_LOGGER.error("Unauthorised access!") _LOGGER.error("Unauthorised access!")
raise HTTPUnauthorized raise HTTPUnauthorized
# Optional forwarding to external services. This is kept here (in the webhook handler)
# to avoid additional background polling tasks.
if self.config.options.get(WINDY_ENABLED, False): if self.config.options.get(WINDY_ENABLED, False):
await self.windy.push_data_to_windy(data) await self.windy.push_data_to_windy(data)
if self.config.options.get(POCASI_CZ_ENABLED, False): if self.config.options.get(POCASI_CZ_ENABLED, False):
await self.pocasi.push_data_to_server(data, "WSLINK" if _wslink else "WU") await self.pocasi.push_data_to_server(data, "WSLINK" if _wslink else "WU")
# Convert raw payload keys to our internal sensor keys (stable identifiers).
remaped_items: dict[str, str] = ( remaped_items: dict[str, str] = (
remap_wslink_items(data) if _wslink else remap_items(data) remap_wslink_items(data) if _wslink else remap_items(data)
) )
# Auto-discovery: if payload contains keys that are not enabled/loaded yet,
# add them to the option list and create entities dynamically.
if sensors := check_disabled(remaped_items, self.config): if sensors := check_disabled(remaped_items, self.config):
if ( if (
translate_sensors := checked( translate_sensors := checked(
@ -207,36 +146,14 @@ class WeatherDataUpdateCoordinator(DataUpdateCoordinator):
"added", "added",
{"added_sensors": f"{human_readable}\n"}, {"added_sensors": f"{human_readable}\n"},
) )
if _loaded_sensors := loaded_sensors(self.config_entry):
# Persist newly discovered sensor keys to options (so they remain enabled after restart).
newly_discovered = list(sensors)
if _loaded_sensors := loaded_sensors(self.config):
sensors.extend(_loaded_sensors) sensors.extend(_loaded_sensors)
await update_options(self.hass, self.config, SENSORS_TO_LOAD, sensors) await update_options(self.hass, self.config_entry, SENSORS_TO_LOAD, sensors)
# await self.hass.config_entries.async_reload(self.config.entry_id)
# Dynamically add newly discovered sensors *without* reloading the entry.
#
# Why: Reloading a config entry unloads platforms temporarily. That removes coordinator
# listeners; with frequent webhook pushes the UI can appear "frozen" until the listeners
# are re-established. Dynamic adds avoid this window completely.
#
# We do a local import to avoid circular imports at module import time.
#
# NOTE: Some linters prefer top-level imports. In this case the local import is
# intentional and prevents "partially initialized module" errors.
from .sensor import ( # noqa: PLC0415 (local import is intentional)
add_new_sensors,
)
add_new_sensors(self.hass, self.config, newly_discovered)
# Fan-out update: notify all subscribed entities.
self.async_set_updated_data(remaped_items) self.async_set_updated_data(remaped_items)
# Optional dev logging (keep it lightweight to avoid log spam under high-frequency updates). if self.config_entry.options.get(DEV_DBG):
if self.config.options.get("dev_debug_checkbox"):
_LOGGER.info("Dev log: %s", anonymize(data)) _LOGGER.info("Dev log: %s", anonymize(data))
return aiohttp.web.Response(body="OK", status=200) return aiohttp.web.Response(body="OK", status=200)
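Condensing the handler above: authenticate, remap, auto-discover, then publish. A simplified standalone sketch of that order of operations (the REMAP subset, the option keys, and the publish callback are stand-ins, not the integration's real tables):

    from typing import Any, Callable

    REMAP = {"tempf": "outside_temp", "humidity": "outside_humidity"}  # illustrative subset

    def handle_webhook(
        query: dict[str, str],
        options: dict[str, Any],
        publish: Callable[[dict[str, str]], None],
    ) -> str:
        # 1) Authenticate against the credentials stored in the entry options.
        if query.get("ID") != options["api_id"] or query.get("PASSWORD") != options["api_key"]:
            raise PermissionError("unauthorized push")
        # 2) Normalize raw station field names to stable internal sensor keys.
        remapped = {REMAP[k]: v for k, v in query.items() if k in REMAP}
        # 3) Auto-discovery: persist any key that is not enabled yet.
        new_keys = [k for k in remapped if k not in options["sensors_to_load"]]
        options["sensors_to_load"].extend(new_keys)
        # 4) Fan the normalized payload out to every subscribed entity.
        publish(remapped)
        return "OK"

    opts = {"api_id": "abc", "api_key": "xyz", "sensors_to_load": []}
    print(handle_webhook({"ID": "abc", "PASSWORD": "xyz", "tempf": "70.1"}, opts, print))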
@ -247,12 +164,7 @@ def register_path(
coordinator: WeatherDataUpdateCoordinator, coordinator: WeatherDataUpdateCoordinator,
config: ConfigEntry, config: ConfigEntry,
) -> bool: ) -> bool:
"""Register webhook paths. """Register paths to webhook."""
We register both possible endpoints and use an internal dispatcher (`Routes`) to
enable exactly one of them. This lets us toggle WSLink mode without re-registering
routes on the aiohttp router.
"""
hass.data.setdefault(DOMAIN, {}) hass.data.setdefault(DOMAIN, {})
if (hass_data := checked(hass.data[DOMAIN], dict[str, Any])) is None: if (hass_data := checked(hass.data[DOMAIN], dict[str, Any])) is None:
@ -262,13 +174,13 @@ def register_path(
# Create internal route dispatcher with provided urls # Create internal route dispatcher with provided urls
routes: Routes = Routes() routes: Routes = Routes()
routes.add_route(DEFAULT_URL, coordinator.received_data, enabled=not _wslink) routes.add_route(DEFAULT_URL, coordinator.recieved_data, enabled=not _wslink)
routes.add_route(WSLINK_URL, coordinator.received_data, enabled=_wslink) routes.add_route(WSLINK_URL, coordinator.recieved_data, enabled=_wslink)
# Register webhooks in HomeAssistant with dispatcher # Register webhooks in HomeAssistant with dispatcher
try: try:
_ = hass.http.app.router.add_get(DEFAULT_URL, routes.dispatch) _ = hass.http.app.router.add_get(DEFAULT_URL, routes.dispatch)
_ = hass.http.app.router.add_post(WSLINK_URL, routes.dispatch) _ = hass.http.app.router.add_get(WSLINK_URL, routes.dispatch)
# Save initialised routes # Save initialised routes
hass_data["routes"] = routes hass_data["routes"] = routes
@ -283,53 +195,15 @@ def register_path(
async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool: async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
"""Set up a config entry. """Set up the config entry for my device."""
Important: coordinator = WeatherDataUpdateCoordinator(hass, entry)
- We store per-entry runtime state under `hass.data[DOMAIN][entry_id]` as a dict.
- We reuse the same coordinator instance across reloads so that:
- the webhook handler keeps updating the same coordinator
- already-created entities remain subscribed
""" hass_data = hass.data.setdefault(DOMAIN, {})
hass_data[entry.entry_id] = coordinator
hass_data_any = hass.data.setdefault(DOMAIN, {})
hass_data = cast("dict[str, Any]", hass_data_any)
# Per-entry runtime storage:
# hass.data[DOMAIN][entry_id] is always a dict (never the coordinator itself).
# Mixing types here (sometimes dict, sometimes coordinator) is a common source of hard-to-debug
# issues where entities stop receiving updates.
entry_data_any = hass_data.get(entry.entry_id)
if not isinstance(entry_data_any, dict):
entry_data_any = {}
hass_data[entry.entry_id] = entry_data_any
entry_data = cast("dict[str, Any]", entry_data_any)
# Reuse the existing coordinator across reloads so webhook handlers and entities
# remain connected to the same coordinator instance.
#
# Note: Routes store a bound method (`coordinator.received_data`). If we replaced the coordinator
# instance on reload, the dispatcher could keep calling the old instance while entities listen
# to the new one, causing updates to "disappear".
coordinator_any = entry_data.get(ENTRY_COORDINATOR)
if isinstance(coordinator_any, WeatherDataUpdateCoordinator):
coordinator_any.config = entry
# Recreate helper instances so they pick up updated options safely.
coordinator_any.windy = WindyPush(hass, entry)
coordinator_any.pocasi = PocasiPush(hass, entry)
coordinator = coordinator_any
else:
coordinator = WeatherDataUpdateCoordinator(hass, entry)
entry_data[ENTRY_COORDINATOR] = coordinator
routes: Routes | None = hass_data.get("routes", None) routes: Routes | None = hass_data.get("routes", None)
# Keep an options snapshot so update_listener can skip reloads when only `SENSORS_TO_LOAD` changes.
# Auto-discovery updates this option frequently and we do not want to reload for that case.
entry_data[ENTRY_LAST_OPTIONS] = dict(entry.options)
_wslink = checked_or(entry.options.get(WSLINK), bool, False) _wslink = checked_or(entry.options.get(WSLINK), bool, False)
_LOGGER.debug("WS Link is %s", "enbled" if _wslink else "disabled") _LOGGER.debug("WS Link is %s", "enbled" if _wslink else "disabled")
@ -353,46 +227,10 @@ async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
async def update_listener(hass: HomeAssistant, entry: ConfigEntry): async def update_listener(hass: HomeAssistant, entry: ConfigEntry):
"""Handle config entry option updates. """Update setup listener."""
We skip reloading when only `SENSORS_TO_LOAD` changes.
Why:
- Auto-discovery updates `SENSORS_TO_LOAD` as new payload fields appear.
- Reloading a push-based integration temporarily unloads platforms and removes
coordinator listeners, which can make the UI appear "stuck" until restart.
"""
hass_data_any = hass.data.get(DOMAIN)
if isinstance(hass_data_any, dict):
hass_data = cast("dict[str, Any]", hass_data_any)
entry_data_any = hass_data.get(entry.entry_id)
if isinstance(entry_data_any, dict):
entry_data = cast("dict[str, Any]", entry_data_any)
old_options_any = entry_data.get(ENTRY_LAST_OPTIONS)
if isinstance(old_options_any, dict):
old_options = cast("dict[str, Any]", old_options_any)
new_options = dict(entry.options)
changed_keys = {
k
for k in set(old_options.keys()) | set(new_options.keys())
if old_options.get(k) != new_options.get(k)
}
# Update snapshot early for the next comparison.
entry_data[ENTRY_LAST_OPTIONS] = new_options
if changed_keys == {SENSORS_TO_LOAD}:
_LOGGER.debug(
"Options updated (%s); skipping reload.", SENSORS_TO_LOAD
)
return
else:
# No/invalid snapshot: store current options for next comparison.
entry_data[ENTRY_LAST_OPTIONS] = dict(entry.options)
_ = await hass.config_entries.async_reload(entry.entry_id) _ = await hass.config_entries.async_reload(entry.entry_id)
_LOGGER.info("Settings updated") _LOGGER.info("Settings updated")

View File

@ -249,7 +249,7 @@ class UnitOfBat(StrEnum):
LOW = "low" LOW = "low"
NORMAL = "normal" NORMAL = "normal"
UNKNOWN = "drained" UNKNOWN = "unknown"
BATTERY_LEVEL: list[UnitOfBat] = [ BATTERY_LEVEL: list[UnitOfBat] = [

View File

@ -1,19 +0,0 @@
"""Shared keys for storing integration runtime state in `hass.data`.
This integration stores runtime state under:
hass.data[DOMAIN][entry_id] -> dict
Keeping keys in a dedicated module prevents subtle bugs where different modules
store different types under the same key.
"""
from __future__ import annotations
from typing import Final
# Per-entry dict keys stored under hass.data[DOMAIN][entry_id]
ENTRY_COORDINATOR: Final[str] = "coordinator"
ENTRY_ADD_ENTITIES: Final[str] = "async_add_entities"
ENTRY_DESCRIPTIONS: Final[str] = "sensor_descriptions"
ENTRY_LAST_OPTIONS: Final[str] = "last_options"
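For context, this is roughly how another module would read those keys back, with a plain dict standing in for hass.data (the helper name and domain string are illustrative):

    from typing import Any, Final

    DOMAIN: Final = "sws12500"  # illustrative domain string
    ENTRY_ADD_ENTITIES: Final = "async_add_entities"

    def get_add_entities(hass_data: dict[str, Any], entry_id: str) -> Any:
        """Return the stored add-entities callback, or None if the platform is not set up yet."""
        entry_data = hass_data.get(DOMAIN, {}).get(entry_id)
        if not isinstance(entry_data, dict):
            # Defensive: anything other than a dict under this key means "not ready".
            return None
        return entry_data.get(ENTRY_ADD_ENTITIES)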

View File

@ -1,14 +0,0 @@
{
"entity": {
"sensor": {
"indoor_battery": {
"default": "mdi:battery-unknown",
"state": {
"low": "mdi:battery-low",
"normal": "mdi:battery",
"drained": "mdi:battery-alert"
}
}
}
}
}

View File

@ -1,19 +1,4 @@
"""Routes implementation. """Routes implementation."""
Why this dispatcher exists
--------------------------
Home Assistant registers aiohttp routes on startup. Re-registering or removing routes at runtime
is awkward and error-prone (and can raise if routes already exist). This integration supports two
different push endpoints (legacy WU-style vs WSLink). To allow switching between them without
touching the aiohttp router, we register both routes once and use this in-process dispatcher to
decide which one is currently enabled.
Important note:
- Each route stores a *bound method* handler (e.g. `coordinator.received_data`). That means the
route points to a specific coordinator instance. When the integration reloads, we must keep the
same coordinator instance or update the stored handler accordingly. Otherwise requests may go to
an old coordinator while entities listen to a new one (result: UI appears "frozen").
"""
from collections.abc import Awaitable, Callable from collections.abc import Awaitable, Callable
from dataclasses import dataclass, field from dataclasses import dataclass, field
@ -28,11 +13,7 @@ Handler = Callable[[Request], Awaitable[Response]]
@dataclass @dataclass
class RouteInfo: class RouteInfo:
"""Route definition held by the dispatcher. """Route struct."""
- `handler` is the real webhook handler (bound method).
- `fallback` is used when the route exists but is currently disabled.
"""
url_path: str url_path: str
handler: Handler handler: Handler
@ -41,19 +22,14 @@ class RouteInfo:
class Routes: class Routes:
"""Simple route dispatcher. """Routes class."""
We register aiohttp routes once and direct traffic to the currently enabled endpoint
using `switch_route`. This keeps route registration stable while still allowing the
integration to support multiple incoming push formats.
"""
def __init__(self) -> None: def __init__(self) -> None:
"""Initialize dispatcher storage.""" """Init."""
self.routes: dict[str, RouteInfo] = {} self.routes: dict[str, RouteInfo] = {}
async def dispatch(self, request: Request) -> Response: async def dispatch(self, request: Request) -> Response:
"""Dispatch incoming request to either the enabled handler or a fallback.""" """Dispatch."""
info = self.routes.get(request.path) info = self.routes.get(request.path)
if not info: if not info:
_LOGGER.debug("Route %s is not registered!", request.path) _LOGGER.debug("Route %s is not registered!", request.path)
@ -62,27 +38,20 @@ class Routes:
return await handler(request) return await handler(request)
def switch_route(self, url_path: str) -> None: def switch_route(self, url_path: str) -> None:
"""Enable exactly one route and disable all others. """Switch route to new handler."""
This is called when options change (e.g. WSLink toggle). The aiohttp router stays
untouched; we only flip which internal handler is active.
"""
for path, info in self.routes.items(): for path, info in self.routes.items():
info.enabled = path == url_path info.enabled = path == url_path
def add_route( def add_route(
self, url_path: str, handler: Handler, *, enabled: bool = False self, url_path: str, handler: Handler, *, enabled: bool = False
) -> None: ) -> None:
"""Register a route in the dispatcher. """Add route to dispatcher."""
This does not register anything in aiohttp. It only stores routing metadata that
`dispatch` uses after aiohttp has routed the request by path.
"""
self.routes[url_path] = RouteInfo(url_path, handler, enabled=enabled) self.routes[url_path] = RouteInfo(url_path, handler, enabled=enabled)
_LOGGER.debug("Registered dispatcher for route %s", url_path) _LOGGER.debug("Registered dispatcher for route %s", url_path)
def show_enabled(self) -> str: def show_enabled(self) -> str:
"""Return a human-readable description of the currently enabled route.""" """Show info of enabled route."""
for url, route in self.routes.items(): for url, route in self.routes.items():
if route.enabled: if route.enabled:
return ( return (
@ -92,11 +61,7 @@ class Routes:
async def unregistred(request: Request) -> Response: async def unregistred(request: Request) -> Response:
"""Fallback response for unknown/disabled routes. """Return unregistred error."""
This should normally never happen for correctly configured stations, but it provides
a clear error message when the station pushes to the wrong endpoint.
"""
_ = request _ = request
_LOGGER.debug("Received data to unregistred or disabled webhook.") _LOGGER.debug("Received data to unregistred or disabled webhook.")
return Response(text="Unregistred webhook. Check your settings.", status=400) return Response(text="Unregistred webhook. Check your settings.", status=400)

View File

@ -1,36 +1,18 @@
"""Sensor platform for SWS12500. """Sensors definition for SWS12500."""
This module creates sensor entities based on the config entry options.
The integration is push-based (webhook), so we avoid reloading the entry for
auto-discovered sensors. Instead, we dynamically add new entities at runtime
using the `async_add_entities` callback stored in `hass.data`.
Why not reload on auto-discovery?
Reloading a config entry unloads platforms temporarily, which removes coordinator
listeners. With frequent webhook pushes, this can create a window where nothing is
subscribed and the frontend appears "frozen" until another full reload/restart.
Runtime state is stored under:
hass.data[DOMAIN][entry_id] -> dict with known keys (see `data.py`)
"""
from collections.abc import Callable
from functools import cached_property
import logging import logging
from typing import Any, cast
from py_typecheck import checked_or
from homeassistant.components.sensor import SensorEntity from homeassistant.components.sensor import SensorEntity
from homeassistant.config_entries import ConfigEntry from homeassistant.config_entries import ConfigEntry
from homeassistant.core import HomeAssistant from homeassistant.core import HomeAssistant, callback
from homeassistant.helpers.device_registry import DeviceEntryType from homeassistant.helpers.device_registry import DeviceEntryType
from homeassistant.helpers.entity import DeviceInfo, generate_entity_id from homeassistant.helpers.entity import DeviceInfo, generate_entity_id
from homeassistant.helpers.entity_platform import AddEntitiesCallback from homeassistant.helpers.entity_platform import AddEntitiesCallback
from homeassistant.helpers.update_coordinator import CoordinatorEntity from homeassistant.helpers.update_coordinator import CoordinatorEntity
from . import WeatherDataUpdateCoordinator
from .const import ( from .const import (
BATTERY_LIST,
CHILL_INDEX, CHILL_INDEX,
DOMAIN, DOMAIN,
HEAT_INDEX, HEAT_INDEX,
@ -41,202 +23,133 @@ from .const import (
WIND_DIR, WIND_DIR,
WIND_SPEED, WIND_SPEED,
WSLINK, WSLINK,
UnitOfBat,
) )
from .data import ENTRY_ADD_ENTITIES, ENTRY_COORDINATOR, ENTRY_DESCRIPTIONS
from .sensors_common import WeatherSensorEntityDescription from .sensors_common import WeatherSensorEntityDescription
from .sensors_weather import SENSOR_TYPES_WEATHER_API from .sensors_weather import SENSOR_TYPES_WEATHER_API
from .sensors_wslink import SENSOR_TYPES_WSLINK from .sensors_wslink import SENSOR_TYPES_WSLINK
from .utils import battery_level_to_icon, battery_level_to_text, chill_index, heat_index
_LOGGER = logging.getLogger(__name__) _LOGGER = logging.getLogger(__name__)
# The `async_add_entities` callback accepts a list of Entity-like objects.
# We keep the type loose here to avoid propagating HA generics (`DataUpdateCoordinator[T]`)
# that often end up as "partially unknown" under type-checkers.
_AddEntitiesFn = Callable[[list[SensorEntity]], None]
def _auto_enable_derived_sensors(requested: set[str]) -> set[str]:
"""Auto-enable derived sensors when their source fields are present.
This does NOT model strict dependencies ("if you want X, we force-add inputs").
Instead, it opportunistically enables derived outputs when the station already
provides the raw fields needed to compute them.
"""
expanded = set(requested)
# Wind azimut depends on wind dir
if WIND_DIR in expanded:
expanded.add(WIND_AZIMUT)
# Heat index depends on temp + humidity
if OUTSIDE_TEMP in expanded and OUTSIDE_HUMIDITY in expanded:
expanded.add(HEAT_INDEX)
# Chill index depends on temp + wind speed
if OUTSIDE_TEMP in expanded and WIND_SPEED in expanded:
expanded.add(CHILL_INDEX)
return expanded
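A worked example of the expansion performed by _auto_enable_derived_sensors above, restated in a runnable form (the key constants are stand-ins for the ones in const.py):

    WIND_DIR, WIND_AZIMUT = "wind_dir", "wind_azimut"
    OUTSIDE_TEMP, OUTSIDE_HUMIDITY = "outside_temp", "outside_humidity"
    WIND_SPEED, HEAT_INDEX, CHILL_INDEX = "wind_speed", "heat_index", "chill_index"

    def auto_enable_derived(requested: set[str]) -> set[str]:
        expanded = set(requested)
        if WIND_DIR in expanded:
            expanded.add(WIND_AZIMUT)                 # azimut text derives from direction
        if {OUTSIDE_TEMP, OUTSIDE_HUMIDITY} <= expanded:
            expanded.add(HEAT_INDEX)                  # heat index needs temp + humidity
        if {OUTSIDE_TEMP, WIND_SPEED} <= expanded:
            expanded.add(CHILL_INDEX)                 # chill index needs temp + wind speed
        return expanded

    print(sorted(auto_enable_derived({OUTSIDE_TEMP, OUTSIDE_HUMIDITY, WIND_DIR})))
    # ['heat_index', 'outside_humidity', 'outside_temp', 'wind_azimut', 'wind_dir']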
async def async_setup_entry( async def async_setup_entry(
hass: HomeAssistant, hass: HomeAssistant,
config_entry: ConfigEntry, config_entry: ConfigEntry,
async_add_entities: AddEntitiesCallback, async_add_entities: AddEntitiesCallback,
) -> None: ) -> None:
"""Set up Weather Station sensors. """Set up Weather Station sensors."""
We also store `async_add_entities` and a map of sensor descriptions in `hass.data` coordinator: WeatherDataUpdateCoordinator = hass.data[DOMAIN][config_entry.entry_id]
so the webhook handler can add newly discovered entities dynamically without
reloading the config entry.
"""
hass_data_any = hass.data.setdefault(DOMAIN, {})
hass_data = cast("dict[str, Any]", hass_data_any)
entry_data_any = hass_data.get(config_entry.entry_id) sensors_to_load: list = []
if not isinstance(entry_data_any, dict): sensors: list = []
# Created by the integration setup, but keep this defensive for safety. _wslink = config_entry.options.get(WSLINK)
entry_data_any = {}
hass_data[config_entry.entry_id] = entry_data_any
entry_data = cast("dict[str, Any]", entry_data_any)
coordinator = entry_data.get(ENTRY_COORDINATOR) SENSOR_TYPES = SENSOR_TYPES_WSLINK if _wslink else SENSOR_TYPES_WEATHER_API
if coordinator is None:
# Coordinator is created by the integration (`__init__.py`). Without it, we cannot set up entities.
# This should not happen in normal operation; treat it as a no-op setup.
return
# Store the platform callback so we can add entities later (auto-discovery) without reload. # Check if we have some sensors to load.
entry_data[ENTRY_ADD_ENTITIES] = async_add_entities if sensors_to_load := config_entry.options.get(SENSORS_TO_LOAD, []):
if WIND_DIR in sensors_to_load:
sensors_to_load.append(WIND_AZIMUT)
if (OUTSIDE_HUMIDITY in sensors_to_load) and (OUTSIDE_TEMP in sensors_to_load):
sensors_to_load.append(HEAT_INDEX)
wslink_enabled = checked_or(config_entry.options.get(WSLINK), bool, False) if (WIND_SPEED in sensors_to_load) and (OUTSIDE_TEMP in sensors_to_load):
sensor_types = SENSOR_TYPES_WSLINK if wslink_enabled else SENSOR_TYPES_WEATHER_API sensors_to_load.append(CHILL_INDEX)
sensors = [
# Keep a descriptions map for dynamic entity creation by key. WeatherSensor(hass, description, coordinator)
# When the station starts sending a new payload field, the webhook handler can for description in SENSOR_TYPES
# look up its description here and instantiate the matching entity. if description.key in sensors_to_load
entry_data[ENTRY_DESCRIPTIONS] = {desc.key: desc for desc in sensor_types} ]
async_add_entities(sensors)
sensors_to_load = checked_or(
config_entry.options.get(SENSORS_TO_LOAD), list[str], []
)
if not sensors_to_load:
return
requested = _auto_enable_derived_sensors(set(sensors_to_load))
entities: list[WeatherSensor] = [
WeatherSensor(description, coordinator)
for description in sensor_types
if description.key in requested
]
async_add_entities(entities)
def add_new_sensors(
hass: HomeAssistant, config_entry: ConfigEntry, keys: list[str]
) -> None:
"""Dynamically add newly discovered sensors without reloading the entry.
Called by the webhook handler when the station starts sending new fields.
Design notes:
- This function is intentionally a safe no-op if the sensor platform hasn't
finished setting up yet (e.g. callback/description map missing).
- Unknown payload keys are ignored (only keys with an entity description are added).
"""
hass_data_any = hass.data.get(DOMAIN)
if not isinstance(hass_data_any, dict):
return
hass_data = cast("dict[str, Any]", hass_data_any)
entry_data_any = hass_data.get(config_entry.entry_id)
if not isinstance(entry_data_any, dict):
return
entry_data = cast("dict[str, Any]", entry_data_any)
add_entities_any = entry_data.get(ENTRY_ADD_ENTITIES)
descriptions_any = entry_data.get(ENTRY_DESCRIPTIONS)
coordinator_any = entry_data.get(ENTRY_COORDINATOR)
if add_entities_any is None or descriptions_any is None or coordinator_any is None:
return
add_entities_fn = cast("_AddEntitiesFn", add_entities_any)
descriptions_map = cast(
"dict[str, WeatherSensorEntityDescription]", descriptions_any
)
new_entities: list[SensorEntity] = []
for key in keys:
desc = descriptions_map.get(key)
if desc is None:
continue
new_entities.append(WeatherSensor(desc, coordinator_any))
if new_entities:
add_entities_fn(new_entities)
class WeatherSensor( # pyright: ignore[reportIncompatibleVariableOverride] class WeatherSensor( # pyright: ignore[reportIncompatibleVariableOverride]
CoordinatorEntity, SensorEntity CoordinatorEntity[WeatherDataUpdateCoordinator], SensorEntity
): # pyright: ignore[reportIncompatibleVariableOverride] ): # pyright: ignore[reportIncompatibleVariableOverride]
"""Implementation of Weather Sensor entity. """Implementation of Weather Sensor entity."""
We intentionally keep the coordinator type unparameterized here to avoid
propagating HA's generic `DataUpdateCoordinator[T]` typing into this module.
"""
_attr_has_entity_name = True _attr_has_entity_name = True
_attr_should_poll = False _attr_should_poll = False
def __init__( def __init__(
self, self,
hass: HomeAssistant,
description: WeatherSensorEntityDescription, description: WeatherSensorEntityDescription,
coordinator: Any, coordinator: WeatherDataUpdateCoordinator,
) -> None: ) -> None:
"""Initialize sensor.""" """Initialize sensor."""
super().__init__(coordinator) super().__init__(coordinator)
self.hass = hass
self.coordinator = coordinator
self.entity_description = description self.entity_description = description
self._attr_unique_id = description.key self._attr_unique_id = description.key
self._data = None
async def async_added_to_hass(self) -> None:
"""Handle listeners to reloaded sensors."""
await super().async_added_to_hass()
self.coordinator.async_add_listener(self._handle_coordinator_update)
@callback
def _handle_coordinator_update(self) -> None:
"""Handle updated data from the coordinator."""
self._data = self.coordinator.data.get(self.entity_description.key)
super()._handle_coordinator_update()
self.async_write_ha_state()
@property @property
def native_value(self): # pyright: ignore[reportIncompatibleVariableOverride] def native_value(self): # pyright: ignore[reportIncompatibleVariableOverride]
"""Return the current sensor state. """Return value of entity."""
Resolution order: _wslink = self.coordinator.config.options.get(WSLINK)
1) If `value_from_data_fn` is provided, it receives the full payload dict and can compute
derived values (e.g. battery enum mapping, azimut text, heat/chill indices).
2) Otherwise we read the raw value for this key from the payload and pass it through `value_fn`.
Payload normalization: if self.coordinator.data and (WIND_AZIMUT in self.entity_description.key):
- The station sometimes sends empty strings for missing fields; we treat "" as no value (None). return self.entity_description.value_fn(self.coordinator.data.get(WIND_DIR)) # pyright: ignore[ reportAttributeAccessIssue]
"""
data: dict[str, Any] = checked_or(self.coordinator.data, dict[str, Any], {})
key = self.entity_description.key
description = cast("WeatherSensorEntityDescription", self.entity_description) if (
if description.value_from_data_fn is not None: self.coordinator.data
return description.value_from_data_fn(data) and (HEAT_INDEX in self.entity_description.key)
and not _wslink
):
return self.entity_description.value_fn(heat_index(self.coordinator.data)) # pyright: ignore[ reportAttributeAccessIssue]
raw = data.get(key) if (
if raw is None or raw == "": self.coordinator.data
return None and (CHILL_INDEX in self.entity_description.key)
and not _wslink
):
return self.entity_description.value_fn(chill_index(self.coordinator.data)) # pyright: ignore[ reportAttributeAccessIssue]
if description.value_fn is None: return (
return None None if self._data == "" else self.entity_description.value_fn(self._data) # pyright: ignore[ reportAttributeAccessIssue]
)
return description.value_fn(raw)
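The resolution order in native_value above, boiled down to a standalone sketch (the Description dataclass, the fake heat-index formula, and the payload values are illustrative only):

    from dataclasses import dataclass
    from typing import Any, Callable, Optional

    @dataclass
    class Description:
        key: str
        value_fn: Optional[Callable[[Any], Any]] = None
        value_from_data_fn: Optional[Callable[[dict], Any]] = None

    def native_value(desc: Description, payload: dict[str, Any]) -> Any:
        # 1) Derived sensors compute from the whole payload (heat index, azimut, battery enum).
        if desc.value_from_data_fn is not None:
            return desc.value_from_data_fn(payload)
        # 2) Plain sensors read their own key; "" from the station means "no value".
        raw = payload.get(desc.key)
        if raw is None or raw == "":
            return None
        return desc.value_fn(raw) if desc.value_fn else None

    temp = Description("outside_temp", value_fn=float)
    heat = Description("heat_index",
                       value_from_data_fn=lambda d: round(float(d["outside_temp"]) + 1.5, 1))
    payload = {"outside_temp": "21.4", "outside_humidity": ""}
    print(native_value(temp, payload))                                           # 21.4
    print(native_value(heat, payload))                                           # 22.9
    print(native_value(Description("outside_humidity", value_fn=int), payload))  # None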
@property @property
def suggested_entity_id(self) -> str: def suggested_entity_id(self) -> str:
"""Return name.""" """Return name."""
return generate_entity_id("sensor.{}", self.entity_description.key) return generate_entity_id("sensor.{}", self.entity_description.key)
@cached_property @property
def device_info(self) -> DeviceInfo: def icon(self) -> str | None: # pyright: ignore[reportIncompatibleVariableOverride]
"""Return the dynamic icon for battery representation."""
if self.entity_description.key in BATTERY_LIST:
if self.native_value:
battery_level = battery_level_to_text(self.native_value)
return battery_level_to_icon(battery_level)
return battery_level_to_icon(UnitOfBat.UNKNOWN)
return self.entity_description.icon
@property
def device_info(self) -> DeviceInfo: # pyright: ignore[reportIncompatibleVariableOverride]
"""Device info.""" """Device info."""
return DeviceInfo( return DeviceInfo(
connections=set(), connections=set(),

View File

@ -11,7 +11,4 @@ from homeassistant.components.sensor import SensorEntityDescription
class WeatherSensorEntityDescription(SensorEntityDescription): class WeatherSensorEntityDescription(SensorEntityDescription):
"""Describe Weather Sensor entities.""" """Describe Weather Sensor entities."""
value_fn: Callable[[Any], int | float | str | None] | None = None value_fn: Callable[[Any], int | float | str | None]
value_from_data_fn: Callable[[dict[str, Any]], int | float | str | None] | None = (
None
)

View File

@ -41,7 +41,7 @@ from .const import (
UnitOfDir, UnitOfDir,
) )
from .sensors_common import WeatherSensorEntityDescription from .sensors_common import WeatherSensorEntityDescription
from .utils import chill_index, heat_index, wind_dir_to_text from .utils import wind_dir_to_text
SENSOR_TYPES_WEATHER_API: tuple[WeatherSensorEntityDescription, ...] = ( SENSOR_TYPES_WEATHER_API: tuple[WeatherSensorEntityDescription, ...] = (
WeatherSensorEntityDescription( WeatherSensorEntityDescription(
@ -133,11 +133,8 @@ SENSOR_TYPES_WEATHER_API: tuple[WeatherSensorEntityDescription, ...] = (
key=WIND_AZIMUT, key=WIND_AZIMUT,
icon="mdi:sign-direction", icon="mdi:sign-direction",
value_fn=lambda data: cast("str", wind_dir_to_text(data)), value_fn=lambda data: cast("str", wind_dir_to_text(data)),
value_from_data_fn=lambda data: cast(
"str", wind_dir_to_text(cast("float", data.get(WIND_DIR) or 0.0))
),
device_class=SensorDeviceClass.ENUM, device_class=SensorDeviceClass.ENUM,
options=[e.value for e in UnitOfDir], options=list(UnitOfDir),
translation_key=WIND_AZIMUT, translation_key=WIND_AZIMUT,
), ),
WeatherSensorEntityDescription( WeatherSensorEntityDescription(
@ -247,7 +244,6 @@ SENSOR_TYPES_WEATHER_API: tuple[WeatherSensorEntityDescription, ...] = (
icon="mdi:weather-sunny", icon="mdi:weather-sunny",
translation_key=HEAT_INDEX, translation_key=HEAT_INDEX,
value_fn=lambda data: cast("int", data), value_fn=lambda data: cast("int", data),
value_from_data_fn=lambda data: heat_index(data),
), ),
WeatherSensorEntityDescription( WeatherSensorEntityDescription(
key=CHILL_INDEX, key=CHILL_INDEX,
@ -259,6 +255,5 @@ SENSOR_TYPES_WEATHER_API: tuple[WeatherSensorEntityDescription, ...] = (
icon="mdi:weather-sunny", icon="mdi:weather-sunny",
translation_key=CHILL_INDEX, translation_key=CHILL_INDEX,
value_fn=lambda data: cast("int", data), value_fn=lambda data: cast("int", data),
value_from_data_fn=lambda data: chill_index(data),
), ),
) )

View File

@ -44,11 +44,10 @@ from .const import (
WIND_GUST, WIND_GUST,
WIND_SPEED, WIND_SPEED,
YEARLY_RAIN, YEARLY_RAIN,
UnitOfBat,
UnitOfDir, UnitOfDir,
) )
from .sensors_common import WeatherSensorEntityDescription from .sensors_common import WeatherSensorEntityDescription
from .utils import battery_level, wind_dir_to_text from .utils import wind_dir_to_text
SENSOR_TYPES_WSLINK: tuple[WeatherSensorEntityDescription, ...] = ( SENSOR_TYPES_WSLINK: tuple[WeatherSensorEntityDescription, ...] = (
WeatherSensorEntityDescription( WeatherSensorEntityDescription(
@ -140,11 +139,8 @@ SENSOR_TYPES_WSLINK: tuple[WeatherSensorEntityDescription, ...] = (
key=WIND_AZIMUT, key=WIND_AZIMUT,
icon="mdi:sign-direction", icon="mdi:sign-direction",
value_fn=lambda data: cast("str", wind_dir_to_text(data)), value_fn=lambda data: cast("str", wind_dir_to_text(data)),
value_from_data_fn=lambda data: cast(
"str", wind_dir_to_text(cast("float", data.get(WIND_DIR) or 0.0))
),
device_class=SensorDeviceClass.ENUM, device_class=SensorDeviceClass.ENUM,
options=[e.value for e in UnitOfDir], options=list(UnitOfDir),
translation_key=WIND_AZIMUT, translation_key=WIND_AZIMUT,
), ),
WeatherSensorEntityDescription( WeatherSensorEntityDescription(
@ -269,6 +265,25 @@ SENSOR_TYPES_WSLINK: tuple[WeatherSensorEntityDescription, ...] = (
translation_key=CH3_HUMIDITY, translation_key=CH3_HUMIDITY,
value_fn=lambda data: cast("int", data), value_fn=lambda data: cast("int", data),
), ),
# WeatherSensorEntityDescription(
# key=CH4_TEMP,
# native_unit_of_measurement=UnitOfTemperature.FAHRENHEIT,
# state_class=SensorStateClass.MEASUREMENT,
# device_class=SensorDeviceClass.TEMPERATURE,
# suggested_unit_of_measurement=UnitOfTemperature.CELSIUS,
# icon="mdi:weather-sunny",
# translation_key=CH4_TEMP,
# value_fn=lambda data: cast(float, data),
# ),
# WeatherSensorEntityDescription(
# key=CH4_HUMIDITY,
# native_unit_of_measurement=PERCENTAGE,
# state_class=SensorStateClass.MEASUREMENT,
# device_class=SensorDeviceClass.HUMIDITY,
# icon="mdi:weather-sunny",
# translation_key=CH4_HUMIDITY,
# value_fn=lambda data: cast(int, data),
# ),
WeatherSensorEntityDescription( WeatherSensorEntityDescription(
key=HEAT_INDEX, key=HEAT_INDEX,
native_unit_of_measurement=UnitOfTemperature.CELSIUS, native_unit_of_measurement=UnitOfTemperature.CELSIUS,
@ -294,32 +309,23 @@ SENSOR_TYPES_WSLINK: tuple[WeatherSensorEntityDescription, ...] = (
WeatherSensorEntityDescription( WeatherSensorEntityDescription(
key=OUTSIDE_BATTERY, key=OUTSIDE_BATTERY,
translation_key=OUTSIDE_BATTERY, translation_key=OUTSIDE_BATTERY,
icon="mdi:battery-unknown",
device_class=SensorDeviceClass.ENUM, device_class=SensorDeviceClass.ENUM,
options=[e.value for e in UnitOfBat], value_fn=lambda data: (data),
value_fn=None,
value_from_data_fn=lambda data: battery_level(
data.get(OUTSIDE_BATTERY, None)
).value,
), ),
WeatherSensorEntityDescription( WeatherSensorEntityDescription(
key=CH2_BATTERY, key=CH2_BATTERY,
translation_key=CH2_BATTERY, translation_key=CH2_BATTERY,
icon="mdi:battery-unknown",
device_class=SensorDeviceClass.ENUM, device_class=SensorDeviceClass.ENUM,
options=[e.value for e in UnitOfBat], value_fn=lambda data: (data),
value_fn=None,
value_from_data_fn=lambda data: battery_level(
data.get(CH2_BATTERY, None)
).value,
), ),
WeatherSensorEntityDescription( WeatherSensorEntityDescription(
key=INDOOR_BATTERY, key=INDOOR_BATTERY,
translation_key=INDOOR_BATTERY, translation_key=INDOOR_BATTERY,
icon="mdi:battery-unknown",
device_class=SensorDeviceClass.ENUM, device_class=SensorDeviceClass.ENUM,
options=[e.value for e in UnitOfBat], value_fn=lambda data: (data),
value_fn=None,
value_from_data_fn=lambda data: battery_level(
data.get(INDOOR_BATTERY, None)
).value,
), ),
WeatherSensorEntityDescription( WeatherSensorEntityDescription(
key=WBGT_TEMP, key=WBGT_TEMP,

View File

@ -87,18 +87,6 @@
"pocasi_logger_checkbox": "Enable only if you want to send debbug data to the developer" "pocasi_logger_checkbox": "Enable only if you want to send debbug data to the developer"
} }
}, },
"ecowitt": {
"description": "Nastavení pro Ecowitt",
"title": "Konfigurace pro stanice Ecowitt",
"data": {
"ecowitt_webhook_id": "Unikátní webhook ID",
"ecowitt_enabled": "Povolit data ze stanice Ecowitt"
},
"data_description": {
"ecowitt_webhook_id": "Nastavení pro stanici: {url}:{port}/weatherhub/{webhook_id}",
"ecowitt_enabled": "Povolit přijímání dat ze stanic Ecowitt"
}
},
"migration": { "migration": {
"title": "Statistic migration.", "title": "Statistic migration.",
"description": "For the correct functioning of long-term statistics, it is necessary to migrate the sensor unit in the long-term statistics. The original unit of long-term statistics for daily precipitation was in mm/d, however, the station only sends data in mm without time differentiation.\n\n The sensor to be migrated is for daily precipitation. If the correct value is already in the list for the daily precipitation sensor (mm), then the migration is already complete.\n\n Migration result for the sensor: {migration_status}, a total of {migration_count} rows converted.", "description": "For the correct functioning of long-term statistics, it is necessary to migrate the sensor unit in the long-term statistics. The original unit of long-term statistics for daily precipitation was in mm/d, however, the station only sends data in mm without time differentiation.\n\n The sensor to be migrated is for daily precipitation. If the correct value is already in the list for the daily precipitation sensor (mm), then the migration is already complete.\n\n Migration result for the sensor: {migration_status}, a total of {migration_count} rows converted.",
@ -178,21 +166,6 @@
"chill_index": { "chill_index": {
"name": "Wind chill" "name": "Wind chill"
}, },
"hourly_rain": {
"name": "Hourly precipitation"
},
"weekly_rain": {
"name": "Weekly precipitation"
},
"monthly_rain": {
"name": "Monthly precipitation"
},
"yearly_rain": {
"name": "Yearly precipitation"
},
"wbgt_index": {
"name": "WBGT index"
},
"wind_azimut": { "wind_azimut": {
"name": "Bearing", "name": "Bearing",
"state": { "state": {
@ -212,30 +185,14 @@
"wnw": "WNW", "wnw": "WNW",
"nw": "NW", "nw": "NW",
"nnw": "NNW" "nnw": "NNW"
} },
}, "outside_battery": {
"outside_battery": { "name": "Outside battery level",
"name": "Outside battery level", "state": {
"state": { "normal": "OK",
"normal": "OK", "low": "Low",
"low": "Low", "unknown": "Unknown / drained out"
"unknown": "Unknown / drained out" }
}
},
"ch2_battery": {
"name": "Channel 2 battery level",
"state": {
"normal": "OK",
"low": "Low",
"unknown": "Unknown / drained out"
}
},
"indoor_battery": {
"name": "Console battery level",
"state": {
"normal": "OK",
"low": "Low",
"unknown": "Unknown / drained out"
} }
} }
} }

View File

@ -233,7 +233,7 @@
"state": { "state": {
"low": "Nízká", "low": "Nízká",
"normal": "Normální", "normal": "Normální",
"drained": "Neznámá / zcela vybitá" "unknown": "Neznámá / zcela vybitá"
} }
}, },
"ch2_battery": { "ch2_battery": {

View File

@ -1,21 +1,12 @@
"""Utils for SWS12500. """Utils for SWS12500."""
This module contains small helpers used across the integration.
Notable responsibilities:
- Payload remapping: convert raw station/webhook field names into stable internal keys.
- Auto-discovery helpers: detect new payload fields that are not enabled yet and persist them
to config entry options so sensors can be created dynamically.
- Formatting/conversion helpers (wind direction text, battery mapping, temperature conversions).
Keeping these concerns in one place avoids duplicating logic in the webhook handler and entity code.
"""
import logging import logging
import math import math
from multiprocessing import Value
from typing import Any, cast from typing import Any, cast
import numpy as np import numpy as np
from py_typecheck import checked
from py_typecheck.core import checked_or from py_typecheck.core import checked_or
from homeassistant.components import persistent_notification from homeassistant.components import persistent_notification
@ -115,35 +106,24 @@ async def update_options(
def anonymize( def anonymize(
data: dict[str, str | int | float | bool], data: dict[str, str | int | float | bool],
) -> dict[str, str | int | float | bool]: ) -> dict[str, str | int | float | bool]:
"""Anonymize received data for safe logging. """Anoynimize recieved data."""
anonym: dict[str, str] = {}
- Keep all keys, but mask sensitive values. return {
- Do not raise on unexpected/missing keys. anonym[key]: value
""" for key, value in data.items()
secrets = {"ID", "PASSWORD", "wsid", "wspw"} if key not in {"ID", "PASSWORD", "wsid", "wspw"}
}
return {k: ("***" if k in secrets else v) for k, v in data.items()}
def remap_items(entities: dict[str, str]) -> dict[str, str]: def remap_items(entities: dict[str, str]) -> dict[str, str]:
"""Remap legacy (WU-style) payload field names into internal sensor keys. """Remap items in query."""
The station sends short/legacy field names (e.g. "tempf", "humidity"). Internally we use
stable keys from `const.py` (e.g. "outside_temp", "outside_humidity"). This function produces
a normalized dict that the rest of the integration can work with.
"""
return { return {
REMAP_ITEMS[key]: value for key, value in entities.items() if key in REMAP_ITEMS REMAP_ITEMS[key]: value for key, value in entities.items() if key in REMAP_ITEMS
} }
def remap_wslink_items(entities: dict[str, str]) -> dict[str, str]: def remap_wslink_items(entities: dict[str, str]) -> dict[str, str]:
"""Remap WSLink payload field names into internal sensor keys. """Remap items in query for WSLink API."""
WSLink uses a different naming scheme than the legacy endpoint (e.g. "t1tem", "t1ws").
Just like `remap_items`, this function normalizes the payload to the integration's stable
internal keys.
"""
return { return {
REMAP_WSLINK_ITEMS[key]: value REMAP_WSLINK_ITEMS[key]: value
for key, value in entities.items() for key, value in entities.items()
@ -152,32 +132,19 @@ def remap_wslink_items(entities: dict[str, str]) -> dict[str, str]:
def loaded_sensors(config_entry: ConfigEntry) -> list[str]: def loaded_sensors(config_entry: ConfigEntry) -> list[str]:
"""Return sensor keys currently enabled for this config entry. """Get loaded sensors."""
Auto-discovery persists new keys into `config_entry.options[SENSORS_TO_LOAD]`. The sensor
platform uses this list to decide which entities to create.
"""
return config_entry.options.get(SENSORS_TO_LOAD) or [] return config_entry.options.get(SENSORS_TO_LOAD) or []
def check_disabled( def check_disabled(
items: dict[str, str], config_entry: ConfigEntry items: dict[str, str], config_entry: ConfigEntry
) -> list[str] | None: ) -> list[str] | None:
"""Detect payload fields that are not enabled yet (auto-discovery). """Check if we have data for unloaded sensors.
The integration supports "auto-discovery" of sensors: when the station starts sending a new If so, then add sensor to load queue.
field, we can automatically enable and create the corresponding entity.
This helper compares the normalized payload keys (`items`) with the currently enabled sensor
keys stored in options (`SENSORS_TO_LOAD`) and returns the missing keys.
Returns:
- list[str] of newly discovered sensor keys (to be added/enabled), or
- None if no new keys were found.
Notes:
- Logging is controlled via `DEV_DBG` because payloads can arrive frequently.
Returns list of found sensors or None
""" """
log = checked_or(config_entry.options.get(DEV_DBG), bool, False) log = checked_or(config_entry.options.get(DEV_DBG), bool, False)
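Stripped of logging and option handling, the core of check_disabled is a difference between the remapped payload keys and the already-enabled keys. A standalone sketch (the uv_index key is purely illustrative):

    def check_disabled(remapped: dict[str, str], sensors_to_load: list[str]) -> list[str] | None:
        """Return payload keys that are not enabled yet, or None when nothing is new."""
        new = [key for key in remapped if key not in sensors_to_load]
        return new or None

    print(check_disabled({"outside_temp": "21.4", "uv_index": "3"}, ["outside_temp"]))
    # ['uv_index']
    print(check_disabled({"outside_temp": "21.4"}, ["outside_temp"]))
    # None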
@ -211,12 +178,9 @@ def wind_dir_to_text(deg: float) -> UnitOfDir | None:
return None return None
def battery_level(battery: int | str | None) -> UnitOfBat: def battery_level(battery: int) -> UnitOfBat:
"""Return battery level. """Return battery level.
WSLink payload values often arrive as strings (e.g. "0"/"1"), so we accept
both ints and strings and coerce to int before mapping.
Returns UnitOfBat Returns UnitOfBat
""" """
@ -225,19 +189,10 @@ def battery_level(battery: int | str | None) -> UnitOfBat:
1: UnitOfBat.NORMAL, 1: UnitOfBat.NORMAL,
} }
if (battery is None) or (battery == ""): if (v := checked(battery, int)) is None:
return UnitOfBat.UNKNOWN return UnitOfBat.UNKNOWN
vi: int return level_map.get(v, UnitOfBat.UNKNOWN)
if isinstance(battery, int):
vi = battery
else:
try:
vi = int(battery)
except ValueError:
return UnitOfBat.UNKNOWN
return level_map.get(vi, UnitOfBat.UNKNOWN)
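A few illustrative calls against the coercion shown above, which accepts both ints and the string values WSLink sends (the enum here is a simplified stand-in for UnitOfBat in const.py):

    from enum import Enum

    class UnitOfBat(str, Enum):
        LOW = "low"
        NORMAL = "normal"
        UNKNOWN = "unknown"

    def battery_level(battery: int | str | None) -> UnitOfBat:
        level_map = {0: UnitOfBat.LOW, 1: UnitOfBat.NORMAL}
        if battery is None or battery == "":
            return UnitOfBat.UNKNOWN
        try:
            return level_map.get(int(battery), UnitOfBat.UNKNOWN)
        except ValueError:               # e.g. a non-numeric string from the station
            return UnitOfBat.UNKNOWN

    print(battery_level("0"))    # UnitOfBat.LOW
    print(battery_level(1))      # UnitOfBat.NORMAL
    print(battery_level("n/a"))  # UnitOfBat.UNKNOWN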
def battery_level_to_icon(battery: UnitOfBat) -> str: def battery_level_to_icon(battery: UnitOfBat) -> str:

View File

@ -2,6 +2,7 @@
from datetime import datetime, timedelta from datetime import datetime, timedelta
import logging import logging
from typing import Final
from aiohttp.client_exceptions import ClientError from aiohttp.client_exceptions import ClientError
from py_typecheck.core import checked from py_typecheck.core import checked
@ -25,6 +26,8 @@ from .utils import update_options
_LOGGER = logging.getLogger(__name__) _LOGGER = logging.getLogger(__name__)
RESPONSE_FOR_TEST = False
class WindyNotInserted(Exception): class WindyNotInserted(Exception):
"""NotInserted state.""" """NotInserted state."""
@ -51,8 +54,8 @@ class WindyPush:
def __init__(self, hass: HomeAssistant, config: ConfigEntry) -> None: def __init__(self, hass: HomeAssistant, config: ConfigEntry) -> None:
"""Init.""" """Init."""
self.hass = hass self.hass: Final = hass
self.config = config self.config: Final = config
""" lets wait for 1 minute to get initial data from station """ lets wait for 1 minute to get initial data from station
and then try to push first data to Windy and then try to push first data to Windy
@ -63,7 +66,7 @@ class WindyPush:
self.log: bool = self.config.options.get(WINDY_LOGGER_ENABLED, False) self.log: bool = self.config.options.get(WINDY_LOGGER_ENABLED, False)
self.invalid_response_count: int = 0 self.invalid_response_count: int = 0
def verify_windy_response( def verify_windy_response( # pylint: disable=useless-return
self, self,
response: str, response: str,
): ):
@ -84,7 +87,7 @@ class WindyPush:
if "Unauthorized" in response: if "Unauthorized" in response:
raise WindyApiKeyError raise WindyApiKeyError
async def push_data_to_windy(self, data: dict[str, str]) -> bool: async def push_data_to_windy(self, data: dict[str, str]) -> bool:
"""Pushes weather data do Windy stations. """Pushes weather data do Windy stations.
Interval is 5 minutes, otherwise Windy would not accepts data. Interval is 5 minutes, otherwise Windy would not accepts data.