diff --git a/luxtronik/cfi/vector.py b/luxtronik/cfi/vector.py index 16f8d4b4..8b6c9892 100644 --- a/luxtronik/cfi/vector.py +++ b/luxtronik/cfi/vector.py @@ -13,8 +13,71 @@ class DataVectorConfig(DataVector): """Specialized DataVector for Luxtronik configuration fields.""" + def _init_instance(self, safe): + """Re-usable method to initialize all instance variables.""" + super()._init_instance(safe) + def __init__(self, safe=True): - super().__init__() - self.safe = safe + """ + Initialize the data-vector instance. + Creates field objects for definitions and stores them in the data vector. + + Args: + safe (bool): If true, prevent fields marked as + not secure from being written to. + """ + self._init_instance(safe) + + # Add all available fields for d in self.definitions: - self._data.add(d, d.create_field()) \ No newline at end of file + self._data.add(d, d.create_field()) + + @classmethod + def empty(cls, safe=True): + """ + Initialize the data-vector instance without any fields. + + Args: + safe (bool): If true, prevent fields marked as + not secure from being written to. + """ + obj = cls.__new__(cls) # this don't call __init__() + obj._init_instance(safe) + return obj + + def add(self, def_field_name_or_idx, alias=None): + """ + Adds an additional field to this data vector. + Mainly used for data vectors created via `empty()` + to read/write individual fields. Existing fields will not be overwritten. + + Args: + def_field_name_or_idx (LuxtronikDefinition | Base | str | int): + Field to add. Either by definition, name or index, or the field itself. + alias (Hashable | None): Alias, which can be used to access the field again. + + Returns: + Base | None: The added field object if this could be added or + the existing field, otherwise None. In case a field + + Note: + It is not possible to add fields which are not defined. + To add custom fields, add them to the used `LuxtronikDefinitionsList` + (`cls.definitions`) first. + If multiple fields added for the same index/name, the last added takes precedence. + """ + # Look-up the related definition + definition, field = self._get_definition(def_field_name_or_idx, True) + if definition is None: + return None + + # Check if the field already exists + existing_field = self._data.get(definition, None) + if existing_field is not None: + return existing_field + + # Add a (new) field + if field is None: + field = definition.create_field() + self._data.add_sorted(definition, field, alias) + return field \ No newline at end of file diff --git a/luxtronik/constants.py b/luxtronik/constants.py index 894b3d31..65185d75 100644 --- a/luxtronik/constants.py +++ b/luxtronik/constants.py @@ -17,7 +17,3 @@ # Since version 3.92.0, all unavailable 16 bit signed data fields # have been returning this value (0x7FFF) LUXTRONIK_VALUE_FUNCTION_NOT_AVAILABLE: Final = 32767 - -LUXTRONIK_NAME_CHECK_NONE: Final = "none" -LUXTRONIK_NAME_CHECK_PREFERRED: Final = "preferred" -LUXTRONIK_NAME_CHECK_OBSOLETE: Final = "obsolete" diff --git a/luxtronik/data_vector.py b/luxtronik/data_vector.py index 45077dbd..efa2c41d 100644 --- a/luxtronik/data_vector.py +++ b/luxtronik/data_vector.py @@ -2,13 +2,9 @@ import logging -from luxtronik.constants import ( - LUXTRONIK_NAME_CHECK_PREFERRED, - LUXTRONIK_NAME_CHECK_OBSOLETE, -) - from luxtronik.collections import LuxtronikFieldsDictionary -from luxtronik.datatypes import Base +from luxtronik.datatypes import Base, Unknown +from luxtronik.definitions import LuxtronikDefinition LOGGER = logging.getLogger(__name__) @@ -19,15 +15,104 @@ ############################################################################### class DataVector: - """Class that holds a vector of data entries.""" + """ + Class that holds a vector of data fields. + + Provides access to fields by name, index or alias. + To use aliases, they must first be registered here (locally = only valid + for this vector) or directly in the `LuxtronikDefinitionsList` + (globally = valid for all newly created vector). + """ name = "DataVector" + # DataVector specific list of definitions as `LuxtronikDefinitionsList` + definitions = None # override this + _obsolete = {} + +# Field construction methods ################################################## + + @classmethod + def create_unknown_field(cls, idx): + """ + Create an unknown field object. + Be careful! The used controller firmware + may not support this field. + + Args: + idx (int): Register index. + + Returns: + Unknown: A field instance of type `Unknown`. + """ + return Unknown(f"unknown_{cls.name}_{idx}", False) + + @classmethod + def create_any_field(cls, def_name_or_idx): + """ + Create a field object from an available definition + (= included in class variable `cls.definitions`). + Be careful! The used controller firmware + may not support this field. + + If `def_name_or_idx` + - is a definition -> create the field from the provided definition + - is a name -> lookup the definition by name and create the field + - is a idx -> lookup definition by index and create the field + + Args: + def_name_or_idx (LuxtronikDefinition | str | int): Definitions object, + field name or register index. + + Returns: + Base | None: The created field, or None if not found or not valid. + """ + if isinstance(def_name_or_idx, LuxtronikDefinition): + definition = def_name_or_idx + else: + # The definitions object hold all available definitions + definition = cls.definitions.get(def_name_or_idx) + if definition is not None and definition.valid: + return definition.create_field() + return None + + def create_field(self, def_name_or_idx): + """ + Create a field object from a version-dependent definition (= included in + class variable `cls.definitions` and is valid for `self.version`). + + If `def_name_or_idx` + - is a definition -> create the field from the provided definition + - is a name -> lookup the definition by name and create the field + - is a idx -> lookup definition by index and create the field + + Args: + def_name_or_idx (str | int): Definitions object, + field name or register index. + + Returns: + Base | None: The created field, or None if not found or not valid. + """ + definition, _ = self._get_definition(def_name_or_idx, False) + if definition is not None and definition.valid: + return definition.create_field() + return None + + +# constructor, magic methods and iterators #################################### + + def _init_instance(self, safe): + """Re-usable method to initialize all instance variables.""" + self.safe = safe + + # Dictionary that holds all fields + self._data = LuxtronikFieldsDictionary() + def __init__(self): """Initialize DataVector class.""" - self._data = LuxtronikFieldsDictionary() + self._init_instance(True) @property def data(self): @@ -86,73 +171,110 @@ def items(self): """ return iter(self._data.items()) - def _name_lookup(self, name): + +# Alias methods ############################################################### + + def register_alias(self, def_field_name_or_idx, alias): + """ + Forward the `LuxtronikFieldsDictionary.register_alias` method. + Please check its documentation. + """ + return self._data.register_alias(def_field_name_or_idx, alias) + + +# Get and set methods ######################################################### + + def _get_definition(self, def_field_name_or_idx, all_not_version_dependent): """ - Try to find the index using the given field name. + Look-up a definition by name, index, a field instance or by the definition itself. + + If `def_field_name_or_idx` + - is a definition -> lookup the definition by the definition's name + - is a field -> lookup the definition by the field's name + - is a name -> lookup the field by the name + - is a idx -> lookup the field by the index Args: - name (string): Field name. + def_field_name_or_idx (LuxtronikDefinition | Base | str | int): + Definition object, field object, field name or register index. + all_not_version_dependent (bool): If true, look up the definition + within the `cls.definitions` otherwise within `self._data` (which + contain all definitions related to all added fields) Returns: - tuple[int | None, str | None]: - 0: Index found or None - 1: New preferred name, if available, otherwise None + tuple[LuxtronikDefinition | None, Base | None]: + A definition-field-pair tuple: + Index 0: Return the found or given definitions, otherwise None + Index 1: Return the given field, otherwise None """ - obsolete_entry = self._obsolete.get(name, None) - if obsolete_entry: - return None, obsolete_entry - for definition, field in self._data.items(): - check_result = field.check_name(name) - if check_result == LUXTRONIK_NAME_CHECK_PREFERRED: - return definition.index, None - elif check_result == LUXTRONIK_NAME_CHECK_OBSOLETE: - return definition.index, field.name - return None, None - - def _lookup(self, target, with_index=False): - """ - Lookup an entry - - "target" could either be its id or its name. - - In case "with_index" is set, also the index is returned. - """ - if isinstance(target, str): - try: - # Try to get entry by id - target_index = int(target) - except ValueError: - # Get entry by name - target_index, new_name = self._name_lookup(target) - if new_name is not None: - raise KeyError(f"The name '{target}' is obsolete! Use '{new_name}' instead.") - elif isinstance(target, int): - # Get entry by id - target_index = target - else: - target_index = None + definition = def_field_name_or_idx + field = None + if isinstance(def_field_name_or_idx, Base): + # In case we got a field, search for the description by the field name + definition = def_field_name_or_idx.name + field = def_field_name_or_idx + if not isinstance(def_field_name_or_idx, LuxtronikDefinition): + if all_not_version_dependent: + # definitions contains all available definitions + definition = self.definitions.get(definition) + else: + # _data.def_dict contains only valid and previously added definitions + definition = self._data.def_dict.get(definition) + return definition, field + + def get(self, def_field_name_or_idx, default=None): + """ + Retrieve an added field by definition, field, name or register index. + Triggers a key error when we try to query obsolete fields. + + If `def_field_name_or_idx` + - is a definition -> lookup the field by the definition + - is a field -> lookup the field by the field's name + - is a name -> lookup the field by the name + - is a idx -> lookup the field by the index + + Args: + def_field_name_or_idx (LuxtronikDefinition | Base | str | int): + Definition, name, or register index to be used to search for the field. - target_entry = self._data.get(target_index, None) - if target_entry is None: - LOGGER.warning("entry '%s' not found", target) - if with_index: - return target_index, target_entry - return target_entry + Returns: + Base | None: The field found or the provided default if not found. - def get(self, target): - """Get entry by id or name.""" - entry = self._lookup(target) - return entry + Note: + If multiple fields added for the same index/name, + the last added takes precedence. + """ + # check for obsolete + obsolete_entry = self._obsolete.get(def_field_name_or_idx, None) + if obsolete_entry: + raise KeyError(f"The name '{def_field_name_or_idx}' is obsolete! Use '{obsolete_entry}' instead.") + # look-up the field + field = self._data.get(def_field_name_or_idx, default) + if field is None: + LOGGER.warning(f"entry '{def_field_name_or_idx}' not found") + return field - def set(self, target, value): + def set(self, def_field_name_or_idx, value): """ - Set the value of a field to the given value. + Set the data of a field to the given value. The value is set, even if the field marked as non-writeable. No data validation is performed either. + + If `def_field_name_or_idx` + - is a definition -> lookup the field by the definition and set the value + - is a field -> set the value of this field + - is a name -> lookup the field by the name and set the value + - is a idx -> lookup the field by the index and set the value + + Args: + def_field_name_or_idx (LuxtronikDefinition | Base | int | str): + Definition, name, or register index to be used to search for the field. + It is also possible to pass the field itself. + value (int | List[int]): Value to set """ - field = target + field = def_field_name_or_idx if not isinstance(field, Base): - field = self.get(target) + field = self.get(def_field_name_or_idx) if field is not None: field.value = value \ No newline at end of file diff --git a/luxtronik/datatypes.py b/luxtronik/datatypes.py index f4ee08f6..9f66aee6 100755 --- a/luxtronik/datatypes.py +++ b/luxtronik/datatypes.py @@ -5,11 +5,6 @@ import socket import struct -from luxtronik.constants import ( - LUXTRONIK_NAME_CHECK_NONE, - LUXTRONIK_NAME_CHECK_PREFERRED, - LUXTRONIK_NAME_CHECK_OBSOLETE, -) from luxtronik.common import classproperty from functools import total_ordering @@ -65,19 +60,6 @@ def name(self): """Return the (most common) name of the entry.""" return self._names[0] - def check_name(self, name): - """ - Check whether a name matches one of the supported entry names. - The result string can be used to trigger a exception for obsolete names. - """ - name_lower = name.lower() - if name_lower == self.name.lower(): - return LUXTRONIK_NAME_CHECK_PREFERRED - elif name_lower in (n.lower() for n in self._names): - return LUXTRONIK_NAME_CHECK_OBSOLETE - else: - return LUXTRONIK_NAME_CHECK_NONE - @property def value(self): """Return the stored value converted from heatpump units.""" diff --git a/luxtronik/shi/vector.py b/luxtronik/shi/vector.py index c191799b..a4550ae8 100644 --- a/luxtronik/shi/vector.py +++ b/luxtronik/shi/vector.py @@ -2,10 +2,7 @@ import logging from luxtronik.common import version_in_range -from luxtronik.collections import LuxtronikFieldsDictionary from luxtronik.data_vector import DataVector -from luxtronik.datatypes import Base, Unknown -from luxtronik.definitions import LuxtronikDefinition from luxtronik.shi.constants import LUXTRONIK_LATEST_SHI_VERSION from luxtronik.shi.contiguous import ContiguousDataBlockList @@ -18,82 +15,13 @@ ############################################################################### class DataVectorSmartHome(DataVector): - """ - Specialized DataVector for Luxtronik smart home fields. - - Provides access to fields by name, index or alias. - To use aliases, they must first be registered here (locally = only valid - for this vector) or directly in the `LuxtronikDefinitionsList` - (globally = valid for all newly created vector). - """ - - # DataVector specific list of definitions as `LuxtronikDefinitionsList` - definitions = None # override this - -# Field construction methods ################################################## - - @classmethod - def create_unknown_field(cls, idx): - """ - Create an unknown field object. - Be careful! The used controller firmware - may not support this field. - - Args: - idx (int): Register index. - - Returns: - Unknown: A field instance of type 'Unknown'. - """ - return Unknown(f"unknown_{cls.name}_{idx}", False) - - @classmethod - def create_any_field(cls, name_or_idx): - """ - Create a field object from an available definition - (= included in class variable `cls.definitions`). - Be careful! The used controller firmware - may not support this field. - - Args: - name_or_idx (str | int): Field name or register index. - - Returns: - Base | None: The created field, or None if not found or not valid. - """ - # The definitions object hold all available definitions - definition = cls.definitions.get(name_or_idx) - if definition is not None and definition.valid: - return definition.create_field() - return None - - def create_field(self, name_or_idx): - """ - Create a field object from a version-dependent definition (= included in - class variable `cls.definitions` and is valid for `self.version`). - - Args: - name_or_idx (str | int): Field name or register index. - - Returns: - Base | None: The created field, or None if not found or not valid. - """ - definition, _ = self._get_definition(name_or_idx, False) - if definition is not None and definition.valid: - return definition.create_field() - return None - - -# Constructors and magic methods ############################################## + """Specialized DataVector for Luxtronik smart home fields.""" def _init_instance(self, version, safe): """Re-usable method to initialize all instance variables.""" - self.safe = safe + super()._init_instance(safe) self._version = version - # Dictionary that holds all fields - self._data = LuxtronikFieldsDictionary() - # Instead of re-create the block-list on every read, we just update it # on first time used or on next time used if some fields are added. self._read_blocks_up_to_date = False @@ -143,46 +71,10 @@ def empty(cls, version=LUXTRONIK_LATEST_SHI_VERSION, safe=True): obj._init_instance(version, safe) return obj - -# properties and access methods ############################################### - @property def version(self): return self._version - -# Find, add and alias methods ################################################# - - def _get_definition(self, def_field_name_or_idx, all_not_version_dependent): - """ - Look-up a definition by name, index, a field instance or by the definition itself. - - Args: - def_field_name_or_idx (LuxtronikDefinition | Base | str | int): - Definition object, field object, field name or register index. - all_not_version_dependent (bool): If true, look up the definition - within the `cls.definitions` otherwise within `self.def_dict` (which - contain all definitions related to all added fields) - - Returns: - tuple[LuxtronikDefinition | None, Base | None]: - A definition-field-pair tuple: - Index 0: Return the found or given definitions, otherwise None - Index 1: Return the given field, otherwise None - """ - definition = def_field_name_or_idx - field = None - if isinstance(def_field_name_or_idx, Base): - definition = def_field_name_or_idx.name - field = def_field_name_or_idx - if not isinstance(def_field_name_or_idx, LuxtronikDefinition): - if all_not_version_dependent: - definition = self.definitions.get(definition) - else: - # def_dict contains only valid and addable definitions - definition = self._data.def_dict.get(definition) - return definition, field - def add(self, def_field_name_or_idx, alias=None): """ Adds an additional version-dependent field (= included in class variable @@ -224,26 +116,6 @@ def add(self, def_field_name_or_idx, alias=None): return field return None - def register_alias(self, def_field_name_or_idx, alias): - """ - Add an alternative name (or anything hashable else) - that can be used to access a specific field. - - Args: - def_field_name_or_idx (LuxtronikDefinition | Base | str | int): - Field to which the alias is to be added. - Either by definition, name, register index, or the field itself. - alias (Hashable): Alias, which can be used to access the field again. - - Returns: - Base | None: The field to which the alias was added, - or None if not possible - """ - return self._data.register_alias(def_field_name_or_idx, alias) - - -# Data-blocks methods ######################################################### - def update_read_blocks(self): """ (Re-)Create the data block list (`ContiguousDataBlockList`) for read-operations. @@ -257,22 +129,3 @@ def update_read_blocks(self): self._read_blocks.collect(definition, field) self._read_blocks_up_to_date = True - -# Data and access methods ##################################################### - - def get(self, def_name_or_idx, default=None): - """ - Retrieve a field by definition, name or register index. - - Args: - def_name_or_idx (LuxtronikDefinition | str | int): - Definition, name, or register index to be used to search for the field. - - Returns: - Base | None: The field found or the provided default if not found. - - Note: - If multiple fields added for the same index/name, - the last added takes precedence. - """ - return self._data.get(def_name_or_idx, default) \ No newline at end of file diff --git a/tests/cfi/test_cfi_parameters.py b/tests/cfi/test_cfi_parameters.py index 20a327e1..74a95021 100644 --- a/tests/cfi/test_cfi_parameters.py +++ b/tests/cfi/test_cfi_parameters.py @@ -44,26 +44,18 @@ def test_get(self): assert parameters.get("0").name == s assert parameters.get(s).name == s - def test__lookup(self): - """Test cases for _lookup""" - parameters = Parameters() - s = "ID_Transfert_LuxNet" - assert parameters._lookup(0).name == s - assert parameters._lookup("0").name == s - assert parameters._lookup(s).name == s - - p0 = parameters._lookup(0) - assert parameters._lookup(0, True) == (0, p0) - assert parameters._lookup("0", True) == (0, p0) - assert parameters._lookup(s, True) == (0, p0) + p0 = parameters.get(0) + assert parameters[0] is p0 + assert parameters["0"] is p0 + assert parameters[s] is p0 # Look for a name which does not exist s = "ID_BarFoo" - assert parameters._lookup(s, True)[0] is None + assert parameters.get(s) is None # Look for something which is not an int and not a string j = 0.0 - assert parameters._lookup(j) is None + assert parameters.get(j) is None def test___iter__(self): """Test cases for __iter__""" diff --git a/tests/cfi/test_cfi_vector.py b/tests/cfi/test_cfi_vector.py new file mode 100644 index 00000000..a97fa18b --- /dev/null +++ b/tests/cfi/test_cfi_vector.py @@ -0,0 +1,110 @@ +from luxtronik.datatypes import Base +from luxtronik.definitions import LuxtronikDefinitionsList +from luxtronik.cfi.vector import DataVectorConfig + + +############################################################################### +# Tests +############################################################################### + +def_list = [ + { + "index": 5, + "count": 1, + "names": ["field_5_bit1"], + "bit_offset": 0, + "bit_count": 1, + "type": Base, + "writeable": False, + }, + { + "index": 5, + "count": 1, + "names": ["field_5_bit2"], + "bit_offset": 1, + "bit_count": 3, + "type": Base, + "writeable": False, + }, + { + "index": 5, + "count": 1, + "names": ["field_5_all"], + "type": Base, + "writeable": False, + }, + { + "index": 7, + "count": 2, + "names": ["field_7"], + "type": Base, + "writeable": True, + }, + { + "index": 9, + "count": 2, + "names": ["field_9"], + "type": Base, + "writeable": True, + } +] +TEST_DEFINITIONS = LuxtronikDefinitionsList(def_list, 'foo', 100, 'INT32') + +FIELD_11_DICT = { + "index": 11, + "count": 1, + "names": ["field_11"], + "type": Base, + "writeable": True, +} +FIELD_12_DICT = { + "index": 12, + "count": 1, + "names": ["field_12"], + "type": Base, + "writeable": True, +} + +class DataVectorTest(DataVectorConfig): + name = 'foo' + definitions = TEST_DEFINITIONS + +class TestDataVector: + + def test_add(self): + data_vector = DataVectorTest.empty() + assert len(data_vector) == 0 + + # In case the definitions are added after the creation + data_vector.definitions.add(FIELD_11_DICT) + data_vector.definitions.add(FIELD_12_DICT) + + # Add available index + field = data_vector.add(11) + assert len(data_vector) == 1 + assert 11 in data_vector + assert field.name == 'field_11' + + # Add not available index (not existing) + field = data_vector.add(13) + assert 'field_6' not in data_vector + assert field is None + assert len(data_vector) == 1 + + # Re-add available index + field = data_vector.add(11) + assert len(data_vector) == 1 + assert field.name == 'field_11' + + # Add available field + field_12 = Base('field_12', False) + field = data_vector.add(field_12) + assert 12 in data_vector + assert len(data_vector) == 2 + assert field == field_12 + + # Re-add available field + field = data_vector.add(field_12) + assert field_12 in data_vector + assert len(data_vector) == 2 + assert field == field_12 diff --git a/tests/test_compatibility.py b/tests/test_compatibility.py index 4a0f80c3..2b831523 100644 --- a/tests/test_compatibility.py +++ b/tests/test_compatibility.py @@ -1916,14 +1916,20 @@ def test_compatibilities(self): for mapping, data_vector, caption in values: print_caption = True for old_name, old_idx in mapping.items(): - cur_idx, new_name = data_vector._name_lookup(old_name) - cur_name = data_vector.get(old_idx).name - if cur_idx != old_idx or (new_name is not None and new_name != cur_name): + # Try to get the definition of the "old name" + try: + def_by_name = data_vector.definitions.get(old_name) + except Exception: + def_by_name = None + if def_by_name is None or def_by_name.index != old_idx: # We do not use assert here, in order to catch all incompatibilities at once. if print_caption: print(f"### Incompatibilities - {caption}:") print_caption = False - print(f'"{old_name}" is not registered for {old_idx}: "{cur_name}",') + if def_by_name: + print(f'"{old_name}" is not registered for {old_idx}: "{def_by_name.name}",') + else: + print(f'"{old_name}" not found,') ok = False assert ok, "Found incompatibilities. Please consider to add them to compatibilities.py" diff --git a/tests/test_data_vector.py b/tests/test_data_vector.py index b8cae68f..f8772a03 100644 --- a/tests/test_data_vector.py +++ b/tests/test_data_vector.py @@ -34,7 +34,7 @@ class TestDataVector: @pytest.mark.parametrize("name, exception_expected", [ ("foo", False), - ("bar", True), + ("bar", False), ("baz", True), ("qux", False), ]) diff --git a/tests/test_datatypes.py b/tests/test_datatypes.py index 5da5cc7e..81038f7d 100644 --- a/tests/test_datatypes.py +++ b/tests/test_datatypes.py @@ -4,12 +4,6 @@ import datetime -from luxtronik.constants import ( - LUXTRONIK_NAME_CHECK_NONE, - LUXTRONIK_NAME_CHECK_PREFERRED, - LUXTRONIK_NAME_CHECK_OBSOLETE, -) - from luxtronik.datatypes import ( Base, BitMaskBase, @@ -121,14 +115,6 @@ def test_empty_name(self): except Exception: pass - def test_check_name(self): - """Test cases for check_name() function""" - base = Base(["foo", "bar"]) - - assert base.check_name("foo") == LUXTRONIK_NAME_CHECK_PREFERRED - assert base.check_name("bar") == LUXTRONIK_NAME_CHECK_OBSOLETE - assert base.check_name("baz") == LUXTRONIK_NAME_CHECK_NONE - def test_value_property(self): """Test case for value property"""