diff --git a/mio/data/config/wireless/wireless-200px.yml b/mio/data/config/wireless/wireless-200px.yml index 75aa4ad3..fd3d31dc 100644 --- a/mio/data/config/wireless/wireless-200px.yml +++ b/mio/data/config/wireless/wireless-200px.yml @@ -38,6 +38,8 @@ adc_scale: bitdepth: 8 battery_div_factor: 5 vin_div_factor: 11.3 + battery_max_voltage: 10.0 + vin_max_voltage: 20.0 runtime: serial_buffer_queue_size: 10 diff --git a/mio/models/stream.py b/mio/models/stream.py index 61664c46..644fdf07 100644 --- a/mio/models/stream.py +++ b/mio/models/stream.py @@ -5,7 +5,7 @@ from pathlib import Path from typing import Literal, Optional, Union -from pydantic import Field, computed_field, field_validator +from pydantic import BaseModel, Field, PrivateAttr, field_validator from mio import DEVICE_DIR from mio.models import MiniscopeConfig @@ -14,6 +14,54 @@ from mio.models.sinks import CSVWriterConfig, StreamPlotterConfig +class ScaledValue(BaseModel): + """ + A value that has been scaled from a raw value + + Parameters + ---------- + raw: float + The raw value + scaled: float + The scaled value + """ + + scaling_factor: float = Field( + ..., + description="Scaling factor applied to the raw value", + ) + maximum: Optional[float] = Field( + None, + description="Maximum value for the scaled value", + ) + minimum: Optional[float] = Field( + None, + description="Minimum value for the scaled value", + ) + + def scale(self, value: int) -> float: + """ + Scale a raw value to a scaled value. If the scaled value is outside of the + minimum or maximum bounds, return -1.0 + + Parameters + ---------- + value : int + The raw value to scale + + Returns + ------- + float + The scaled value + """ + scaled_value = float(value) * self.scaling_factor + if (self.minimum is not None and scaled_value < self.minimum) or ( + self.maximum is not None and scaled_value > self.maximum + ): + return -1.0 + return scaled_value + + class ADCScaling(MiniscopeConfig): """ Configuration for the ADC scaling factors @@ -35,30 +83,16 @@ class ADCScaling(MiniscopeConfig): 11.3, description="Voltage divider factor for the Vin voltage", ) - - def scale_battery_voltage(self, voltage_raw: float) -> float: - """ - Scale raw input ADC voltage to Volts - - Args: - voltage_raw: Voltage as output by the ADC - - Returns: - float: Scaled voltage - """ - return voltage_raw / 2**self.bitdepth * self.ref_voltage * self.battery_div_factor - - def scale_input_voltage(self, voltage_raw: float) -> float: - """ - Scale raw input ADC voltage to Volts - - Args: - voltage_raw: Voltage as output by the ADC - - Returns: - float: Scaled voltage - """ - return voltage_raw / 2**self.bitdepth * self.ref_voltage * self.vin_div_factor + battery_max_voltage: float = Field( + 10.0, + description="Maximum voltage of the battery." + "Scaled battery voltage will be -1 if it is greater than this value", + ) + vin_max_voltage: float = Field( + 20.0, + description="Maximum voltage of the Vin" + "Scaled Vin voltage will be -1 if it is greater than this value", + ) class StreamBufferHeaderFormat(BufferHeaderFormat): @@ -82,48 +116,61 @@ class StreamBufferHeaderFormat(BufferHeaderFormat): battery_voltage_raw: int input_voltage_raw: int + _adc_scale: ADCScaling + class StreamBufferHeader(BufferHeader): """ Refinements of :class:`.BufferHeader` for :class:`~mio.stream_daq.StreamDaq` + + .. todo:: + Get the scaling factors from the device configuration """ pixel_count: int battery_voltage_raw: int input_voltage_raw: int - _adc_scaling: ADCScaling = None - - @property - def adc_scaling(self) -> Optional[ADCScaling]: - """ - :class:`.ADCScaling` applied to voltage readings - """ - return self._adc_scaling - @adc_scaling.setter - def adc_scaling(self, scaling: ADCScaling) -> None: - self._adc_scaling = scaling + _adc_scale: ADCScaling = PrivateAttr() + _battery_voltage_scaling: ScaledValue = PrivateAttr() + _input_voltage_scaling: ScaledValue = PrivateAttr() + + def __init__(self, adc_scale: ADCScaling, **data: dict): + super().__init__(**data) + self._adc_scale = adc_scale + self._battery_voltage_scaling = ScaledValue( + scaling_factor=( + 1 + / (2**self._adc_scale.bitdepth) + * self._adc_scale.ref_voltage + * self._adc_scale.battery_div_factor + ), + maximum=self._adc_scale.battery_max_voltage, + ) + self._input_voltage_scaling = ScaledValue( + scaling_factor=( + 1 + / (2**self._adc_scale.bitdepth) + * self._adc_scale.ref_voltage + * self._adc_scale.vin_div_factor + ), + maximum=self._adc_scale.vin_max_voltage, + ) - @computed_field + @property def battery_voltage(self) -> float: """ Scaled battery voltage in Volts. """ - if self._adc_scaling is None: - return self.battery_voltage_raw - else: - return self._adc_scaling.scale_battery_voltage(self.battery_voltage_raw) + return self._battery_voltage_scaling.scale(self.battery_voltage_raw) - @computed_field + @property def input_voltage(self) -> float: """ Scaled input voltage in Volts. """ - if self._adc_scaling is None: - return self.input_voltage_raw - else: - return self._adc_scaling.scale_input_voltage(self.input_voltage_raw) + return self._input_voltage_scaling.scale(self.input_voltage_raw) class StreamDevRuntime(MiniscopeConfig): diff --git a/mio/stream_daq.py b/mio/stream_daq.py index cd46a742..9b0b52cc 100644 --- a/mio/stream_daq.py +++ b/mio/stream_daq.py @@ -171,7 +171,6 @@ def _parse_header(self, buffer: bytes) -> Tuple[StreamBufferHeader, np.ndarray]: header_data = StreamBufferHeader.from_format( header.astype(int), self.header_fmt, construct=True ) - header_data.adc_scaling = self.config.adc_scale return header_data, payload diff --git a/tests/test_models/test_model_stream.py b/tests/test_models/test_model_stream.py index a62c3e42..210731be 100644 --- a/tests/test_models/test_model_stream.py +++ b/tests/test_models/test_model_stream.py @@ -42,23 +42,22 @@ def test_absolute_bitstream(): } -@pytest.mark.parametrize("scale", [None, 1, 2, _default_adc_scale["ref_voltage"]]) -def test_adc_scaling(scale, config_override): - """ - Test that the ADC scaling factors are correctly parsed, - and that :class:`.ADCScaling` methods are correctly applied to their relevant values - """ - if scale is None: - adc_scale = None - else: - adc_scale = _default_adc_scale.copy() - adc_scale.update({"ref_voltage": scale}) - - example = config_override(CONFIG_DIR / "stream_daq_test_200px.yml", {"adc_scale": adc_scale}) - instance_config = StreamDevConfig.from_yaml(example) - - battery_voltage_raw = 200 - input_voltage_raw = 250 +@pytest.mark.parametrize("scale", [1, 2, _default_adc_scale["ref_voltage"]]) +def test_adc_scaling(scale): + + ref_voltage = scale + bitdepth = 8 + + battery_div_factor = 5.0 + vin_div_factor = 11.3 + battery_max_voltage = 10.0 + vin_max_voltage = 20.0 + + battery_voltage_raw = 100 + input_voltage_raw = 150 + + battery_factor = 1 / (2 ** bitdepth) * ref_voltage * battery_div_factor + vin_factor = 1 / (2 ** bitdepth) * ref_voltage * vin_div_factor instance_header = StreamBufferHeader( linked_list=0, @@ -72,16 +71,23 @@ def test_adc_scaling(scale, config_override): write_timestamp=0, battery_voltage_raw=battery_voltage_raw, input_voltage_raw=input_voltage_raw, + adc_scale=ADCScaling( + ref_voltage=ref_voltage, + bitdepth=bitdepth, + battery_div_factor=battery_div_factor, + vin_div_factor=vin_div_factor, + battery_max_voltage=battery_max_voltage, + vin_max_voltage=vin_max_voltage, + ), ) - instance_header.adc_scaling = instance_config.adc_scale - - if scale is None: - assert instance_header.battery_voltage == battery_voltage_raw - assert instance_header.input_voltage == input_voltage_raw - - else: - adcscale = ADCScaling(**adc_scale) - assert instance_header.battery_voltage == adcscale.scale_battery_voltage( - battery_voltage_raw - ) - assert instance_header.input_voltage == adcscale.scale_input_voltage(input_voltage_raw) + + expected_battery_voltage = battery_voltage_raw * battery_factor + if expected_battery_voltage > battery_max_voltage: + expected_battery_voltage = -1 + + expected_input_voltage = input_voltage_raw * vin_factor + if expected_input_voltage > vin_max_voltage: + expected_input_voltage = -1 + + assert instance_header.battery_voltage == expected_battery_voltage + assert instance_header.input_voltage == expected_input_voltage \ No newline at end of file