Add files via upload

main 0.7.5
Nero 2026-03-18 17:40:43 +01:00 committed by GitHub
parent b5696d6325
commit cf91787f60
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 69 additions and 123 deletions

View File

@ -256,187 +256,133 @@ class DLMSParser:
return addr, pos return addr, pos
def _parse_apdu(self, apdu: bytes) -> ParseResult: def _parse_apdu(self, apdu: bytes) -> ParseResult:
"""Parse DLMS APDU (Data-Notification = tag 0x0F).""" """
Parse DLMS APDU (Data-Notification = tag 0x0F).
XT211 frame structure:
0F - Data-Notification tag
[4B invoke-id] - MSB set = data frame, clear = push-setup (skip)
00 - datetime absent
02 02 - outer structure(2)
16 [push_type] - elem[0]: enum (push type, ignore)
01 [N] - elem[1]: array(N captured objects)
[N x object] - see _parse_xt211_object
Each captured object (11-byte header + type-tagged value):
02 02 00 - structure prefix (3 bytes, ignored)
[class_id] - 1 byte DLMS class ID
[A B C D E F] - 6-byte raw OBIS (NO type tag!)
[attr_idx] - 1 byte attribute index (ignored)
[type][value bytes] - standard DLMS type-tagged value
"""
if not apdu: if not apdu:
return ParseResult(success=False, objects=[], error="Empty APDU") return ParseResult(success=False, objects=[], error="Empty APDU")
tag = apdu[0] if apdu[0] != 0x0F:
if tag != 0x0F:
return ParseResult( return ParseResult(
success=False, objects=[], success=False, objects=[],
error=f"Unexpected APDU tag 0x{tag:02X} (expected 0x0F)" error=f"Unexpected APDU tag 0x{apdu[0]:02X} (expected 0x0F)"
) )
pos = 1 if len(apdu) < 6:
if len(apdu) < 5:
return ParseResult(success=False, objects=[], error="APDU too short") return ParseResult(success=False, objects=[], error="APDU too short")
# Invoke-id (4 bytes) - bit 31 set = data frame, clear = push-setup (ignore) pos = 1
invoke_id = struct.unpack_from(">I", apdu, pos)[0] invoke_id = struct.unpack_from(">I", apdu, pos)[0]; pos += 4
pos += 4
_LOGGER.debug("Invoke ID: 0x%08X", invoke_id) _LOGGER.debug("Invoke ID: 0x%08X", invoke_id)
# Push-setup frames (invoke_id MSB = 0) contain no measurement data # Skip push-setup frames (invoke_id MSB = 0)
if not (invoke_id & 0x80000000): if not (invoke_id & 0x80000000):
_LOGGER.debug("Push-setup frame (invoke_id MSB=0), skipping") _LOGGER.debug("Push-setup frame, skipping")
return ParseResult(success=True, objects=[]) return ParseResult(success=True, objects=[])
# Optional date-time # Datetime: 0x09 = octet-string, 0x00 = absent
if pos < len(apdu) and apdu[pos] == 0x09: if pos < len(apdu) and apdu[pos] == 0x09:
pos += 1 pos += 1
dt_len = apdu[pos]; pos += 1 dt_len = apdu[pos]; pos += 1 + dt_len
pos += dt_len
elif pos < len(apdu) and apdu[pos] == 0x00: elif pos < len(apdu) and apdu[pos] == 0x00:
pos += 1 # absent
# Notification body:
# structure(2):
# [0] enum = push type (ignore)
# [1] array(N) of structure(2): [obis_octet_string, value_structure(2): [scaler_unit, value]]
if pos >= len(apdu):
return ParseResult(success=True, objects=[])
# Outer structure tag
if apdu[pos] != 0x02:
_LOGGER.debug("Expected structure (0x02) at pos %d, got 0x%02X", pos, apdu[pos])
return ParseResult(success=True, objects=[])
pos += 1 pos += 1
outer_count, pos = self._decode_length(apdu, pos)
_LOGGER.debug("Outer structure count: %d", outer_count)
# Skip push type (first element, usually enum) # Outer structure(2): skip tag + count
if pos + 2 > len(apdu) or apdu[pos] != 0x02:
return ParseResult(success=True, objects=[])
pos += 2 # 02 02
# Element[0]: enum = push type (skip 2 bytes: 16 XX)
if pos < len(apdu) and apdu[pos] == 0x16: if pos < len(apdu) and apdu[pos] == 0x16:
pos += 2 # enum tag + 1 byte value pos += 2
# Next should be array of COSEM objects # Element[1]: array of captured objects
if pos >= len(apdu) or apdu[pos] != 0x01: if pos >= len(apdu) or apdu[pos] != 0x01:
_LOGGER.debug("Expected array (0x01) at pos %d, got 0x%02X", pos, apdu[pos] if pos < len(apdu) else -1)
return ParseResult(success=True, objects=[]) return ParseResult(success=True, objects=[])
pos += 1 pos += 1
array_count, pos = self._decode_length(apdu, pos) array_count, pos = self._decode_length(apdu, pos)
_LOGGER.debug("Array count: %d objects", array_count) _LOGGER.debug("Array count: %d objects", array_count)
objects = [] objects = []
for i in range(array_count): for i in range(array_count):
if pos >= len(apdu): if pos + 11 > len(apdu):
break break
try: try:
obj, pos = self._parse_cosem_object(apdu, pos) obj, pos = self._parse_xt211_object(apdu, pos)
if obj: if obj:
objects.append(obj) objects.append(obj)
_LOGGER.debug("OBIS %s = %s %s", obj.obis, obj.value, obj.unit)
except Exception as exc: except Exception as exc:
_LOGGER.debug("Error parsing COSEM object %d at pos %d: %s", i, pos, exc) _LOGGER.debug("Error parsing object %d at pos %d: %s", i, pos, exc)
break break
return ParseResult(success=True, objects=objects) return ParseResult(success=True, objects=objects)
def _parse_cosem_object(self, data: bytes, pos: int) -> tuple[DLMSObject | None, int]: def _parse_xt211_object(self, data: bytes, pos: int) -> tuple[DLMSObject | None, int]:
""" """
Parse one COSEM object entry from the push array. Parse one captured object from XT211 push notification.
Expected structure(2): Format per object:
[0] octet-string(6) = OBIS code 02 02 00 - 3-byte structure prefix (ignored)
[1] structure(2): [class_id] - 1 byte
[0] structure(2): [int8 scaler, enum unit] [A B C D E F] - 6-byte raw OBIS (no type tag)
[1] value (any type) [attr_idx] - 1 byte (ignored)
Or simplified structure(2): [type][value] - DLMS type-tagged value
[0] octet-string(6) = OBIS
[1] value directly
""" """
if data[pos] != 0x02: if data[pos] != 0x02:
# Not a structure - skip unknown type _LOGGER.debug("Expected 0x02 at pos %d, got 0x%02X", pos, data[pos])
val, pos = self._decode_value(data, pos) return None, pos + 1
return None, pos
pos += 1 # skip structure tag
count, pos = self._decode_length(data, pos)
if count < 2: pos += 3 # skip: 02 02 00
return None, pos
# Element 0: OBIS code as octet-string # Class ID
if data[pos] != 0x09: pos += 1 # class_id (not needed for value extraction)
val, pos = self._decode_value(data, pos)
return None, pos
pos += 1 # skip octet-string tag
obis_len, pos = self._decode_length(data, pos)
obis_raw = data[pos:pos+obis_len]
pos += obis_len
if len(obis_raw) != 6: # Raw OBIS (6 bytes, no type tag)
# Skip remaining elements if pos + 6 > len(data):
for _ in range(count - 1):
_, pos = self._decode_value(data, pos)
return None, pos return None, pos
obis_raw = data[pos:pos+6]; pos += 6
obis_str = self._format_obis(obis_raw) obis_str = self._format_obis(obis_raw)
# Element 1: value wrapper # Attribute index (skip)
# Can be: structure(2)[scaler_unit, value] OR direct value
scaler = 0
unit_code = 255
value = None
if pos < len(data) and data[pos] == 0x02:
# structure(2): [scaler_unit_struct, value]
pos += 1 # skip structure tag
inner_count, pos = self._decode_length(data, pos)
if inner_count >= 2:
# First inner: scaler+unit structure(2)[int8, enum]
if pos < len(data) and data[pos] == 0x02:
pos += 1 pos += 1
su_count, pos = self._decode_length(data, pos)
if su_count >= 2:
raw_scaler, pos = self._decode_value(data, pos)
raw_unit, pos = self._decode_value(data, pos)
if isinstance(raw_scaler, int):
scaler = raw_scaler if raw_scaler < 128 else raw_scaler - 256
if isinstance(raw_unit, int):
unit_code = raw_unit
# skip extra
for _ in range(su_count - 2):
_, pos = self._decode_value(data, pos)
else:
_, pos = self._decode_value(data, pos)
# Second inner: actual value # Type-tagged value
value, pos = self._decode_value(data, pos) value, pos = self._decode_value(data, pos)
# skip extra inner elements # Convert bytes to string for text objects
for _ in range(inner_count - 2): if isinstance(value, (bytes, bytearray)):
_, pos = self._decode_value(data, pos)
else:
for _ in range(inner_count):
_, pos = self._decode_value(data, pos)
else:
# Direct value
value, pos = self._decode_value(data, pos)
# Skip any extra elements in the outer structure
for _ in range(count - 2):
_, pos = self._decode_value(data, pos)
# Apply scaler
if isinstance(value, int) and scaler != 0:
final_value: Any = apply_scaler(value, scaler)
elif isinstance(value, (bytes, bytearray)):
try: try:
final_value = value.decode("ascii", errors="replace").strip("\x00") value = value.decode("ascii", errors="replace").strip("\x00")
except Exception: except Exception:
final_value = value.hex() value = value.hex()
else:
final_value = value
unit_str = self.UNIT_MAP.get(unit_code, "")
meta = OBIS_DESCRIPTIONS.get(obis_str, {}) meta = OBIS_DESCRIPTIONS.get(obis_str, {})
return DLMSObject( return DLMSObject(
obis=obis_str, obis=obis_str,
value=final_value, value=value,
unit=unit_str or meta.get("unit", ""), unit=meta.get("unit", ""),
scaler=scaler, scaler=0,
), pos ), pos
def _decode_value(self, data: bytes, pos: int) -> tuple[Any, int]: def _decode_value(self, data: bytes, pos: int) -> tuple[Any, int]:
"""Recursively decode a DLMS typed value. Returns (value, new_pos).""" """Recursively decode a DLMS typed value. Returns (value, new_pos)."""
if pos >= len(data): if pos >= len(data):

View File

@ -1,7 +1,7 @@
{ {
"domain": "xt211_han", "domain": "xt211_han",
"name": "XT211 HAN (RS485 via Ethernet)", "name": "XT211 HAN (RS485 via Ethernet)",
"version": "0.7.4", "version": "0.7.5",
"documentation": "https://github.com/nero150/xt211-han-ha", "documentation": "https://github.com/nero150/xt211-han-ha",
"issue_tracker": "https://github.com/nero150/xt211-han-ha/issues", "issue_tracker": "https://github.com/nero150/xt211-han-ha/issues",
"dependencies": [], "dependencies": [],