test component for dlms_parser lib

main
Tomer27cz 2026-04-03 23:19:43 +02:00
parent 8d5caa54f1
commit 1186a8d38c
6 changed files with 413 additions and 0 deletions

View File

@ -0,0 +1,71 @@
import re
import esphome.codegen as cg
from esphome.components import uart
import esphome.config_validation as cv
from esphome.const import CONF_ID, CONF_RECEIVE_TIMEOUT, PLATFORM_ESP32, PLATFORM_ESP8266
CODEOWNERS = ["@Tomer27cz"]
DEPENDENCIES = ["uart"]
CONF_DLMS_METER_ID = "dlms_meter_id"
CONF_OBIS_CODE = "obis_code"
CONF_DECRYPTION_KEY = "decryption_key"
CONF_CUSTOM_PATTERN = "custom_pattern"
dlms_meter_component_ns = cg.esphome_ns.namespace("dlms_meter_lib")
DlmsMeterLibComponent = dlms_meter_component_ns.class_(
"DlmsMeterLibComponent", cg.Component, uart.UARTDevice
)
def validate_key(value):
value = cv.string_strict(value)
if len(value) != 32:
raise cv.Invalid("Decryption key must be 32 hex characters (16 bytes)")
try:
bytes.fromhex(value)
except ValueError as exc:
raise cv.Invalid("Decryption key must be hex values from 00 to FF") from exc
return value
def obis_code(value):
value = cv.string(value)
# Validate standard OBIS format: A.B.C.D.E.F (e.g., 1.0.1.8.0.255)
match = re.match(r"^\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}$", value)
if match is None:
raise cv.Invalid(
f"{value} is not a valid OBIS code (expected format: A.B.C.D.E.F)"
)
return value
CONFIG_SCHEMA = cv.All(
cv.Schema(
{
cv.GenerateID(): cv.declare_id(DlmsMeterLibComponent),
cv.Optional(CONF_RECEIVE_TIMEOUT, default="50ms"): cv.positive_time_period_milliseconds,
cv.Required(CONF_DECRYPTION_KEY): validate_key,
cv.Optional(CONF_CUSTOM_PATTERN, default=""): cv.string,
}
)
.extend(uart.UART_DEVICE_SCHEMA)
.extend(cv.COMPONENT_SCHEMA),
cv.only_on([PLATFORM_ESP8266, PLATFORM_ESP32]),
)
FINAL_VALIDATE_SCHEMA = uart.final_validate_device_schema(
"dlms_meter_lib", require_rx=True
)
async def to_code(config):
var = cg.new_Pvariable(config[CONF_ID])
await cg.register_component(var, config)
await uart.register_uart_device(var, config)
cg.add(var.set_receive_timeout(config[CONF_RECEIVE_TIMEOUT]))
cg.add(var.set_decryption_key(config[CONF_DECRYPTION_KEY]))
cg.add(var.set_custom_pattern(config[CONF_CUSTOM_PATTERN]))
cg.add_library("dlms_parser", None, "https://github.com/esphome-libs/dlms_parser")

View File

@ -0,0 +1,21 @@
import esphome.codegen as cg
from esphome.components import binary_sensor
import esphome.config_validation as cv
from esphome.const import CONF_DLMS_METER_ID, CONF_OBIS_CODE
from . import DlmsMeterLibComponent, obis_code
DEPENDENCIES = ["dlms_meter_lib"]
CONFIG_SCHEMA = binary_sensor.binary_sensor_schema().extend(
{
cv.GenerateID(CONF_DLMS_METER_ID): cv.use_id(DlmsMeterLibComponent),
cv.Required(CONF_OBIS_CODE): obis_code,
}
)
async def to_code(config):
hub = await cg.get_variable(config[CONF_DLMS_METER_ID])
var = await binary_sensor.new_binary_sensor(config)
cg.add(hub.register_binary_sensor(config[CONF_OBIS_CODE], var))

