Compare commits
7 Commits
| Author | SHA1 | Date |
|---|---|---|
|
|
b80cd0d443 | |
|
|
641a8eb531 | |
|
|
a8b69d2487 | |
|
|
67be2ae88a | |
|
|
8432ae6153 | |
|
|
900aed571b | |
|
|
f96b6e6210 |
13
CHANGELOG.md
13
CHANGELOG.md
|
|
@ -1,5 +1,18 @@
|
||||||
# Changelog
|
# Changelog
|
||||||
|
|
||||||
|
## 0.8.9
|
||||||
|
- opraven HACS release balík pro `content_in_root: true`
|
||||||
|
- release asset `xt211_han.zip` už obsahuje přímo soubory integrace v rootu ZIPu
|
||||||
|
- doplněno `content_in_root` do `hacs.json`
|
||||||
|
- sjednocena verze balíku na 0.8.9
|
||||||
|
|
||||||
|
## 0.8.7
|
||||||
|
- zvýšena verze balíku na 0.8.7
|
||||||
|
- sjednocena verze v kořenovém i integračním `manifest.json`
|
||||||
|
- doplněny GitHub Actions pro HACS validaci a release ZIP asset
|
||||||
|
- vyčištěn README od interní citační značky
|
||||||
|
- připraven kompletní balík repozitáře pro HACS release workflow
|
||||||
|
|
||||||
## 0.8.0
|
## 0.8.0
|
||||||
- doplněno README o reálné nastavení převodníku a screenshoty
|
- doplněno README o reálné nastavení převodníku a screenshoty
|
||||||
- přidána složka `docs/images` se screenshoty konfigurace a výsledku v Home Assistantu
|
- přidána složka `docs/images` se screenshoty konfigurace a výsledku v Home Assistantu
|
||||||
|
|
|
||||||
|
|
@ -63,7 +63,7 @@ Použité nastavení na funkční sestavě:
|
||||||
|
|
||||||
### 3. Kontrola, že převodník opravdu posílá data
|
### 3. Kontrola, že převodník opravdu posílá data
|
||||||
|
|
||||||
Na stavové stránce převodníku je vidět aktivní klient a počitadlo `TX/RX Count`. Pokud **roste TX počet bajtů**, převodník normálně odesílá data z elektroměru do sítě. Na ukázce níže je vidět připojený klient `192.168.0.190` a narůstající `TX Count`, zatímco `RX` zůstává nulové. To odpovídá jednosměrnému provozu elektroměr → zákazník, který je uvedený i v dokumentaci ČEZ. fileciteturn6file0
|
Na stavové stránce převodníku je vidět aktivní klient a počitadlo `TX/RX Count`. Pokud **roste TX počet bajtů**, převodník normálně odesílá data z elektroměru do sítě. Na ukázce níže je vidět připojený klient `192.168.0.190` a narůstající `TX Count`, zatímco `RX` zůstává nulové. To odpovídá jednosměrnému provozu elektroměr → zákazník, který je uvedený i v dokumentaci ČEZ.
|
||||||
|
|
||||||

|

