Validate hass data with py_typecheck.checked

Replace manual isinstance checks and casts with py_typecheck.checked()
to validate hass and entry data and return early on errors. Simplify
add_new_sensors by unwrapping values, renaming vars, and passing the
coordinator to WeatherSensor
ecowitt_support
SchiZzA 2026-03-02 22:08:01 +01:00
parent b3aae77132
commit 6a4eed2ff9
No known key found for this signature in database
1 changed files with 27 additions and 28 deletions

View File

@ -20,7 +20,7 @@ from functools import cached_property
import logging
from typing import Any, cast
from py_typecheck import checked_or
from py_typecheck import checked, checked_or
from homeassistant.components.sensor import SensorEntity
from homeassistant.config_entries import ConfigEntry
@ -92,15 +92,17 @@ async def async_setup_entry(
so the webhook handler can add newly discovered entities dynamically without
reloading the config entry.
"""
hass_data_any = hass.data.setdefault(DOMAIN, {})
hass_data = cast("dict[str, Any]", hass_data_any)
entry_data_any = hass_data.get(config_entry.entry_id)
if not isinstance(entry_data_any, dict):
# Created by the integration setup, but keep this defensive for safety.
entry_data_any = {}
hass_data[config_entry.entry_id] = entry_data_any
entry_data = cast("dict[str, Any]", entry_data_any)
if (hass_data := checked(hass.data.setdefault(DOMAIN, {}), dict[str, Any])) is None:
return
# we have to check if entry_data are present
# It is created by integration setup, so it should be presnet
if (
entry_data := checked(hass_data.get(config_entry.entry_id), dict[str, Any])
) is None:
# This should not happen in normal operation.
return
coordinator = entry_data.get(ENTRY_COORDINATOR)
if coordinator is None:
@ -151,34 +153,31 @@ def add_new_sensors(
finished setting up yet (e.g. callback/description map missing).
- Unknown payload keys are ignored (only keys with an entity description are added).
"""
hass_data_any = hass.data.get(DOMAIN)
if not isinstance(hass_data_any, dict):
return
hass_data = cast("dict[str, Any]", hass_data_any)
entry_data_any = hass_data.get(config_entry.entry_id)
if not isinstance(entry_data_any, dict):
return
entry_data = cast("dict[str, Any]", entry_data_any)
add_entities_any = entry_data.get(ENTRY_ADD_ENTITIES)
descriptions_any = entry_data.get(ENTRY_DESCRIPTIONS)
coordinator_any = entry_data.get(ENTRY_COORDINATOR)
if add_entities_any is None or descriptions_any is None or coordinator_any is None:
if (hass_data := checked(hass.data.get(DOMAIN), dict[str, Any])) is None:
return
add_entities_fn = cast("_AddEntitiesFn", add_entities_any)
descriptions_map = cast(
"dict[str, WeatherSensorEntityDescription]", descriptions_any
)
if (
entry_data := checked(hass_data.get(config_entry.entry_id), dict[str, Any])
) is None:
return
add_entities = entry_data.get(ENTRY_ADD_ENTITIES)
descriptions = entry_data.get(ENTRY_DESCRIPTIONS)
coordinator = entry_data.get(ENTRY_COORDINATOR)
if add_entities is None or descriptions is None or coordinator is None:
return
add_entities_fn = cast("_AddEntitiesFn", add_entities)
descriptions_map = cast("dict[str, WeatherSensorEntityDescription]", descriptions)
new_entities: list[SensorEntity] = []
for key in keys:
desc = descriptions_map.get(key)
if desc is None:
continue
new_entities.append(WeatherSensor(desc, coordinator_any))
new_entities.append(WeatherSensor(desc, coordinator))
if new_entities:
add_entities_fn(new_entities)