From b5696d63256f5223bb0dada8a9f9250571013749 Mon Sep 17 00:00:00 2001 From: Nero <95982029+nero150@users.noreply.github.com> Date: Wed, 18 Mar 2026 17:24:05 +0100 Subject: [PATCH] Add files via upload --- custom_components/xt211_han/dlms_parser.py | 293 ++++++++++++--------- custom_components/xt211_han/manifest.json | 2 +- 2 files changed, 172 insertions(+), 123 deletions(-) diff --git a/custom_components/xt211_han/dlms_parser.py b/custom_components/xt211_han/dlms_parser.py index 70f790c..741c79c 100644 --- a/custom_components/xt211_han/dlms_parser.py +++ b/custom_components/xt211_han/dlms_parser.py @@ -84,10 +84,14 @@ class DLMSObject: class ParseResult: """Result of parsing one HDLC frame.""" success: bool - objects: list[DLMSObject] + objects: list[DLMSObject] = None raw_hex: str = "" error: str = "" + def __post_init__(self): + if self.objects is None: + self.objects = [] + class DLMSParser: """ @@ -254,40 +258,184 @@ class DLMSParser: def _parse_apdu(self, apdu: bytes) -> ParseResult: """Parse DLMS APDU (Data-Notification = tag 0x0F).""" if not apdu: - return ParseResult(success=False, error="Empty APDU") + return ParseResult(success=False, objects=[], error="Empty APDU") tag = apdu[0] - if tag != 0x0F: return ParseResult( - success=False, - error=f"Unexpected APDU tag 0x{tag:02X} (expected 0x0F Data-Notification)" + success=False, objects=[], + error=f"Unexpected APDU tag 0x{tag:02X} (expected 0x0F)" ) - # Data-Notification structure: - # 0F [long-invoke-id-and-priority 4B] [date-time opt] [notification-body] pos = 1 if len(apdu) < 5: - return ParseResult(success=False, error="APDU too short") + return ParseResult(success=False, objects=[], error="APDU too short") - # Long invoke-id-and-priority (4 bytes) - invoke_id = struct.unpack_from(">I", apdu, pos)[0]; pos += 4 + # Invoke-id (4 bytes) - bit 31 set = data frame, clear = push-setup (ignore) + invoke_id = struct.unpack_from(">I", apdu, pos)[0] + pos += 4 _LOGGER.debug("Invoke ID: 0x%08X", invoke_id) - # Optional date-time: if next byte == 0x09 then it's an octet string with time + # Push-setup frames (invoke_id MSB = 0) contain no measurement data + if not (invoke_id & 0x80000000): + _LOGGER.debug("Push-setup frame (invoke_id MSB=0), skipping") + return ParseResult(success=True, objects=[]) + + # Optional date-time if pos < len(apdu) and apdu[pos] == 0x09: - pos += 1 # skip type tag + pos += 1 dt_len = apdu[pos]; pos += 1 - _dt_bytes = apdu[pos:pos+dt_len]; pos += dt_len - _LOGGER.debug("Timestamp bytes: %s", _dt_bytes.hex()) + pos += dt_len elif pos < len(apdu) and apdu[pos] == 0x00: - pos += 1 # optional field absent + pos += 1 # absent - # Notification body is a structure containing the push data - objects, _ = self._decode_value(apdu, pos) - dlms_objects = self._extract_objects(objects) + # Notification body: + # structure(2): + # [0] enum = push type (ignore) + # [1] array(N) of structure(2): [obis_octet_string, value_structure(2): [scaler_unit, value]] + if pos >= len(apdu): + return ParseResult(success=True, objects=[]) - return ParseResult(success=True, objects=dlms_objects) + # Outer structure tag + if apdu[pos] != 0x02: + _LOGGER.debug("Expected structure (0x02) at pos %d, got 0x%02X", pos, apdu[pos]) + return ParseResult(success=True, objects=[]) + pos += 1 + outer_count, pos = self._decode_length(apdu, pos) + _LOGGER.debug("Outer structure count: %d", outer_count) + + # Skip push type (first element, usually enum) + if pos < len(apdu) and apdu[pos] == 0x16: + pos += 2 # enum tag + 1 byte value + + # Next should be array of COSEM objects + if pos >= len(apdu) or apdu[pos] != 0x01: + _LOGGER.debug("Expected array (0x01) at pos %d, got 0x%02X", pos, apdu[pos] if pos < len(apdu) else -1) + return ParseResult(success=True, objects=[]) + pos += 1 + array_count, pos = self._decode_length(apdu, pos) + _LOGGER.debug("Array count: %d objects", array_count) + + objects = [] + for i in range(array_count): + if pos >= len(apdu): + break + try: + obj, pos = self._parse_cosem_object(apdu, pos) + if obj: + objects.append(obj) + except Exception as exc: + _LOGGER.debug("Error parsing COSEM object %d at pos %d: %s", i, pos, exc) + break + + return ParseResult(success=True, objects=objects) + + def _parse_cosem_object(self, data: bytes, pos: int) -> tuple[DLMSObject | None, int]: + """ + Parse one COSEM object entry from the push array. + + Expected structure(2): + [0] octet-string(6) = OBIS code + [1] structure(2): + [0] structure(2): [int8 scaler, enum unit] + [1] value (any type) + Or simplified structure(2): + [0] octet-string(6) = OBIS + [1] value directly + """ + if data[pos] != 0x02: + # Not a structure - skip unknown type + val, pos = self._decode_value(data, pos) + return None, pos + pos += 1 # skip structure tag + count, pos = self._decode_length(data, pos) + + if count < 2: + return None, pos + + # Element 0: OBIS code as octet-string + if data[pos] != 0x09: + val, pos = self._decode_value(data, pos) + return None, pos + pos += 1 # skip octet-string tag + obis_len, pos = self._decode_length(data, pos) + obis_raw = data[pos:pos+obis_len] + pos += obis_len + + if len(obis_raw) != 6: + # Skip remaining elements + for _ in range(count - 1): + _, pos = self._decode_value(data, pos) + return None, pos + + obis_str = self._format_obis(obis_raw) + + # Element 1: value wrapper + # Can be: structure(2)[scaler_unit, value] OR direct value + scaler = 0 + unit_code = 255 + value = None + + if pos < len(data) and data[pos] == 0x02: + # structure(2): [scaler_unit_struct, value] + pos += 1 # skip structure tag + inner_count, pos = self._decode_length(data, pos) + + if inner_count >= 2: + # First inner: scaler+unit structure(2)[int8, enum] + if pos < len(data) and data[pos] == 0x02: + pos += 1 + su_count, pos = self._decode_length(data, pos) + if su_count >= 2: + raw_scaler, pos = self._decode_value(data, pos) + raw_unit, pos = self._decode_value(data, pos) + if isinstance(raw_scaler, int): + scaler = raw_scaler if raw_scaler < 128 else raw_scaler - 256 + if isinstance(raw_unit, int): + unit_code = raw_unit + # skip extra + for _ in range(su_count - 2): + _, pos = self._decode_value(data, pos) + else: + _, pos = self._decode_value(data, pos) + + # Second inner: actual value + value, pos = self._decode_value(data, pos) + + # skip extra inner elements + for _ in range(inner_count - 2): + _, pos = self._decode_value(data, pos) + else: + for _ in range(inner_count): + _, pos = self._decode_value(data, pos) + else: + # Direct value + value, pos = self._decode_value(data, pos) + + # Skip any extra elements in the outer structure + for _ in range(count - 2): + _, pos = self._decode_value(data, pos) + + # Apply scaler + if isinstance(value, int) and scaler != 0: + final_value: Any = apply_scaler(value, scaler) + elif isinstance(value, (bytes, bytearray)): + try: + final_value = value.decode("ascii", errors="replace").strip("\x00") + except Exception: + final_value = value.hex() + else: + final_value = value + + unit_str = self.UNIT_MAP.get(unit_code, "") + meta = OBIS_DESCRIPTIONS.get(obis_str, {}) + + return DLMSObject( + obis=obis_str, + value=final_value, + unit=unit_str or meta.get("unit", ""), + scaler=scaler, + ), pos def _decode_value(self, data: bytes, pos: int) -> tuple[Any, int]: """Recursively decode a DLMS typed value. Returns (value, new_pos).""" @@ -298,40 +446,30 @@ class DLMSParser: if dtype == DLMS_TYPE_NULL: return None, pos - elif dtype == DLMS_TYPE_BOOL: return bool(data[pos]), pos + 1 - elif dtype == DLMS_TYPE_INT8: return struct.unpack_from(">b", data, pos)[0], pos + 1 - elif dtype == DLMS_TYPE_UINT8: return data[pos], pos + 1 - + elif dtype == 0x16: # enum = uint8 + return data[pos], pos + 1 elif dtype == DLMS_TYPE_INT16: return struct.unpack_from(">h", data, pos)[0], pos + 2 - elif dtype == DLMS_TYPE_UINT16: return struct.unpack_from(">H", data, pos)[0], pos + 2 - elif dtype == DLMS_TYPE_INT32: return struct.unpack_from(">i", data, pos)[0], pos + 4 - elif dtype == DLMS_TYPE_UINT32: return struct.unpack_from(">I", data, pos)[0], pos + 4 - elif dtype == DLMS_TYPE_INT64: return struct.unpack_from(">q", data, pos)[0], pos + 8 - elif dtype == DLMS_TYPE_UINT64: return struct.unpack_from(">Q", data, pos)[0], pos + 8 - elif dtype == DLMS_TYPE_FLOAT32: return struct.unpack_from(">f", data, pos)[0], pos + 4 - elif dtype == DLMS_TYPE_FLOAT64: return struct.unpack_from(">d", data, pos)[0], pos + 8 - elif dtype in (DLMS_TYPE_OCTET_STRING, DLMS_TYPE_VISIBLE_STRING): length, pos = self._decode_length(data, pos) raw_bytes = data[pos:pos+length] @@ -342,7 +480,6 @@ class DLMSParser: except Exception: return raw_bytes.hex(), pos return raw_bytes, pos - elif dtype in (DLMS_TYPE_ARRAY, DLMS_TYPE_STRUCTURE, DLMS_TYPE_COMPACT_ARRAY): count, pos = self._decode_length(data, pos) items = [] @@ -350,9 +487,8 @@ class DLMSParser: val, pos = self._decode_value(data, pos) items.append(val) return items, pos - else: - _LOGGER.warning("Unknown DLMS type 0x%02X at pos %d", dtype, pos) + _LOGGER.debug("Unknown DLMS type 0x%02X at pos %d, skipping", dtype, pos) return None, pos def _decode_length(self, data: bytes, pos: int) -> tuple[int, int]: @@ -366,94 +502,7 @@ class DLMSParser: length = (length << 8) | data[pos]; pos += 1 return length, pos - def _extract_objects(self, notification_body: Any) -> list[DLMSObject]: - """ - Walk the decoded notification body and extract OBIS-keyed objects. - - The XT211 push notification body is a structure containing an array - of structures, each typically: - [OBIS bytes (6B octet-string), value (structure with scaler+unit), data] - - We try to handle both flat and nested layouts. - """ - objects = [] - if not isinstance(notification_body, list): - return objects - - # The outer structure may wrap an inner array - # Try to unwrap one level of nesting - payload = notification_body - if len(payload) == 1 and isinstance(payload[0], list): - payload = payload[0] - - for item in payload: - if not isinstance(item, list) or len(item) < 2: - continue - try: - obj = self._parse_cosem_entry(item) - if obj: - objects.append(obj) - except Exception as exc: - _LOGGER.debug("Could not parse COSEM entry %s: %s", item, exc) - - return objects - - def _parse_cosem_entry(self, entry: list) -> DLMSObject | None: - """ - Parse one COSEM entry from the push notification. - Expected layout: [obis_bytes, [scaler, unit], value] - or simplified: [obis_bytes, value] - """ - if len(entry) < 2: - return None - - # First element should be the OBIS code as 6-byte octet string - obis_raw = entry[0] - if not isinstance(obis_raw, (bytes, bytearray)) or len(obis_raw) != 6: - return None - - obis_str = self._format_obis(obis_raw) - - scaler = 0 - unit_code = 255 - value = None - - if len(entry) == 3: - # entry[1] = [scaler, unit], entry[2] = value - scaler_unit = entry[1] - if isinstance(scaler_unit, list) and len(scaler_unit) == 2: - raw_scaler = scaler_unit[0] - # scaler is signed int8 - if isinstance(raw_scaler, int): - scaler = raw_scaler if raw_scaler < 128 else raw_scaler - 256 - unit_code = scaler_unit[1] if isinstance(scaler_unit[1], int) else 255 - value = entry[2] - else: - value = entry[1] - - # Apply scaler to numeric values - if isinstance(value, int) and scaler != 0: - final_value: Any = apply_scaler(value, scaler) - elif isinstance(value, bytes): - # Try to decode as ASCII string (e.g. serial number) - try: - final_value = value.decode("ascii", errors="replace").strip("\x00") - except Exception: - final_value = value.hex() - else: - final_value = value - - unit_str = self.UNIT_MAP.get(unit_code, "") - - return DLMSObject( - obis=obis_str, - value=final_value, - unit=unit_str, - scaler=scaler, - ) - - @staticmethod - def _format_obis(raw: bytes) -> str: + """Convert 6 raw bytes to OBIS string notation A-B:C.D.E.F""" if len(raw) != 6: return raw.hex() diff --git a/custom_components/xt211_han/manifest.json b/custom_components/xt211_han/manifest.json index 7061b5b..505f8fa 100644 --- a/custom_components/xt211_han/manifest.json +++ b/custom_components/xt211_han/manifest.json @@ -1,7 +1,7 @@ { "domain": "xt211_han", "name": "XT211 HAN (RS485 via Ethernet)", - "version": "0.7.3", + "version": "0.7.4", "documentation": "https://github.com/nero150/xt211-han-ha", "issue_tracker": "https://github.com/nero150/xt211-han-ha/issues", "dependencies": [],