From 968ef18f434b44fbb34c94b7248a5fb4f8d56af1 Mon Sep 17 00:00:00 2001 From: nero150 <95982029+nero150@users.noreply.github.com> Date: Wed, 18 Mar 2026 07:20:25 +0100 Subject: [PATCH] Add files via upload --- LICENSE | 2 +- README.md | 140 +++++- custom_components/xt211_han/__init__.py | 62 +++ custom_components/xt211_han/config_flow.py | 85 ++++ custom_components/xt211_han/const.py | 10 + custom_components/xt211_han/coordinator.py | 196 ++++++++ custom_components/xt211_han/dlms_parser.py | 458 ++++++++++++++++++ custom_components/xt211_han/manifest.json | 12 + custom_components/xt211_han/sensor.py | 158 ++++++ custom_components/xt211_han/strings.json | 22 + .../xt211_han/translations/cs.json | 22 + .../xt211_han/translations/en.json | 22 + hacs.json | 8 + test_parser.py | 118 +++++ 14 files changed, 1312 insertions(+), 3 deletions(-) create mode 100644 custom_components/xt211_han/__init__.py create mode 100644 custom_components/xt211_han/config_flow.py create mode 100644 custom_components/xt211_han/const.py create mode 100644 custom_components/xt211_han/coordinator.py create mode 100644 custom_components/xt211_han/dlms_parser.py create mode 100644 custom_components/xt211_han/manifest.json create mode 100644 custom_components/xt211_han/sensor.py create mode 100644 custom_components/xt211_han/strings.json create mode 100644 custom_components/xt211_han/translations/cs.json create mode 100644 custom_components/xt211_han/translations/en.json create mode 100644 hacs.json create mode 100644 test_parser.py diff --git a/LICENSE b/LICENSE index 1aa0c60..14fac91 100644 --- a/LICENSE +++ b/LICENSE @@ -1,6 +1,6 @@ MIT License -Copyright (c) 2026 nero150 +Copyright (c) 2026 Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal diff --git a/README.md b/README.md index 7799fdf..a73e475 100644 --- a/README.md +++ b/README.md @@ -1,2 +1,138 @@ -# CEZ_rele_box -Připojení relé boxu do Home asistenta +# XT211 HAN – Home Assistant Integration + +[![hacs_badge](https://img.shields.io/badge/HACS-Custom-orange.svg)](https://github.com/hacs/integration) +![Maintained](https://img.shields.io/maintenance/yes/2026) + +> **Čtení dat z elektroměru Sagemcom XT211 (ČEZ Distribuce) přes RS485-to-Ethernet adaptér – bez ESP32.** + +Tato integrace nahrazuje ESPHome řešení s ESP32 + RS485→TTL převodníkem. Místo toho používá průmyslový RS485-to-Ethernet adaptér (doporučen **PUSR USR-DR134**), který posílá syrová RS485 data přes TCP přímo do Home Assistantu. + +--- + +## Jak to funguje + +``` +XT211 / WM-RelayBox + └── RJ12 HAN port (RS485, 9600 baud) + └── USR-DR134 (RS485 → Ethernet) + └── TCP socket (LAN) + └── Home Assistant (tato integrace) +``` + +Elektroměr posílá DLMS/COSEM PUSH zprávy každých **60 sekund**. Integrace udržuje persistentní TCP spojení k adaptéru a dekóduje příchozí HDLC rámce. + +--- + +## Požadavky + +- Home Assistant 2024.1+ +- RS485-to-Ethernet adaptér s TCP server módem: + - **PUSR USR-DR134** (doporučeno) – RS485, DIN rail, 5–24V + - Nebo jiný kompatibilní adaptér (USR-TCP232-410S, Waveshare, apod.) + +--- + +## Instalace přes HACS + +1. Otevři HACS → **Integrace** → tři tečky vpravo nahoře → **Vlastní repozitáře** +2. Přidej URL tohoto repozitáře, kategorie: **Integration** +3. Najdi „XT211 HAN" a nainstaluj +4. Restartuj Home Assistant +5. **Nastavení → Zařízení a služby → Přidat integraci → XT211 HAN** + +--- + +## Nastavení adaptéru USR-DR134 + +Nastavení přes webové rozhraní adaptéru (výchozí IP `192.168.0.7`): + +| Parametr | Hodnota | +|----------|---------| +| Work Mode | **TCP Server** | +| Local Port | `8899` (nebo libovolný) | +| Baud Rate | `9600` | +| Data Bits | `8` | +| Stop Bits | `1` | +| Parity | `None` | +| Flow Control | `None` | + +> ⚠️ Použij model **USR-DR134** (RS485), ne DR132 (RS232)! + +--- + +## Zapojení + +``` +WM-RelayBox HAN port (RJ12): + Pin 3 (Data A+) → USR-DR134 terminal A+ + Pin 4 (Data B-) → USR-DR134 terminal B- + Pin 6 (GND) → USR-DR134 GND (volitelné) +``` + +Napájení USR-DR134: 5–24V DC (např. z USB adaptéru přes step-up, nebo 12V zdroj). + +--- + +## Dostupné senzory + +| Název | OBIS kód | Jednotka | +|-------|----------|----------| +| Active Power Consumption | `1-0:1.7.0.255` | W | +| Active Power Delivery | `1-0:2.7.0.255` | W | +| Active Power L1 | `1-0:21.7.0.255` | W | +| Active Power L2 | `1-0:41.7.0.255` | W | +| Active Power L3 | `1-0:61.7.0.255` | W | +| Energy Consumed | `1-0:1.8.0.255` | kWh | +| Energy Consumed T1 | `1-0:1.8.1.255` | kWh | +| Energy Consumed T2 | `1-0:1.8.2.255` | kWh | +| Energy Delivered | `1-0:2.8.0.255` | kWh | +| Serial Number | `0-0:96.1.1.255` | – | +| Current Tariff | `0-0:96.14.0.255` | – | +| Disconnector Status | `0-0:96.3.10.255` | – | + +--- + +## Ladění (debug) + +Přidej do `configuration.yaml`: + +```yaml +logger: + default: warning + logs: + custom_components.xt211_han: debug +``` + +V logu uvidíš surová hex data každého HDLC rámce a dekódované OBIS hodnoty. + +--- + +## Struktura repozitáře + +``` +custom_components/xt211_han/ +├── __init__.py # Inicializace integrace +├── manifest.json # Metadata pro HA / HACS +├── const.py # Konstanty +├── config_flow.py # UI průvodce nastavením +├── coordinator.py # TCP listener + DataUpdateCoordinator +├── sensor.py # Senzorová platforma +├── dlms_parser.py # HDLC / DLMS / COSEM parser +├── strings.json # Texty UI +└── translations/ + ├── cs.json # Čeština + └── en.json # Angličtina +``` + +--- + +## Poděkování / Credits + +- [Tomer27cz/xt211](https://github.com/Tomer27cz/xt211) – původní ESPHome komponenta a dokumentace protokolu +- ČEZ Distribuce – dokumentace OBIS kódů a RS485 rozhraní + +--- + +## Licence + +MIT diff --git a/custom_components/xt211_han/__init__.py b/custom_components/xt211_han/__init__.py new file mode 100644 index 0000000..5874f46 --- /dev/null +++ b/custom_components/xt211_han/__init__.py @@ -0,0 +1,62 @@ +"""XT211 HAN integration for Home Assistant. + +Reads DLMS/COSEM PUSH data from a Sagemcom XT211 smart meter via a +RS485-to-Ethernet adapter (e.g. PUSR USR-DR134) over TCP. +No ESP32 or dedicated hardware needed beyond the adapter. +""" + +from __future__ import annotations + +import logging + +from homeassistant.config_entries import ConfigEntry +from homeassistant.const import CONF_HOST, CONF_PORT, CONF_NAME, Platform +from homeassistant.core import HomeAssistant + +from .const import DOMAIN, DEFAULT_NAME +from .coordinator import XT211Coordinator + +_LOGGER = logging.getLogger(__name__) + +PLATFORMS = [Platform.SENSOR] + + +async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool: + """Set up XT211 HAN from a config entry.""" + hass.data.setdefault(DOMAIN, {}) + + coordinator = XT211Coordinator( + hass, + host=entry.data[CONF_HOST], + port=entry.data[CONF_PORT], + name=entry.data.get(CONF_NAME, DEFAULT_NAME), + ) + + hass.data[DOMAIN][entry.entry_id] = coordinator + + # Start the background TCP listener + await coordinator.async_setup() + + # Set up sensor platform + await hass.config_entries.async_forward_entry_setups(entry, PLATFORMS) + + _LOGGER.info( + "XT211 HAN integration started for %s:%d", + entry.data[CONF_HOST], + entry.data[CONF_PORT], + ) + return True + + +async def async_unload_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool: + """Unload a config entry.""" + coordinator: XT211Coordinator = hass.data[DOMAIN].get(entry.entry_id) + if coordinator: + await coordinator.async_shutdown() + + unload_ok = await hass.config_entries.async_unload_platforms(entry, PLATFORMS) + + if unload_ok: + hass.data[DOMAIN].pop(entry.entry_id, None) + + return unload_ok diff --git a/custom_components/xt211_han/config_flow.py b/custom_components/xt211_han/config_flow.py new file mode 100644 index 0000000..859c47c --- /dev/null +++ b/custom_components/xt211_han/config_flow.py @@ -0,0 +1,85 @@ +"""Config flow for XT211 HAN integration.""" + +from __future__ import annotations + +import asyncio +import logging +from typing import Any + +import voluptuous as vol + +from homeassistant import config_entries +from homeassistant.const import CONF_HOST, CONF_PORT, CONF_NAME +from homeassistant.data_entry_flow import FlowResult + +from .const import DOMAIN, DEFAULT_PORT, DEFAULT_NAME + +_LOGGER = logging.getLogger(__name__) + +STEP_USER_DATA_SCHEMA = vol.Schema( + { + vol.Required(CONF_HOST): str, + vol.Required(CONF_PORT, default=DEFAULT_PORT): int, + vol.Optional(CONF_NAME, default=DEFAULT_NAME): str, + } +) + + +async def _test_connection(host: str, port: int) -> str | None: + """Try to open a TCP connection. Returns error string or None on success.""" + try: + reader, writer = await asyncio.wait_for( + asyncio.open_connection(host, port), + timeout=5, + ) + writer.close() + await writer.wait_closed() + return None + except asyncio.TimeoutError: + return "cannot_connect" + except OSError: + return "cannot_connect" + except Exception: + return "unknown" + + +class XT211HANConfigFlow(config_entries.ConfigFlow, domain=DOMAIN): + """Handle the config flow for XT211 HAN.""" + + VERSION = 1 + + async def async_step_user( + self, user_input: dict[str, Any] | None = None + ) -> FlowResult: + errors: dict[str, str] = {} + + if user_input is not None: + host = user_input[CONF_HOST] + port = user_input[CONF_PORT] + name = user_input.get(CONF_NAME, DEFAULT_NAME) + + # Prevent duplicate entries + await self.async_set_unique_id(f"{host}:{port}") + self._abort_if_unique_id_configured() + + error = await _test_connection(host, port) + if error: + errors["base"] = error + else: + return self.async_create_entry( + title=f"{name} ({host}:{port})", + data={ + CONF_HOST: host, + CONF_PORT: port, + CONF_NAME: name, + }, + ) + + return self.async_show_form( + step_id="user", + data_schema=STEP_USER_DATA_SCHEMA, + errors=errors, + description_placeholders={ + "default_port": str(DEFAULT_PORT), + }, + ) diff --git a/custom_components/xt211_han/const.py b/custom_components/xt211_han/const.py new file mode 100644 index 0000000..46255bc --- /dev/null +++ b/custom_components/xt211_han/const.py @@ -0,0 +1,10 @@ +"""Constants for XT211 HAN integration.""" + +DOMAIN = "xt211_han" + +CONF_HOST = "host" +CONF_PORT = "port" +CONF_NAME = "name" + +DEFAULT_PORT = 8899 +DEFAULT_NAME = "XT211 HAN" diff --git a/custom_components/xt211_han/coordinator.py b/custom_components/xt211_han/coordinator.py new file mode 100644 index 0000000..879c1fe --- /dev/null +++ b/custom_components/xt211_han/coordinator.py @@ -0,0 +1,196 @@ +""" +Coordinator for XT211 HAN integration. + +Opens a persistent TCP connection to the RS485-to-Ethernet adapter +(e.g. USR-DR134) and receives DLMS/COSEM PUSH frames every 60 seconds. +""" + +from __future__ import annotations + +import asyncio +import logging +from datetime import timedelta +from typing import Any + +from homeassistant.core import HomeAssistant +from homeassistant.helpers.update_coordinator import DataUpdateCoordinator, UpdateFailed + +from .dlms_parser import DLMSParser, DLMSObject, OBIS_DESCRIPTIONS +from .const import DOMAIN + +_LOGGER = logging.getLogger(__name__) + +# We expect a frame every 60 s; allow some margin before timing out +PUSH_TIMEOUT = 90 # seconds +RECONNECT_DELAY = 10 # seconds after connection loss + + +class XT211Coordinator(DataUpdateCoordinator[dict[str, Any]]): + """ + Coordinator that maintains a persistent TCP connection to the + RS485-to-Ethernet adapter and decodes incoming DLMS PUSH frames. + + Data is published to HA listeners whenever a new frame arrives, + not on a fixed poll interval (update_interval=None triggers manual). + """ + + def __init__( + self, + hass: HomeAssistant, + host: str, + port: int, + name: str, + ) -> None: + super().__init__( + hass, + _LOGGER, + name=f"XT211 HAN ({host}:{port})", + update_interval=None, # push-driven, not poll-driven + ) + self.host = host + self.port = port + self.device_name = name + self._parser = DLMSParser() + self._reader: asyncio.StreamReader | None = None + self._writer: asyncio.StreamWriter | None = None + self._listen_task: asyncio.Task | None = None + self._connected = False + + # ------------------------------------------------------------------ + # Public helpers + # ------------------------------------------------------------------ + + @property + def connected(self) -> bool: + return self._connected + + async def async_setup(self) -> None: + """Start the background listener task.""" + if self._listen_task is None or self._listen_task.done(): + self._listen_task = self.hass.async_create_background_task( + self._listen_loop(), + name=f"xt211_han_{self.host}_{self.port}", + ) + + async def async_shutdown(self) -> None: + """Stop the listener and close the connection.""" + if self._listen_task: + self._listen_task.cancel() + try: + await self._listen_task + except asyncio.CancelledError: + pass + await self._disconnect() + + # ------------------------------------------------------------------ + # DataUpdateCoordinator required override + # ------------------------------------------------------------------ + + async def _async_update_data(self) -> dict[str, Any]: + """Called by HA when entities want a refresh. Returns current data.""" + if self.data is None: + return {} + return self.data + + # ------------------------------------------------------------------ + # TCP listener loop + # ------------------------------------------------------------------ + + async def _listen_loop(self) -> None: + """Main loop: connect → receive → reconnect on error.""" + while True: + try: + await self._connect() + await self._receive_loop() + except asyncio.CancelledError: + _LOGGER.info("XT211 listener task cancelled") + raise + except Exception as exc: + _LOGGER.warning( + "XT211 connection error (%s:%d): %s – retrying in %ds", + self.host, self.port, exc, RECONNECT_DELAY, + ) + self._connected = False + finally: + await self._disconnect() + + await asyncio.sleep(RECONNECT_DELAY) + + async def _connect(self) -> None: + _LOGGER.info("Connecting to XT211 adapter at %s:%d", self.host, self.port) + self._reader, self._writer = await asyncio.wait_for( + asyncio.open_connection(self.host, self.port), + timeout=10, + ) + self._connected = True + self._parser = DLMSParser() # reset parser state on new connection + _LOGGER.info("Connected to XT211 adapter at %s:%d", self.host, self.port) + + async def _disconnect(self) -> None: + self._connected = False + if self._writer: + try: + self._writer.close() + await self._writer.wait_closed() + except Exception: + pass + self._writer = None + self._reader = None + + async def _receive_loop(self) -> None: + """Read bytes from the TCP stream and feed them to the parser.""" + assert self._reader is not None + + while True: + try: + chunk = await asyncio.wait_for( + self._reader.read(4096), + timeout=PUSH_TIMEOUT, + ) + except asyncio.TimeoutError: + _LOGGER.warning( + "No data from XT211 for %d s – reconnecting", PUSH_TIMEOUT + ) + raise ConnectionError("Push timeout") + + if not chunk: + _LOGGER.warning("XT211 adapter closed connection") + raise ConnectionError("Remote closed") + + self._parser.feed(chunk) + + # Process all complete frames in the buffer + while True: + result = self._parser.get_frame() + if result is None: + break + if result.success: + await self._process_frame(result.objects) + else: + _LOGGER.debug( + "Frame parse error: %s (raw: %s)", + result.error, result.raw_hex[:80], + ) + + async def _process_frame(self, objects: list[DLMSObject]) -> None: + """Update coordinator data from a decoded DLMS frame.""" + if not objects: + _LOGGER.debug("Received empty DLMS frame") + return + + current = dict(self.data or {}) + + for obj in objects: + meta = OBIS_DESCRIPTIONS.get(obj.obis, {}) + current[obj.obis] = { + "value": obj.value, + "unit": obj.unit or meta.get("unit", ""), + "name": meta.get("name", obj.obis), + "class": meta.get("class", "sensor"), + } + _LOGGER.debug( + "OBIS %s = %s %s", obj.obis, obj.value, obj.unit + ) + + self.async_set_updated_data(current) + _LOGGER.debug("Updated %d DLMS objects from XT211 frame", len(objects)) diff --git a/custom_components/xt211_han/dlms_parser.py b/custom_components/xt211_han/dlms_parser.py new file mode 100644 index 0000000..e0a4c04 --- /dev/null +++ b/custom_components/xt211_han/dlms_parser.py @@ -0,0 +1,458 @@ +""" +DLMS/COSEM PUSH mode parser for Sagemcom XT211 smart meter. + +The XT211 sends unsolicited HDLC-framed DLMS/COSEM data every 60 seconds +over RS485 (9600 baud, 8N1). This module decodes those frames. + +Frame structure (HDLC): + 7E - HDLC flag + A0 xx - Frame type + length + 00 02 00 01 ... - Destination / source addresses + 13 - Control byte (UI frame) + xx xx - HCS (header checksum) + [LLC header] - E6 E7 00 + [APDU] - DLMS application data (tag 0F = Data-notification) + xx xx - FCS (frame checksum) + 7E - HDLC flag + +OBIS codes supported (from ČEZ Distribuce spec): + 0-0:96.1.1.255 - Serial number (Device ID) + 0-0:96.3.10.255 - Disconnector status + 0-0:96.14.0.255 - Current tariff + 1-0:1.7.0.255 - Instant active power consumption (W) + 1-0:2.7.0.255 - Instant active power delivery (W) + 1-0:21.7.0.255 - Instant power L1 (W) + 1-0:41.7.0.255 - Instant power L2 (W) + 1-0:61.7.0.255 - Instant power L3 (W) + 1-0:1.8.0.255 - Active energy consumed (Wh) + 1-0:1.8.1.255 - Active energy T1 (Wh) + 1-0:1.8.2.255 - Active energy T2 (Wh) + 1-0:2.8.0.255 - Active energy delivered (Wh) + 0-1:96.3.10.255 - Relay R1 status + 0-2:96.3.10.255 - Relay R2 status + 0-3:96.3.10.255 - Relay R3 status + 0-4:96.3.10.255 - Relay R4 status +""" + +from __future__ import annotations + +import logging +import struct +from dataclasses import dataclass +from typing import Any + +_LOGGER = logging.getLogger(__name__) + +HDLC_FLAG = 0x7E + +# DLMS data types +DLMS_TYPE_NULL = 0x00 +DLMS_TYPE_BOOL = 0x03 +DLMS_TYPE_INT8 = 0x0F +DLMS_TYPE_INT16 = 0x10 +DLMS_TYPE_UINT8 = 0x11 +DLMS_TYPE_UINT16 = 0x12 +DLMS_TYPE_INT32 = 0x05 +DLMS_TYPE_UINT32 = 0x06 +DLMS_TYPE_INT64 = 0x14 +DLMS_TYPE_UINT64 = 0x15 +DLMS_TYPE_FLOAT32 = 0x16 +DLMS_TYPE_FLOAT64 = 0x17 +DLMS_TYPE_OCTET_STRING = 0x09 +DLMS_TYPE_VISIBLE_STRING = 0x0A +DLMS_TYPE_ARRAY = 0x01 +DLMS_TYPE_STRUCTURE = 0x02 +DLMS_TYPE_COMPACT_ARRAY = 0x13 + +# SI unit multipliers (DLMS scaler) +# Scaler is a signed int8 representing 10^scaler +def apply_scaler(value: int | float, scaler: int) -> float: + """Apply DLMS scaler (10^scaler) to a raw value.""" + return float(value) * (10 ** scaler) + + +@dataclass +class DLMSObject: + """A single decoded DLMS COSEM object.""" + obis: str # e.g. "1-0:1.8.0.255" + value: Any # decoded Python value + unit: str = "" # e.g. "W", "Wh", "" + scaler: int = 0 # raw scaler from frame + + +@dataclass +class ParseResult: + """Result of parsing one HDLC frame.""" + success: bool + objects: list[DLMSObject] + raw_hex: str = "" + error: str = "" + + +class DLMSParser: + """ + Stateful DLMS/COSEM PUSH mode parser for XT211. + + Usage: + parser = DLMSParser() + parser.feed(bytes_from_tcp) + while (result := parser.get_frame()): + process(result) + """ + + # DLMS unit codes → human readable strings + UNIT_MAP = { + 1: "a", 2: "mo", 3: "wk", 4: "d", 5: "h", + 6: "min", 7: "s", 8: "°", 9: "°C", 10: "currency", + 11: "m", 12: "m/s", 13: "m³", 14: "m³", 15: "m³/h", + 16: "m³/h", 17: "m³/d", 18: "m³/d", 19: "l", 20: "kg", + 21: "N", 22: "Nm", 23: "Pa", 24: "bar", 25: "J", + 26: "J/h", 27: "W", 28: "VA", 29: "var", 30: "Wh", + 31: "VAh", 32: "varh", 33: "A", 34: "C", 35: "V", + 36: "V/m", 37: "F", 38: "Ω", 39: "Ωm²/m",40: "Wb", + 41: "T", 42: "A/m", 43: "H", 44: "Hz", 45: "1/Wh", + 46: "1/varh",47: "1/VAh",48: "V²h", 49: "A²h", 50: "kg/s", + 51: "S", 52: "K", 53: "1/(V²h)",54: "1/(A²h)", + 255: "", 0: "", + } + + def __init__(self) -> None: + self._buffer = bytearray() + + def feed(self, data: bytes) -> None: + """Add raw bytes from TCP socket to the internal buffer.""" + self._buffer.extend(data) + + def get_frame(self) -> ParseResult | None: + """ + Try to extract and parse one complete HDLC frame from the buffer. + Returns ParseResult if a frame was found, None if more data is needed. + """ + buf = self._buffer + + # Find opening flag + start = buf.find(HDLC_FLAG) + if start == -1: + self._buffer.clear() + return None + if start > 0: + _LOGGER.debug("Discarding %d bytes before HDLC flag", start) + del self._buffer[:start] + buf = self._buffer + + if len(buf) < 3: + return None + + # Parse frame length from bytes 1-2 (A0 XX or A8 XX) + # Bits 11-0 of bytes 1-2 give frame length + frame_len = ((buf[1] & 0x07) << 8) | buf[2] + + # Total on-wire length = frame_len + 2 flags (opening already at 0, closing at frame_len+1) + total = frame_len + 2 + if len(buf) < total: + return None # incomplete frame, wait for more data + + raw = bytes(buf[:total]) + del self._buffer[:total] + + raw_hex = raw.hex() + _LOGGER.debug("HDLC frame: %s", raw_hex) + + # Basic sanity: starts and ends with 0x7E + if raw[0] != HDLC_FLAG or raw[-1] != HDLC_FLAG: + return ParseResult(success=False, raw_hex=raw_hex, error="Missing HDLC flags") + + try: + result = self._parse_hdlc(raw) + result.raw_hex = raw_hex + return result + except Exception as exc: + _LOGGER.exception("Error parsing HDLC frame") + return ParseResult(success=False, raw_hex=raw_hex, error=str(exc)) + + # ------------------------------------------------------------------ + # Internal parsing methods + # ------------------------------------------------------------------ + + def _parse_hdlc(self, raw: bytes) -> ParseResult: + """Parse full HDLC frame and extract DLMS objects.""" + pos = 1 # skip opening flag + + # Frame format byte (should be A0 or A8) + # bits 11-0 = length + _frame_type = raw[pos] & 0xF8 + frame_len = ((raw[pos] & 0x07) << 8) | raw[pos + 1] + pos += 2 + + # Destination address (variable length, LSB=1 means last byte) + dest_addr, pos = self._read_hdlc_address(raw, pos) + # Source address + src_addr, pos = self._read_hdlc_address(raw, pos) + + # Control byte + control = raw[pos]; pos += 1 + + # HCS (2 bytes header checksum) - skip + pos += 2 + + # From here: LLC + APDU + # LLC header: E6 E7 00 (or E6 E6 00 for request) + if pos + 3 > len(raw) - 3: + return ParseResult(success=False, error="Frame too short for LLC") + + llc = raw[pos:pos+3]; pos += 3 + _LOGGER.debug("LLC: %s dest=%s src=%s", llc.hex(), dest_addr, src_addr) + + # APDU starts here, ends 3 bytes before end (FCS + closing flag) + apdu = raw[pos:-3] + _LOGGER.debug("APDU (%d bytes): %s", len(apdu), apdu.hex()) + + return self._parse_apdu(apdu) + + def _read_hdlc_address(self, data: bytes, pos: int) -> tuple[int, int]: + """Read HDLC variable-length address. Returns (address_value, new_pos).""" + addr = 0 + shift = 0 + while pos < len(data): + byte = data[pos]; pos += 1 + addr |= (byte >> 1) << shift + shift += 7 + if byte & 0x01: # last byte of address + break + return addr, pos + + def _parse_apdu(self, apdu: bytes) -> ParseResult: + """Parse DLMS APDU (Data-Notification = tag 0x0F).""" + if not apdu: + return ParseResult(success=False, error="Empty APDU") + + tag = apdu[0] + + if tag != 0x0F: + return ParseResult( + success=False, + error=f"Unexpected APDU tag 0x{tag:02X} (expected 0x0F Data-Notification)" + ) + + # Data-Notification structure: + # 0F [long-invoke-id-and-priority 4B] [date-time opt] [notification-body] + pos = 1 + if len(apdu) < 5: + return ParseResult(success=False, error="APDU too short") + + # Long invoke-id-and-priority (4 bytes) + invoke_id = struct.unpack_from(">I", apdu, pos)[0]; pos += 4 + _LOGGER.debug("Invoke ID: 0x%08X", invoke_id) + + # Optional date-time: if next byte == 0x09 then it's an octet string with time + if pos < len(apdu) and apdu[pos] == 0x09: + pos += 1 # skip type tag + dt_len = apdu[pos]; pos += 1 + _dt_bytes = apdu[pos:pos+dt_len]; pos += dt_len + _LOGGER.debug("Timestamp bytes: %s", _dt_bytes.hex()) + elif pos < len(apdu) and apdu[pos] == 0x00: + pos += 1 # optional field absent + + # Notification body is a structure containing the push data + objects, _ = self._decode_value(apdu, pos) + dlms_objects = self._extract_objects(objects) + + return ParseResult(success=True, objects=dlms_objects) + + def _decode_value(self, data: bytes, pos: int) -> tuple[Any, int]: + """Recursively decode a DLMS typed value. Returns (value, new_pos).""" + if pos >= len(data): + return None, pos + + dtype = data[pos]; pos += 1 + + if dtype == DLMS_TYPE_NULL: + return None, pos + + elif dtype == DLMS_TYPE_BOOL: + return bool(data[pos]), pos + 1 + + elif dtype == DLMS_TYPE_INT8: + return struct.unpack_from(">b", data, pos)[0], pos + 1 + + elif dtype == DLMS_TYPE_UINT8: + return data[pos], pos + 1 + + elif dtype == DLMS_TYPE_INT16: + return struct.unpack_from(">h", data, pos)[0], pos + 2 + + elif dtype == DLMS_TYPE_UINT16: + return struct.unpack_from(">H", data, pos)[0], pos + 2 + + elif dtype == DLMS_TYPE_INT32: + return struct.unpack_from(">i", data, pos)[0], pos + 4 + + elif dtype == DLMS_TYPE_UINT32: + return struct.unpack_from(">I", data, pos)[0], pos + 4 + + elif dtype == DLMS_TYPE_INT64: + return struct.unpack_from(">q", data, pos)[0], pos + 8 + + elif dtype == DLMS_TYPE_UINT64: + return struct.unpack_from(">Q", data, pos)[0], pos + 8 + + elif dtype == DLMS_TYPE_FLOAT32: + return struct.unpack_from(">f", data, pos)[0], pos + 4 + + elif dtype == DLMS_TYPE_FLOAT64: + return struct.unpack_from(">d", data, pos)[0], pos + 8 + + elif dtype in (DLMS_TYPE_OCTET_STRING, DLMS_TYPE_VISIBLE_STRING): + length, pos = self._decode_length(data, pos) + raw_bytes = data[pos:pos+length] + pos += length + if dtype == DLMS_TYPE_VISIBLE_STRING: + try: + return raw_bytes.decode("ascii", errors="replace"), pos + except Exception: + return raw_bytes.hex(), pos + return raw_bytes, pos + + elif dtype in (DLMS_TYPE_ARRAY, DLMS_TYPE_STRUCTURE, DLMS_TYPE_COMPACT_ARRAY): + count, pos = self._decode_length(data, pos) + items = [] + for _ in range(count): + val, pos = self._decode_value(data, pos) + items.append(val) + return items, pos + + else: + _LOGGER.warning("Unknown DLMS type 0x%02X at pos %d", dtype, pos) + return None, pos + + def _decode_length(self, data: bytes, pos: int) -> tuple[int, int]: + """Decode BER-style length field.""" + first = data[pos]; pos += 1 + if first < 0x80: + return first, pos + num_bytes = first & 0x7F + length = 0 + for _ in range(num_bytes): + length = (length << 8) | data[pos]; pos += 1 + return length, pos + + def _extract_objects(self, notification_body: Any) -> list[DLMSObject]: + """ + Walk the decoded notification body and extract OBIS-keyed objects. + + The XT211 push notification body is a structure containing an array + of structures, each typically: + [OBIS bytes (6B octet-string), value (structure with scaler+unit), data] + + We try to handle both flat and nested layouts. + """ + objects = [] + if not isinstance(notification_body, list): + return objects + + # The outer structure may wrap an inner array + # Try to unwrap one level of nesting + payload = notification_body + if len(payload) == 1 and isinstance(payload[0], list): + payload = payload[0] + + for item in payload: + if not isinstance(item, list) or len(item) < 2: + continue + try: + obj = self._parse_cosem_entry(item) + if obj: + objects.append(obj) + except Exception as exc: + _LOGGER.debug("Could not parse COSEM entry %s: %s", item, exc) + + return objects + + def _parse_cosem_entry(self, entry: list) -> DLMSObject | None: + """ + Parse one COSEM entry from the push notification. + Expected layout: [obis_bytes, [scaler, unit], value] + or simplified: [obis_bytes, value] + """ + if len(entry) < 2: + return None + + # First element should be the OBIS code as 6-byte octet string + obis_raw = entry[0] + if not isinstance(obis_raw, (bytes, bytearray)) or len(obis_raw) != 6: + return None + + obis_str = self._format_obis(obis_raw) + + scaler = 0 + unit_code = 255 + value = None + + if len(entry) == 3: + # entry[1] = [scaler, unit], entry[2] = value + scaler_unit = entry[1] + if isinstance(scaler_unit, list) and len(scaler_unit) == 2: + raw_scaler = scaler_unit[0] + # scaler is signed int8 + if isinstance(raw_scaler, int): + scaler = raw_scaler if raw_scaler < 128 else raw_scaler - 256 + unit_code = scaler_unit[1] if isinstance(scaler_unit[1], int) else 255 + value = entry[2] + else: + value = entry[1] + + # Apply scaler to numeric values + if isinstance(value, int) and scaler != 0: + final_value: Any = apply_scaler(value, scaler) + elif isinstance(value, bytes): + # Try to decode as ASCII string (e.g. serial number) + try: + final_value = value.decode("ascii", errors="replace").strip("\x00") + except Exception: + final_value = value.hex() + else: + final_value = value + + unit_str = self.UNIT_MAP.get(unit_code, "") + + return DLMSObject( + obis=obis_str, + value=final_value, + unit=unit_str, + scaler=scaler, + ) + + @staticmethod + def _format_obis(raw: bytes) -> str: + """Convert 6 raw bytes to OBIS string notation A-B:C.D.E.F""" + if len(raw) != 6: + return raw.hex() + a, b, c, d, e, f = raw + return f"{a}-{b}:{c}.{d}.{e}.{f}" + + +# --------------------------------------------------------------------------- +# Convenience: known OBIS codes for the XT211 +# --------------------------------------------------------------------------- + +OBIS_DESCRIPTIONS: dict[str, dict] = { + "0-0:96.1.1.255": {"name": "Serial Number", "unit": "", "class": "text"}, + "0-0:96.3.10.255": {"name": "Disconnector Status", "unit": "", "class": "binary"}, + "0-0:96.14.0.255": {"name": "Current Tariff", "unit": "", "class": "text"}, + "1-0:1.7.0.255": {"name": "Active Power Consumption", "unit": "W", "class": "power"}, + "1-0:2.7.0.255": {"name": "Active Power Delivery", "unit": "W", "class": "power"}, + "1-0:21.7.0.255": {"name": "Active Power L1", "unit": "W", "class": "power"}, + "1-0:41.7.0.255": {"name": "Active Power L2", "unit": "W", "class": "power"}, + "1-0:61.7.0.255": {"name": "Active Power L3", "unit": "W", "class": "power"}, + "1-0:1.8.0.255": {"name": "Energy Consumed", "unit": "Wh", "class": "energy"}, + "1-0:1.8.1.255": {"name": "Energy Consumed T1", "unit": "Wh", "class": "energy"}, + "1-0:1.8.2.255": {"name": "Energy Consumed T2", "unit": "Wh", "class": "energy"}, + "1-0:1.8.3.255": {"name": "Energy Consumed T3", "unit": "Wh", "class": "energy"}, + "1-0:1.8.4.255": {"name": "Energy Consumed T4", "unit": "Wh", "class": "energy"}, + "1-0:2.8.0.255": {"name": "Energy Delivered", "unit": "Wh", "class": "energy"}, + "0-1:96.3.10.255": {"name": "Relay R1 Status", "unit": "", "class": "binary"}, + "0-2:96.3.10.255": {"name": "Relay R2 Status", "unit": "", "class": "binary"}, + "0-3:96.3.10.255": {"name": "Relay R3 Status", "unit": "", "class": "binary"}, + "0-4:96.3.10.255": {"name": "Relay R4 Status", "unit": "", "class": "binary"}, + "0-0:17.0.0.255": {"name": "Limiter Value", "unit": "W", "class": "power"}, +} diff --git a/custom_components/xt211_han/manifest.json b/custom_components/xt211_han/manifest.json new file mode 100644 index 0000000..fe9a8dc --- /dev/null +++ b/custom_components/xt211_han/manifest.json @@ -0,0 +1,12 @@ +{ + "domain": "xt211_han", + "name": "XT211 HAN (RS485 via Ethernet)", + "version": "0.1.0", + "documentation": "https://github.com/yourusername/xt211-han-ha", + "issue_tracker": "https://github.com/yourusername/xt211-han-ha/issues", + "dependencies": [], + "codeowners": ["@yourusername"], + "requirements": [], + "iot_class": "local_push", + "config_flow": true +} diff --git a/custom_components/xt211_han/sensor.py b/custom_components/xt211_han/sensor.py new file mode 100644 index 0000000..17250a0 --- /dev/null +++ b/custom_components/xt211_han/sensor.py @@ -0,0 +1,158 @@ +"""Sensor platform for XT211 HAN integration.""" + +from __future__ import annotations + +import logging +from typing import Any + +from homeassistant.components.sensor import ( + SensorDeviceClass, + SensorEntity, + SensorStateClass, +) +from homeassistant.config_entries import ConfigEntry +from homeassistant.const import ( + CONF_NAME, + UnitOfEnergy, + UnitOfPower, +) +from homeassistant.core import HomeAssistant, callback +from homeassistant.helpers.device_registry import DeviceInfo +from homeassistant.helpers.entity_platform import AddEntitiesCallback +from homeassistant.helpers.update_coordinator import CoordinatorEntity + +from .const import DOMAIN +from .coordinator import XT211Coordinator +from .dlms_parser import OBIS_DESCRIPTIONS + +_LOGGER = logging.getLogger(__name__) + +# Map OBIS "class" strings → HA SensorDeviceClass + StateClass + unit +SENSOR_META: dict[str, dict] = { + "power": { + "device_class": SensorDeviceClass.POWER, + "state_class": SensorStateClass.MEASUREMENT, + "unit": UnitOfPower.WATT, + }, + "energy": { + "device_class": SensorDeviceClass.ENERGY, + "state_class": SensorStateClass.TOTAL_INCREASING, + "unit": UnitOfEnergy.WATT_HOUR, + }, + "sensor": { + "device_class": None, + "state_class": SensorStateClass.MEASUREMENT, + "unit": None, + }, +} + +# OBIS codes that are NOT numeric sensors (text / binary) – handled separately +NON_SENSOR_CLASSES = {"text", "binary"} + + +async def async_setup_entry( + hass: HomeAssistant, + entry: ConfigEntry, + async_add_entities: AddEntitiesCallback, +) -> None: + """Set up XT211 HAN sensors from a config entry.""" + coordinator: XT211Coordinator = hass.data[DOMAIN][entry.entry_id] + + # We create entities for all known OBIS codes upfront. + # Unknown codes that arrive later will be added dynamically. + entities: list[XT211SensorEntity] = [] + registered_obis: set[str] = set() + + for obis, meta in OBIS_DESCRIPTIONS.items(): + if meta.get("class") in NON_SENSOR_CLASSES: + continue + entities.append( + XT211SensorEntity(coordinator, entry, obis, meta) + ) + registered_obis.add(obis) + + async_add_entities(entities) + + @callback + def _handle_new_obis(obis: str, data: dict) -> None: + """Dynamically add sensor for a previously unknown OBIS code.""" + if obis in registered_obis: + return + if data.get("class") in NON_SENSOR_CLASSES: + return + registered_obis.add(obis) + async_add_entities([XT211SensorEntity(coordinator, entry, obis, data)]) + + # Subscribe to coordinator updates to detect new OBIS codes + @callback + def _on_update() -> None: + if coordinator.data: + for obis, data in coordinator.data.items(): + _handle_new_obis(obis, data) + + coordinator.async_add_listener(_on_update) + + +class XT211SensorEntity(CoordinatorEntity[XT211Coordinator], SensorEntity): + """A single numeric sensor entity backed by an OBIS code.""" + + _attr_has_entity_name = True + + def __init__( + self, + coordinator: XT211Coordinator, + entry: ConfigEntry, + obis: str, + meta: dict, + ) -> None: + super().__init__(coordinator) + self._obis = obis + self._meta = meta + self._entry = entry + + sensor_type = meta.get("class", "sensor") + sm = SENSOR_META.get(sensor_type, SENSOR_META["sensor"]) + + self._attr_unique_id = f"{entry.entry_id}_{obis}" + self._attr_name = meta.get("name", obis) + self._attr_device_class = sm["device_class"] + self._attr_state_class = sm["state_class"] + self._attr_native_unit_of_measurement = sm["unit"] or meta.get("unit") + + # Energy sensors: convert Wh → kWh for HA Energy dashboard + if sensor_type == "energy": + self._attr_native_unit_of_measurement = UnitOfEnergy.KILO_WATT_HOUR + self._wh_to_kwh = True + else: + self._wh_to_kwh = False + + @property + def device_info(self) -> DeviceInfo: + return DeviceInfo( + identifiers={(DOMAIN, self._entry.entry_id)}, + name=self._entry.data.get(CONF_NAME, "XT211 HAN"), + manufacturer="Sagemcom", + model="XT211 AMM", + ) + + @property + def native_value(self) -> float | None: + if self.coordinator.data is None: + return None + obj = self.coordinator.data.get(self._obis) + if obj is None: + return None + raw = obj.get("value") + if raw is None: + return None + try: + val = float(raw) + if self._wh_to_kwh: + val = val / 1000.0 + return round(val, 3) + except (TypeError, ValueError): + return None + + @property + def available(self) -> bool: + return self.coordinator.connected and self.coordinator.data is not None diff --git a/custom_components/xt211_han/strings.json b/custom_components/xt211_han/strings.json new file mode 100644 index 0000000..138db1a --- /dev/null +++ b/custom_components/xt211_han/strings.json @@ -0,0 +1,22 @@ +{ + "config": { + "step": { + "user": { + "title": "Nastavení XT211 HAN adaptéru", + "description": "Zadej IP adresu a port RS485-to-Ethernet adaptéru (např. PUSR USR-DR134). Výchozí port pro TCP server mód je 8899.", + "data": { + "host": "IP adresa adaptéru", + "port": "TCP port", + "name": "Název zařízení" + } + } + }, + "error": { + "cannot_connect": "Nepodařilo se připojit k adaptéru. Zkontroluj IP adresu, port a připojení.", + "unknown": "Neočekávaná chyba. Zkontroluj log." + }, + "abort": { + "already_configured": "Toto zařízení je již nakonfigurováno." + } + } +} diff --git a/custom_components/xt211_han/translations/cs.json b/custom_components/xt211_han/translations/cs.json new file mode 100644 index 0000000..138db1a --- /dev/null +++ b/custom_components/xt211_han/translations/cs.json @@ -0,0 +1,22 @@ +{ + "config": { + "step": { + "user": { + "title": "Nastavení XT211 HAN adaptéru", + "description": "Zadej IP adresu a port RS485-to-Ethernet adaptéru (např. PUSR USR-DR134). Výchozí port pro TCP server mód je 8899.", + "data": { + "host": "IP adresa adaptéru", + "port": "TCP port", + "name": "Název zařízení" + } + } + }, + "error": { + "cannot_connect": "Nepodařilo se připojit k adaptéru. Zkontroluj IP adresu, port a připojení.", + "unknown": "Neočekávaná chyba. Zkontroluj log." + }, + "abort": { + "already_configured": "Toto zařízení je již nakonfigurováno." + } + } +} diff --git a/custom_components/xt211_han/translations/en.json b/custom_components/xt211_han/translations/en.json new file mode 100644 index 0000000..1b3fffe --- /dev/null +++ b/custom_components/xt211_han/translations/en.json @@ -0,0 +1,22 @@ +{ + "config": { + "step": { + "user": { + "title": "XT211 HAN Adapter Setup", + "description": "Enter the IP address and port of your RS485-to-Ethernet adapter (e.g. PUSR USR-DR134). The default TCP server port is 8899.", + "data": { + "host": "Adapter IP address", + "port": "TCP port", + "name": "Device name" + } + } + }, + "error": { + "cannot_connect": "Could not connect to the adapter. Check the IP address, port and network connection.", + "unknown": "Unexpected error. Check the log." + }, + "abort": { + "already_configured": "This device is already configured." + } + } +} diff --git a/hacs.json b/hacs.json new file mode 100644 index 0000000..c673f67 --- /dev/null +++ b/hacs.json @@ -0,0 +1,8 @@ +{ + "name": "XT211 HAN RS485 (via Ethernet)", + "description": "Home Assistant integration for Sagemcom XT211 smart meter via RS485-to-Ethernet adapter (e.g. USR-DR134). Reads DLMS/COSEM PUSH data without ESP32.", + "documentation": "https://github.com/yourusername/xt211-han-ha", + "issue_tracker": "https://github.com/yourusername/xt211-han-ha/issues", + "iot_class": "local_push", + "render_readme": true +} diff --git a/test_parser.py b/test_parser.py new file mode 100644 index 0000000..31bbbb3 --- /dev/null +++ b/test_parser.py @@ -0,0 +1,118 @@ +#!/usr/bin/env python3 +""" +Standalone test / debug script for the DLMS parser and TCP listener. + +Usage: + # Parse a raw hex frame from the meter (paste from HA debug log): + python3 test_parser.py --hex "7ea0...7e" + + # Live listen on TCP socket (forward output to terminal): + python3 test_parser.py --host 192.168.1.100 --port 8899 + + # Replay a saved binary capture file: + python3 test_parser.py --file capture.bin +""" + +import argparse +import asyncio +import sys +import os + +# Allow running from repo root without installing +sys.path.insert(0, os.path.join(os.path.dirname(__file__), "custom_components")) + +from xt211_han.dlms_parser import DLMSParser, OBIS_DESCRIPTIONS + + +def print_result(result) -> None: + if not result.success: + print(f" ❌ Parse error: {result.error}") + return + if not result.objects: + print(" ⚠️ Frame OK but no DLMS objects extracted") + return + print(f" ✅ {len(result.objects)} OBIS objects decoded:") + for obj in result.objects: + meta = OBIS_DESCRIPTIONS.get(obj.obis, {}) + name = meta.get("name", obj.obis) + unit = obj.unit or meta.get("unit", "") + print(f" {obj.obis:25s} {name:35s} {obj.value} {unit}") + + +def test_hex(hex_str: str) -> None: + """Parse a single hex-encoded frame.""" + raw = bytes.fromhex(hex_str.replace(" ", "").replace("\n", "")) + print(f"\n📦 Frame: {len(raw)} bytes") + parser = DLMSParser() + parser.feed(raw) + result = parser.get_frame() + if result: + print_result(result) + else: + print(" ⚠️ No complete frame found in data") + + +def test_file(path: str) -> None: + """Parse all frames from a binary capture file.""" + with open(path, "rb") as f: + data = f.read() + print(f"\n📂 File: {path} ({len(data)} bytes)") + parser = DLMSParser() + parser.feed(data) + count = 0 + while True: + result = parser.get_frame() + if result is None: + break + count += 1 + print(f"\n--- Frame #{count} ---") + print_result(result) + print(f"\nTotal frames parsed: {count}") + + +async def listen_tcp(host: str, port: int) -> None: + """Connect to the TCP adapter and print decoded frames as they arrive.""" + print(f"\n🔌 Connecting to {host}:{port} ...") + reader, writer = await asyncio.open_connection(host, port) + print(" Connected. Waiting for DLMS PUSH frames (every ~60 s)...\n") + parser = DLMSParser() + frame_count = 0 + try: + while True: + chunk = await asyncio.wait_for(reader.read(4096), timeout=120) + if not chunk: + print("Connection closed by remote.") + break + parser.feed(chunk) + while True: + result = parser.get_frame() + if result is None: + break + frame_count += 1 + print(f"\n--- Frame #{frame_count} raw: {result.raw_hex[:40]}... ---") + print_result(result) + except asyncio.TimeoutError: + print("No data for 120 s, giving up.") + finally: + writer.close() + + +def main() -> None: + parser = argparse.ArgumentParser(description="XT211 DLMS parser test tool") + group = parser.add_mutually_exclusive_group(required=True) + group.add_argument("--hex", help="Hex-encoded raw frame to parse") + group.add_argument("--file", help="Binary capture file to parse") + group.add_argument("--host", help="Adapter IP address for live TCP test") + parser.add_argument("--port", type=int, default=8899, help="TCP port (default 8899)") + args = parser.parse_args() + + if args.hex: + test_hex(args.hex) + elif args.file: + test_file(args.file) + elif args.host: + asyncio.run(listen_tcp(args.host, args.port)) + + +if __name__ == "__main__": + main()