Compare commits

..

7 Commits
0.8.3 ... main

Author SHA1 Message Date
Nero b80cd0d443
Update README.md 2026-03-19 11:01:09 +01:00
Nero 641a8eb531
Add files via upload 2026-03-19 10:45:06 +01:00
Nero a8b69d2487
Add files via upload 2026-03-19 10:22:57 +01:00
Nero 67be2ae88a
Add files via upload 2026-03-19 08:35:51 +01:00
Nero 8432ae6153
Add files via upload 2026-03-19 08:08:13 +01:00
Nero 900aed571b
Add files via upload 2026-03-19 08:06:10 +01:00
Nero f96b6e6210
Add files via upload 2026-03-19 07:09:53 +01:00
15 changed files with 93 additions and 387 deletions

View File

@ -1,5 +1,18 @@
# Changelog
## 0.8.9
- opraven HACS release balík pro `content_in_root: true`
- release asset `xt211_han.zip` už obsahuje přímo soubory integrace v rootu ZIPu
- doplněno `content_in_root` do `hacs.json`
- sjednocena verze balíku na 0.8.9
## 0.8.7
- zvýšena verze balíku na 0.8.7
- sjednocena verze v kořenovém i integračním `manifest.json`
- doplněny GitHub Actions pro HACS validaci a release ZIP asset
- vyčištěn README od interní citační značky
- připraven kompletní balík repozitáře pro HACS release workflow
## 0.8.0
- doplněno README o reálné nastavení převodníku a screenshoty
- přidána složka `docs/images` se screenshoty konfigurace a výsledku v Home Assistantu

View File

@ -63,7 +63,7 @@ Použité nastavení na funkční sestavě:
### 3. Kontrola, že převodník opravdu posílá data
Na stavové stránce převodníku je vidět aktivní klient a počitadlo `TX/RX Count`. Pokud **roste TX počet bajtů**, převodník normálně odesílá data z elektroměru do sítě. Na ukázce níže je vidět připojený klient `192.168.0.190` a narůstající `TX Count`, zatímco `RX` zůstává nulové. To odpovídá jednosměrnému provozu elektroměr → zákazník, který je uvedený i v dokumentaci ČEZ. fileciteturn6file0
Na stavové stránce převodníku je vidět aktivní klient a počitadlo `TX/RX Count`. Pokud **roste TX počet bajtů**, převodník normálně odesílá data z elektroměru do sítě. Na ukázce níže je vidět připojený klient `192.168.0.190` a narůstající `TX Count`, zatímco `RX` zůstává nulové. To odpovídá jednosměrnému provozu elektroměr → zákazník, který je uvedený i v dokumentaci ČEZ.
![Status převodníku a TX počitadlo](docs/images/converter_status_tx.png)

View File

@ -15,21 +15,11 @@ from .sensor import BINARY_OBIS, build_enabled_obis, _device_info
from .dlms_parser import OBIS_DESCRIPTIONS
async def async_setup_entry(
hass: HomeAssistant,
entry: ConfigEntry,
async_add_entities: AddEntitiesCallback,
) -> None:
async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntry, async_add_entities: AddEntitiesCallback) -> None:
coordinator: XT211Coordinator = hass.data[DOMAIN][entry.entry_id]
enabled_obis = build_enabled_obis(entry)
entities = [
XT211BinarySensorEntity(coordinator, entry, obis, meta)
for obis, meta in OBIS_DESCRIPTIONS.items()
if obis in enabled_obis and obis in BINARY_OBIS
]
entities = [XT211BinarySensorEntity(coordinator, entry, obis, meta) for obis, meta in OBIS_DESCRIPTIONS.items() if obis in enabled_obis and obis in BINARY_OBIS]
async_add_entities(entities)
registered_obis = {entity._obis for entity in entities}
@callback

View File

