From f219e2254fb3670330023cfc4367e51db58a8e86 Mon Sep 17 00:00:00 2001 From: Hilbrand Bouwkamp Date: Tue, 19 May 2026 11:10:21 +0200 Subject: [PATCH 1/2] AER-4020 Fixes to guarding against messages retrieved after restart - In QueueWatchDog it should clear the runningTasks known on reset otherwise each time this method is subsequently called it will retrigger the reset. - The keeping track of dispatchedTasks that was in PerformanceMetricsReporter should have been moved to TaskManagerMetricsRegister because that is where the register call has been moved to. - Added extra guard in LoadMetric to make sure it would never become negative. --- .../java/nl/aerius/taskmanager/QueueWatchDog.java | 6 ++++-- .../taskmanager/config/EnvOverrideDeserializer.java | 2 ++ .../nl/aerius/taskmanager/metrics/LoadMetric.java | 2 +- .../metrics/PerformanceMetricsReporter.java | 9 --------- .../metrics/TaskManagerMetricsRegister.java | 11 ++++++++++- .../java/nl/aerius/taskmanager/QueueWatchDogTest.java | 3 +++ .../metrics/TaskManagerMetricsRegisterTest.java | 6 +++--- 7 files changed, 23 insertions(+), 16 deletions(-) diff --git a/source/taskmanager/src/main/java/nl/aerius/taskmanager/QueueWatchDog.java b/source/taskmanager/src/main/java/nl/aerius/taskmanager/QueueWatchDog.java index c58db32..34eb312 100644 --- a/source/taskmanager/src/main/java/nl/aerius/taskmanager/QueueWatchDog.java +++ b/source/taskmanager/src/main/java/nl/aerius/taskmanager/QueueWatchDog.java @@ -69,9 +69,11 @@ public void onWorkerFinished(final String messageId, final Map m } @Override - public void onNumberOfWorkersUpdate(final int numberOfWorkers, final int numberOfMessages, int numberOfMessagesInProgress) { + public void onNumberOfWorkersUpdate(final int numberOfWorkers, final int numberOfMessages, final int numberOfMessagesInProgress) { if (isItDead(!runningTasks.isEmpty(), numberOfMessages)) { - LOG.info("It looks like some tasks are zombies on {} worker queue, so all tasks currently in state running are 
released.", workerQueueName); + LOG.info("It looks like some tasks are zombies on {} worker queue. All tasks in state running are released (running:{}).", workerQueueName, + runningTasks.size()); + runningTasks.clear(); listeners.forEach(QueueWatchDogListener::reset); } } diff --git a/source/taskmanager/src/main/java/nl/aerius/taskmanager/config/EnvOverrideDeserializer.java b/source/taskmanager/src/main/java/nl/aerius/taskmanager/config/EnvOverrideDeserializer.java index b1302c6..532cfc6 100644 --- a/source/taskmanager/src/main/java/nl/aerius/taskmanager/config/EnvOverrideDeserializer.java +++ b/source/taskmanager/src/main/java/nl/aerius/taskmanager/config/EnvOverrideDeserializer.java @@ -101,6 +101,8 @@ private void applyEnvOverridesOnSetProperties(final ObjectNode node, final Strin node.put(entry.getKey(), envValue); } else if (entry.getValue().isObject()) { applyEnvOverrides((ObjectNode) entry.getValue(), fieldPath, resolveFieldType(targetClass, entry.getKey())); + } else { + LOG.debug("Unexpected entry value (in parentPath: {}) that is not an object: {}", parentPath, entry.getValue()); } } } diff --git a/source/taskmanager/src/main/java/nl/aerius/taskmanager/metrics/LoadMetric.java b/source/taskmanager/src/main/java/nl/aerius/taskmanager/metrics/LoadMetric.java index 3e8cd0b..35d24ad 100644 --- a/source/taskmanager/src/main/java/nl/aerius/taskmanager/metrics/LoadMetric.java +++ b/source/taskmanager/src/main/java/nl/aerius/taskmanager/metrics/LoadMetric.java @@ -70,7 +70,7 @@ public synchronized void register(final int deltaUsedWorkers, final int numberOf total += delta * countFunction.applyAsDouble(numberOfWorkers, usedWorkers); totalMeasureTime += delta; last = newLast; - usedWorkers += deltaUsedWorkers; + usedWorkers = Math.max(0, usedWorkers + deltaUsedWorkers); } /** diff --git a/source/taskmanager/src/main/java/nl/aerius/taskmanager/metrics/PerformanceMetricsReporter.java 
b/source/taskmanager/src/main/java/nl/aerius/taskmanager/metrics/PerformanceMetricsReporter.java index 8d84ce4..cc4b6f4 100644 --- a/source/taskmanager/src/main/java/nl/aerius/taskmanager/metrics/PerformanceMetricsReporter.java +++ b/source/taskmanager/src/main/java/nl/aerius/taskmanager/metrics/PerformanceMetricsReporter.java @@ -17,10 +17,8 @@ package nl.aerius.taskmanager.metrics; import java.util.HashMap; -import java.util.HashSet; import java.util.Map; import java.util.Map.Entry; -import java.util.Set; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; @@ -80,10 +78,6 @@ public class PerformanceMetricsReporter implements WorkerProducerHandler, QueueW private final Attributes attributesWorker; - // Keep track of dispatched tasks, because when taskmanager restarts it should not register tasks already on the queue - // as it doesn't have any metrics on it anymore. - private final Set dispatchedTasks = new HashSet<>(); - public PerformanceMetricsReporter(final ScheduledExecutorService newScheduledThreadPool, final String queueGroupName, final Meter meter) { this.queueGroupName = queueGroupName; this.meter = meter; @@ -123,7 +117,6 @@ private DoubleGauge createGauge(final String name, final String description) { @Override public void onWorkDispatched(final String messageId, final Map messageMetaData) { - dispatchedTasks.add(messageId); final TaskMetrics taskMetrics = new TaskMetrics(messageMetaData); taskMetrics.determineDuration(); dispatchedQueueMetrics.computeIfAbsent(taskMetrics.queueName(), k -> createQueueDurationMetric(taskMetrics)).register(taskMetrics); @@ -136,12 +129,10 @@ public synchronized void onWorkerFinished(final String messageId, final Map createQueueDurationMetric(taskMetrics)).register(taskMetrics); workWorkerMetrics.register(taskMetrics); - } @Override public void reset() { - dispatchedTasks.clear(); dispatchedQueueMetrics.entrySet().forEach(e -> e.getValue().process()); 
dispatchedWorkerMetrics.process(); } diff --git a/source/taskmanager/src/main/java/nl/aerius/taskmanager/metrics/TaskManagerMetricsRegister.java b/source/taskmanager/src/main/java/nl/aerius/taskmanager/metrics/TaskManagerMetricsRegister.java index 4f0e6a8..e405f34 100644 --- a/source/taskmanager/src/main/java/nl/aerius/taskmanager/metrics/TaskManagerMetricsRegister.java +++ b/source/taskmanager/src/main/java/nl/aerius/taskmanager/metrics/TaskManagerMetricsRegister.java @@ -16,7 +16,9 @@ */ package nl.aerius.taskmanager.metrics; +import java.util.HashSet; import java.util.Map; +import java.util.Set; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -38,6 +40,9 @@ public class TaskManagerMetricsRegister implements WorkerProducerHandler, Worker private final TaskManagerUsageMetricsProvider taskManagerUsageMetricsProvider; private final StartupGuard startupGuard; + // Keep track of dispatched tasks, because when taskmanager restarts it should not register tasks already on the queue + // as it doesn't have any metrics on it anymore. 
+ private final Set dispatchedTasks = new HashSet<>(); private int numberOfWorkers; @@ -48,12 +53,15 @@ public TaskManagerMetricsRegister(final TaskManagerUsageMetricsProvider taskMana @Override public void onWorkDispatched(final String messageId, final Map messageMetaData) { + dispatchedTasks.add(messageId); taskManagerUsageMetricsProvider.register(1, numberOfWorkers); } @Override public void onWorkerFinished(final String messageId, final Map messageMetaData) { - taskManagerUsageMetricsProvider.register(-1, numberOfWorkers); + if (dispatchedTasks.remove(messageId)) { + taskManagerUsageMetricsProvider.register(-1, numberOfWorkers); + } } @Override @@ -68,6 +76,7 @@ public synchronized void onNumberOfWorkersUpdate(final int numberOfWorkers, fina @Override public void reset() { + dispatchedTasks.clear(); taskManagerUsageMetricsProvider.reset(); } } diff --git a/source/taskmanager/src/test/java/nl/aerius/taskmanager/QueueWatchDogTest.java b/source/taskmanager/src/test/java/nl/aerius/taskmanager/QueueWatchDogTest.java index 88bff47..f370b26 100644 --- a/source/taskmanager/src/test/java/nl/aerius/taskmanager/QueueWatchDogTest.java +++ b/source/taskmanager/src/test/java/nl/aerius/taskmanager/QueueWatchDogTest.java @@ -63,6 +63,9 @@ protected LocalDateTime now() { now.set(now.get().plusMinutes(20)); qwd.onNumberOfWorkersUpdate(0, numberOfMessages, 0); verify(listener, times(expected)).reset(); + // Call update again. This should not trigger reset again because we just called reset. 
+ qwd.onNumberOfWorkersUpdate(0, numberOfMessages, 0); + verify(listener, times(expected)).reset(); } private static List isDeadTests() { diff --git a/source/taskmanager/src/test/java/nl/aerius/taskmanager/metrics/TaskManagerMetricsRegisterTest.java b/source/taskmanager/src/test/java/nl/aerius/taskmanager/metrics/TaskManagerMetricsRegisterTest.java index 4a81d4e..c1864f4 100644 --- a/source/taskmanager/src/test/java/nl/aerius/taskmanager/metrics/TaskManagerMetricsRegisterTest.java +++ b/source/taskmanager/src/test/java/nl/aerius/taskmanager/metrics/TaskManagerMetricsRegisterTest.java @@ -61,7 +61,7 @@ void testOnWorkDispatched() { startUp(10, 0); register.onWorkDispatched("1", createMap(QUEUE_1, 100L)); register.onWorkDispatched("2", createMap(QUEUE_2, 200L)); - verifytaskManagerUsageMetricsProvider(2, 2); + verifyTaskManagerUsageMetricsProvider(2, 2); } @Test @@ -70,7 +70,7 @@ void testOnWorkerFinished() { register.onWorkDispatched("1", createMap(QUEUE_1, 100L)); register.onWorkerFinished("1", createMap(QUEUE_1, 100L)); register.onWorkerFinished("2", createMap(QUEUE_2, 200L)); - verifytaskManagerUsageMetricsProvider(3, -1); + verifyTaskManagerUsageMetricsProvider(2, 0); } @Test @@ -85,7 +85,7 @@ void testReset() throws InterruptedException { verify(taskManagerUsageMetricsProvider, times(1)).reset(); } - private void verifytaskManagerUsageMetricsProvider(final int times, final int sum) { + private void verifyTaskManagerUsageMetricsProvider(final int times, final int sum) { verify(taskManagerUsageMetricsProvider, times(times)).register(taskManagerUsagerMetricsProviderCaptor.capture(), anyInt()); assertEquals(sum, taskManagerUsagerMetricsProviderCaptor.getAllValues().stream().mapToInt(Integer::intValue).sum(), From d91484ada142cf5681c60268c0050465f3d35852 Mon Sep 17 00:00:00 2001 From: Hilbrand Bouwkamp Date: Tue, 19 May 2026 14:37:04 +0200 Subject: [PATCH 2/2] Review comments Use concurrent hash sets and synchronize. 
--- .../nl/aerius/taskmanager/QueueWatchDog.java | 24 ++++++++++++------- .../metrics/TaskManagerMetricsRegister.java | 22 ++++++++++------- 2 files changed, 29 insertions(+), 17 deletions(-) diff --git a/source/taskmanager/src/main/java/nl/aerius/taskmanager/QueueWatchDog.java b/source/taskmanager/src/main/java/nl/aerius/taskmanager/QueueWatchDog.java index 34eb312..f49e71c 100644 --- a/source/taskmanager/src/main/java/nl/aerius/taskmanager/QueueWatchDog.java +++ b/source/taskmanager/src/main/java/nl/aerius/taskmanager/QueueWatchDog.java @@ -18,10 +18,10 @@ import java.time.LocalDateTime; import java.util.ArrayList; -import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -46,7 +46,7 @@ class QueueWatchDog implements WorkerSizeObserver, WorkerProducerHandler { private final String workerQueueName; private final List listeners = new ArrayList<>(); - private final Set runningTasks = new HashSet<>(); + private final Set runningTasks = ConcurrentHashMap.newKeySet(); private LocalDateTime firstProblem; @@ -55,12 +55,16 @@ public QueueWatchDog(final String workerQueueName) { } public void addQueueWatchDogListener(final QueueWatchDogListener listener) { - listeners.add(listener); + synchronized (runningTasks) { + listeners.add(listener); + } } @Override public void onWorkDispatched(final String messageId, final Map messageMetaData) { - runningTasks.add(messageId); + synchronized (runningTasks) { + runningTasks.add(messageId); + } } @Override @@ -70,11 +74,13 @@ public void onWorkerFinished(final String messageId, final Map m @Override public void onNumberOfWorkersUpdate(final int numberOfWorkers, final int numberOfMessages, final int numberOfMessagesInProgress) { - if (isItDead(!runningTasks.isEmpty(), numberOfMessages)) { - LOG.info("It looks like some tasks are zombies on {} worker queue. 
All tasks in state running are released (running:{}).", workerQueueName, - runningTasks.size()); - runningTasks.clear(); - listeners.forEach(QueueWatchDogListener::reset); + synchronized (runningTasks) { + if (isItDead(!runningTasks.isEmpty(), numberOfMessages)) { + LOG.info("It looks like some tasks are zombies on {} worker queue. All tasks in state running are released (running:{}).", workerQueueName, + runningTasks.size()); + runningTasks.clear(); + listeners.forEach(QueueWatchDogListener::reset); + } } } diff --git a/source/taskmanager/src/main/java/nl/aerius/taskmanager/metrics/TaskManagerMetricsRegister.java b/source/taskmanager/src/main/java/nl/aerius/taskmanager/metrics/TaskManagerMetricsRegister.java index e405f34..7a9577e 100644 --- a/source/taskmanager/src/main/java/nl/aerius/taskmanager/metrics/TaskManagerMetricsRegister.java +++ b/source/taskmanager/src/main/java/nl/aerius/taskmanager/metrics/TaskManagerMetricsRegister.java @@ -16,9 +16,9 @@ */ package nl.aerius.taskmanager.metrics; -import java.util.HashSet; import java.util.Map; import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -42,7 +42,7 @@ public class TaskManagerMetricsRegister implements WorkerProducerHandler, Worker private final StartupGuard startupGuard; // Keep track of dispatched tasks, because when taskmanager restarts it should not register tasks already on the queue // as it doesn't have any metrics on it anymore. 
- private final Set dispatchedTasks = new HashSet<>(); + private final Set dispatchedTasks = ConcurrentHashMap.newKeySet(); private int numberOfWorkers; @@ -53,14 +53,18 @@ public TaskManagerMetricsRegister(final TaskManagerUsageMetricsProvider taskMana @Override public void onWorkDispatched(final String messageId, final Map messageMetaData) { - dispatchedTasks.add(messageId); - taskManagerUsageMetricsProvider.register(1, numberOfWorkers); + synchronized (dispatchedTasks) { + dispatchedTasks.add(messageId); + taskManagerUsageMetricsProvider.register(1, numberOfWorkers); + } } @Override public void onWorkerFinished(final String messageId, final Map messageMetaData) { - if (dispatchedTasks.remove(messageId)) { - taskManagerUsageMetricsProvider.register(-1, numberOfWorkers); + synchronized (dispatchedTasks) { + if (dispatchedTasks.remove(messageId)) { + taskManagerUsageMetricsProvider.register(-1, numberOfWorkers); + } } } @@ -76,7 +80,9 @@ public synchronized void onNumberOfWorkersUpdate(final int numberOfWorkers, fina @Override public void reset() { - dispatchedTasks.clear(); - taskManagerUsageMetricsProvider.reset(); + synchronized (dispatchedTasks) { + dispatchedTasks.clear(); + taskManagerUsageMetricsProvider.reset(); + } } }