diff --git a/source/taskmanager/src/main/java/nl/aerius/taskmanager/QueueWatchDog.java b/source/taskmanager/src/main/java/nl/aerius/taskmanager/QueueWatchDog.java index c58db32..f49e71c 100644 --- a/source/taskmanager/src/main/java/nl/aerius/taskmanager/QueueWatchDog.java +++ b/source/taskmanager/src/main/java/nl/aerius/taskmanager/QueueWatchDog.java @@ -18,10 +18,10 @@ import java.time.LocalDateTime; import java.util.ArrayList; -import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -46,7 +46,7 @@ class QueueWatchDog implements WorkerSizeObserver, WorkerProducerHandler { private final String workerQueueName; private final List listeners = new ArrayList<>(); - private final Set runningTasks = new HashSet<>(); + private final Set runningTasks = ConcurrentHashMap.newKeySet(); private LocalDateTime firstProblem; @@ -55,12 +55,16 @@ public QueueWatchDog(final String workerQueueName) { } public void addQueueWatchDogListener(final QueueWatchDogListener listener) { - listeners.add(listener); + synchronized (runningTasks) { + listeners.add(listener); + } } @Override public void onWorkDispatched(final String messageId, final Map messageMetaData) { - runningTasks.add(messageId); + synchronized (runningTasks) { + runningTasks.add(messageId); + } } @Override @@ -69,10 +73,14 @@ public void onWorkerFinished(final String messageId, final Map m } @Override - public void onNumberOfWorkersUpdate(final int numberOfWorkers, final int numberOfMessages, int numberOfMessagesInProgress) { - if (isItDead(!runningTasks.isEmpty(), numberOfMessages)) { - LOG.info("It looks like some tasks are zombies on {} worker queue, so all tasks currently in state running are released.", workerQueueName); - listeners.forEach(QueueWatchDogListener::reset); + public void onNumberOfWorkersUpdate(final int numberOfWorkers, final int numberOfMessages, final int numberOfMessagesInProgress) { + synchronized (runningTasks) { + if (isItDead(!runningTasks.isEmpty(), numberOfMessages)) { + LOG.info("It looks like some tasks are zombies on {} worker queue. All tasks in state running are released (running:{}).", workerQueueName, + runningTasks.size()); + runningTasks.clear(); + listeners.forEach(QueueWatchDogListener::reset); + } } } diff --git a/source/taskmanager/src/main/java/nl/aerius/taskmanager/config/EnvOverrideDeserializer.java b/source/taskmanager/src/main/java/nl/aerius/taskmanager/config/EnvOverrideDeserializer.java index b1302c6..532cfc6 100644 --- a/source/taskmanager/src/main/java/nl/aerius/taskmanager/config/EnvOverrideDeserializer.java +++ b/source/taskmanager/src/main/java/nl/aerius/taskmanager/config/EnvOverrideDeserializer.java @@ -101,6 +101,8 @@ private void applyEnvOverridesOnSetProperties(final ObjectNode node, final Strin node.put(entry.getKey(), envValue); } else if (entry.getValue().isObject()) { applyEnvOverrides((ObjectNode) entry.getValue(), fieldPath, resolveFieldType(targetClass, entry.getKey())); + } else { + LOG.debug("Unexpected entry value (in parentPath: {}) that is not an object: {}", parentPath, entry.getValue()); } } } diff --git a/source/taskmanager/src/main/java/nl/aerius/taskmanager/metrics/LoadMetric.java b/source/taskmanager/src/main/java/nl/aerius/taskmanager/metrics/LoadMetric.java index 3e8cd0b..35d24ad 100644 --- a/source/taskmanager/src/main/java/nl/aerius/taskmanager/metrics/LoadMetric.java +++ b/source/taskmanager/src/main/java/nl/aerius/taskmanager/metrics/LoadMetric.java @@ -70,7 +70,7 @@ public synchronized void register(final int deltaUsedWorkers, final int numberOf total += delta * countFunction.applyAsDouble(numberOfWorkers, usedWorkers); totalMeasureTime += delta; last = newLast; - usedWorkers += deltaUsedWorkers; + usedWorkers = Math.max(0, usedWorkers + deltaUsedWorkers); } /** diff --git a/source/taskmanager/src/main/java/nl/aerius/taskmanager/metrics/PerformanceMetricsReporter.java b/source/taskmanager/src/main/java/nl/aerius/taskmanager/metrics/PerformanceMetricsReporter.java index 8d84ce4..cc4b6f4 100644 --- a/source/taskmanager/src/main/java/nl/aerius/taskmanager/metrics/PerformanceMetricsReporter.java +++ b/source/taskmanager/src/main/java/nl/aerius/taskmanager/metrics/PerformanceMetricsReporter.java @@ -17,10 +17,8 @@ package nl.aerius.taskmanager.metrics; import java.util.HashMap; -import java.util.HashSet; import java.util.Map; import java.util.Map.Entry; -import java.util.Set; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; @@ -80,10 +78,6 @@ public class PerformanceMetricsReporter implements WorkerProducerHandler, QueueW private final Attributes attributesWorker; - // Keep track of dispatched tasks, because when taskmanager restarts it should not register tasks already on the queue - // as it doesn't have any metrics on it anymore. - private final Set dispatchedTasks = new HashSet<>(); - public PerformanceMetricsReporter(final ScheduledExecutorService newScheduledThreadPool, final String queueGroupName, final Meter meter) { this.queueGroupName = queueGroupName; this.meter = meter; @@ -123,7 +117,6 @@ private DoubleGauge createGauge(final String name, final String description) { @Override public void onWorkDispatched(final String messageId, final Map messageMetaData) { - dispatchedTasks.add(messageId); final TaskMetrics taskMetrics = new TaskMetrics(messageMetaData); taskMetrics.determineDuration(); dispatchedQueueMetrics.computeIfAbsent(taskMetrics.queueName(), k -> createQueueDurationMetric(taskMetrics)).register(taskMetrics); @@ -136,12 +129,10 @@ public synchronized void onWorkerFinished(final String messageId, final Map createQueueDurationMetric(taskMetrics)).register(taskMetrics); workWorkerMetrics.register(taskMetrics); - } @Override public void reset() { - dispatchedTasks.clear(); dispatchedQueueMetrics.entrySet().forEach(e -> e.getValue().process()); dispatchedWorkerMetrics.process(); } diff --git a/source/taskmanager/src/main/java/nl/aerius/taskmanager/metrics/TaskManagerMetricsRegister.java b/source/taskmanager/src/main/java/nl/aerius/taskmanager/metrics/TaskManagerMetricsRegister.java index 4f0e6a8..7a9577e 100644 --- a/source/taskmanager/src/main/java/nl/aerius/taskmanager/metrics/TaskManagerMetricsRegister.java +++ b/source/taskmanager/src/main/java/nl/aerius/taskmanager/metrics/TaskManagerMetricsRegister.java @@ -17,6 +17,8 @@ package nl.aerius.taskmanager.metrics; import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -38,6 +40,9 @@ public class TaskManagerMetricsRegister implements WorkerProducerHandler, Worker private final TaskManagerUsageMetricsProvider taskManagerUsageMetricsProvider; private final StartupGuard startupGuard; + // Keep track of dispatched tasks, because when taskmanager restarts it should not register tasks already on the queue + // as it doesn't have any metrics on it anymore. + private final Set dispatchedTasks = ConcurrentHashMap.newKeySet(); private int numberOfWorkers; @@ -48,12 +53,19 @@ public TaskManagerMetricsRegister(final TaskManagerUsageMetricsProvider taskMana @Override public void onWorkDispatched(final String messageId, final Map messageMetaData) { - taskManagerUsageMetricsProvider.register(1, numberOfWorkers); + synchronized (dispatchedTasks) { + dispatchedTasks.add(messageId); + taskManagerUsageMetricsProvider.register(1, numberOfWorkers); + } } @Override public void onWorkerFinished(final String messageId, final Map messageMetaData) { - taskManagerUsageMetricsProvider.register(-1, numberOfWorkers); + synchronized (dispatchedTasks) { + if (dispatchedTasks.remove(messageId)) { + taskManagerUsageMetricsProvider.register(-1, numberOfWorkers); + } + } } @Override @@ -68,6 +80,9 @@ public synchronized void onNumberOfWorkersUpdate(final int numberOfWorkers, fina @Override public void reset() { - taskManagerUsageMetricsProvider.reset(); + synchronized (dispatchedTasks) { + dispatchedTasks.clear(); + taskManagerUsageMetricsProvider.reset(); + } } } diff --git a/source/taskmanager/src/test/java/nl/aerius/taskmanager/QueueWatchDogTest.java b/source/taskmanager/src/test/java/nl/aerius/taskmanager/QueueWatchDogTest.java index 88bff47..f370b26 100644 --- a/source/taskmanager/src/test/java/nl/aerius/taskmanager/QueueWatchDogTest.java +++ b/source/taskmanager/src/test/java/nl/aerius/taskmanager/QueueWatchDogTest.java @@ -63,6 +63,9 @@ protected LocalDateTime now() { now.set(now.get().plusMinutes(20)); qwd.onNumberOfWorkersUpdate(0, numberOfMessages, 0); verify(listener, times(expected)).reset(); + // Call update again. This should not trigger reset again because we just called reset. + qwd.onNumberOfWorkersUpdate(0, numberOfMessages, 0); + verify(listener, times(expected)).reset(); } private static List isDeadTests() { diff --git a/source/taskmanager/src/test/java/nl/aerius/taskmanager/metrics/TaskManagerMetricsRegisterTest.java b/source/taskmanager/src/test/java/nl/aerius/taskmanager/metrics/TaskManagerMetricsRegisterTest.java index 4a81d4e..c1864f4 100644 --- a/source/taskmanager/src/test/java/nl/aerius/taskmanager/metrics/TaskManagerMetricsRegisterTest.java +++ b/source/taskmanager/src/test/java/nl/aerius/taskmanager/metrics/TaskManagerMetricsRegisterTest.java @@ -61,7 +61,7 @@ void testOnWorkDispatched() { startUp(10, 0); register.onWorkDispatched("1", createMap(QUEUE_1, 100L)); register.onWorkDispatched("2", createMap(QUEUE_2, 200L)); - verifytaskManagerUsageMetricsProvider(2, 2); + verifyTaskManagerUsageMetricsProvider(2, 2); } @Test @@ -70,7 +70,7 @@ void testOnWorkerFinished() { register.onWorkDispatched("1", createMap(QUEUE_1, 100L)); register.onWorkerFinished("1", createMap(QUEUE_1, 100L)); register.onWorkerFinished("2", createMap(QUEUE_2, 200L)); - verifytaskManagerUsageMetricsProvider(3, -1); + verifyTaskManagerUsageMetricsProvider(2, 0); } @Test @@ -85,7 +85,7 @@ void testReset() throws InterruptedException { verify(taskManagerUsageMetricsProvider, times(1)).reset(); } - private void verifytaskManagerUsageMetricsProvider(final int times, final int sum) { + private void verifyTaskManagerUsageMetricsProvider(final int times, final int sum) { verify(taskManagerUsageMetricsProvider, times(times)).register(taskManagerUsagerMetricsProviderCaptor.capture(), anyInt()); assertEquals(sum, taskManagerUsagerMetricsProviderCaptor.getAllValues().stream().mapToInt(Integer::intValue).sum(),