@ -15,59 +15,18 @@ from homeassistant.components import dhcp
from homeassistant.const import CONF_HOST, CONF_NAME, CONF_PORT
from homeassistant.data_entry_flow import FlowResult
from .const import (
CONF_HAS_FVE,
CONF_PHASES,
CONF_RELAY_COUNT,
CONF_TARIFFS,
DEFAULT_NAME,
DEFAULT_PORT,
DOMAIN,
PHASES_1,
PHASES_3,
RELAYS_0,
RELAYS_4,
RELAYS_6,
TARIFFS_1,
TARIFFS_2,
TARIFFS_4,
)
from .const import CONF_HAS_FVE, CONF_PHASES, CONF_RELAY_COUNT, CONF_TARIFFS, DEFAULT_NAME, DEFAULT_PORT, DOMAIN, PHASES_1, PHASES_3, RELAYS_0, RELAYS_4, RELAYS_6, TARIFFS_1, TARIFFS_2, TARIFFS_4
_LOGGER = logging.getLogger(__name__)
USR_IOT_MAC_PREFIXES = ("d8b04c", "b4e62d")
MANUAL_CHOICE = "__manual__"
STEP_CONNECTION_SCHEMA = vol.Schema(
{
vol.Required(CONF_HOST): str,
vol.Required(CONF_PORT, default=DEFAULT_PORT): int,
vol.Optional(CONF_NAME, default=DEFAULT_NAME): str,
}
)
STEP_METER_SCHEMA = vol.Schema(
{
vol.Required(CONF_PHASES, default=PHASES_3): vol.In(
{PHASES_1: "Jednofázový", PHASES_3: "Třífázový"}
),
STEP_CONNECTION_SCHEMA = vol.Schema({vol.Required(CONF_HOST): str, vol.Required(CONF_PORT, default=DEFAULT_PORT): int, vol.Optional(CONF_NAME, default=DEFAULT_NAME): str})
STEP_METER_SCHEMA = vol.Schema({
vol.Required(CONF_PHASES, default=PHASES_3): vol.In({PHASES_1: "Jednofázový", PHASES_3: "Třífázový"}),
vol.Required(CONF_HAS_FVE, default=False): bool,
vol.Required(CONF_TARIFFS, default=TARIFFS_2): vol.In(
{
TARIFFS_1: "Jeden tarif (pouze T1)",
TARIFFS_2: "Dva tarify (T1 + T2)",
TARIFFS_4: "Čtyři tarify (T1 T4)",
}
),
vol.Required(CONF_RELAY_COUNT, default=RELAYS_4): vol.In(
{
RELAYS_0: "Žádné relé",
RELAYS_4: "WM-RelayBox (R1 R4)",
RELAYS_6: "WM-RelayBox rozšířený (R1 R6)",
}
),
}
)
vol.Required(CONF_TARIFFS, default=TARIFFS_2): vol.In({TARIFFS_1: "Jeden tarif (pouze T1)", TARIFFS_2: "Dva tarify (T1 + T2)", TARIFFS_4: "Čtyři tarify (T1 T4)"}),
vol.Required(CONF_RELAY_COUNT, default=RELAYS_4): vol.In({RELAYS_0: "Žádné relé", RELAYS_4: "WM-RelayBox (R1 R4)", RELAYS_6: "WM-RelayBox rozšířený (R1 R6)"}),
})
async def _test_connection(host: str, port: int, timeout: float = 5.0) -> str | None:
@ -80,7 +39,7 @@ async def _test_connection(host: str, port: int, timeout: float = 5.0) -> str |
return "cannot_connect"
except OSError:
return "cannot_connect"
except Exception: # pragma: no cover - defensive
except Exception:
return "unknown"
@ -96,15 +55,12 @@ async def _scan_network(port: int, timeout: float = 1.0) -> list[str]:
local_ip = socket.gethostbyname(socket.gethostname())
except Exception:
pass
if local_ip.startswith("127.") or local_ip == "0.0.0.0":
local_ip = "192.168.1.1"
try:
network = IPv4Network(f"{local_ip}/24", strict=False)
except ValueError:
network = IPv4Network("192.168.1.0/24", strict=False)
found: list[str] = []
async def _probe(ip: str) -> None:
@ -122,7 +78,6 @@ async def _scan_network(port: int, timeout: float = 1.0) -> list[str]:
hosts = [str(host) for host in network.hosts()]
for index in range(0, len(hosts), 50):
await asyncio.gather(*(_probe(ip) for ip in hosts[index:index + 50]))
found.sort()
_LOGGER.debug("XT211 scan found %d host(s) on port %d: %s", len(found), port, found)
return found
@ -141,11 +96,9 @@ class XT211HANConfigFlow(config_entries.ConfigFlow, domain=DOMAIN):
mac = discovery_info.macaddress.replace(":", "").lower()
if not any(mac.startswith(prefix) for prefix in USR_IOT_MAC_PREFIXES):
return self.async_abort(reason="not_supported")
ip = discovery_info.ip
await self.async_set_unique_id(f"{ip}:{DEFAULT_PORT}")
self._abort_if_unique_id_configured(updates={CONF_HOST: ip})
self._discovered_host = ip
self._discovered_port = DEFAULT_PORT
_LOGGER.info("XT211 HAN: DHCP discovered USR IOT device at %s (MAC %s)", ip, mac)
@ -155,102 +108,41 @@ class XT211HANConfigFlow(config_entries.ConfigFlow, domain=DOMAIN):
if user_input is not None:
error = await _test_connection(self._discovered_host, self._discovered_port)
if error:
return self.async_show_form(
step_id="dhcp_confirm",
errors={"base": error},
description_placeholders={
"host": self._discovered_host,
"port": str(self._discovered_port),
},
)
self._connection_data = {
CONF_HOST: self._discovered_host,
CONF_PORT: self._discovered_port,
CONF_NAME: DEFAULT_NAME,
}
return self.async_show_form(step_id="dhcp_confirm", errors={"base": error}, description_placeholders={"host": self._discovered_host, "port": str(self._discovered_port)})
self._connection_data = {CONF_HOST: self._discovered_host, CONF_PORT: self._discovered_port, CONF_NAME: DEFAULT_NAME}
return await self.async_step_meter()
return self.async_show_form(
step_id="dhcp_confirm",
description_placeholders={
"host": self._discovered_host,
"port": str(self._discovered_port),
},
)
return self.async_show_form(step_id="dhcp_confirm", description_placeholders={"host": self._discovered_host, "port": str(self._discovered_port)})
async def async_step_user(self, user_input: dict[str, Any] | None = None) -> FlowResult:
if user_input is not None:
return await (self.async_step_scan() if user_input.get("method") == "scan" else self.async_step_manual())
return self.async_show_form(
step_id="user",
data_schema=vol.Schema(
{
vol.Required("method", default="scan"): vol.In(
{
"scan": "🔍 Automaticky vyhledat v síti",
"manual": "✏️ Zadat IP adresu ručně",
}
)
}
),
)
return self.async_show_form(step_id="user", data_schema=vol.Schema({vol.Required("method", default="scan"): vol.In({"scan": "🔍 Automaticky vyhledat v síti", "manual": "✏️ Zadat IP adresu ručně"})}))
async def async_step_scan(self, user_input: dict[str, Any] | None = None) -> FlowResult:
if user_input is not None:
host = user_input[CONF_HOST]
if host == MANUAL_CHOICE:
return await self.async_step_manual()
port = user_input.get(CONF_PORT, DEFAULT_PORT)
name = user_input.get(CONF_NAME, DEFAULT_NAME)
await self.async_set_unique_id(f"{host}:{port}")
self._abort_if_unique_id_configured()
error = await _test_connection(host, port)
if error:
return self.async_show_form(
step_id="scan",
data_schema=self._scan_schema(port, include_choices=not self._scan_results == []),
errors={"base": error},
)
return self.async_show_form(step_id="scan", data_schema=self._scan_schema(port, include_choices=not self._scan_results == []), errors={"base": error})
self._connection_data = {CONF_HOST: host, CONF_PORT: port, CONF_NAME: name}
return await self.async_step_meter()
self._scan_results = await _scan_network(DEFAULT_PORT)
if not self._scan_results:
return self.async_show_form(
step_id="scan",
data_schema=self._scan_schema(DEFAULT_PORT, include_choices=False),
errors={"base": "no_devices_found"},
)
return self.async_show_form(
step_id="scan",
data_schema=self._scan_schema(DEFAULT_PORT, include_choices=True),
)
return self.async_show_form(step_id="scan", data_schema=self._scan_schema(DEFAULT_PORT, include_choices=False), errors={"base": "no_devices_found"})
return self.async_show_form(step_id="scan", data_schema=self._scan_schema(DEFAULT_PORT, include_choices=True))
def _scan_schema(self, port: int, include_choices: bool) -> vol.Schema:
if include_choices:
choices = {ip: f"{ip}:{port}" for ip in self._scan_results}
choices[MANUAL_CHOICE] = "✏️ Zadat IP adresu ručně"
return vol.Schema(
{
vol.Required(CONF_HOST): vol.In(choices),
vol.Optional(CONF_PORT, default=port): int,
vol.Optional(CONF_NAME, default=DEFAULT_NAME): str,
}
)
return vol.Schema(
{
vol.Required(CONF_HOST): str,
vol.Required(CONF_PORT, default=port): int,
vol.Optional(CONF_NAME, default=DEFAULT_NAME): str,
}
)
return vol.Schema({vol.Required(CONF_HOST): vol.In(choices), vol.Optional(CONF_PORT, default=port): int, vol.Optional(CONF_NAME, default=DEFAULT_NAME): str})
return vol.Schema({vol.Required(CONF_HOST): str, vol.Required(CONF_PORT, default=port): int, vol.Optional(CONF_NAME, default=DEFAULT_NAME): str})
async def async_step_manual(self, user_input: dict[str, Any] | None = None) -> FlowResult:
errors: dict[str, str] = {}
@ -258,17 +150,14 @@ class XT211HANConfigFlow(config_entries.ConfigFlow, domain=DOMAIN):
host = user_input[CONF_HOST]
port = user_input[CONF_PORT]
name = user_input.get(CONF_NAME, DEFAULT_NAME)
await self.async_set_unique_id(f"{host}:{port}")
self._abort_if_unique_id_configured()
error = await _test_connection(host, port)
if error:
errors["base"] = error
else:
self._connection_data = {CONF_HOST: host, CONF_PORT: port, CONF_NAME: name}
return await self.async_step_meter()
return self.async_show_form(step_id="manual", data_schema=STEP_CONNECTION_SCHEMA, errors=errors)
async def async_step_meter(self, user_input: dict[str, Any] | None = None) -> FlowResult:
@ -278,5 +167,4 @@ class XT211HANConfigFlow(config_entries.ConfigFlow, domain=DOMAIN):
host = data[CONF_HOST]
port = data[CONF_PORT]
return self.async_create_entry(title=f"{name} ({host}:{port})", data=data)
return self.async_show_form(step_id="meter", data_schema=STEP_METER_SCHEMA)