View File

@ -0,0 +1,181 @@
#include "dlms_meter_lib.h"
#include <cstdio>
namespace esphome::dlms_meter_lib
{
static constexpr const char *TAG = "dlms_meter_lib";
static void log_callback(dlms_parser::LogLevel level, const char *fmt, va_list args) {
static char buf[256];
vsnprintf(buf, sizeof(buf), fmt, args);
switch (level) {
case dlms_parser::LogLevel::ERROR:
ESP_LOGE(TAG, "%s", buf);
break;
case dlms_parser::LogLevel::WARNING:
ESP_LOGW(TAG, "%s", buf);
break;
case dlms_parser::LogLevel::INFO:
ESP_LOGI(TAG, "%s", buf);
break;
case dlms_parser::LogLevel::VERBOSE:
ESP_LOGV(TAG, "%s", buf);
break;
default:
ESP_LOGVV(TAG, "%s", buf);
break;
}
}
void DlmsMeterLibComponent::setup() {
dlms_parser::Logger::set_log_function(log_callback);
this->parser_.load_default_patterns();
this->rx_buffer_ = std::make_unique<uint8_t[]>(MAX_RX_BUFFER_SIZE);
this->rx_buffer_len_ = 0;
}
void DlmsMeterLibComponent::dump_config() {
ESP_LOGCONFIG(TAG, "DLMS METER LIB Component:");
ESP_LOGCONFIG(TAG, " Receive Timeout: %u ms", this->receive_timeout_ms_);
if (!this->custom_pattern_.empty()) {
ESP_LOGCONFIG(TAG, " Custom Pattern: %s", this->custom_pattern_.c_str());
}
#ifdef USE_SENSOR
for (const auto &entry : this->sensors_) {
LOG_SENSOR(" ", "Numeric Sensor (OBIS)", entry.sensor);
ESP_LOGCONFIG(TAG, " OBIS: %s", entry.obis.c_str());
}
#endif
#ifdef USE_TEXT_SENSOR
for (const auto &entry : this->text_sensors_) {
LOG_TEXT_SENSOR(" ", "Text Sensor (OBIS)", entry.sensor);
ESP_LOGCONFIG(TAG, " OBIS: %s", entry.obis.c_str());
}
#endif
#ifdef USE_BINARY_SENSOR
for (const auto &entry : this->binary_sensors_) {
LOG_BINARY_SENSOR(" ", "Binary Sensor (OBIS)", entry.sensor);
ESP_LOGCONFIG(TAG, " OBIS: %s", entry.obis.c_str());
}
#endif
}
void DlmsMeterLibComponent::set_decryption_key(const char *hex_key) {
auto key = dlms_parser::Aes128GcmDecryptionKey::from_hex(hex_key);
if (key) {
this->parser_.set_decryption_key(*key);
} else {
ESP_LOGE(TAG, "Invalid decryption key");
}
}
void DlmsMeterLibComponent::loop() {
this->read_rx_buffer_();
if (this->receiving_ && App.get_loop_component_start_time() - this->last_rx_char_time_ > this->receive_timeout_ms_) {
this->process_frame_();
}
}
void DlmsMeterLibComponent::read_rx_buffer_() {
int available = this->available();
if (available == 0)
return;
this->receiving_ = true;
this->last_rx_char_time_ = App.get_loop_component_start_time();
while (this->available()) {
if (this->rx_buffer_len_ >= MAX_RX_BUFFER_SIZE) {
ESP_LOGW(TAG, "RX Buffer overflow. Frame too large! Truncating.");
break;
}
uint8_t byte;
if (this->read_byte(&byte)) {
this->rx_buffer_[this->rx_buffer_len_++] = byte;
} else {
ESP_LOGW(TAG, "Failed to read byte from UART.");
break;
}
}
}
void DlmsMeterLibComponent::process_frame_() {
if (this->rx_buffer_len_ == 0)
return;
ESP_LOGD(TAG, "PUSH frame size: %zu bytes", this->rx_buffer_len_);
auto callback = [this](const char *obis_code, float float_val, const char *str_val, bool is_numeric) {
this->on_data_(obis_code, float_val, str_val, is_numeric);
};
this->parser_->parse(this->rx_buffer_.get(), this->rx_buffer_len_, callback);
this->rx_buffer_len_ = 0;
}
void DlmsMeterLibComponent::on_data_(const char *obis_code, float float_val, const char *str_val, bool is_numeric) {
int updated_count = 0;
#ifdef USE_SENSOR
if (is_numeric) {
for (const auto &entry : this->sensors_) {
if (entry.obis == obis_code) {
ESP_LOGD(TAG, "Found sensor for OBIS code %s: '%s'", obis_code, entry.sensor->get_name().c_str());
ESP_LOGD(TAG, "Publishing data");
entry.sensor->publish_state(float_val);
updated_count++;
}
}
}
#endif
#ifdef USE_TEXT_SENSOR
for (const auto &entry : this->text_sensors_) {
if (entry.obis == obis_code) {
ESP_LOGD(TAG, "Found sensor for OBIS code %s: '%s'", obis_code, entry.sensor->get_name().c_str());
ESP_LOGD(TAG, "Publishing data");
entry.sensor->publish_state(str_val);
updated_count++;
}
}
#endif
#ifdef USE_BINARY_SENSOR
if (is_numeric) {
bool state = float_val != 0.0f;
for (const auto &entry : this->binary_sensors_) {
if (entry.obis == obis_code) {
ESP_LOGD(TAG, "Found sensor for OBIS code %s: '%s'", obis_code, entry.sensor->get_name().c_str());
ESP_LOGD(TAG, "Publishing data");
entry.sensor->publish_state(state);
updated_count++;
}
}
}
#endif
if (updated_count == 0) {
ESP_LOGV(TAG, "Received OBIS %s, but no sensors are registered for it.", obis_code);
}
}
#ifdef USE_SENSOR
void DlmsMeterLibComponent::register_sensor(const std::string &obis_code, sensor::Sensor *sensor) {
this->sensors_.push_back({obis_code, sensor});
}
#endif
#ifdef USE_TEXT_SENSOR
void DlmsMeterLibComponent::register_text_sensor(const std::string &obis_code, text_sensor::TextSensor *sensor) {
this->text_sensors_.push_back({obis_code, sensor});
}
#endif
#ifdef USE_BINARY_SENSOR
void DlmsMeterLibComponent::register_binary_sensor(const std::string &obis_code, binary_sensor::BinarySensor *sensor) {
this->binary_sensors_.push_back({obis_code, sensor});
}
#endif
} // namespace esphome::dlms_meter

