Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions mio/data/config/wireless/wireless-200px.yml
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ adc_scale:
bitdepth: 8
battery_div_factor: 5
vin_div_factor: 11.3
battery_max_voltage: 10.0
vin_max_voltage: 20.0

runtime:
serial_buffer_queue_size: 10
Expand Down
139 changes: 93 additions & 46 deletions mio/models/stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
from pathlib import Path
from typing import Literal, Optional, Union

from pydantic import Field, computed_field, field_validator
from pydantic import BaseModel, Field, PrivateAttr, field_validator

from mio import DEVICE_DIR
from mio.models import MiniscopeConfig
Expand All @@ -14,6 +14,54 @@
from mio.models.sinks import CSVWriterConfig, StreamPlotterConfig


class ScaledValue(BaseModel):
"""
A value that has been scaled from a raw value

Parameters
----------
raw: float
The raw value
scaled: float
The scaled value
"""

scaling_factor: float = Field(
...,
description="Scaling factor applied to the raw value",
)
maximum: Optional[float] = Field(
None,
description="Maximum value for the scaled value",
)
minimum: Optional[float] = Field(
None,
description="Minimum value for the scaled value",
)

def scale(self, value: int) -> float:
"""
Scale a raw value to a scaled value. If the scaled value is outside of the
minimum or maximum bounds, return -1.0

Parameters
----------
value : int
The raw value to scale

Returns
-------
float
The scaled value
"""
scaled_value = float(value) * self.scaling_factor
if (self.minimum is not None and scaled_value < self.minimum) or (
self.maximum is not None and scaled_value > self.maximum
):
return -1.0
return scaled_value


class ADCScaling(MiniscopeConfig):
"""
Configuration for the ADC scaling factors
Expand All @@ -35,30 +83,16 @@ class ADCScaling(MiniscopeConfig):
11.3,
description="Voltage divider factor for the Vin voltage",
)

def scale_battery_voltage(self, voltage_raw: float) -> float:
"""
Scale raw input ADC voltage to Volts

Args:
voltage_raw: Voltage as output by the ADC

Returns:
float: Scaled voltage
"""
return voltage_raw / 2**self.bitdepth * self.ref_voltage * self.battery_div_factor

def scale_input_voltage(self, voltage_raw: float) -> float:
"""
Scale raw input ADC voltage to Volts

Args:
voltage_raw: Voltage as output by the ADC

Returns:
float: Scaled voltage
"""
return voltage_raw / 2**self.bitdepth * self.ref_voltage * self.vin_div_factor
battery_max_voltage: float = Field(
10.0,
description="Maximum voltage of the battery."
"Scaled battery voltage will be -1 if it is greater than this value",
)
vin_max_voltage: float = Field(
20.0,
description="Maximum voltage of the Vin"
"Scaled Vin voltage will be -1 if it is greater than this value",
)


class StreamBufferHeaderFormat(BufferHeaderFormat):
Expand All @@ -82,48 +116,61 @@ class StreamBufferHeaderFormat(BufferHeaderFormat):
battery_voltage_raw: int
input_voltage_raw: int

_adc_scale: ADCScaling


class StreamBufferHeader(BufferHeader):
"""
Refinements of :class:`.BufferHeader` for
:class:`~mio.stream_daq.StreamDaq`

