CEZ_rele_box/custom_components/xt211_han/config_flow.py

283 lines
9.6 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

"""Config flow for XT211 HAN integration."""
from __future__ import annotations
import asyncio
import logging
import socket
from ipaddress import IPv4Network
from typing import Any
import voluptuous as vol
from homeassistant import config_entries
from homeassistant.components import dhcp
from homeassistant.const import CONF_HOST, CONF_NAME, CONF_PORT
from homeassistant.data_entry_flow import FlowResult
from .const import (
CONF_HAS_FVE,
CONF_PHASES,
CONF_RELAY_COUNT,
CONF_TARIFFS,
DEFAULT_NAME,
DEFAULT_PORT,
DOMAIN,
PHASES_1,
PHASES_3,
RELAYS_0,
RELAYS_4,
RELAYS_6,
TARIFFS_1,
TARIFFS_2,
TARIFFS_4,
)
_LOGGER = logging.getLogger(__name__)
USR_IOT_MAC_PREFIXES = ("d8b04c", "b4e62d")
MANUAL_CHOICE = "__manual__"
STEP_CONNECTION_SCHEMA = vol.Schema(
{
vol.Required(CONF_HOST): str,
vol.Required(CONF_PORT, default=DEFAULT_PORT): int,
vol.Optional(CONF_NAME, default=DEFAULT_NAME): str,
}
)
STEP_METER_SCHEMA = vol.Schema(
{
vol.Required(CONF_PHASES, default=PHASES_3): vol.In(
{PHASES_1: "Jednofázový", PHASES_3: "Třífázový"}
),
vol.Required(CONF_HAS_FVE, default=False): bool,
vol.Required(CONF_TARIFFS, default=TARIFFS_2): vol.In(
{
TARIFFS_1: "Jeden tarif (pouze T1)",
TARIFFS_2: "Dva tarify (T1 + T2)",
TARIFFS_4: "Čtyři tarify (T1 T4)",
}
),
vol.Required(CONF_RELAY_COUNT, default=RELAYS_4): vol.In(
{
RELAYS_0: "Žádné relé",
RELAYS_4: "WM-RelayBox (R1 R4)",
RELAYS_6: "WM-RelayBox rozšířený (R1 R6)",
}
),
}
)
async def _test_connection(host: str, port: int, timeout: float = 5.0) -> str | None:
try:
reader, writer = await asyncio.wait_for(asyncio.open_connection(host, port), timeout=timeout)
writer.close()
await writer.wait_closed()
return None
except asyncio.TimeoutError:
return "cannot_connect"
except OSError:
return "cannot_connect"
except Exception: # pragma: no cover - defensive
return "unknown"
async def _scan_network(port: int, timeout: float = 1.0) -> list[str]:
local_ip = "192.168.1.1"
try:
with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as sock:
sock.settimeout(0)
sock.connect(("8.8.8.8", 80))
local_ip = sock.getsockname()[0]
except Exception:
try:
local_ip = socket.gethostbyname(socket.gethostname())
except Exception:
pass
if local_ip.startswith("127.") or local_ip == "0.0.0.0":
local_ip = "192.168.1.1"
try:
network = IPv4Network(f"{local_ip}/24", strict=False)
except ValueError:
network = IPv4Network("192.168.1.0/24", strict=False)
found: list[str] = []
async def _probe(ip: str) -> None:
try:
_, writer = await asyncio.wait_for(asyncio.open_connection(ip, port), timeout=timeout)
writer.close()
try:
await writer.wait_closed()
except Exception:
pass
found.append(ip)
except Exception:
pass
hosts = [str(host) for host in network.hosts()]
for index in range(0, len(hosts), 50):
await asyncio.gather(*(_probe(ip) for ip in hosts[index:index + 50]))
found.sort()
_LOGGER.debug("XT211 scan found %d host(s) on port %d: %s", len(found), port, found)
return found
class XT211HANConfigFlow(config_entries.ConfigFlow, domain=DOMAIN):
VERSION = 1
def __init__(self) -> None:
self._connection_data: dict[str, Any] = {}
self._discovered_host: str | None = None
self._discovered_port: int = DEFAULT_PORT
self._scan_results: list[str] = []
async def async_step_dhcp(self, discovery_info: dhcp.DhcpServiceInfo) -> FlowResult:
mac = discovery_info.macaddress.replace(":", "").lower()
if not any(mac.startswith(prefix) for prefix in USR_IOT_MAC_PREFIXES):
return self.async_abort(reason="not_supported")
ip = discovery_info.ip
await self.async_set_unique_id(f"{ip}:{DEFAULT_PORT}")
self._abort_if_unique_id_configured(updates={CONF_HOST: ip})
self._discovered_host = ip
self._discovered_port = DEFAULT_PORT
_LOGGER.info("XT211 HAN: DHCP discovered USR IOT device at %s (MAC %s)", ip, mac)
return await self.async_step_dhcp_confirm()
async def async_step_dhcp_confirm(self, user_input: dict[str, Any] | None = None) -> FlowResult:
if user_input is not None:
error = await _test_connection(self._discovered_host, self._discovered_port)
if error:
return self.async_show_form(
step_id="dhcp_confirm",
errors={"base": error},
description_placeholders={
"host": self._discovered_host,
"port": str(self._discovered_port),
},
)
self._connection_data = {
CONF_HOST: self._discovered_host,
CONF_PORT: self._discovered_port,
CONF_NAME: DEFAULT_NAME,
}
return await self.async_step_meter()
return self.async_show_form(
step_id="dhcp_confirm",
description_placeholders={
"host": self._discovered_host,
"port": str(self._discovered_port),
},
)
async def async_step_user(self, user_input: dict[str, Any] | None = None) -> FlowResult:
if user_input is not None:
return await (self.async_step_scan() if user_input.get("method") == "scan" else self.async_step_manual())
return self.async_show_form(
step_id="user",
data_schema=vol.Schema(
{
vol.Required("method", default="scan"): vol.In(
{
"scan": "🔍 Automaticky vyhledat v síti",
"manual": "✏️ Zadat IP adresu ručně",
}
)
}
),
)
async def async_step_scan(self, user_input: dict[str, Any] | None = None) -> FlowResult:
if user_input is not None:
host = user_input[CONF_HOST]
if host == MANUAL_CHOICE:
return await self.async_step_manual()
port = user_input.get(CONF_PORT, DEFAULT_PORT)
name = user_input.get(CONF_NAME, DEFAULT_NAME)
await self.async_set_unique_id(f"{host}:{port}")
self._abort_if_unique_id_configured()
error = await _test_connection(host, port)
if error:
return self.async_show_form(
step_id="scan",
data_schema=self._scan_schema(port, include_choices=not self._scan_results == []),
errors={"base": error},
)
self._connection_data = {CONF_HOST: host, CONF_PORT: port, CONF_NAME: name}
return await self.async_step_meter()
self._scan_results = await _scan_network(DEFAULT_PORT)
if not self._scan_results:
return self.async_show_form(
step_id="scan",
data_schema=self._scan_schema(DEFAULT_PORT, include_choices=False),
errors={"base": "no_devices_found"},
)
return self.async_show_form(
step_id="scan",
data_schema=self._scan_schema(DEFAULT_PORT, include_choices=True),
)
def _scan_schema(self, port: int, include_choices: bool) -> vol.Schema:
if include_choices:
choices = {ip: f"{ip}:{port}" for ip in self._scan_results}
choices[MANUAL_CHOICE] = "✏️ Zadat IP adresu ručně"
return vol.Schema(
{
vol.Required(CONF_HOST): vol.In(choices),
vol.Optional(CONF_PORT, default=port): int,
vol.Optional(CONF_NAME, default=DEFAULT_NAME): str,
}
)
return vol.Schema(
{
vol.Required(CONF_HOST): str,
vol.Required(CONF_PORT, default=port): int,
vol.Optional(CONF_NAME, default=DEFAULT_NAME): str,
}
)
async def async_step_manual(self, user_input: dict[str, Any] | None = None) -> FlowResult:
errors: dict[str, str] = {}
if user_input is not None:
host = user_input[CONF_HOST]
port = user_input[CONF_PORT]
name = user_input.get(CONF_NAME, DEFAULT_NAME)
await self.async_set_unique_id(f"{host}:{port}")
self._abort_if_unique_id_configured()
error = await _test_connection(host, port)
if error:
errors["base"] = error
else:
self._connection_data = {CONF_HOST: host, CONF_PORT: port, CONF_NAME: name}
return await self.async_step_meter()
return self.async_show_form(step_id="manual", data_schema=STEP_CONNECTION_SCHEMA, errors=errors)
async def async_step_meter(self, user_input: dict[str, Any] | None = None) -> FlowResult:
if user_input is not None:
data = {**self._connection_data, **user_input}
name = data.get(CONF_NAME, DEFAULT_NAME)
host = data[CONF_HOST]
port = data[CONF_PORT]
return self.async_create_entry(title=f"{name} ({host}:{port})", data=data)
return self.async_show_form(step_id="meter", data_schema=STEP_METER_SCHEMA)