From cf91787f60655f3df94d67c259124f17e4ed134e Mon Sep 17 00:00:00 2001 From: Nero <95982029+nero150@users.noreply.github.com> Date: Wed, 18 Mar 2026 17:40:43 +0100 Subject: [PATCH] Add files via upload --- custom_components/xt211_han/dlms_parser.py | 190 ++++++++------------- custom_components/xt211_han/manifest.json | 2 +- 2 files changed, 69 insertions(+), 123 deletions(-) diff --git a/custom_components/xt211_han/dlms_parser.py b/custom_components/xt211_han/dlms_parser.py index 741c79c..c6cff62 100644 --- a/custom_components/xt211_han/dlms_parser.py +++ b/custom_components/xt211_han/dlms_parser.py @@ -256,187 +256,133 @@ class DLMSParser: return addr, pos def _parse_apdu(self, apdu: bytes) -> ParseResult: - """Parse DLMS APDU (Data-Notification = tag 0x0F).""" + """ + Parse DLMS APDU (Data-Notification = tag 0x0F). + + XT211 frame structure: + 0F - Data-Notification tag + [4B invoke-id] - MSB set = data frame, clear = push-setup (skip) + 00 - datetime absent + 02 02 - outer structure(2) + 16 [push_type] - elem[0]: enum (push type, ignore) + 01 [N] - elem[1]: array(N captured objects) + [N x object] - see _parse_xt211_object + + Each captured object (11-byte header + type-tagged value): + 02 02 00 - structure prefix (3 bytes, ignored) + [class_id] - 1 byte DLMS class ID + [A B C D E F] - 6-byte raw OBIS (NO type tag!) + [attr_idx] - 1 byte attribute index (ignored) + [type][value bytes] - standard DLMS type-tagged value + """ if not apdu: return ParseResult(success=False, objects=[], error="Empty APDU") - tag = apdu[0] - if tag != 0x0F: + if apdu[0] != 0x0F: return ParseResult( success=False, objects=[], - error=f"Unexpected APDU tag 0x{tag:02X} (expected 0x0F)" + error=f"Unexpected APDU tag 0x{apdu[0]:02X} (expected 0x0F)" ) - pos = 1 - if len(apdu) < 5: + if len(apdu) < 6: return ParseResult(success=False, objects=[], error="APDU too short") - # Invoke-id (4 bytes) - bit 31 set = data frame, clear = push-setup (ignore) - invoke_id = struct.unpack_from(">I", apdu, pos)[0] - pos += 4 + pos = 1 + invoke_id = struct.unpack_from(">I", apdu, pos)[0]; pos += 4 _LOGGER.debug("Invoke ID: 0x%08X", invoke_id) - # Push-setup frames (invoke_id MSB = 0) contain no measurement data + # Skip push-setup frames (invoke_id MSB = 0) if not (invoke_id & 0x80000000): - _LOGGER.debug("Push-setup frame (invoke_id MSB=0), skipping") + _LOGGER.debug("Push-setup frame, skipping") return ParseResult(success=True, objects=[]) - # Optional date-time + # Datetime: 0x09 = octet-string, 0x00 = absent if pos < len(apdu) and apdu[pos] == 0x09: pos += 1 - dt_len = apdu[pos]; pos += 1 - pos += dt_len + dt_len = apdu[pos]; pos += 1 + dt_len elif pos < len(apdu) and apdu[pos] == 0x00: - pos += 1 # absent + pos += 1 - # Notification body: - # structure(2): - # [0] enum = push type (ignore) - # [1] array(N) of structure(2): [obis_octet_string, value_structure(2): [scaler_unit, value]] - if pos >= len(apdu): + # Outer structure(2): skip tag + count + if pos + 2 > len(apdu) or apdu[pos] != 0x02: return ParseResult(success=True, objects=[]) + pos += 2 # 02 02 - # Outer structure tag - if apdu[pos] != 0x02: - _LOGGER.debug("Expected structure (0x02) at pos %d, got 0x%02X", pos, apdu[pos]) - return ParseResult(success=True, objects=[]) - pos += 1 - outer_count, pos = self._decode_length(apdu, pos) - _LOGGER.debug("Outer structure count: %d", outer_count) - - # Skip push type (first element, usually enum) + # Element[0]: enum = push type (skip 2 bytes: 16 XX) if pos < len(apdu) and apdu[pos] == 0x16: - pos += 2 # enum tag + 1 byte value + pos += 2 - # Next should be array of COSEM objects + # Element[1]: array of captured objects if pos >= len(apdu) or apdu[pos] != 0x01: - _LOGGER.debug("Expected array (0x01) at pos %d, got 0x%02X", pos, apdu[pos] if pos < len(apdu) else -1) return ParseResult(success=True, objects=[]) pos += 1 + array_count, pos = self._decode_length(apdu, pos) _LOGGER.debug("Array count: %d objects", array_count) objects = [] for i in range(array_count): - if pos >= len(apdu): + if pos + 11 > len(apdu): break try: - obj, pos = self._parse_cosem_object(apdu, pos) + obj, pos = self._parse_xt211_object(apdu, pos) if obj: objects.append(obj) + _LOGGER.debug("OBIS %s = %s %s", obj.obis, obj.value, obj.unit) except Exception as exc: - _LOGGER.debug("Error parsing COSEM object %d at pos %d: %s", i, pos, exc) + _LOGGER.debug("Error parsing object %d at pos %d: %s", i, pos, exc) break return ParseResult(success=True, objects=objects) - def _parse_cosem_object(self, data: bytes, pos: int) -> tuple[DLMSObject | None, int]: + def _parse_xt211_object(self, data: bytes, pos: int) -> tuple[DLMSObject | None, int]: """ - Parse one COSEM object entry from the push array. + Parse one captured object from XT211 push notification. - Expected structure(2): - [0] octet-string(6) = OBIS code - [1] structure(2): - [0] structure(2): [int8 scaler, enum unit] - [1] value (any type) - Or simplified structure(2): - [0] octet-string(6) = OBIS - [1] value directly + Format per object: + 02 02 00 - 3-byte structure prefix (ignored) + [class_id] - 1 byte + [A B C D E F] - 6-byte raw OBIS (no type tag) + [attr_idx] - 1 byte (ignored) + [type][value] - DLMS type-tagged value """ if data[pos] != 0x02: - # Not a structure - skip unknown type - val, pos = self._decode_value(data, pos) - return None, pos - pos += 1 # skip structure tag - count, pos = self._decode_length(data, pos) + _LOGGER.debug("Expected 0x02 at pos %d, got 0x%02X", pos, data[pos]) + return None, pos + 1 - if count < 2: - return None, pos + pos += 3 # skip: 02 02 00 - # Element 0: OBIS code as octet-string - if data[pos] != 0x09: - val, pos = self._decode_value(data, pos) - return None, pos - pos += 1 # skip octet-string tag - obis_len, pos = self._decode_length(data, pos) - obis_raw = data[pos:pos+obis_len] - pos += obis_len + # Class ID + pos += 1 # class_id (not needed for value extraction) - if len(obis_raw) != 6: - # Skip remaining elements - for _ in range(count - 1): - _, pos = self._decode_value(data, pos) + # Raw OBIS (6 bytes, no type tag) + if pos + 6 > len(data): return None, pos - + obis_raw = data[pos:pos+6]; pos += 6 obis_str = self._format_obis(obis_raw) - # Element 1: value wrapper - # Can be: structure(2)[scaler_unit, value] OR direct value - scaler = 0 - unit_code = 255 - value = None + # Attribute index (skip) + pos += 1 - if pos < len(data) and data[pos] == 0x02: - # structure(2): [scaler_unit_struct, value] - pos += 1 # skip structure tag - inner_count, pos = self._decode_length(data, pos) + # Type-tagged value + value, pos = self._decode_value(data, pos) - if inner_count >= 2: - # First inner: scaler+unit structure(2)[int8, enum] - if pos < len(data) and data[pos] == 0x02: - pos += 1 - su_count, pos = self._decode_length(data, pos) - if su_count >= 2: - raw_scaler, pos = self._decode_value(data, pos) - raw_unit, pos = self._decode_value(data, pos) - if isinstance(raw_scaler, int): - scaler = raw_scaler if raw_scaler < 128 else raw_scaler - 256 - if isinstance(raw_unit, int): - unit_code = raw_unit - # skip extra - for _ in range(su_count - 2): - _, pos = self._decode_value(data, pos) - else: - _, pos = self._decode_value(data, pos) - - # Second inner: actual value - value, pos = self._decode_value(data, pos) - - # skip extra inner elements - for _ in range(inner_count - 2): - _, pos = self._decode_value(data, pos) - else: - for _ in range(inner_count): - _, pos = self._decode_value(data, pos) - else: - # Direct value - value, pos = self._decode_value(data, pos) - - # Skip any extra elements in the outer structure - for _ in range(count - 2): - _, pos = self._decode_value(data, pos) - - # Apply scaler - if isinstance(value, int) and scaler != 0: - final_value: Any = apply_scaler(value, scaler) - elif isinstance(value, (bytes, bytearray)): + # Convert bytes to string for text objects + if isinstance(value, (bytes, bytearray)): try: - final_value = value.decode("ascii", errors="replace").strip("\x00") + value = value.decode("ascii", errors="replace").strip("\x00") except Exception: - final_value = value.hex() - else: - final_value = value + value = value.hex() - unit_str = self.UNIT_MAP.get(unit_code, "") meta = OBIS_DESCRIPTIONS.get(obis_str, {}) - return DLMSObject( obis=obis_str, - value=final_value, - unit=unit_str or meta.get("unit", ""), - scaler=scaler, + value=value, + unit=meta.get("unit", ""), + scaler=0, ), pos + def _decode_value(self, data: bytes, pos: int) -> tuple[Any, int]: """Recursively decode a DLMS typed value. Returns (value, new_pos).""" if pos >= len(data): diff --git a/custom_components/xt211_han/manifest.json b/custom_components/xt211_han/manifest.json index 505f8fa..4cd54dd 100644 --- a/custom_components/xt211_han/manifest.json +++ b/custom_components/xt211_han/manifest.json @@ -1,7 +1,7 @@ { "domain": "xt211_han", "name": "XT211 HAN (RS485 via Ethernet)", - "version": "0.7.4", + "version": "0.7.5", "documentation": "https://github.com/nero150/xt211-han-ha", "issue_tracker": "https://github.com/nero150/xt211-han-ha/issues", "dependencies": [],