.. todo::
Get the scaling factors from the device configuration
"""

pixel_count: int
battery_voltage_raw: int
input_voltage_raw: int
_adc_scaling: ADCScaling = None

@property
def adc_scaling(self) -> Optional[ADCScaling]:
"""
:class:`.ADCScaling` applied to voltage readings
"""
return self._adc_scaling

@adc_scaling.setter
def adc_scaling(self, scaling: ADCScaling) -> None:
self._adc_scaling = scaling
_adc_scale: ADCScaling = PrivateAttr()
_battery_voltage_scaling: ScaledValue = PrivateAttr()
_input_voltage_scaling: ScaledValue = PrivateAttr()

def __init__(self, adc_scale: ADCScaling, **data: dict):
super().__init__(**data)
self._adc_scale = adc_scale
self._battery_voltage_scaling = ScaledValue(
scaling_factor=(
1
/ (2**self._adc_scale.bitdepth)
* self._adc_scale.ref_voltage
* self._adc_scale.battery_div_factor
),
maximum=self._adc_scale.battery_max_voltage,
)
self._input_voltage_scaling = ScaledValue(
scaling_factor=(
1
/ (2**self._adc_scale.bitdepth)
* self._adc_scale.ref_voltage
* self._adc_scale.vin_div_factor
),
maximum=self._adc_scale.vin_max_voltage,
)
Comment on lines +135 to +159
Copy link
Copy Markdown
Collaborator Author

@t-sasatani t-sasatani Feb 3, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[Notes for whenever I can come back] Not done yet. Doing this for every header input feels like a waste and also makes the header-filling process inconsistent. So, it is probably better to define a helper class that is set once with the device config and converts StreamBufferHeader into scaled values.


@computed_field
@property
def battery_voltage(self) -> float:
"""
Scaled battery voltage in Volts.
"""
if self._adc_scaling is None:
return self.battery_voltage_raw
else:
return self._adc_scaling.scale_battery_voltage(self.battery_voltage_raw)
return self._battery_voltage_scaling.scale(self.battery_voltage_raw)

@computed_field
@property
def input_voltage(self) -> float:
"""
Scaled input voltage in Volts.
"""
if self._adc_scaling is None:
return self.input_voltage_raw
else:
return self._adc_scaling.scale_input_voltage(self.input_voltage_raw)
return self._input_voltage_scaling.scale(self.input_voltage_raw)


class StreamDevRuntime(MiniscopeConfig):
Expand Down
1 change: 0 additions & 1 deletion mio/stream_daq.py
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,6 @@ def _parse_header(self, buffer: bytes) -> Tuple[StreamBufferHeader, np.ndarray]:
header_data = StreamBufferHeader.from_format(
header.astype(int), self.header_fmt, construct=True
)
header_data.adc_scaling = self.config.adc_scale

return header_data, payload

Expand Down
64 changes: 35 additions & 29 deletions tests/test_models/test_model_stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,23 +42,22 @@ def test_absolute_bitstream():
}


@pytest.mark.parametrize("scale", [None, 1, 2, _default_adc_scale["ref_voltage"]])
def test_adc_scaling(scale, config_override):
"""
Test that the ADC scaling factors are correctly parsed,
and that :class:`.ADCScaling` methods are correctly applied to their relevant values
"""
if scale is None:
adc_scale = None
else:
adc_scale = _default_adc_scale.copy()
adc_scale.update({"ref_voltage": scale})

example = config_override(CONFIG_DIR / "stream_daq_test_200px.yml", {"adc_scale": adc_scale})
instance_config = StreamDevConfig.from_yaml(example)

battery_voltage_raw = 200
input_voltage_raw = 250
@pytest.mark.parametrize("scale", [1, 2, _default_adc_scale["ref_voltage"]])
def test_adc_scaling(scale):

ref_voltage = scale
bitdepth = 8

battery_div_factor = 5.0
vin_div_factor = 11.3
battery_max_voltage = 10.0
vin_max_voltage = 20.0

battery_voltage_raw = 100
input_voltage_raw = 150

battery_factor = 1 / (2 ** bitdepth) * ref_voltage * battery_div_factor
vin_factor = 1 / (2 ** bitdepth) * ref_voltage * vin_div_factor

instance_header = StreamBufferHeader(
linked_list=0,
Expand All @@ -72,16 +71,23 @@ def test_adc_scaling(scale, config_override):
write_timestamp=0,
battery_voltage_raw=battery_voltage_raw,
input_voltage_raw=input_voltage_raw,
adc_scale=ADCScaling(
ref_voltage=ref_voltage,
bitdepth=bitdepth,
battery_div_factor=battery_div_factor,
vin_div_factor=vin_div_factor,
battery_max_voltage=battery_max_voltage,
vin_max_voltage=vin_max_voltage,
),
)
instance_header.adc_scaling = instance_config.adc_scale

if scale is None:
assert instance_header.battery_voltage == battery_voltage_raw
assert instance_header.input_voltage == input_voltage_raw

else:
adcscale = ADCScaling(**adc_scale)
assert instance_header.battery_voltage == adcscale.scale_battery_voltage(
battery_voltage_raw
)
assert instance_header.input_voltage == adcscale.scale_input_voltage(input_voltage_raw)

expected_battery_voltage = battery_voltage_raw * battery_factor
if expected_battery_voltage > battery_max_voltage:
expected_battery_voltage = -1

expected_input_voltage = input_voltage_raw * vin_factor
if expected_input_voltage > vin_max_voltage:
expected_input_voltage = -1

assert instance_header.battery_voltage == expected_battery_voltage
assert instance_header.input_voltage == expected_input_voltage