|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -15,21 +15,11 @@ from .sensor import BINARY_OBIS, build_enabled_obis, _device_info
|
||||||
from .dlms_parser import OBIS_DESCRIPTIONS
|
from .dlms_parser import OBIS_DESCRIPTIONS
|
||||||
|
|
||||||
|
|
||||||
async def async_setup_entry(
|
async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntry, async_add_entities: AddEntitiesCallback) -> None:
|
||||||
hass: HomeAssistant,
|
|
||||||
entry: ConfigEntry,
|
|
||||||
async_add_entities: AddEntitiesCallback,
|
|
||||||
) -> None:
|
|
||||||
coordinator: XT211Coordinator = hass.data[DOMAIN][entry.entry_id]
|
coordinator: XT211Coordinator = hass.data[DOMAIN][entry.entry_id]
|
||||||
enabled_obis = build_enabled_obis(entry)
|
enabled_obis = build_enabled_obis(entry)
|
||||||
|
entities = [XT211BinarySensorEntity(coordinator, entry, obis, meta) for obis, meta in OBIS_DESCRIPTIONS.items() if obis in enabled_obis and obis in BINARY_OBIS]
|
||||||
entities = [
|
|
||||||
XT211BinarySensorEntity(coordinator, entry, obis, meta)
|
|
||||||
for obis, meta in OBIS_DESCRIPTIONS.items()
|
|
||||||
if obis in enabled_obis and obis in BINARY_OBIS
|
|
||||||
]
|
|
||||||
async_add_entities(entities)
|
async_add_entities(entities)
|
||||||
|
|
||||||
registered_obis = {entity._obis for entity in entities}
|
registered_obis = {entity._obis for entity in entities}
|
||||||
|
|
||||||
@callback
|
@callback
|
||||||
|
|
|
||||||
|
|
@ -15,59 +15,18 @@ from homeassistant.components import dhcp
|
||||||
from homeassistant.const import CONF_HOST, CONF_NAME, CONF_PORT
|
from homeassistant.const import CONF_HOST, CONF_NAME, CONF_PORT
|
||||||
from homeassistant.data_entry_flow import FlowResult
|
from homeassistant.data_entry_flow import FlowResult
|
||||||
|
|
||||||
from .const import (
|
from .const import CONF_HAS_FVE, CONF_PHASES, CONF_RELAY_COUNT, CONF_TARIFFS, DEFAULT_NAME, DEFAULT_PORT, DOMAIN, PHASES_1, PHASES_3, RELAYS_0, RELAYS_4, RELAYS_6, TARIFFS_1, TARIFFS_2, TARIFFS_4
|
||||||
CONF_HAS_FVE,
|
|
||||||
CONF_PHASES,
|
|
||||||
CONF_RELAY_COUNT,
|
|
||||||
CONF_TARIFFS,
|
|
||||||
DEFAULT_NAME,
|
|
||||||
DEFAULT_PORT,
|
|
||||||
DOMAIN,
|
|
||||||
PHASES_1,
|
|
||||||
PHASES_3,
|
|
||||||
RELAYS_0,
|
|
||||||
RELAYS_4,
|
|
||||||
RELAYS_6,
|
|
||||||
TARIFFS_1,
|
|
||||||
TARIFFS_2,
|
|
||||||
TARIFFS_4,
|
|
||||||
)
|
|
||||||
|
|
||||||
_LOGGER = logging.getLogger(__name__)
|
_LOGGER = logging.getLogger(__name__)
|
||||||
|
|
||||||
USR_IOT_MAC_PREFIXES = ("d8b04c", "b4e62d")
|
USR_IOT_MAC_PREFIXES = ("d8b04c", "b4e62d")
|
||||||
MANUAL_CHOICE = "__manual__"
|
MANUAL_CHOICE = "__manual__"
|
||||||
|
STEP_CONNECTION_SCHEMA = vol.Schema({vol.Required(CONF_HOST): str, vol.Required(CONF_PORT, default=DEFAULT_PORT): int, vol.Optional(CONF_NAME, default=DEFAULT_NAME): str})
|
||||||
STEP_CONNECTION_SCHEMA = vol.Schema(
|
STEP_METER_SCHEMA = vol.Schema({
|
||||||
{
|
vol.Required(CONF_PHASES, default=PHASES_3): vol.In({PHASES_1: "Jednofázový", PHASES_3: "Třífázový"}),
|
||||||
vol.Required(CONF_HOST): str,
|
|
||||||
vol.Required(CONF_PORT, default=DEFAULT_PORT): int,
|
|
||||||
vol.Optional(CONF_NAME, default=DEFAULT_NAME): str,
|
|
||||||
}
|
|
||||||
)
|
|
||||||
|
|
||||||
STEP_METER_SCHEMA = vol.Schema(
|
|
||||||
{
|
|
||||||
vol.Required(CONF_PHASES, default=PHASES_3): vol.In(
|
|
||||||
{PHASES_1: "Jednofázový", PHASES_3: "Třífázový"}
|
|
||||||
),
|
|
||||||
vol.Required(CONF_HAS_FVE, default=False): bool,
|
vol.Required(CONF_HAS_FVE, default=False): bool,
|
||||||
vol.Required(CONF_TARIFFS, default=TARIFFS_2): vol.In(
|
vol.Required(CONF_TARIFFS, default=TARIFFS_2): vol.In({TARIFFS_1: "Jeden tarif (pouze T1)", TARIFFS_2: "Dva tarify (T1 + T2)", TARIFFS_4: "Čtyři tarify (T1 – T4)"}),
|
||||||
{
|
vol.Required(CONF_RELAY_COUNT, default=RELAYS_4): vol.In({RELAYS_0: "Žádné relé", RELAYS_4: "WM-RelayBox (R1 – R4)", RELAYS_6: "WM-RelayBox rozšířený (R1 – R6)"}),
|
||||||
TARIFFS_1: "Jeden tarif (pouze T1)",
|
})
|
||||||
TARIFFS_2: "Dva tarify (T1 + T2)",
|
|
||||||
TARIFFS_4: "Čtyři tarify (T1 – T4)",
|
|
||||||
}
|
|
||||||
),
|
|
||||||
vol.Required(CONF_RELAY_COUNT, default=RELAYS_4): vol.In(
|
|
||||||
{
|
|
||||||
RELAYS_0: "Žádné relé",
|
|
||||||
RELAYS_4: "WM-RelayBox (R1 – R4)",
|
|
||||||
RELAYS_6: "WM-RelayBox rozšířený (R1 – R6)",
|
|
||||||
}
|
|
||||||
),
|
|
||||||
}
|
|
||||||
)
|
|
||||||
|
|
||||||
|
|
||||||
async def _test_connection(host: str, port: int, timeout: float = 5.0) -> str | None:
|
async def _test_connection(host: str, port: int, timeout: float = 5.0) -> str | None:
|
||||||
|
|
@ -80,7 +39,7 @@ async def _test_connection(host: str, port: int, timeout: float = 5.0) -> str |
|
||||||
return "cannot_connect"
|
return "cannot_connect"
|
||||||
except OSError:
|
except OSError:
|
||||||
return "cannot_connect"
|
return "cannot_connect"
|
||||||
except Exception: # pragma: no cover - defensive
|
except Exception:
|
||||||
return "unknown"
|
return "unknown"
|
||||||
|
|
||||||
|
|
||||||
|
|
@ -96,15 +55,12 @@ async def _scan_network(port: int, timeout: float = 1.0) -> list[str]:
|
||||||
local_ip = socket.gethostbyname(socket.gethostname())
|
local_ip = socket.gethostbyname(socket.gethostname())
|
||||||
except Exception:
|
except Exception:
|
||||||
pass
|
pass
|
||||||
|
|
||||||
if local_ip.startswith("127.") or local_ip == "0.0.0.0":
|
if local_ip.startswith("127.") or local_ip == "0.0.0.0":
|
||||||
local_ip = "192.168.1.1"
|
local_ip = "192.168.1.1"
|
||||||
|
|
||||||
try:
|
try:
|
||||||
network = IPv4Network(f"{local_ip}/24", strict=False)
|
network = IPv4Network(f"{local_ip}/24", strict=False)
|
||||||
except ValueError:
|
except ValueError:
|
||||||
network = IPv4Network("192.168.1.0/24", strict=False)
|
network = IPv4Network("192.168.1.0/24", strict=False)
|
||||||
|
|
||||||
found: list[str] = []
|
found: list[str] = []
|
||||||
|
|
||||||
async def _probe(ip: str) -> None:
|
async def _probe(ip: str) -> None:
|
||||||
|
|
@ -122,7 +78,6 @@ async def _scan_network(port: int, timeout: float = 1.0) -> list[str]:
|
||||||
hosts = [str(host) for host in network.hosts()]
|
hosts = [str(host) for host in network.hosts()]
|
||||||
for index in range(0, len(hosts), 50):
|
for index in range(0, len(hosts), 50):
|
||||||
await asyncio.gather(*(_probe(ip) for ip in hosts[index:index + 50]))
|
await asyncio.gather(*(_probe(ip) for ip in hosts[index:index + 50]))
|
||||||
|
|
||||||
found.sort()
|
found.sort()
|
||||||
_LOGGER.debug("XT211 scan found %d host(s) on port %d: %s", len(found), port, found)
|
_LOGGER.debug("XT211 scan found %d host(s) on port %d: %s", len(found), port, found)
|
||||||
return found
|
return found
|
||||||
|
|
@ -141,11 +96,9 @@ class XT211HANConfigFlow(config_entries.ConfigFlow, domain=DOMAIN):
|
||||||
mac = discovery_info.macaddress.replace(":", "").lower()
|
mac = discovery_info.macaddress.replace(":", "").lower()
|
||||||
if not any(mac.startswith(prefix) for prefix in USR_IOT_MAC_PREFIXES):
|
if not any(mac.startswith(prefix) for prefix in USR_IOT_MAC_PREFIXES):
|
||||||
return self.async_abort(reason="not_supported")
|
return self.async_abort(reason="not_supported")
|
||||||
|
|
||||||
ip = discovery_info.ip
|
ip = discovery_info.ip
|
||||||
await self.async_set_unique_id(f"{ip}:{DEFAULT_PORT}")
|
await self.async_set_unique_id(f"{ip}:{DEFAULT_PORT}")
|
||||||
self._abort_if_unique_id_configured(updates={CONF_HOST: ip})
|
self._abort_if_unique_id_configured(updates={CONF_HOST: ip})
|
||||||
|
|
||||||
self._discovered_host = ip
|
self._discovered_host = ip
|
||||||
self._discovered_port = DEFAULT_PORT
|
self._discovered_port = DEFAULT_PORT
|
||||||
_LOGGER.info("XT211 HAN: DHCP discovered USR IOT device at %s (MAC %s)", ip, mac)
|
_LOGGER.info("XT211 HAN: DHCP discovered USR IOT device at %s (MAC %s)", ip, mac)
|
||||||
|
|
@ -155,102 +108,41 @@ class XT211HANConfigFlow(config_entries.ConfigFlow, domain=DOMAIN):
|
||||||
if user_input is not None:
|
if user_input is not None:
|
||||||
error = await _test_connection(self._discovered_host, self._discovered_port)
|
error = await _test_connection(self._discovered_host, self._discovered_port)
|
||||||
if error:
|
if error:
|
||||||
return self.async_show_form(
|
return self.async_show_form(step_id="dhcp_confirm", errors={"base": error}, description_placeholders={"host": self._discovered_host, "port": str(self._discovered_port)})
|
||||||
step_id="dhcp_confirm",
|
self._connection_data = {CONF_HOST: self._discovered_host, CONF_PORT: self._discovered_port, CONF_NAME: DEFAULT_NAME}
|
||||||
errors={"base": error},
|
|
||||||
description_placeholders={
|
|
||||||
"host": self._discovered_host,
|
|
||||||
"port": str(self._discovered_port),
|
|
||||||
},
|
|
||||||
)
|
|
||||||
self._connection_data = {
|
|
||||||
CONF_HOST: self._discovered_host,
|
|
||||||
CONF_PORT: self._discovered_port,
|
|
||||||
CONF_NAME: DEFAULT_NAME,
|
|
||||||
}
|
|
||||||
return await self.async_step_meter()
|
return await self.async_step_meter()
|
||||||
|
return self.async_show_form(step_id="dhcp_confirm", description_placeholders={"host": self._discovered_host, "port": str(self._discovered_port)})
|
||||||
return self.async_show_form(
|
|
||||||
step_id="dhcp_confirm",
|
|
||||||
description_placeholders={
|
|
||||||
"host": self._discovered_host,
|
|
||||||
"port": str(self._discovered_port),
|
|
||||||
},
|
|
||||||
)
|
|
||||||
|
|
||||||
async def async_step_user(self, user_input: dict[str, Any] | None = None) -> FlowResult:
|
async def async_step_user(self, user_input: dict[str, Any] | None = None) -> FlowResult:
|
||||||
if user_input is not None:
|
if user_input is not None:
|
||||||
return await (self.async_step_scan() if user_input.get("method") == "scan" else self.async_step_manual())
|
return await (self.async_step_scan() if user_input.get("method") == "scan" else self.async_step_manual())
|
||||||
|
return self.async_show_form(step_id="user", data_schema=vol.Schema({vol.Required("method", default="scan"): vol.In({"scan": "🔍 Automaticky vyhledat v síti", "manual": "✏️ Zadat IP adresu ručně"})}))
|
||||||
return self.async_show_form(
|
|
||||||
step_id="user",
|
|
||||||
data_schema=vol.Schema(
|
|
||||||
{
|
|
||||||
vol.Required("method", default="scan"): vol.In(
|
|
||||||
{
|
|
||||||
"scan": "🔍 Automaticky vyhledat v síti",
|
|
||||||
"manual": "✏️ Zadat IP adresu ručně",
|
|
||||||
}
|
|
||||||
)
|
|
||||||
}
|
|
||||||
),
|
|
||||||
)
|
|
||||||
|
|
||||||
async def async_step_scan(self, user_input: dict[str, Any] | None = None) -> FlowResult:
|
async def async_step_scan(self, user_input: dict[str, Any] | None = None) -> FlowResult:
|
||||||
if user_input is not None:
|
if user_input is not None:
|
||||||
host = user_input[CONF_HOST]
|
host = user_input[CONF_HOST]
|
||||||
if host == MANUAL_CHOICE:
|
if host == MANUAL_CHOICE:
|
||||||
return await self.async_step_manual()
|
return await self.async_step_manual()
|
||||||
|
|
||||||
port = user_input.get(CONF_PORT, DEFAULT_PORT)
|
port = user_input.get(CONF_PORT, DEFAULT_PORT)
|
||||||
name = user_input.get(CONF_NAME, DEFAULT_NAME)
|
name = user_input.get(CONF_NAME, DEFAULT_NAME)
|
||||||
|
|
||||||
await self.async_set_unique_id(f"{host}:{port}")
|
await self.async_set_unique_id(f"{host}:{port}")
|
||||||
self._abort_if_unique_id_configured()
|
self._abort_if_unique_id_configured()
|
||||||
|
|
||||||
error = await _test_connection(host, port)
|
error = await _test_connection(host, port)
|
||||||
if error:
|
if error:
|
||||||
return self.async_show_form(
|
return self.async_show_form(step_id="scan", data_schema=self._scan_schema(port, include_choices=not self._scan_results == []), errors={"base": error})
|
||||||
step_id="scan",
|
|
||||||
data_schema=self._scan_schema(port, include_choices=not self._scan_results == []),
|
|
||||||
errors={"base": error},
|
|
||||||
)
|
|
||||||
|
|
||||||
self._connection_data = {CONF_HOST: host, CONF_PORT: port, CONF_NAME: name}
|
self._connection_data = {CONF_HOST: host, CONF_PORT: port, CONF_NAME: name}
|
||||||
return await self.async_step_meter()
|
return await self.async_step_meter()
|
||||||
|
|
||||||
self._scan_results = await _scan_network(DEFAULT_PORT)
|
self._scan_results = await _scan_network(DEFAULT_PORT)
|
||||||
if not self._scan_results:
|
if not self._scan_results:
|
||||||
return self.async_show_form(
|
return self.async_show_form(step_id="scan", data_schema=self._scan_schema(DEFAULT_PORT, include_choices=False), errors={"base": "no_devices_found"})
|
||||||
step_id="scan",
|
return self.async_show_form(step_id="scan", data_schema=self._scan_schema(DEFAULT_PORT, include_choices=True))
|
||||||
data_schema=self._scan_schema(DEFAULT_PORT, include_choices=False),
|
|
||||||
errors={"base": "no_devices_found"},
|
|
||||||
)
|
|
||||||
|
|
||||||
return self.async_show_form(
|
|
||||||
step_id="scan",
|
|
||||||
data_schema=self._scan_schema(DEFAULT_PORT, include_choices=True),
|
|
||||||
)
|
|
||||||
|
|
||||||
def _scan_schema(self, port: int, include_choices: bool) -> vol.Schema:
|
def _scan_schema(self, port: int, include_choices: bool) -> vol.Schema:
|
||||||
if include_choices:
|
if include_choices:
|
||||||
choices = {ip: f"{ip}:{port}" for ip in self._scan_results}
|
choices = {ip: f"{ip}:{port}" for ip in self._scan_results}
|
||||||
choices[MANUAL_CHOICE] = "✏️ Zadat IP adresu ručně"
|
choices[MANUAL_CHOICE] = "✏️ Zadat IP adresu ručně"
|
||||||
return vol.Schema(
|
return vol.Schema({vol.Required(CONF_HOST): vol.In(choices), vol.Optional(CONF_PORT, default=port): int, vol.Optional(CONF_NAME, default=DEFAULT_NAME): str})
|
||||||
{
|
return vol.Schema({vol.Required(CONF_HOST): str, vol.Required(CONF_PORT, default=port): int, vol.Optional(CONF_NAME, default=DEFAULT_NAME): str})
|
||||||
vol.Required(CONF_HOST): vol.In(choices),
|
|
||||||
vol.Optional(CONF_PORT, default=port): int,
|
|
||||||
vol.Optional(CONF_NAME, default=DEFAULT_NAME): str,
|
|
||||||
}
|
|
||||||
)
|
|
||||||
|
|
||||||
return vol.Schema(
|
|
||||||
{
|
|
||||||
vol.Required(CONF_HOST): str,
|
|
||||||
vol.Required(CONF_PORT, default=port): int,
|
|
||||||
vol.Optional(CONF_NAME, default=DEFAULT_NAME): str,
|
|
||||||
}
|
|
||||||
)
|
|
||||||
|
|
||||||
async def async_step_manual(self, user_input: dict[str, Any] | None = None) -> FlowResult:
|
async def async_step_manual(self, user_input: dict[str, Any] | None = None) -> FlowResult:
|
||||||
errors: dict[str, str] = {}
|
errors: dict[str, str] = {}
|
||||||
|
|
@ -258,17 +150,14 @@ class XT211HANConfigFlow(config_entries.ConfigFlow, domain=DOMAIN):
|
||||||
host = user_input[CONF_HOST]
|
host = user_input[CONF_HOST]
|
||||||
port = user_input[CONF_PORT]
|
port = user_input[CONF_PORT]
|
||||||
name = user_input.get(CONF_NAME, DEFAULT_NAME)
|
name = user_input.get(CONF_NAME, DEFAULT_NAME)
|
||||||
|
|
||||||
await self.async_set_unique_id(f"{host}:{port}")
|
await self.async_set_unique_id(f"{host}:{port}")
|
||||||
self._abort_if_unique_id_configured()
|
self._abort_if_unique_id_configured()
|
||||||
|
|
||||||
error = await _test_connection(host, port)
|
error = await _test_connection(host, port)
|
||||||
if error:
|
if error:
|
||||||
errors["base"] = error
|
errors["base"] = error
|
||||||
else:
|
else:
|
||||||
self._connection_data = {CONF_HOST: host, CONF_PORT: port, CONF_NAME: name}
|
self._connection_data = {CONF_HOST: host, CONF_PORT: port, CONF_NAME: name}
|
||||||
return await self.async_step_meter()
|
return await self.async_step_meter()
|
||||||
|
|
||||||
return self.async_show_form(step_id="manual", data_schema=STEP_CONNECTION_SCHEMA, errors=errors)
|
return self.async_show_form(step_id="manual", data_schema=STEP_CONNECTION_SCHEMA, errors=errors)
|
||||||
|
|
||||||
async def async_step_meter(self, user_input: dict[str, Any] | None = None) -> FlowResult:
|
async def async_step_meter(self, user_input: dict[str, Any] | None = None) -> FlowResult:
|
||||||
|
|
@ -278,5 +167,4 @@ class XT211HANConfigFlow(config_entries.ConfigFlow, domain=DOMAIN):
|
||||||
host = data[CONF_HOST]
|
host = data[CONF_HOST]
|
||||||
port = data[CONF_PORT]
|
port = data[CONF_PORT]
|
||||||
return self.async_create_entry(title=f"{name} ({host}:{port})", data=data)
|
return self.async_create_entry(title=f"{name} ({host}:{port})", data=data)
|
||||||
|
|
||||||
return self.async_show_form(step_id="meter", data_schema=STEP_METER_SCHEMA)
|
return self.async_show_form(step_id="meter", data_schema=STEP_METER_SCHEMA)
|
||||||
|
|
|
||||||
|
|
@ -13,16 +13,11 @@ CONF_RELAY_COUNT = "relay_count"
|
||||||
DEFAULT_PORT = 8899
|
DEFAULT_PORT = 8899
|
||||||
DEFAULT_NAME = "XT211 HAN"
|
DEFAULT_NAME = "XT211 HAN"
|
||||||
|
|
||||||
# Phases
|
|
||||||
PHASES_1 = "1"
|
PHASES_1 = "1"
|
||||||
PHASES_3 = "3"
|
PHASES_3 = "3"
|
||||||
|
|
||||||
# Tariff counts
|
|
||||||
TARIFFS_1 = 1
|
TARIFFS_1 = 1
|
||||||
TARIFFS_2 = 2
|
TARIFFS_2 = 2
|
||||||
TARIFFS_4 = 4
|
TARIFFS_4 = 4
|
||||||
|
|
||||||
# Relay counts
|
|
||||||
RELAYS_0 = 0
|
RELAYS_0 = 0
|
||||||
RELAYS_4 = 4
|
RELAYS_4 = 4
|
||||||
RELAYS_6 = 6
|
RELAYS_6 = 6
|
||||||
|
|
|
||||||
|
|
@ -12,7 +12,6 @@ from homeassistant.helpers.update_coordinator import DataUpdateCoordinator
|
||||||
from .dlms_parser import DLMSObject, DLMSParser, OBIS_DESCRIPTIONS
|
from .dlms_parser import DLMSObject, DLMSParser, OBIS_DESCRIPTIONS
|
||||||
|
|
||||||
_LOGGER = logging.getLogger(__name__)
|
_LOGGER = logging.getLogger(__name__)
|
||||||
|
|
||||||
PUSH_TIMEOUT = 90
|
PUSH_TIMEOUT = 90
|
||||||
RECONNECT_DELAY = 10
|
RECONNECT_DELAY = 10
|
||||||
|
|
||||||
|
|
@ -21,12 +20,7 @@ class XT211Coordinator(DataUpdateCoordinator[dict[str, Any]]):
|
||||||
"""Persistent TCP listener for XT211 DLMS push frames."""
|
"""Persistent TCP listener for XT211 DLMS push frames."""
|
||||||
|
|
||||||
def __init__(self, hass: HomeAssistant, host: str, port: int, name: str) -> None:
|
def __init__(self, hass: HomeAssistant, host: str, port: int, name: str) -> None:
|
||||||
super().__init__(
|
super().__init__(hass, _LOGGER, name=f"XT211 HAN ({host}:{port})", update_interval=None)
|
||||||
hass,
|
|
||||||
_LOGGER,
|
|
||||||
name=f"XT211 HAN ({host}:{port})",
|
|
||||||
update_interval=None,
|
|
||||||
)
|
|
||||||
self.host = host
|
self.host = host
|
||||||
self.port = port
|
self.port = port
|
||||||
self.device_name = name
|
self.device_name = name
|
||||||
|
|
@ -79,15 +73,11 @@ class XT211Coordinator(DataUpdateCoordinator[dict[str, Any]]):
|
||||||
)
|
)
|
||||||
finally:
|
finally:
|
||||||
await self._disconnect()
|
await self._disconnect()
|
||||||
|
|
||||||
await asyncio.sleep(RECONNECT_DELAY)
|
await asyncio.sleep(RECONNECT_DELAY)
|
||||||
|
|
||||||
async def _connect(self) -> None:
|
async def _connect(self) -> None:
|
||||||
_LOGGER.info("Connecting to XT211 adapter at %s:%d", self.host, self.port)
|
_LOGGER.info("Connecting to XT211 adapter at %s:%d", self.host, self.port)
|
||||||
self._reader, self._writer = await asyncio.wait_for(
|
self._reader, self._writer = await asyncio.wait_for(asyncio.open_connection(self.host, self.port), timeout=10)
|
||||||
asyncio.open_connection(self.host, self.port),
|
|
||||||
timeout=10,
|
|
||||||
)
|
|
||||||
self._parser = DLMSParser()
|
self._parser = DLMSParser()
|
||||||
self._connected = True
|
self._connected = True
|
||||||
_LOGGER.info("Connected to XT211 adapter at %s:%d", self.host, self.port)
|
_LOGGER.info("Connected to XT211 adapter at %s:%d", self.host, self.port)
|
||||||
|
|
@ -105,50 +95,34 @@ class XT211Coordinator(DataUpdateCoordinator[dict[str, Any]]):
|
||||||
|
|
||||||
async def _receive_loop(self) -> None:
|
async def _receive_loop(self) -> None:
|
||||||
assert self._reader is not None
|
assert self._reader is not None
|
||||||
|
|
||||||
while True:
|
while True:
|
||||||
try:
|
try:
|
||||||
chunk = await asyncio.wait_for(self._reader.read(4096), timeout=PUSH_TIMEOUT)
|
chunk = await asyncio.wait_for(self._reader.read(4096), timeout=PUSH_TIMEOUT)
|
||||||
except asyncio.TimeoutError as exc:
|
except asyncio.TimeoutError as exc:
|
||||||
_LOGGER.warning("No data from XT211 for %d s – reconnecting", PUSH_TIMEOUT)
|
_LOGGER.warning("No data from XT211 for %d s – reconnecting", PUSH_TIMEOUT)
|
||||||
raise ConnectionError("Push timeout") from exc
|
raise ConnectionError("Push timeout") from exc
|
||||||
|
|
||||||
if not chunk:
|
if not chunk:
|
||||||
_LOGGER.warning("XT211 adapter closed connection")
|
_LOGGER.warning("XT211 adapter closed connection")
|
||||||
raise ConnectionError("Remote closed")
|
raise ConnectionError("Remote closed")
|
||||||
|
|
||||||
_LOGGER.debug("XT211 RX %d bytes: %s", len(chunk), chunk.hex())
|
_LOGGER.debug("XT211 RX %d bytes: %s", len(chunk), chunk.hex())
|
||||||
self._parser.feed(chunk)
|
self._parser.feed(chunk)
|
||||||
|
|
||||||
while True:
|
while True:
|
||||||
result = self._parser.get_frame()
|
result = self._parser.get_frame()
|
||||||
if result is None:
|
if result is None:
|
||||||
break
|
break
|
||||||
|
|
||||||
self._frames_received += 1
|
self._frames_received += 1
|
||||||
if result.success:
|
if result.success:
|
||||||
_LOGGER.debug(
|
_LOGGER.debug("XT211 frame #%d parsed OK: %d object(s)", self._frames_received, len(result.objects))
|
||||||
"XT211 frame #%d parsed OK: %d object(s)",
|
|
||||||
self._frames_received,
|
|
||||||
len(result.objects),
|
|
||||||
)
|
|
||||||
await self._process_frame(result.objects)
|
await self._process_frame(result.objects)
|
||||||
else:
|
else:
|
||||||
_LOGGER.debug(
|
_LOGGER.debug("XT211 frame #%d parse error: %s (raw: %s)", self._frames_received, result.error, result.raw_hex[:120])
|
||||||
"XT211 frame #%d parse error: %s (raw: %s)",
|
|
||||||
self._frames_received,
|
|
||||||
result.error,
|
|
||||||
result.raw_hex[:120],
|
|
||||||
)
|
|
||||||
|
|
||||||
async def _process_frame(self, objects: list[DLMSObject]) -> None:
|
async def _process_frame(self, objects: list[DLMSObject]) -> None:
|
||||||
if not objects:
|
if not objects:
|
||||||
_LOGGER.debug("Received empty DLMS frame")
|
_LOGGER.debug("Received empty DLMS frame")
|
||||||
return
|
return
|
||||||
|
|
||||||
current = dict(self.data or {})
|
current = dict(self.data or {})
|
||||||
changed: list[str] = []
|
changed: list[str] = []
|
||||||
|
|
||||||
for obj in objects:
|
for obj in objects:
|
||||||
meta = OBIS_DESCRIPTIONS.get(obj.obis, {})
|
meta = OBIS_DESCRIPTIONS.get(obj.obis, {})
|
||||||
new_value = {
|
new_value = {
|
||||||
|
|
@ -161,11 +135,5 @@ class XT211Coordinator(DataUpdateCoordinator[dict[str, Any]]):
|
||||||
changed.append(obj.obis)
|
changed.append(obj.obis)
|
||||||
current[obj.obis] = new_value
|
current[obj.obis] = new_value
|
||||||
_LOGGER.debug("XT211 OBIS %s = %r %s", obj.obis, obj.value, new_value["unit"])
|
_LOGGER.debug("XT211 OBIS %s = %r %s", obj.obis, obj.value, new_value["unit"])
|
||||||
|
|
||||||
self.async_set_updated_data(current)
|
self.async_set_updated_data(current)
|
||||||
_LOGGER.debug(
|
_LOGGER.debug("Coordinator updated with %d object(s), %d changed: %s", len(objects), len(changed), ", ".join(changed[:10]))
|
||||||
"Coordinator updated with %d object(s), %d changed: %s",
|
|
||||||
len(objects),
|
|
||||||
len(changed),
|
|
||||||
", ".join(changed[:10]),
|
|
||||||
)
|
|
||||||
|
|
|
||||||
|
|
@ -8,9 +8,7 @@ from dataclasses import dataclass, field
|
||||||
from typing import Any
|
from typing import Any
|
||||||
|
|
||||||
_LOGGER = logging.getLogger(__name__)
|
_LOGGER = logging.getLogger(__name__)
|
||||||
|
|
||||||
HDLC_FLAG = 0x7E
|
HDLC_FLAG = 0x7E
|
||||||
|
|
||||||
DLMS_TYPE_NULL = 0x00
|
DLMS_TYPE_NULL = 0x00
|
||||||
DLMS_TYPE_ARRAY = 0x01
|
DLMS_TYPE_ARRAY = 0x01
|
||||||
DLMS_TYPE_STRUCTURE = 0x02
|
DLMS_TYPE_STRUCTURE = 0x02
|
||||||
|
|
@ -32,7 +30,7 @@ DLMS_TYPE_FLOAT64 = 0x18
|
||||||
|
|
||||||
|
|
||||||
class NeedMoreData(Exception):
|
class NeedMoreData(Exception):
|
||||||
"""Raised when the parser needs more bytes to finish a frame."""
|
pass
|
||||||
|
|
||||||
|
|
||||||
@dataclass
|
@dataclass
|
||||||
|
|
@ -52,8 +50,6 @@ class ParseResult:
|
||||||
|
|
||||||
|
|
||||||
class DLMSParser:
|
class DLMSParser:
|
||||||
"""Stateful parser for raw DLMS APDUs and HDLC-wrapped frames."""
|
|
||||||
|
|
||||||
def __init__(self) -> None:
|
def __init__(self) -> None:
|
||||||
self._buffer = bytearray()
|
self._buffer = bytearray()
|
||||||
|
|
||||||
|
|
@ -61,53 +57,42 @@ class DLMSParser:
|
||||||
self._buffer.extend(data)
|
self._buffer.extend(data)
|
||||||
|
|
||||||
def get_frame(self) -> ParseResult | None:
|
def get_frame(self) -> ParseResult | None:
|
||||||
"""Return one parsed frame from the internal buffer, if available."""
|
|
||||||
if not self._buffer:
|
if not self._buffer:
|
||||||
return None
|
return None
|
||||||
|
|
||||||
if self._buffer[0] == HDLC_FLAG:
|
if self._buffer[0] == HDLC_FLAG:
|
||||||
return self._get_hdlc_frame()
|
return self._get_hdlc_frame()
|
||||||
|
|
||||||
start = self._find_apdu_start(self._buffer)
|
start = self._find_apdu_start(self._buffer)
|
||||||
if start == -1:
|
if start == -1:
|
||||||
_LOGGER.debug("Discarding %d bytes without known frame start", len(self._buffer))
|
_LOGGER.debug("Discarding %d bytes without known frame start", len(self._buffer))
|
||||||
self._buffer.clear()
|
self._buffer.clear()
|
||||||
return None
|
return None
|
||||||
|
|
||||||
if start > 0:
|
if start > 0:
|
||||||
_LOGGER.debug("Discarding %d leading byte(s) before APDU", start)
|
_LOGGER.debug("Discarding %d leading byte(s) before APDU", start)
|
||||||
del self._buffer[:start]
|
del self._buffer[:start]
|
||||||
|
|
||||||
if self._buffer and self._buffer[0] == 0x0F:
|
if self._buffer and self._buffer[0] == 0x0F:
|
||||||
return self._get_raw_apdu_frame()
|
return self._get_raw_apdu_frame()
|
||||||
|
|
||||||
return None
|
return None
|
||||||
|
|
||||||
def _get_hdlc_frame(self) -> ParseResult | None:
|
def _get_hdlc_frame(self) -> ParseResult | None:
|
||||||
buf = self._buffer
|
buf = self._buffer
|
||||||
if len(buf) < 3:
|
if len(buf) < 3:
|
||||||
return None
|
return None
|
||||||
|
|
||||||
frame_len = ((buf[1] & 0x07) << 8) | buf[2]
|
frame_len = ((buf[1] & 0x07) << 8) | buf[2]
|
||||||
total = frame_len + 2
|
total = frame_len + 2
|
||||||
if len(buf) < total:
|
if len(buf) < total:
|
||||||
return None
|
return None
|
||||||
|
|
||||||
raw = bytes(buf[:total])
|
raw = bytes(buf[:total])
|
||||||
del buf[:total]
|
del buf[:total]
|
||||||
raw_hex = raw.hex()
|
raw_hex = raw.hex()
|
||||||
|
|
||||||
if raw[0] != HDLC_FLAG or raw[-1] != HDLC_FLAG:
|
if raw[0] != HDLC_FLAG or raw[-1] != HDLC_FLAG:
|
||||||
return ParseResult(success=False, raw_hex=raw_hex, error="Missing HDLC flags")
|
return ParseResult(success=False, raw_hex=raw_hex, error="Missing HDLC flags")
|
||||||
|
|
||||||
try:
|
try:
|
||||||
result = self._parse_hdlc(raw)
|
result = self._parse_hdlc(raw)
|
||||||
result.raw_hex = raw_hex
|
result.raw_hex = raw_hex
|
||||||
return result
|
return result
|
||||||
except NeedMoreData:
|
except NeedMoreData:
|
||||||
# Should not happen for HDLC because total length is known.
|
|
||||||
return None
|
return None
|
||||||
except Exception as exc: # pragma: no cover - defensive logging
|
except Exception as exc:
|
||||||
_LOGGER.exception("Error parsing HDLC frame")
|
_LOGGER.exception("Error parsing HDLC frame")
|
||||||
return ParseResult(success=False, raw_hex=raw_hex, error=str(exc))
|
return ParseResult(success=False, raw_hex=raw_hex, error=str(exc))
|
||||||
|
|
||||||
|
|
@ -117,12 +102,11 @@ class DLMSParser:
|
||||||
result, consumed = self._parse_apdu_with_length(bytes(buf))
|
result, consumed = self._parse_apdu_with_length(bytes(buf))
|
||||||
except NeedMoreData:
|
except NeedMoreData:
|
||||||
return None
|
return None
|
||||||
except Exception as exc: # pragma: no cover - defensive logging
|
except Exception as exc:
|
||||||
raw_hex = bytes(buf).hex()
|
raw_hex = bytes(buf).hex()
|
||||||
_LOGGER.exception("Error parsing raw DLMS APDU")
|
_LOGGER.exception("Error parsing raw DLMS APDU")
|
||||||
del buf[:]
|
del buf[:]
|
||||||
return ParseResult(success=False, raw_hex=raw_hex, error=str(exc))
|
return ParseResult(success=False, raw_hex=raw_hex, error=str(exc))
|
||||||
|
|
||||||
raw = bytes(buf[:consumed])
|
raw = bytes(buf[:consumed])
|
||||||
del buf[:consumed]
|
del buf[:consumed]
|
||||||
result.raw_hex = raw.hex()
|
result.raw_hex = raw.hex()
|
||||||
|
|
@ -130,16 +114,14 @@ class DLMSParser:
|
||||||
|
|
||||||
def _parse_hdlc(self, raw: bytes) -> ParseResult:
|
def _parse_hdlc(self, raw: bytes) -> ParseResult:
|
||||||
pos = 1
|
pos = 1
|
||||||
pos += 2 # frame format
|
pos += 2
|
||||||
_, pos = self._read_hdlc_address(raw, pos)
|
_, pos = self._read_hdlc_address(raw, pos)
|
||||||
_, pos = self._read_hdlc_address(raw, pos)
|
_, pos = self._read_hdlc_address(raw, pos)
|
||||||
pos += 1 # control
|
pos += 1
|
||||||
pos += 2 # HCS
|
pos += 2
|
||||||
|
|
||||||
if pos + 3 > len(raw) - 3:
|
if pos + 3 > len(raw) - 3:
|
||||||
raise ValueError("Frame too short for LLC")
|
raise ValueError("Frame too short for LLC")
|
||||||
|
pos += 3
|
||||||
pos += 3 # LLC header
|
|
||||||
apdu = raw[pos:-3]
|
apdu = raw[pos:-3]
|
||||||
result, _ = self._parse_apdu_with_length(apdu)
|
result, _ = self._parse_apdu_with_length(apdu)
|
||||||
return result
|
return result
|
||||||
|
|
@ -164,15 +146,12 @@ class DLMSParser:
|
||||||
raise ValueError(f"Unexpected APDU tag 0x{apdu[0]:02X}")
|
raise ValueError(f"Unexpected APDU tag 0x{apdu[0]:02X}")
|
||||||
if len(apdu) < 6:
|
if len(apdu) < 6:
|
||||||
raise NeedMoreData
|
raise NeedMoreData
|
||||||
|
|
||||||
pos = 1
|
pos = 1
|
||||||
invoke_id = struct.unpack_from(">I", apdu, pos)[0]
|
invoke_id = struct.unpack_from(">I", apdu, pos)[0]
|
||||||
pos += 4
|
pos += 4
|
||||||
_LOGGER.debug("XT211 invoke_id=0x%08X", invoke_id)
|
_LOGGER.debug("XT211 invoke_id=0x%08X", invoke_id)
|
||||||
|
|
||||||
if pos >= len(apdu):
|
if pos >= len(apdu):
|
||||||
raise NeedMoreData
|
raise NeedMoreData
|
||||||
|
|
||||||
if apdu[pos] == DLMS_TYPE_OCTET_STRING:
|
if apdu[pos] == DLMS_TYPE_OCTET_STRING:
|
||||||
pos += 1
|
pos += 1
|
||||||
dt_len, pos = self._decode_length(apdu, pos)
|
dt_len, pos = self._decode_length(apdu, pos)
|
||||||
|
|
@ -180,7 +159,6 @@ class DLMSParser:
|
||||||
pos += dt_len
|
pos += dt_len
|
||||||
elif apdu[pos] == DLMS_TYPE_NULL:
|
elif apdu[pos] == DLMS_TYPE_NULL:
|
||||||
pos += 1
|
pos += 1
|
||||||
|
|
||||||
self._require(apdu, pos, 2)
|
self._require(apdu, pos, 2)
|
||||||
if apdu[pos] != DLMS_TYPE_STRUCTURE:
|
if apdu[pos] != DLMS_TYPE_STRUCTURE:
|
||||||
return ParseResult(success=True, objects=[]), pos
|
return ParseResult(success=True, objects=[]), pos
|
||||||
|
|
@ -188,7 +166,6 @@ class DLMSParser:
|
||||||
pos += 2
|
pos += 2
|
||||||
if structure_count < 2:
|
if structure_count < 2:
|
||||||
return ParseResult(success=True, objects=[]), pos
|
return ParseResult(success=True, objects=[]), pos
|
||||||
|
|
||||||
if pos >= len(apdu):
|
if pos >= len(apdu):
|
||||||
raise NeedMoreData
|
raise NeedMoreData
|
||||||
if apdu[pos] == DLMS_TYPE_ENUM:
|
if apdu[pos] == DLMS_TYPE_ENUM:
|
||||||
|
|
@ -196,20 +173,17 @@ class DLMSParser:
|
||||||
pos += 2
|
pos += 2
|
||||||
else:
|
else:
|
||||||
_, pos = self._decode_value(apdu, pos)
|
_, pos = self._decode_value(apdu, pos)
|
||||||
|
|
||||||
if pos >= len(apdu):
|
if pos >= len(apdu):
|
||||||
raise NeedMoreData
|
raise NeedMoreData
|
||||||
if apdu[pos] != DLMS_TYPE_ARRAY:
|
if apdu[pos] != DLMS_TYPE_ARRAY:
|
||||||
return ParseResult(success=True, objects=[]), pos
|
return ParseResult(success=True, objects=[]), pos
|
||||||
pos += 1
|
pos += 1
|
||||||
|
|
||||||
array_count, pos = self._decode_length(apdu, pos)
|
array_count, pos = self._decode_length(apdu, pos)
|
||||||
objects: list[DLMSObject] = []
|
objects: list[DLMSObject] = []
|
||||||
for _ in range(array_count):
|
for _ in range(array_count):
|
||||||
obj, pos = self._parse_xt211_object(apdu, pos)
|
obj, pos = self._parse_xt211_object(apdu, pos)
|
||||||
if obj is not None:
|
if obj is not None:
|
||||||
objects.append(obj)
|
objects.append(obj)
|
||||||
|
|
||||||
return ParseResult(success=True, objects=objects), pos
|
return ParseResult(success=True, objects=objects), pos
|
||||||
|
|
||||||
def _parse_xt211_object(self, data: bytes, pos: int) -> tuple[DLMSObject | None, int]:
|
def _parse_xt211_object(self, data: bytes, pos: int) -> tuple[DLMSObject | None, int]:
|
||||||
|
|
@ -217,13 +191,9 @@ class DLMSParser:
|
||||||
if data[pos] != DLMS_TYPE_STRUCTURE:
|
if data[pos] != DLMS_TYPE_STRUCTURE:
|
||||||
raise ValueError(f"Expected object structure at {pos}, got 0x{data[pos]:02X}")
|
raise ValueError(f"Expected object structure at {pos}, got 0x{data[pos]:02X}")
|
||||||
pos += 1
|
pos += 1
|
||||||
|
|
||||||
count, pos = self._decode_length(data, pos)
|
count, pos = self._decode_length(data, pos)
|
||||||
if count < 1:
|
if count < 1:
|
||||||
raise ValueError(f"Unexpected object element count {count}")
|
raise ValueError(f"Unexpected object element count {count}")
|
||||||
|
|
||||||
# XT211 measurement objects use a raw descriptor layout:
|
|
||||||
# 02 02 00 [class_id_hi class_id_lo] [6B OBIS] [attr_idx] [typed value]
|
|
||||||
if pos < len(data) and data[pos] == 0x00:
|
if pos < len(data) and data[pos] == 0x00:
|
||||||
if pos + 10 > len(data):
|
if pos + 10 > len(data):
|
||||||
raise NeedMoreData
|
raise NeedMoreData
|
||||||
|
|
@ -231,34 +201,17 @@ class DLMSParser:
|
||||||
pos += 2
|
pos += 2
|
||||||
obis_raw = bytes(data[pos:pos + 6])
|
obis_raw = bytes(data[pos:pos + 6])
|
||||||
pos += 6
|
pos += 6
|
||||||
_attr_idx = data[pos]
|
|
||||||
pos += 1
|
pos += 1
|
||||||
value, pos = self._decode_value(data, pos)
|
value, pos = self._decode_value(data, pos)
|
||||||
|
|
||||||
if isinstance(value, (bytes, bytearray)):
|
if isinstance(value, (bytes, bytearray)):
|
||||||
try:
|
try:
|
||||||
value = bytes(value).decode("ascii", errors="replace").strip("\x00")
|
value = bytes(value).decode("ascii", errors="replace").strip("\x00")
|
||||||
except Exception:
|
except Exception:
|
||||||
value = bytes(value).hex()
|
value = bytes(value).hex()
|
||||||
|
|
||||||
obis = self._format_obis(obis_raw)
|
obis = self._format_obis(obis_raw)
|
||||||
meta = OBIS_DESCRIPTIONS.get(obis, {})
|
meta = OBIS_DESCRIPTIONS.get(obis, {})
|
||||||
_LOGGER.debug(
|
_LOGGER.debug("Parsed XT211 object class_id=%s obis=%s value=%r unit=%s", class_id, obis, value, meta.get("unit", ""))
|
||||||
"Parsed XT211 object class_id=%s obis=%s value=%r unit=%s",
|
return DLMSObject(obis=obis, value=value, unit=meta.get("unit", ""), scaler=0), pos
|
||||||
class_id,
|
|
||||||
obis,
|
|
||||||
value,
|
|
||||||
meta.get("unit", ""),
|
|
||||||
)
|
|
||||||
return DLMSObject(
|
|
||||||
obis=obis,
|
|
||||||
value=value,
|
|
||||||
unit=meta.get("unit", ""),
|
|
||||||
scaler=0,
|
|
||||||
), pos
|
|
||||||
|
|
||||||
# Short housekeeping frames use simple typed structures without OBIS.
|
|
||||||
# Consume them cleanly and ignore them.
|
|
||||||
last_value: Any = None
|
last_value: Any = None
|
||||||
for _ in range(count):
|
for _ in range(count):
|
||||||
last_value, pos = self._decode_value(data, pos)
|
last_value, pos = self._decode_value(data, pos)
|
||||||
|
|
@ -269,7 +222,6 @@ class DLMSParser:
|
||||||
self._require(data, pos, 1)
|
self._require(data, pos, 1)
|
||||||
dtype = data[pos]
|
dtype = data[pos]
|
||||||
pos += 1
|
pos += 1
|
||||||
|
|
||||||
if dtype == DLMS_TYPE_NULL:
|
if dtype == DLMS_TYPE_NULL:
|
||||||
return None, pos
|
return None, pos
|
||||||
if dtype == DLMS_TYPE_BOOL:
|
if dtype == DLMS_TYPE_BOOL:
|
||||||
|
|
@ -320,7 +272,6 @@ class DLMSParser:
|
||||||
item, pos = self._decode_value(data, pos)
|
item, pos = self._decode_value(data, pos)
|
||||||
items.append(item)
|
items.append(item)
|
||||||
return items, pos
|
return items, pos
|
||||||
|
|
||||||
raise ValueError(f"Unknown DLMS type 0x{dtype:02X} at pos {pos - 1}")
|
raise ValueError(f"Unknown DLMS type 0x{dtype:02X} at pos {pos - 1}")
|
||||||
|
|
||||||
def _decode_length(self, data: bytes, pos: int) -> tuple[int, int]:
|
def _decode_length(self, data: bytes, pos: int) -> tuple[int, int]:
|
||||||
|
|
@ -354,7 +305,7 @@ class DLMSParser:
|
||||||
return f"{a}-{b}:{c}.{d}.{e}.{f}"
|
return f"{a}-{b}:{c}.{d}.{e}.{f}"
|
||||||
|
|
||||||
|
|
||||||
OBIS_DESCRIPTIONS: dict[str, dict[str, str]] = {
|
OBIS_DESCRIPTIONS = {
|
||||||
"0-0:42.0.0.255": {"name": "Název zařízení", "unit": "", "class": "text"},
|
"0-0:42.0.0.255": {"name": "Název zařízení", "unit": "", "class": "text"},
|
||||||
"0-0:96.1.0.255": {"name": "Výrobní číslo", "unit": "", "class": "text"},
|
"0-0:96.1.0.255": {"name": "Výrobní číslo", "unit": "", "class": "text"},
|
||||||
"0-0:96.1.1.255": {"name": "Výrobní číslo", "unit": "", "class": "text"},
|
"0-0:96.1.1.255": {"name": "Výrobní číslo", "unit": "", "class": "text"},
|
||||||
|
|
@ -381,5 +332,5 @@ OBIS_DESCRIPTIONS: dict[str, dict[str, str]] = {
|
||||||
"1-0:1.8.3.255": {"name": "Spotřeba energie T3", "unit": "Wh", "class": "energy"},
|
"1-0:1.8.3.255": {"name": "Spotřeba energie T3", "unit": "Wh", "class": "energy"},
|
||||||
"1-0:1.8.4.255": {"name": "Spotřeba energie T4", "unit": "Wh", "class": "energy"},
|
"1-0:1.8.4.255": {"name": "Spotřeba energie T4", "unit": "Wh", "class": "energy"},
|
||||||
"1-0:2.8.0.255": {"name": "Dodávka energie celkem", "unit": "Wh", "class": "energy"},
|
"1-0:2.8.0.255": {"name": "Dodávka energie celkem", "unit": "Wh", "class": "energy"},
|
||||||
"0-0:96.13.0.255": {"name": "Zpráva pro zákazníka", "unit": "", "class": "text"},
|
"0-0:96.13.0.255": {"name": "Zpráva pro zákazníka", "unit": "", "class": "text"}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -1,9 +1,9 @@
|
||||||
{
|
{
|
||||||
"domain": "xt211_han",
|
"domain": "xt211_han",
|
||||||
"name": "XT211 HAN (RS485 via Ethernet)",
|
"name": "XT211 HAN (RS485 via Ethernet)",
|
||||||
"version": "0.8.0",
|
"version": "0.9.0",
|
||||||
"documentation": "https://github.com/nero150/xt211-han-ha",
|
"documentation": "https://github.com/nero150/CEZ_rele_box",
|
||||||
"issue_tracker": "https://github.com/nero150/xt211-han-ha/issues",
|
"issue_tracker": "https://github.com/nero150/CEZ_rele_box/issues",
|
||||||
"dependencies": [],
|
"dependencies": [],
|
||||||
"codeowners": [
|
"codeowners": [
|
||||||
"@nero150"
|
"@nero150"
|
||||||
|
|
|
||||||
|
|
@ -10,73 +10,27 @@ from homeassistant.helpers.device_registry import DeviceInfo
|
||||||
from homeassistant.helpers.entity_platform import AddEntitiesCallback
|
from homeassistant.helpers.entity_platform import AddEntitiesCallback
|
||||||
from homeassistant.helpers.update_coordinator import CoordinatorEntity
|
from homeassistant.helpers.update_coordinator import CoordinatorEntity
|
||||||
|
|
||||||
from .const import (
|
from .const import CONF_HAS_FVE, CONF_PHASES, CONF_RELAY_COUNT, CONF_TARIFFS, DOMAIN, PHASES_3, RELAYS_4, TARIFFS_2
|
||||||
CONF_HAS_FVE,
|
|
||||||
CONF_PHASES,
|
|
||||||
CONF_RELAY_COUNT,
|
|
||||||
CONF_TARIFFS,
|
|
||||||
DOMAIN,
|
|
||||||
PHASES_3,
|
|
||||||
RELAYS_4,
|
|
||||||
TARIFFS_2,
|
|
||||||
)
|
|
||||||
from .coordinator import XT211Coordinator
|
from .coordinator import XT211Coordinator
|
||||||
from .dlms_parser import OBIS_DESCRIPTIONS
|
from .dlms_parser import OBIS_DESCRIPTIONS
|
||||||
|
|
||||||
SENSOR_META: dict[str, dict] = {
|
SENSOR_META = {
|
||||||
"power": {
|
"power": {"device_class": SensorDeviceClass.POWER, "state_class": SensorStateClass.MEASUREMENT, "unit": UnitOfPower.WATT},
|
||||||
"device_class": SensorDeviceClass.POWER,
|
"energy": {"device_class": SensorDeviceClass.ENERGY, "state_class": SensorStateClass.TOTAL_INCREASING, "unit": UnitOfEnergy.KILO_WATT_HOUR},
|
||||||
"state_class": SensorStateClass.MEASUREMENT,
|
"sensor": {"device_class": None, "state_class": SensorStateClass.MEASUREMENT, "unit": None},
|
||||||
"unit": UnitOfPower.WATT,
|
|
||||||
},
|
|
||||||
"energy": {
|
|
||||||
"device_class": SensorDeviceClass.ENERGY,
|
|
||||||
"state_class": SensorStateClass.TOTAL_INCREASING,
|
|
||||||
"unit": UnitOfEnergy.KILO_WATT_HOUR,
|
|
||||||
},
|
|
||||||
"sensor": {
|
|
||||||
"device_class": None,
|
|
||||||
"state_class": SensorStateClass.MEASUREMENT,
|
|
||||||
"unit": None,
|
|
||||||
},
|
|
||||||
}
|
}
|
||||||
|
|
||||||
SERIAL_OBIS = ("0-0:96.1.1.255", "0-0:96.1.0.255")
|
SERIAL_OBIS = ("0-0:96.1.1.255", "0-0:96.1.0.255")
|
||||||
PRECREATED_TEXT_ENTITIES = {
|
PRECREATED_TEXT_ENTITIES = {
|
||||||
"serial_number": {
|
"serial_number": {"name": "Výrobní číslo", "obises": SERIAL_OBIS, "entity_category": EntityCategory.DIAGNOSTIC},
|
||||||
"name": "Výrobní číslo",
|
"current_tariff": {"name": "Aktuální tarif", "obises": ("0-0:96.14.0.255",), "entity_category": EntityCategory.DIAGNOSTIC},
|
||||||
"obises": SERIAL_OBIS,
|
|
||||||
"entity_category": EntityCategory.DIAGNOSTIC,
|
|
||||||
},
|
|
||||||
"current_tariff": {
|
|
||||||
"name": "Aktuální tarif",
|
|
||||||
"obises": ("0-0:96.14.0.255",),
|
|
||||||
"entity_category": EntityCategory.DIAGNOSTIC,
|
|
||||||
},
|
|
||||||
}
|
|
||||||
DYNAMIC_TEXT_OBIS = {
|
|
||||||
"0-0:42.0.0.255",
|
|
||||||
"0-0:96.13.0.255",
|
|
||||||
}
|
|
||||||
BINARY_OBIS = {
|
|
||||||
"0-0:96.3.10.255",
|
|
||||||
"0-1:96.3.10.255",
|
|
||||||
"0-2:96.3.10.255",
|
|
||||||
"0-3:96.3.10.255",
|
|
||||||
"0-4:96.3.10.255",
|
|
||||||
"0-5:96.3.10.255",
|
|
||||||
"0-6:96.3.10.255",
|
|
||||||
}
|
}
|
||||||
|
DYNAMIC_TEXT_OBIS = {"0-0:42.0.0.255", "0-0:96.13.0.255"}
|
||||||
|
BINARY_OBIS = {"0-0:96.3.10.255", "0-1:96.3.10.255", "0-2:96.3.10.255", "0-3:96.3.10.255", "0-4:96.3.10.255", "0-5:96.3.10.255", "0-6:96.3.10.255"}
|
||||||
TEXT_OBIS = set().union(*(spec["obises"] for spec in PRECREATED_TEXT_ENTITIES.values()), DYNAMIC_TEXT_OBIS)
|
TEXT_OBIS = set().union(*(spec["obises"] for spec in PRECREATED_TEXT_ENTITIES.values()), DYNAMIC_TEXT_OBIS)
|
||||||
|
|
||||||
|
|
||||||
def _device_info(entry: ConfigEntry) -> DeviceInfo:
|
def _device_info(entry: ConfigEntry) -> DeviceInfo:
|
||||||
return DeviceInfo(
|
return DeviceInfo(identifiers={(DOMAIN, entry.entry_id)}, name=entry.data.get(CONF_NAME, "XT211 HAN"), manufacturer="Sagemcom", model="XT211 AMM")
|
||||||
identifiers={(DOMAIN, entry.entry_id)},
|
|
||||||
name=entry.data.get(CONF_NAME, "XT211 HAN"),
|
|
||||||
manufacturer="Sagemcom",
|
|
||||||
model="XT211 AMM",
|
|
||||||
)
|
|
||||||
|
|
||||||
|
|
||||||
def build_enabled_obis(entry: ConfigEntry) -> set[str]:
|
def build_enabled_obis(entry: ConfigEntry) -> set[str]:
|
||||||
|
|
@ -84,60 +38,29 @@ def build_enabled_obis(entry: ConfigEntry) -> set[str]:
|
||||||
has_fve = entry.data.get(CONF_HAS_FVE, True)
|
has_fve = entry.data.get(CONF_HAS_FVE, True)
|
||||||
tariffs = int(entry.data.get(CONF_TARIFFS, TARIFFS_2))
|
tariffs = int(entry.data.get(CONF_TARIFFS, TARIFFS_2))
|
||||||
relay_count = int(entry.data.get(CONF_RELAY_COUNT, RELAYS_4))
|
relay_count = int(entry.data.get(CONF_RELAY_COUNT, RELAYS_4))
|
||||||
|
enabled_obis = {"0-0:17.0.0.255", "1-0:1.7.0.255", "1-0:1.8.0.255", *TEXT_OBIS}
|
||||||
enabled_obis: set[str] = {
|
relay_obis = {1: "0-1:96.3.10.255", 2: "0-2:96.3.10.255", 3: "0-3:96.3.10.255", 4: "0-4:96.3.10.255", 5: "0-5:96.3.10.255", 6: "0-6:96.3.10.255"}
|
||||||
"0-0:17.0.0.255",
|
|
||||||
"1-0:1.7.0.255",
|
|
||||||
"1-0:1.8.0.255",
|
|
||||||
*TEXT_OBIS,
|
|
||||||
}
|
|
||||||
|
|
||||||
relay_obis = {
|
|
||||||
1: "0-1:96.3.10.255",
|
|
||||||
2: "0-2:96.3.10.255",
|
|
||||||
3: "0-3:96.3.10.255",
|
|
||||||
4: "0-4:96.3.10.255",
|
|
||||||
5: "0-5:96.3.10.255",
|
|
||||||
6: "0-6:96.3.10.255",
|
|
||||||
}
|
|
||||||
enabled_obis.add("0-0:96.3.10.255")
|
enabled_obis.add("0-0:96.3.10.255")
|
||||||
for idx in range(1, relay_count + 1):
|
for idx in range(1, relay_count + 1):
|
||||||
enabled_obis.add(relay_obis[idx])
|
enabled_obis.add(relay_obis[idx])
|
||||||
|
|
||||||
if phases == PHASES_3:
|
if phases == PHASES_3:
|
||||||
enabled_obis.update({"1-0:21.7.0.255", "1-0:41.7.0.255", "1-0:61.7.0.255"})
|
enabled_obis.update({"1-0:21.7.0.255", "1-0:41.7.0.255", "1-0:61.7.0.255"})
|
||||||
|
|
||||||
if has_fve:
|
if has_fve:
|
||||||
enabled_obis.add("1-0:2.7.0.255")
|
enabled_obis.add("1-0:2.7.0.255")
|
||||||
enabled_obis.add("1-0:2.8.0.255")
|
enabled_obis.add("1-0:2.8.0.255")
|
||||||
if phases == PHASES_3:
|
if phases == PHASES_3:
|
||||||
enabled_obis.update({"1-0:22.7.0.255", "1-0:42.7.0.255", "1-0:62.7.0.255"})
|
enabled_obis.update({"1-0:22.7.0.255", "1-0:42.7.0.255", "1-0:62.7.0.255"})
|
||||||
|
|
||||||
for tariff in range(1, tariffs + 1):
|
for tariff in range(1, tariffs + 1):
|
||||||
enabled_obis.add(f"1-0:1.8.{tariff}.255")
|
enabled_obis.add(f"1-0:1.8.{tariff}.255")
|
||||||
|
|
||||||
return enabled_obis
|
return enabled_obis
|
||||||
|
|
||||||
|
|
||||||
async def async_setup_entry(
|
async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntry, async_add_entities: AddEntitiesCallback) -> None:
|
||||||
hass: HomeAssistant,
|
|
||||||
entry: ConfigEntry,
|
|
||||||
async_add_entities: AddEntitiesCallback,
|
|
||||||
) -> None:
|
|
||||||
coordinator: XT211Coordinator = hass.data[DOMAIN][entry.entry_id]
|
coordinator: XT211Coordinator = hass.data[DOMAIN][entry.entry_id]
|
||||||
enabled_obis = build_enabled_obis(entry)
|
enabled_obis = build_enabled_obis(entry)
|
||||||
|
entities = [XT211SensorEntity(coordinator, entry, obis, meta) for obis, meta in OBIS_DESCRIPTIONS.items() if obis in enabled_obis and obis not in BINARY_OBIS and obis not in TEXT_OBIS]
|
||||||
entities = [
|
entities.extend(XT211AliasedTextSensorEntity(coordinator, entry, key, spec) for key, spec in PRECREATED_TEXT_ENTITIES.items())
|
||||||
XT211SensorEntity(coordinator, entry, obis, meta)
|
|
||||||
for obis, meta in OBIS_DESCRIPTIONS.items()
|
|
||||||
if obis in enabled_obis and obis not in BINARY_OBIS and obis not in TEXT_OBIS
|
|
||||||
]
|
|
||||||
entities.extend(
|
|
||||||
XT211AliasedTextSensorEntity(coordinator, entry, key, spec)
|
|
||||||
for key, spec in PRECREATED_TEXT_ENTITIES.items()
|
|
||||||
)
|
|
||||||
async_add_entities(entities)
|
async_add_entities(entities)
|
||||||
|
|
||||||
registered_obis = {entity._obis for entity in entities if hasattr(entity, "_obis")}
|
registered_obis = {entity._obis for entity in entities if hasattr(entity, "_obis")}
|
||||||
|
|
||||||
@callback
|
@callback
|
||||||
|
|
|
||||||
|
|
@ -0,0 +1,8 @@
|
||||||
|
Do této složky patří screenshoty použité v README:
|
||||||
|
- converter_network_settings.png
|
||||||
|
- converter_serial_settings.png
|
||||||
|
- converter_status_tx.png
|
||||||
|
- home_assistant_entities.png
|
||||||
|
|
||||||
|
V tomto generovaném balíku nejsou původní binární obrázky přiložené.
|
||||||
|
Doplň je sem z původního repozitáře před finálním vydáním, pokud je chceš zachovat i v release archivu.
|
||||||
|
|
@ -0,0 +1,6 @@
|
||||||
|
Do této složky patří podkladové PDF soubory zmíněné v README:
|
||||||
|
- cez_rs485_han_interface.pdf
|
||||||
|
- cez_obis_codes_han_2025-02-01.pdf
|
||||||
|
|
||||||
|
V tomto generovaném balíku nejsou původní PDF přiložená.
|
||||||
|
Doplň je sem z původního repozitáře před finálním vydáním, pokud je chceš zachovat i v release archivu.
|
||||||
|
|
@ -6,5 +6,6 @@
|
||||||
"iot_class": "local_push",
|
"iot_class": "local_push",
|
||||||
"render_readme": true,
|
"render_readme": true,
|
||||||
"zip_release": true,
|
"zip_release": true,
|
||||||
"filename": "xt211_han.zip"
|
"filename": "xt211_han.zip",
|
||||||
|
"content_in_root": true
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -1,7 +1,7 @@
|
||||||
{
|
{
|
||||||
"domain": "xt211_han",
|
"domain": "xt211_han",
|
||||||
"name": "XT211 HAN (RS485 via Ethernet)",
|
"name": "XT211 HAN (RS485 via Ethernet)",
|
||||||
"version": "0.8.2",
|
"version": "0.8.9",
|
||||||
"documentation": "https://github.com/nero150/CEZ_rele_box",
|
"documentation": "https://github.com/nero150/CEZ_rele_box",
|
||||||
"issue_tracker": "https://github.com/nero150/CEZ_rele_box/issues",
|
"issue_tracker": "https://github.com/nero150/CEZ_rele_box/issues",
|
||||||
"dependencies": [],
|
"dependencies": [],
|
||||||
|
|
|
||||||
|
|
@ -1,24 +1,13 @@
|
||||||
#!/usr/bin/env python3
|
#!/usr/bin/env python3
|
||||||
"""
|
"""
|
||||||
Standalone test / debug script for the DLMS parser and TCP listener.
|
Standalone test / debug script for the DLMS parser and TCP listener.
|
||||||
|
|
||||||
Usage:
|
|
||||||
# Parse a raw hex frame from the meter (paste from HA debug log):
|
|
||||||
python3 test_parser.py --hex "7ea0...7e"
|
|
||||||
|
|
||||||
# Live listen on TCP socket (forward output to terminal):
|
|
||||||
python3 test_parser.py --host 192.168.1.100 --port 8899
|
|
||||||
|
|
||||||
# Replay a saved binary capture file:
|
|
||||||
python3 test_parser.py --file capture.bin
|
|
||||||
"""
|
"""
|
||||||
|
|
||||||
import argparse
|
import argparse
|
||||||
import asyncio
|
import asyncio
|
||||||
import sys
|
|
||||||
import os
|
import os
|
||||||
|
import sys
|
||||||
|
|
||||||
# Allow running from repo root without installing
|
|
||||||
sys.path.insert(0, os.path.join(os.path.dirname(__file__), "custom_components"))
|
sys.path.insert(0, os.path.join(os.path.dirname(__file__), "custom_components"))
|
||||||
|
|
||||||
from xt211_han.dlms_parser import DLMSParser, OBIS_DESCRIPTIONS
|
from xt211_han.dlms_parser import DLMSParser, OBIS_DESCRIPTIONS
|
||||||
|
|
@ -26,12 +15,12 @@ from xt211_han.dlms_parser import DLMSParser, OBIS_DESCRIPTIONS
|
||||||
|
|
||||||
def print_result(result) -> None:
|
def print_result(result) -> None:
|
||||||
if not result.success:
|
if not result.success:
|
||||||
print(f" ❌ Parse error: {result.error}")
|
print(f" Parse error: {result.error}")
|
||||||
return
|
return
|
||||||
if not result.objects:
|
if not result.objects:
|
||||||
print(" ⚠️ Frame OK but no DLMS objects extracted")
|
print(" Frame OK but no DLMS objects extracted")
|
||||||
return
|
return
|
||||||
print(f" ✅ {len(result.objects)} OBIS objects decoded:")
|
print(f" {len(result.objects)} OBIS objects decoded:")
|
||||||
for obj in result.objects:
|
for obj in result.objects:
|
||||||
meta = OBIS_DESCRIPTIONS.get(obj.obis, {})
|
meta = OBIS_DESCRIPTIONS.get(obj.obis, {})
|
||||||
name = meta.get("name", obj.obis)
|
name = meta.get("name", obj.obis)
|
||||||
|
|
@ -40,43 +29,22 @@ def print_result(result) -> None:
|
||||||
|
|
||||||
|
|
||||||
def test_hex(hex_str: str) -> None:
|
def test_hex(hex_str: str) -> None:
|
||||||
"""Parse a single hex-encoded frame."""
|
|
||||||
raw = bytes.fromhex(hex_str.replace(" ", "").replace("\n", ""))
|
raw = bytes.fromhex(hex_str.replace(" ", "").replace("\n", ""))
|
||||||
print(f"\n📦 Frame: {len(raw)} bytes")
|
print(f"\nFrame: {len(raw)} bytes")
|
||||||
parser = DLMSParser()
|
parser = DLMSParser()
|
||||||
parser.feed(raw)
|
parser.feed(raw)
|
||||||
result = parser.get_frame()
|
result = parser.get_frame()
|
||||||
if result:
|
if result:
|
||||||
print_result(result)
|
print_result(result)
|
||||||
else:
|
else:
|
||||||
print(" ⚠️ No complete frame found in data")
|
print(" No complete frame found in data")
|
||||||
|
|
||||||
|
|
||||||
def test_file(path: str) -> None:
|
|
||||||
"""Parse all frames from a binary capture file."""
|
|
||||||
with open(path, "rb") as f:
|
|
||||||
data = f.read()
|
|
||||||
print(f"\n📂 File: {path} ({len(data)} bytes)")
|
|
||||||
parser = DLMSParser()
|
|
||||||
parser.feed(data)
|
|
||||||
count = 0
|
|
||||||
while True:
|
|
||||||
result = parser.get_frame()
|
|
||||||
if result is None:
|
|
||||||
break
|
|
||||||
count += 1
|
|
||||||
print(f"\n--- Frame #{count} ---")
|
|
||||||
print_result(result)
|
|
||||||
print(f"\nTotal frames parsed: {count}")
|
|
||||||
|
|
||||||
|
|
||||||
async def listen_tcp(host: str, port: int) -> None:
|
async def listen_tcp(host: str, port: int) -> None:
|
||||||
"""Connect to the TCP adapter and print decoded frames as they arrive."""
|
print(f"\nConnecting to {host}:{port} ...")
|
||||||
print(f"\n🔌 Connecting to {host}:{port} ...")
|
|
||||||
reader, writer = await asyncio.open_connection(host, port)
|
reader, writer = await asyncio.open_connection(host, port)
|
||||||
print(" Connected. Waiting for DLMS PUSH frames (every ~60 s)...\n")
|
print("Connected. Waiting for DLMS PUSH frames...\n")
|
||||||
parser = DLMSParser()
|
parser = DLMSParser()
|
||||||
frame_count = 0
|
|
||||||
try:
|
try:
|
||||||
while True:
|
while True:
|
||||||
chunk = await asyncio.wait_for(reader.read(4096), timeout=120)
|
chunk = await asyncio.wait_for(reader.read(4096), timeout=120)
|
||||||
|
|
@ -88,8 +56,6 @@ async def listen_tcp(host: str, port: int) -> None:
|
||||||
result = parser.get_frame()
|
result = parser.get_frame()
|
||||||
if result is None:
|
if result is None:
|
||||||
break
|
break
|
||||||
frame_count += 1
|
|
||||||
print(f"\n--- Frame #{frame_count} raw: {result.raw_hex[:40]}... ---")
|
|
||||||
print_result(result)
|
print_result(result)
|
||||||
except asyncio.TimeoutError:
|
except asyncio.TimeoutError:
|
||||||
print("No data for 120 s, giving up.")
|
print("No data for 120 s, giving up.")
|
||||||
|
|
@ -101,15 +67,12 @@ def main() -> None:
|
||||||
parser = argparse.ArgumentParser(description="XT211 DLMS parser test tool")
|
parser = argparse.ArgumentParser(description="XT211 DLMS parser test tool")
|
||||||
group = parser.add_mutually_exclusive_group(required=True)
|
group = parser.add_mutually_exclusive_group(required=True)
|
||||||
group.add_argument("--hex", help="Hex-encoded raw frame to parse")
|
group.add_argument("--hex", help="Hex-encoded raw frame to parse")
|
||||||
group.add_argument("--file", help="Binary capture file to parse")
|
|
||||||
group.add_argument("--host", help="Adapter IP address for live TCP test")
|
group.add_argument("--host", help="Adapter IP address for live TCP test")
|
||||||
parser.add_argument("--port", type=int, default=8899, help="TCP port (default 8899)")
|
parser.add_argument("--port", type=int, default=8899, help="TCP port (default 8899)")
|
||||||
args = parser.parse_args()
|
args = parser.parse_args()
|
||||||
|
|
||||||
if args.hex:
|
if args.hex:
|
||||||
test_hex(args.hex)
|
test_hex(args.hex)
|
||||||
elif args.file:
|
|
||||||
test_file(args.file)
|
|
||||||
elif args.host:
|
elif args.host:
|
||||||
asyncio.run(listen_tcp(args.host, args.port))
|
asyncio.run(listen_tcp(args.host, args.port))
|
||||||
|
|
||||||
|
|
|
||||||
Binary file not shown.
Loading…
Reference in New Issue