From dc8bfd319be7cb40133e72e8131a29acc9037a2a Mon Sep 17 00:00:00 2001 From: Griffin Milsap Date: Sun, 3 May 2026 14:43:31 -0400 Subject: [PATCH 1/5] first attempt perf wins --- src/ezmsg/core/profiling.py | 190 +++++++++++++++++++++++++++--------- 1 file changed, 146 insertions(+), 44 deletions(-) diff --git a/src/ezmsg/core/profiling.py b/src/ezmsg/core/profiling.py index 27b9cdf8..063c6785 100644 --- a/src/ezmsg/core/profiling.py +++ b/src/ezmsg/core/profiling.py @@ -3,7 +3,6 @@ import time import heapq -from collections import deque from dataclasses import dataclass, field from typing import Callable, TypeAlias from uuid import UUID @@ -23,12 +22,113 @@ # Must return monotonic nanoseconds so *_ns metrics remain unit-consistent. PROFILE_TIME_TYPE: TypeAlias = Callable[[], int] PROFILE_TIME: PROFILE_TIME_TYPE = time.perf_counter_ns +TraceRecord: TypeAlias = tuple[ + int, + str, + str, + str, + float, + ProfileChannelType | None, + int | None, +] + + +@dataclass +class TraceRingBuffer: + maxlen: int + _timestamps_ns: list[int | None] = field(init=False) + _endpoint_ids: list[str | None] = field(init=False) + _topics: list[str | None] = field(init=False) + _metrics: list[str | None] = field(init=False) + _values: list[float | None] = field(init=False) + _channel_kinds: list[ProfileChannelType | None] = field(init=False) + _sample_seqs: list[int | None] = field(init=False) + _head: int = field(default=0, init=False) + _size: int = field(default=0, init=False) + + def __post_init__(self) -> None: + cap = max(1, int(self.maxlen)) + self.maxlen = cap + self._timestamps_ns = [None] * cap + self._endpoint_ids = [None] * cap + self._topics = [None] * cap + self._metrics = [None] * cap + self._values = [None] * cap + self._channel_kinds = [None] * cap + self._sample_seqs = [None] * cap + + def __bool__(self) -> bool: + return self._size > 0 + + def clear(self) -> None: + self._head = 0 + self._size = 0 + + def append(self, record: TraceRecord) -> None: + idx = (self._head + self._size) % self.maxlen + if self._size == self.maxlen: + idx = self._head + self._head = (self._head + 1) % self.maxlen + else: + self._size += 1 + ts, endpoint_id, topic, metric, value, channel_kind, sample_seq = record + self._timestamps_ns[idx] = ts + self._endpoint_ids[idx] = endpoint_id + self._topics[idx] = topic + self._metrics[idx] = metric + self._values[idx] = value + self._channel_kinds[idx] = channel_kind + self._sample_seqs[idx] = sample_seq + + def peek(self) -> TraceRecord: + if self._size == 0: + raise IndexError('peek from empty TraceRingBuffer') + idx = self._head + return ( + self._timestamps_ns[idx], + self._endpoint_ids[idx], + self._topics[idx], + self._metrics[idx], + self._values[idx], + self._channel_kinds[idx], + self._sample_seqs[idx], + ) + + def popleft(self) -> TraceRecord: + if self._size == 0: + raise IndexError('popleft from empty TraceRingBuffer') + idx = self._head + record = ( + self._timestamps_ns[idx], + self._endpoint_ids[idx], + self._topics[idx], + self._metrics[idx], + self._values[idx], + self._channel_kinds[idx], + self._sample_seqs[idx], + ) + self._head = (self._head + 1) % self.maxlen + self._size -= 1 + return record def _endpoint_id(topic: str, id: UUID) -> str: return f"{topic}:{id}" +def _trace_sample_from_record(record: TraceRecord) -> ProfilingTraceSample: + timestamp_ns, endpoint_id, topic, metric, value, channel_kind, sample_seq = record + return ProfilingTraceSample( + timestamp=float(timestamp_ns), + endpoint_id=endpoint_id, + topic=topic, + metric=metric, + value=value, + channel_kind=channel_kind, + sample_seq=sample_seq, + ) + + @dataclass class _PublisherMetrics: topic: str @@ -44,8 +144,8 @@ class _PublisherMetrics: _trace_counter: int = 0 _trace_publish_delta_enabled: bool = False _trace_backpressure_wait_enabled: bool = False - trace_samples: deque[ProfilingTraceSample] = field( - default_factory=lambda: deque(maxlen=TRACE_MAX_SAMPLES) + trace_samples: TraceRingBuffer = field( + default_factory=lambda: TraceRingBuffer(TRACE_MAX_SAMPLES) ) def record_publish(self, inflight: int, msg_seq: int | None = None) -> None: @@ -64,13 +164,14 @@ def record_publish(self, inflight: int, msg_seq: int | None = None) -> None: ) self._last_publish_ts_ns = now_ns self.trace_samples.append( - ProfilingTraceSample( - timestamp=float(now_ns), - endpoint_id=self.endpoint_id, - topic=self.topic, - metric="publish_delta_ns", - value=float(publish_delta_ns), - sample_seq=msg_seq, + ( + now_ns, + self.endpoint_id, + self.topic, + "publish_delta_ns", + float(publish_delta_ns), + None, + msg_seq, ) ) @@ -80,13 +181,14 @@ def record_backpressure_wait(self, wait_ns: int, msg_seq: int | None = None) -> now_ns = PROFILE_TIME() self.trace_samples.append( - ProfilingTraceSample( - timestamp=float(now_ns), - endpoint_id=self.endpoint_id, - topic=self.topic, - metric="backpressure_wait_ns", - value=float(wait_ns), - sample_seq=msg_seq, + ( + now_ns, + self.endpoint_id, + self.topic, + "backpressure_wait_ns", + float(wait_ns), + None, + msg_seq, ) ) @@ -135,8 +237,8 @@ class _SubscriberMetrics: _trace_counter: int = 0 _trace_lease_time_enabled: bool = False _trace_user_span_enabled: bool = False - trace_samples: deque[ProfilingTraceSample] = field( - default_factory=lambda: deque(maxlen=TRACE_MAX_SAMPLES) + trace_samples: TraceRingBuffer = field( + default_factory=lambda: TraceRingBuffer(TRACE_MAX_SAMPLES) ) def begin_message(self, channel_kind: ProfileChannelType) -> bool: @@ -176,14 +278,14 @@ def record_lease_time( now_ns = PROFILE_TIME() self.trace_samples.append( - ProfilingTraceSample( - timestamp=float(now_ns), - endpoint_id=self.endpoint_id, - topic=self.topic, - metric="lease_time_ns", - value=float(lease_ns), - channel_kind=channel_kind, - sample_seq=msg_seq, + ( + now_ns, + self.endpoint_id, + self.topic, + "lease_time_ns", + float(lease_ns), + channel_kind, + msg_seq, ) ) @@ -200,14 +302,14 @@ def record_user_span( now_ns = PROFILE_TIME() self.trace_samples.append( - ProfilingTraceSample( - timestamp=float(now_ns), - endpoint_id=self.endpoint_id, - topic=self.topic if label is None else f"{self.topic}:{label}", - metric="user_span_ns", - value=float(span_ns), - channel_kind=self.channel_kind_last, - sample_seq=msg_seq, + ( + now_ns, + self.endpoint_id, + self.topic if label is None else f"{self.topic}:{label}", + "user_span_ns", + float(span_ns), + self.channel_kind_last, + msg_seq, ) ) @@ -334,7 +436,7 @@ def trace_batch(self, max_samples: int = 1000) -> ProcessProfilingTraceBatch: samples: list[ProfilingTraceSample] = [] limit = max(1, int(max_samples)) - queues: list[deque[ProfilingTraceSample]] = [] + queues: list[TraceRingBuffer] = [] for metric in self._publishers.values(): if metric.trace_samples: queues.append(metric.trace_samples) @@ -345,13 +447,13 @@ def trace_batch(self, max_samples: int = 1000) -> ProcessProfilingTraceBatch: if len(queues) == 1: queue = queues[0] while queue and len(samples) < limit: - samples.append(queue.popleft()) + samples.append(_trace_sample_from_record(queue.popleft())) elif len(queues) > 1: heap: list[tuple[float, int, int]] = [] for idx, queue in enumerate(queues): - sample = queue[0] - seq = sample.sample_seq if sample.sample_seq is not None else -1 - heapq.heappush(heap, (sample.timestamp, seq, idx)) + sample = queue.peek() + seq = sample[6] if sample[6] is not None else -1 + heapq.heappush(heap, (float(sample[0]), seq, idx)) while heap and len(samples) < limit: _timestamp, _seq, queue_idx = heapq.heappop(heap) @@ -359,11 +461,11 @@ def trace_batch(self, max_samples: int = 1000) -> ProcessProfilingTraceBatch: if not queue: continue sample = queue.popleft() - samples.append(sample) + samples.append(_trace_sample_from_record(sample)) if queue: - nxt = queue[0] - nxt_seq = nxt.sample_seq if nxt.sample_seq is not None else -1 - heapq.heappush(heap, (nxt.timestamp, nxt_seq, queue_idx)) + nxt = queue.peek() + nxt_seq = nxt[6] if nxt[6] is not None else -1 + heapq.heappush(heap, (float(nxt[0]), nxt_seq, queue_idx)) return ProcessProfilingTraceBatch( process_id=self._process_id, From d8ada235b0c240af24c74ed30621a33f6310e0a6 Mon Sep 17 00:00:00 2001 From: Griffin Milsap Date: Thu, 7 May 2026 10:42:08 -0400 Subject: [PATCH 2/5] deque is faster --- src/ezmsg/core/profiling.py | 94 ++++--------------------------------- 1 file changed, 8 insertions(+), 86 deletions(-) diff --git a/src/ezmsg/core/profiling.py b/src/ezmsg/core/profiling.py index 063c6785..3ca722f3 100644 --- a/src/ezmsg/core/profiling.py +++ b/src/ezmsg/core/profiling.py @@ -3,6 +3,7 @@ import time import heapq +from collections import deque from dataclasses import dataclass, field from typing import Callable, TypeAlias from uuid import UUID @@ -33,85 +34,6 @@ ] -@dataclass -class TraceRingBuffer: - maxlen: int - _timestamps_ns: list[int | None] = field(init=False) - _endpoint_ids: list[str | None] = field(init=False) - _topics: list[str | None] = field(init=False) - _metrics: list[str | None] = field(init=False) - _values: list[float | None] = field(init=False) - _channel_kinds: list[ProfileChannelType | None] = field(init=False) - _sample_seqs: list[int | None] = field(init=False) - _head: int = field(default=0, init=False) - _size: int = field(default=0, init=False) - - def __post_init__(self) -> None: - cap = max(1, int(self.maxlen)) - self.maxlen = cap - self._timestamps_ns = [None] * cap - self._endpoint_ids = [None] * cap - self._topics = [None] * cap - self._metrics = [None] * cap - self._values = [None] * cap - self._channel_kinds = [None] * cap - self._sample_seqs = [None] * cap - - def __bool__(self) -> bool: - return self._size > 0 - - def clear(self) -> None: - self._head = 0 - self._size = 0 - - def append(self, record: TraceRecord) -> None: - idx = (self._head + self._size) % self.maxlen - if self._size == self.maxlen: - idx = self._head - self._head = (self._head + 1) % self.maxlen - else: - self._size += 1 - ts, endpoint_id, topic, metric, value, channel_kind, sample_seq = record - self._timestamps_ns[idx] = ts - self._endpoint_ids[idx] = endpoint_id - self._topics[idx] = topic - self._metrics[idx] = metric - self._values[idx] = value - self._channel_kinds[idx] = channel_kind - self._sample_seqs[idx] = sample_seq - - def peek(self) -> TraceRecord: - if self._size == 0: - raise IndexError('peek from empty TraceRingBuffer') - idx = self._head - return ( - self._timestamps_ns[idx], - self._endpoint_ids[idx], - self._topics[idx], - self._metrics[idx], - self._values[idx], - self._channel_kinds[idx], - self._sample_seqs[idx], - ) - - def popleft(self) -> TraceRecord: - if self._size == 0: - raise IndexError('popleft from empty TraceRingBuffer') - idx = self._head - record = ( - self._timestamps_ns[idx], - self._endpoint_ids[idx], - self._topics[idx], - self._metrics[idx], - self._values[idx], - self._channel_kinds[idx], - self._sample_seqs[idx], - ) - self._head = (self._head + 1) % self.maxlen - self._size -= 1 - return record - - def _endpoint_id(topic: str, id: UUID) -> str: return f"{topic}:{id}" @@ -144,8 +66,8 @@ class _PublisherMetrics: _trace_counter: int = 0 _trace_publish_delta_enabled: bool = False _trace_backpressure_wait_enabled: bool = False - trace_samples: TraceRingBuffer = field( - default_factory=lambda: TraceRingBuffer(TRACE_MAX_SAMPLES) + trace_samples: deque[TraceRecord] = field( + default_factory=lambda: deque(maxlen=TRACE_MAX_SAMPLES) ) def record_publish(self, inflight: int, msg_seq: int | None = None) -> None: @@ -237,8 +159,8 @@ class _SubscriberMetrics: _trace_counter: int = 0 _trace_lease_time_enabled: bool = False _trace_user_span_enabled: bool = False - trace_samples: TraceRingBuffer = field( - default_factory=lambda: TraceRingBuffer(TRACE_MAX_SAMPLES) + trace_samples: deque[TraceRecord] = field( + default_factory=lambda: deque(maxlen=TRACE_MAX_SAMPLES) ) def begin_message(self, channel_kind: ProfileChannelType) -> bool: @@ -436,7 +358,7 @@ def trace_batch(self, max_samples: int = 1000) -> ProcessProfilingTraceBatch: samples: list[ProfilingTraceSample] = [] limit = max(1, int(max_samples)) - queues: list[TraceRingBuffer] = [] + queues: list[deque[TraceRecord]] = [] for metric in self._publishers.values(): if metric.trace_samples: queues.append(metric.trace_samples) @@ -451,7 +373,7 @@ def trace_batch(self, max_samples: int = 1000) -> ProcessProfilingTraceBatch: elif len(queues) > 1: heap: list[tuple[float, int, int]] = [] for idx, queue in enumerate(queues): - sample = queue.peek() + sample = queue[0] seq = sample[6] if sample[6] is not None else -1 heapq.heappush(heap, (float(sample[0]), seq, idx)) @@ -463,7 +385,7 @@ def trace_batch(self, max_samples: int = 1000) -> ProcessProfilingTraceBatch: sample = queue.popleft() samples.append(_trace_sample_from_record(sample)) if queue: - nxt = queue.peek() + nxt = queue[0] nxt_seq = nxt[6] if nxt[6] is not None else -1 heapq.heappush(heap, (float(nxt[0]), nxt_seq, queue_idx)) From ea1a4b5c8bd35cca972a4b60520ad84ac2273501 Mon Sep 17 00:00:00 2001 From: Griffin Milsap Date: Thu, 7 May 2026 15:37:59 -0400 Subject: [PATCH 3/5] sample_mod = 1 shortcut --- src/ezmsg/core/profiling.py | 14 ++++++++++---- 1 file changed, 10 insertions(+), 4 deletions(-) diff --git a/src/ezmsg/core/profiling.py b/src/ezmsg/core/profiling.py index 3ca722f3..4689de3a 100644 --- a/src/ezmsg/core/profiling.py +++ b/src/ezmsg/core/profiling.py @@ -76,9 +76,11 @@ def record_publish(self, inflight: int, msg_seq: int | None = None) -> None: if not self._trace_publish_delta_enabled: return - self._trace_counter += 1 - if self._trace_counter % max(1, self.trace_sample_mod) != 0: - return + sample_mod = self.trace_sample_mod + if sample_mod != 1: + self._trace_counter += 1 + if self._trace_counter % sample_mod != 0: + return now_ns = PROFILE_TIME() publish_delta_ns = ( @@ -170,8 +172,12 @@ def begin_message(self, channel_kind: ProfileChannelType) -> bool: if not (self._trace_lease_time_enabled or self._trace_user_span_enabled): return False + sample_mod = self.trace_sample_mod + if sample_mod == 1: + return True + self._trace_counter += 1 - return self._trace_counter % max(1, self.trace_sample_mod) == 0 + return self._trace_counter % sample_mod == 0 def record_receive( self, From 2327ef1fc4ef0758fc0375f05563e773a5cfbb45 Mon Sep 17 00:00:00 2001 From: Griffin Milsap Date: Thu, 7 May 2026 16:05:47 -0400 Subject: [PATCH 4/5] subscriber bookkeeping fixes --- src/ezmsg/core/profiling.py | 56 +++++++++++++++++++++++++++++++++---- src/ezmsg/core/subclient.py | 29 +++++++++---------- 2 files changed, 64 insertions(+), 21 deletions(-) diff --git a/src/ezmsg/core/profiling.py b/src/ezmsg/core/profiling.py index 4689de3a..1c60a18c 100644 --- a/src/ezmsg/core/profiling.py +++ b/src/ezmsg/core/profiling.py @@ -165,19 +165,27 @@ class _SubscriberMetrics: default_factory=lambda: deque(maxlen=TRACE_MAX_SAMPLES) ) - def begin_message(self, channel_kind: ProfileChannelType) -> bool: + def trace_receive_state( + self, channel_kind: ProfileChannelType + ) -> tuple[bool, bool, bool]: self.messages_received_total += 1 self.channel_kind_last = channel_kind - if not (self._trace_lease_time_enabled or self._trace_user_span_enabled): - return False + trace_lease = self._trace_lease_time_enabled + trace_user_span = self._trace_user_span_enabled + if not (trace_lease or trace_user_span): + return False, trace_lease, trace_user_span sample_mod = self.trace_sample_mod if sample_mod == 1: - return True + return True, trace_lease, trace_user_span self._trace_counter += 1 - return self._trace_counter % sample_mod == 0 + return (self._trace_counter % sample_mod == 0), trace_lease, trace_user_span + + def begin_message(self, channel_kind: ProfileChannelType) -> bool: + sampled, _trace_lease, _trace_user_span = self.trace_receive_state(channel_kind) + return sampled def record_receive( self, @@ -217,6 +225,25 @@ def record_lease_time( ) ) + def append_lease_time( + self, + channel_kind: ProfileChannelType, + lease_ns: int, + msg_seq: int | None = None, + ) -> None: + now_ns = PROFILE_TIME() + self.trace_samples.append( + ( + now_ns, + self.endpoint_id, + self.topic, + "lease_time_ns", + float(lease_ns), + channel_kind, + msg_seq, + ) + ) + def record_user_span( self, span_ns: int, @@ -241,6 +268,25 @@ def record_user_span( ) ) + def append_user_span( + self, + span_ns: int, + label: str | None, + msg_seq: int | None = None, + ) -> None: + now_ns = PROFILE_TIME() + self.trace_samples.append( + ( + now_ns, + self.endpoint_id, + self.topic if label is None else f"{self.topic}:{label}", + "user_span_ns", + float(span_ns), + self.channel_kind_last, + msg_seq, + ) + ) + def snapshot( self, now_ns: int, diff --git a/src/ezmsg/core/subclient.py b/src/ezmsg/core/subclient.py index a3ad3246..e64c9bc4 100644 --- a/src/ezmsg/core/subclient.py +++ b/src/ezmsg/core/subclient.py @@ -304,37 +304,34 @@ async def recv_zero_copy(self) -> typing.AsyncGenerator[typing.Any, None]: channel = self._channels[pub_id] channel_kind = channel.channel_kind self._active_msg_seq = msg_id - self._active_trace_sampled = self._profile.begin_message(channel_kind) + sampled, trace_lease, trace_user_span = self._profile.trace_receive_state( + channel_kind + ) + self._active_trace_sampled = sampled try: - trace_lease = self._profile._trace_lease_time_enabled start_ns = PROFILE_TIME() if trace_lease else None with channel.get(msg_id, self.id) as msg: yield msg - lease_ns = None - if trace_lease and start_ns is not None: - lease_ns = PROFILE_TIME() - start_ns - self._profile.record_lease_time( - channel_kind, - lease_ns, - msg_seq=msg_id, - sampled=self._active_trace_sampled, - ) + if trace_lease and sampled and start_ns is not None: + self._profile.append_lease_time( + channel_kind, + PROFILE_TIME() - start_ns, + msg_seq=msg_id, + ) finally: self._active_msg_seq = None self._active_trace_sampled = False def begin_profile(self) -> int: - if not self._profile._trace_user_span_enabled or not self._active_trace_sampled: + if not self._active_trace_sampled: return 0 return PROFILE_TIME() def end_profile(self, start_ns: int, label: str | None = None) -> None: if start_ns <= 0: return - end_ns = PROFILE_TIME() - self._profile.record_user_span( - end_ns - start_ns, + self._profile.append_user_span( + PROFILE_TIME() - start_ns, label, msg_seq=self._active_msg_seq, - sampled=self._active_trace_sampled, ) From afe5231e312927f2fd4ca6a320e47247507b3f5d Mon Sep 17 00:00:00 2001 From: Griffin Milsap Date: Thu, 7 May 2026 16:15:45 -0400 Subject: [PATCH 5/5] inlining profiling --- src/ezmsg/core/pubclient.py | 30 +++++++++++++++++++++++++++++- src/ezmsg/core/subclient.py | 32 +++++++++++++++++++++++--------- 2 files changed, 52 insertions(+), 10 deletions(-) diff --git a/src/ezmsg/core/pubclient.py b/src/ezmsg/core/pubclient.py index 706c6c7a..11abbe7c 100644 --- a/src/ezmsg/core/pubclient.py +++ b/src/ezmsg/core/pubclient.py @@ -565,7 +565,35 @@ async def broadcast(self, obj: Any) -> None: f"Publisher {self.id}: Channel {channel.id} connection fail" ) - self._profile.record_publish(self._backpressure.pressure, msg_seq=self._msg_id) + profile = self._profile + inflight = self._backpressure.pressure + profile.messages_published_total += 1 + profile.inflight_messages_current = inflight + + if profile._trace_publish_delta_enabled: + sample_mod = profile.trace_sample_mod + sampled = True + if sample_mod != 1: + profile._trace_counter += 1 + sampled = (profile._trace_counter % sample_mod) == 0 + if sampled: + now_ns = PROFILE_TIME() + last_publish_ts_ns = profile._last_publish_ts_ns + publish_delta_ns = ( + 0 if last_publish_ts_ns is None else now_ns - last_publish_ts_ns + ) + profile._last_publish_ts_ns = now_ns + profile.trace_samples.append( + ( + now_ns, + profile.endpoint_id, + profile.topic, + "publish_delta_ns", + float(publish_delta_ns), + None, + self._msg_id, + ) + ) self._msg_id += 1 def _should_use_local_fast_path(self) -> bool: diff --git a/src/ezmsg/core/subclient.py b/src/ezmsg/core/subclient.py index e64c9bc4..4ae67f0f 100644 --- a/src/ezmsg/core/subclient.py +++ b/src/ezmsg/core/subclient.py @@ -304,7 +304,7 @@ async def recv_zero_copy(self) -> typing.AsyncGenerator[typing.Any, None]: channel = self._channels[pub_id] channel_kind = channel.channel_kind self._active_msg_seq = msg_id - sampled, trace_lease, trace_user_span = self._profile.trace_receive_state( + sampled, trace_lease, _trace_user_span = self._profile.trace_receive_state( channel_kind ) self._active_trace_sampled = sampled @@ -313,10 +313,17 @@ async def recv_zero_copy(self) -> typing.AsyncGenerator[typing.Any, None]: with channel.get(msg_id, self.id) as msg: yield msg if trace_lease and sampled and start_ns is not None: - self._profile.append_lease_time( - channel_kind, - PROFILE_TIME() - start_ns, - msg_seq=msg_id, + now_ns = PROFILE_TIME() + self._profile.trace_samples.append( + ( + now_ns, + self._profile.endpoint_id, + self._profile.topic, + "lease_time_ns", + float(now_ns - start_ns), + channel_kind, + msg_id, + ) ) finally: self._active_msg_seq = None @@ -330,8 +337,15 @@ def begin_profile(self) -> int: def end_profile(self, start_ns: int, label: str | None = None) -> None: if start_ns <= 0: return - self._profile.append_user_span( - PROFILE_TIME() - start_ns, - label, - msg_seq=self._active_msg_seq, + now_ns = PROFILE_TIME() + self._profile.trace_samples.append( + ( + now_ns, + self._profile.endpoint_id, + self._profile.topic if label is None else f"{self._profile.topic}:{label}", + "user_span_ns", + float(now_ns - start_ns), + self._profile.channel_kind_last, + self._active_msg_seq, + ) )