diff --git a/custom_components/xt211_han/__init__.py b/custom_components/xt211_han/__init__.py index 5874f46..f9747fb 100644 --- a/custom_components/xt211_han/__init__.py +++ b/custom_components/xt211_han/__init__.py @@ -1,28 +1,22 @@ -"""XT211 HAN integration for Home Assistant. - -Reads DLMS/COSEM PUSH data from a Sagemcom XT211 smart meter via a -RS485-to-Ethernet adapter (e.g. PUSR USR-DR134) over TCP. -No ESP32 or dedicated hardware needed beyond the adapter. -""" +"""XT211 HAN integration for Home Assistant.""" from __future__ import annotations import logging from homeassistant.config_entries import ConfigEntry -from homeassistant.const import CONF_HOST, CONF_PORT, CONF_NAME, Platform +from homeassistant.const import CONF_HOST, CONF_NAME, CONF_PORT, Platform from homeassistant.core import HomeAssistant -from .const import DOMAIN, DEFAULT_NAME +from .const import DEFAULT_NAME, DOMAIN from .coordinator import XT211Coordinator _LOGGER = logging.getLogger(__name__) -PLATFORMS = [Platform.SENSOR] +PLATFORMS = [Platform.SENSOR, Platform.BINARY_SENSOR] async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool: - """Set up XT211 HAN from a config entry.""" hass.data.setdefault(DOMAIN, {}) coordinator = XT211Coordinator( @@ -31,13 +25,9 @@ async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool: port=entry.data[CONF_PORT], name=entry.data.get(CONF_NAME, DEFAULT_NAME), ) - hass.data[DOMAIN][entry.entry_id] = coordinator - # Start the background TCP listener await coordinator.async_setup() - - # Set up sensor platform await hass.config_entries.async_forward_entry_setups(entry, PLATFORMS) _LOGGER.info( @@ -49,14 +39,11 @@ async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool: async def async_unload_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool: - """Unload a config entry.""" - coordinator: XT211Coordinator = hass.data[DOMAIN].get(entry.entry_id) + coordinator: XT211Coordinator | None = hass.data[DOMAIN].get(entry.entry_id) if coordinator: await coordinator.async_shutdown() unload_ok = await hass.config_entries.async_unload_platforms(entry, PLATFORMS) - if unload_ok: hass.data[DOMAIN].pop(entry.entry_id, None) - return unload_ok diff --git a/custom_components/xt211_han/binary_sensor.py b/custom_components/xt211_han/binary_sensor.py new file mode 100644 index 0000000..10ad423 --- /dev/null +++ b/custom_components/xt211_han/binary_sensor.py @@ -0,0 +1,81 @@ +"""Binary sensor platform for XT211 HAN integration.""" + +from __future__ import annotations + +from homeassistant.components.binary_sensor import BinarySensorDeviceClass, BinarySensorEntity +from homeassistant.config_entries import ConfigEntry +from homeassistant.core import HomeAssistant, callback +from homeassistant.helpers.device_registry import DeviceInfo +from homeassistant.helpers.entity_platform import AddEntitiesCallback +from homeassistant.helpers.update_coordinator import CoordinatorEntity + +from .const import DOMAIN +from .coordinator import XT211Coordinator +from .sensor import BINARY_OBIS, build_enabled_obis, _device_info +from .dlms_parser import OBIS_DESCRIPTIONS + + +async def async_setup_entry( + hass: HomeAssistant, + entry: ConfigEntry, + async_add_entities: AddEntitiesCallback, +) -> None: + coordinator: XT211Coordinator = hass.data[DOMAIN][entry.entry_id] + enabled_obis = build_enabled_obis(entry) + + entities = [ + XT211BinarySensorEntity(coordinator, entry, obis, meta) + for obis, meta in OBIS_DESCRIPTIONS.items() + if obis in enabled_obis and obis in BINARY_OBIS + ] + async_add_entities(entities) + + registered_obis = {entity._obis for entity in entities} + + @callback + def _on_update() -> None: + if not coordinator.data: + return + new_entities = [] + for obis, data in coordinator.data.items(): + if obis in registered_obis or obis not in enabled_obis or obis not in BINARY_OBIS: + continue + registered_obis.add(obis) + new_entities.append(XT211BinarySensorEntity(coordinator, entry, obis, data)) + if new_entities: + async_add_entities(new_entities) + + coordinator.async_add_listener(_on_update) + + +class XT211BinarySensorEntity(CoordinatorEntity[XT211Coordinator], BinarySensorEntity): + _attr_has_entity_name = True + _attr_device_class = BinarySensorDeviceClass.POWER + + def __init__(self, coordinator: XT211Coordinator, entry: ConfigEntry, obis: str, meta: dict) -> None: + super().__init__(coordinator) + self._entry = entry + self._obis = obis + self._attr_unique_id = f"{entry.entry_id}_{obis}" + self._attr_name = meta.get("name", obis) + + @property + def device_info(self) -> DeviceInfo: + return _device_info(self._entry) + + @property + def is_on(self) -> bool | None: + obj = (self.coordinator.data or {}).get(self._obis) + if obj is None: + return None + value = obj.get("value") + if isinstance(value, bool): + return value + try: + return int(value) != 0 + except (TypeError, ValueError): + return None + + @property + def available(self) -> bool: + return self.coordinator.data is not None diff --git a/custom_components/xt211_han/config_flow.py b/custom_components/xt211_han/config_flow.py index 22f557c..6c22ca2 100644 --- a/custom_components/xt211_han/config_flow.py +++ b/custom_components/xt211_han/config_flow.py @@ -1,50 +1,42 @@ -"""Config flow for XT211 HAN integration. - -Discovery order: - 1. DHCP discovery – automatic, triggered by HA when USR-DR134 appears on network - 2. Network scan – user clicks "Search network" in the UI - 3. Manual entry – user types IP + port manually (always available as fallback) -""" +"""Config flow for XT211 HAN integration.""" from __future__ import annotations import asyncio import logging import socket -import struct -from ipaddress import IPv4Network, IPv4Address +from ipaddress import IPv4Network from typing import Any import voluptuous as vol from homeassistant import config_entries from homeassistant.components import dhcp -from homeassistant.const import CONF_HOST, CONF_PORT, CONF_NAME +from homeassistant.const import CONF_HOST, CONF_NAME, CONF_PORT from homeassistant.data_entry_flow import FlowResult -import homeassistant.helpers.config_validation as cv from .const import ( - DOMAIN, - DEFAULT_PORT, - DEFAULT_NAME, - CONF_PHASES, CONF_HAS_FVE, - CONF_TARIFFS, + CONF_PHASES, CONF_RELAY_COUNT, + CONF_TARIFFS, + DEFAULT_NAME, + DEFAULT_PORT, + DOMAIN, PHASES_1, PHASES_3, - TARIFFS_1, - TARIFFS_2, - TARIFFS_4, RELAYS_0, RELAYS_4, RELAYS_6, + TARIFFS_1, + TARIFFS_2, + TARIFFS_4, ) _LOGGER = logging.getLogger(__name__) -# Known MAC prefixes for USR IOT devices (USR-DR134) USR_IOT_MAC_PREFIXES = ("d8b04c", "b4e62d") +MANUAL_CHOICE = "__manual__" STEP_CONNECTION_SCHEMA = vol.Schema( { @@ -78,17 +70,9 @@ STEP_METER_SCHEMA = vol.Schema( ) -# --------------------------------------------------------------------------- -# Helpers -# --------------------------------------------------------------------------- - async def _test_connection(host: str, port: int, timeout: float = 5.0) -> str | None: - """Try TCP connection. Returns error key or None on success.""" try: - reader, writer = await asyncio.wait_for( - asyncio.open_connection(host, port), - timeout=timeout, - ) + reader, writer = await asyncio.wait_for(asyncio.open_connection(host, port), timeout=timeout) writer.close() await writer.wait_closed() return None @@ -96,83 +80,55 @@ async def _test_connection(host: str, port: int, timeout: float = 5.0) -> str | return "cannot_connect" except OSError: return "cannot_connect" - except Exception: + except Exception: # pragma: no cover - defensive return "unknown" async def _scan_network(port: int, timeout: float = 1.0) -> list[str]: - """ - Scan the local network for open TCP port. - Returns list of IP addresses that responded. - """ - # Get real local IP by connecting to a public address (no data sent) local_ip = "192.168.1.1" try: - with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as s: - s.settimeout(0) - s.connect(("8.8.8.8", 80)) - local_ip = s.getsockname()[0] + with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as sock: + sock.settimeout(0) + sock.connect(("8.8.8.8", 80)) + local_ip = sock.getsockname()[0] except Exception: try: local_ip = socket.gethostbyname(socket.gethostname()) except Exception: pass - _LOGGER.debug("XT211 scan: local IP detected as %s", local_ip) - - # Fallback if we still got loopback if local_ip.startswith("127.") or local_ip == "0.0.0.0": local_ip = "192.168.1.1" - _LOGGER.warning("XT211 scan: loopback detected, falling back to %s", local_ip) try: network = IPv4Network(f"{local_ip}/24", strict=False) except ValueError: network = IPv4Network("192.168.1.0/24", strict=False) - _LOGGER.debug("XT211 scan: scanning %s on port %d", network, port) - found: list[str] = [] async def _probe(ip: str) -> None: try: - _, writer = await asyncio.wait_for( - asyncio.open_connection(ip, port), - timeout=timeout, - ) + _, writer = await asyncio.wait_for(asyncio.open_connection(ip, port), timeout=timeout) writer.close() try: await writer.wait_closed() except Exception: pass found.append(ip) - _LOGGER.debug("XT211 scan: found device at %s:%d", ip, port) except Exception: pass - # Probe all hosts in /24 concurrently - hosts = [str(h) for h in network.hosts()] - # Split into batches to avoid overwhelming the network stack - batch_size = 50 - for i in range(0, len(hosts), batch_size): - batch = hosts[i:i + batch_size] - await asyncio.gather(*[_probe(ip) for ip in batch]) + hosts = [str(host) for host in network.hosts()] + for index in range(0, len(hosts), 50): + await asyncio.gather(*(_probe(ip) for ip in hosts[index:index + 50])) - return sorted(found) + found.sort() + _LOGGER.debug("XT211 scan found %d host(s) on port %d: %s", len(found), port, found) + return found -# --------------------------------------------------------------------------- -# Config Flow -# --------------------------------------------------------------------------- - class XT211HANConfigFlow(config_entries.ConfigFlow, domain=DOMAIN): - """ - Three-path config flow: - - DHCP discovery (automatic) - - Network scan (semi-automatic) - - Manual entry (always available) - """ - VERSION = 1 def __init__(self) -> None: @@ -181,31 +137,21 @@ class XT211HANConfigFlow(config_entries.ConfigFlow, domain=DOMAIN): self._discovered_port: int = DEFAULT_PORT self._scan_results: list[str] = [] - # ------------------------------------------------------------------ - # Path 1 – DHCP discovery (triggered automatically by HA) - # ------------------------------------------------------------------ - async def async_step_dhcp(self, discovery_info: dhcp.DhcpServiceInfo) -> FlowResult: - """Handle DHCP discovery of a USR IOT device.""" mac = discovery_info.macaddress.replace(":", "").lower() if not any(mac.startswith(prefix) for prefix in USR_IOT_MAC_PREFIXES): return self.async_abort(reason="not_supported") ip = discovery_info.ip - _LOGGER.info("XT211 HAN: DHCP discovered USR IOT device at %s (MAC %s)", ip, mac) - - # Check not already configured await self.async_set_unique_id(f"{ip}:{DEFAULT_PORT}") self._abort_if_unique_id_configured(updates={CONF_HOST: ip}) self._discovered_host = ip self._discovered_port = DEFAULT_PORT + _LOGGER.info("XT211 HAN: DHCP discovered USR IOT device at %s (MAC %s)", ip, mac) return await self.async_step_dhcp_confirm() - async def async_step_dhcp_confirm( - self, user_input: dict[str, Any] | None = None - ) -> FlowResult: - """Ask user to confirm the DHCP-discovered device.""" + async def async_step_dhcp_confirm(self, user_input: dict[str, Any] | None = None) -> FlowResult: if user_input is not None: error = await _test_connection(self._discovered_host, self._discovered_port) if error: @@ -232,19 +178,9 @@ class XT211HANConfigFlow(config_entries.ConfigFlow, domain=DOMAIN): }, ) - # ------------------------------------------------------------------ - # Path 2 + 3 – User-initiated: scan or manual - # ------------------------------------------------------------------ - - async def async_step_user( - self, user_input: dict[str, Any] | None = None - ) -> FlowResult: - """First screen: choose between scan or manual entry.""" + async def async_step_user(self, user_input: dict[str, Any] | None = None) -> FlowResult: if user_input is not None: - if user_input.get("method") == "scan": - return await self.async_step_scan() - else: - return await self.async_step_manual() + return await (self.async_step_scan() if user_input.get("method") == "scan" else self.async_step_manual()) return self.async_show_form( step_id="user", @@ -260,16 +196,12 @@ class XT211HANConfigFlow(config_entries.ConfigFlow, domain=DOMAIN): ), ) - # ------------------------------------------------------------------ - # Path 2 – Network scan - # ------------------------------------------------------------------ - - async def async_step_scan( - self, user_input: dict[str, Any] | None = None - ) -> FlowResult: - """Scan the local network for devices with the configured port open.""" + async def async_step_scan(self, user_input: dict[str, Any] | None = None) -> FlowResult: if user_input is not None: host = user_input[CONF_HOST] + if host == MANUAL_CHOICE: + return await self.async_step_manual() + port = user_input.get(CONF_PORT, DEFAULT_PORT) name = user_input.get(CONF_NAME, DEFAULT_NAME) @@ -280,49 +212,38 @@ class XT211HANConfigFlow(config_entries.ConfigFlow, domain=DOMAIN): if error: return self.async_show_form( step_id="scan", - data_schema=self._scan_schema(port), + data_schema=self._scan_schema(port, include_choices=not self._scan_results == []), errors={"base": error}, ) - self._connection_data = { - CONF_HOST: host, - CONF_PORT: port, - CONF_NAME: name, - } + self._connection_data = {CONF_HOST: host, CONF_PORT: port, CONF_NAME: name} return await self.async_step_meter() - # Run the scan - _LOGGER.debug("XT211 HAN: scanning network for port %d", DEFAULT_PORT) self._scan_results = await _scan_network(DEFAULT_PORT) - _LOGGER.debug("XT211 HAN: scan found %d device(s): %s", len(self._scan_results), self._scan_results) - if not self._scan_results: - # Nothing found – fall through to manual with a warning return self.async_show_form( step_id="scan", - data_schema=self._scan_schema(DEFAULT_PORT), + data_schema=self._scan_schema(DEFAULT_PORT, include_choices=False), errors={"base": "no_devices_found"}, ) - # Build selector: found IPs + option to type manually - choices = {ip: f"{ip}:{DEFAULT_PORT}" for ip in self._scan_results} - choices["manual"] = "✏️ Zadat jinak ručně" - return self.async_show_form( step_id="scan", - data_schema=self._scan_schema(DEFAULT_PORT, choices), + data_schema=self._scan_schema(DEFAULT_PORT, include_choices=True), ) - def _scan_schema( - self, port: int, choices: dict | None = None - ) -> vol.Schema: - if choices: + def _scan_schema(self, port: int, include_choices: bool) -> vol.Schema: + if include_choices: + choices = {ip: f"{ip}:{port}" for ip in self._scan_results} + choices[MANUAL_CHOICE] = "✏️ Zadat IP adresu ručně" return vol.Schema( { vol.Required(CONF_HOST): vol.In(choices), + vol.Optional(CONF_PORT, default=port): int, vol.Optional(CONF_NAME, default=DEFAULT_NAME): str, } ) + return vol.Schema( { vol.Required(CONF_HOST): str, @@ -331,16 +252,8 @@ class XT211HANConfigFlow(config_entries.ConfigFlow, domain=DOMAIN): } ) - # ------------------------------------------------------------------ - # Path 3 – Manual entry - # ------------------------------------------------------------------ - - async def async_step_manual( - self, user_input: dict[str, Any] | None = None - ) -> FlowResult: - """Manual IP + port entry.""" + async def async_step_manual(self, user_input: dict[str, Any] | None = None) -> FlowResult: errors: dict[str, str] = {} - if user_input is not None: host = user_input[CONF_HOST] port = user_input[CONF_PORT] @@ -353,38 +266,17 @@ class XT211HANConfigFlow(config_entries.ConfigFlow, domain=DOMAIN): if error: errors["base"] = error else: - self._connection_data = { - CONF_HOST: host, - CONF_PORT: port, - CONF_NAME: name, - } + self._connection_data = {CONF_HOST: host, CONF_PORT: port, CONF_NAME: name} return await self.async_step_meter() - return self.async_show_form( - step_id="manual", - data_schema=STEP_CONNECTION_SCHEMA, - errors=errors, - ) + return self.async_show_form(step_id="manual", data_schema=STEP_CONNECTION_SCHEMA, errors=errors) - # ------------------------------------------------------------------ - # Step: meter configuration (shared by all paths) - # ------------------------------------------------------------------ - - async def async_step_meter( - self, user_input: dict[str, Any] | None = None - ) -> FlowResult: - """Meter type, FVE, tariffs, relays.""" + async def async_step_meter(self, user_input: dict[str, Any] | None = None) -> FlowResult: if user_input is not None: data = {**self._connection_data, **user_input} name = data.get(CONF_NAME, DEFAULT_NAME) host = data[CONF_HOST] port = data[CONF_PORT] - return self.async_create_entry( - title=f"{name} ({host}:{port})", - data=data, - ) + return self.async_create_entry(title=f"{name} ({host}:{port})", data=data) - return self.async_show_form( - step_id="meter", - data_schema=STEP_METER_SCHEMA, - ) + return self.async_show_form(step_id="meter", data_schema=STEP_METER_SCHEMA) diff --git a/custom_components/xt211_han/coordinator.py b/custom_components/xt211_han/coordinator.py index 879c1fe..d1cf3d8 100644 --- a/custom_components/xt211_han/coordinator.py +++ b/custom_components/xt211_han/coordinator.py @@ -1,51 +1,31 @@ -""" -Coordinator for XT211 HAN integration. - -Opens a persistent TCP connection to the RS485-to-Ethernet adapter -(e.g. USR-DR134) and receives DLMS/COSEM PUSH frames every 60 seconds. -""" +"""Coordinator for XT211 HAN integration.""" from __future__ import annotations import asyncio import logging -from datetime import timedelta from typing import Any from homeassistant.core import HomeAssistant -from homeassistant.helpers.update_coordinator import DataUpdateCoordinator, UpdateFailed +from homeassistant.helpers.update_coordinator import DataUpdateCoordinator -from .dlms_parser import DLMSParser, DLMSObject, OBIS_DESCRIPTIONS -from .const import DOMAIN +from .dlms_parser import DLMSObject, DLMSParser, OBIS_DESCRIPTIONS _LOGGER = logging.getLogger(__name__) -# We expect a frame every 60 s; allow some margin before timing out -PUSH_TIMEOUT = 90 # seconds -RECONNECT_DELAY = 10 # seconds after connection loss +PUSH_TIMEOUT = 90 +RECONNECT_DELAY = 10 class XT211Coordinator(DataUpdateCoordinator[dict[str, Any]]): - """ - Coordinator that maintains a persistent TCP connection to the - RS485-to-Ethernet adapter and decodes incoming DLMS PUSH frames. + """Persistent TCP listener for XT211 DLMS push frames.""" - Data is published to HA listeners whenever a new frame arrives, - not on a fixed poll interval (update_interval=None triggers manual). - """ - - def __init__( - self, - hass: HomeAssistant, - host: str, - port: int, - name: str, - ) -> None: + def __init__(self, hass: HomeAssistant, host: str, port: int, name: str) -> None: super().__init__( hass, _LOGGER, name=f"XT211 HAN ({host}:{port})", - update_interval=None, # push-driven, not poll-driven + update_interval=None, ) self.host = host self.port = port @@ -55,17 +35,13 @@ class XT211Coordinator(DataUpdateCoordinator[dict[str, Any]]): self._writer: asyncio.StreamWriter | None = None self._listen_task: asyncio.Task | None = None self._connected = False - - # ------------------------------------------------------------------ - # Public helpers - # ------------------------------------------------------------------ + self._frames_received = 0 @property def connected(self) -> bool: return self._connected async def async_setup(self) -> None: - """Start the background listener task.""" if self._listen_task is None or self._listen_task.done(): self._listen_task = self.hass.async_create_background_task( self._listen_loop(), @@ -73,7 +49,6 @@ class XT211Coordinator(DataUpdateCoordinator[dict[str, Any]]): ) async def async_shutdown(self) -> None: - """Stop the listener and close the connection.""" if self._listen_task: self._listen_task.cancel() try: @@ -82,22 +57,10 @@ class XT211Coordinator(DataUpdateCoordinator[dict[str, Any]]): pass await self._disconnect() - # ------------------------------------------------------------------ - # DataUpdateCoordinator required override - # ------------------------------------------------------------------ - async def _async_update_data(self) -> dict[str, Any]: - """Called by HA when entities want a refresh. Returns current data.""" - if self.data is None: - return {} - return self.data - - # ------------------------------------------------------------------ - # TCP listener loop - # ------------------------------------------------------------------ + return self.data or {} async def _listen_loop(self) -> None: - """Main loop: connect → receive → reconnect on error.""" while True: try: await self._connect() @@ -106,11 +69,14 @@ class XT211Coordinator(DataUpdateCoordinator[dict[str, Any]]): _LOGGER.info("XT211 listener task cancelled") raise except Exception as exc: + self._connected = False _LOGGER.warning( "XT211 connection error (%s:%d): %s – retrying in %ds", - self.host, self.port, exc, RECONNECT_DELAY, + self.host, + self.port, + exc, + RECONNECT_DELAY, ) - self._connected = False finally: await self._disconnect() @@ -122,8 +88,8 @@ class XT211Coordinator(DataUpdateCoordinator[dict[str, Any]]): asyncio.open_connection(self.host, self.port), timeout=10, ) + self._parser = DLMSParser() self._connected = True - self._parser = DLMSParser() # reset parser state on new connection _LOGGER.info("Connected to XT211 adapter at %s:%d", self.host, self.port) async def _disconnect(self) -> None: @@ -138,59 +104,68 @@ class XT211Coordinator(DataUpdateCoordinator[dict[str, Any]]): self._reader = None async def _receive_loop(self) -> None: - """Read bytes from the TCP stream and feed them to the parser.""" assert self._reader is not None while True: try: - chunk = await asyncio.wait_for( - self._reader.read(4096), - timeout=PUSH_TIMEOUT, - ) - except asyncio.TimeoutError: - _LOGGER.warning( - "No data from XT211 for %d s – reconnecting", PUSH_TIMEOUT - ) - raise ConnectionError("Push timeout") + chunk = await asyncio.wait_for(self._reader.read(4096), timeout=PUSH_TIMEOUT) + except asyncio.TimeoutError as exc: + _LOGGER.warning("No data from XT211 for %d s – reconnecting", PUSH_TIMEOUT) + raise ConnectionError("Push timeout") from exc if not chunk: _LOGGER.warning("XT211 adapter closed connection") raise ConnectionError("Remote closed") + _LOGGER.debug("XT211 RX %d bytes: %s", len(chunk), chunk.hex()) self._parser.feed(chunk) - # Process all complete frames in the buffer while True: result = self._parser.get_frame() if result is None: break + + self._frames_received += 1 if result.success: + _LOGGER.debug( + "XT211 frame #%d parsed OK: %d object(s)", + self._frames_received, + len(result.objects), + ) await self._process_frame(result.objects) else: _LOGGER.debug( - "Frame parse error: %s (raw: %s)", - result.error, result.raw_hex[:80], + "XT211 frame #%d parse error: %s (raw: %s)", + self._frames_received, + result.error, + result.raw_hex[:120], ) async def _process_frame(self, objects: list[DLMSObject]) -> None: - """Update coordinator data from a decoded DLMS frame.""" if not objects: _LOGGER.debug("Received empty DLMS frame") return current = dict(self.data or {}) + changed: list[str] = [] for obj in objects: meta = OBIS_DESCRIPTIONS.get(obj.obis, {}) - current[obj.obis] = { + new_value = { "value": obj.value, "unit": obj.unit or meta.get("unit", ""), "name": meta.get("name", obj.obis), "class": meta.get("class", "sensor"), } - _LOGGER.debug( - "OBIS %s = %s %s", obj.obis, obj.value, obj.unit - ) + if current.get(obj.obis) != new_value: + changed.append(obj.obis) + current[obj.obis] = new_value + _LOGGER.debug("XT211 OBIS %s = %r %s", obj.obis, obj.value, new_value["unit"]) self.async_set_updated_data(current) - _LOGGER.debug("Updated %d DLMS objects from XT211 frame", len(objects)) + _LOGGER.debug( + "Coordinator updated with %d object(s), %d changed: %s", + len(objects), + len(changed), + ", ".join(changed[:10]), + ) diff --git a/custom_components/xt211_han/dlms_parser.py b/custom_components/xt211_han/dlms_parser.py index 8f314bc..8517195 100644 --- a/custom_components/xt211_han/dlms_parser.py +++ b/custom_components/xt211_han/dlms_parser.py @@ -1,184 +1,90 @@ -""" -DLMS/COSEM PUSH mode parser for Sagemcom XT211 smart meter. - -The XT211 sends unsolicited HDLC-framed DLMS/COSEM data every 60 seconds -over RS485 (9600 baud, 8N1). This module decodes those frames. - -Frame structure (HDLC): - 7E - HDLC flag - A0 xx - Frame type + length - 00 02 00 01 ... - Destination / source addresses - 13 - Control byte (UI frame) - xx xx - HCS (header checksum) - [LLC header] - E6 E7 00 - [APDU] - DLMS application data (tag 0F = Data-notification) - xx xx - FCS (frame checksum) - 7E - HDLC flag - -OBIS codes supported (from ČEZ Distribuce spec): - 0-0:96.1.1.255 - Serial number (Device ID) - 0-0:96.3.10.255 - Disconnector status - 0-0:96.14.0.255 - Current tariff - 1-0:1.7.0.255 - Instant active power consumption (W) - 1-0:2.7.0.255 - Instant active power delivery (W) - 1-0:21.7.0.255 - Instant power L1 (W) - 1-0:41.7.0.255 - Instant power L2 (W) - 1-0:61.7.0.255 - Instant power L3 (W) - 1-0:1.8.0.255 - Active energy consumed (Wh) - 1-0:1.8.1.255 - Active energy T1 (Wh) - 1-0:1.8.2.255 - Active energy T2 (Wh) - 1-0:2.8.0.255 - Active energy delivered (Wh) - 0-1:96.3.10.255 - Relay R1 status - 0-2:96.3.10.255 - Relay R2 status - 0-3:96.3.10.255 - Relay R3 status - 0-4:96.3.10.255 - Relay R4 status -""" +"""DLMS/COSEM PUSH parser for Sagemcom XT211 smart meter.""" from __future__ import annotations import logging import struct -from dataclasses import dataclass +from dataclasses import dataclass, field from typing import Any _LOGGER = logging.getLogger(__name__) HDLC_FLAG = 0x7E -# DLMS data types DLMS_TYPE_NULL = 0x00 +DLMS_TYPE_ARRAY = 0x01 +DLMS_TYPE_STRUCTURE = 0x02 DLMS_TYPE_BOOL = 0x03 +DLMS_TYPE_INT32 = 0x05 +DLMS_TYPE_UINT32 = 0x06 +DLMS_TYPE_OCTET_STRING = 0x09 +DLMS_TYPE_VISIBLE_STRING = 0x0A DLMS_TYPE_INT8 = 0x0F DLMS_TYPE_INT16 = 0x10 DLMS_TYPE_UINT8 = 0x11 DLMS_TYPE_UINT16 = 0x12 -DLMS_TYPE_INT32 = 0x05 -DLMS_TYPE_UINT32 = 0x06 +DLMS_TYPE_COMPACT_ARRAY = 0x13 DLMS_TYPE_INT64 = 0x14 DLMS_TYPE_UINT64 = 0x15 -DLMS_TYPE_FLOAT32 = 0x16 -DLMS_TYPE_FLOAT64 = 0x17 -DLMS_TYPE_OCTET_STRING = 0x09 -DLMS_TYPE_VISIBLE_STRING = 0x0A -DLMS_TYPE_ARRAY = 0x01 -DLMS_TYPE_STRUCTURE = 0x02 -DLMS_TYPE_COMPACT_ARRAY = 0x13 +DLMS_TYPE_ENUM = 0x16 +DLMS_TYPE_FLOAT32 = 0x17 +DLMS_TYPE_FLOAT64 = 0x18 -# SI unit multipliers (DLMS scaler) -# Scaler is a signed int8 representing 10^scaler -def apply_scaler(value: int | float, scaler: int) -> float: - """Apply DLMS scaler (10^scaler) to a raw value.""" - return float(value) * (10 ** scaler) + +class NeedMoreData(Exception): + """Raised when the parser needs more bytes to finish a frame.""" @dataclass class DLMSObject: - """A single decoded DLMS COSEM object.""" - obis: str # e.g. "1-0:1.8.0.255" - value: Any # decoded Python value - unit: str = "" # e.g. "W", "Wh", "" - scaler: int = 0 # raw scaler from frame + obis: str + value: Any + unit: str = "" + scaler: int = 0 @dataclass class ParseResult: - """Result of parsing one HDLC frame.""" success: bool - objects: list[DLMSObject] = None + objects: list[DLMSObject] = field(default_factory=list) raw_hex: str = "" error: str = "" - def __post_init__(self): - if self.objects is None: - self.objects = [] - class DLMSParser: - """ - Stateful DLMS/COSEM PUSH mode parser for XT211. - - Usage: - parser = DLMSParser() - parser.feed(bytes_from_tcp) - while (result := parser.get_frame()): - process(result) - """ - - # DLMS unit codes → human readable strings - UNIT_MAP = { - 1: "a", 2: "mo", 3: "wk", 4: "d", 5: "h", - 6: "min", 7: "s", 8: "°", 9: "°C", 10: "currency", - 11: "m", 12: "m/s", 13: "m³", 14: "m³", 15: "m³/h", - 16: "m³/h", 17: "m³/d", 18: "m³/d", 19: "l", 20: "kg", - 21: "N", 22: "Nm", 23: "Pa", 24: "bar", 25: "J", - 26: "J/h", 27: "W", 28: "VA", 29: "var", 30: "Wh", - 31: "VAh", 32: "varh", 33: "A", 34: "C", 35: "V", - 36: "V/m", 37: "F", 38: "Ω", 39: "Ωm²/m",40: "Wb", - 41: "T", 42: "A/m", 43: "H", 44: "Hz", 45: "1/Wh", - 46: "1/varh",47: "1/VAh",48: "V²h", 49: "A²h", 50: "kg/s", - 51: "S", 52: "K", 53: "1/(V²h)",54: "1/(A²h)", - 255: "", 0: "", - } + """Stateful parser for raw DLMS APDUs and HDLC-wrapped frames.""" def __init__(self) -> None: self._buffer = bytearray() def feed(self, data: bytes) -> None: - """Add raw bytes from TCP socket to the internal buffer.""" self._buffer.extend(data) def get_frame(self) -> ParseResult | None: - """ - Try to extract and parse one complete frame from the buffer. - - Supports two formats: - 1. HDLC-wrapped: 7E A0 xx ... 7E - 2. Raw DLMS APDU: 0F [4B invoke-id] [optional datetime] [body] - (USR-DR134 strips the HDLC wrapper and sends raw APDU) - """ - buf = self._buffer - - if not buf: + """Return one parsed frame from the internal buffer, if available.""" + if not self._buffer: return None - # ---------------------------------------------------------------- - # Format 2: Raw DLMS APDU starting with 0x0F (Data-Notification) - # USR-DR134 sends this directly without HDLC framing - # ---------------------------------------------------------------- - if buf[0] == 0x0F: - # We need at least 5 bytes (tag + 4B invoke-id) - if len(buf) < 5: - return None + if self._buffer[0] == HDLC_FLAG: + return self._get_hdlc_frame() - # Heuristic: find the end of this APDU - # The USR-DR134 sends one complete APDU per TCP segment - # We consume everything in the buffer as one frame - raw = bytes(buf) - self._buffer.clear() - - raw_hex = raw.hex() - _LOGGER.debug("Raw DLMS APDU (%d bytes): %s", len(raw), raw_hex[:80]) - - try: - result = self._parse_apdu(raw) - result.raw_hex = raw_hex - return result - except Exception as exc: - _LOGGER.exception("Error parsing raw DLMS APDU") - return ParseResult(success=False, raw_hex=raw_hex, error=str(exc)) - - # ---------------------------------------------------------------- - # Format 1: HDLC-wrapped frame starting with 0x7E - # ---------------------------------------------------------------- - start = buf.find(HDLC_FLAG) + start = self._find_apdu_start(self._buffer) if start == -1: + _LOGGER.debug("Discarding %d bytes without known frame start", len(self._buffer)) self._buffer.clear() return None - if start > 0: - _LOGGER.debug("Discarding %d bytes before HDLC flag", start) - del self._buffer[:start] - buf = self._buffer + if start > 0: + _LOGGER.debug("Discarding %d leading byte(s) before APDU", start) + del self._buffer[:start] + + if self._buffer and self._buffer[0] == 0x0F: + return self._get_raw_apdu_frame() + + return None + + def _get_hdlc_frame(self) -> ParseResult | None: + buf = self._buffer if len(buf) < 3: return None @@ -188,10 +94,8 @@ class DLMSParser: return None raw = bytes(buf[:total]) - del self._buffer[:total] - + del buf[:total] raw_hex = raw.hex() - _LOGGER.debug("HDLC frame (%d bytes): %s", len(raw), raw_hex[:80]) if raw[0] != HDLC_FLAG or raw[-1] != HDLC_FLAG: return ParseResult(success=False, raw_hex=raw_hex, error="Missing HDLC flags") @@ -200,312 +104,282 @@ class DLMSParser: result = self._parse_hdlc(raw) result.raw_hex = raw_hex return result - except Exception as exc: + except NeedMoreData: + # Should not happen for HDLC because total length is known. + return None + except Exception as exc: # pragma: no cover - defensive logging _LOGGER.exception("Error parsing HDLC frame") return ParseResult(success=False, raw_hex=raw_hex, error=str(exc)) - # ------------------------------------------------------------------ - # Internal parsing methods - # ------------------------------------------------------------------ + def _get_raw_apdu_frame(self) -> ParseResult | None: + buf = self._buffer + try: + result, consumed = self._parse_apdu_with_length(bytes(buf)) + except NeedMoreData: + return None + except Exception as exc: # pragma: no cover - defensive logging + raw_hex = bytes(buf).hex() + _LOGGER.exception("Error parsing raw DLMS APDU") + del buf[:] + return ParseResult(success=False, raw_hex=raw_hex, error=str(exc)) + + raw = bytes(buf[:consumed]) + del buf[:consumed] + result.raw_hex = raw.hex() + return result def _parse_hdlc(self, raw: bytes) -> ParseResult: - """Parse full HDLC frame and extract DLMS objects.""" - pos = 1 # skip opening flag + pos = 1 + pos += 2 # frame format + _, pos = self._read_hdlc_address(raw, pos) + _, pos = self._read_hdlc_address(raw, pos) + pos += 1 # control + pos += 2 # HCS - # Frame format byte (should be A0 or A8) - # bits 11-0 = length - _frame_type = raw[pos] & 0xF8 - frame_len = ((raw[pos] & 0x07) << 8) | raw[pos + 1] - pos += 2 - - # Destination address (variable length, LSB=1 means last byte) - dest_addr, pos = self._read_hdlc_address(raw, pos) - # Source address - src_addr, pos = self._read_hdlc_address(raw, pos) - - # Control byte - control = raw[pos]; pos += 1 - - # HCS (2 bytes header checksum) - skip - pos += 2 - - # From here: LLC + APDU - # LLC header: E6 E7 00 (or E6 E6 00 for request) if pos + 3 > len(raw) - 3: - return ParseResult(success=False, error="Frame too short for LLC") + raise ValueError("Frame too short for LLC") - llc = raw[pos:pos+3]; pos += 3 - _LOGGER.debug("LLC: %s dest=%s src=%s", llc.hex(), dest_addr, src_addr) - - # APDU starts here, ends 3 bytes before end (FCS + closing flag) + pos += 3 # LLC header apdu = raw[pos:-3] - _LOGGER.debug("APDU (%d bytes): %s", len(apdu), apdu.hex()) - - return self._parse_apdu(apdu) + result, _ = self._parse_apdu_with_length(apdu) + return result def _read_hdlc_address(self, data: bytes, pos: int) -> tuple[int, int]: - """Read HDLC variable-length address. Returns (address_value, new_pos).""" addr = 0 shift = 0 - while pos < len(data): - byte = data[pos]; pos += 1 + while True: + if pos >= len(data): + raise NeedMoreData + byte = data[pos] + pos += 1 addr |= (byte >> 1) << shift shift += 7 - if byte & 0x01: # last byte of address - break - return addr, pos + if byte & 0x01: + return addr, pos - def _parse_apdu(self, apdu: bytes) -> ParseResult: - """ - Parse DLMS APDU (Data-Notification = tag 0x0F). - - XT211 frame structure: - 0F - Data-Notification tag - [4B invoke-id] - MSB set = data frame, clear = push-setup (skip) - 00 - datetime absent - 02 02 - outer structure(2) - 16 [push_type] - elem[0]: enum (push type, ignore) - 01 [N] - elem[1]: array(N captured objects) - [N x object] - see _parse_xt211_object - - Each captured object (11-byte header + type-tagged value): - 02 02 00 - structure prefix (3 bytes, ignored) - [class_id] - 1 byte DLMS class ID - [A B C D E F] - 6-byte raw OBIS (NO type tag!) - [attr_idx] - 1 byte attribute index (ignored) - [type][value bytes] - standard DLMS type-tagged value - """ + def _parse_apdu_with_length(self, apdu: bytes) -> tuple[ParseResult, int]: if not apdu: - return ParseResult(success=False, objects=[], error="Empty APDU") - + raise NeedMoreData if apdu[0] != 0x0F: - return ParseResult( - success=False, objects=[], - error=f"Unexpected APDU tag 0x{apdu[0]:02X} (expected 0x0F)" - ) - + raise ValueError(f"Unexpected APDU tag 0x{apdu[0]:02X}") if len(apdu) < 6: - return ParseResult(success=False, objects=[], error="APDU too short") + raise NeedMoreData pos = 1 - invoke_id = struct.unpack_from(">I", apdu, pos)[0]; pos += 4 - _LOGGER.debug("Invoke ID: 0x%08X", invoke_id) + invoke_id = struct.unpack_from(">I", apdu, pos)[0] + pos += 4 + _LOGGER.debug("XT211 invoke_id=0x%08X", invoke_id) - # Skip push-setup frames (invoke_id MSB = 0) - #if not (invoke_id & 0x80000000): - # _LOGGER.debug("Push-setup frame, skipping") - # return ParseResult(success=True, objects=[]) + if pos >= len(apdu): + raise NeedMoreData - # Datetime: 0x09 = octet-string, 0x00 = absent - if pos < len(apdu) and apdu[pos] == 0x09: + if apdu[pos] == DLMS_TYPE_OCTET_STRING: pos += 1 - dt_len = apdu[pos]; pos += 1 + dt_len - elif pos < len(apdu) and apdu[pos] == 0x00: + dt_len, pos = self._decode_length(apdu, pos) + self._require(apdu, pos, dt_len) + pos += dt_len + elif apdu[pos] == DLMS_TYPE_NULL: pos += 1 - # Outer structure(2): skip tag + count - if pos + 2 > len(apdu) or apdu[pos] != 0x02: - return ParseResult(success=True, objects=[]) - pos += 2 # 02 02 + self._require(apdu, pos, 2) + if apdu[pos] != DLMS_TYPE_STRUCTURE: + return ParseResult(success=True, objects=[]), pos + structure_count = apdu[pos + 1] + pos += 2 + if structure_count < 2: + return ParseResult(success=True, objects=[]), pos - # Element[0]: enum = push type (skip 2 bytes: 16 XX) - if pos < len(apdu) and apdu[pos] == 0x16: + if pos >= len(apdu): + raise NeedMoreData + if apdu[pos] == DLMS_TYPE_ENUM: + self._require(apdu, pos, 2) pos += 2 + else: + _, pos = self._decode_value(apdu, pos) - # Element[1]: array of captured objects - if pos >= len(apdu) or apdu[pos] != 0x01: - return ParseResult(success=True, objects=[]) + if pos >= len(apdu): + raise NeedMoreData + if apdu[pos] != DLMS_TYPE_ARRAY: + return ParseResult(success=True, objects=[]), pos pos += 1 array_count, pos = self._decode_length(apdu, pos) - _LOGGER.debug("Array count: %d objects", array_count) + objects: list[DLMSObject] = [] + for _ in range(array_count): + obj, pos = self._parse_xt211_object(apdu, pos) + if obj is not None: + objects.append(obj) - objects = [] - for i in range(array_count): - if pos + 11 > len(apdu): - break - try: - obj, pos = self._parse_xt211_object(apdu, pos) - if obj: - objects.append(obj) - _LOGGER.debug("OBIS %s = %s %s", obj.obis, obj.value, obj.unit) - except Exception as exc: - _LOGGER.debug("Error parsing object %d at pos %d: %s", i, pos, exc) - break - - return ParseResult(success=True, objects=objects) + return ParseResult(success=True, objects=objects), pos def _parse_xt211_object(self, data: bytes, pos: int) -> tuple[DLMSObject | None, int]: - """ - Parse one captured object from XT211 push notification. - - Format per object: - 02 02 00 - 3-byte structure prefix (ignored) - [class_id] - 1 byte - [A B C D E F] - 6-byte raw OBIS (no type tag) - [attr_idx] - 1 byte (ignored) - [type][value] - DLMS type-tagged value - """ - if data[pos] != 0x02: - _LOGGER.debug("Expected 0x02 at pos %d, got 0x%02X", pos, data[pos]) - return None, pos + 1 - - pos += 3 # skip: 02 02 00 - - # Class ID - pos += 1 # class_id (not needed for value extraction) - - # Raw OBIS (6 bytes, no type tag) - if pos + 6 > len(data): - return None, pos - obis_raw = data[pos:pos+6]; pos += 6 - obis_str = self._format_obis(obis_raw) - - # Attribute index (skip) + self._require(data, pos, 1) + if data[pos] != DLMS_TYPE_STRUCTURE: + raise ValueError(f"Expected object structure at {pos}, got 0x{data[pos]:02X}") pos += 1 - # Type-tagged value - value, pos = self._decode_value(data, pos) + count, pos = self._decode_length(data, pos) + if count < 1: + raise ValueError(f"Unexpected object element count {count}") - # Convert bytes to string for text objects - if isinstance(value, (bytes, bytearray)): - try: - value = value.decode("ascii", errors="replace").strip("\x00") - except Exception: - value = value.hex() + # XT211 measurement objects use a raw descriptor layout: + # 02 02 00 [class_id_hi class_id_lo] [6B OBIS] [attr_idx] [typed value] + if pos < len(data) and data[pos] == 0x00: + if pos + 10 > len(data): + raise NeedMoreData + class_id = int.from_bytes(data[pos:pos + 2], "big") + pos += 2 + obis_raw = bytes(data[pos:pos + 6]) + pos += 6 + _attr_idx = data[pos] + pos += 1 + value, pos = self._decode_value(data, pos) - meta = OBIS_DESCRIPTIONS.get(obis_str, {}) - return DLMSObject( - obis=obis_str, - value=value, - unit=meta.get("unit", ""), - scaler=0, - ), pos + if isinstance(value, (bytes, bytearray)): + try: + value = bytes(value).decode("ascii", errors="replace").strip("\x00") + except Exception: + value = bytes(value).hex() + obis = self._format_obis(obis_raw) + meta = OBIS_DESCRIPTIONS.get(obis, {}) + _LOGGER.debug( + "Parsed XT211 object class_id=%s obis=%s value=%r unit=%s", + class_id, + obis, + value, + meta.get("unit", ""), + ) + return DLMSObject( + obis=obis, + value=value, + unit=meta.get("unit", ""), + scaler=0, + ), pos + + # Short housekeeping frames use simple typed structures without OBIS. + # Consume them cleanly and ignore them. + last_value: Any = None + for _ in range(count): + last_value, pos = self._decode_value(data, pos) + _LOGGER.debug("Ignoring non-measurement structure value=%r", last_value) + return None, pos def _decode_value(self, data: bytes, pos: int) -> tuple[Any, int]: - """Recursively decode a DLMS typed value. Returns (value, new_pos).""" - if pos >= len(data): - return None, pos - - dtype = data[pos]; pos += 1 + self._require(data, pos, 1) + dtype = data[pos] + pos += 1 if dtype == DLMS_TYPE_NULL: return None, pos - elif dtype == DLMS_TYPE_BOOL: + if dtype == DLMS_TYPE_BOOL: + self._require(data, pos, 1) return bool(data[pos]), pos + 1 - elif dtype == DLMS_TYPE_INT8: + if dtype == DLMS_TYPE_INT8: + self._require(data, pos, 1) return struct.unpack_from(">b", data, pos)[0], pos + 1 - elif dtype == DLMS_TYPE_UINT8: + if dtype == DLMS_TYPE_UINT8 or dtype == DLMS_TYPE_ENUM: + self._require(data, pos, 1) return data[pos], pos + 1 - elif dtype == 0x16: # enum = uint8 - return data[pos], pos + 1 - elif dtype == DLMS_TYPE_INT16: + if dtype == DLMS_TYPE_INT16: + self._require(data, pos, 2) return struct.unpack_from(">h", data, pos)[0], pos + 2 - elif dtype == DLMS_TYPE_UINT16: + if dtype == DLMS_TYPE_UINT16: + self._require(data, pos, 2) return struct.unpack_from(">H", data, pos)[0], pos + 2 - elif dtype == DLMS_TYPE_INT32: + if dtype == DLMS_TYPE_INT32: + self._require(data, pos, 4) return struct.unpack_from(">i", data, pos)[0], pos + 4 - elif dtype == DLMS_TYPE_UINT32: + if dtype == DLMS_TYPE_UINT32: + self._require(data, pos, 4) return struct.unpack_from(">I", data, pos)[0], pos + 4 - elif dtype == DLMS_TYPE_INT64: + if dtype == DLMS_TYPE_INT64: + self._require(data, pos, 8) return struct.unpack_from(">q", data, pos)[0], pos + 8 - elif dtype == DLMS_TYPE_UINT64: + if dtype == DLMS_TYPE_UINT64: + self._require(data, pos, 8) return struct.unpack_from(">Q", data, pos)[0], pos + 8 - elif dtype == DLMS_TYPE_FLOAT32: + if dtype == DLMS_TYPE_FLOAT32: + self._require(data, pos, 4) return struct.unpack_from(">f", data, pos)[0], pos + 4 - elif dtype == DLMS_TYPE_FLOAT64: + if dtype == DLMS_TYPE_FLOAT64: + self._require(data, pos, 8) return struct.unpack_from(">d", data, pos)[0], pos + 8 - elif dtype in (DLMS_TYPE_OCTET_STRING, DLMS_TYPE_VISIBLE_STRING): + if dtype in (DLMS_TYPE_OCTET_STRING, DLMS_TYPE_VISIBLE_STRING): length, pos = self._decode_length(data, pos) - raw_bytes = data[pos:pos+length] + self._require(data, pos, length) + raw = data[pos:pos + length] pos += length if dtype == DLMS_TYPE_VISIBLE_STRING: - try: - return raw_bytes.decode("ascii", errors="replace"), pos - except Exception: - return raw_bytes.hex(), pos - return raw_bytes, pos - elif dtype in (DLMS_TYPE_ARRAY, DLMS_TYPE_STRUCTURE, DLMS_TYPE_COMPACT_ARRAY): + return raw.decode("ascii", errors="replace"), pos + return bytes(raw), pos + if dtype in (DLMS_TYPE_ARRAY, DLMS_TYPE_STRUCTURE, DLMS_TYPE_COMPACT_ARRAY): count, pos = self._decode_length(data, pos) - items = [] + items: list[Any] = [] for _ in range(count): - val, pos = self._decode_value(data, pos) - items.append(val) + item, pos = self._decode_value(data, pos) + items.append(item) return items, pos - else: - _LOGGER.debug("Unknown DLMS type 0x%02X at pos %d, skipping", dtype, pos) - return None, pos + + raise ValueError(f"Unknown DLMS type 0x{dtype:02X} at pos {pos - 1}") def _decode_length(self, data: bytes, pos: int) -> tuple[int, int]: - """Decode BER-style length field.""" - first = data[pos]; pos += 1 + self._require(data, pos, 1) + first = data[pos] + pos += 1 if first < 0x80: return first, pos num_bytes = first & 0x7F + self._require(data, pos, num_bytes) length = 0 for _ in range(num_bytes): - length = (length << 8) | data[pos]; pos += 1 + length = (length << 8) | data[pos] + pos += 1 return length, pos - - """Convert 6 raw bytes to OBIS string notation A-B:C.D.E.F""" + def _require(self, data: bytes, pos: int, count: int) -> None: + if pos + count > len(data): + raise NeedMoreData + + def _find_apdu_start(self, data: bytes) -> int: + try: + return data.index(0x0F) + except ValueError: + return -1 + + def _format_obis(self, raw: bytes) -> str: if len(raw) != 6: return raw.hex() a, b, c, d, e, f = raw return f"{a}-{b}:{c}.{d}.{e}.{f}" -# --------------------------------------------------------------------------- -# Convenience: known OBIS codes for the XT211 -# --------------------------------------------------------------------------- - -OBIS_DESCRIPTIONS: dict[str, dict] = { - # --- Idx 1: COSEM logical device name --- - "0-0:42.0.0.255": {"name": "Název zařízení", "unit": "", "class": "text"}, - - # --- Idx 3: Serial number --- - "0-0:96.1.0.255": {"name": "Výrobní číslo", "unit": "", "class": "text"}, - - # --- Idx 4: Disconnector --- - "0-0:96.3.10.255": {"name": "Stav odpojovače", "unit": "", "class": "binary"}, - - # --- Idx 5: Power limiter --- - "0-0:17.0.0.255": {"name": "Limitér", "unit": "W", "class": "power"}, - - # --- Idx 6–11: Relays R1–R6 --- - "0-1:96.3.10.255": {"name": "Stav relé R1", "unit": "", "class": "binary"}, - "0-2:96.3.10.255": {"name": "Stav relé R2", "unit": "", "class": "binary"}, - "0-3:96.3.10.255": {"name": "Stav relé R3", "unit": "", "class": "binary"}, - "0-4:96.3.10.255": {"name": "Stav relé R4", "unit": "", "class": "binary"}, - "0-5:96.3.10.255": {"name": "Stav relé R5", "unit": "", "class": "binary"}, - "0-6:96.3.10.255": {"name": "Stav relé R6", "unit": "", "class": "binary"}, - - # --- Idx 12: Active tariff --- - "0-0:96.14.0.255": {"name": "Aktuální tarif", "unit": "", "class": "text"}, - - # --- Idx 13–16: Instant power import (odběr) --- - "1-0:1.7.0.255": {"name": "Okamžitý příkon odběru celkem", "unit": "W", "class": "power"}, - "1-0:21.7.0.255": {"name": "Okamžitý příkon odběru L1", "unit": "W", "class": "power"}, - "1-0:41.7.0.255": {"name": "Okamžitý příkon odběru L2", "unit": "W", "class": "power"}, - "1-0:61.7.0.255": {"name": "Okamžitý příkon odběru L3", "unit": "W", "class": "power"}, - - # --- Idx 17–20: Instant power export (dodávka / FVE) --- - "1-0:2.7.0.255": {"name": "Okamžitý výkon dodávky celkem", "unit": "W", "class": "power"}, - "1-0:22.7.0.255": {"name": "Okamžitý výkon dodávky L1", "unit": "W", "class": "power"}, - "1-0:42.7.0.255": {"name": "Okamžitý výkon dodávky L2", "unit": "W", "class": "power"}, - "1-0:62.7.0.255": {"name": "Okamžitý výkon dodávky L3", "unit": "W", "class": "power"}, - - # --- Idx 21–25: Cumulative energy import (odběr kWh) --- - "1-0:1.8.0.255": {"name": "Spotřeba energie celkem", "unit": "Wh", "class": "energy"}, - "1-0:1.8.1.255": {"name": "Spotřeba energie T1", "unit": "Wh", "class": "energy"}, - "1-0:1.8.2.255": {"name": "Spotřeba energie T2", "unit": "Wh", "class": "energy"}, - "1-0:1.8.3.255": {"name": "Spotřeba energie T3", "unit": "Wh", "class": "energy"}, - "1-0:1.8.4.255": {"name": "Spotřeba energie T4", "unit": "Wh", "class": "energy"}, - - # --- Idx 26: Cumulative energy export (dodávka kWh) --- - "1-0:2.8.0.255": {"name": "Dodávka energie celkem", "unit": "Wh", "class": "energy"}, - - # --- Idx 27: Consumer message --- - "0-0:96.13.0.255": {"name": "Zpráva pro zákazníka", "unit": "", "class": "text"}, +OBIS_DESCRIPTIONS: dict[str, dict[str, str]] = { + "0-0:42.0.0.255": {"name": "Název zařízení", "unit": "", "class": "text"}, + "0-0:96.1.0.255": {"name": "Výrobní číslo", "unit": "", "class": "text"}, + "0-0:96.1.1.255": {"name": "Výrobní číslo", "unit": "", "class": "text"}, + "0-0:96.3.10.255": {"name": "Stav odpojovače", "unit": "", "class": "binary"}, + "0-0:17.0.0.255": {"name": "Limitér", "unit": "W", "class": "power"}, + "0-1:96.3.10.255": {"name": "Stav relé R1", "unit": "", "class": "binary"}, + "0-2:96.3.10.255": {"name": "Stav relé R2", "unit": "", "class": "binary"}, + "0-3:96.3.10.255": {"name": "Stav relé R3", "unit": "", "class": "binary"}, + "0-4:96.3.10.255": {"name": "Stav relé R4", "unit": "", "class": "binary"}, + "0-5:96.3.10.255": {"name": "Stav relé R5", "unit": "", "class": "binary"}, + "0-6:96.3.10.255": {"name": "Stav relé R6", "unit": "", "class": "binary"}, + "0-0:96.14.0.255": {"name": "Aktuální tarif", "unit": "", "class": "text"}, + "1-0:1.7.0.255": {"name": "Okamžitý příkon odběru celkem", "unit": "W", "class": "power"}, + "1-0:21.7.0.255": {"name": "Okamžitý příkon odběru L1", "unit": "W", "class": "power"}, + "1-0:41.7.0.255": {"name": "Okamžitý příkon odběru L2", "unit": "W", "class": "power"}, + "1-0:61.7.0.255": {"name": "Okamžitý příkon odběru L3", "unit": "W", "class": "power"}, + "1-0:2.7.0.255": {"name": "Okamžitý výkon dodávky celkem", "unit": "W", "class": "power"}, + "1-0:22.7.0.255": {"name": "Okamžitý výkon dodávky L1", "unit": "W", "class": "power"}, + "1-0:42.7.0.255": {"name": "Okamžitý výkon dodávky L2", "unit": "W", "class": "power"}, + "1-0:62.7.0.255": {"name": "Okamžitý výkon dodávky L3", "unit": "W", "class": "power"}, + "1-0:1.8.0.255": {"name": "Spotřeba energie celkem", "unit": "Wh", "class": "energy"}, + "1-0:1.8.1.255": {"name": "Spotřeba energie T1", "unit": "Wh", "class": "energy"}, + "1-0:1.8.2.255": {"name": "Spotřeba energie T2", "unit": "Wh", "class": "energy"}, + "1-0:1.8.3.255": {"name": "Spotřeba energie T3", "unit": "Wh", "class": "energy"}, + "1-0:1.8.4.255": {"name": "Spotřeba energie T4", "unit": "Wh", "class": "energy"}, + "1-0:2.8.0.255": {"name": "Dodávka energie celkem", "unit": "Wh", "class": "energy"}, + "0-0:96.13.0.255": {"name": "Zpráva pro zákazníka", "unit": "", "class": "text"}, } diff --git a/custom_components/xt211_han/manifest.json b/custom_components/xt211_han/manifest.json index 4cd54dd..ed994ac 100644 --- a/custom_components/xt211_han/manifest.json +++ b/custom_components/xt211_han/manifest.json @@ -1,16 +1,22 @@ { "domain": "xt211_han", "name": "XT211 HAN (RS485 via Ethernet)", - "version": "0.7.5", + "version": "0.7.6", "documentation": "https://github.com/nero150/xt211-han-ha", "issue_tracker": "https://github.com/nero150/xt211-han-ha/issues", "dependencies": [], - "codeowners": ["@nero150"], + "codeowners": [ + "@nero150" + ], "requirements": [], "iot_class": "local_push", "config_flow": true, "dhcp": [ - {"macaddress": "D8B04C*"}, - {"macaddress": "B4E62D*"} + { + "macaddress": "D8B04C*" + }, + { + "macaddress": "B4E62D*" + } ] } diff --git a/custom_components/xt211_han/sensor.py b/custom_components/xt211_han/sensor.py index 499c4bb..ed92a74 100644 --- a/custom_components/xt211_han/sensor.py +++ b/custom_components/xt211_han/sensor.py @@ -1,53 +1,28 @@ -"""Sensor platform for XT211 HAN integration. - -Registers three types of entities: - - Numeric sensors (power, energy) - - Text sensors (serial number, tariff, limiter) - - Binary sensors (disconnector, relays) -""" +"""Sensor platform for XT211 HAN integration.""" from __future__ import annotations -import logging -from typing import Any - -from homeassistant.components.binary_sensor import ( - BinarySensorDeviceClass, - BinarySensorEntity, -) -from homeassistant.components.sensor import ( - SensorDeviceClass, - SensorEntity, - SensorStateClass, -) +from homeassistant.components.sensor import SensorDeviceClass, SensorEntity, SensorStateClass from homeassistant.config_entries import ConfigEntry -from homeassistant.const import ( - CONF_NAME, - EntityCategory, - UnitOfEnergy, - UnitOfPower, -) +from homeassistant.const import CONF_NAME, EntityCategory, UnitOfEnergy, UnitOfPower from homeassistant.core import HomeAssistant, callback from homeassistant.helpers.device_registry import DeviceInfo from homeassistant.helpers.entity_platform import AddEntitiesCallback from homeassistant.helpers.update_coordinator import CoordinatorEntity from .const import ( - DOMAIN, - CONF_PHASES, CONF_HAS_FVE, - CONF_TARIFFS, + CONF_PHASES, CONF_RELAY_COUNT, + CONF_TARIFFS, + DOMAIN, PHASES_3, - TARIFFS_2, RELAYS_4, + TARIFFS_2, ) from .coordinator import XT211Coordinator from .dlms_parser import OBIS_DESCRIPTIONS -_LOGGER = logging.getLogger(__name__) - -# Map OBIS "class" → HA SensorDeviceClass + StateClass + unit SENSOR_META: dict[str, dict] = { "power": { "device_class": SensorDeviceClass.POWER, @@ -66,23 +41,22 @@ SENSOR_META: dict[str, dict] = { }, } -# OBIS codes that send text values (not numeric) TEXT_OBIS = { - "0-0:42.0.0.255", # COSEM logical device name - "0-0:96.1.0.255", # Serial number - "0-0:96.14.0.255", # Current tariff - "0-0:96.13.0.255", # Consumer message + "0-0:42.0.0.255", + "0-0:96.1.0.255", + "0-0:96.1.1.255", + "0-0:96.14.0.255", + "0-0:96.13.0.255", } -# OBIS codes that are binary (on/off) BINARY_OBIS = { - "0-0:96.3.10.255", # Disconnector - "0-1:96.3.10.255", # Relay R1 - "0-2:96.3.10.255", # Relay R2 - "0-3:96.3.10.255", # Relay R3 - "0-4:96.3.10.255", # Relay R4 - "0-5:96.3.10.255", # Relay R5 - "0-6:96.3.10.255", # Relay R6 + "0-0:96.3.10.255", + "0-1:96.3.10.255", + "0-2:96.3.10.255", + "0-3:96.3.10.255", + "0-4:96.3.10.255", + "0-5:96.3.10.255", + "0-6:96.3.10.255", } @@ -95,33 +69,24 @@ def _device_info(entry: ConfigEntry) -> DeviceInfo: ) -async def async_setup_entry( - hass: HomeAssistant, - entry: ConfigEntry, - async_add_entities: AddEntitiesCallback, -) -> None: - """Set up all XT211 HAN entities from a config entry, filtered by meter config.""" - coordinator: XT211Coordinator = hass.data[DOMAIN][entry.entry_id] +def build_enabled_obis(entry: ConfigEntry) -> set[str]: + phases = entry.data.get(CONF_PHASES, PHASES_3) + has_fve = entry.data.get(CONF_HAS_FVE, True) + tariffs = int(entry.data.get(CONF_TARIFFS, TARIFFS_2)) + relay_count = int(entry.data.get(CONF_RELAY_COUNT, RELAYS_4)) - phases = entry.data.get(CONF_PHASES, PHASES_3) - has_fve = entry.data.get(CONF_HAS_FVE, True) - tariffs = int(entry.data.get(CONF_TARIFFS, TARIFFS_2)) - relay_count = int(entry.data.get(CONF_RELAY_COUNT, RELAYS_4)) - - # Build set of OBIS codes to include based on user config - enabled_obis: set[str] = set() - - # Always include: device name, serial, tariff, consumer message, disconnector, limiter - enabled_obis.update({ + enabled_obis: set[str] = { "0-0:42.0.0.255", "0-0:96.1.0.255", + "0-0:96.1.1.255", "0-0:96.14.0.255", "0-0:96.13.0.255", "0-0:96.3.10.255", "0-0:17.0.0.255", - }) + "1-0:1.7.0.255", + "1-0:1.8.0.255", + } - # Relays – according to relay_count relay_obis = { 1: "0-1:96.3.10.255", 2: "0-2:96.3.10.255", @@ -130,215 +95,96 @@ async def async_setup_entry( 5: "0-5:96.3.10.255", 6: "0-6:96.3.10.255", } - for i in range(1, relay_count + 1): - enabled_obis.add(relay_obis[i]) + for idx in range(1, relay_count + 1): + enabled_obis.add(relay_obis[idx]) - # Instant power import – total always included - enabled_obis.add("1-0:1.7.0.255") if phases == PHASES_3: enabled_obis.update({"1-0:21.7.0.255", "1-0:41.7.0.255", "1-0:61.7.0.255"}) - # Instant power export – only with FVE if has_fve: enabled_obis.add("1-0:2.7.0.255") + enabled_obis.add("1-0:2.8.0.255") if phases == PHASES_3: enabled_obis.update({"1-0:22.7.0.255", "1-0:42.7.0.255", "1-0:62.7.0.255"}) - # Cumulative energy import – total + tariffs - enabled_obis.add("1-0:1.8.0.255") - for t in range(1, tariffs + 1): - enabled_obis.add(f"1-0:1.8.{t}.255") + for tariff in range(1, tariffs + 1): + enabled_obis.add(f"1-0:1.8.{tariff}.255") - # Cumulative energy export – only with FVE - if has_fve: - enabled_obis.add("1-0:2.8.0.255") + return enabled_obis - _LOGGER.debug( - "XT211 config: phases=%s fve=%s tariffs=%d relays=%d → %d entities", - phases, has_fve, tariffs, relay_count, len(enabled_obis), - ) - entities: list = [] - registered_obis: set[str] = set() - - for obis, meta in OBIS_DESCRIPTIONS.items(): - if obis not in enabled_obis: - continue - registered_obis.add(obis) - if obis in BINARY_OBIS: - entities.append(XT211BinarySensorEntity(coordinator, entry, obis, meta)) - elif obis in TEXT_OBIS: - entities.append(XT211TextSensorEntity(coordinator, entry, obis, meta)) - else: - entities.append(XT211SensorEntity(coordinator, entry, obis, meta)) +async def async_setup_entry( + hass: HomeAssistant, + entry: ConfigEntry, + async_add_entities: AddEntitiesCallback, +) -> None: + coordinator: XT211Coordinator = hass.data[DOMAIN][entry.entry_id] + enabled_obis = build_enabled_obis(entry) + entities = [ + XT211SensorEntity(coordinator, entry, obis, meta) + for obis, meta in OBIS_DESCRIPTIONS.items() + if obis in enabled_obis and obis not in BINARY_OBIS + ] async_add_entities(entities) - # Dynamically register any unknown OBIS codes that arrive at runtime + registered_obis = {entity._obis for entity in entities} + @callback def _on_update() -> None: if not coordinator.data: return - new: list = [] + new_entities = [] for obis, data in coordinator.data.items(): - if obis in registered_obis or obis not in enabled_obis: + if obis in registered_obis or obis not in enabled_obis or obis in BINARY_OBIS: continue registered_obis.add(obis) - _LOGGER.info("XT211: discovered new OBIS code %s – adding entity", obis) - if obis in BINARY_OBIS: - new.append(XT211BinarySensorEntity(coordinator, entry, obis, data)) - elif obis in TEXT_OBIS: - new.append(XT211TextSensorEntity(coordinator, entry, obis, data)) - else: - new.append(XT211SensorEntity(coordinator, entry, obis, data)) - if new: - async_add_entities(new) + new_entities.append(XT211SensorEntity(coordinator, entry, obis, data)) + if new_entities: + async_add_entities(new_entities) coordinator.async_add_listener(_on_update) -# --------------------------------------------------------------------------- -# Numeric sensor -# --------------------------------------------------------------------------- - class XT211SensorEntity(CoordinatorEntity[XT211Coordinator], SensorEntity): - """Numeric sensor (power / energy / generic).""" - _attr_has_entity_name = True - def __init__( - self, - coordinator: XT211Coordinator, - entry: ConfigEntry, - obis: str, - meta: dict, - ) -> None: + def __init__(self, coordinator: XT211Coordinator, entry: ConfigEntry, obis: str, meta: dict) -> None: super().__init__(coordinator) - self._obis = obis - self._entry = entry - sensor_type = meta.get("class", "sensor") - sm = SENSOR_META.get(sensor_type, SENSOR_META["sensor"]) - + sensor_meta = SENSOR_META.get(sensor_type, SENSOR_META["sensor"]) + self._entry = entry + self._obis = obis + self._wh_to_kwh = sensor_type == "energy" + self._text = obis in TEXT_OBIS self._attr_unique_id = f"{entry.entry_id}_{obis}" self._attr_name = meta.get("name", obis) - self._attr_device_class = sm["device_class"] - self._attr_state_class = sm["state_class"] - self._attr_native_unit_of_measurement = sm["unit"] or meta.get("unit") - self._wh_to_kwh = (sensor_type == "energy") + self._attr_device_class = None if self._text else sensor_meta["device_class"] + self._attr_state_class = None if self._text else sensor_meta["state_class"] + self._attr_native_unit_of_measurement = None if self._text else (sensor_meta["unit"] or meta.get("unit")) + if self._text: + self._attr_entity_category = EntityCategory.DIAGNOSTIC @property def device_info(self) -> DeviceInfo: return _device_info(self._entry) @property - def native_value(self) -> float | None: - if not self.coordinator.data: - return None - obj = self.coordinator.data.get(self._obis) + def native_value(self): + obj = (self.coordinator.data or {}).get(self._obis) if obj is None: return None - raw = obj.get("value") + value = obj.get("value") + if self._text: + return None if value is None else str(value) try: - val = float(raw) - if self._wh_to_kwh: - val = val / 1000.0 - return round(val, 3) + number = float(value) except (TypeError, ValueError): return None + if self._wh_to_kwh: + number /= 1000.0 + return round(number, 3) @property def available(self) -> bool: - return self.coordinator.connected and self.coordinator.data is not None - - -# --------------------------------------------------------------------------- -# Text sensor -# --------------------------------------------------------------------------- - -class XT211TextSensorEntity(CoordinatorEntity[XT211Coordinator], SensorEntity): - """Text sensor (serial number, tariff).""" - - _attr_has_entity_name = True - _attr_entity_category = EntityCategory.DIAGNOSTIC - - def __init__( - self, - coordinator: XT211Coordinator, - entry: ConfigEntry, - obis: str, - meta: dict, - ) -> None: - super().__init__(coordinator) - self._obis = obis - self._entry = entry - self._attr_unique_id = f"{entry.entry_id}_{obis}" - self._attr_name = meta.get("name", obis) - self._attr_device_class = None - self._attr_state_class = None - self._attr_native_unit_of_measurement = None - - @property - def device_info(self) -> DeviceInfo: - return _device_info(self._entry) - - @property - def native_value(self) -> str | None: - if not self.coordinator.data: - return None - obj = self.coordinator.data.get(self._obis) - if obj is None: - return None - val = obj.get("value") - return str(val) if val is not None else None - - @property - def available(self) -> bool: - return self.coordinator.connected and self.coordinator.data is not None - - -# --------------------------------------------------------------------------- -# Binary sensor -# --------------------------------------------------------------------------- - -class XT211BinarySensorEntity(CoordinatorEntity[XT211Coordinator], BinarySensorEntity): - """Binary sensor (disconnector / relay status).""" - - _attr_has_entity_name = True - _attr_device_class = BinarySensorDeviceClass.PLUG - - def __init__( - self, - coordinator: XT211Coordinator, - entry: ConfigEntry, - obis: str, - meta: dict, - ) -> None: - super().__init__(coordinator) - self._obis = obis - self._entry = entry - self._attr_unique_id = f"{entry.entry_id}_{obis}" - self._attr_name = meta.get("name", obis) - - @property - def device_info(self) -> DeviceInfo: - return _device_info(self._entry) - - @property - def is_on(self) -> bool | None: - if not self.coordinator.data: - return None - obj = self.coordinator.data.get(self._obis) - if obj is None: - return None - val = obj.get("value") - if isinstance(val, bool): - return val - try: - return int(val) != 0 - except (TypeError, ValueError): - return None - - @property - def available(self) -> bool: - return self.coordinator.connected and self.coordinator.data is not None + return self.coordinator.data is not None