Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,6 +1,4 @@
"""
Dispatch
========
"""Dispatch

Contains factory method for selecting dispatcher type based on Simvue Configuration
"""
Expand All @@ -20,11 +18,11 @@

def Dispatcher(
mode: typing.Literal["direct", "queued"],
callback: typing.Callable[[list[typing.Any], str, dict[str, typing.Any]], None],
callback: typing.Callable[[list[typing.Any], str], None],
object_types: list[str],
termination_trigger: "Event",
name: str | None = None,
**kwargs,
thresholds: dict[str, int | float] | None = None,
) -> "DispatcherBaseClass":
"""Returns instance of dispatcher based on configuration

Expand All @@ -46,6 +44,10 @@ def Dispatcher(
event which triggers termination of the dispatcher
name : str | None, optional
name for the underlying thread, default None
thresholds: dict[str, int | float] | None, default None
if metadata is provided during item addition, specify
thresholds under which dispatch of an item is permitted,
default is None

Returns
-------
Expand All @@ -58,7 +60,7 @@ def Dispatcher(
callback=callback,
object_types=object_types,
termination_trigger=termination_trigger,
**kwargs,
thresholds=thresholds,
)
else:
logger.debug("Using queued dispatch for metric and queue sending")
Expand All @@ -67,5 +69,5 @@ def Dispatcher(
object_types=object_types,
termination_trigger=termination_trigger,
name=name,
**kwargs,
thresholds=thresholds,
)
26 changes: 22 additions & 4 deletions simvue/factory/dispatch/base.py → simvue/dispatch/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,23 +2,41 @@
import abc
import typing

from simvue.exception import ObjectDispatchError


class DispatcherBaseClass(abc.ABC):
def __init__(
self,
*,
callback: typing.Callable[[list[typing.Any], str], None],
object_types: list[str],
termination_trigger: threading.Event,
**_,
thresholds: dict[str, int | float] | None = None,
) -> None:
super().__init__()
self._thresholds: dict[str, int | float] = thresholds or {}
self._object_types: list[str] = object_types
self._termination_trigger = termination_trigger
self._callback = callback

@abc.abstractmethod
def add_item(self, item: typing.Any, object_type: str, *_, **__) -> None:
pass
def add_item(
self,
item: typing.Any,
*,
object_type: str,
metadata: dict[str, int | float] | None = None,
**__,
) -> None:
_ = item
_ = object_type
if not metadata:
return
for key, threshold in self._thresholds.items():
if key in metadata and metadata[key] > threshold:
raise ObjectDispatchError(
label=key, threshold=threshold, value=metadata[key]
)

@abc.abstractmethod
def run(self) -> None:
Expand Down
18 changes: 16 additions & 2 deletions simvue/factory/dispatch/direct.py → simvue/dispatch/direct.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,11 @@ class DirectDispatcher(DispatcherBaseClass):

def __init__(
self,
*,
callback: typing.Callable[[list[typing.Any], str], None],
object_types: list[str],
termination_trigger: threading.Event,
**_,
thresholds: dict[str, int | float] | None = None,
) -> None:
"""Initialise a new DirectDispatcher instance

Expand All @@ -24,15 +25,28 @@ def __init__(
categories, this is mainly used for creation of queues in a QueueDispatcher
termination_trigger : Event
event which triggers termination of the dispatcher
thresholds: int | float
if metadata is provided during item addition, specify
thresholds under which dispatch of an item is permitted,
default is None
"""
super().__init__(
callback=callback,
object_types=object_types,
termination_trigger=termination_trigger,
thresholds=thresholds,
)

def add_item(self, item: typing.Any, object_type: str, *_, **__) -> None:
def add_item(
self,
item: typing.Any,
*,
object_type: str,
metadata: dict[str, int | float] | None = None,
**__,
) -> None:
"""Execute callback on the given item"""
super().add_item(item, object_type, metadata)
self._callback([item], object_type)

def run(self) -> None:
Expand Down
46 changes: 36 additions & 10 deletions simvue/factory/dispatch/queued.py → simvue/dispatch/queued.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,12 +33,14 @@ class QueuedDispatcher(threading.Thread, DispatcherBaseClass):

def __init__(
self,
*,
callback: typing.Callable[[list[typing.Any], str], None],
object_types: list[str],
termination_trigger: threading.Event,
name: str | None = None,
max_buffer_size: int = MAX_BUFFER_SIZE,
max_read_rate: float = MAX_REQUESTS_PER_SECOND,
thresholds: dict[str, int | float] | None = None,
) -> None:
"""
Initialise a new queue based dispatcher
Expand All @@ -58,34 +60,47 @@ def __init__(
maximum number of items allowed in created buffer.
max_read_rate : float
maximum rate at which the callback can be executed
thresholds: dict[str, int | float] | None, optional
if metadata is provided during item addition, specify
thresholds within which a single dispatch is permitted,
default is None
"""
DispatcherBaseClass.__init__(
self,
callback=callback,
object_types=object_types,
termination_trigger=termination_trigger,
thresholds=thresholds,
)
super().__init__(name=name, daemon=True)

self._termination_trigger = termination_trigger
self._callback = callback
self._queues = {label: queue.Queue() for label in object_types}
self._max_read_rate = max_read_rate
self._max_buffer_size = max_buffer_size
self._send_timer = 0
self._termination_trigger: threading.Event = termination_trigger
self._callback: typing.Callable[[list[typing.Any], str], None] = callback
self._queues: dict[str, queue.Queue[typing.Any]] = {
label: queue.Queue() for label in object_types
}
self._max_read_rate: float = max_read_rate
self._max_buffer_size: int = max_buffer_size
self._send_timer: int = 0

def add_item(
self, item: typing.Any, object_type: str, blocking: bool = True
self,
item: typing.Any,
*,
object_type: str,
blocking: bool = True,
metadata: dict[str, int | float] | None = None,
) -> None:
"""Add an item to the specified queue with/without blocking"""
super().add_item(item, object_type, metadata)
if self._termination_trigger.is_set():
raise RuntimeError(
f"Cannot append item '{item}' to queue '{object_type}', "
"termination called."
+ "termination called."
)
if object_type not in self._queues:
raise KeyError(f"No queue '{object_type}' found")
self._queues[object_type].put(item, block=blocking)
self._queues[object_type].put((item, metadata or {}), block=blocking)

@property
def empty(self) -> bool:
Expand All @@ -111,12 +126,23 @@ def _create_buffer(self, queue_label: str) -> list[typing.Any]:
The length of the buffer is constrained.
"""
_buffer: list[typing.Any] = []
_criteria: dict[str, int | float] = {}
_threshold_totals: dict[str, float] = {k: 0 for k in self._thresholds}

while (
not self._queues[queue_label].empty()
and len(_buffer) < self._max_buffer_size
and all(
_threshold_totals[key] < self._thresholds[key]
for key in _threshold_totals
)
):
_item = self._queues[queue_label].get(block=False)
_item, _metadata = typing.cast(
"tuple[typing.Any, dict[str, int | float]]",
self._queues[queue_label].get(block=False),
)
for key in _threshold_totals:
_threshold_totals[key] += _metadata.get(key, 0)
_buffer.append(_item)
self._queues[queue_label].task_done()

Expand Down
11 changes: 10 additions & 1 deletion simvue/exception.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,4 +21,13 @@ def __init__(self, obj_type: str, name: str, extra: str | None = None) -> None:
class SimvueRunError(RuntimeError):
"""A special sub-class of runtime error specifically for Simvue run errors"""

pass

class ObjectDispatchError(Exception):
"""Raised if object dispatch failed due to condition."""

def __init__(self, label: str, threshold: int | float, value: int | float) -> None:
self.msg = (
f"Object dispatch failed, {label} "
+ f"of {value} exceeds maximum permitted value of {threshold}"
)
super().__init__(self.msg)
6 changes: 0 additions & 6 deletions simvue/factory/__init__.py

This file was deleted.

Loading
Loading