View File

@ -0,0 +1,98 @@
#include "esphome/core/component.h"
#include "esphome/core/defines.h"
#include "esphome/core/log.h"
#ifdef USE_SENSOR
#include "esphome/components/sensor/sensor.h"
#endif
#ifdef USE_TEXT_SENSOR
#include "esphome/components/text_sensor/text_sensor.h"
#endif
#ifdef USE_BINARY_SENSOR
#include "esphome/components/binary_sensor/binary_sensor.h"
#endif
#include <dlms_parser/dlms_parser.h>
#if defined(USE_ESP8266_FRAMEWORK_ARDUINO)
#include <dlms_parser/decryption/aes_128_gcm_decryptor_bearssl.h>
#elif defined(USE_ESP32)
#include <esp_idf_version.h>
#if ESP_IDF_VERSION >= ESP_IDF_VERSION_VAL(6, 0, 0)
#include <dlms_parser/decryption/aes_128_gcm_decryptor_tfpsa.h>
#else
#include <dlms_parser/decryption/aes_128_gcm_decryptor_mbedtls.h>
#endif
#else
#error "Unsupported platform for dlms_meter_lib"
#endif
#include <vector>
#include <string>
#include <array>
#include <memory>
namespace esphome::dlms_meter_lib {
class DlmsMeterLibComponent : public Component, public uart::UARTDevice {
public:
DlmsMeterLibComponent() = default;
void setup() override;
void dump_config() override;
void loop() override;
void set_decryption_key(const char *hex_key);
void set_receive_timeout(uint32_t timeout_ms) { this->receive_timeout_ms_ = timeout_ms; }
void set_custom_pattern(const std::string &pattern) { this->custom_pattern_ = pattern; }
#ifdef USE_SENSOR
void register_sensor(const std::string &obis_code, sensor::Sensor *sensor);
#endif
#ifdef USE_TEXT_SENSOR
void register_text_sensor(const std::string &obis_code, text_sensor::TextSensor *sensor);
#endif
#ifdef USE_BINARY_SENSOR
void register_binary_sensor(const std::string &obis_code, binary_sensor::BinarySensor *sensor);
#endif
protected:
void read_rx_buffer_();
void process_frame_();
void on_data_(const char *obis_code, float float_val, const char *str_val, bool is_numeric);
static constexpr size_t MAX_RX_BUFFER_SIZE = 2048;
std::unique_ptr<uint8_t[]> rx_buffer_;
size_t rx_buffer_len_{0};
uint32_t last_rx_char_time_{0};
bool receiving_{false};
uint32_t receive_timeout_ms_{50};
std::string custom_pattern_{""};
std::unique_ptr<DlmsParser> parser_;
#ifdef USE_SENSOR
struct NumericSensorEntry {
std::string obis;
sensor::Sensor *sensor;
};
std::vector<NumericSensorEntry> sensors_;
#endif
#ifdef USE_TEXT_SENSOR
struct TextSensorEntry {
std::string obis;
text_sensor::TextSensor *sensor;
};
std::vector<TextSensorEntry> text_sensors_;
#endif
#ifdef USE_BINARY_SENSOR
struct BinarySensorEntry {
std::string obis;
binary_sensor::BinarySensor *sensor;
};
std::vector<BinarySensorEntry> binary_sensors_;
#endif
};
} // namespace esphome::dlms_meter_lib