View File

@ -13,16 +13,11 @@ CONF_RELAY_COUNT = "relay_count"
DEFAULT_PORT = 8899
DEFAULT_NAME = "XT211 HAN"
# Phases
PHASES_1 = "1"
PHASES_3 = "3"
# Tariff counts
TARIFFS_1 = 1
TARIFFS_2 = 2
TARIFFS_4 = 4
# Relay counts
RELAYS_0 = 0
RELAYS_4 = 4
RELAYS_6 = 6

View File

@ -12,7 +12,6 @@ from homeassistant.helpers.update_coordinator import DataUpdateCoordinator
from .dlms_parser import DLMSObject, DLMSParser, OBIS_DESCRIPTIONS
_LOGGER = logging.getLogger(__name__)
PUSH_TIMEOUT = 90
RECONNECT_DELAY = 10
@ -21,12 +20,7 @@ class XT211Coordinator(DataUpdateCoordinator[dict[str, Any]]):
"""Persistent TCP listener for XT211 DLMS push frames."""
def __init__(self, hass: HomeAssistant, host: str, port: int, name: str) -> None:
super().__init__(
hass,
_LOGGER,
name=f"XT211 HAN ({host}:{port})",
update_interval=None,
)
super().__init__(hass, _LOGGER, name=f"XT211 HAN ({host}:{port})", update_interval=None)
self.host = host
self.port = port
self.device_name = name
@ -79,15 +73,11 @@ class XT211Coordinator(DataUpdateCoordinator[dict[str, Any]]):
)
finally:
await self._disconnect()
await asyncio.sleep(RECONNECT_DELAY)
async def _connect(self) -> None:
_LOGGER.info("Connecting to XT211 adapter at %s:%d", self.host, self.port)
self._reader, self._writer = await asyncio.wait_for(
asyncio.open_connection(self.host, self.port),
timeout=10,
)
self._reader, self._writer = await asyncio.wait_for(asyncio.open_connection(self.host, self.port), timeout=10)
self._parser = DLMSParser()
self._connected = True
_LOGGER.info("Connected to XT211 adapter at %s:%d", self.host, self.port)
@ -105,50 +95,34 @@ class XT211Coordinator(DataUpdateCoordinator[dict[str, Any]]):
async def _receive_loop(self) -> None:
assert self._reader is not None
while True:
try:
chunk = await asyncio.wait_for(self._reader.read(4096), timeout=PUSH_TIMEOUT)
except asyncio.TimeoutError as exc:
_LOGGER.warning("No data from XT211 for %d s reconnecting", PUSH_TIMEOUT)
raise ConnectionError("Push timeout") from exc
if not chunk:
_LOGGER.warning("XT211 adapter closed connection")
raise ConnectionError("Remote closed")
_LOGGER.debug("XT211 RX %d bytes: %s", len(chunk), chunk.hex())
self._parser.feed(chunk)
while True:
result = self._parser.get_frame()
if result is None:
break
self._frames_received += 1
if result.success:
_LOGGER.debug(
"XT211 frame #%d parsed OK: %d object(s)",
self._frames_received,
len(result.objects),
)
_LOGGER.debug("XT211 frame #%d parsed OK: %d object(s)", self._frames_received, len(result.objects))
await self._process_frame(result.objects)
else:
_LOGGER.debug(
"XT211 frame #%d parse error: %s (raw: %s)",
self._frames_received,
result.error,
result.raw_hex[:120],
)
_LOGGER.debug("XT211 frame #%d parse error: %s (raw: %s)", self._frames_received, result.error, result.raw_hex[:120])
async def _process_frame(self, objects: list[DLMSObject]) -> None:
if not objects:
_LOGGER.debug("Received empty DLMS frame")
return
current = dict(self.data or {})
changed: list[str] = []
for obj in objects:
meta = OBIS_DESCRIPTIONS.get(obj.obis, {})
new_value = {
@ -161,11 +135,5 @@ class XT211Coordinator(DataUpdateCoordinator[dict[str, Any]]):
changed.append(obj.obis)
current[obj.obis] = new_value
_LOGGER.debug("XT211 OBIS %s = %r %s", obj.obis, obj.value, new_value["unit"])
self.async_set_updated_data(current)
_LOGGER.debug(
"Coordinator updated with %d object(s), %d changed: %s",
len(objects),
len(changed),
", ".join(changed[:10]),
)
_LOGGER.debug("Coordinator updated with %d object(s), %d changed: %s", len(objects), len(changed), ", ".join(changed[:10]))

View File

@ -8,9 +8,7 @@ from dataclasses import dataclass, field
from typing import Any
_LOGGER = logging.getLogger(__name__)
HDLC_FLAG = 0x7E
DLMS_TYPE_NULL = 0x00
DLMS_TYPE_ARRAY = 0x01
DLMS_TYPE_STRUCTURE = 0x02
@ -32,7 +30,7 @@ DLMS_TYPE_FLOAT64 = 0x18
class NeedMoreData(Exception):
"""Raised when the parser needs more bytes to finish a frame."""
pass
@dataclass
@ -52,8 +50,6 @@ class ParseResult:
class DLMSParser:
"""Stateful parser for raw DLMS APDUs and HDLC-wrapped frames."""
def __init__(self) -> None:
self._buffer = bytearray()
@ -61,53 +57,42 @@ class DLMSParser:
self._buffer.extend(data)
def get_frame(self) -> ParseResult | None:
"""Return one parsed frame from the internal buffer, if available."""
if not self._buffer:
return None
if self._buffer[0] == HDLC_FLAG:
return self._get_hdlc_frame()
start = self._find_apdu_start(self._buffer)
if start == -1:
_LOGGER.debug("Discarding %d bytes without known frame start", len(self._buffer))
self._buffer.clear()
return None
if start > 0:
_LOGGER.debug("Discarding %d leading byte(s) before APDU", start)
del self._buffer[:start]
if self._buffer and self._buffer[0] == 0x0F:
return self._get_raw_apdu_frame()
return None
def _get_hdlc_frame(self) -> ParseResult | None:
buf = self._buffer
if len(buf) < 3:
return None
frame_len = ((buf[1] & 0x07) << 8) | buf[2]
total = frame_len + 2
if len(buf) < total:
return None
raw = bytes(buf[:total])
del buf[:total]
raw_hex = raw.hex()
if raw[0] != HDLC_FLAG or raw[-1] != HDLC_FLAG:
return ParseResult(success=False, raw_hex=raw_hex, error="Missing HDLC flags")
try:
result = self._parse_hdlc(raw)
result.raw_hex = raw_hex
return result
except NeedMoreData:
# Should not happen for HDLC because total length is known.
return None
except Exception as exc: # pragma: no cover - defensive logging
except Exception as exc:
_LOGGER.exception("Error parsing HDLC frame")
return ParseResult(success=False, raw_hex=raw_hex, error=str(exc))
@ -117,12 +102,11 @@ class DLMSParser:
result, consumed = self._parse_apdu_with_length(bytes(buf))
except NeedMoreData:
return None
except Exception as exc: # pragma: no cover - defensive logging
except Exception as exc:
raw_hex = bytes(buf).hex()
_LOGGER.exception("Error parsing raw DLMS APDU")
del buf[:]
return ParseResult(success=False, raw_hex=raw_hex, error=str(exc))
raw = bytes(buf[:consumed])
del buf[:consumed]
result.raw_hex = raw.hex()
@ -130,16 +114,14 @@ class DLMSParser:
def _parse_hdlc(self, raw: bytes) -> ParseResult:
pos = 1
pos += 2 # frame format
pos += 2
_, pos = self._read_hdlc_address(raw, pos)
_, pos = self._read_hdlc_address(raw, pos)
pos += 1 # control
pos += 2 # HCS
pos += 1
pos += 2
if pos + 3 > len(raw) - 3:
raise ValueError("Frame too short for LLC")
pos += 3 # LLC header
pos += 3
apdu = raw[pos:-3]
result, _ = self._parse_apdu_with_length(apdu)
return result
@ -164,15 +146,12 @@ class DLMSParser:
raise ValueError(f"Unexpected APDU tag 0x{apdu[0]:02X}")
if len(apdu) < 6:
raise NeedMoreData
pos = 1
invoke_id = struct.unpack_from(">I", apdu, pos)[0]
pos += 4
_LOGGER.debug("XT211 invoke_id=0x%08X", invoke_id)
if pos >= len(apdu):
raise NeedMoreData
if apdu[pos] == DLMS_TYPE_OCTET_STRING:
pos += 1
dt_len, pos = self._decode_length(apdu, pos)
@ -180,7 +159,6 @@ class DLMSParser:
pos += dt_len
elif apdu[pos] == DLMS_TYPE_NULL:
pos += 1
self._require(apdu, pos, 2)
if apdu[pos] != DLMS_TYPE_STRUCTURE:
return ParseResult(success=True, objects=[]), pos
@ -188,7 +166,6 @@ class DLMSParser:
pos += 2
if structure_count < 2:
return ParseResult(success=True, objects=[]), pos
if pos >= len(apdu):
raise NeedMoreData
if apdu[pos] == DLMS_TYPE_ENUM:
@ -196,20 +173,17 @@ class DLMSParser:
pos += 2
else:
_, pos = self._decode_value(apdu, pos)
if pos >= len(apdu):
raise NeedMoreData
if apdu[pos] != DLMS_TYPE_ARRAY:
return ParseResult(success=True, objects=[]), pos
pos += 1
array_count, pos = self._decode_length(apdu, pos)
objects: list[DLMSObject] = []
for _ in range(array_count):
obj, pos = self._parse_xt211_object(apdu, pos)
if obj is not None:
objects.append(obj)
return ParseResult(success=True, objects=objects), pos
def _parse_xt211_object(self, data: bytes, pos: int) -> tuple[DLMSObject | None, int]:
@ -217,13 +191,9 @@ class DLMSParser:
if data[pos] != DLMS_TYPE_STRUCTURE:
raise ValueError(f"Expected object structure at {pos}, got 0x{data[pos]:02X}")
pos += 1
count, pos = self._decode_length(data, pos)
if count < 1:
raise ValueError(f"Unexpected object element count {count}")
# XT211 measurement objects use a raw descriptor layout:
# 02 02 00 [class_id_hi class_id_lo] [6B OBIS] [attr_idx] [typed value]
if pos < len(data) and data[pos] == 0x00:
if pos + 10 > len(data):
raise NeedMoreData
@ -231,34 +201,17 @@ class DLMSParser:
pos += 2
obis_raw = bytes(data[pos:pos + 6])
pos += 6
_attr_idx = data[pos]
pos += 1
value, pos = self._decode_value(data, pos)
if isinstance(value, (bytes, bytearray)):
try:
value = bytes(value).decode("ascii", errors="replace").strip("\x00")
except Exception:
value = bytes(value).hex()
obis = self._format_obis(obis_raw)
meta = OBIS_DESCRIPTIONS.get(obis, {})
_LOGGER.debug(
"Parsed XT211 object class_id=%s obis=%s value=%r unit=%s",
class_id,
obis,
value,
meta.get("unit", ""),
)
return DLMSObject(
obis=obis,
value=value,
unit=meta.get("unit", ""),
scaler=0,
), pos
# Short housekeeping frames use simple typed structures without OBIS.
# Consume them cleanly and ignore them.
_LOGGER.debug("Parsed XT211 object class_id=%s obis=%s value=%r unit=%s", class_id, obis, value, meta.get("unit", ""))
return DLMSObject(obis=obis, value=value, unit=meta.get("unit", ""), scaler=0), pos
last_value: Any = None
for _ in range(count):
last_value, pos = self._decode_value(data, pos)
@ -269,7 +222,6 @@ class DLMSParser:
self._require(data, pos, 1)
dtype = data[pos]
pos += 1
if dtype == DLMS_TYPE_NULL:
return None, pos
if dtype == DLMS_TYPE_BOOL:
@ -320,7 +272,6 @@ class DLMSParser:
item, pos = self._decode_value(data, pos)
items.append(item)
return items, pos
raise ValueError(f"Unknown DLMS type 0x{dtype:02X} at pos {pos - 1}")
def _decode_length(self, data: bytes, pos: int) -> tuple[int, int]:
@ -354,7 +305,7 @@ class DLMSParser:
return f"{a}-{b}:{c}.{d}.{e}.{f}"
OBIS_DESCRIPTIONS: dict[str, dict[str, str]] = {
OBIS_DESCRIPTIONS = {
"0-0:42.0.0.255": {"name": "Název zařízení", "unit": "", "class": "text"},
"0-0:96.1.0.255": {"name": "Výrobní číslo", "unit": "", "class": "text"},
"0-0:96.1.1.255": {"name": "Výrobní číslo", "unit": "", "class": "text"},
@ -381,5 +332,5 @@ OBIS_DESCRIPTIONS: dict[str, dict[str, str]] = {
"1-0:1.8.3.255": {"name": "Spotřeba energie T3", "unit": "Wh", "class": "energy"},
"1-0:1.8.4.255": {"name": "Spotřeba energie T4", "unit": "Wh", "class": "energy"},
"1-0:2.8.0.255": {"name": "Dodávka energie celkem", "unit": "Wh", "class": "energy"},
"0-0:96.13.0.255": {"name": "Zpráva pro zákazníka", "unit": "", "class": "text"},
"0-0:96.13.0.255": {"name": "Zpráva pro zákazníka", "unit": "", "class": "text"}
}

View File

@ -1,9 +1,9 @@
{
"domain": "xt211_han",
"name": "XT211 HAN (RS485 via Ethernet)",
"version": "0.8.0",
"documentation": "https://github.com/nero150/xt211-han-ha",
"issue_tracker": "https://github.com/nero150/xt211-han-ha/issues",
"version": "0.9.0",
"documentation": "https://github.com/nero150/CEZ_rele_box",
"issue_tracker": "https://github.com/nero150/CEZ_rele_box/issues",
"dependencies": [],
"codeowners": [
"@nero150"

View File

@ -10,73 +10,27 @@ from homeassistant.helpers.device_registry import DeviceInfo
from homeassistant.helpers.entity_platform import AddEntitiesCallback
from homeassistant.helpers.update_coordinator import CoordinatorEntity
from .const import (
CONF_HAS_FVE,
CONF_PHASES,
CONF_RELAY_COUNT,
CONF_TARIFFS,
DOMAIN,
PHASES_3,
RELAYS_4,
TARIFFS_2,
)
from .const import CONF_HAS_FVE, CONF_PHASES, CONF_RELAY_COUNT, CONF_TARIFFS, DOMAIN, PHASES_3, RELAYS_4, TARIFFS_2
from .coordinator import XT211Coordinator
from .dlms_parser import OBIS_DESCRIPTIONS
SENSOR_META: dict[str, dict] = {
"power": {
"device_class": SensorDeviceClass.POWER,
"state_class": SensorStateClass.MEASUREMENT,
"unit": UnitOfPower.WATT,
},
"energy": {
"device_class": SensorDeviceClass.ENERGY,
"state_class": SensorStateClass.TOTAL_INCREASING,
"unit": UnitOfEnergy.KILO_WATT_HOUR,
},
"sensor": {
"device_class": None,
"state_class": SensorStateClass.MEASUREMENT,
"unit": None,
},
SENSOR_META = {
"power": {"device_class": SensorDeviceClass.POWER, "state_class": SensorStateClass.MEASUREMENT, "unit": UnitOfPower.WATT},
"energy": {"device_class": SensorDeviceClass.ENERGY, "state_class": SensorStateClass.TOTAL_INCREASING, "unit": UnitOfEnergy.KILO_WATT_HOUR},
"sensor": {"device_class": None, "state_class": SensorStateClass.MEASUREMENT, "unit": None},
}
SERIAL_OBIS = ("0-0:96.1.1.255", "0-0:96.1.0.255")
PRECREATED_TEXT_ENTITIES = {
"serial_number": {
"name": "Výrobní číslo",
"obises": SERIAL_OBIS,
"entity_category": EntityCategory.DIAGNOSTIC,
},
"current_tariff": {
"name": "Aktuální tarif",
"obises": ("0-0:96.14.0.255",),
"entity_category": EntityCategory.DIAGNOSTIC,
},
}
DYNAMIC_TEXT_OBIS = {
"0-0:42.0.0.255",
"0-0:96.13.0.255",
}
BINARY_OBIS = {
"0-0:96.3.10.255",
"0-1:96.3.10.255",
"0-2:96.3.10.255",
"0-3:96.3.10.255",
"0-4:96.3.10.255",
"0-5:96.3.10.255",
"0-6:96.3.10.255",
"serial_number": {"name": "Výrobní číslo", "obises": SERIAL_OBIS, "entity_category": EntityCategory.DIAGNOSTIC},
"current_tariff": {"name": "Aktuální tarif", "obises": ("0-0:96.14.0.255",), "entity_category": EntityCategory.DIAGNOSTIC},
}
DYNAMIC_TEXT_OBIS = {"0-0:42.0.0.255", "0-0:96.13.0.255"}
BINARY_OBIS = {"0-0:96.3.10.255", "0-1:96.3.10.255", "0-2:96.3.10.255", "0-3:96.3.10.255", "0-4:96.3.10.255", "0-5:96.3.10.255", "0-6:96.3.10.255"}
TEXT_OBIS = set().union(*(spec["obises"] for spec in PRECREATED_TEXT_ENTITIES.values()), DYNAMIC_TEXT_OBIS)
def _device_info(entry: ConfigEntry) -> DeviceInfo:
return DeviceInfo(
identifiers={(DOMAIN, entry.entry_id)},
name=entry.data.get(CONF_NAME, "XT211 HAN"),
manufacturer="Sagemcom",
model="XT211 AMM",
)
return DeviceInfo(identifiers={(DOMAIN, entry.entry_id)}, name=entry.data.get(CONF_NAME, "XT211 HAN"), manufacturer="Sagemcom", model="XT211 AMM")
def build_enabled_obis(entry: ConfigEntry) -> set[str]:
@ -84,60 +38,29 @@ def build_enabled_obis(entry: ConfigEntry) -> set[str]:
has_fve = entry.data.get(CONF_HAS_FVE, True)
tariffs = int(entry.data.get(CONF_TARIFFS, TARIFFS_2))
relay_count = int(entry.data.get(CONF_RELAY_COUNT, RELAYS_4))
enabled_obis: set[str] = {
"0-0:17.0.0.255",
"1-0:1.7.0.255",
"1-0:1.8.0.255",
*TEXT_OBIS,
}
relay_obis = {
1: "0-1:96.3.10.255",
2: "0-2:96.3.10.255",
3: "0-3:96.3.10.255",
4: "0-4:96.3.10.255",
5: "0-5:96.3.10.255",
6: "0-6:96.3.10.255",
}
enabled_obis = {"0-0:17.0.0.255", "1-0:1.7.0.255", "1-0:1.8.0.255", *TEXT_OBIS}
relay_obis = {1: "0-1:96.3.10.255", 2: "0-2:96.3.10.255", 3: "0-3:96.3.10.255", 4: "0-4:96.3.10.255", 5: "0-5:96.3.10.255", 6: "0-6:96.3.10.255"}
enabled_obis.add("0-0:96.3.10.255")
for idx in range(1, relay_count + 1):
enabled_obis.add(relay_obis[idx])
if phases == PHASES_3:
enabled_obis.update({"1-0:21.7.0.255", "1-0:41.7.0.255", "1-0:61.7.0.255"})
if has_fve:
enabled_obis.add("1-0:2.7.0.255")
enabled_obis.add("1-0:2.8.0.255")
if phases == PHASES_3:
enabled_obis.update({"1-0:22.7.0.255", "1-0:42.7.0.255", "1-0:62.7.0.255"})
for tariff in range(1, tariffs + 1):
enabled_obis.add(f"1-0:1.8.{tariff}.255")
return enabled_obis
async def async_setup_entry(
hass: HomeAssistant,
entry: ConfigEntry,
async_add_entities: AddEntitiesCallback,
) -> None:
async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntry, async_add_entities: AddEntitiesCallback) -> None:
coordinator: XT211Coordinator = hass.data[DOMAIN][entry.entry_id]
enabled_obis = build_enabled_obis(entry)
entities = [
XT211SensorEntity(coordinator, entry, obis, meta)
for obis, meta in OBIS_DESCRIPTIONS.items()
if obis in enabled_obis and obis not in BINARY_OBIS and obis not in TEXT_OBIS
]
entities.extend(
XT211AliasedTextSensorEntity(coordinator, entry, key, spec)
for key, spec in PRECREATED_TEXT_ENTITIES.items()
)
entities = [XT211SensorEntity(coordinator, entry, obis, meta) for obis, meta in OBIS_DESCRIPTIONS.items() if obis in enabled_obis and obis not in BINARY_OBIS and obis not in TEXT_OBIS]
entities.extend(XT211AliasedTextSensorEntity(coordinator, entry, key, spec) for key, spec in PRECREATED_TEXT_ENTITIES.items())
async_add_entities(entities)
registered_obis = {entity._obis for entity in entities if hasattr(entity, "_obis")}
@callback

8
docs/images/README.txt Normal file
View File

@ -0,0 +1,8 @@
Do této složky patří screenshoty použité v README:
- converter_network_settings.png
- converter_serial_settings.png
- converter_status_tx.png
- home_assistant_entities.png
V tomto generovaném balíku nejsou původní binární obrázky přiložené.
Doplň je sem z původního repozitáře před finálním vydáním, pokud je chceš zachovat i v release archivu.

6
docs/pdfs/README.txt Normal file
View File

@ -0,0 +1,6 @@
Do této složky patří podkladové PDF soubory zmíněné v README:
- cez_rs485_han_interface.pdf
- cez_obis_codes_han_2025-02-01.pdf
V tomto generovaném balíku nejsou původní PDF přiložená.
Doplň je sem z původního repozitáře před finálním vydáním, pokud je chceš zachovat i v release archivu.

View File

@ -6,5 +6,6 @@
"iot_class": "local_push",
"render_readme": true,
"zip_release": true,
"filename": "xt211_han.zip"
"filename": "xt211_han.zip",
"content_in_root": true
}

View File

@ -1,7 +1,7 @@
{
"domain": "xt211_han",
"name": "XT211 HAN (RS485 via Ethernet)",
"version": "0.8.2",
"version": "0.8.9",
"documentation": "https://github.com/nero150/CEZ_rele_box",
"issue_tracker": "https://github.com/nero150/CEZ_rele_box/issues",
"dependencies": [],

View File

@ -1,24 +1,13 @@
#!/usr/bin/env python3
"""
Standalone test / debug script for the DLMS parser and TCP listener.
Usage:
# Parse a raw hex frame from the meter (paste from HA debug log):
python3 test_parser.py --hex "7ea0...7e"
# Live listen on TCP socket (forward output to terminal):
python3 test_parser.py --host 192.168.1.100 --port 8899
# Replay a saved binary capture file:
python3 test_parser.py --file capture.bin
"""
import argparse
import asyncio
import sys
import os
import sys
# Allow running from repo root without installing
sys.path.insert(0, os.path.join(os.path.dirname(__file__), "custom_components"))
from xt211_han.dlms_parser import DLMSParser, OBIS_DESCRIPTIONS
@ -26,12 +15,12 @@ from xt211_han.dlms_parser import DLMSParser, OBIS_DESCRIPTIONS
def print_result(result) -> None:
if not result.success:
print(f" Parse error: {result.error}")
print(f" Parse error: {result.error}")
return
if not result.objects:
print(" ⚠️ Frame OK but no DLMS objects extracted")
print(" Frame OK but no DLMS objects extracted")
return
print(f" {len(result.objects)} OBIS objects decoded:")
print(f" {len(result.objects)} OBIS objects decoded:")
for obj in result.objects:
meta = OBIS_DESCRIPTIONS.get(obj.obis, {})
name = meta.get("name", obj.obis)
@ -40,43 +29,22 @@ def print_result(result) -> None:
def test_hex(hex_str: str) -> None:
"""Parse a single hex-encoded frame."""
raw = bytes.fromhex(hex_str.replace(" ", "").replace("\n", ""))
print(f"\n📦 Frame: {len(raw)} bytes")
print(f"\nFrame: {len(raw)} bytes")
parser = DLMSParser()
parser.feed(raw)
result = parser.get_frame()
if result:
print_result(result)
else:
print(" ⚠️ No complete frame found in data")
def test_file(path: str) -> None:
"""Parse all frames from a binary capture file."""
with open(path, "rb") as f:
data = f.read()
print(f"\n📂 File: {path} ({len(data)} bytes)")
parser = DLMSParser()
parser.feed(data)
count = 0
while True:
result = parser.get_frame()
if result is None:
break
count += 1
print(f"\n--- Frame #{count} ---")
print_result(result)
print(f"\nTotal frames parsed: {count}")
print(" No complete frame found in data")
async def listen_tcp(host: str, port: int) -> None:
"""Connect to the TCP adapter and print decoded frames as they arrive."""
print(f"\n🔌 Connecting to {host}:{port} ...")
print(f"\nConnecting to {host}:{port} ...")
reader, writer = await asyncio.open_connection(host, port)
print(" Connected. Waiting for DLMS PUSH frames (every ~60 s)...\n")
print("Connected. Waiting for DLMS PUSH frames...\n")
parser = DLMSParser()
frame_count = 0
try:
while True:
chunk = await asyncio.wait_for(reader.read(4096), timeout=120)
@ -88,8 +56,6 @@ async def listen_tcp(host: str, port: int) -> None:
result = parser.get_frame()
if result is None:
break
frame_count += 1
print(f"\n--- Frame #{frame_count} raw: {result.raw_hex[:40]}... ---")
print_result(result)
except asyncio.TimeoutError:
print("No data for 120 s, giving up.")
@ -101,15 +67,12 @@ def main() -> None:
parser = argparse.ArgumentParser(description="XT211 DLMS parser test tool")
group = parser.add_mutually_exclusive_group(required=True)
group.add_argument("--hex", help="Hex-encoded raw frame to parse")
group.add_argument("--file", help="Binary capture file to parse")
group.add_argument("--host", help="Adapter IP address for live TCP test")
parser.add_argument("--port", type=int, default=8899, help="TCP port (default 8899)")
args = parser.parse_args()
if args.hex:
test_hex(args.hex)
elif args.file:
test_file(args.file)
elif args.host:
asyncio.run(listen_tcp(args.host, args.port))

BIN
xt211_han.zip Normal file

Binary file not shown.