diff --git a/custom_components/sws12500/__init__.py b/custom_components/sws12500/__init__.py index 42d119a..8ffbc6c 100644 --- a/custom_components/sws12500/__init__.py +++ b/custom_components/sws12500/__init__.py @@ -26,13 +26,16 @@ With a high-frequency push source (webhook), a reload at the wrong moment can le period where no entities are subscribed, causing stale states until another full reload/restart. """ +from asyncio import timeout import logging from typing import Any, cast +from aiohttp import ClientConnectionError import aiohttp.web from aiohttp.web_exceptions import HTTPUnauthorized from py_typecheck import checked, checked_or +from homeassistant.components.network import async_get_source_ip from homeassistant.config_entries import ConfigEntry from homeassistant.const import Platform from homeassistant.core import HomeAssistant @@ -41,6 +44,8 @@ from homeassistant.exceptions import ( InvalidStateError, PlatformNotReady, ) +from homeassistant.helpers.aiohttp_client import async_get_clientsession +from homeassistant.helpers.network import get_url from homeassistant.helpers.update_coordinator import DataUpdateCoordinator from .const import ( @@ -48,13 +53,14 @@ from .const import ( API_KEY, DEFAULT_URL, DOMAIN, + HEALTH_URL, POCASI_CZ_ENABLED, SENSORS_TO_LOAD, WINDY_ENABLED, WSLINK, WSLINK_URL, ) -from .data import ENTRY_COORDINATOR, ENTRY_LAST_OPTIONS +from .data import ENTRY_COORDINATOR, ENTRY_HEALTH_COORD, ENTRY_LAST_OPTIONS from .pocasti_cz import PocasiPush from .routes import Routes from .utils import ( @@ -77,6 +83,76 @@ class IncorrectDataError(InvalidStateError): """Invalid exception.""" +"""Helper coordinator for health status endpoint. + +This is separate from the main `WeatherDataUpdateCoordinator` +Coordinator checks the WSLink Addon reachability and returns basic health info. + +Serves health status for diagnostic sensors and the integration health page in HA UI. +""" + + +class HealthCoordinator(DataUpdateCoordinator): + """Coordinator for health status of integration. + + This coordinator will listen on `/station/health`. + """ + + # TODO Add update interval and periodic checks for WSLink Addon reachability, so that health status is always up-to-date even without incoming station pushes. + + def __init__(self, hass: HomeAssistant, config: ConfigEntry) -> None: + """Initialize coordinator for health status.""" + + self.hass: HomeAssistant = hass + self.config: ConfigEntry = config + self.data: dict[str, str] = {} + + super().__init__(hass, logger=_LOGGER, name=DOMAIN) + + async def health_status(self, _: aiohttp.web.Request) -> aiohttp.web.Response: + """Handle and inform of integration status. + + Note: aiohttp route handlers must accept the incoming Request. + """ + + session = async_get_clientsession(self.hass, False) + + # Keep this endpoint lightweight and always available. + url = get_url(self.hass) + ip = await async_get_source_ip(self.hass) + + request_url = f"https://{ip}" + + try: + async with timeout(5), session.get(request_url) as response: + if checked(response.status, int) == 200: + resp = await response.text() + else: + resp = {"error": f"Unexpected status code {response.status}"} + except ClientConnectionError: + resp = {"error": "Connection error, WSLink addon is unreachable."} + + data = { + "Integration status": "ok", + "HomeAssistant source_ip": str(ip), + "HomeAssistant base_url": url, + "WSLink Addon response": resp, + } + + self.async_set_updated_data(data) + + # TODO Remove this response, as it is intentded to tests only. + return aiohttp.web.json_response( + { + "Integration status": "ok", + "HomeAssistant source_ip": str(ip), + "HomeAssistant base_url": url, + "WSLink Addon response": resp, + }, + status=200, + ) + + # NOTE: # We intentionally avoid importing the sensor platform module at import-time here. # Home Assistant can import modules in different orders; keeping imports acyclic @@ -245,6 +321,7 @@ class WeatherDataUpdateCoordinator(DataUpdateCoordinator): def register_path( hass: HomeAssistant, coordinator: WeatherDataUpdateCoordinator, + coordinator_h: HealthCoordinator, config: ConfigEntry, ) -> bool: """Register webhook paths. @@ -264,11 +341,13 @@ def register_path( routes: Routes = Routes() routes.add_route(DEFAULT_URL, coordinator.received_data, enabled=not _wslink) routes.add_route(WSLINK_URL, coordinator.received_data, enabled=_wslink) + routes.add_route(HEALTH_URL, coordinator_h.health_status, enabled=True) # Register webhooks in HomeAssistant with dispatcher try: _ = hass.http.app.router.add_get(DEFAULT_URL, routes.dispatch) _ = hass.http.app.router.add_post(WSLINK_URL, routes.dispatch) + _ = hass.http.app.router.add_get(HEALTH_URL, routes.dispatch) # Save initialised routes hass_data["routes"] = routes @@ -324,6 +403,15 @@ async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool: coordinator = WeatherDataUpdateCoordinator(hass, entry) entry_data[ENTRY_COORDINATOR] = coordinator + # Similar to the coordinator, we want to reuse the same health coordinator instance across + # reloads so that the health endpoint remains responsive and doesn't lose its listeners. + coordinator_health = entry_data.get(ENTRY_HEALTH_COORD) + if isinstance(coordinator_health, HealthCoordinator): + coordinator_health.config = entry + else: + coordinator_health = HealthCoordinator(hass, entry) + entry_data[ENTRY_HEALTH_COORD] = coordinator_health + routes: Routes | None = hass_data.get("routes", None) # Keep an options snapshot so update_listener can skip reloads when only `SENSORS_TO_LOAD` changes. @@ -339,7 +427,7 @@ async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool: routes.switch_route(DEFAULT_URL if not _wslink else WSLINK_URL) _LOGGER.debug("%s", routes.show_enabled()) else: - routes_enabled = register_path(hass, coordinator, entry) + routes_enabled = register_path(hass, coordinator, coordinator_health, entry) if not routes_enabled: _LOGGER.error("Fatal: path not registered!") diff --git a/custom_components/sws12500/config_flow.py b/custom_components/sws12500/config_flow.py index 1de781c..9356fbb 100644 --- a/custom_components/sws12500/config_flow.py +++ b/custom_components/sws12500/config_flow.py @@ -23,6 +23,7 @@ from .const import ( DOMAIN, ECOWITT_ENABLED, ECOWITT_WEBHOOK_ID, + # HEALTH_BEARER_TOKEN, INVALID_CREDENTIALS, POCASI_CZ_API_ID, POCASI_CZ_API_KEY, @@ -100,6 +101,7 @@ class ConfigOptionsFlowHandler(OptionsFlow): WINDY_LOGGER_ENABLED: self.config_entry.options.get( WINDY_LOGGER_ENABLED, False ), + WINDY_ENABLED: self.config_entry.options.get(WINDY_ENABLED, False), } self.windy_data_schema = { diff --git a/custom_components/sws12500/const.py b/custom_components/sws12500/const.py index e70343b..2d518d8 100644 --- a/custom_components/sws12500/const.py +++ b/custom_components/sws12500/const.py @@ -6,6 +6,7 @@ from typing import Final DOMAIN = "sws12500" DEFAULT_URL = "/weatherstation/updateweatherstation.php" WSLINK_URL = "/data/upload.php" +HEALTH_URL = "/station/health" WINDY_URL = "https://stations.windy.com/api/v2/observation/update" DATABASE_PATH = "/config/home-assistant_v2.db" @@ -24,6 +25,88 @@ SENSOR_TO_MIGRATE: Final = "sensor_to_migrate" DEV_DBG: Final = "dev_debug_checkbox" WSLINK: Final = "wslink" +__all__ = [ + "DOMAIN", + "DEFAULT_URL", + "WSLINK_URL", + "HEALTH_URL", + "WINDY_URL", + "DATABASE_PATH", + "POCASI_CZ_URL", + "POCASI_CZ_SEND_MINIMUM", + "ICON", + "API_KEY", + "API_ID", + "SENSORS_TO_LOAD", + "SENSOR_TO_MIGRATE", + "DEV_DBG", + "WSLINK", + "ECOWITT", + "ECOWITT_WEBHOOK_ID", + "ECOWITT_ENABLED", + "POCASI_CZ_API_KEY", + "POCASI_CZ_API_ID", + "POCASI_CZ_SEND_INTERVAL", + "POCASI_CZ_ENABLED", + "POCASI_CZ_LOGGER_ENABLED", + "POCASI_INVALID_KEY", + "POCASI_CZ_SUCCESS", + "POCASI_CZ_UNEXPECTED", + "WINDY_STATION_ID", + "WINDY_STATION_PW", + "WINDY_ENABLED", + "WINDY_LOGGER_ENABLED", + "WINDY_NOT_INSERTED", + "WINDY_INVALID_KEY", + "WINDY_SUCCESS", + "WINDY_UNEXPECTED", + "INVALID_CREDENTIALS", + "PURGE_DATA", + "PURGE_DATA_POCAS", + "BARO_PRESSURE", + "OUTSIDE_TEMP", + "DEW_POINT", + "OUTSIDE_HUMIDITY", + "OUTSIDE_CONNECTION", + "OUTSIDE_BATTERY", + "WIND_SPEED", + "WIND_GUST", + "WIND_DIR", + "WIND_AZIMUT", + "RAIN", + "HOURLY_RAIN", + "WEEKLY_RAIN", + "MONTHLY_RAIN", + "YEARLY_RAIN", + "DAILY_RAIN", + "SOLAR_RADIATION", + "INDOOR_TEMP", + "INDOOR_HUMIDITY", + "INDOOR_BATTERY", + "UV", + "CH2_TEMP", + "CH2_HUMIDITY", + "CH2_CONNECTION", + "CH2_BATTERY", + "CH3_TEMP", + "CH3_HUMIDITY", + "CH3_CONNECTION", + "CH4_TEMP", + "CH4_HUMIDITY", + "CH4_CONNECTION", + "HEAT_INDEX", + "CHILL_INDEX", + "WBGT_TEMP", + "REMAP_ITEMS", + "REMAP_WSLINK_ITEMS", + "DISABLED_BY_DEFAULT", + "BATTERY_LIST", + "UnitOfDir", + "AZIMUT", + "UnitOfBat", + "BATTERY_LEVEL", +] + ECOWITT: Final = "ecowitt" ECOWITT_WEBHOOK_ID: Final = "ecowitt_webhook_id" ECOWITT_ENABLED: Final = "ecowitt_enabled" diff --git a/custom_components/sws12500/data.py b/custom_components/sws12500/data.py index e80c1cc..b84d2c7 100644 --- a/custom_components/sws12500/data.py +++ b/custom_components/sws12500/data.py @@ -17,3 +17,4 @@ ENTRY_COORDINATOR: Final[str] = "coordinator" ENTRY_ADD_ENTITIES: Final[str] = "async_add_entities" ENTRY_DESCRIPTIONS: Final[str] = "sensor_descriptions" ENTRY_LAST_OPTIONS: Final[str] = "last_options" +ENTRY_HEALTH_COORD: Final[str] = "coord_h" diff --git a/custom_components/sws12500/health_sensor.py b/custom_components/sws12500/health_sensor.py new file mode 100644 index 0000000..8f06653 --- /dev/null +++ b/custom_components/sws12500/health_sensor.py @@ -0,0 +1,78 @@ +"""Health diagnostic sensor for SWS-12500. + +Home Assistant only auto-loads standard platform modules (e.g. `sensor.py`). +This file is a helper module and must be wired from `sensor.py`. +""" + +from __future__ import annotations + +from typing import Any, cast + +from homeassistant.components.sensor import SensorEntity +from homeassistant.config_entries import ConfigEntry +from homeassistant.const import EntityCategory +from homeassistant.core import HomeAssistant +from homeassistant.helpers.entity_platform import AddEntitiesCallback +from homeassistant.helpers.update_coordinator import CoordinatorEntity + +from .const import DOMAIN +from .data import ENTRY_HEALTH_COORD + + +async def async_setup_entry( + hass: HomeAssistant, + entry: ConfigEntry, + async_add_entities: AddEntitiesCallback, +) -> None: + """Set up the health diagnostic sensor.""" + + domain_data_any = hass.data.get(DOMAIN) + if not isinstance(domain_data_any, dict): + return + domain_data = cast("dict[str, Any]", domain_data_any) + + entry_data_any = domain_data.get(entry.entry_id) + if not isinstance(entry_data_any, dict): + return + entry_data = cast("dict[str, Any]", entry_data_any) + + coordinator_any = entry_data.get(ENTRY_HEALTH_COORD) + if coordinator_any is None: + return + + async_add_entities([HealthDiagnosticSensor(coordinator_any, entry)]) + + +class HealthDiagnosticSensor( # pyright: ignore[reportIncompatibleVariableOverride] + CoordinatorEntity, SensorEntity +): + """Health diagnostic sensor for SWS-12500.""" + + _attr_has_entity_name = True + _attr_should_poll = False + + def __init__(self, coordinator: Any, entry: ConfigEntry) -> None: + """Initialize the sensor.""" + super().__init__(coordinator) + + self._attr_entity_category = EntityCategory.DIAGNOSTIC + self._attr_unique_id = f"{entry.entry_id}_health" + self._attr_name = "Health" + self._attr_icon = "mdi:heart-pulse" + + @property + def native_value(self) -> str | None: # pyright: ignore[reportIncompatibleVariableOverride] + """Return a compact health state.""" + + data = cast("dict[str, Any]", getattr(self.coordinator, "data", {}) or {}) + value = data.get("Integration status") + return cast("str | None", value) + + @property + def extra_state_attributes(self) -> dict[str, Any] | None: # pyright: ignore[reportIncompatibleVariableOverride] + """Return detailed health diagnostics as attributes.""" + + data_any = getattr(self.coordinator, "data", None) + if not isinstance(data_any, dict): + return None + return cast("dict[str, Any]", data_any) diff --git a/custom_components/sws12500/sensor.py b/custom_components/sws12500/sensor.py index c5b8e09..44c4b99 100644 --- a/custom_components/sws12500/sensor.py +++ b/custom_components/sws12500/sensor.py @@ -30,6 +30,7 @@ from homeassistant.helpers.entity import DeviceInfo, generate_entity_id from homeassistant.helpers.entity_platform import AddEntitiesCallback from homeassistant.helpers.update_coordinator import CoordinatorEntity +from . import health_sensor from .const import ( CHILL_INDEX, DOMAIN, @@ -110,6 +111,10 @@ async def async_setup_entry( # Store the platform callback so we can add entities later (auto-discovery) without reload. entry_data[ENTRY_ADD_ENTITIES] = async_add_entities + # Wire up the integration health diagnostic sensor. + # This is kept in a dedicated module (`health_sensor.py`) for readability. + await health_sensor.async_setup_entry(hass, config_entry, async_add_entities) + wslink_enabled = checked_or(config_entry.options.get(WSLINK), bool, False) sensor_types = SENSOR_TYPES_WSLINK if wslink_enabled else SENSOR_TYPES_WEATHER_API @@ -202,6 +207,15 @@ class WeatherSensor( # pyright: ignore[reportIncompatibleVariableOverride] self.entity_description = description self._attr_unique_id = description.key + config_entry = getattr(self.coordinator, "config", None) + self._dev_log = checked_or( + config_entry.options.get("dev_debug_checkbox") + if config_entry is not None + else False, + bool, + False, + ) + @property def native_value(self): # pyright: ignore[reportIncompatibleVariableOverride] """Return the current sensor state. @@ -218,17 +232,60 @@ class WeatherSensor( # pyright: ignore[reportIncompatibleVariableOverride] key = self.entity_description.key description = cast("WeatherSensorEntityDescription", self.entity_description) + + if self._dev_log: + _LOGGER.debug( + "native_value start: key=%s, has_value_from_data_fn=%s, has_value_fn=%s, data_keys=%s", + key, + description.value_from_data_fn is not None, + description.value_fn is not None, + sorted(data), + ) + if description.value_from_data_fn is not None: - return description.value_from_data_fn(data) + try: + value = description.value_from_data_fn(data) + except Exception: # noqa: BLE001 + _LOGGER.exception( + "native_value compute failed via value_from_data_fn for key=%s", key + ) + return None + if self._dev_log: + _LOGGER.debug( + "native_value computed via value_from_data_fn: key=%s -> %s", + key, + value, + ) + return value raw = data.get(key) if raw is None or raw == "": + if self._dev_log: + _LOGGER.debug("native_value missing raw: key=%s raw=%s", key, raw) return None if description.value_fn is None: + if self._dev_log: + _LOGGER.debug("native_value has no value_fn: key=%s raw=%s", key, raw) return None - return description.value_fn(raw) + try: + value = description.value_fn(raw) + except Exception: # noqa: BLE001 + _LOGGER.exception( + "native_value compute failed via value_fn for key=%s raw=%s", key, raw + ) + return None + + if self._dev_log: + _LOGGER.debug( + "native_value computed via value_fn: key=%s raw=%s -> %s", + key, + raw, + value, + ) + + return value @property def suggested_entity_id(self) -> str: diff --git a/custom_components/sws12500/sensors_weather.py b/custom_components/sws12500/sensors_weather.py index d6f617e..84de10e 100644 --- a/custom_components/sws12500/sensors_weather.py +++ b/custom_components/sws12500/sensors_weather.py @@ -247,7 +247,7 @@ SENSOR_TYPES_WEATHER_API: tuple[WeatherSensorEntityDescription, ...] = ( icon="mdi:weather-sunny", translation_key=HEAT_INDEX, value_fn=lambda data: cast("int", data), - value_from_data_fn=lambda data: heat_index(data), + value_from_data_fn=heat_index, ), WeatherSensorEntityDescription( key=CHILL_INDEX, @@ -259,6 +259,6 @@ SENSOR_TYPES_WEATHER_API: tuple[WeatherSensorEntityDescription, ...] = ( icon="mdi:weather-sunny", translation_key=CHILL_INDEX, value_fn=lambda data: cast("int", data), - value_from_data_fn=lambda data: chill_index(data), + value_from_data_fn=chill_index, ), ) diff --git a/custom_components/sws12500/sensors_wslink.py b/custom_components/sws12500/sensors_wslink.py index fc33b79..7f89955 100644 --- a/custom_components/sws12500/sensors_wslink.py +++ b/custom_components/sws12500/sensors_wslink.py @@ -297,9 +297,9 @@ SENSOR_TYPES_WSLINK: tuple[WeatherSensorEntityDescription, ...] = ( device_class=SensorDeviceClass.ENUM, options=[e.value for e in UnitOfBat], value_fn=None, - value_from_data_fn=lambda data: battery_level( - data.get(OUTSIDE_BATTERY, None) - ).value, + value_from_data_fn=lambda data: ( + battery_level(data.get(OUTSIDE_BATTERY, None)).value + ), ), WeatherSensorEntityDescription( key=CH2_BATTERY, @@ -307,9 +307,9 @@ SENSOR_TYPES_WSLINK: tuple[WeatherSensorEntityDescription, ...] = ( device_class=SensorDeviceClass.ENUM, options=[e.value for e in UnitOfBat], value_fn=None, - value_from_data_fn=lambda data: battery_level( - data.get(CH2_BATTERY, None) - ).value, + value_from_data_fn=lambda data: ( + battery_level(data.get(CH2_BATTERY, None)).value + ), ), WeatherSensorEntityDescription( key=INDOOR_BATTERY, @@ -317,9 +317,9 @@ SENSOR_TYPES_WSLINK: tuple[WeatherSensorEntityDescription, ...] = ( device_class=SensorDeviceClass.ENUM, options=[e.value for e in UnitOfBat], value_fn=None, - value_from_data_fn=lambda data: battery_level( - data.get(INDOOR_BATTERY, None) - ).value, + value_from_data_fn=lambda data: ( + battery_level(data.get(INDOOR_BATTERY, None)).value + ), ), WeatherSensorEntityDescription( key=WBGT_TEMP, diff --git a/custom_components/sws12500/sws12500 b/custom_components/sws12500/sws12500 new file mode 120000 index 0000000..ce15d22 --- /dev/null +++ b/custom_components/sws12500/sws12500 @@ -0,0 +1 @@ +../dev/custom_components/sws12500 \ No newline at end of file diff --git a/tests/conftest.py b/tests/conftest.py new file mode 100644 index 0000000..b73fb6b --- /dev/null +++ b/tests/conftest.py @@ -0,0 +1,38 @@ +"""Pytest configuration for tests under `dev/tests`. + +Goals: +- Make `custom_components.*` importable. +- Keep this file lightweight and avoid global HA test-harness side effects. + +Repository layout: +- Root custom components: `SWS-12500/custom_components/...` (symlinked to `dev/custom_components/...`) +- Integration sources: `SWS-12500/dev/custom_components/...` + +Note: +Some tests use lightweight `hass` stubs (e.g. SimpleNamespace) that are not compatible with +Home Assistant's full test fixtures. Do NOT enable HA-only fixtures globally here. +Instead, request such fixtures (e.g. `enable_custom_integrations`) explicitly in the specific +tests that need HA's integration loader / flow managers. + +""" + +from __future__ import annotations + +from pathlib import Path +import sys + + +def pytest_configure() -> None: + """Adjust sys.path so imports and HA loader discovery work in tests.""" + repo_root = Path(__file__).resolve().parents[2] # .../SWS-12500 + dev_root = repo_root / "dev" + + # Ensure the repo root is importable so HA can find `custom_components//manifest.json`. + repo_root_str = str(repo_root) + if repo_root_str not in sys.path: + sys.path.insert(0, repo_root_str) + + # Also ensure `dev/` is importable for direct imports from dev tooling/tests. + dev_root_str = str(dev_root) + if dev_root_str not in sys.path: + sys.path.insert(0, dev_root_str) diff --git a/tests/test_config_flow.py b/tests/test_config_flow.py new file mode 100644 index 0000000..00ab63b --- /dev/null +++ b/tests/test_config_flow.py @@ -0,0 +1,383 @@ +from __future__ import annotations + +from unittest.mock import patch + +import pytest +from pytest_homeassistant_custom_component.common import MockConfigEntry + +from custom_components.sws12500.const import ( + API_ID, + API_KEY, + DEV_DBG, + DOMAIN, + ECOWITT_ENABLED, + ECOWITT_WEBHOOK_ID, + INVALID_CREDENTIALS, + POCASI_CZ_API_ID, + POCASI_CZ_API_KEY, + POCASI_CZ_ENABLED, + POCASI_CZ_LOGGER_ENABLED, + POCASI_CZ_SEND_INTERVAL, + POCASI_CZ_SEND_MINIMUM, + WINDY_ENABLED, + WINDY_LOGGER_ENABLED, + WINDY_STATION_ID, + WINDY_STATION_PW, + WSLINK, +) +from homeassistant import config_entries + + +@pytest.mark.asyncio +async def test_config_flow_user_form_then_create_entry( + hass, enable_custom_integrations +) -> None: + """Online HA: config flow shows form then creates entry and options.""" + result = await hass.config_entries.flow.async_init( + DOMAIN, context={"source": config_entries.SOURCE_USER} + ) + assert result["type"] == "form" + assert result["step_id"] == "user" + + user_input = { + API_ID: "my_id", + API_KEY: "my_key", + WSLINK: False, + DEV_DBG: False, + } + result2 = await hass.config_entries.flow.async_configure( + result["flow_id"], user_input=user_input + ) + assert result2["type"] == "create_entry" + assert result2["title"] == DOMAIN + assert result2["data"] == user_input + assert result2["options"] == user_input + + +@pytest.mark.asyncio +async def test_config_flow_user_invalid_credentials_api_id( + hass, enable_custom_integrations +) -> None: + """API_ID in INVALID_CREDENTIALS -> error on API_ID.""" + result = await hass.config_entries.flow.async_init( + DOMAIN, context={"source": config_entries.SOURCE_USER} + ) + assert result["type"] == "form" + + user_input = { + API_ID: INVALID_CREDENTIALS[0], + API_KEY: "ok_key", + WSLINK: False, + DEV_DBG: False, + } + result2 = await hass.config_entries.flow.async_configure( + result["flow_id"], user_input=user_input + ) + assert result2["type"] == "form" + assert result2["step_id"] == "user" + assert result2["errors"][API_ID] == "valid_credentials_api" + + +@pytest.mark.asyncio +async def test_config_flow_user_invalid_credentials_api_key( + hass, enable_custom_integrations +) -> None: + """API_KEY in INVALID_CREDENTIALS -> error on API_KEY.""" + result = await hass.config_entries.flow.async_init( + DOMAIN, context={"source": config_entries.SOURCE_USER} + ) + assert result["type"] == "form" + + user_input = { + API_ID: "ok_id", + API_KEY: INVALID_CREDENTIALS[0], + WSLINK: False, + DEV_DBG: False, + } + result2 = await hass.config_entries.flow.async_configure( + result["flow_id"], user_input=user_input + ) + assert result2["type"] == "form" + assert result2["step_id"] == "user" + assert result2["errors"][API_KEY] == "valid_credentials_key" + + +@pytest.mark.asyncio +async def test_config_flow_user_invalid_credentials_match( + hass, enable_custom_integrations +) -> None: + """API_KEY == API_ID -> base error.""" + result = await hass.config_entries.flow.async_init( + DOMAIN, context={"source": config_entries.SOURCE_USER} + ) + assert result["type"] == "form" + + user_input = { + API_ID: "same", + API_KEY: "same", + WSLINK: False, + DEV_DBG: False, + } + result2 = await hass.config_entries.flow.async_configure( + result["flow_id"], user_input=user_input + ) + assert result2["type"] == "form" + assert result2["step_id"] == "user" + assert result2["errors"]["base"] == "valid_credentials_match" + + +@pytest.mark.asyncio +async def test_options_flow_init_menu(hass, enable_custom_integrations) -> None: + """Options flow shows menu with expected steps.""" + entry = MockConfigEntry(domain=DOMAIN, data={}, options={}) + entry.add_to_hass(hass) + + result = await hass.config_entries.options.async_init(entry.entry_id) + assert result["type"] == "menu" + assert result["step_id"] == "init" + assert set(result["menu_options"]) == {"basic", "ecowitt", "windy", "pocasi"} + + +@pytest.mark.asyncio +async def test_options_flow_basic_validation_and_create_entry( + hass, enable_custom_integrations +) -> None: + entry = MockConfigEntry( + domain=DOMAIN, + data={}, + options={ + API_ID: "old", + API_KEY: "oldkey", + WSLINK: False, + DEV_DBG: False, + }, + ) + entry.add_to_hass(hass) + + init = await hass.config_entries.options.async_init(entry.entry_id) + assert init["type"] == "menu" + + form = await hass.config_entries.options.async_configure( + init["flow_id"], user_input={"next_step_id": "basic"} + ) + assert form["type"] == "form" + assert form["step_id"] == "basic" + + # Cover invalid API_ID branch in options flow basic step. + bad_api_id = await hass.config_entries.options.async_configure( + init["flow_id"], + user_input={ + API_ID: INVALID_CREDENTIALS[0], + API_KEY: "ok_key", + WSLINK: False, + DEV_DBG: False, + }, + ) + assert bad_api_id["type"] == "form" + assert bad_api_id["step_id"] == "basic" + assert bad_api_id["errors"][API_ID] == "valid_credentials_api" + + # Cover invalid API_KEY branch in options flow basic step. + bad_api_key = await hass.config_entries.options.async_configure( + init["flow_id"], + user_input={ + API_ID: "ok_id", + API_KEY: INVALID_CREDENTIALS[0], + WSLINK: False, + DEV_DBG: False, + }, + ) + assert bad_api_key["type"] == "form" + assert bad_api_key["step_id"] == "basic" + assert bad_api_key["errors"][API_KEY] == "valid_credentials_key" + + bad = await hass.config_entries.options.async_configure( + init["flow_id"], + user_input={API_ID: "same", API_KEY: "same", WSLINK: False, DEV_DBG: False}, + ) + assert bad["type"] == "form" + assert bad["step_id"] == "basic" + assert bad["errors"]["base"] == "valid_credentials_match" + + good = await hass.config_entries.options.async_configure( + init["flow_id"], + user_input={API_ID: "new", API_KEY: "newkey", WSLINK: True, DEV_DBG: True}, + ) + assert good["type"] == "create_entry" + assert good["title"] == DOMAIN + assert good["data"][API_ID] == "new" + assert good["data"][API_KEY] == "newkey" + assert good["data"][WSLINK] is True + assert good["data"][DEV_DBG] is True + + +@pytest.mark.asyncio +async def test_options_flow_windy_requires_keys_when_enabled( + hass, enable_custom_integrations +) -> None: + entry = MockConfigEntry( + domain=DOMAIN, + data={}, + options={ + WINDY_ENABLED: False, + WINDY_LOGGER_ENABLED: False, + WINDY_STATION_ID: "", + WINDY_STATION_PW: "", + }, + ) + entry.add_to_hass(hass) + + init = await hass.config_entries.options.async_init(entry.entry_id) + form = await hass.config_entries.options.async_configure( + init["flow_id"], user_input={"next_step_id": "windy"} + ) + assert form["type"] == "form" + assert form["step_id"] == "windy" + + bad = await hass.config_entries.options.async_configure( + init["flow_id"], + user_input={ + WINDY_ENABLED: True, + WINDY_LOGGER_ENABLED: False, + WINDY_STATION_ID: "", + WINDY_STATION_PW: "", + }, + ) + assert bad["type"] == "form" + assert bad["step_id"] == "windy" + assert bad["errors"][WINDY_STATION_ID] == "windy_key_required" + + good = await hass.config_entries.options.async_configure( + init["flow_id"], + user_input={ + WINDY_ENABLED: True, + WINDY_LOGGER_ENABLED: True, + WINDY_STATION_ID: "sid", + WINDY_STATION_PW: "spw", + }, + ) + assert good["type"] == "create_entry" + assert good["data"][WINDY_ENABLED] is True + + +@pytest.mark.asyncio +async def test_options_flow_pocasi_validation_minimum_interval_and_required_keys( + hass, + enable_custom_integrations, +) -> None: + entry = MockConfigEntry( + domain=DOMAIN, + data={}, + options={ + POCASI_CZ_API_ID: "", + POCASI_CZ_API_KEY: "", + POCASI_CZ_ENABLED: False, + POCASI_CZ_LOGGER_ENABLED: False, + POCASI_CZ_SEND_INTERVAL: 30, + }, + ) + entry.add_to_hass(hass) + + init = await hass.config_entries.options.async_init(entry.entry_id) + form = await hass.config_entries.options.async_configure( + init["flow_id"], user_input={"next_step_id": "pocasi"} + ) + assert form["type"] == "form" + assert form["step_id"] == "pocasi" + + bad = await hass.config_entries.options.async_configure( + init["flow_id"], + user_input={ + POCASI_CZ_API_ID: "", + POCASI_CZ_API_KEY: "", + POCASI_CZ_ENABLED: True, + POCASI_CZ_LOGGER_ENABLED: False, + POCASI_CZ_SEND_INTERVAL: POCASI_CZ_SEND_MINIMUM - 1, + }, + ) + assert bad["type"] == "form" + assert bad["step_id"] == "pocasi" + assert bad["errors"][POCASI_CZ_SEND_INTERVAL] == "pocasi_send_minimum" + assert bad["errors"][POCASI_CZ_API_ID] == "pocasi_id_required" + assert bad["errors"][POCASI_CZ_API_KEY] == "pocasi_key_required" + + good = await hass.config_entries.options.async_configure( + init["flow_id"], + user_input={ + POCASI_CZ_API_ID: "pid", + POCASI_CZ_API_KEY: "pkey", + POCASI_CZ_ENABLED: True, + POCASI_CZ_LOGGER_ENABLED: True, + POCASI_CZ_SEND_INTERVAL: POCASI_CZ_SEND_MINIMUM, + }, + ) + assert good["type"] == "create_entry" + assert good["data"][POCASI_CZ_ENABLED] is True + + +@pytest.mark.asyncio +async def test_options_flow_ecowitt_uses_get_url_placeholders_and_webhook_default( + hass, + enable_custom_integrations, +) -> None: + """Online HA: ecowitt step uses get_url() placeholders and secrets token when webhook id missing.""" + entry = MockConfigEntry( + domain=DOMAIN, + data={}, + options={ + ECOWITT_WEBHOOK_ID: "", + ECOWITT_ENABLED: False, + }, + ) + entry.add_to_hass(hass) + + init = await hass.config_entries.options.async_init(entry.entry_id) + assert init["type"] == "menu" + + # NOTE: + # The integration currently attempts to mutate `yarl.URL.host` when it is missing: + # + # url: URL = URL(get_url(self.hass)) + # if not url.host: + # url.host = "UNKNOWN" + # + # With current yarl versions, `URL.host` is a cached, read-only property, so this + # raises `AttributeError: cached property is read-only`. + # + # We assert that behavior explicitly to keep coverage deterministic and document the + # runtime incompatibility. If the integration code is updated to handle missing hosts + # without mutation (e.g. using `url.raw_host` or building placeholders without setting + # attributes), this assertion should be updated accordingly. + with patch( + "custom_components.sws12500.config_flow.get_url", + return_value="http://", + ): + with pytest.raises(AttributeError): + await hass.config_entries.options.async_configure( + init["flow_id"], user_input={"next_step_id": "ecowitt"} + ) + + # Second call uses a normal URL and completes the flow. + with patch( + "custom_components.sws12500.config_flow.get_url", + return_value="http://example.local:8123", + ): + form = await hass.config_entries.options.async_configure( + init["flow_id"], user_input={"next_step_id": "ecowitt"} + ) + assert form["type"] == "form" + assert form["step_id"] == "ecowitt" + placeholders = form.get("description_placeholders") or {} + assert placeholders["url"] == "example.local" + assert placeholders["port"] == "8123" + assert placeholders["webhook_id"] # generated + + done = await hass.config_entries.options.async_configure( + init["flow_id"], + user_input={ + ECOWITT_WEBHOOK_ID: placeholders["webhook_id"], + ECOWITT_ENABLED: True, + }, + ) + assert done["type"] == "create_entry" + assert done["data"][ECOWITT_ENABLED] is True diff --git a/tests/test_const.py b/tests/test_const.py new file mode 100644 index 0000000..7ad9732 --- /dev/null +++ b/tests/test_const.py @@ -0,0 +1,8 @@ +from custom_components.sws12500.const import DEFAULT_URL, DOMAIN, WINDY_URL, WSLINK_URL + + +def test_const_values(): + assert DOMAIN == "sws12500" + assert DEFAULT_URL == "/weatherstation/updateweatherstation.php" + assert WSLINK_URL == "/data/upload.php" + assert WINDY_URL == "https://stations.windy.com/api/v2/observation/update" diff --git a/tests/test_data.py b/tests/test_data.py new file mode 100644 index 0000000..8ad6cac --- /dev/null +++ b/tests/test_data.py @@ -0,0 +1,13 @@ +from custom_components.sws12500.data import ( + ENTRY_ADD_ENTITIES, + ENTRY_COORDINATOR, + ENTRY_DESCRIPTIONS, + ENTRY_LAST_OPTIONS, +) + + +def test_data_constants(): + assert ENTRY_COORDINATOR == "coordinator" + assert ENTRY_ADD_ENTITIES == "async_add_entities" + assert ENTRY_DESCRIPTIONS == "sensor_descriptions" + assert ENTRY_LAST_OPTIONS == "last_options" diff --git a/tests/test_init.py b/tests/test_init.py new file mode 100644 index 0000000..20e8c03 --- /dev/null +++ b/tests/test_init.py @@ -0,0 +1,95 @@ +"""Integration init tests using Home Assistant pytest fixtures. + +These tests rely on `pytest-homeassistant-custom-component` to provide: +- `hass` fixture (running Home Assistant instance) +- `MockConfigEntry` helper for config entries + +They validate that the integration can set up a config entry and that the +coordinator is created and stored in `hass.data`. + +Note: +This integration registers aiohttp routes via `hass.http.app.router`. In this +test environment, `hass.http` may not be set up, so we patch route registration +to keep these tests focused on setup logic. +""" + +from __future__ import annotations + +from unittest.mock import AsyncMock + +import pytest +from pytest_homeassistant_custom_component.common import MockConfigEntry + +from custom_components.sws12500 import WeatherDataUpdateCoordinator, async_setup_entry +from custom_components.sws12500.const import DOMAIN + + +@pytest.fixture +def config_entry() -> MockConfigEntry: + """Create a minimal config entry for the integration.""" + return MockConfigEntry(domain=DOMAIN, data={}, options={}) + + +async def test_async_setup_entry_creates_runtime_state( + hass, config_entry: MockConfigEntry, monkeypatch +): + """Setting up a config entry should succeed and populate hass.data.""" + config_entry.add_to_hass(hass) + + # `async_setup_entry` calls `register_path`, which needs `hass.http`. + # Patch it out so the test doesn't depend on aiohttp being initialized. + monkeypatch.setattr( + "custom_components.sws12500.register_path", + lambda _hass, _coordinator, _entry: True, + ) + + # Avoid depending on Home Assistant integration loader in this test. + # This keeps the test focused on our integration's setup behavior. + monkeypatch.setattr( + hass.config_entries, + "async_forward_entry_setups", + AsyncMock(return_value=True), + ) + + result = await async_setup_entry(hass, config_entry) + assert result is True + + assert DOMAIN in hass.data + assert config_entry.entry_id in hass.data[DOMAIN] + assert isinstance(hass.data[DOMAIN][config_entry.entry_id], dict) + + +async def test_async_setup_entry_forwards_sensor_platform( + hass, config_entry: MockConfigEntry, monkeypatch +): + """The integration should forward entry setups to the sensor platform.""" + config_entry.add_to_hass(hass) + + # `async_setup_entry` calls `register_path`, which needs `hass.http`. + # Patch it out so the test doesn't depend on aiohttp being initialized. + monkeypatch.setattr( + "custom_components.sws12500.register_path", + lambda _hass, _coordinator, _entry: True, + ) + + # Patch forwarding so we don't need to load real platforms for this unit/integration test. + hass.config_entries.async_forward_entry_setups = AsyncMock(return_value=True) + + result = await async_setup_entry(hass, config_entry) + assert result is True + + hass.config_entries.async_forward_entry_setups.assert_awaited() + forwarded_entry, forwarded_platforms = ( + hass.config_entries.async_forward_entry_setups.await_args.args + ) + assert forwarded_entry.entry_id == config_entry.entry_id + assert "sensor" in list(forwarded_platforms) + + +async def test_weather_data_update_coordinator_can_be_constructed( + hass, config_entry: MockConfigEntry +): + """Coordinator should be constructible with a real hass fixture.""" + coordinator = WeatherDataUpdateCoordinator(hass, config_entry) + assert coordinator.hass is hass + assert coordinator.config is config_entry diff --git a/tests/test_integration_lifecycle.py b/tests/test_integration_lifecycle.py new file mode 100644 index 0000000..fa9b858 --- /dev/null +++ b/tests/test_integration_lifecycle.py @@ -0,0 +1,445 @@ +from __future__ import annotations + +from dataclasses import dataclass +from types import SimpleNamespace +from typing import Any +from unittest.mock import AsyncMock, MagicMock + +from aiohttp.web_exceptions import HTTPUnauthorized +import pytest +from pytest_homeassistant_custom_component.common import MockConfigEntry + +from custom_components.sws12500 import ( + IncorrectDataError, + WeatherDataUpdateCoordinator, + async_setup_entry, + async_unload_entry, + register_path, + update_listener, +) +from custom_components.sws12500.const import ( + API_ID, + API_KEY, + DEFAULT_URL, + DOMAIN, + SENSORS_TO_LOAD, + WSLINK, + WSLINK_URL, +) +from custom_components.sws12500.data import ENTRY_COORDINATOR, ENTRY_LAST_OPTIONS + + +@dataclass(slots=True) +class _RequestStub: + """Minimal aiohttp Request stub used by `received_data`.""" + + query: dict[str, Any] + + +class _RouterStub: + """Router stub that records route registrations.""" + + def __init__(self) -> None: + self.add_get_calls: list[tuple[str, Any]] = [] + self.add_post_calls: list[tuple[str, Any]] = [] + self.raise_on_add: Exception | None = None + + def add_get(self, path: str, handler: Any) -> Any: + if self.raise_on_add is not None: + raise self.raise_on_add + self.add_get_calls.append((path, handler)) + return object() + + def add_post(self, path: str, handler: Any) -> Any: + if self.raise_on_add is not None: + raise self.raise_on_add + self.add_post_calls.append((path, handler)) + return object() + + +@pytest.fixture +def hass_with_http(hass): + """Provide a real HA hass fixture augmented with a stub http router.""" + router = _RouterStub() + hass.http = SimpleNamespace(app=SimpleNamespace(router=router)) + return hass + + +@pytest.mark.asyncio +async def test_register_path_registers_routes_and_stores_dispatcher(hass_with_http): + entry = MockConfigEntry( + domain=DOMAIN, + data={}, + options={ + API_ID: "id", + API_KEY: "key", + WSLINK: False, + }, + ) + entry.add_to_hass(hass_with_http) + + coordinator = WeatherDataUpdateCoordinator(hass_with_http, entry) + + ok = register_path(hass_with_http, coordinator, entry) + assert ok is True + + # Router registrations + router: _RouterStub = hass_with_http.http.app.router + assert [p for (p, _h) in router.add_get_calls] == [DEFAULT_URL] + assert [p for (p, _h) in router.add_post_calls] == [WSLINK_URL] + + # Dispatcher stored + assert DOMAIN in hass_with_http.data + assert "routes" in hass_with_http.data[DOMAIN] + routes = hass_with_http.data[DOMAIN]["routes"] + assert routes is not None + # show_enabled() should return a string + assert isinstance(routes.show_enabled(), str) + + +@pytest.mark.asyncio +async def test_register_path_raises_config_entry_not_ready_on_router_runtime_error( + hass_with_http, +): + from homeassistant.exceptions import ConfigEntryNotReady + + entry = MockConfigEntry( + domain=DOMAIN, + data={}, + options={ + API_ID: "id", + API_KEY: "key", + WSLINK: False, + }, + ) + entry.add_to_hass(hass_with_http) + + coordinator = WeatherDataUpdateCoordinator(hass_with_http, entry) + + # Make router raise RuntimeError on add + router: _RouterStub = hass_with_http.http.app.router + router.raise_on_add = RuntimeError("router broken") + + with pytest.raises(ConfigEntryNotReady): + register_path(hass_with_http, coordinator, entry) + + +@pytest.mark.asyncio +async def test_register_path_checked_hass_data_wrong_type_raises_config_entry_not_ready( + hass_with_http, +): + """Cover register_path branch where `checked(hass.data[DOMAIN], dict)` returns None.""" + from homeassistant.exceptions import ConfigEntryNotReady + + entry = MockConfigEntry( + domain=DOMAIN, + data={}, + options={ + API_ID: "id", + API_KEY: "key", + WSLINK: False, + }, + ) + entry.add_to_hass(hass_with_http) + + coordinator = WeatherDataUpdateCoordinator(hass_with_http, entry) + + # Force wrong type under DOMAIN so `checked(..., dict)` fails. + hass_with_http.data[DOMAIN] = [] + + with pytest.raises(ConfigEntryNotReady): + register_path(hass_with_http, coordinator, entry) + + +@pytest.mark.asyncio +async def test_async_setup_entry_creates_entry_dict_and_coordinator_and_forwards_platforms( + hass_with_http, + monkeypatch, +): + entry = MockConfigEntry( + domain=DOMAIN, + data={}, + options={API_ID: "id", API_KEY: "key", WSLINK: False}, + ) + entry.add_to_hass(hass_with_http) + + # Avoid loading actual platforms via HA loader. + monkeypatch.setattr( + hass_with_http.config_entries, + "async_forward_entry_setups", + AsyncMock(return_value=True), + ) + + ok = await async_setup_entry(hass_with_http, entry) + assert ok is True + + # Runtime storage exists and is a dict + assert DOMAIN in hass_with_http.data + assert entry.entry_id in hass_with_http.data[DOMAIN] + entry_data = hass_with_http.data[DOMAIN][entry.entry_id] + assert isinstance(entry_data, dict) + + # Coordinator stored and last options snapshot stored + assert isinstance(entry_data.get(ENTRY_COORDINATOR), WeatherDataUpdateCoordinator) + assert isinstance(entry_data.get(ENTRY_LAST_OPTIONS), dict) + + # Forwarded setups invoked + hass_with_http.config_entries.async_forward_entry_setups.assert_awaited() + + +@pytest.mark.asyncio +async def test_async_setup_entry_fatal_when_register_path_returns_false( + hass_with_http, monkeypatch +): + """Cover the fatal branch when `register_path` returns False. + + async_setup_entry does: + routes_enabled = register_path(...) + if not routes_enabled: raise PlatformNotReady + """ + from homeassistant.exceptions import PlatformNotReady + + entry = MockConfigEntry( + domain=DOMAIN, + data={}, + options={API_ID: "id", API_KEY: "key", WSLINK: False}, + ) + entry.add_to_hass(hass_with_http) + + # Ensure there are no pre-registered routes so async_setup_entry calls register_path. + hass_with_http.data.setdefault(DOMAIN, {}) + hass_with_http.data[DOMAIN].pop("routes", None) + + # Force register_path to return False + monkeypatch.setattr( + "custom_components.sws12500.register_path", + lambda _hass, _coordinator, _entry: False, + ) + + # Forwarding shouldn't be reached; patch anyway to avoid accidental loader calls. + monkeypatch.setattr( + hass_with_http.config_entries, + "async_forward_entry_setups", + AsyncMock(return_value=True), + ) + + with pytest.raises(PlatformNotReady): + await async_setup_entry(hass_with_http, entry) + + +@pytest.mark.asyncio +async def test_async_setup_entry_reuses_existing_coordinator_and_switches_routes( + hass_with_http, + monkeypatch, +): + entry = MockConfigEntry( + domain=DOMAIN, + data={}, + options={API_ID: "id", API_KEY: "key", WSLINK: False}, + ) + entry.add_to_hass(hass_with_http) + + # Pretend setup already happened and a coordinator exists + hass_with_http.data.setdefault(DOMAIN, {}) + existing_coordinator = WeatherDataUpdateCoordinator(hass_with_http, entry) + hass_with_http.data[DOMAIN][entry.entry_id] = { + ENTRY_COORDINATOR: existing_coordinator, + ENTRY_LAST_OPTIONS: dict(entry.options), + } + + # Provide pre-registered routes dispatcher + routes = hass_with_http.data[DOMAIN].get("routes") + if routes is None: + # Create a dispatcher via register_path once + register_path(hass_with_http, existing_coordinator, entry) + routes = hass_with_http.data[DOMAIN]["routes"] + + # Turn on WSLINK to trigger dispatcher switching. + # ConfigEntry.options cannot be changed directly; use async_update_entry. + hass_with_http.config_entries.async_update_entry( + entry, options={**dict(entry.options), WSLINK: True} + ) + + # Avoid loading actual platforms via HA loader. + monkeypatch.setattr( + hass_with_http.config_entries, + "async_forward_entry_setups", + AsyncMock(return_value=True), + ) + + ok = await async_setup_entry(hass_with_http, entry) + assert ok is True + + # Coordinator reused (same object) + entry_data = hass_with_http.data[DOMAIN][entry.entry_id] + assert entry_data[ENTRY_COORDINATOR] is existing_coordinator + + +@pytest.mark.asyncio +async def test_update_listener_skips_reload_when_only_sensors_to_load_changes( + hass_with_http, +): + entry = MockConfigEntry( + domain=DOMAIN, + data={}, + options={API_ID: "id", API_KEY: "key", SENSORS_TO_LOAD: ["a"]}, + ) + entry.add_to_hass(hass_with_http) + + # Seed hass.data snapshot + hass_with_http.data.setdefault(DOMAIN, {}) + hass_with_http.data[DOMAIN][entry.entry_id] = { + # Seed the full old options snapshot. If we only store SENSORS_TO_LOAD here, + # update_listener will detect differences for other keys (e.g. auth keys) and reload. + ENTRY_LAST_OPTIONS: dict(entry.options), + } + + hass_with_http.config_entries.async_reload = AsyncMock() + + # Only SENSORS_TO_LOAD changes. + # ConfigEntry.options cannot be changed directly; use async_update_entry. + hass_with_http.config_entries.async_update_entry( + entry, options={**dict(entry.options), SENSORS_TO_LOAD: ["a", "b"]} + ) + + await update_listener(hass_with_http, entry) + + hass_with_http.config_entries.async_reload.assert_not_awaited() + # Snapshot should be updated + entry_data = hass_with_http.data[DOMAIN][entry.entry_id] + assert entry_data[ENTRY_LAST_OPTIONS] == dict(entry.options) + + +@pytest.mark.asyncio +async def test_update_listener_triggers_reload_when_other_option_changes( + hass_with_http, + monkeypatch, +): + entry = MockConfigEntry( + domain=DOMAIN, + data={}, + options={API_ID: "id", API_KEY: "key", SENSORS_TO_LOAD: ["a"], WSLINK: False}, + ) + entry.add_to_hass(hass_with_http) + + hass_with_http.data.setdefault(DOMAIN, {}) + hass_with_http.data[DOMAIN][entry.entry_id] = { + ENTRY_LAST_OPTIONS: dict(entry.options), + } + + hass_with_http.config_entries.async_reload = AsyncMock(return_value=True) + + # Change a different option. + # ConfigEntry.options cannot be changed directly; use async_update_entry. + hass_with_http.config_entries.async_update_entry( + entry, options={**dict(entry.options), WSLINK: True} + ) + + info = MagicMock() + monkeypatch.setattr("custom_components.sws12500._LOGGER.info", info) + + await update_listener(hass_with_http, entry) + + hass_with_http.config_entries.async_reload.assert_awaited_once_with(entry.entry_id) + info.assert_called() + + +@pytest.mark.asyncio +async def test_update_listener_missing_snapshot_stores_current_options_then_reloads( + hass_with_http, +): + """Cover update_listener branch where the options snapshot is missing/invalid. + + This hits: + entry_data[ENTRY_LAST_OPTIONS] = dict(entry.options) + and then proceeds to reload. + """ + entry = MockConfigEntry( + domain=DOMAIN, + data={}, + options={API_ID: "id", API_KEY: "key", SENSORS_TO_LOAD: ["a"], WSLINK: False}, + ) + entry.add_to_hass(hass_with_http) + + hass_with_http.data.setdefault(DOMAIN, {}) + # Store an invalid snapshot type to force the "No/invalid snapshot" branch. + hass_with_http.data[DOMAIN][entry.entry_id] = {ENTRY_LAST_OPTIONS: "invalid"} + + hass_with_http.config_entries.async_reload = AsyncMock(return_value=True) + + await update_listener(hass_with_http, entry) + + entry_data = hass_with_http.data[DOMAIN][entry.entry_id] + assert entry_data[ENTRY_LAST_OPTIONS] == dict(entry.options) + hass_with_http.config_entries.async_reload.assert_awaited_once_with(entry.entry_id) + + +@pytest.mark.asyncio +async def test_async_unload_entry_pops_runtime_data_on_success(hass_with_http): + entry = MockConfigEntry( + domain=DOMAIN, + data={}, + options={API_ID: "id", API_KEY: "key"}, + ) + entry.add_to_hass(hass_with_http) + + hass_with_http.data.setdefault(DOMAIN, {}) + hass_with_http.data[DOMAIN][entry.entry_id] = {ENTRY_COORDINATOR: object()} + + hass_with_http.config_entries.async_unload_platforms = AsyncMock(return_value=True) + + ok = await async_unload_entry(hass_with_http, entry) + assert ok is True + assert entry.entry_id not in hass_with_http.data[DOMAIN] + + +@pytest.mark.asyncio +async def test_async_unload_entry_keeps_runtime_data_on_failure(hass_with_http): + entry = MockConfigEntry( + domain=DOMAIN, + data={}, + options={API_ID: "id", API_KEY: "key"}, + ) + entry.add_to_hass(hass_with_http) + + hass_with_http.data.setdefault(DOMAIN, {}) + hass_with_http.data[DOMAIN][entry.entry_id] = {ENTRY_COORDINATOR: object()} + + hass_with_http.config_entries.async_unload_platforms = AsyncMock(return_value=False) + + ok = await async_unload_entry(hass_with_http, entry) + assert ok is False + assert entry.entry_id in hass_with_http.data[DOMAIN] + + +@pytest.mark.asyncio +async def test_received_data_auth_unauthorized_and_incorrect_data_paths(hass): + """A few lifecycle-adjacent assertions to cover coordinator auth behavior in __init__.py.""" + entry = MockConfigEntry( + domain=DOMAIN, + data={}, + options={API_ID: "id", API_KEY: "key", WSLINK: False}, + ) + entry.add_to_hass(hass) + coordinator = WeatherDataUpdateCoordinator(hass, entry) + + # Missing security params -> unauthorized + with pytest.raises(HTTPUnauthorized): + await coordinator.received_data(_RequestStub(query={"x": "y"})) # type: ignore[arg-type] + + # Wrong credentials -> unauthorized + with pytest.raises(HTTPUnauthorized): + await coordinator.received_data( + _RequestStub(query={"ID": "id", "PASSWORD": "no"}) + ) # type: ignore[arg-type] + + # Missing API_ID in options -> IncorrectDataError + entry2 = MockConfigEntry( + domain=DOMAIN, data={}, options={API_KEY: "key", WSLINK: False} + ) + entry2.add_to_hass(hass) + coordinator2 = WeatherDataUpdateCoordinator(hass, entry2) + with pytest.raises(IncorrectDataError): + await coordinator2.received_data( + _RequestStub(query={"ID": "id", "PASSWORD": "key"}) + ) # type: ignore[arg-type] diff --git a/tests/test_pocasi_push.py b/tests/test_pocasi_push.py new file mode 100644 index 0000000..73af3b6 --- /dev/null +++ b/tests/test_pocasi_push.py @@ -0,0 +1,302 @@ +from __future__ import annotations + +from dataclasses import dataclass +from datetime import datetime, timedelta +from types import SimpleNamespace +from typing import Any, Literal +from unittest.mock import AsyncMock, MagicMock + +from aiohttp import ClientError +import pytest + +from custom_components.sws12500.const import ( + DEFAULT_URL, + POCASI_CZ_API_ID, + POCASI_CZ_API_KEY, + POCASI_CZ_ENABLED, + POCASI_CZ_LOGGER_ENABLED, + POCASI_CZ_SEND_INTERVAL, + POCASI_CZ_UNEXPECTED, + POCASI_CZ_URL, + POCASI_INVALID_KEY, + WSLINK_URL, +) +from custom_components.sws12500.pocasti_cz import ( + PocasiApiKeyError, + PocasiPush, + PocasiSuccess, +) + + +@dataclass(slots=True) +class _FakeResponse: + text_value: str + + async def text(self) -> str: + return self.text_value + + async def __aenter__(self) -> "_FakeResponse": + return self + + async def __aexit__(self, exc_type, exc, tb) -> None: + return None + + +class _FakeSession: + def __init__( + self, *, response: _FakeResponse | None = None, exc: Exception | None = None + ): + self._response = response + self._exc = exc + self.calls: list[dict[str, Any]] = [] + + def get(self, url: str, *, params: dict[str, Any] | None = None): + self.calls.append({"url": url, "params": dict(params or {})}) + if self._exc is not None: + raise self._exc + assert self._response is not None + return self._response + + +def _make_entry( + *, + api_id: str | None = "id", + api_key: str | None = "key", + interval: int = 30, + logger: bool = False, +) -> Any: + options: dict[str, Any] = { + POCASI_CZ_SEND_INTERVAL: interval, + POCASI_CZ_LOGGER_ENABLED: logger, + POCASI_CZ_ENABLED: True, + } + if api_id is not None: + options[POCASI_CZ_API_ID] = api_id + if api_key is not None: + options[POCASI_CZ_API_KEY] = api_key + + entry = SimpleNamespace() + entry.options = options + entry.entry_id = "test_entry_id" + return entry + + +@pytest.fixture +def hass(): + # Minimal hass-like object; we patch client session retrieval. + return SimpleNamespace() + + +@pytest.mark.asyncio +async def test_push_data_to_server_missing_api_id_returns_early(monkeypatch, hass): + entry = _make_entry(api_id=None, api_key="key") + pp = PocasiPush(hass, entry) + + session = _FakeSession(response=_FakeResponse("OK")) + monkeypatch.setattr( + "custom_components.sws12500.pocasti_cz.async_get_clientsession", + lambda _h: session, + ) + + await pp.push_data_to_server({"x": 1}, "WU") + assert session.calls == [] + + +@pytest.mark.asyncio +async def test_push_data_to_server_missing_api_key_returns_early(monkeypatch, hass): + entry = _make_entry(api_id="id", api_key=None) + pp = PocasiPush(hass, entry) + + session = _FakeSession(response=_FakeResponse("OK")) + monkeypatch.setattr( + "custom_components.sws12500.pocasti_cz.async_get_clientsession", + lambda _h: session, + ) + + await pp.push_data_to_server({"x": 1}, "WU") + assert session.calls == [] + + +@pytest.mark.asyncio +async def test_push_data_to_server_respects_interval_limit(monkeypatch, hass): + entry = _make_entry(interval=30, logger=True) + pp = PocasiPush(hass, entry) + + # Ensure "next_update > now" so it returns early before doing HTTP. + pp.next_update = datetime.now() + timedelta(seconds=999) + + session = _FakeSession(response=_FakeResponse("OK")) + monkeypatch.setattr( + "custom_components.sws12500.pocasti_cz.async_get_clientsession", + lambda _h: session, + ) + + await pp.push_data_to_server({"x": 1}, "WU") + assert session.calls == [] + + +@pytest.mark.asyncio +@pytest.mark.parametrize( + "mode,expected_path", [("WU", DEFAULT_URL), ("WSLINK", WSLINK_URL)] +) +async def test_push_data_to_server_injects_auth_and_chooses_url( + monkeypatch, hass, mode: Literal["WU", "WSLINK"], expected_path: str +): + entry = _make_entry(api_id="id", api_key="key") + pp = PocasiPush(hass, entry) + + # Force send now. + pp.next_update = datetime.now() - timedelta(seconds=1) + + session = _FakeSession(response=_FakeResponse("OK")) + monkeypatch.setattr( + "custom_components.sws12500.pocasti_cz.async_get_clientsession", + lambda _h: session, + ) + + # Avoid depending on anonymize output; just make it deterministic. + monkeypatch.setattr("custom_components.sws12500.pocasti_cz.anonymize", lambda d: d) + + await pp.push_data_to_server({"temp": 1}, mode) + + assert len(session.calls) == 1 + call = session.calls[0] + assert call["url"] == f"{POCASI_CZ_URL}{expected_path}" + + params = call["params"] + if mode == "WU": + assert params["ID"] == "id" + assert params["PASSWORD"] == "key" + else: + assert params["wsid"] == "id" + assert params["wspw"] == "key" + + +@pytest.mark.asyncio +async def test_push_data_to_server_calls_verify_response(monkeypatch, hass): + entry = _make_entry() + pp = PocasiPush(hass, entry) + pp.next_update = datetime.now() - timedelta(seconds=1) + + session = _FakeSession(response=_FakeResponse("OK")) + monkeypatch.setattr( + "custom_components.sws12500.pocasti_cz.async_get_clientsession", + lambda _h: session, + ) + monkeypatch.setattr("custom_components.sws12500.pocasti_cz.anonymize", lambda d: d) + + verify = MagicMock(return_value=None) + monkeypatch.setattr(pp, "verify_response", verify) + + await pp.push_data_to_server({"x": 1}, "WU") + verify.assert_called_once_with("OK") + + +@pytest.mark.asyncio +async def test_push_data_to_server_api_key_error_disables_feature(monkeypatch, hass): + entry = _make_entry() + pp = PocasiPush(hass, entry) + pp.next_update = datetime.now() - timedelta(seconds=1) + + session = _FakeSession(response=_FakeResponse("OK")) + monkeypatch.setattr( + "custom_components.sws12500.pocasti_cz.async_get_clientsession", + lambda _h: session, + ) + monkeypatch.setattr("custom_components.sws12500.pocasti_cz.anonymize", lambda d: d) + + def _raise(_status: str): + raise PocasiApiKeyError + + monkeypatch.setattr(pp, "verify_response", _raise) + + update_options = AsyncMock(return_value=True) + monkeypatch.setattr( + "custom_components.sws12500.pocasti_cz.update_options", update_options + ) + + crit = MagicMock() + monkeypatch.setattr("custom_components.sws12500.pocasti_cz._LOGGER.critical", crit) + + await pp.push_data_to_server({"x": 1}, "WU") + + crit.assert_called() + # Should log invalid key message and disable feature. + assert any( + POCASI_INVALID_KEY in str(c.args[0]) for c in crit.call_args_list if c.args + ) + update_options.assert_awaited_once_with(hass, entry, POCASI_CZ_ENABLED, False) + + +@pytest.mark.asyncio +async def test_push_data_to_server_success_logs_when_logger_enabled(monkeypatch, hass): + entry = _make_entry(logger=True) + pp = PocasiPush(hass, entry) + pp.next_update = datetime.now() - timedelta(seconds=1) + + session = _FakeSession(response=_FakeResponse("OK")) + monkeypatch.setattr( + "custom_components.sws12500.pocasti_cz.async_get_clientsession", + lambda _h: session, + ) + monkeypatch.setattr("custom_components.sws12500.pocasti_cz.anonymize", lambda d: d) + + def _raise_success(_status: str): + raise PocasiSuccess + + monkeypatch.setattr(pp, "verify_response", _raise_success) + + info = MagicMock() + monkeypatch.setattr("custom_components.sws12500.pocasti_cz._LOGGER.info", info) + + await pp.push_data_to_server({"x": 1}, "WU") + info.assert_called() + + +@pytest.mark.asyncio +async def test_push_data_to_server_client_error_increments_and_disables_after_three( + monkeypatch, hass +): + entry = _make_entry() + pp = PocasiPush(hass, entry) + + update_options = AsyncMock(return_value=True) + monkeypatch.setattr( + "custom_components.sws12500.pocasti_cz.update_options", update_options + ) + + crit = MagicMock() + monkeypatch.setattr("custom_components.sws12500.pocasti_cz._LOGGER.critical", crit) + + session = _FakeSession(exc=ClientError("boom")) + monkeypatch.setattr( + "custom_components.sws12500.pocasti_cz.async_get_clientsession", + lambda _h: session, + ) + + # Force request attempts and exceed invalid count threshold. + for _i in range(4): + pp.next_update = datetime.now() - timedelta(seconds=1) + await pp.push_data_to_server({"x": 1}, "WU") + + assert pp.invalid_response_count == 4 + # Should disable after >3 + update_options.assert_awaited() + args = update_options.await_args.args + assert args[2] == POCASI_CZ_ENABLED + assert args[3] is False + # Should log unexpected at least once + assert any( + POCASI_CZ_UNEXPECTED in str(c.args[0]) for c in crit.call_args_list if c.args + ) + + +def test_verify_response_logs_debug_when_logger_enabled(monkeypatch, hass): + entry = _make_entry(logger=True) + pp = PocasiPush(hass, entry) + + dbg = MagicMock() + monkeypatch.setattr("custom_components.sws12500.pocasti_cz._LOGGER.debug", dbg) + + assert pp.verify_response("anything") is None + dbg.assert_called() diff --git a/tests/test_received_data.py b/tests/test_received_data.py new file mode 100644 index 0000000..f26fd97 --- /dev/null +++ b/tests/test_received_data.py @@ -0,0 +1,494 @@ +from __future__ import annotations + +from dataclasses import dataclass +from types import SimpleNamespace +from typing import Any +from unittest.mock import AsyncMock, MagicMock + +from aiohttp.web_exceptions import HTTPUnauthorized +import pytest + +from custom_components.sws12500 import IncorrectDataError, WeatherDataUpdateCoordinator +from custom_components.sws12500.const import ( + API_ID, + API_KEY, + DEFAULT_URL, + DOMAIN, + POCASI_CZ_ENABLED, + SENSORS_TO_LOAD, + WINDY_ENABLED, + WSLINK, + WSLINK_URL, +) + + +@dataclass(slots=True) +class _RequestStub: + """Minimal aiohttp Request stub. + + The coordinator only uses `webdata.query` (a mapping of query parameters). + """ + + query: dict[str, Any] + + +def _make_entry( + *, + wslink: bool = False, + api_id: str | None = "id", + api_key: str | None = "key", + windy_enabled: bool = False, + pocasi_enabled: bool = False, + dev_debug: bool = False, +) -> Any: + """Create a minimal config entry stub with `.options` and `.entry_id`.""" + options: dict[str, Any] = { + WSLINK: wslink, + WINDY_ENABLED: windy_enabled, + POCASI_CZ_ENABLED: pocasi_enabled, + "dev_debug_checkbox": dev_debug, + } + if api_id is not None: + options[API_ID] = api_id + if api_key is not None: + options[API_KEY] = api_key + + entry = SimpleNamespace() + entry.entry_id = "test_entry_id" + entry.options = options + return entry + + +@pytest.mark.asyncio +async def test_received_data_wu_missing_security_params_raises_http_unauthorized( + hass, monkeypatch +): + entry = _make_entry(wslink=False) + coordinator = WeatherDataUpdateCoordinator(hass, entry) + + # No ID/PASSWORD -> unauthorized + request = _RequestStub(query={"foo": "bar"}) + with pytest.raises(HTTPUnauthorized): + await coordinator.received_data(request) # type: ignore[arg-type] + + +@pytest.mark.asyncio +async def test_received_data_wslink_missing_security_params_raises_http_unauthorized( + hass, monkeypatch +): + entry = _make_entry(wslink=True) + coordinator = WeatherDataUpdateCoordinator(hass, entry) + + # No wsid/wspw -> unauthorized + request = _RequestStub(query={"foo": "bar"}) + with pytest.raises(HTTPUnauthorized): + await coordinator.received_data(request) # type: ignore[arg-type] + + +@pytest.mark.asyncio +async def test_received_data_missing_api_id_in_options_raises_incorrect_data_error( + hass, monkeypatch +): + entry = _make_entry(wslink=False, api_id=None, api_key="key") + coordinator = WeatherDataUpdateCoordinator(hass, entry) + + request = _RequestStub(query={"ID": "id", "PASSWORD": "key"}) + with pytest.raises(IncorrectDataError): + await coordinator.received_data(request) # type: ignore[arg-type] + + +@pytest.mark.asyncio +async def test_received_data_missing_api_key_in_options_raises_incorrect_data_error( + hass, monkeypatch +): + entry = _make_entry(wslink=False, api_id="id", api_key=None) + coordinator = WeatherDataUpdateCoordinator(hass, entry) + + request = _RequestStub(query={"ID": "id", "PASSWORD": "key"}) + with pytest.raises(IncorrectDataError): + await coordinator.received_data(request) # type: ignore[arg-type] + + +@pytest.mark.asyncio +async def test_received_data_wrong_credentials_raises_http_unauthorized( + hass, monkeypatch +): + entry = _make_entry(wslink=False, api_id="id", api_key="key") + coordinator = WeatherDataUpdateCoordinator(hass, entry) + + request = _RequestStub(query={"ID": "id", "PASSWORD": "wrong"}) + with pytest.raises(HTTPUnauthorized): + await coordinator.received_data(request) # type: ignore[arg-type] + + +@pytest.mark.asyncio +async def test_received_data_success_remaps_and_updates_coordinator_data( + hass, monkeypatch +): + entry = _make_entry(wslink=False, api_id="id", api_key="key") + coordinator = WeatherDataUpdateCoordinator(hass, entry) + + # Patch remapping so this test doesn't depend on mapping tables. + remapped = {"outside_temp": "10"} + monkeypatch.setattr( + "custom_components.sws12500.remap_items", + lambda _data: remapped, + ) + + # Ensure no autodiscovery triggers + monkeypatch.setattr( + "custom_components.sws12500.check_disabled", + lambda _remaped_items, _config: [], + ) + + # Capture updates + coordinator.async_set_updated_data = MagicMock() + + request = _RequestStub(query={"ID": "id", "PASSWORD": "key", "tempf": "50"}) + resp = await coordinator.received_data(request) # type: ignore[arg-type] + + assert resp.status == 200 + coordinator.async_set_updated_data.assert_called_once_with(remapped) + + +@pytest.mark.asyncio +async def test_received_data_success_wslink_uses_wslink_remap(hass, monkeypatch): + entry = _make_entry(wslink=True, api_id="id", api_key="key") + coordinator = WeatherDataUpdateCoordinator(hass, entry) + + remapped = {"ws_temp": "1"} + monkeypatch.setattr( + "custom_components.sws12500.remap_wslink_items", + lambda _data: remapped, + ) + # If the wrong remapper is used, we'd crash because we won't patch it: + monkeypatch.setattr( + "custom_components.sws12500.check_disabled", + lambda _remaped_items, _config: [], + ) + + coordinator.async_set_updated_data = MagicMock() + + request = _RequestStub(query={"wsid": "id", "wspw": "key", "t": "1"}) + resp = await coordinator.received_data(request) # type: ignore[arg-type] + + assert resp.status == 200 + coordinator.async_set_updated_data.assert_called_once_with(remapped) + + +@pytest.mark.asyncio +async def test_received_data_forwards_to_windy_when_enabled(hass, monkeypatch): + entry = _make_entry(wslink=False, api_id="id", api_key="key", windy_enabled=True) + coordinator = WeatherDataUpdateCoordinator(hass, entry) + + coordinator.windy.push_data_to_windy = AsyncMock() + + monkeypatch.setattr( + "custom_components.sws12500.remap_items", + lambda _data: {"k": "v"}, + ) + monkeypatch.setattr( + "custom_components.sws12500.check_disabled", + lambda _remaped_items, _config: [], + ) + + coordinator.async_set_updated_data = MagicMock() + + request = _RequestStub(query={"ID": "id", "PASSWORD": "key", "x": "y"}) + resp = await coordinator.received_data(request) # type: ignore[arg-type] + + assert resp.status == 200 + coordinator.windy.push_data_to_windy.assert_awaited_once() + args, _kwargs = coordinator.windy.push_data_to_windy.await_args + assert isinstance(args[0], dict) # raw data dict + assert args[1] is False # wslink flag + + +@pytest.mark.asyncio +async def test_received_data_forwards_to_pocasi_when_enabled(hass, monkeypatch): + entry = _make_entry(wslink=True, api_id="id", api_key="key", pocasi_enabled=True) + coordinator = WeatherDataUpdateCoordinator(hass, entry) + + coordinator.pocasi.push_data_to_server = AsyncMock() + + monkeypatch.setattr( + "custom_components.sws12500.remap_wslink_items", + lambda _data: {"k": "v"}, + ) + monkeypatch.setattr( + "custom_components.sws12500.check_disabled", + lambda _remaped_items, _config: [], + ) + + coordinator.async_set_updated_data = MagicMock() + + request = _RequestStub(query={"wsid": "id", "wspw": "key", "x": "y"}) + resp = await coordinator.received_data(request) # type: ignore[arg-type] + + assert resp.status == 200 + coordinator.pocasi.push_data_to_server.assert_awaited_once() + args, _kwargs = coordinator.pocasi.push_data_to_server.await_args + assert isinstance(args[0], dict) # raw data dict + assert args[1] == "WSLINK" + + +@pytest.mark.asyncio +async def test_received_data_autodiscovery_updates_options_notifies_and_adds_sensors( + hass, + monkeypatch, +): + entry = _make_entry(wslink=False, api_id="id", api_key="key") + coordinator = WeatherDataUpdateCoordinator(hass, entry) + + # Arrange: remapped payload contains keys that are disabled. + remapped = {"a": "1", "b": "2"} + monkeypatch.setattr("custom_components.sws12500.remap_items", lambda _d: remapped) + + # Autodiscovery finds two sensors to add + monkeypatch.setattr( + "custom_components.sws12500.check_disabled", + lambda _remaped_items, _config: ["a", "b"], + ) + + # No previously loaded sensors + monkeypatch.setattr("custom_components.sws12500.loaded_sensors", lambda _c: []) + + # translations returns a friendly name for each sensor key + async def _translations(_hass, _domain, _key, **_kwargs): + # return something non-None so it's included in human readable string + return "Name" + + monkeypatch.setattr("custom_components.sws12500.translations", _translations) + + translated_notification = AsyncMock() + monkeypatch.setattr( + "custom_components.sws12500.translated_notification", translated_notification + ) + + update_options = AsyncMock() + monkeypatch.setattr("custom_components.sws12500.update_options", update_options) + + add_new_sensors = MagicMock() + monkeypatch.setattr( + "custom_components.sws12500.sensor.add_new_sensors", add_new_sensors + ) + + coordinator.async_set_updated_data = MagicMock() + + request = _RequestStub(query={"ID": "id", "PASSWORD": "key"}) + resp = await coordinator.received_data(request) # type: ignore[arg-type] + + assert resp.status == 200 + + # It should notify + translated_notification.assert_awaited() + + # It should persist newly discovered sensors + update_options.assert_awaited() + args, _kwargs = update_options.await_args + assert args[2] == SENSORS_TO_LOAD + assert set(args[3]) >= {"a", "b"} + + # It should add new sensors dynamically + add_new_sensors.assert_called_once() + _hass_arg, _entry_arg, keys = add_new_sensors.call_args.args + assert _hass_arg is hass + assert _entry_arg is entry + assert set(keys) == {"a", "b"} + + coordinator.async_set_updated_data.assert_called_once_with(remapped) + + +@pytest.mark.asyncio +async def test_received_data_autodiscovery_human_readable_empty_branch_via_checked_none( + hass, + monkeypatch, +): + """Force `checked([...], list[str])` to return None so `human_readable = ""` branch is executed.""" + entry = _make_entry(wslink=False, api_id="id", api_key="key") + coordinator = WeatherDataUpdateCoordinator(hass, entry) + + remapped = {"a": "1"} + monkeypatch.setattr("custom_components.sws12500.remap_items", lambda _d: remapped) + + monkeypatch.setattr( + "custom_components.sws12500.check_disabled", + lambda _remaped_items, _config: ["a"], + ) + monkeypatch.setattr("custom_components.sws12500.loaded_sensors", lambda _c: []) + + # Return a translation so the list comprehension would normally include an item. + async def _translations(_hass, _domain, _key, **_kwargs): + return "Name" + + monkeypatch.setattr("custom_components.sws12500.translations", _translations) + + # Force checked(...) to return None when the code tries to validate translate_sensors as list[str]. + def _checked_override(value, expected_type): + if expected_type == list[str]: + return None + return value + + monkeypatch.setattr("custom_components.sws12500.checked", _checked_override) + + translated_notification = AsyncMock() + monkeypatch.setattr( + "custom_components.sws12500.translated_notification", translated_notification + ) + + update_options = AsyncMock() + monkeypatch.setattr("custom_components.sws12500.update_options", update_options) + + add_new_sensors = MagicMock() + monkeypatch.setattr( + "custom_components.sws12500.sensor.add_new_sensors", add_new_sensors + ) + + coordinator.async_set_updated_data = MagicMock() + + request = _RequestStub(query={"ID": "id", "PASSWORD": "key"}) + resp = await coordinator.received_data(request) # type: ignore[arg-type] + + assert resp.status == 200 + + # Ensure it still notifies (with empty human readable list) + translated_notification.assert_awaited() + # And persists sensors + update_options.assert_awaited() + coordinator.async_set_updated_data.assert_called_once_with(remapped) + + +@pytest.mark.asyncio +async def test_received_data_autodiscovery_extends_with_loaded_sensors_branch( + hass, monkeypatch +): + """Cover `_loaded_sensors := loaded_sensors(self.config)` branch (extend existing).""" + entry = _make_entry(wslink=False, api_id="id", api_key="key") + coordinator = WeatherDataUpdateCoordinator(hass, entry) + + remapped = {"new": "1"} + monkeypatch.setattr("custom_components.sws12500.remap_items", lambda _d: remapped) + + # Autodiscovery finds one new sensor + monkeypatch.setattr( + "custom_components.sws12500.check_disabled", + lambda _remaped_items, _config: ["new"], + ) + + # Pretend there are already loaded sensors in options + monkeypatch.setattr( + "custom_components.sws12500.loaded_sensors", lambda _c: ["existing"] + ) + + async def _translations(_hass, _domain, _key, **_kwargs): + return "Name" + + monkeypatch.setattr("custom_components.sws12500.translations", _translations) + + monkeypatch.setattr( + "custom_components.sws12500.translated_notification", AsyncMock() + ) + + update_options = AsyncMock() + monkeypatch.setattr("custom_components.sws12500.update_options", update_options) + + monkeypatch.setattr( + "custom_components.sws12500.sensor.add_new_sensors", MagicMock() + ) + + coordinator.async_set_updated_data = MagicMock() + + resp = await coordinator.received_data( + _RequestStub(query={"ID": "id", "PASSWORD": "key"}) + ) # type: ignore[arg-type] + + assert resp.status == 200 + + # Ensure the persisted list includes both new and existing sensors + update_options.assert_awaited() + args, _kwargs = update_options.await_args + assert args[2] == SENSORS_TO_LOAD + assert set(args[3]) >= {"new", "existing"} + + +@pytest.mark.asyncio +async def test_received_data_autodiscovery_translations_all_none_still_notifies_and_updates( + hass, monkeypatch +): + """Cover the branch where translated sensor names cannot be resolved (human_readable becomes empty).""" + entry = _make_entry(wslink=False, api_id="id", api_key="key") + coordinator = WeatherDataUpdateCoordinator(hass, entry) + + remapped = {"a": "1"} + monkeypatch.setattr("custom_components.sws12500.remap_items", lambda _d: remapped) + + monkeypatch.setattr( + "custom_components.sws12500.check_disabled", + lambda _remaped_items, _config: ["a"], + ) + monkeypatch.setattr("custom_components.sws12500.loaded_sensors", lambda _c: []) + + # Force translations to return None for every lookup -> translate_sensors becomes None and human_readable "" + async def _translations(_hass, _domain, _key, **_kwargs): + return None + + monkeypatch.setattr("custom_components.sws12500.translations", _translations) + + translated_notification = AsyncMock() + monkeypatch.setattr( + "custom_components.sws12500.translated_notification", translated_notification + ) + + update_options = AsyncMock() + monkeypatch.setattr("custom_components.sws12500.update_options", update_options) + + add_new_sensors = MagicMock() + monkeypatch.setattr( + "custom_components.sws12500.sensor.add_new_sensors", add_new_sensors + ) + + coordinator.async_set_updated_data = MagicMock() + + resp = await coordinator.received_data( + _RequestStub(query={"ID": "id", "PASSWORD": "key"}) + ) # type: ignore[arg-type] + + assert resp.status == 200 + translated_notification.assert_awaited() + update_options.assert_awaited() + add_new_sensors.assert_called_once() + coordinator.async_set_updated_data.assert_called_once_with(remapped) + + +@pytest.mark.asyncio +async def test_received_data_dev_logging_calls_anonymize_and_logs(hass, monkeypatch): + entry = _make_entry(wslink=False, api_id="id", api_key="key", dev_debug=True) + coordinator = WeatherDataUpdateCoordinator(hass, entry) + + monkeypatch.setattr("custom_components.sws12500.remap_items", lambda _d: {"k": "v"}) + monkeypatch.setattr( + "custom_components.sws12500.check_disabled", + lambda _remaped_items, _config: [], + ) + + anonymize = MagicMock(return_value={"safe": True}) + monkeypatch.setattr("custom_components.sws12500.anonymize", anonymize) + + log_info = MagicMock() + monkeypatch.setattr("custom_components.sws12500._LOGGER.info", log_info) + + coordinator.async_set_updated_data = MagicMock() + + request = _RequestStub(query={"ID": "id", "PASSWORD": "key", "x": "y"}) + resp = await coordinator.received_data(request) # type: ignore[arg-type] + + assert resp.status == 200 + anonymize.assert_called_once() + log_info.assert_called_once() + + +@pytest.mark.asyncio +async def test_register_path_switching_logic_is_exercised_via_routes(monkeypatch): + """Sanity: constants exist and are distinct (helps guard tests relying on them).""" + assert DEFAULT_URL != WSLINK_URL + assert DOMAIN == "sws12500" diff --git a/tests/test_routes.py b/tests/test_routes.py new file mode 100644 index 0000000..53a0914 --- /dev/null +++ b/tests/test_routes.py @@ -0,0 +1,97 @@ +from __future__ import annotations + +from dataclasses import dataclass +from typing import Awaitable, Callable + +from aiohttp.web import Response +import pytest + +from custom_components.sws12500.routes import Routes, unregistered + +Handler = Callable[["_RequestStub"], Awaitable[Response]] + + +@dataclass(slots=True) +class _RequestStub: + """Minimal request stub for unit-testing the dispatcher. + + `Routes.dispatch` relies on `request.path`. + `unregistered` accepts a request object but does not use it. + """ + + path: str + + +@pytest.fixture +def routes() -> Routes: + return Routes() + + +async def test_dispatch_unknown_path_calls_unregistered(routes: Routes) -> None: + request = _RequestStub(path="/unregistered") + response = await routes.dispatch(request) # type: ignore[arg-type] + assert response.status == 400 + + +async def test_unregistered_handler_returns_400() -> None: + request = _RequestStub(path="/invalid") + response = await unregistered(request) # type: ignore[arg-type] + assert response.status == 400 + + +async def test_dispatch_registered_but_disabled_uses_fallback(routes: Routes) -> None: + async def handler(_request: _RequestStub) -> Response: + return Response(text="OK", status=200) + + routes.add_route("/a", handler, enabled=False) + + response = await routes.dispatch(_RequestStub(path="/a")) # type: ignore[arg-type] + assert response.status == 400 + + +async def test_dispatch_registered_and_enabled_uses_handler(routes: Routes) -> None: + async def handler(_request: _RequestStub) -> Response: + return Response(text="OK", status=201) + + routes.add_route("/a", handler, enabled=True) + + response = await routes.dispatch(_RequestStub(path="/a")) # type: ignore[arg-type] + assert response.status == 201 + + +def test_switch_route_enables_exactly_one(routes: Routes) -> None: + async def handler_a(_request: _RequestStub) -> Response: + return Response(text="A", status=200) + + async def handler_b(_request: _RequestStub) -> Response: + return Response(text="B", status=200) + + routes.add_route("/a", handler_a, enabled=True) + routes.add_route("/b", handler_b, enabled=False) + + routes.switch_route("/b") + + assert routes.routes["/a"].enabled is False + assert routes.routes["/b"].enabled is True + + +def test_show_enabled_returns_message_when_none_enabled(routes: Routes) -> None: + async def handler(_request: _RequestStub) -> Response: + return Response(text="OK", status=200) + + routes.add_route("/a", handler, enabled=False) + routes.add_route("/b", handler, enabled=False) + + assert routes.show_enabled() == "No routes is enabled." + + +def test_show_enabled_includes_url_when_enabled(routes: Routes) -> None: + async def handler(_request: _RequestStub) -> Response: + return Response(text="OK", status=200) + + routes.add_route("/a", handler, enabled=False) + routes.add_route("/b", handler, enabled=True) + + msg = routes.show_enabled() + assert "Dispatcher enabled for URL: /b" in msg + assert "handler" in msg diff --git a/tests/test_sensor_platform.py b/tests/test_sensor_platform.py new file mode 100644 index 0000000..8b865c6 --- /dev/null +++ b/tests/test_sensor_platform.py @@ -0,0 +1,282 @@ +from __future__ import annotations + +from dataclasses import dataclass +from typing import Any +from unittest.mock import MagicMock + +import pytest + +from custom_components.sws12500.const import ( + CHILL_INDEX, + HEAT_INDEX, + OUTSIDE_HUMIDITY, + OUTSIDE_TEMP, + SENSORS_TO_LOAD, + WIND_AZIMUT, + WIND_DIR, + WIND_SPEED, + WSLINK, +) +from custom_components.sws12500.data import ( + ENTRY_ADD_ENTITIES, + ENTRY_COORDINATOR, + ENTRY_DESCRIPTIONS, +) +from custom_components.sws12500.sensor import ( + WeatherSensor, + _auto_enable_derived_sensors, + add_new_sensors, + async_setup_entry, +) +from custom_components.sws12500.sensors_weather import SENSOR_TYPES_WEATHER_API +from custom_components.sws12500.sensors_wslink import SENSOR_TYPES_WSLINK + + +@dataclass(slots=True) +class _ConfigEntryStub: + entry_id: str + options: dict[str, Any] + + +class _CoordinatorStub: + """Minimal coordinator stub for WeatherSensor and platform setup.""" + + def __init__( + self, data: dict[str, Any] | None = None, *, config: Any | None = None + ) -> None: + self.data = data if data is not None else {} + self.config = config + + +@pytest.fixture +def hass(): + # Use a very small hass-like object; sensor platform uses only `hass.data`. + class _Hass: + def __init__(self) -> None: + self.data: dict[str, Any] = {} + + return _Hass() + + +@pytest.fixture +def config_entry() -> _ConfigEntryStub: + return _ConfigEntryStub(entry_id="test_entry_id", options={}) + + +def _capture_add_entities(): + captured: list[Any] = [] + + def _add_entities(entities: list[Any]) -> None: + captured.extend(entities) + + return captured, _add_entities + + +def test_auto_enable_derived_sensors_wind_azimut(): + requested = {WIND_DIR} + expanded = _auto_enable_derived_sensors(requested) + assert WIND_DIR in expanded + assert WIND_AZIMUT in expanded + + +def test_auto_enable_derived_sensors_heat_index(): + requested = {OUTSIDE_TEMP, OUTSIDE_HUMIDITY} + expanded = _auto_enable_derived_sensors(requested) + assert HEAT_INDEX in expanded + + +def test_auto_enable_derived_sensors_chill_index(): + requested = {OUTSIDE_TEMP, WIND_SPEED} + expanded = _auto_enable_derived_sensors(requested) + assert CHILL_INDEX in expanded + + +@pytest.mark.asyncio +async def test_sensor_async_setup_entry_no_coordinator_is_noop(hass, config_entry): + # No entry dict created by integration yet; async_setup_entry should be defensive and no-op. + captured, add_entities = _capture_add_entities() + + await async_setup_entry(hass, config_entry, add_entities) + + assert captured == [] + + +@pytest.mark.asyncio +async def test_sensor_async_setup_entry_stores_callback_and_descriptions_even_if_no_sensors_to_load( + hass, config_entry +): + # Prepare runtime entry data and coordinator like integration does. + hass.data.setdefault("sws12500", {}) + hass.data["sws12500"][config_entry.entry_id] = { + ENTRY_COORDINATOR: _CoordinatorStub() + } + + captured, add_entities = _capture_add_entities() + + # No SENSORS_TO_LOAD set -> early return, but it should still store callback + descriptions. + await async_setup_entry(hass, config_entry, add_entities) + + entry_data = hass.data["sws12500"][config_entry.entry_id] + assert entry_data[ENTRY_ADD_ENTITIES] is add_entities + assert isinstance(entry_data[ENTRY_DESCRIPTIONS], dict) + assert captured == [] + + +@pytest.mark.asyncio +async def test_sensor_async_setup_entry_selects_weather_api_descriptions_when_wslink_disabled( + hass, config_entry +): + hass.data.setdefault("sws12500", {}) + hass.data["sws12500"][config_entry.entry_id] = { + ENTRY_COORDINATOR: _CoordinatorStub() + } + + captured, add_entities = _capture_add_entities() + + # Explicitly disabled WSLINK + config_entry.options[WSLINK] = False + + await async_setup_entry(hass, config_entry, add_entities) + + descriptions = hass.data["sws12500"][config_entry.entry_id][ENTRY_DESCRIPTIONS] + assert set(descriptions.keys()) == {d.key for d in SENSOR_TYPES_WEATHER_API} + assert captured == [] + + +@pytest.mark.asyncio +async def test_sensor_async_setup_entry_selects_wslink_descriptions_when_wslink_enabled( + hass, config_entry +): + hass.data.setdefault("sws12500", {}) + hass.data["sws12500"][config_entry.entry_id] = { + ENTRY_COORDINATOR: _CoordinatorStub() + } + + captured, add_entities = _capture_add_entities() + + config_entry.options[WSLINK] = True + + await async_setup_entry(hass, config_entry, add_entities) + + descriptions = hass.data["sws12500"][config_entry.entry_id][ENTRY_DESCRIPTIONS] + assert set(descriptions.keys()) == {d.key for d in SENSOR_TYPES_WSLINK} + assert captured == [] + + +@pytest.mark.asyncio +async def test_sensor_async_setup_entry_adds_requested_entities_and_auto_enables_derived( + hass, config_entry +): + hass.data.setdefault("sws12500", {}) + coordinator = _CoordinatorStub() + hass.data["sws12500"][config_entry.entry_id] = {ENTRY_COORDINATOR: coordinator} + + captured, add_entities = _capture_add_entities() + + # Request WIND_DIR, OUTSIDE_TEMP, OUTSIDE_HUMIDITY, WIND_SPEED -> should auto-add derived keys too. + config_entry.options[WSLINK] = False + config_entry.options[SENSORS_TO_LOAD] = [ + WIND_DIR, + OUTSIDE_TEMP, + OUTSIDE_HUMIDITY, + WIND_SPEED, + ] + + await async_setup_entry(hass, config_entry, add_entities) + + # We should have at least those requested + derived in the added entities. + keys_added = { + e.entity_description.key for e in captured if isinstance(e, WeatherSensor) + } + assert WIND_DIR in keys_added + assert OUTSIDE_TEMP in keys_added + assert OUTSIDE_HUMIDITY in keys_added + assert WIND_SPEED in keys_added + + # Derived: + assert WIND_AZIMUT in keys_added + assert HEAT_INDEX in keys_added + assert CHILL_INDEX in keys_added + + +def test_add_new_sensors_is_noop_when_domain_missing(hass, config_entry): + called = False + + def add_entities(_entities: list[Any]) -> None: + nonlocal called + called = True + + # No hass.data["sws12500"] at all. + add_new_sensors(hass, config_entry, keys=["anything"]) + + assert called is False + + +def test_add_new_sensors_is_noop_when_entry_missing(hass, config_entry): + hass.data["sws12500"] = {} + called = False + + def add_entities(_entities: list[Any]) -> None: + nonlocal called + called = True + + add_new_sensors(hass, config_entry, keys=["anything"]) + + assert called is False + + +def test_add_new_sensors_is_noop_when_callback_or_descriptions_missing( + hass, config_entry +): + hass.data["sws12500"] = { + config_entry.entry_id: {ENTRY_COORDINATOR: _CoordinatorStub()} + } + called = False + + def add_entities(_entities: list[Any]) -> None: + nonlocal called + called = True + + # Missing ENTRY_ADD_ENTITIES + ENTRY_DESCRIPTIONS -> no-op. + add_new_sensors(hass, config_entry, keys=["anything"]) + + assert called is False + + +def test_add_new_sensors_ignores_unknown_keys(hass, config_entry): + hass.data["sws12500"] = { + config_entry.entry_id: { + ENTRY_COORDINATOR: _CoordinatorStub(), + ENTRY_ADD_ENTITIES: MagicMock(), + ENTRY_DESCRIPTIONS: {}, # nothing known + } + } + + add_new_sensors(hass, config_entry, keys=["unknown_key"]) + + hass.data["sws12500"][config_entry.entry_id][ENTRY_ADD_ENTITIES].assert_not_called() + + +def test_add_new_sensors_adds_known_keys(hass, config_entry): + coordinator = _CoordinatorStub() + add_entities = MagicMock() + + # Use one known description from the weather API list. + known_desc = SENSOR_TYPES_WEATHER_API[0] + + hass.data["sws12500"] = { + config_entry.entry_id: { + ENTRY_COORDINATOR: coordinator, + ENTRY_ADD_ENTITIES: add_entities, + ENTRY_DESCRIPTIONS: {known_desc.key: known_desc}, + } + } + + add_new_sensors(hass, config_entry, keys=[known_desc.key]) + + add_entities.assert_called_once() + (entities_arg,) = add_entities.call_args.args + assert isinstance(entities_arg, list) + assert len(entities_arg) == 1 + assert isinstance(entities_arg[0], WeatherSensor) + assert entities_arg[0].entity_description.key == known_desc.key diff --git a/tests/test_sensors_common.py b/tests/test_sensors_common.py new file mode 100644 index 0000000..0fbaa52 --- /dev/null +++ b/tests/test_sensors_common.py @@ -0,0 +1,6 @@ +# Test file for sensors_common.py module + +def test_sensors_common_functionality(): + # Add your test cases here + pass + diff --git a/tests/test_sensors_weather.py b/tests/test_sensors_weather.py new file mode 100644 index 0000000..5b1a6b3 --- /dev/null +++ b/tests/test_sensors_weather.py @@ -0,0 +1,6 @@ +# Test file for sensors_weather.py module + +def test_sensors_weather_functionality(): + # Add your test cases here + pass + diff --git a/tests/test_sensors_wslink.py b/tests/test_sensors_wslink.py new file mode 100644 index 0000000..9f9317e --- /dev/null +++ b/tests/test_sensors_wslink.py @@ -0,0 +1,10 @@ +from custom_components.sws12500.sensors_wslink import SENSOR_TYPES_WSLINK +import pytest + + +def test_sensor_types_wslink_structure(): + assert isinstance(SENSOR_TYPES_WSLINK, tuple) + assert len(SENSOR_TYPES_WSLINK) > 0 + for sensor in SENSOR_TYPES_WSLINK: + assert hasattr(sensor, "key") + assert hasattr(sensor, "native_unit_of_measurement") diff --git a/tests/test_strings.py b/tests/test_strings.py new file mode 100644 index 0000000..1625286 --- /dev/null +++ b/tests/test_strings.py @@ -0,0 +1,6 @@ +# Test file for strings.json module + +def test_strings_functionality(): + # Add your test cases here + pass + diff --git a/tests/test_translations_cs.py b/tests/test_translations_cs.py new file mode 100644 index 0000000..35d0762 --- /dev/null +++ b/tests/test_translations_cs.py @@ -0,0 +1,6 @@ +# Test file for translations/cs.json module + +def test_translations_cs_functionality(): + # Add your test cases here + pass + diff --git a/tests/test_translations_en.py b/tests/test_translations_en.py new file mode 100644 index 0000000..51b6392 --- /dev/null +++ b/tests/test_translations_en.py @@ -0,0 +1,6 @@ +# Test file for translations/en.json module + +def test_translations_en_functionality(): + # Add your test cases here + pass + diff --git a/tests/test_utils.py b/tests/test_utils.py new file mode 100644 index 0000000..0a11644 --- /dev/null +++ b/tests/test_utils.py @@ -0,0 +1,8 @@ +from custom_components.sws12500.utils import celsius_to_fahrenheit, fahrenheit_to_celsius + + +def test_temperature_conversion(): + assert celsius_to_fahrenheit(0) == 32 + assert celsius_to_fahrenheit(100) == 212 + assert fahrenheit_to_celsius(32) == 0 + assert fahrenheit_to_celsius(212) == 100 diff --git a/tests/test_utils_more.py b/tests/test_utils_more.py new file mode 100644 index 0000000..353dbc2 --- /dev/null +++ b/tests/test_utils_more.py @@ -0,0 +1,364 @@ +from __future__ import annotations + +from dataclasses import dataclass +from types import SimpleNamespace +from typing import Any +from unittest.mock import AsyncMock, MagicMock + +import pytest + +from custom_components.sws12500.const import ( + DEV_DBG, + OUTSIDE_HUMIDITY, + OUTSIDE_TEMP, + REMAP_ITEMS, + REMAP_WSLINK_ITEMS, + SENSORS_TO_LOAD, + WIND_SPEED, + UnitOfBat, +) +from custom_components.sws12500.utils import ( + anonymize, + battery_level, + battery_level_to_icon, + celsius_to_fahrenheit, + check_disabled, + chill_index, + fahrenheit_to_celsius, + heat_index, + loaded_sensors, + remap_items, + remap_wslink_items, + translated_notification, + translations, + update_options, + wind_dir_to_text, +) + + +@dataclass(slots=True) +class _EntryStub: + entry_id: str = "test_entry_id" + options: dict[str, Any] = None # type: ignore[assignment] + + +class _ConfigEntriesStub: + def __init__(self) -> None: + self.async_update_entry = MagicMock(return_value=True) + + +class _HassStub: + def __init__(self, language: str = "en") -> None: + self.config = SimpleNamespace(language=language) + self.config_entries = _ConfigEntriesStub() + + +@pytest.fixture +def hass() -> _HassStub: + return _HassStub(language="en") + + +@pytest.fixture +def entry() -> _EntryStub: + return _EntryStub(options={}) + + +def test_anonymize_masks_secrets_and_keeps_other_values(): + data = { + "ID": "abc", + "PASSWORD": "secret", + "wsid": "id2", + "wspw": "pw2", + "temp": 10, + "ok": True, + } + out = anonymize(data) + assert out["ID"] == "***" + assert out["PASSWORD"] == "***" + assert out["wsid"] == "***" + assert out["wspw"] == "***" + assert out["temp"] == 10 + assert out["ok"] is True + + +def test_remap_items_filters_unknown_keys(): + # Pick a known legacy key from the mapping + legacy_key = next(iter(REMAP_ITEMS.keys())) + internal_key = REMAP_ITEMS[legacy_key] + + entities = {legacy_key: "1", "unknown": "2"} + out = remap_items(entities) + + assert out == {internal_key: "1"} + + +def test_remap_wslink_items_filters_unknown_keys(): + wslink_key = next(iter(REMAP_WSLINK_ITEMS.keys())) + internal_key = REMAP_WSLINK_ITEMS[wslink_key] + + entities = {wslink_key: "x", "unknown": "y"} + out = remap_wslink_items(entities) + + assert out == {internal_key: "x"} + + +def test_loaded_sensors_returns_list_or_empty(entry: _EntryStub): + entry.options[SENSORS_TO_LOAD] = ["a", "b"] + assert loaded_sensors(entry) == ["a", "b"] + + entry.options[SENSORS_TO_LOAD] = [] + assert loaded_sensors(entry) == [] + + entry.options.pop(SENSORS_TO_LOAD) + assert loaded_sensors(entry) == [] + + +def test_check_disabled_returns_none_when_all_present(entry: _EntryStub): + entry.options[SENSORS_TO_LOAD] = ["a", "b"] + entry.options[DEV_DBG] = False + + missing = check_disabled({"a": "1", "b": "2"}, entry) + assert missing is None + + +def test_check_disabled_returns_missing_keys(entry: _EntryStub): + entry.options[SENSORS_TO_LOAD] = ["a"] + entry.options[DEV_DBG] = False + + missing = check_disabled({"a": "1", "b": "2", "c": "3"}, entry) + assert missing == ["b", "c"] + + +def test_check_disabled_logs_when_dev_dbg_enabled(entry: _EntryStub, monkeypatch): + # Just ensure logging branches are exercised without asserting exact messages. + entry.options[SENSORS_TO_LOAD] = [] + entry.options[DEV_DBG] = True + + monkeypatch.setattr( + "custom_components.sws12500.utils._LOGGER.info", lambda *a, **k: None + ) + + missing = check_disabled({"a": "1"}, entry) + assert missing == ["a"] + + +@pytest.mark.asyncio +async def test_update_options_calls_async_update_entry( + hass: _HassStub, entry: _EntryStub +): + entry.options = {"x": 1} + ok = await update_options(hass, entry, "y", True) + assert ok is True + hass.config_entries.async_update_entry.assert_called_once() + _called_entry = hass.config_entries.async_update_entry.call_args.args[0] + assert _called_entry is entry + called_options = hass.config_entries.async_update_entry.call_args.kwargs["options"] + assert called_options["x"] == 1 + assert called_options["y"] is True + + +@pytest.mark.asyncio +async def test_translations_returns_value_when_key_present( + hass: _HassStub, monkeypatch +): + # Build the key that translations() will look for + localize_key = "component.sws12500.entity.sensor.test.name" + get_translations = AsyncMock(return_value={localize_key: "Translated"}) + monkeypatch.setattr( + "custom_components.sws12500.utils.async_get_translations", get_translations + ) + + out = await translations( + hass, + "sws12500", + "sensor.test", + key="name", + category="entity", + ) + assert out == "Translated" + + +@pytest.mark.asyncio +async def test_translations_returns_none_when_key_missing(hass: _HassStub, monkeypatch): + get_translations = AsyncMock(return_value={}) + monkeypatch.setattr( + "custom_components.sws12500.utils.async_get_translations", get_translations + ) + + out = await translations(hass, "sws12500", "missing") + assert out is None + + +@pytest.mark.asyncio +async def test_translated_notification_creates_notification_without_placeholders( + hass: _HassStub, monkeypatch +): + base_key = "component.sws12500.notify.added.message" + title_key = "component.sws12500.notify.added.title" + get_translations = AsyncMock(return_value={base_key: "Msg", title_key: "Title"}) + monkeypatch.setattr( + "custom_components.sws12500.utils.async_get_translations", get_translations + ) + + create = MagicMock() + monkeypatch.setattr( + "custom_components.sws12500.utils.persistent_notification.async_create", create + ) + + await translated_notification(hass, "sws12500", "added") + create.assert_called_once() + args = create.call_args.args + assert args[0] is hass + assert args[1] == "Msg" + assert args[2] == "Title" + + +@pytest.mark.asyncio +async def test_translated_notification_formats_placeholders( + hass: _HassStub, monkeypatch +): + base_key = "component.sws12500.notify.added.message" + title_key = "component.sws12500.notify.added.title" + get_translations = AsyncMock( + return_value={base_key: "Hello {name}", title_key: "Title"} + ) + monkeypatch.setattr( + "custom_components.sws12500.utils.async_get_translations", get_translations + ) + + create = MagicMock() + monkeypatch.setattr( + "custom_components.sws12500.utils.persistent_notification.async_create", create + ) + + await translated_notification( + hass, "sws12500", "added", translation_placeholders={"name": "World"} + ) + create.assert_called_once() + assert create.call_args.args[1] == "Hello World" + + +def test_battery_level_handles_none_empty_invalid_and_known_values(): + assert battery_level(None) == UnitOfBat.UNKNOWN + assert battery_level("") == UnitOfBat.UNKNOWN + assert battery_level("x") == UnitOfBat.UNKNOWN + + assert battery_level(0) == UnitOfBat.LOW + assert battery_level("0") == UnitOfBat.LOW + assert battery_level(1) == UnitOfBat.NORMAL + assert battery_level("1") == UnitOfBat.NORMAL + + # Unknown numeric values map to UNKNOWN + assert battery_level(2) == UnitOfBat.UNKNOWN + assert battery_level("2") == UnitOfBat.UNKNOWN + + +def test_battery_level_to_icon_maps_all_and_unknown(): + assert battery_level_to_icon(UnitOfBat.LOW) == "mdi:battery-low" + assert battery_level_to_icon(UnitOfBat.NORMAL) == "mdi:battery" + assert battery_level_to_icon(UnitOfBat.UNKNOWN) == "mdi:battery-unknown" + + +def test_temperature_conversions_round_trip(): + # Use a value that is exactly representable in binary-ish floats + f = 32.0 + c = fahrenheit_to_celsius(f) + assert c == 0.0 + assert celsius_to_fahrenheit(c) == 32.0 + + # General check (approx) + f2 = 77.0 + c2 = fahrenheit_to_celsius(f2) + assert c2 == pytest.approx(25.0) + assert celsius_to_fahrenheit(c2) == pytest.approx(77.0) + + +def test_wind_dir_to_text_returns_none_for_zero_and_valid_for_positive(): + assert wind_dir_to_text(0.0) is None + assert wind_dir_to_text(0) is None + + # For a non-zero degree it should return some enum value + out = wind_dir_to_text(10.0) + assert out is not None + + +def test_heat_index_returns_none_when_missing_temp_or_humidity(monkeypatch): + monkeypatch.setattr( + "custom_components.sws12500.utils._LOGGER.error", lambda *a, **k: None + ) + + assert heat_index({OUTSIDE_HUMIDITY: "50"}) is None + assert heat_index({OUTSIDE_TEMP: "80"}) is None + + assert heat_index({OUTSIDE_TEMP: "x", OUTSIDE_HUMIDITY: "50"}) is None + assert heat_index({OUTSIDE_TEMP: "80", OUTSIDE_HUMIDITY: "x"}) is None + + +def test_heat_index_simple_path_and_full_index_path(): + # Simple path: keep simple average under threshold. + # Using temp=70F, rh=40 keeps ((simple+temp)/2) under 80 typically. + simple = heat_index({OUTSIDE_TEMP: "70", OUTSIDE_HUMIDITY: "40"}) + assert simple is not None + + # Full index path: choose high temp/rh -> triggers full index. + full = heat_index({OUTSIDE_TEMP: "90", OUTSIDE_HUMIDITY: "85"}) + assert full is not None + + +def test_heat_index_low_humidity_adjustment_branch(): + # This targets: + # if rh < 13 and (80 <= temp <= 112): adjustment = ... + # + # Pick a temp/rh combo that: + # - triggers the full-index path: ((simple + temp) / 2) > 80 + # - satisfies low humidity adjustment bounds + out = heat_index({OUTSIDE_TEMP: "95", OUTSIDE_HUMIDITY: "10"}) + assert out is not None + + +def test_heat_index_convert_from_celsius_path(): + # If convert=True, temp is interpreted as Celsius and converted to Fahrenheit internally. + # Use 30C (~86F) and high humidity to trigger full index path. + out = heat_index({OUTSIDE_TEMP: "30", OUTSIDE_HUMIDITY: "85"}, convert=True) + assert out is not None + + +def test_chill_index_returns_none_when_missing_temp_or_wind(monkeypatch): + monkeypatch.setattr( + "custom_components.sws12500.utils._LOGGER.error", lambda *a, **k: None + ) + + assert chill_index({WIND_SPEED: "10"}) is None + assert chill_index({OUTSIDE_TEMP: "10"}) is None + assert chill_index({OUTSIDE_TEMP: "x", WIND_SPEED: "10"}) is None + assert chill_index({OUTSIDE_TEMP: "10", WIND_SPEED: "x"}) is None + + +def test_chill_index_returns_calculated_when_cold_and_windy(): + # temp in F, wind > 3 -> calculate when temp < 50 + out = chill_index({OUTSIDE_TEMP: "40", WIND_SPEED: "10"}) + assert out is not None + assert isinstance(out, float) + + +def test_chill_index_returns_temp_when_not_cold_or_not_windy(): + # Not cold -> hits the `else temp` branch + out1 = chill_index({OUTSIDE_TEMP: "60", WIND_SPEED: "10"}) + assert out1 == 60.0 + + # Not windy -> hits the `else temp` branch + out2 = chill_index({OUTSIDE_TEMP: "40", WIND_SPEED: "2"}) + assert out2 == 40.0 + + # Boundary: exactly 50F should also hit the `else temp` branch (since condition is temp < 50) + out3 = chill_index({OUTSIDE_TEMP: "50", WIND_SPEED: "10"}) + assert out3 == 50.0 + + # Boundary: exactly 3 mph should also hit the `else temp` branch (since condition is wind > 3) + out4 = chill_index({OUTSIDE_TEMP: "40", WIND_SPEED: "3"}) + assert out4 == 40.0 + + +def test_chill_index_convert_from_celsius_path(): + out = chill_index({OUTSIDE_TEMP: "5", WIND_SPEED: "10"}, convert=True) + assert out is not None diff --git a/tests/test_weather_sensor_entity.py b/tests/test_weather_sensor_entity.py new file mode 100644 index 0000000..8b8f7f0 --- /dev/null +++ b/tests/test_weather_sensor_entity.py @@ -0,0 +1,263 @@ +from __future__ import annotations + +from dataclasses import dataclass +from types import SimpleNamespace +from typing import Any, Callable +from unittest.mock import MagicMock + +import pytest + +from custom_components.sws12500.const import DOMAIN +from custom_components.sws12500.sensor import WeatherSensor + + +@dataclass(slots=True) +class _DescriptionStub: + """Minimal stand-in for WeatherSensorEntityDescription. + + WeatherSensor only relies on: + - key + - value_fn + - value_from_data_fn + """ + + key: str + value_fn: Callable[[Any], Any] | None = None + value_from_data_fn: Callable[[dict[str, Any]], Any] | None = None + + +class _CoordinatorStub: + """Minimal coordinator stub used by WeatherSensor.""" + + def __init__( + self, data: dict[str, Any] | None = None, *, config: Any | None = None + ): + self.data = data if data is not None else {} + self.config = config + + +def test_native_value_prefers_value_from_data_fn_success(): + desc = _DescriptionStub( + key="derived", + value_from_data_fn=lambda data: f"v:{data.get('x')}", + value_fn=lambda raw: f"raw:{raw}", # should not be used + ) + coordinator = _CoordinatorStub(data={"x": 123, "derived": "ignored"}) + entity = WeatherSensor(desc, coordinator) + + assert entity.native_value == "v:123" + + +def test_native_value_value_from_data_fn_success_with_dev_logging_hits_computed_debug_branch( + monkeypatch, +): + """Cover the dev-log debug branch after successful value_from_data_fn computation.""" + debug = MagicMock() + monkeypatch.setattr("custom_components.sws12500.sensor._LOGGER.debug", debug) + + desc = _DescriptionStub( + key="derived", + value_from_data_fn=lambda data: data["x"] + 1, + ) + config = SimpleNamespace(options={"dev_debug_checkbox": True}) + coordinator = _CoordinatorStub(data={"x": 41}, config=config) + entity = WeatherSensor(desc, coordinator) + + assert entity.native_value == 42 + + debug.assert_any_call( + "native_value computed via value_from_data_fn: key=%s -> %s", + "derived", + 42, + ) + + +def test_native_value_value_from_data_fn_exception_returns_none(): + def boom(_data: dict[str, Any]) -> Any: + raise RuntimeError("nope") + + desc = _DescriptionStub(key="derived", value_from_data_fn=boom) + coordinator = _CoordinatorStub(data={"derived": 1}) + entity = WeatherSensor(desc, coordinator) + + assert entity.native_value is None + + +def test_native_value_missing_raw_returns_none(): + desc = _DescriptionStub(key="missing", value_fn=lambda raw: raw) + coordinator = _CoordinatorStub(data={}) + entity = WeatherSensor(desc, coordinator) + + assert entity.native_value is None + + +def test_native_value_missing_raw_with_dev_logging_hits_debug_branch(monkeypatch): + monkeypatch.setattr( + "custom_components.sws12500.sensor._LOGGER.debug", lambda *a, **k: None + ) + + desc = _DescriptionStub(key="missing", value_fn=lambda raw: raw) + config = SimpleNamespace(options={"dev_debug_checkbox": True}) + coordinator = _CoordinatorStub(data={}, config=config) + entity = WeatherSensor(desc, coordinator) + + assert entity.native_value is None + + +def test_native_value_raw_none_with_dev_logging_hits_debug_branch(monkeypatch): + # This targets the `raw is None` branch (not empty string) and ensures the debug line + # is actually executed (coverage sometimes won't attribute it when data is missing). + called = {"debug": 0} + + def _debug(*_a, **_k): + called["debug"] += 1 + + monkeypatch.setattr("custom_components.sws12500.sensor._LOGGER.debug", _debug) + + desc = _DescriptionStub(key="k", value_fn=lambda raw: raw) + config = SimpleNamespace(options={"dev_debug_checkbox": True}) + + # Ensure the key exists and explicitly maps to None so `data.get(key)` returns None + # in a deterministic way for coverage. + coordinator = _CoordinatorStub(data={"k": None}, config=config) + entity = WeatherSensor(desc, coordinator) + + assert entity.native_value is None + assert called["debug"] >= 1 + + +def test_native_value_missing_raw_logs_specific_message(monkeypatch): + """Target the exact debug log line for missing raw values. + + This is meant to hit the specific `_LOGGER.debug("native_value missing raw: ...")` + statement to help achieve full `sensor.py` coverage. + """ + debug = MagicMock() + monkeypatch.setattr("custom_components.sws12500.sensor._LOGGER.debug", debug) + + desc = _DescriptionStub(key="k", value_fn=lambda raw: raw) + config = SimpleNamespace(options={"dev_debug_checkbox": True}) + coordinator = _CoordinatorStub(data={"k": None}, config=config) + + entity = WeatherSensor(desc, coordinator) + assert entity.native_value is None + + debug.assert_any_call("native_value missing raw: key=%s raw=%s", "k", None) + + +def test_native_value_empty_string_raw_returns_none(): + desc = _DescriptionStub(key="k", value_fn=lambda raw: raw) + coordinator = _CoordinatorStub(data={"k": ""}) + entity = WeatherSensor(desc, coordinator) + + assert entity.native_value is None + + +def test_native_value_empty_string_raw_with_dev_logging_hits_debug_branch(monkeypatch): + monkeypatch.setattr( + "custom_components.sws12500.sensor._LOGGER.debug", lambda *a, **k: None + ) + + desc = _DescriptionStub(key="k", value_fn=lambda raw: raw) + config = SimpleNamespace(options={"dev_debug_checkbox": True}) + coordinator = _CoordinatorStub(data={"k": ""}, config=config) + entity = WeatherSensor(desc, coordinator) + + assert entity.native_value is None + + +def test_native_value_no_value_fn_returns_none(): + desc = _DescriptionStub(key="k", value_fn=None) + coordinator = _CoordinatorStub(data={"k": 10}) + entity = WeatherSensor(desc, coordinator) + + assert entity.native_value is None + + +def test_native_value_no_value_fn_with_dev_logging_hits_debug_branch(monkeypatch): + monkeypatch.setattr( + "custom_components.sws12500.sensor._LOGGER.debug", lambda *a, **k: None + ) + + desc = _DescriptionStub(key="k", value_fn=None) + config = SimpleNamespace(options={"dev_debug_checkbox": True}) + coordinator = _CoordinatorStub(data={"k": 10}, config=config) + entity = WeatherSensor(desc, coordinator) + + assert entity.native_value is None + + +def test_native_value_value_fn_success(): + desc = _DescriptionStub(key="k", value_fn=lambda raw: int(raw) + 1) + coordinator = _CoordinatorStub(data={"k": "41"}) + entity = WeatherSensor(desc, coordinator) + + assert entity.native_value == 42 + + +def test_native_value_value_fn_success_with_dev_logging_hits_debug_branch(monkeypatch): + monkeypatch.setattr( + "custom_components.sws12500.sensor._LOGGER.debug", lambda *a, **k: None + ) + + desc = _DescriptionStub(key="k", value_fn=lambda raw: int(raw) + 1) + config = SimpleNamespace(options={"dev_debug_checkbox": True}) + coordinator = _CoordinatorStub(data={"k": "41"}, config=config) + entity = WeatherSensor(desc, coordinator) + + assert entity.native_value == 42 + + +def test_native_value_value_fn_exception_returns_none(): + def boom(_raw: Any) -> Any: + raise ValueError("bad") + + desc = _DescriptionStub(key="k", value_fn=boom) + coordinator = _CoordinatorStub(data={"k": "x"}) + entity = WeatherSensor(desc, coordinator) + + assert entity.native_value is None + + +def test_suggested_entity_id_uses_sensor_domain_and_key(monkeypatch): + # `homeassistant.helpers.entity.generate_entity_id` requires either `current_ids` or `hass`. + # Our entity isn't attached to hass in this unit test, so patch it to a deterministic result. + monkeypatch.setattr( + "custom_components.sws12500.sensor.generate_entity_id", + lambda _fmt, key: f"sensor.{key}", + ) + + desc = _DescriptionStub(key="outside_temp", value_fn=lambda raw: raw) + coordinator = _CoordinatorStub(data={"outside_temp": 1}) + entity = WeatherSensor(desc, coordinator) + + suggested = entity.suggested_entity_id + assert suggested == "sensor.outside_temp" + + +def test_device_info_contains_expected_identifiers_and_domain(): + desc = _DescriptionStub(key="k", value_fn=lambda raw: raw) + coordinator = _CoordinatorStub(data={"k": 1}) + entity = WeatherSensor(desc, coordinator) + + info = entity.device_info + assert info is not None + # DeviceInfo is mapping-like; access defensively. + assert info.get("name") == "Weather Station SWS 12500" + assert info.get("manufacturer") == "Schizza" + assert info.get("model") == "Weather Station SWS 12500" + + identifiers = info.get("identifiers") + assert isinstance(identifiers, set) + assert (DOMAIN,) in identifiers + + +def test_dev_log_flag_reads_from_config_entry_options(): + # When coordinator has a config with options, WeatherSensor should read dev_debug_checkbox. + desc = _DescriptionStub(key="k", value_fn=lambda raw: raw) + config = SimpleNamespace(options={"dev_debug_checkbox": True}) + coordinator = _CoordinatorStub(data={"k": 1}, config=config) + entity = WeatherSensor(desc, coordinator) + + # We don't assert logs; we just ensure native_value still works with dev logging enabled. + assert entity.native_value == 1 diff --git a/tests/test_windy_func.py b/tests/test_windy_func.py new file mode 100644 index 0000000..460f2db --- /dev/null +++ b/tests/test_windy_func.py @@ -0,0 +1,6 @@ +# Test file for windy_func.py module + +def test_windy_func_functionality(): + # Add your test cases here + pass + diff --git a/tests/test_windy_push.py b/tests/test_windy_push.py new file mode 100644 index 0000000..d19a49e --- /dev/null +++ b/tests/test_windy_push.py @@ -0,0 +1,447 @@ +from __future__ import annotations + +from dataclasses import dataclass +from datetime import datetime, timedelta +from types import SimpleNamespace +from typing import Any +from unittest.mock import AsyncMock, MagicMock + +from aiohttp.client_exceptions import ClientError +import pytest + +from custom_components.sws12500.const import ( + PURGE_DATA, + WINDY_ENABLED, + WINDY_INVALID_KEY, + WINDY_LOGGER_ENABLED, + WINDY_NOT_INSERTED, + WINDY_STATION_ID, + WINDY_STATION_PW, + WINDY_SUCCESS, + WINDY_UNEXPECTED, + WINDY_URL, +) +from custom_components.sws12500.windy_func import ( + WindyApiKeyError, + WindyNotInserted, + WindyPush, + WindySuccess, +) + + +@dataclass(slots=True) +class _FakeResponse: + text_value: str + + async def text(self) -> str: + return self.text_value + + async def __aenter__(self) -> "_FakeResponse": + return self + + async def __aexit__(self, exc_type, exc, tb) -> None: + return None + + +class _FakeSession: + def __init__( + self, *, response: _FakeResponse | None = None, exc: Exception | None = None + ): + self._response = response + self._exc = exc + self.calls: list[dict[str, Any]] = [] + + def get( + self, + url: str, + *, + params: dict[str, Any] | None = None, + headers: dict[str, str] | None = None, + ): + self.calls.append( + {"url": url, "params": dict(params or {}), "headers": dict(headers or {})} + ) + if self._exc is not None: + raise self._exc + assert self._response is not None + return self._response + + +@pytest.fixture +def hass(): + # Use HA provided fixture if available; otherwise a minimal stub works because we patch session getter. + return SimpleNamespace() + + +def _make_entry(**options: Any): + defaults = { + WINDY_LOGGER_ENABLED: False, + WINDY_ENABLED: True, + WINDY_STATION_ID: "station", + WINDY_STATION_PW: "token", + } + defaults.update(options) + return SimpleNamespace(options=defaults) + + +def test_verify_windy_response_notice_raises_not_inserted(hass): + wp = WindyPush(hass, _make_entry()) + with pytest.raises(WindyNotInserted): + wp.verify_windy_response("NOTICE: something") + + +def test_verify_windy_response_success_raises_success(hass): + wp = WindyPush(hass, _make_entry()) + with pytest.raises(WindySuccess): + wp.verify_windy_response("SUCCESS") + + +@pytest.mark.parametrize("msg", ["Invalid API key", "Unauthorized"]) +def test_verify_windy_response_api_key_errors_raise(msg, hass): + wp = WindyPush(hass, _make_entry()) + with pytest.raises(WindyApiKeyError): + wp.verify_windy_response(msg) + + +def test_covert_wslink_to_pws_maps_keys(hass): + wp = WindyPush(hass, _make_entry()) + data = { + "t1ws": "1", + "t1wgust": "2", + "t1wdir": "3", + "t1hum": "4", + "t1dew": "5", + "t1tem": "6", + "rbar": "7", + "t1rainhr": "8", + "t1uvi": "9", + "t1solrad": "10", + "other": "keep", + } + out = wp._covert_wslink_to_pws(data) + assert out["wind"] == "1" + assert out["gust"] == "2" + assert out["winddir"] == "3" + assert out["humidity"] == "4" + assert out["dewpoint"] == "5" + assert out["temp"] == "6" + assert out["mbar"] == "7" + assert out["precip"] == "8" + assert out["uv"] == "9" + assert out["solarradiation"] == "10" + assert out["other"] == "keep" + for k in ( + "t1ws", + "t1wgust", + "t1wdir", + "t1hum", + "t1dew", + "t1tem", + "rbar", + "t1rainhr", + "t1uvi", + "t1solrad", + ): + assert k not in out + + +@pytest.mark.asyncio +async def test_push_data_to_windy_respects_initial_next_update(monkeypatch, hass): + entry = _make_entry() + wp = WindyPush(hass, entry) + + # Ensure "next_update > now" is true + wp.next_update = datetime.now() + timedelta(minutes=10) + + monkeypatch.setattr( + "custom_components.sws12500.windy_func.async_get_clientsession", + lambda _h: _FakeSession(response=_FakeResponse("SUCCESS")), + ) + ok = await wp.push_data_to_windy({"a": "b"}) + assert ok is False + + +@pytest.mark.asyncio +async def test_push_data_to_windy_purges_data_and_sets_auth(monkeypatch, hass): + entry = _make_entry(**{WINDY_LOGGER_ENABLED: True}) + wp = WindyPush(hass, entry) + + # Force it to send now + wp.next_update = datetime.now() - timedelta(seconds=1) + + session = _FakeSession(response=_FakeResponse("SUCCESS")) + monkeypatch.setattr( + "custom_components.sws12500.windy_func.async_get_clientsession", + lambda _h: session, + ) + + data = {k: "x" for k in PURGE_DATA} + data.update({"keep": "1"}) + ok = await wp.push_data_to_windy(data, wslink=False) + assert ok is True + + assert len(session.calls) == 1 + call = session.calls[0] + assert call["url"] == WINDY_URL + # Purged keys removed + for k in PURGE_DATA: + assert k not in call["params"] + # Added keys + assert call["params"]["id"] == entry.options[WINDY_STATION_ID] + assert call["params"]["time"] == "now" + assert ( + call["headers"]["Authorization"] == f"Bearer {entry.options[WINDY_STATION_PW]}" + ) + + +@pytest.mark.asyncio +async def test_push_data_to_windy_wslink_conversion_applied(monkeypatch, hass): + entry = _make_entry() + wp = WindyPush(hass, entry) + wp.next_update = datetime.now() - timedelta(seconds=1) + + session = _FakeSession(response=_FakeResponse("SUCCESS")) + monkeypatch.setattr( + "custom_components.sws12500.windy_func.async_get_clientsession", + lambda _h: session, + ) + + ok = await wp.push_data_to_windy({"t1ws": "1", "t1tem": "2"}, wslink=True) + assert ok is True + params = session.calls[0]["params"] + assert "wind" in params and params["wind"] == "1" + assert "temp" in params and params["temp"] == "2" + assert "t1ws" not in params and "t1tem" not in params + + +@pytest.mark.asyncio +async def test_push_data_to_windy_missing_station_id_returns_false(monkeypatch, hass): + entry = _make_entry() + entry.options.pop(WINDY_STATION_ID) + wp = WindyPush(hass, entry) + wp.next_update = datetime.now() - timedelta(seconds=1) + + session = _FakeSession(response=_FakeResponse("SUCCESS")) + monkeypatch.setattr( + "custom_components.sws12500.windy_func.async_get_clientsession", + lambda _h: session, + ) + + ok = await wp.push_data_to_windy({"a": "b"}) + assert ok is False + assert session.calls == [] + + +@pytest.mark.asyncio +async def test_push_data_to_windy_missing_station_pw_returns_false(monkeypatch, hass): + entry = _make_entry() + entry.options.pop(WINDY_STATION_PW) + wp = WindyPush(hass, entry) + wp.next_update = datetime.now() - timedelta(seconds=1) + + session = _FakeSession(response=_FakeResponse("SUCCESS")) + monkeypatch.setattr( + "custom_components.sws12500.windy_func.async_get_clientsession", + lambda _h: session, + ) + + ok = await wp.push_data_to_windy({"a": "b"}) + assert ok is False + assert session.calls == [] + + +@pytest.mark.asyncio +async def test_push_data_to_windy_invalid_api_key_disables_windy(monkeypatch, hass): + entry = _make_entry() + wp = WindyPush(hass, entry) + wp.next_update = datetime.now() - timedelta(seconds=1) + + # Response triggers WindyApiKeyError + session = _FakeSession(response=_FakeResponse("Invalid API key")) + monkeypatch.setattr( + "custom_components.sws12500.windy_func.async_get_clientsession", + lambda _h: session, + ) + + update_options = AsyncMock(return_value=True) + monkeypatch.setattr( + "custom_components.sws12500.windy_func.update_options", update_options + ) + + ok = await wp.push_data_to_windy({"a": "b"}) + assert ok is True + update_options.assert_awaited_once_with(hass, entry, WINDY_ENABLED, False) + + +@pytest.mark.asyncio +async def test_push_data_to_windy_invalid_api_key_update_options_failure_logs_debug( + monkeypatch, hass +): + entry = _make_entry() + wp = WindyPush(hass, entry) + wp.next_update = datetime.now() - timedelta(seconds=1) + + session = _FakeSession(response=_FakeResponse("Unauthorized")) + monkeypatch.setattr( + "custom_components.sws12500.windy_func.async_get_clientsession", + lambda _h: session, + ) + + update_options = AsyncMock(return_value=False) + monkeypatch.setattr( + "custom_components.sws12500.windy_func.update_options", update_options + ) + + dbg = MagicMock() + monkeypatch.setattr("custom_components.sws12500.windy_func._LOGGER.debug", dbg) + + ok = await wp.push_data_to_windy({"a": "b"}) + assert ok is True + update_options.assert_awaited_once_with(hass, entry, WINDY_ENABLED, False) + dbg.assert_called() + + +@pytest.mark.asyncio +async def test_push_data_to_windy_notice_logs_not_inserted(monkeypatch, hass): + entry = _make_entry(**{WINDY_LOGGER_ENABLED: True}) + wp = WindyPush(hass, entry) + wp.next_update = datetime.now() - timedelta(seconds=1) + + session = _FakeSession(response=_FakeResponse("NOTICE: no insert")) + monkeypatch.setattr( + "custom_components.sws12500.windy_func.async_get_clientsession", + lambda _h: session, + ) + + err = MagicMock() + monkeypatch.setattr("custom_components.sws12500.windy_func._LOGGER.error", err) + + ok = await wp.push_data_to_windy({"a": "b"}) + assert ok is True + # It logs WINDY_NOT_INSERTED regardless of log setting + err.assert_called() + + +@pytest.mark.asyncio +async def test_push_data_to_windy_success_logs_info_when_logger_enabled( + monkeypatch, hass +): + entry = _make_entry(**{WINDY_LOGGER_ENABLED: True}) + wp = WindyPush(hass, entry) + wp.next_update = datetime.now() - timedelta(seconds=1) + + session = _FakeSession(response=_FakeResponse("SUCCESS")) + monkeypatch.setattr( + "custom_components.sws12500.windy_func.async_get_clientsession", + lambda _h: session, + ) + + info = MagicMock() + monkeypatch.setattr("custom_components.sws12500.windy_func._LOGGER.info", info) + + ok = await wp.push_data_to_windy({"a": "b"}) + assert ok is True + # It should log WINDY_SUCCESS (or at least call info) when logging is enabled + info.assert_called() + + +@pytest.mark.asyncio +async def test_push_data_to_windy_verify_no_raise_logs_debug_not_inserted_when_logger_enabled( + monkeypatch, hass +): + """Cover the `else:` branch when `verify_windy_response` does not raise. + + This is a defensive branch in `push_data_to_windy`: + try: verify(...) + except ...: + else: + if self.log: + _LOGGER.debug(WINDY_NOT_INSERTED) + """ + entry = _make_entry(**{WINDY_LOGGER_ENABLED: True}) + wp = WindyPush(hass, entry) + wp.next_update = datetime.now() - timedelta(seconds=1) + + # Response text that does not contain any of the known markers (NOTICE/SUCCESS/Invalid/Unauthorized) + session = _FakeSession(response=_FakeResponse("OK")) + monkeypatch.setattr( + "custom_components.sws12500.windy_func.async_get_clientsession", + lambda _h: session, + ) + + debug = MagicMock() + monkeypatch.setattr("custom_components.sws12500.windy_func._LOGGER.debug", debug) + + ok = await wp.push_data_to_windy({"a": "b"}) + assert ok is True + debug.assert_called() + + +@pytest.mark.asyncio +async def test_push_data_to_windy_client_error_increments_and_disables_after_three( + monkeypatch, hass +): + entry = _make_entry() + wp = WindyPush(hass, entry) + wp.next_update = datetime.now() - timedelta(seconds=1) + + update_options = AsyncMock(return_value=True) + monkeypatch.setattr( + "custom_components.sws12500.windy_func.update_options", update_options + ) + + crit = MagicMock() + monkeypatch.setattr("custom_components.sws12500.windy_func._LOGGER.critical", crit) + + # Cause ClientError on session.get + session = _FakeSession(exc=ClientError("boom")) + monkeypatch.setattr( + "custom_components.sws12500.windy_func.async_get_clientsession", + lambda _h: session, + ) + + # First 3 calls should not disable; 4th should + for i in range(4): + wp.next_update = datetime.now() - timedelta(seconds=1) + ok = await wp.push_data_to_windy({"a": "b"}) + assert ok is True + + assert wp.invalid_response_count == 4 + # update_options awaited once when count > 3 + update_options.assert_awaited() + args = update_options.await_args.args + assert args[2] == WINDY_ENABLED + assert args[3] is False + # It should log WINDY_UNEXPECTED at least once + assert any( + WINDY_UNEXPECTED in str(c.args[0]) for c in crit.call_args_list if c.args + ) + + +@pytest.mark.asyncio +async def test_push_data_to_windy_client_error_disable_failure_logs_debug( + monkeypatch, hass +): + entry = _make_entry() + wp = WindyPush(hass, entry) + wp.invalid_response_count = 3 # next error will push it over the threshold + wp.next_update = datetime.now() - timedelta(seconds=1) + + update_options = AsyncMock(return_value=False) + monkeypatch.setattr( + "custom_components.sws12500.windy_func.update_options", update_options + ) + + dbg = MagicMock() + monkeypatch.setattr("custom_components.sws12500.windy_func._LOGGER.debug", dbg) + + session = _FakeSession(exc=ClientError("boom")) + monkeypatch.setattr( + "custom_components.sws12500.windy_func.async_get_clientsession", + lambda _h: session, + ) + + ok = await wp.push_data_to_windy({"a": "b"}) + assert ok is True + update_options.assert_awaited_once_with(hass, entry, WINDY_ENABLED, False) + dbg.assert_called()