Add files via upload

main
nero150 2026-03-18 07:20:25 +01:00 committed by GitHub
parent 793ae5b593
commit 968ef18f43
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
14 changed files with 1312 additions and 3 deletions

View File

@ -1,6 +1,6 @@
MIT License
Copyright (c) 2026 nero150
Copyright (c) 2026
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal

140
README.md
View File

@ -1,2 +1,138 @@
# CEZ_rele_box
Připojení relé boxu do Home asistenta
# XT211 HAN Home Assistant Integration
[![hacs_badge](https://img.shields.io/badge/HACS-Custom-orange.svg)](https://github.com/hacs/integration)
![Maintained](https://img.shields.io/maintenance/yes/2026)
> **Čtení dat z elektroměru Sagemcom XT211 (ČEZ Distribuce) přes RS485-to-Ethernet adaptér bez ESP32.**
Tato integrace nahrazuje ESPHome řešení s ESP32 + RS485→TTL převodníkem. Místo toho používá průmyslový RS485-to-Ethernet adaptér (doporučen **PUSR USR-DR134**), který posílá syrová RS485 data přes TCP přímo do Home Assistantu.
---
## Jak to funguje
```
XT211 / WM-RelayBox
└── RJ12 HAN port (RS485, 9600 baud)
└── USR-DR134 (RS485 → Ethernet)
└── TCP socket (LAN)
└── Home Assistant (tato integrace)
```
Elektroměr posílá DLMS/COSEM PUSH zprávy každých **60 sekund**. Integrace udržuje persistentní TCP spojení k adaptéru a dekóduje příchozí HDLC rámce.
---
## Požadavky
- Home Assistant 2024.1+
- RS485-to-Ethernet adaptér s TCP server módem:
- **PUSR USR-DR134** (doporučeno) RS485, DIN rail, 524V
- Nebo jiný kompatibilní adaptér (USR-TCP232-410S, Waveshare, apod.)
---
## Instalace přes HACS
1. Otevři HACS → **Integrace** → tři tečky vpravo nahoře → **Vlastní repozitáře**
2. Přidej URL tohoto repozitáře, kategorie: **Integration**
3. Najdi „XT211 HAN" a nainstaluj
4. Restartuj Home Assistant
5. **Nastavení → Zařízení a služby → Přidat integraci → XT211 HAN**
---
## Nastavení adaptéru USR-DR134
Nastavení přes webové rozhraní adaptéru (výchozí IP `192.168.0.7`):
| Parametr | Hodnota |
|----------|---------|
| Work Mode | **TCP Server** |
| Local Port | `8899` (nebo libovolný) |
| Baud Rate | `9600` |
| Data Bits | `8` |
| Stop Bits | `1` |
| Parity | `None` |
| Flow Control | `None` |
> ⚠️ Použij model **USR-DR134** (RS485), ne DR132 (RS232)!
---
## Zapojení
```
WM-RelayBox HAN port (RJ12):
Pin 3 (Data A+) → USR-DR134 terminal A+
Pin 4 (Data B-) → USR-DR134 terminal B-
Pin 6 (GND) → USR-DR134 GND (volitelné)
```
Napájení USR-DR134: 524V DC (např. z USB adaptéru přes step-up, nebo 12V zdroj).
---
## Dostupné senzory
| Název | OBIS kód | Jednotka |
|-------|----------|----------|
| Active Power Consumption | `1-0:1.7.0.255` | W |
| Active Power Delivery | `1-0:2.7.0.255` | W |
| Active Power L1 | `1-0:21.7.0.255` | W |
| Active Power L2 | `1-0:41.7.0.255` | W |
| Active Power L3 | `1-0:61.7.0.255` | W |
| Energy Consumed | `1-0:1.8.0.255` | kWh |
| Energy Consumed T1 | `1-0:1.8.1.255` | kWh |
| Energy Consumed T2 | `1-0:1.8.2.255` | kWh |
| Energy Delivered | `1-0:2.8.0.255` | kWh |
| Serial Number | `0-0:96.1.1.255` | |
| Current Tariff | `0-0:96.14.0.255` | |
| Disconnector Status | `0-0:96.3.10.255` | |
---
## Ladění (debug)
Přidej do `configuration.yaml`:
```yaml
logger:
default: warning
logs:
custom_components.xt211_han: debug
```
V logu uvidíš surová hex data každého HDLC rámce a dekódované OBIS hodnoty.
---
## Struktura repozitáře
```
custom_components/xt211_han/
├── __init__.py # Inicializace integrace
├── manifest.json # Metadata pro HA / HACS
├── const.py # Konstanty
├── config_flow.py # UI průvodce nastavením
├── coordinator.py # TCP listener + DataUpdateCoordinator
├── sensor.py # Senzorová platforma
├── dlms_parser.py # HDLC / DLMS / COSEM parser
├── strings.json # Texty UI
└── translations/
├── cs.json # Čeština
└── en.json # Angličtina
```
---
## Poděkování / Credits
- [Tomer27cz/xt211](https://github.com/Tomer27cz/xt211) původní ESPHome komponenta a dokumentace protokolu
- ČEZ Distribuce dokumentace OBIS kódů a RS485 rozhraní
---
## Licence
MIT

View File

@ -0,0 +1,62 @@
"""XT211 HAN integration for Home Assistant.
Reads DLMS/COSEM PUSH data from a Sagemcom XT211 smart meter via a
RS485-to-Ethernet adapter (e.g. PUSR USR-DR134) over TCP.
No ESP32 or dedicated hardware needed beyond the adapter.
"""
from __future__ import annotations
import logging
from homeassistant.config_entries import ConfigEntry
from homeassistant.const import CONF_HOST, CONF_PORT, CONF_NAME, Platform
from homeassistant.core import HomeAssistant
from .const import DOMAIN, DEFAULT_NAME
from .coordinator import XT211Coordinator
_LOGGER = logging.getLogger(__name__)
PLATFORMS = [Platform.SENSOR]
async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
"""Set up XT211 HAN from a config entry."""
hass.data.setdefault(DOMAIN, {})
coordinator = XT211Coordinator(
hass,
host=entry.data[CONF_HOST],
port=entry.data[CONF_PORT],
name=entry.data.get(CONF_NAME, DEFAULT_NAME),
)
hass.data[DOMAIN][entry.entry_id] = coordinator
# Start the background TCP listener
await coordinator.async_setup()
# Set up sensor platform
await hass.config_entries.async_forward_entry_setups(entry, PLATFORMS)
_LOGGER.info(
"XT211 HAN integration started for %s:%d",
entry.data[CONF_HOST],
entry.data[CONF_PORT],
)
return True
async def async_unload_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
"""Unload a config entry."""
coordinator: XT211Coordinator = hass.data[DOMAIN].get(entry.entry_id)
if coordinator:
await coordinator.async_shutdown()
unload_ok = await hass.config_entries.async_unload_platforms(entry, PLATFORMS)
if unload_ok:
hass.data[DOMAIN].pop(entry.entry_id, None)
return unload_ok

View File

@ -0,0 +1,85 @@
"""Config flow for XT211 HAN integration."""
from __future__ import annotations
import asyncio
import logging
from typing import Any
import voluptuous as vol
from homeassistant import config_entries
from homeassistant.const import CONF_HOST, CONF_PORT, CONF_NAME
from homeassistant.data_entry_flow import FlowResult
from .const import DOMAIN, DEFAULT_PORT, DEFAULT_NAME
_LOGGER = logging.getLogger(__name__)
STEP_USER_DATA_SCHEMA = vol.Schema(
{
vol.Required(CONF_HOST): str,
vol.Required(CONF_PORT, default=DEFAULT_PORT): int,
vol.Optional(CONF_NAME, default=DEFAULT_NAME): str,
}
)
async def _test_connection(host: str, port: int) -> str | None:
"""Try to open a TCP connection. Returns error string or None on success."""
try:
reader, writer = await asyncio.wait_for(
asyncio.open_connection(host, port),
timeout=5,
)
writer.close()
await writer.wait_closed()
return None
except asyncio.TimeoutError:
return "cannot_connect"
except OSError:
return "cannot_connect"
except Exception:
return "unknown"
class XT211HANConfigFlow(config_entries.ConfigFlow, domain=DOMAIN):
"""Handle the config flow for XT211 HAN."""
VERSION = 1
async def async_step_user(
self, user_input: dict[str, Any] | None = None
) -> FlowResult:
errors: dict[str, str] = {}
if user_input is not None:
host = user_input[CONF_HOST]
port = user_input[CONF_PORT]
name = user_input.get(CONF_NAME, DEFAULT_NAME)
# Prevent duplicate entries
await self.async_set_unique_id(f"{host}:{port}")
self._abort_if_unique_id_configured()
error = await _test_connection(host, port)
if error:
errors["base"] = error
else:
return self.async_create_entry(
title=f"{name} ({host}:{port})",
data={
CONF_HOST: host,
CONF_PORT: port,
CONF_NAME: name,
},
)
return self.async_show_form(
step_id="user",
data_schema=STEP_USER_DATA_SCHEMA,
errors=errors,
description_placeholders={
"default_port": str(DEFAULT_PORT),
},
)

View File

@ -0,0 +1,10 @@
"""Constants for XT211 HAN integration."""
DOMAIN = "xt211_han"
CONF_HOST = "host"
CONF_PORT = "port"
CONF_NAME = "name"
DEFAULT_PORT = 8899
DEFAULT_NAME = "XT211 HAN"

View File

@ -0,0 +1,196 @@
"""
Coordinator for XT211 HAN integration.
Opens a persistent TCP connection to the RS485-to-Ethernet adapter
(e.g. USR-DR134) and receives DLMS/COSEM PUSH frames every 60 seconds.
"""
from __future__ import annotations
import asyncio
import logging
from datetime import timedelta
from typing import Any
from homeassistant.core import HomeAssistant
from homeassistant.helpers.update_coordinator import DataUpdateCoordinator, UpdateFailed
from .dlms_parser import DLMSParser, DLMSObject, OBIS_DESCRIPTIONS
from .const import DOMAIN
_LOGGER = logging.getLogger(__name__)
# We expect a frame every 60 s; allow some margin before timing out
PUSH_TIMEOUT = 90 # seconds
RECONNECT_DELAY = 10 # seconds after connection loss
class XT211Coordinator(DataUpdateCoordinator[dict[str, Any]]):
"""
Coordinator that maintains a persistent TCP connection to the
RS485-to-Ethernet adapter and decodes incoming DLMS PUSH frames.
Data is published to HA listeners whenever a new frame arrives,
not on a fixed poll interval (update_interval=None triggers manual).
"""
def __init__(
self,
hass: HomeAssistant,
host: str,
port: int,
name: str,
) -> None:
super().__init__(
hass,
_LOGGER,
name=f"XT211 HAN ({host}:{port})",
update_interval=None, # push-driven, not poll-driven
)
self.host = host
self.port = port
self.device_name = name
self._parser = DLMSParser()
self._reader: asyncio.StreamReader | None = None
self._writer: asyncio.StreamWriter | None = None
self._listen_task: asyncio.Task | None = None
self._connected = False
# ------------------------------------------------------------------
# Public helpers
# ------------------------------------------------------------------
@property
def connected(self) -> bool:
return self._connected
async def async_setup(self) -> None:
"""Start the background listener task."""
if self._listen_task is None or self._listen_task.done():
self._listen_task = self.hass.async_create_background_task(
self._listen_loop(),
name=f"xt211_han_{self.host}_{self.port}",
)
async def async_shutdown(self) -> None:
"""Stop the listener and close the connection."""
if self._listen_task:
self._listen_task.cancel()
try:
await self._listen_task
except asyncio.CancelledError:
pass
await self._disconnect()
# ------------------------------------------------------------------
# DataUpdateCoordinator required override
# ------------------------------------------------------------------
async def _async_update_data(self) -> dict[str, Any]:
"""Called by HA when entities want a refresh. Returns current data."""
if self.data is None:
return {}
return self.data
# ------------------------------------------------------------------
# TCP listener loop
# ------------------------------------------------------------------
async def _listen_loop(self) -> None:
"""Main loop: connect → receive → reconnect on error."""
while True:
try:
await self._connect()
await self._receive_loop()
except asyncio.CancelledError:
_LOGGER.info("XT211 listener task cancelled")
raise
except Exception as exc:
_LOGGER.warning(
"XT211 connection error (%s:%d): %s retrying in %ds",
self.host, self.port, exc, RECONNECT_DELAY,
)
self._connected = False
finally:
await self._disconnect()
await asyncio.sleep(RECONNECT_DELAY)
async def _connect(self) -> None:
_LOGGER.info("Connecting to XT211 adapter at %s:%d", self.host, self.port)
self._reader, self._writer = await asyncio.wait_for(
asyncio.open_connection(self.host, self.port),
timeout=10,
)
self._connected = True
self._parser = DLMSParser() # reset parser state on new connection
_LOGGER.info("Connected to XT211 adapter at %s:%d", self.host, self.port)
async def _disconnect(self) -> None:
self._connected = False
if self._writer:
try:
self._writer.close()
await self._writer.wait_closed()
except Exception:
pass
self._writer = None
self._reader = None
async def _receive_loop(self) -> None:
"""Read bytes from the TCP stream and feed them to the parser."""
assert self._reader is not None
while True:
try:
chunk = await asyncio.wait_for(
self._reader.read(4096),
timeout=PUSH_TIMEOUT,
)
except asyncio.TimeoutError:
_LOGGER.warning(
"No data from XT211 for %d s reconnecting", PUSH_TIMEOUT
)
raise ConnectionError("Push timeout")
if not chunk:
_LOGGER.warning("XT211 adapter closed connection")
raise ConnectionError("Remote closed")
self._parser.feed(chunk)
# Process all complete frames in the buffer
while True:
result = self._parser.get_frame()
if result is None:
break
if result.success:
await self._process_frame(result.objects)
else:
_LOGGER.debug(
"Frame parse error: %s (raw: %s)",
result.error, result.raw_hex[:80],
)
async def _process_frame(self, objects: list[DLMSObject]) -> None:
"""Update coordinator data from a decoded DLMS frame."""
if not objects:
_LOGGER.debug("Received empty DLMS frame")
return
current = dict(self.data or {})
for obj in objects:
meta = OBIS_DESCRIPTIONS.get(obj.obis, {})
current[obj.obis] = {
"value": obj.value,
"unit": obj.unit or meta.get("unit", ""),
"name": meta.get("name", obj.obis),
"class": meta.get("class", "sensor"),
}
_LOGGER.debug(
"OBIS %s = %s %s", obj.obis, obj.value, obj.unit
)
self.async_set_updated_data(current)
_LOGGER.debug("Updated %d DLMS objects from XT211 frame", len(objects))

View File

@ -0,0 +1,458 @@
"""
DLMS/COSEM PUSH mode parser for Sagemcom XT211 smart meter.
The XT211 sends unsolicited HDLC-framed DLMS/COSEM data every 60 seconds
over RS485 (9600 baud, 8N1). This module decodes those frames.
Frame structure (HDLC):
7E - HDLC flag
A0 xx - Frame type + length
00 02 00 01 ... - Destination / source addresses
13 - Control byte (UI frame)
xx xx - HCS (header checksum)
[LLC header] - E6 E7 00
[APDU] - DLMS application data (tag 0F = Data-notification)
xx xx - FCS (frame checksum)
7E - HDLC flag
OBIS codes supported (from ČEZ Distribuce spec):
0-0:96.1.1.255 - Serial number (Device ID)
0-0:96.3.10.255 - Disconnector status
0-0:96.14.0.255 - Current tariff
1-0:1.7.0.255 - Instant active power consumption (W)
1-0:2.7.0.255 - Instant active power delivery (W)
1-0:21.7.0.255 - Instant power L1 (W)
1-0:41.7.0.255 - Instant power L2 (W)
1-0:61.7.0.255 - Instant power L3 (W)
1-0:1.8.0.255 - Active energy consumed (Wh)
1-0:1.8.1.255 - Active energy T1 (Wh)
1-0:1.8.2.255 - Active energy T2 (Wh)
1-0:2.8.0.255 - Active energy delivered (Wh)
0-1:96.3.10.255 - Relay R1 status
0-2:96.3.10.255 - Relay R2 status
0-3:96.3.10.255 - Relay R3 status
0-4:96.3.10.255 - Relay R4 status
"""
from __future__ import annotations
import logging
import struct
from dataclasses import dataclass
from typing import Any
_LOGGER = logging.getLogger(__name__)
HDLC_FLAG = 0x7E
# DLMS data types
DLMS_TYPE_NULL = 0x00
DLMS_TYPE_BOOL = 0x03
DLMS_TYPE_INT8 = 0x0F
DLMS_TYPE_INT16 = 0x10
DLMS_TYPE_UINT8 = 0x11
DLMS_TYPE_UINT16 = 0x12
DLMS_TYPE_INT32 = 0x05
DLMS_TYPE_UINT32 = 0x06
DLMS_TYPE_INT64 = 0x14
DLMS_TYPE_UINT64 = 0x15
DLMS_TYPE_FLOAT32 = 0x16
DLMS_TYPE_FLOAT64 = 0x17
DLMS_TYPE_OCTET_STRING = 0x09
DLMS_TYPE_VISIBLE_STRING = 0x0A
DLMS_TYPE_ARRAY = 0x01
DLMS_TYPE_STRUCTURE = 0x02
DLMS_TYPE_COMPACT_ARRAY = 0x13
# SI unit multipliers (DLMS scaler)
# Scaler is a signed int8 representing 10^scaler
def apply_scaler(value: int | float, scaler: int) -> float:
"""Apply DLMS scaler (10^scaler) to a raw value."""
return float(value) * (10 ** scaler)
@dataclass
class DLMSObject:
"""A single decoded DLMS COSEM object."""
obis: str # e.g. "1-0:1.8.0.255"
value: Any # decoded Python value
unit: str = "" # e.g. "W", "Wh", ""
scaler: int = 0 # raw scaler from frame
@dataclass
class ParseResult:
"""Result of parsing one HDLC frame."""
success: bool
objects: list[DLMSObject]
raw_hex: str = ""
error: str = ""
class DLMSParser:
"""
Stateful DLMS/COSEM PUSH mode parser for XT211.
Usage:
parser = DLMSParser()
parser.feed(bytes_from_tcp)
while (result := parser.get_frame()):
process(result)
"""
# DLMS unit codes → human readable strings
UNIT_MAP = {
1: "a", 2: "mo", 3: "wk", 4: "d", 5: "h",
6: "min", 7: "s", 8: "°", 9: "°C", 10: "currency",
11: "m", 12: "m/s", 13: "", 14: "", 15: "m³/h",
16: "m³/h", 17: "m³/d", 18: "m³/d", 19: "l", 20: "kg",
21: "N", 22: "Nm", 23: "Pa", 24: "bar", 25: "J",
26: "J/h", 27: "W", 28: "VA", 29: "var", 30: "Wh",
31: "VAh", 32: "varh", 33: "A", 34: "C", 35: "V",
36: "V/m", 37: "F", 38: "Ω", 39: "Ωm²/m",40: "Wb",
41: "T", 42: "A/m", 43: "H", 44: "Hz", 45: "1/Wh",
46: "1/varh",47: "1/VAh",48: "V²h", 49: "A²h", 50: "kg/s",
51: "S", 52: "K", 53: "1/(V²h)",54: "1/(A²h)",
255: "", 0: "",
}
def __init__(self) -> None:
self._buffer = bytearray()
def feed(self, data: bytes) -> None:
"""Add raw bytes from TCP socket to the internal buffer."""
self._buffer.extend(data)
def get_frame(self) -> ParseResult | None:
"""
Try to extract and parse one complete HDLC frame from the buffer.
Returns ParseResult if a frame was found, None if more data is needed.
"""
buf = self._buffer
# Find opening flag
start = buf.find(HDLC_FLAG)
if start == -1:
self._buffer.clear()
return None
if start > 0:
_LOGGER.debug("Discarding %d bytes before HDLC flag", start)
del self._buffer[:start]
buf = self._buffer
if len(buf) < 3:
return None
# Parse frame length from bytes 1-2 (A0 XX or A8 XX)
# Bits 11-0 of bytes 1-2 give frame length
frame_len = ((buf[1] & 0x07) << 8) | buf[2]
# Total on-wire length = frame_len + 2 flags (opening already at 0, closing at frame_len+1)
total = frame_len + 2
if len(buf) < total:
return None # incomplete frame, wait for more data
raw = bytes(buf[:total])
del self._buffer[:total]
raw_hex = raw.hex()
_LOGGER.debug("HDLC frame: %s", raw_hex)
# Basic sanity: starts and ends with 0x7E
if raw[0] != HDLC_FLAG or raw[-1] != HDLC_FLAG:
return ParseResult(success=False, raw_hex=raw_hex, error="Missing HDLC flags")
try:
result = self._parse_hdlc(raw)
result.raw_hex = raw_hex
return result
except Exception as exc:
_LOGGER.exception("Error parsing HDLC frame")
return ParseResult(success=False, raw_hex=raw_hex, error=str(exc))
# ------------------------------------------------------------------
# Internal parsing methods
# ------------------------------------------------------------------
def _parse_hdlc(self, raw: bytes) -> ParseResult:
"""Parse full HDLC frame and extract DLMS objects."""
pos = 1 # skip opening flag
# Frame format byte (should be A0 or A8)
# bits 11-0 = length
_frame_type = raw[pos] & 0xF8
frame_len = ((raw[pos] & 0x07) << 8) | raw[pos + 1]
pos += 2
# Destination address (variable length, LSB=1 means last byte)
dest_addr, pos = self._read_hdlc_address(raw, pos)
# Source address
src_addr, pos = self._read_hdlc_address(raw, pos)
# Control byte
control = raw[pos]; pos += 1
# HCS (2 bytes header checksum) - skip
pos += 2
# From here: LLC + APDU
# LLC header: E6 E7 00 (or E6 E6 00 for request)
if pos + 3 > len(raw) - 3:
return ParseResult(success=False, error="Frame too short for LLC")
llc = raw[pos:pos+3]; pos += 3
_LOGGER.debug("LLC: %s dest=%s src=%s", llc.hex(), dest_addr, src_addr)
# APDU starts here, ends 3 bytes before end (FCS + closing flag)
apdu = raw[pos:-3]
_LOGGER.debug("APDU (%d bytes): %s", len(apdu), apdu.hex())
return self._parse_apdu(apdu)
def _read_hdlc_address(self, data: bytes, pos: int) -> tuple[int, int]:
"""Read HDLC variable-length address. Returns (address_value, new_pos)."""
addr = 0
shift = 0
while pos < len(data):
byte = data[pos]; pos += 1
addr |= (byte >> 1) << shift
shift += 7
if byte & 0x01: # last byte of address
break
return addr, pos
def _parse_apdu(self, apdu: bytes) -> ParseResult:
"""Parse DLMS APDU (Data-Notification = tag 0x0F)."""
if not apdu:
return ParseResult(success=False, error="Empty APDU")
tag = apdu[0]
if tag != 0x0F:
return ParseResult(
success=False,
error=f"Unexpected APDU tag 0x{tag:02X} (expected 0x0F Data-Notification)"
)
# Data-Notification structure:
# 0F [long-invoke-id-and-priority 4B] [date-time opt] [notification-body]
pos = 1
if len(apdu) < 5:
return ParseResult(success=False, error="APDU too short")
# Long invoke-id-and-priority (4 bytes)
invoke_id = struct.unpack_from(">I", apdu, pos)[0]; pos += 4
_LOGGER.debug("Invoke ID: 0x%08X", invoke_id)
# Optional date-time: if next byte == 0x09 then it's an octet string with time
if pos < len(apdu) and apdu[pos] == 0x09:
pos += 1 # skip type tag
dt_len = apdu[pos]; pos += 1
_dt_bytes = apdu[pos:pos+dt_len]; pos += dt_len
_LOGGER.debug("Timestamp bytes: %s", _dt_bytes.hex())
elif pos < len(apdu) and apdu[pos] == 0x00:
pos += 1 # optional field absent
# Notification body is a structure containing the push data
objects, _ = self._decode_value(apdu, pos)
dlms_objects = self._extract_objects(objects)
return ParseResult(success=True, objects=dlms_objects)
def _decode_value(self, data: bytes, pos: int) -> tuple[Any, int]:
"""Recursively decode a DLMS typed value. Returns (value, new_pos)."""
if pos >= len(data):
return None, pos
dtype = data[pos]; pos += 1
if dtype == DLMS_TYPE_NULL:
return None, pos
elif dtype == DLMS_TYPE_BOOL:
return bool(data[pos]), pos + 1
elif dtype == DLMS_TYPE_INT8:
return struct.unpack_from(">b", data, pos)[0], pos + 1
elif dtype == DLMS_TYPE_UINT8:
return data[pos], pos + 1
elif dtype == DLMS_TYPE_INT16:
return struct.unpack_from(">h", data, pos)[0], pos + 2
elif dtype == DLMS_TYPE_UINT16:
return struct.unpack_from(">H", data, pos)[0], pos + 2
elif dtype == DLMS_TYPE_INT32:
return struct.unpack_from(">i", data, pos)[0], pos + 4
elif dtype == DLMS_TYPE_UINT32:
return struct.unpack_from(">I", data, pos)[0], pos + 4
elif dtype == DLMS_TYPE_INT64:
return struct.unpack_from(">q", data, pos)[0], pos + 8
elif dtype == DLMS_TYPE_UINT64:
return struct.unpack_from(">Q", data, pos)[0], pos + 8
elif dtype == DLMS_TYPE_FLOAT32:
return struct.unpack_from(">f", data, pos)[0], pos + 4
elif dtype == DLMS_TYPE_FLOAT64:
return struct.unpack_from(">d", data, pos)[0], pos + 8
elif dtype in (DLMS_TYPE_OCTET_STRING, DLMS_TYPE_VISIBLE_STRING):
length, pos = self._decode_length(data, pos)
raw_bytes = data[pos:pos+length]
pos += length
if dtype == DLMS_TYPE_VISIBLE_STRING:
try:
return raw_bytes.decode("ascii", errors="replace"), pos
except Exception:
return raw_bytes.hex(), pos
return raw_bytes, pos
elif dtype in (DLMS_TYPE_ARRAY, DLMS_TYPE_STRUCTURE, DLMS_TYPE_COMPACT_ARRAY):
count, pos = self._decode_length(data, pos)
items = []
for _ in range(count):
val, pos = self._decode_value(data, pos)
items.append(val)
return items, pos
else:
_LOGGER.warning("Unknown DLMS type 0x%02X at pos %d", dtype, pos)
return None, pos
def _decode_length(self, data: bytes, pos: int) -> tuple[int, int]:
"""Decode BER-style length field."""
first = data[pos]; pos += 1
if first < 0x80:
return first, pos
num_bytes = first & 0x7F
length = 0
for _ in range(num_bytes):
length = (length << 8) | data[pos]; pos += 1
return length, pos
def _extract_objects(self, notification_body: Any) -> list[DLMSObject]:
"""
Walk the decoded notification body and extract OBIS-keyed objects.
The XT211 push notification body is a structure containing an array
of structures, each typically:
[OBIS bytes (6B octet-string), value (structure with scaler+unit), data]
We try to handle both flat and nested layouts.
"""
objects = []
if not isinstance(notification_body, list):
return objects
# The outer structure may wrap an inner array
# Try to unwrap one level of nesting
payload = notification_body
if len(payload) == 1 and isinstance(payload[0], list):
payload = payload[0]
for item in payload:
if not isinstance(item, list) or len(item) < 2:
continue
try:
obj = self._parse_cosem_entry(item)
if obj:
objects.append(obj)
except Exception as exc:
_LOGGER.debug("Could not parse COSEM entry %s: %s", item, exc)
return objects
def _parse_cosem_entry(self, entry: list) -> DLMSObject | None:
"""
Parse one COSEM entry from the push notification.
Expected layout: [obis_bytes, [scaler, unit], value]
or simplified: [obis_bytes, value]
"""
if len(entry) < 2:
return None
# First element should be the OBIS code as 6-byte octet string
obis_raw = entry[0]
if not isinstance(obis_raw, (bytes, bytearray)) or len(obis_raw) != 6:
return None
obis_str = self._format_obis(obis_raw)
scaler = 0
unit_code = 255
value = None
if len(entry) == 3:
# entry[1] = [scaler, unit], entry[2] = value
scaler_unit = entry[1]
if isinstance(scaler_unit, list) and len(scaler_unit) == 2:
raw_scaler = scaler_unit[0]
# scaler is signed int8
if isinstance(raw_scaler, int):
scaler = raw_scaler if raw_scaler < 128 else raw_scaler - 256
unit_code = scaler_unit[1] if isinstance(scaler_unit[1], int) else 255
value = entry[2]
else:
value = entry[1]
# Apply scaler to numeric values
if isinstance(value, int) and scaler != 0:
final_value: Any = apply_scaler(value, scaler)
elif isinstance(value, bytes):
# Try to decode as ASCII string (e.g. serial number)
try:
final_value = value.decode("ascii", errors="replace").strip("\x00")
except Exception:
final_value = value.hex()
else:
final_value = value
unit_str = self.UNIT_MAP.get(unit_code, "")
return DLMSObject(
obis=obis_str,
value=final_value,
unit=unit_str,
scaler=scaler,
)
@staticmethod
def _format_obis(raw: bytes) -> str:
"""Convert 6 raw bytes to OBIS string notation A-B:C.D.E.F"""
if len(raw) != 6:
return raw.hex()
a, b, c, d, e, f = raw
return f"{a}-{b}:{c}.{d}.{e}.{f}"
# ---------------------------------------------------------------------------
# Convenience: known OBIS codes for the XT211
# ---------------------------------------------------------------------------
OBIS_DESCRIPTIONS: dict[str, dict] = {
"0-0:96.1.1.255": {"name": "Serial Number", "unit": "", "class": "text"},
"0-0:96.3.10.255": {"name": "Disconnector Status", "unit": "", "class": "binary"},
"0-0:96.14.0.255": {"name": "Current Tariff", "unit": "", "class": "text"},
"1-0:1.7.0.255": {"name": "Active Power Consumption", "unit": "W", "class": "power"},
"1-0:2.7.0.255": {"name": "Active Power Delivery", "unit": "W", "class": "power"},
"1-0:21.7.0.255": {"name": "Active Power L1", "unit": "W", "class": "power"},
"1-0:41.7.0.255": {"name": "Active Power L2", "unit": "W", "class": "power"},
"1-0:61.7.0.255": {"name": "Active Power L3", "unit": "W", "class": "power"},
"1-0:1.8.0.255": {"name": "Energy Consumed", "unit": "Wh", "class": "energy"},
"1-0:1.8.1.255": {"name": "Energy Consumed T1", "unit": "Wh", "class": "energy"},
"1-0:1.8.2.255": {"name": "Energy Consumed T2", "unit": "Wh", "class": "energy"},
"1-0:1.8.3.255": {"name": "Energy Consumed T3", "unit": "Wh", "class": "energy"},
"1-0:1.8.4.255": {"name": "Energy Consumed T4", "unit": "Wh", "class": "energy"},
"1-0:2.8.0.255": {"name": "Energy Delivered", "unit": "Wh", "class": "energy"},
"0-1:96.3.10.255": {"name": "Relay R1 Status", "unit": "", "class": "binary"},
"0-2:96.3.10.255": {"name": "Relay R2 Status", "unit": "", "class": "binary"},
"0-3:96.3.10.255": {"name": "Relay R3 Status", "unit": "", "class": "binary"},
"0-4:96.3.10.255": {"name": "Relay R4 Status", "unit": "", "class": "binary"},
"0-0:17.0.0.255": {"name": "Limiter Value", "unit": "W", "class": "power"},
}

View File

@ -0,0 +1,12 @@
{
"domain": "xt211_han",
"name": "XT211 HAN (RS485 via Ethernet)",
"version": "0.1.0",
"documentation": "https://github.com/yourusername/xt211-han-ha",
"issue_tracker": "https://github.com/yourusername/xt211-han-ha/issues",
"dependencies": [],
"codeowners": ["@yourusername"],
"requirements": [],
"iot_class": "local_push",
"config_flow": true
}

View File

@ -0,0 +1,158 @@
"""Sensor platform for XT211 HAN integration."""
from __future__ import annotations
import logging
from typing import Any
from homeassistant.components.sensor import (
SensorDeviceClass,
SensorEntity,
SensorStateClass,
)
from homeassistant.config_entries import ConfigEntry
from homeassistant.const import (
CONF_NAME,
UnitOfEnergy,
UnitOfPower,
)
from homeassistant.core import HomeAssistant, callback
from homeassistant.helpers.device_registry import DeviceInfo
from homeassistant.helpers.entity_platform import AddEntitiesCallback
from homeassistant.helpers.update_coordinator import CoordinatorEntity
from .const import DOMAIN
from .coordinator import XT211Coordinator
from .dlms_parser import OBIS_DESCRIPTIONS
_LOGGER = logging.getLogger(__name__)
# Map OBIS "class" strings → HA SensorDeviceClass + StateClass + unit
SENSOR_META: dict[str, dict] = {
"power": {
"device_class": SensorDeviceClass.POWER,
"state_class": SensorStateClass.MEASUREMENT,
"unit": UnitOfPower.WATT,
},
"energy": {
"device_class": SensorDeviceClass.ENERGY,
"state_class": SensorStateClass.TOTAL_INCREASING,
"unit": UnitOfEnergy.WATT_HOUR,
},
"sensor": {
"device_class": None,
"state_class": SensorStateClass.MEASUREMENT,
"unit": None,
},
}
# OBIS codes that are NOT numeric sensors (text / binary) handled separately
NON_SENSOR_CLASSES = {"text", "binary"}
async def async_setup_entry(
hass: HomeAssistant,
entry: ConfigEntry,
async_add_entities: AddEntitiesCallback,
) -> None:
"""Set up XT211 HAN sensors from a config entry."""
coordinator: XT211Coordinator = hass.data[DOMAIN][entry.entry_id]
# We create entities for all known OBIS codes upfront.
# Unknown codes that arrive later will be added dynamically.
entities: list[XT211SensorEntity] = []
registered_obis: set[str] = set()
for obis, meta in OBIS_DESCRIPTIONS.items():
if meta.get("class") in NON_SENSOR_CLASSES:
continue
entities.append(
XT211SensorEntity(coordinator, entry, obis, meta)
)
registered_obis.add(obis)
async_add_entities(entities)
@callback
def _handle_new_obis(obis: str, data: dict) -> None:
"""Dynamically add sensor for a previously unknown OBIS code."""
if obis in registered_obis:
return
if data.get("class") in NON_SENSOR_CLASSES:
return
registered_obis.add(obis)
async_add_entities([XT211SensorEntity(coordinator, entry, obis, data)])
# Subscribe to coordinator updates to detect new OBIS codes
@callback
def _on_update() -> None:
if coordinator.data:
for obis, data in coordinator.data.items():
_handle_new_obis(obis, data)
coordinator.async_add_listener(_on_update)
class XT211SensorEntity(CoordinatorEntity[XT211Coordinator], SensorEntity):
"""A single numeric sensor entity backed by an OBIS code."""
_attr_has_entity_name = True
def __init__(
self,
coordinator: XT211Coordinator,
entry: ConfigEntry,
obis: str,
meta: dict,
) -> None:
super().__init__(coordinator)
self._obis = obis
self._meta = meta
self._entry = entry
sensor_type = meta.get("class", "sensor")
sm = SENSOR_META.get(sensor_type, SENSOR_META["sensor"])
self._attr_unique_id = f"{entry.entry_id}_{obis}"
self._attr_name = meta.get("name", obis)
self._attr_device_class = sm["device_class"]
self._attr_state_class = sm["state_class"]
self._attr_native_unit_of_measurement = sm["unit"] or meta.get("unit")
# Energy sensors: convert Wh → kWh for HA Energy dashboard
if sensor_type == "energy":
self._attr_native_unit_of_measurement = UnitOfEnergy.KILO_WATT_HOUR
self._wh_to_kwh = True
else:
self._wh_to_kwh = False
@property
def device_info(self) -> DeviceInfo:
return DeviceInfo(
identifiers={(DOMAIN, self._entry.entry_id)},
name=self._entry.data.get(CONF_NAME, "XT211 HAN"),
manufacturer="Sagemcom",
model="XT211 AMM",
)
@property
def native_value(self) -> float | None:
if self.coordinator.data is None:
return None
obj = self.coordinator.data.get(self._obis)
if obj is None:
return None
raw = obj.get("value")
if raw is None:
return None
try:
val = float(raw)
if self._wh_to_kwh:
val = val / 1000.0
return round(val, 3)
except (TypeError, ValueError):
return None
@property
def available(self) -> bool:
return self.coordinator.connected and self.coordinator.data is not None

View File

@ -0,0 +1,22 @@
{
"config": {
"step": {
"user": {
"title": "Nastavení XT211 HAN adaptéru",
"description": "Zadej IP adresu a port RS485-to-Ethernet adaptéru (např. PUSR USR-DR134). Výchozí port pro TCP server mód je 8899.",
"data": {
"host": "IP adresa adaptéru",
"port": "TCP port",
"name": "Název zařízení"
}
}
},
"error": {
"cannot_connect": "Nepodařilo se připojit k adaptéru. Zkontroluj IP adresu, port a připojení.",
"unknown": "Neočekávaná chyba. Zkontroluj log."
},
"abort": {
"already_configured": "Toto zařízení je již nakonfigurováno."
}
}
}

View File

@ -0,0 +1,22 @@
{
"config": {
"step": {
"user": {
"title": "Nastavení XT211 HAN adaptéru",
"description": "Zadej IP adresu a port RS485-to-Ethernet adaptéru (např. PUSR USR-DR134). Výchozí port pro TCP server mód je 8899.",
"data": {
"host": "IP adresa adaptéru",
"port": "TCP port",
"name": "Název zařízení"
}
}
},
"error": {
"cannot_connect": "Nepodařilo se připojit k adaptéru. Zkontroluj IP adresu, port a připojení.",
"unknown": "Neočekávaná chyba. Zkontroluj log."
},
"abort": {
"already_configured": "Toto zařízení je již nakonfigurováno."
}
}
}

View File

@ -0,0 +1,22 @@
{
"config": {
"step": {
"user": {
"title": "XT211 HAN Adapter Setup",
"description": "Enter the IP address and port of your RS485-to-Ethernet adapter (e.g. PUSR USR-DR134). The default TCP server port is 8899.",
"data": {
"host": "Adapter IP address",
"port": "TCP port",
"name": "Device name"
}
}
},
"error": {
"cannot_connect": "Could not connect to the adapter. Check the IP address, port and network connection.",
"unknown": "Unexpected error. Check the log."
},
"abort": {
"already_configured": "This device is already configured."
}
}
}

8
hacs.json Normal file
View File

@ -0,0 +1,8 @@
{
"name": "XT211 HAN RS485 (via Ethernet)",
"description": "Home Assistant integration for Sagemcom XT211 smart meter via RS485-to-Ethernet adapter (e.g. USR-DR134). Reads DLMS/COSEM PUSH data without ESP32.",
"documentation": "https://github.com/yourusername/xt211-han-ha",
"issue_tracker": "https://github.com/yourusername/xt211-han-ha/issues",
"iot_class": "local_push",
"render_readme": true
}

118
test_parser.py Normal file
View File

@ -0,0 +1,118 @@
#!/usr/bin/env python3
"""
Standalone test / debug script for the DLMS parser and TCP listener.
Usage:
# Parse a raw hex frame from the meter (paste from HA debug log):
python3 test_parser.py --hex "7ea0...7e"
# Live listen on TCP socket (forward output to terminal):
python3 test_parser.py --host 192.168.1.100 --port 8899
# Replay a saved binary capture file:
python3 test_parser.py --file capture.bin
"""
import argparse
import asyncio
import sys
import os
# Allow running from repo root without installing
sys.path.insert(0, os.path.join(os.path.dirname(__file__), "custom_components"))
from xt211_han.dlms_parser import DLMSParser, OBIS_DESCRIPTIONS
def print_result(result) -> None:
if not result.success:
print(f" ❌ Parse error: {result.error}")
return
if not result.objects:
print(" ⚠️ Frame OK but no DLMS objects extracted")
return
print(f"{len(result.objects)} OBIS objects decoded:")
for obj in result.objects:
meta = OBIS_DESCRIPTIONS.get(obj.obis, {})
name = meta.get("name", obj.obis)
unit = obj.unit or meta.get("unit", "")
print(f" {obj.obis:25s} {name:35s} {obj.value} {unit}")
def test_hex(hex_str: str) -> None:
"""Parse a single hex-encoded frame."""
raw = bytes.fromhex(hex_str.replace(" ", "").replace("\n", ""))
print(f"\n📦 Frame: {len(raw)} bytes")
parser = DLMSParser()
parser.feed(raw)
result = parser.get_frame()
if result:
print_result(result)
else:
print(" ⚠️ No complete frame found in data")
def test_file(path: str) -> None:
"""Parse all frames from a binary capture file."""
with open(path, "rb") as f:
data = f.read()
print(f"\n📂 File: {path} ({len(data)} bytes)")
parser = DLMSParser()
parser.feed(data)
count = 0
while True:
result = parser.get_frame()
if result is None:
break
count += 1
print(f"\n--- Frame #{count} ---")
print_result(result)
print(f"\nTotal frames parsed: {count}")
async def listen_tcp(host: str, port: int) -> None:
"""Connect to the TCP adapter and print decoded frames as they arrive."""
print(f"\n🔌 Connecting to {host}:{port} ...")
reader, writer = await asyncio.open_connection(host, port)
print(" Connected. Waiting for DLMS PUSH frames (every ~60 s)...\n")
parser = DLMSParser()
frame_count = 0
try:
while True:
chunk = await asyncio.wait_for(reader.read(4096), timeout=120)
if not chunk:
print("Connection closed by remote.")
break
parser.feed(chunk)
while True:
result = parser.get_frame()
if result is None:
break
frame_count += 1
print(f"\n--- Frame #{frame_count} raw: {result.raw_hex[:40]}... ---")
print_result(result)
except asyncio.TimeoutError:
print("No data for 120 s, giving up.")
finally:
writer.close()
def main() -> None:
parser = argparse.ArgumentParser(description="XT211 DLMS parser test tool")
group = parser.add_mutually_exclusive_group(required=True)
group.add_argument("--hex", help="Hex-encoded raw frame to parse")
group.add_argument("--file", help="Binary capture file to parse")
group.add_argument("--host", help="Adapter IP address for live TCP test")
parser.add_argument("--port", type=int, default=8899, help="TCP port (default 8899)")
args = parser.parse_args()
if args.hex:
test_hex(args.hex)
elif args.file:
test_file(args.file)
elif args.host:
asyncio.run(listen_tcp(args.host, args.port))
if __name__ == "__main__":
main()