View File

@ -0,0 +1,21 @@
import esphome.codegen as cg
from esphome.components import sensor
import esphome.config_validation as cv
from esphome.const import CONF_DLMS_METER_ID, CONF_OBIS_CODE
from . import DlmsMeterLibComponent, obis_code
DEPENDENCIES = ["dlms_meter_lib"]
CONFIG_SCHEMA = sensor.sensor_schema().extend(
{
cv.GenerateID(CONF_DLMS_METER_ID): cv.use_id(DlmsMeterLibComponent),
cv.Required(CONF_OBIS_CODE): obis_code,
}
)
async def to_code(config):
hub = await cg.get_variable(config[CONF_DLMS_METER_ID])
var = await sensor.new_sensor(config)
cg.add(hub.register_sensor(config[CONF_OBIS_CODE], var))

View File

@ -0,0 +1,21 @@
import esphome.codegen as cg
from esphome.components import text_sensor
import esphome.config_validation as cv
from esphome.const import CONF_DLMS_METER_ID, CONF_OBIS_CODE
from . import DlmsMeterLibComponent, obis_code
DEPENDENCIES = ["dlms_meter_lib"]
CONFIG_SCHEMA = text_sensor.text_sensor_schema().extend(
{
cv.GenerateID(CONF_DLMS_METER_ID): cv.use_id(DlmsMeterLibComponent),
cv.Required(CONF_OBIS_CODE): obis_code,
}
)
async def to_code(config):
hub = await cg.get_variable(config[CONF_DLMS_METER_ID])
var = await text_sensor.new_text_sensor(config)
cg.add(hub.register_text_sensor(config[CONF_OBIS_CODE], var))