Adds unit migration functionality to options flow

Implements a user interface to migrate units for rain sensors including migration of historic data via statistics.
This provides the user with the ability to correct rain units, if they have been set incorrectly.
Includes UI to select sensor and units, as well as trigger migration.
pull/65/head
schizza 2025-04-24 17:56:47 +02:00
parent af87fd0719
commit 99d25bfd56
1 changed files with 162 additions and 26 deletions

View File

@ -1,13 +1,15 @@
"""Config flow for Sencor SWS 12500 Weather Station integration.""" """Config flow for Sencor SWS 12500 Weather Station integration."""
import logging
from typing import Any from typing import Any
import voluptuous as vol import voluptuous as vol
from homeassistant.config_entries import ConfigFlow, OptionsFlow from homeassistant.config_entries import ConfigFlow, OptionsFlow
from homeassistant.const import UnitOfPrecipitationDepth, UnitOfVolumetricFlux
from homeassistant.core import callback from homeassistant.core import callback
from homeassistant.exceptions import HomeAssistantError from homeassistant.exceptions import HomeAssistantError
from .utils import long_term_units_in_statistics_meta, migrate_data import homeassistant.helpers.entity_registry as er
from .const import ( from .const import (
API_ID, API_ID,
@ -15,13 +17,18 @@ from .const import (
DEV_DBG, DEV_DBG,
DOMAIN, DOMAIN,
INVALID_CREDENTIALS, INVALID_CREDENTIALS,
SENSORS_TO_LOAD, MIG_FROM,
MIG_TO,
SENSOR_TO_MIGRATE, SENSOR_TO_MIGRATE,
SENSORS_TO_LOAD,
WINDY_API_KEY, WINDY_API_KEY,
WINDY_ENABLED, WINDY_ENABLED,
WINDY_LOGGER_ENABLED, WINDY_LOGGER_ENABLED,
WSLINK, WSLINK,
) )
from .utils import long_term_units_in_statistics_meta, migrate_data
_LOGGER = logging.getLogger(__name__)
class CannotConnect(HomeAssistantError): class CannotConnect(HomeAssistantError):
@ -41,16 +48,23 @@ class ConfigOptionsFlowHandler(OptionsFlow):
self.windy_data: dict[str, Any] = {} self.windy_data: dict[str, Any] = {}
self.windy_data_schema = {} self.windy_data_schema = {}
self.user_data: dict[str, str] = {} self.user_data: dict[str, Any] = {}
self.user_data_schema = {} self.user_data_schema = {}
self.sensors: dict[str, Any] = {} self.sensors: dict[str, Any] = {}
self.migrate_schema = {} self.migrate_schema = {}
self.migrate_sensor_select = {}
self.migrate_unit_selection = {}
self.count = 0
self.selected_sensor = ""
self.unit_values = [unit.value for unit in UnitOfVolumetricFlux]
self.unit_values.extend([unit.value for unit in UnitOfPrecipitationDepth])
@property @property
def config_entry(self): def config_entry(self):
return self.hass.config_entries.async_get_entry(self.handler) return self.hass.config_entries.async_get_entry(self.handler)
def _get_entry_data(self): async def _get_entry_data(self):
"""Get entry data.""" """Get entry data."""
self.user_data: dict[str, Any] = { self.user_data: dict[str, Any] = {
@ -61,10 +75,10 @@ class ConfigOptionsFlowHandler(OptionsFlow):
} }
self.user_data_schema = { self.user_data_schema = {
vol.Required(API_ID, default=self.user_data[API_ID] or ""): str, vol.Required(API_ID, default=self.user_data.get(API_ID, "")): str,
vol.Required(API_KEY, default=self.user_data[API_KEY] or ""): str, vol.Required(API_KEY, default=self.user_data.get(API_KEY, "")): str,
vol.Optional(WSLINK, default=self.user_data[WSLINK]): bool or False, vol.Optional(WSLINK, default=self.user_data.get(WSLINK, False)): bool,
vol.Optional(DEV_DBG, default=self.user_data[DEV_DBG]): bool or False, vol.Optional(DEV_DBG, default=self.user_data.get(DEV_DBG, False)): bool,
} }
self.sensors: dict[str, Any] = { self.sensors: dict[str, Any] = {
@ -83,7 +97,7 @@ class ConfigOptionsFlowHandler(OptionsFlow):
self.windy_data_schema = { self.windy_data_schema = {
vol.Optional( vol.Optional(
WINDY_API_KEY, default=self.windy_data[WINDY_API_KEY] or "" WINDY_API_KEY, default=self.windy_data.get(WINDY_API_KEY, "")
): str, ): str,
vol.Optional(WINDY_ENABLED, default=self.windy_data[WINDY_ENABLED]): bool vol.Optional(WINDY_ENABLED, default=self.windy_data[WINDY_ENABLED]): bool
or False, or False,
@ -93,12 +107,34 @@ class ConfigOptionsFlowHandler(OptionsFlow):
): bool or False, ): bool or False,
} }
self.migrate_schema = { self.migrate_sensor_select = {
vol.Required(SENSOR_TO_MIGRATE): vol.In( vol.Required(SENSOR_TO_MIGRATE): vol.In(
long_term_units_in_statistics_meta() or {} await self.load_sensors_to_migrate() or {}
), ),
}
self.migrate_unit_selection = {
vol.Required(MIG_FROM): vol.In(self.unit_values),
vol.Required(MIG_TO): vol.In(self.unit_values),
vol.Optional("trigger_action", default=False): bool, vol.Optional("trigger_action", default=False): bool,
} }
# "mm/d", "mm/h", "mm", "in/d", "in/h", "in"
async def load_sensors_to_migrate(self) -> dict[str, Any]:
"""Load sensors to migrate."""
sensor_statistics = await long_term_units_in_statistics_meta(self.hass)
entity_registry = er.async_get(self.hass)
sensors = entity_registry.entities.get_entries_for_config_entry_id(
self.config_entry.entry_id
)
return {
sensor.entity_id: f"{sensor.name or sensor.original_name} (current settings: {sensor.unit_of_measurement}, longterm stats unit: {sensor_statistics.get(sensor.entity_id)})"
for sensor in sensors
if sensor.unique_id in {"rain", "daily_rain"}
}
async def async_step_init(self, user_input=None): async def async_step_init(self, user_input=None):
"""Manage the options - show menu first.""" """Manage the options - show menu first."""
@ -110,7 +146,7 @@ class ConfigOptionsFlowHandler(OptionsFlow):
"""Manage basic options - credentials.""" """Manage basic options - credentials."""
errors = {} errors = {}
self._get_entry_data() await self._get_entry_data()
if user_input is None: if user_input is None:
return self.async_show_form( return self.async_show_form(
@ -147,7 +183,7 @@ class ConfigOptionsFlowHandler(OptionsFlow):
"""Manage windy options.""" """Manage windy options."""
errors = {} errors = {}
self._get_entry_data() await self._get_entry_data()
if user_input is None: if user_input is None:
return self.async_show_form( return self.async_show_form(
@ -160,7 +196,7 @@ class ConfigOptionsFlowHandler(OptionsFlow):
errors[WINDY_API_KEY] = "windy_key_required" errors[WINDY_API_KEY] = "windy_key_required"
return self.async_show_form( return self.async_show_form(
step_id="windy", step_id="windy",
data_schema=self.windy_data_schema, data_schema=vol.Schema(self.windy_data_schema),
errors=errors, errors=errors,
) )
@ -175,15 +211,17 @@ class ConfigOptionsFlowHandler(OptionsFlow):
async def async_step_migration(self, user_input=None): async def async_step_migration(self, user_input=None):
"""Migrate sensors.""" """Migrate sensors."""
# hj
errors = {} errors = {}
self._get_entry_data() data_schema = vol.Schema(self.migrate_sensor_select)
data_schema.schema.update()
await self._get_entry_data()
if user_input is None: if user_input is None:
return self.async_show_form( return self.async_show_form(
step_id="migration", step_id="migration",
data_schema=vol.Schema(self.migrate_schema), data_schema=vol.Schema(self.migrate_sensor_select),
errors=errors, errors=errors,
description_placeholders={ description_placeholders={
"migration_status": "-", "migration_status": "-",
@ -191,18 +229,116 @@ class ConfigOptionsFlowHandler(OptionsFlow):
}, },
) )
if user_input.get("trigger_action"): self.selected_sensor = user_input.get(SENSOR_TO_MIGRATE)
# Akce se vykoná po zaškrtnutí
count = await self.hass.async_add_executor_job( return await self.async_step_migration_units()
migrate_data, user_input.get(SENSOR_TO_MIGRATE)
) async def async_step_migration_units(self, user_input=None):
"""Migrate unit step."""
registry = er.async_get(self.hass)
sensor_entry = registry.async_get(self.selected_sensor)
sensor_stats = await long_term_units_in_statistics_meta(self.hass)
default_unit = sensor_entry.unit_of_measurement if sensor_entry else None
if default_unit not in self.unit_values:
default_unit = self.unit_values[0]
data_schema = vol.Schema({
vol.Required(MIG_FROM, default=default_unit): vol.In(self.unit_values),
vol.Required(MIG_TO): vol.In(self.unit_values),
vol.Optional("trigger_action", default=False): bool,
})
if user_input is None:
return self.async_show_form( return self.async_show_form(
step_id="migration", step_id="migration_units",
data_schema=vol.Schema(self.migrate_schema), data_schema=data_schema,
errors={},
description_placeholders={
"migration_sensor": sensor_entry.original_name,
"migration_stats": sensor_stats.get(self.selected_sensor),
},
)
if user_input.get("trigger_action"):
self.count = await migrate_data(
self.hass,
self.selected_sensor,
user_input.get(MIG_FROM),
user_input.get(MIG_TO),
)
registry.async_update_entity(self.selected_sensor,
unit_of_measurement=user_input.get(MIG_TO),
)
state = self.hass.states.get(self.selected_sensor)
if state:
_LOGGER.info("State attributes before update: %s", state.attributes)
attributes = dict(state.attributes)
attributes["unit_of_measurement"] = user_input.get(MIG_TO)
self.hass.states.async_set(self.selected_sensor, state.state, attributes)
_LOGGER.info("State attributes after update: %s", attributes)
options = {**self.config_entry.options, "reload_sensor": self.selected_sensor}
self.hass.config_entries.async_update_entry(self.config_entry, options=options)
await self.hass.config_entries.async_reload(self.config_entry.entry_id)
await self.hass.async_block_till_done()
_LOGGER.info("Migration complete for sensor %s: %s row updated, new measurement unit: %s, ",
self.selected_sensor,
self.count,
user_input.get(MIG_TO),
)
await self._get_entry_data()
sensor_entry = er.async_get(self.hass).async_get(self.selected_sensor)
sensor_stat = await self.load_sensors_to_migrate()
return self.async_show_form(
step_id="migration_complete",
data_schema=vol.Schema({}),
errors={},
description_placeholders={
"migration_sensor": sensor_entry.unit_of_measurement,
"migration_stats": sensor_stat.get(self.selected_sensor),
"migration_count": self.count,
},
)
# retain windy data
user_input.update(self.windy_data)
# retain user_data
user_input.update(self.user_data)
# retain senors
user_input.update(self.sensors)
return self.async_create_entry(title=DOMAIN, data=user_input)
async def async_step_migration_complete(self, user_input=None):
"""Migration complete."""
errors = {}
await self._get_entry_data()
sensor_entry = er.async_get(self.hass).async_get(self.selected_sensor)
sensor_stat = await self.load_sensors_to_migrate()
if user_input is None:
return self.async_show_form(
step_id="migration_complete",
data_schema=vol.Schema({}),
errors=errors, errors=errors,
description_placeholders={ description_placeholders={
"migration_status": user_input.get(SENSOR_TO_MIGRATE), "migration_sensor": sensor_entry.unit_of_measurement,
"migration_count": count, "migration_stats": sensor_stat.get(self.selected_sensor),
"migration_count": self.count,
}, },
) )