From 04e88ef1a5b685376f59faeaeeca4f0cad1b60c0 Mon Sep 17 00:00:00 2001 From: t-sasatani <33111879+t-sasatani@users.noreply.github.com> Date: Wed, 27 Nov 2024 12:35:05 -0800 Subject: [PATCH 1/5] Add upper bound to ADC conversion results --- mio/data/config/wireless/wireless-200px.yml | 2 ++ mio/models/stream.py | 16 ++++++++++++++-- 2 files changed, 16 insertions(+), 2 deletions(-) diff --git a/mio/data/config/wireless/wireless-200px.yml b/mio/data/config/wireless/wireless-200px.yml index 75aa4ad3..fd3d31dc 100644 --- a/mio/data/config/wireless/wireless-200px.yml +++ b/mio/data/config/wireless/wireless-200px.yml @@ -38,6 +38,8 @@ adc_scale: bitdepth: 8 battery_div_factor: 5 vin_div_factor: 11.3 + battery_max_voltage: 10.0 + vin_max_voltage: 20.0 runtime: serial_buffer_queue_size: 10 diff --git a/mio/models/stream.py b/mio/models/stream.py index 61664c46..5ffe79b5 100644 --- a/mio/models/stream.py +++ b/mio/models/stream.py @@ -35,6 +35,16 @@ class ADCScaling(MiniscopeConfig): 11.3, description="Voltage divider factor for the Vin voltage", ) + battery_max_voltage: float = Field( + 10.0, + description="Maximum voltage of the battery." + "Scaled battery voltage will be 0 if it is greater than this value", + ) + vin_max_voltage: float = Field( + 20.0, + description="Maximum voltage of the Vin" + "Scaled Vin voltage will be 0 if it is greater than this value", + ) def scale_battery_voltage(self, voltage_raw: float) -> float: """ @@ -113,7 +123,8 @@ def battery_voltage(self) -> float: if self._adc_scaling is None: return self.battery_voltage_raw else: - return self._adc_scaling.scale_battery_voltage(self.battery_voltage_raw) + battery_voltage = self._adc_scaling.scale_battery_voltage(self.battery_voltage_raw) + return battery_voltage if battery_voltage < self._adc_scaling.battery_max_voltage else 0 @computed_field def input_voltage(self) -> float: @@ -123,7 +134,8 @@ def input_voltage(self) -> float: if self._adc_scaling is None: return self.input_voltage_raw else: - return self._adc_scaling.scale_input_voltage(self.input_voltage_raw) + vin_voltage = self._adc_scaling.scale_input_voltage(self.input_voltage_raw) + return vin_voltage if vin_voltage < self._adc_scaling.vin_max_voltage else 0 class StreamDevRuntime(MiniscopeConfig): From 2a8302ad384aee068902bf168373d4a886fc8676 Mon Sep 17 00:00:00 2001 From: t-sasatani <33111879+t-sasatani@users.noreply.github.com> Date: Wed, 27 Nov 2024 14:05:21 -0800 Subject: [PATCH 2/5] Modify test to comply with bounded computed voltages --- tests/test_models/test_model_stream.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/test_models/test_model_stream.py b/tests/test_models/test_model_stream.py index a62c3e42..371264ea 100644 --- a/tests/test_models/test_model_stream.py +++ b/tests/test_models/test_model_stream.py @@ -57,8 +57,8 @@ def test_adc_scaling(scale, config_override): example = config_override(CONFIG_DIR / "stream_daq_test_200px.yml", {"adc_scale": adc_scale}) instance_config = StreamDevConfig.from_yaml(example) - battery_voltage_raw = 200 - input_voltage_raw = 250 + battery_voltage_raw = 100 + input_voltage_raw = 150 instance_header = StreamBufferHeader( linked_list=0, From c713ac7c643ddc14d7fef7b435959e8bf030ab78 Mon Sep 17 00:00:00 2001 From: t-sasatani <33111879+t-sasatani@users.noreply.github.com> Date: Wed, 27 Nov 2024 14:05:47 -0800 Subject: [PATCH 3/5] black formatting --- mio/models/stream.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/mio/models/stream.py b/mio/models/stream.py index 5ffe79b5..33fbfe0a 100644 --- a/mio/models/stream.py +++ b/mio/models/stream.py @@ -38,12 +38,12 @@ class ADCScaling(MiniscopeConfig): battery_max_voltage: float = Field( 10.0, description="Maximum voltage of the battery." - "Scaled battery voltage will be 0 if it is greater than this value", + "Scaled battery voltage will be 0 if it is greater than this value", ) vin_max_voltage: float = Field( 20.0, description="Maximum voltage of the Vin" - "Scaled Vin voltage will be 0 if it is greater than this value", + "Scaled Vin voltage will be 0 if it is greater than this value", ) def scale_battery_voltage(self, voltage_raw: float) -> float: From 21674b78f127985e80398a0ce7ecdaa3e87d9da1 Mon Sep 17 00:00:00 2001 From: t-sasatani Date: Mon, 3 Feb 2025 11:36:46 +0900 Subject: [PATCH 4/5] Add ScaledValue class, make ADC scaling mandatory --- mio/models/stream.py | 114 ++++++++++++++----------- mio/stream_daq.py | 1 - tests/test_models/test_model_stream.py | 51 +++++------ 3 files changed, 92 insertions(+), 74 deletions(-) diff --git a/mio/models/stream.py b/mio/models/stream.py index 33fbfe0a..7d2af165 100644 --- a/mio/models/stream.py +++ b/mio/models/stream.py @@ -5,7 +5,8 @@ from pathlib import Path from typing import Literal, Optional, Union -from pydantic import Field, computed_field, field_validator +import numpy as np +from pydantic import BaseModel, Field, PrivateAttr, field_validator from mio import DEVICE_DIR from mio.models import MiniscopeConfig @@ -14,6 +15,39 @@ from mio.models.sinks import CSVWriterConfig, StreamPlotterConfig +class ScaledValue(BaseModel): + """ + A value that has been scaled from a raw value + + Parameters + ---------- + raw: float + The raw value + scaled: float + The scaled value + """ + + scaling_factor: float = Field( + ..., + description="Scaling factor applied to the raw value", + ) + maximum: Optional[float] = Field( + None, + description="Maximum value for the scaled value", + ) + minimum: Optional[float] = Field( + None, + description="Minimum value for the scaled value", + ) + + def scale(self, value: int) -> float: + """ + The scaled value + """ + scaled_value = float(value) * self.scaling_factor + return np.clip(scaled_value, self.minimum, self.maximum) + + class ADCScaling(MiniscopeConfig): """ Configuration for the ADC scaling factors @@ -46,30 +80,6 @@ class ADCScaling(MiniscopeConfig): "Scaled Vin voltage will be 0 if it is greater than this value", ) - def scale_battery_voltage(self, voltage_raw: float) -> float: - """ - Scale raw input ADC voltage to Volts - - Args: - voltage_raw: Voltage as output by the ADC - - Returns: - float: Scaled voltage - """ - return voltage_raw / 2**self.bitdepth * self.ref_voltage * self.battery_div_factor - - def scale_input_voltage(self, voltage_raw: float) -> float: - """ - Scale raw input ADC voltage to Volts - - Args: - voltage_raw: Voltage as output by the ADC - - Returns: - float: Scaled voltage - """ - return voltage_raw / 2**self.bitdepth * self.ref_voltage * self.vin_div_factor - class StreamBufferHeaderFormat(BufferHeaderFormat): """ @@ -102,40 +112,48 @@ class StreamBufferHeader(BufferHeader): pixel_count: int battery_voltage_raw: int input_voltage_raw: int - _adc_scaling: ADCScaling = None - - @property - def adc_scaling(self) -> Optional[ADCScaling]: - """ - :class:`.ADCScaling` applied to voltage readings - """ - return self._adc_scaling - @adc_scaling.setter - def adc_scaling(self, scaling: ADCScaling) -> None: - self._adc_scaling = scaling + # This shouldn't have a default value and probably better to have this in the device config. + _adc_scale: ADCScaling = PrivateAttr(default_factory=ADCScaling) + + _battery_voltage_scaling: ScaledValue = PrivateAttr() + _input_voltage_scaling: ScaledValue = PrivateAttr() + + def __init__(self, **data: dict): + super().__init__(**data) + self._battery_voltage_scaling = ScaledValue( + scaling_factor=( + 1 + / (2**self._adc_scale.bitdepth) + * self._adc_scale.ref_voltage + * self._adc_scale.battery_div_factor + ), + maximum=self._adc_scale.battery_max_voltage, + ) + + self._input_voltage_scaling = ScaledValue( + scaling_factor=( + 1 + / (2**self._adc_scale.bitdepth) + * self._adc_scale.ref_voltage + * self._adc_scale.vin_div_factor + ), + maximum=self._adc_scale.vin_max_voltage, + ) - @computed_field + @property def battery_voltage(self) -> float: """ Scaled battery voltage in Volts. """ - if self._adc_scaling is None: - return self.battery_voltage_raw - else: - battery_voltage = self._adc_scaling.scale_battery_voltage(self.battery_voltage_raw) - return battery_voltage if battery_voltage < self._adc_scaling.battery_max_voltage else 0 + return self._battery_voltage_scaling.scale(self.battery_voltage_raw) - @computed_field + @property def input_voltage(self) -> float: """ Scaled input voltage in Volts. """ - if self._adc_scaling is None: - return self.input_voltage_raw - else: - vin_voltage = self._adc_scaling.scale_input_voltage(self.input_voltage_raw) - return vin_voltage if vin_voltage < self._adc_scaling.vin_max_voltage else 0 + return self._input_voltage_scaling.scale(self.input_voltage_raw) class StreamDevRuntime(MiniscopeConfig): diff --git a/mio/stream_daq.py b/mio/stream_daq.py index cd46a742..9b0b52cc 100644 --- a/mio/stream_daq.py +++ b/mio/stream_daq.py @@ -171,7 +171,6 @@ def _parse_header(self, buffer: bytes) -> Tuple[StreamBufferHeader, np.ndarray]: header_data = StreamBufferHeader.from_format( header.astype(int), self.header_fmt, construct=True ) - header_data.adc_scaling = self.config.adc_scale return header_data, payload diff --git a/tests/test_models/test_model_stream.py b/tests/test_models/test_model_stream.py index 371264ea..a4120f7f 100644 --- a/tests/test_models/test_model_stream.py +++ b/tests/test_models/test_model_stream.py @@ -42,24 +42,23 @@ def test_absolute_bitstream(): } -@pytest.mark.parametrize("scale", [None, 1, 2, _default_adc_scale["ref_voltage"]]) +@pytest.mark.parametrize("scale", [1, 2, _default_adc_scale["ref_voltage"]]) def test_adc_scaling(scale, config_override): - """ - Test that the ADC scaling factors are correctly parsed, - and that :class:`.ADCScaling` methods are correctly applied to their relevant values - """ - if scale is None: - adc_scale = None - else: - adc_scale = _default_adc_scale.copy() - adc_scale.update({"ref_voltage": scale}) - - example = config_override(CONFIG_DIR / "stream_daq_test_200px.yml", {"adc_scale": adc_scale}) - instance_config = StreamDevConfig.from_yaml(example) + + ref_voltage = scale + bitdepth = 8 + + battery_div_factor = 5.0 + vin_div_factor = 11.3 + battery_max_voltage = 10.0 + vin_max_voltage = 20.0 battery_voltage_raw = 100 input_voltage_raw = 150 + battery_factor = 1 / (2 ** bitdepth) * ref_voltage * battery_div_factor + vin_factor = 1 / (2 ** bitdepth) * ref_voltage * vin_div_factor + instance_header = StreamBufferHeader( linked_list=0, frame_num=0, @@ -72,16 +71,18 @@ def test_adc_scaling(scale, config_override): write_timestamp=0, battery_voltage_raw=battery_voltage_raw, input_voltage_raw=input_voltage_raw, + adc_scale=ADCScaling( + ref_voltage=ref_voltage, + bitdepth=bitdepth, + battery_div_factor=battery_div_factor, + vin_div_factor=vin_div_factor, + battery_max_voltage=battery_max_voltage, + vin_max_voltage=vin_max_voltage, + ), ) - instance_header.adc_scaling = instance_config.adc_scale - - if scale is None: - assert instance_header.battery_voltage == battery_voltage_raw - assert instance_header.input_voltage == input_voltage_raw - - else: - adcscale = ADCScaling(**adc_scale) - assert instance_header.battery_voltage == adcscale.scale_battery_voltage( - battery_voltage_raw - ) - assert instance_header.input_voltage == adcscale.scale_input_voltage(input_voltage_raw) + + expected_battery_voltage = min(battery_voltage_raw * battery_factor, battery_max_voltage) + expected_input_voltage = min(input_voltage_raw * vin_factor, vin_max_voltage) + + assert instance_header.battery_voltage == expected_battery_voltage + assert instance_header.input_voltage == expected_input_voltage \ No newline at end of file From 42c564f8a32b64824ff824470c8884645ae85e6f Mon Sep 17 00:00:00 2001 From: t-sasatani Date: Mon, 3 Feb 2025 12:07:45 +0900 Subject: [PATCH 5/5] Make invalid values -1 --- mio/models/stream.py | 37 +++++++++++++++++++------- tests/test_models/test_model_stream.py | 11 +++++--- 2 files changed, 35 insertions(+), 13 deletions(-) diff --git a/mio/models/stream.py b/mio/models/stream.py index 7d2af165..644fdf07 100644 --- a/mio/models/stream.py +++ b/mio/models/stream.py @@ -5,7 +5,6 @@ from pathlib import Path from typing import Literal, Optional, Union -import numpy as np from pydantic import BaseModel, Field, PrivateAttr, field_validator from mio import DEVICE_DIR @@ -42,10 +41,25 @@ class ScaledValue(BaseModel): def scale(self, value: int) -> float: """ - The scaled value + Scale a raw value to a scaled value. If the scaled value is outside of the + minimum or maximum bounds, return -1.0 + + Parameters + ---------- + value : int + The raw value to scale + + Returns + ------- + float + The scaled value """ scaled_value = float(value) * self.scaling_factor - return np.clip(scaled_value, self.minimum, self.maximum) + if (self.minimum is not None and scaled_value < self.minimum) or ( + self.maximum is not None and scaled_value > self.maximum + ): + return -1.0 + return scaled_value class ADCScaling(MiniscopeConfig): @@ -72,12 +86,12 @@ class ADCScaling(MiniscopeConfig): battery_max_voltage: float = Field( 10.0, description="Maximum voltage of the battery." - "Scaled battery voltage will be 0 if it is greater than this value", + "Scaled battery voltage will be -1 if it is greater than this value", ) vin_max_voltage: float = Field( 20.0, description="Maximum voltage of the Vin" - "Scaled Vin voltage will be 0 if it is greater than this value", + "Scaled Vin voltage will be -1 if it is greater than this value", ) @@ -102,25 +116,29 @@ class StreamBufferHeaderFormat(BufferHeaderFormat): battery_voltage_raw: int input_voltage_raw: int + _adc_scale: ADCScaling + class StreamBufferHeader(BufferHeader): """ Refinements of :class:`.BufferHeader` for :class:`~mio.stream_daq.StreamDaq` + + .. todo:: + Get the scaling factors from the device configuration """ pixel_count: int battery_voltage_raw: int input_voltage_raw: int - # This shouldn't have a default value and probably better to have this in the device config. - _adc_scale: ADCScaling = PrivateAttr(default_factory=ADCScaling) - + _adc_scale: ADCScaling = PrivateAttr() _battery_voltage_scaling: ScaledValue = PrivateAttr() _input_voltage_scaling: ScaledValue = PrivateAttr() - def __init__(self, **data: dict): + def __init__(self, adc_scale: ADCScaling, **data: dict): super().__init__(**data) + self._adc_scale = adc_scale self._battery_voltage_scaling = ScaledValue( scaling_factor=( 1 @@ -130,7 +148,6 @@ def __init__(self, **data: dict): ), maximum=self._adc_scale.battery_max_voltage, ) - self._input_voltage_scaling = ScaledValue( scaling_factor=( 1 diff --git a/tests/test_models/test_model_stream.py b/tests/test_models/test_model_stream.py index a4120f7f..210731be 100644 --- a/tests/test_models/test_model_stream.py +++ b/tests/test_models/test_model_stream.py @@ -43,7 +43,7 @@ def test_absolute_bitstream(): @pytest.mark.parametrize("scale", [1, 2, _default_adc_scale["ref_voltage"]]) -def test_adc_scaling(scale, config_override): +def test_adc_scaling(scale): ref_voltage = scale bitdepth = 8 @@ -81,8 +81,13 @@ def test_adc_scaling(scale, config_override): ), ) - expected_battery_voltage = min(battery_voltage_raw * battery_factor, battery_max_voltage) - expected_input_voltage = min(input_voltage_raw * vin_factor, vin_max_voltage) + expected_battery_voltage = battery_voltage_raw * battery_factor + if expected_battery_voltage > battery_max_voltage: + expected_battery_voltage = -1 + + expected_input_voltage = input_voltage_raw * vin_factor + if expected_input_voltage > vin_max_voltage: + expected_input_voltage = -1 assert instance_header.battery_voltage == expected_battery_voltage assert instance_header.input_voltage == expected_input_voltage \ No newline at end of file