From c90f22d870ca0a99ec043c6a870a3bb1fa5a81f9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Kristian=20Zar=C4=99bski?= Date: Wed, 4 Feb 2026 13:03:47 +0000 Subject: [PATCH 1/3] =?UTF-8?q?=E2=9C=A8=20Added=20threshold=20definition?= =?UTF-8?q?=20capability=20to=20dispatcher?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- simvue/{factory => }/dispatch/__init__.py | 16 ++-- simvue/{factory => }/dispatch/base.py | 26 ++++++- simvue/{factory => }/dispatch/direct.py | 20 ++++- simvue/{factory => }/dispatch/queued.py | 46 ++++++++--- simvue/exception.py | 11 ++- simvue/factory/__init__.py | 6 -- simvue/run.py | 63 +++++++++++---- tests/functional/test_dispatch.py | 95 ++++++++++++++++------- 8 files changed, 213 insertions(+), 70 deletions(-) rename simvue/{factory => }/dispatch/__init__.py (83%) rename simvue/{factory => }/dispatch/base.py (54%) rename simvue/{factory => }/dispatch/direct.py (74%) rename simvue/{factory => }/dispatch/queued.py (73%) delete mode 100644 simvue/factory/__init__.py diff --git a/simvue/factory/dispatch/__init__.py b/simvue/dispatch/__init__.py similarity index 83% rename from simvue/factory/dispatch/__init__.py rename to simvue/dispatch/__init__.py index 490b34c5..a10e2089 100644 --- a/simvue/factory/dispatch/__init__.py +++ b/simvue/dispatch/__init__.py @@ -1,6 +1,4 @@ -""" -Dispatch -======== +"""Dispatch Contains factory method for selecting dispatcher type based on Simvue Configuration """ @@ -20,11 +18,11 @@ def Dispatcher( mode: typing.Literal["direct", "queued"], - callback: typing.Callable[[list[typing.Any], str, dict[str, typing.Any]], None], + callback: typing.Callable[[list[typing.Any], str], None], object_types: list[str], termination_trigger: "Event", name: str | None = None, - **kwargs, + thresholds: dict[str, int | float] | None = None, ) -> "DispatcherBaseClass": """Returns instance of dispatcher based on configuration @@ -46,6 +44,10 @@ def Dispatcher( event which triggers termination of the dispatcher name : str | None, optional name for the underlying thread, default None + thresholds: dict[str, int | float] | None, default None + if metadata is provided during item addition, specify + thresholds under which dispatch of an item is permitted, + default is None Returns ------- @@ -58,7 +60,7 @@ def Dispatcher( callback=callback, object_types=object_types, termination_trigger=termination_trigger, - **kwargs, + thresholds=thresholds, ) else: logger.debug("Using queued dispatch for metric and queue sending") @@ -67,5 +69,5 @@ def Dispatcher( object_types=object_types, termination_trigger=termination_trigger, name=name, - **kwargs, + thresholds=thresholds, ) diff --git a/simvue/factory/dispatch/base.py b/simvue/dispatch/base.py similarity index 54% rename from simvue/factory/dispatch/base.py rename to simvue/dispatch/base.py index 0ef85244..2413cd92 100644 --- a/simvue/factory/dispatch/base.py +++ b/simvue/dispatch/base.py @@ -2,23 +2,41 @@ import abc import typing +from simvue.exception import ObjectDispatchError + class DispatcherBaseClass(abc.ABC): def __init__( self, + *, callback: typing.Callable[[list[typing.Any], str], None], object_types: list[str], termination_trigger: threading.Event, - **_, + thresholds: dict[str, int | float] | None = None, ) -> None: super().__init__() + self._thresholds: dict[str, int | float] = thresholds or {} self._object_types: list[str] = object_types self._termination_trigger = termination_trigger self._callback = callback - @abc.abstractmethod - def add_item(self, item: typing.Any, object_type: str, *_, **__) -> None: - pass + def add_item( + self, + item: typing.Any, + object_type: str, + metadata: dict[str, int | float] | None = None, + *_, + **__, + ) -> None: + _ = item + _ = object_type + if not metadata: + return + for key, threshold in self._thresholds.items(): + if key in metadata and metadata[key] > threshold: + raise ObjectDispatchError( + label=key, threshold=threshold, value=metadata[key] + ) @abc.abstractmethod def run(self) -> None: diff --git a/simvue/factory/dispatch/direct.py b/simvue/dispatch/direct.py similarity index 74% rename from simvue/factory/dispatch/direct.py rename to simvue/dispatch/direct.py index a4d1540e..09bd4214 100644 --- a/simvue/factory/dispatch/direct.py +++ b/simvue/dispatch/direct.py @@ -9,10 +9,11 @@ class DirectDispatcher(DispatcherBaseClass): def __init__( self, + *, callback: typing.Callable[[list[typing.Any], str], None], object_types: list[str], termination_trigger: threading.Event, - **_, + thresholds: dict[str, int | float] | None = None, ) -> None: """Initialise a new DirectDispatcher instance @@ -24,16 +25,31 @@ def __init__( categories, this is mainly used for creation of queues in a QueueDispatcher termination_trigger : Event event which triggers termination of the dispatcher + thresholds: int | float + if metadata is provided during item addition, specify + thresholds under which dispatch of an item is permitted, + default is None """ super().__init__( callback=callback, object_types=object_types, termination_trigger=termination_trigger, + thresholds=thresholds, ) - def add_item(self, item: typing.Any, object_type: str, *_, **__) -> None: + def add_item( + self, + item: typing.Any, + *, + object_type: str, + metadata: dict[str, int | float] | None = None, + **__, + ) -> bool: """Execute callback on the given item""" + if not super().add_item(item, object_type, metadata): + return False self._callback([item], object_type) + return True def run(self) -> None: """Run does not execute anything in this context""" diff --git a/simvue/factory/dispatch/queued.py b/simvue/dispatch/queued.py similarity index 73% rename from simvue/factory/dispatch/queued.py rename to simvue/dispatch/queued.py index ba042d6d..5387f69f 100644 --- a/simvue/factory/dispatch/queued.py +++ b/simvue/dispatch/queued.py @@ -33,12 +33,14 @@ class QueuedDispatcher(threading.Thread, DispatcherBaseClass): def __init__( self, + *, callback: typing.Callable[[list[typing.Any], str], None], object_types: list[str], termination_trigger: threading.Event, name: str | None = None, max_buffer_size: int = MAX_BUFFER_SIZE, max_read_rate: float = MAX_REQUESTS_PER_SECOND, + thresholds: dict[str, int | float] | None = None, ) -> None: """ Initialise a new queue based dispatcher @@ -58,34 +60,47 @@ def __init__( maximum number of items allowed in created buffer. max_read_rate : float maximum rate at which the callback can be executed + thresholds: dict[str, int | float] | None, optional + if metadata is provided during item addition, specify + thresholds within which a single dispatch is permitted, + default is None """ DispatcherBaseClass.__init__( self, callback=callback, object_types=object_types, termination_trigger=termination_trigger, + thresholds=thresholds, ) super().__init__(name=name, daemon=True) - self._termination_trigger = termination_trigger - self._callback = callback - self._queues = {label: queue.Queue() for label in object_types} - self._max_read_rate = max_read_rate - self._max_buffer_size = max_buffer_size - self._send_timer = 0 + self._termination_trigger: threading.Event = termination_trigger + self._callback: typing.Callable[[list[typing.Any], str], None] = callback + self._queues: dict[str, queue.Queue[typing.Any]] = { + label: queue.Queue() for label in object_types + } + self._max_read_rate: float = max_read_rate + self._max_buffer_size: int = max_buffer_size + self._send_timer: int = 0 def add_item( - self, item: typing.Any, object_type: str, blocking: bool = True + self, + item: typing.Any, + *, + object_type: str, + blocking: bool = True, + metadata: dict[str, int | float] | None = None, ) -> None: """Add an item to the specified queue with/without blocking""" + super().add_item(item, object_type, metadata) if self._termination_trigger.is_set(): raise RuntimeError( f"Cannot append item '{item}' to queue '{object_type}', " - "termination called." + + "termination called." ) if object_type not in self._queues: raise KeyError(f"No queue '{object_type}' found") - self._queues[object_type].put(item, block=blocking) + self._queues[object_type].put((item, metadata or {}), block=blocking) @property def empty(self) -> bool: @@ -111,12 +126,23 @@ def _create_buffer(self, queue_label: str) -> list[typing.Any]: The length of the buffer is constrained. """ _buffer: list[typing.Any] = [] + _criteria: dict[str, int | float] = {} + _threshold_totals: dict[str, float] = {k: 0 for k in self._thresholds} while ( not self._queues[queue_label].empty() and len(_buffer) < self._max_buffer_size + and all( + _threshold_totals[key] < self._thresholds[key] + for key in _threshold_totals + ) ): - _item = self._queues[queue_label].get(block=False) + _item, _metadata = typing.cast( + "tuple[typing.Any, dict[str, int | float]]", + self._queues[queue_label].get(block=False), + ) + for key in _threshold_totals: + _threshold_totals[key] += _metadata.get(key, 0) _buffer.append(_item) self._queues[queue_label].task_done() diff --git a/simvue/exception.py b/simvue/exception.py index 3dc5e65e..09360a75 100644 --- a/simvue/exception.py +++ b/simvue/exception.py @@ -21,4 +21,13 @@ def __init__(self, obj_type: str, name: str, extra: str | None = None) -> None: class SimvueRunError(RuntimeError): """A special sub-class of runtime error specifically for Simvue run errors""" - pass + +class ObjectDispatchError(Exception): + """Raised if object dispatch failed due to condition.""" + + def __init__(self, label: str, threshold: int | float, value: int | float) -> None: + self.msg = ( + f"Object dispatch failed, {label} " + + f"of {value} exceeds maximum permitted value of {threshold}" + ) + super().__init__(self.msg) diff --git a/simvue/factory/__init__.py b/simvue/factory/__init__.py deleted file mode 100644 index 5b2a241a..00000000 --- a/simvue/factory/__init__.py +++ /dev/null @@ -1,6 +0,0 @@ -""" -Factory -======= - -Contains functions which select class types based on Simvue configuration -""" diff --git a/simvue/run.py b/simvue/run.py index 7e3e96fb..968b13a4 100644 --- a/simvue/run.py +++ b/simvue/run.py @@ -33,13 +33,13 @@ from simvue.api.objects.alert.fetch import Alert from simvue.api.objects.folder import Folder from simvue.api.objects.grids import GridMetrics -from simvue.exception import ObjectNotFoundError, SimvueRunError +from simvue.exception import ObjectNotFoundError, SimvueRunError, ObjectDispatchError from simvue.utilities import prettify_pydantic from .config.user import SimvueConfiguration -from .factory.dispatch import Dispatcher +from .dispatch import Dispatcher from .executor import Executor, get_current_shell from .metrics import SystemResourceMeasurement from .models import ( @@ -75,10 +75,11 @@ if typing.TYPE_CHECKING: - from .factory.dispatch import DispatcherBaseClass + from .dispatch import DispatcherBaseClass HEARTBEAT_INTERVAL: int = 60 RESOURCES_METRIC_PREFIX: str = "resources" +MAXIMUM_GRID_METRIC_SIZE: int = 10**6 logger = logging.getLogger(__name__) @@ -175,6 +176,7 @@ def __init__( self._grids: dict[str, str] = {} self._suppress_errors: bool = False self._queue_blocking: bool = False + self._failed_metric_counter: int = 0 self._status: ( typing.Literal[ "created", "running", "completed", "failed", "terminated", "lost" @@ -545,6 +547,7 @@ def _start(self) -> bool: mode=self._dispatch_mode, termination_trigger=self._shutdown_event, object_types=["events", "metrics_regular", "metrics_tensor"], + thresholds=dict(object_size=MAXIMUM_GRID_METRIC_SIZE), callback=self._create_dispatch_callback(), ) @@ -1289,7 +1292,9 @@ def log_event( return False _data = {"message": message, "timestamp": timestamp} - self._dispatcher.add_item(_data, "events", self._queue_blocking) + self._dispatcher.add_item( + _data, object_type="events", blocking=self._queue_blocking + ) return True @@ -1333,13 +1338,23 @@ def _add_metrics_to_dispatch( "timestamp": simvue_timestamp(timestamp), "step": step if step is not None else self._step, } - self._dispatcher.add_item(_data, "metrics_regular", self._queue_blocking) + + try: + self._dispatcher.add_item( + _data, + object_type="metrics_regular", + blocking=self._queue_blocking, + metadata=dict(object_size=len(metrics)), + ) + except ObjectDispatchError as e: + logger.warning(f"Failed to log metric {id(_data)}: {e.msg}") + self._failed_metric_counter += 1 return True def _add_tensors_to_dispatch( self, - tensors: dict[str, int | float], + tensors: dict[str, numpy.ndarray], *, step: int | None = None, time: float | None = None, @@ -1381,7 +1396,16 @@ def _add_tensors_to_dispatch( "metric": tensor, } - self._dispatcher.add_item(_data, "metrics_tensor", self._queue_blocking) + try: + self._dispatcher.add_item( + _data, + object_type="metrics_tensor", + blocking=self._queue_blocking, + metadata=dict(object_size=array.size), + ) + except ObjectDispatchError as e: + logger.warning(f"Failed to grid metric {id(_data)}: {e.msg}") + self._failed_metric_counter += 1 return True @@ -1501,17 +1525,23 @@ def log_metrics( # TODO: When metrics and grids are combined into a single entity # this can be removed. For now need to separate tensor based metrics # from regular - _tensor_metrics: list[numpy.ndarray] = {} - _regular_metrics: list[numpy.ndarray] = {} + _tensor_metrics: dict[str, numpy.ndarray] = {} + _regular_metrics: dict[str, int | float] = {} # Classify metrics into regular and tensor based for label, metric in metrics.items(): if isinstance(metric, numpy.ndarray): + if metric.size > MAXIMUM_GRID_METRIC_SIZE: + logger.warning( + f"Cannot log grid metric {label}, " + + f"size {metric.size} exceeds limit of {MAXIMUM_GRID_METRIC_SIZE}" + ) + continue if label not in self._grids: logger.warning( f"Metric '{label}' is not assigned to a grid, " - "using default axis range [0, 1] for all axes " - "and assuming constant interval." + + "using default axis range [0, 1] for all axes " + + "and assuming constant interval." ) _axes_ticks = [numpy.linspace(0, 1, n) for n in metric.shape] self.assign_metric_to_grid( @@ -1523,7 +1553,7 @@ def log_metrics( if metric.ndim != (_ndims := self._grids[label]["dimensionality"]): self._error( f"Cannot log tensor '{label}', " - f"dimensionality incompatibility: {metric.ndim} != {_ndims}" + + f"dimensionality incompatibility: {metric.ndim} != {_ndims}" ) _tensor_metrics[label] = metric else: @@ -1864,11 +1894,18 @@ def _tidy_run(self) -> None: _error_msg = f":\n{_error_msg}" click.secho( "[simvue] Process executor terminated with non-zero exit status " - f"{_non_zero}{_error_msg}", + + f"{_non_zero}{_error_msg}", fg="red" if self._term_color else None, bold=self._term_color, ) sys.exit(_non_zero) + if self._failed_metric_counter: + click.secho( + "[simvue] Run completed with {self._failed_metric_counter} failed metrics.", + fg="yellow" if self._term_color else None, + bold=self._term_color, + ) + sys.exit(1) @skip_if_failed("_aborted", "_suppress_errors", False) def close(self) -> bool: diff --git a/tests/functional/test_dispatch.py b/tests/functional/test_dispatch.py index 3c007cf6..9dee00f8 100644 --- a/tests/functional/test_dispatch.py +++ b/tests/functional/test_dispatch.py @@ -7,26 +7,32 @@ from concurrent.futures import ThreadPoolExecutor -from simvue.factory.dispatch.queued import QueuedDispatcher +from simvue.dispatch.queued import QueuedDispatcher -from simvue.factory.dispatch.direct import DirectDispatcher +from simvue.dispatch.direct import DirectDispatcher +from simvue.exception import ObjectDispatchError # FIXME: Update the layout of these tests @pytest.mark.dispatch -@pytest.mark.parametrize("overload_buffer", (True, False), ids=("overload", "normal")) +@pytest.mark.parametrize("scenario", ("overload_buffer", "normal", "size_threshold_single", "size_threshold_total")) @pytest.mark.parametrize( "append_during_dispatch", (True, False), ids=("pre_append", "append") ) @pytest.mark.parametrize("multiple", (True, False), ids=("multiple", "single")) def test_queued_dispatcher( - overload_buffer: bool, multiple: bool, append_during_dispatch: bool + scenario: typing.Literal["overload_buffer", "normal", "size_threshold_single", "size_threshold_total"], multiple: bool, append_during_dispatch: bool ) -> None: buffer_size: int = 10 - n_elements: int = 2 * buffer_size if overload_buffer else buffer_size - 1 + if scenario == "overload_buffer": + n_elements = 2 * buffer_size + elif scenario in ("size_threshold_total", "size_threshold_single"): + n_elements = 1 + else: + n_elements = buffer_size - 1 max_read_rate: float = 0.2 - time_threshold: float = 1 + (1 / max_read_rate) if overload_buffer else 1 + time_threshold: float = 1 + (1 / max_read_rate) if scenario == "overload_buffer" else 1 start_time = time.time() @@ -40,6 +46,8 @@ def test_queued_dispatcher( event = Event() dispatchers: list[QueuedDispatcher] = [] + thresholds = {"max_size" : 10} if scenario in ("size_threshold_single", "size_threshold_total") else None + for variable in variables: check_dict[variable] = {"counter": 0} @@ -50,21 +58,33 @@ def callback( dispatchers.append( QueuedDispatcher( - callback, - [variable], - event, + callback=callback, + object_types=[variable], + termination_trigger=event, max_buffer_size=buffer_size, max_read_rate=max_read_rate, - name=f"Queued_Dispatcher_{variable}" + name=f"Queued_Dispatcher_{variable}", + thresholds=thresholds ) ) if not append_during_dispatch: for i in range(n_elements): for variable, dispatcher in zip(variables, dispatchers): - dispatcher.add_item( - {string.ascii_uppercase[i % 26]: i}, variable, False - ) + sizes = [10] + if scenario == "size_threshold_total": + sizes = [1, 8, 7, 2] + if scenario == "size_threshold_single": + with pytest.raises(ObjectDispatchError): + dispatcher.add_item( + {string.ascii_uppercase[i % 26]: i}, object_type=variable, blocking=False, metadata={"max_size": 12} + ) + return + + for size in sizes: + dispatcher.add_item( + {string.ascii_uppercase[i % 26]: i}, object_type=variable, blocking=False, metadata={"max_size": size} + ) for dispatcher in dispatchers: dispatcher.start() @@ -72,22 +92,43 @@ def callback( if append_during_dispatch: for i in range(n_elements): for variable, dispatcher in zip(variables, dispatchers): - dispatcher.add_item( - {string.ascii_uppercase[i % 26]: i}, variable, False - ) + sizes = [10] + if scenario == "size_threshold_total": + sizes = [1, 8, 7, 2] + if scenario == "size_threshold_single": + with pytest.raises(ObjectDispatchError): + dispatcher.add_item( + {string.ascii_uppercase[i % 26]: i}, object_type=variable, blocking=False, metadata={"max_size": 12} + ) + return - while not dispatcher.empty: + for size in sizes: + dispatcher.add_item( + {string.ascii_uppercase[i % 26]: i}, object_type=variable, blocking=False, metadata={"max_size": size} + ) + + counter = 0 + + while not dispatcher.empty and counter < 100: + counter += 1 time.sleep(0.1) + if counter >= 100: + raise AssertionError("Failed to empty dispatch queue") + event.set() dispatcher.join() time.sleep(0.1) for variable in variables: - assert check_dict[variable]["counter"] >= (2 if overload_buffer else 1), ( + assert check_dict[variable]["counter"] >= (2 if scenario in ("overload_buffer", "size_threshold_total") else 1), ( f"Check of counter for dispatcher '{variable}' failed with count = {check_dict[variable]['counter']}" ) + + if scenario in ("size_threshold_single", "size_threshold_total"): + return + assert time.time() - start_time < time_threshold @@ -121,9 +162,9 @@ def _main( ) -> bool: term_event = Event() dispatcher = QueuedDispatcher( - dispatch_callback(index), - [variable] if isinstance(variable, str) else variable, - term_event, + callback=dispatch_callback(index), + object_types=[variable] if isinstance(variable, str) else variable, + termination_trigger=term_event, max_buffer_size=buffer_size, max_read_rate=max_read_rate, name=f"test_nested_queued_dispatch" @@ -135,12 +176,12 @@ def _main( for i in range(n_elements): if isinstance(variable, str): dispatcher.add_item( - {string.ascii_uppercase[i % 26]: i}, variable, False + {string.ascii_uppercase[i % 26]: i}, object_type=variable, blocking=False ) else: for var in variable: dispatcher.add_item( - {string.ascii_uppercase[i % 26]: i}, var, False + {string.ascii_uppercase[i % 26]: i}, object_type=var, blocking=False ) except RuntimeError: res_queue.put("AARGHGHGHGHAHSHGHSDHFSEDHSE") @@ -199,7 +240,7 @@ def test_queued_dispatch_error_adding_item_after_termination() -> None: trigger.set() with pytest.raises(RuntimeError): - dispatcher.add_item("blah", "q", False) + dispatcher.add_item("blah", object_type="q", blocking=False) def test_queued_dispatch_error_attempting_to_use_non_existent_queue() -> None: @@ -215,7 +256,7 @@ def test_queued_dispatch_error_attempting_to_use_non_existent_queue() -> None: dispatcher.start() with pytest.raises(KeyError): - dispatcher.add_item("blah", "z", False) + dispatcher.add_item("blah", object_type="z", blocking=False) trigger.set() @@ -246,11 +287,11 @@ def callback( ) -> None: args[var]["counter"] += 1 - dispatchers.append(DirectDispatcher(callback, [variable], event)) + dispatchers.append(DirectDispatcher(callback=callback, object_types=[variable], termination_trigger=event)) for i in range(n_elements): for variable, dispatcher in zip(variables, dispatchers): - dispatcher.add_item({string.ascii_uppercase[i % 26]: i}, variable) + dispatcher.add_item({string.ascii_uppercase[i % 26]: i}, object_type=variable) event.set() From b065616f13c5803b7471dab19e244fd7cc1c731d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Kristian=20Zar=C4=99bski?= Date: Wed, 4 Feb 2026 13:12:03 +0000 Subject: [PATCH 2/3] Added additional tests and fixed inconsistency across dispatchers --- simvue/dispatch/direct.py | 6 ++---- tests/functional/test_dispatch.py | 14 ++++++++++---- 2 files changed, 12 insertions(+), 8 deletions(-) diff --git a/simvue/dispatch/direct.py b/simvue/dispatch/direct.py index 09bd4214..2226a780 100644 --- a/simvue/dispatch/direct.py +++ b/simvue/dispatch/direct.py @@ -44,12 +44,10 @@ def add_item( object_type: str, metadata: dict[str, int | float] | None = None, **__, - ) -> bool: + ) -> None: """Execute callback on the given item""" - if not super().add_item(item, object_type, metadata): - return False + super().add_item(item, object_type, metadata) self._callback([item], object_type) - return True def run(self) -> None: """Run does not execute anything in this context""" diff --git a/tests/functional/test_dispatch.py b/tests/functional/test_dispatch.py index 9dee00f8..20cde450 100644 --- a/tests/functional/test_dispatch.py +++ b/tests/functional/test_dispatch.py @@ -262,8 +262,8 @@ def test_queued_dispatch_error_attempting_to_use_non_existent_queue() -> None: @pytest.mark.dispatch -@pytest.mark.parametrize("multiple", (True, False), ids=("multiple", "single")) -def test_direct_dispatcher(multiple: bool) -> None: +@pytest.mark.parametrize("scenario", ("multiple", "single", "max_exceed")) +def test_direct_dispatcher(scenario: typing.Literal["multiple", "single", "max_exceed"]) -> None: n_elements: int = 10 time_threshold: float = 1 @@ -273,12 +273,14 @@ def test_direct_dispatcher(multiple: bool) -> None: variables = ["lemons"] - if multiple: + if scenario == "multiple": variables.append("limes") event = Event() dispatchers: list[DirectDispatcher] = [] + thresholds = {} if scenario != "max_exceed" else {"max_size": 10} + for variable in variables: check_dict[variable] = {"counter": 0} @@ -287,10 +289,14 @@ def callback( ) -> None: args[var]["counter"] += 1 - dispatchers.append(DirectDispatcher(callback=callback, object_types=[variable], termination_trigger=event)) + dispatchers.append(DirectDispatcher(callback=callback, object_types=[variable], termination_trigger=event, thresholds=thresholds)) for i in range(n_elements): for variable, dispatcher in zip(variables, dispatchers): + if scenario == "max_exceed": + with pytest.raises(ObjectDispatchError): + dispatcher.add_item({string.ascii_uppercase[i % 26]: i}, object_type=variable, metadata={"max_size": 12}) + return dispatcher.add_item({string.ascii_uppercase[i % 26]: i}, object_type=variable) event.set() From 43c6133bca308bfb21a3f6c8bcb89400452a3630 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Kristian=20Zar=C4=99bski?= Date: Wed, 4 Feb 2026 13:58:20 +0000 Subject: [PATCH 3/3] Fix base class structure for dispatch.add_item --- simvue/dispatch/base.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/simvue/dispatch/base.py b/simvue/dispatch/base.py index 2413cd92..b9eb94d3 100644 --- a/simvue/dispatch/base.py +++ b/simvue/dispatch/base.py @@ -23,9 +23,9 @@ def __init__( def add_item( self, item: typing.Any, + *, object_type: str, metadata: dict[str, int | float] | None = None, - *_, **__, ) -> None: _ = item