Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,10 @@

import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -46,7 +46,7 @@ class QueueWatchDog implements WorkerSizeObserver, WorkerProducerHandler {

private final String workerQueueName;
private final List<QueueWatchDogListener> listeners = new ArrayList<>();
private final Set<String> runningTasks = new HashSet<>();
private final Set<String> runningTasks = ConcurrentHashMap.newKeySet();

private LocalDateTime firstProblem;

Expand All @@ -55,12 +55,16 @@ public QueueWatchDog(final String workerQueueName) {
}

public void addQueueWatchDogListener(final QueueWatchDogListener listener) {
listeners.add(listener);
synchronized (runningTasks) {
listeners.add(listener);
}
}

@Override
public void onWorkDispatched(final String messageId, final Map<String, Object> messageMetaData) {
runningTasks.add(messageId);
synchronized (runningTasks) {
runningTasks.add(messageId);
}
}

@Override
Expand All @@ -69,10 +73,14 @@ public void onWorkerFinished(final String messageId, final Map<String, Object> m
}

@Override
public void onNumberOfWorkersUpdate(final int numberOfWorkers, final int numberOfMessages, int numberOfMessagesInProgress) {
if (isItDead(!runningTasks.isEmpty(), numberOfMessages)) {
LOG.info("It looks like some tasks are zombies on {} worker queue, so all tasks currently in state running are released.", workerQueueName);
listeners.forEach(QueueWatchDogListener::reset);
public void onNumberOfWorkersUpdate(final int numberOfWorkers, final int numberOfMessages, final int numberOfMessagesInProgress) {
synchronized (runningTasks) {
if (isItDead(!runningTasks.isEmpty(), numberOfMessages)) {
LOG.info("It looks like some tasks are zombies on {} worker queue. All tasks in state running are released (running:{}).", workerQueueName,
runningTasks.size());
runningTasks.clear();
listeners.forEach(QueueWatchDogListener::reset);
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,8 @@ private void applyEnvOverridesOnSetProperties(final ObjectNode node, final Strin
node.put(entry.getKey(), envValue);
} else if (entry.getValue().isObject()) {
applyEnvOverrides((ObjectNode) entry.getValue(), fieldPath, resolveFieldType(targetClass, entry.getKey()));
} else {
LOG.debug("Unexpected entry value (in parentPath: {}) that is not an object: {}", parentPath, entry.getValue());
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ public synchronized void register(final int deltaUsedWorkers, final int numberOf
total += delta * countFunction.applyAsDouble(numberOfWorkers, usedWorkers);
totalMeasureTime += delta;
last = newLast;
usedWorkers += deltaUsedWorkers;
usedWorkers = Math.max(0, usedWorkers + deltaUsedWorkers);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,8 @@
package nl.aerius.taskmanager.metrics;

import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

Expand Down Expand Up @@ -80,10 +78,6 @@ public class PerformanceMetricsReporter implements WorkerProducerHandler, QueueW

private final Attributes attributesWorker;

// Keep track of the tasks dispatched by this instance. After a task manager restart, tasks that were already on the
// queue must not be registered, because the task manager no longer has any metrics for them.
private final Set<String> dispatchedTasks = new HashSet<>();

public PerformanceMetricsReporter(final ScheduledExecutorService newScheduledThreadPool, final String queueGroupName, final Meter meter) {
this.queueGroupName = queueGroupName;
this.meter = meter;
Expand Down Expand Up @@ -123,7 +117,6 @@ private DoubleGauge createGauge(final String name, final String description) {

@Override
public void onWorkDispatched(final String messageId, final Map<String, Object> messageMetaData) {
dispatchedTasks.add(messageId);
final TaskMetrics taskMetrics = new TaskMetrics(messageMetaData);
taskMetrics.determineDuration();
dispatchedQueueMetrics.computeIfAbsent(taskMetrics.queueName(), k -> createQueueDurationMetric(taskMetrics)).register(taskMetrics);
Expand All @@ -136,12 +129,10 @@ public synchronized void onWorkerFinished(final String messageId, final Map<Stri
taskMetrics.determineDuration();
workQueueMetrics.computeIfAbsent(taskMetrics.queueName(), k -> createQueueDurationMetric(taskMetrics)).register(taskMetrics);
workWorkerMetrics.register(taskMetrics);

}

@Override
public void reset() {
dispatchedTasks.clear();
dispatchedQueueMetrics.entrySet().forEach(e -> e.getValue().process());
dispatchedWorkerMetrics.process();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
package nl.aerius.taskmanager.metrics;

import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -38,6 +40,9 @@ public class TaskManagerMetricsRegister implements WorkerProducerHandler, Worker

private final TaskManagerUsageMetricsProvider taskManagerUsageMetricsProvider;
private final StartupGuard startupGuard;
// Keep track of the tasks dispatched by this instance. After a task manager restart, tasks that were already on the
// queue must not be registered, because the task manager no longer has any metrics for them.
private final Set<String> dispatchedTasks = ConcurrentHashMap.newKeySet();

private int numberOfWorkers;

Expand All @@ -48,12 +53,19 @@ public TaskManagerMetricsRegister(final TaskManagerUsageMetricsProvider taskMana

@Override
public void onWorkDispatched(final String messageId, final Map<String, Object> messageMetaData) {
taskManagerUsageMetricsProvider.register(1, numberOfWorkers);
synchronized (dispatchedTasks) {
dispatchedTasks.add(messageId);
taskManagerUsageMetricsProvider.register(1, numberOfWorkers);
}
}

@Override
public void onWorkerFinished(final String messageId, final Map<String, Object> messageMetaData) {
taskManagerUsageMetricsProvider.register(-1, numberOfWorkers);
synchronized (dispatchedTasks) {
if (dispatchedTasks.remove(messageId)) {
taskManagerUsageMetricsProvider.register(-1, numberOfWorkers);
}
}
}

@Override
Expand All @@ -68,6 +80,9 @@ public synchronized void onNumberOfWorkersUpdate(final int numberOfWorkers, fina

@Override
public void reset() {
taskManagerUsageMetricsProvider.reset();
synchronized (dispatchedTasks) {
dispatchedTasks.clear();
taskManagerUsageMetricsProvider.reset();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,9 @@ protected LocalDateTime now() {
now.set(now.get().plusMinutes(20));
qwd.onNumberOfWorkersUpdate(0, numberOfMessages, 0);
verify(listener, times(expected)).reset();
// Call update again. This should not trigger reset again because we just called reset.
qwd.onNumberOfWorkersUpdate(0, numberOfMessages, 0);
verify(listener, times(expected)).reset();
}

private static List<Arguments> isDeadTests() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ void testOnWorkDispatched() {
startUp(10, 0);
register.onWorkDispatched("1", createMap(QUEUE_1, 100L));
register.onWorkDispatched("2", createMap(QUEUE_2, 200L));
verifytaskManagerUsageMetricsProvider(2, 2);
verifyTaskManagerUsageMetricsProvider(2, 2);
}

@Test
Expand All @@ -70,7 +70,7 @@ void testOnWorkerFinished() {
register.onWorkDispatched("1", createMap(QUEUE_1, 100L));
register.onWorkerFinished("1", createMap(QUEUE_1, 100L));
register.onWorkerFinished("2", createMap(QUEUE_2, 200L));
verifytaskManagerUsageMetricsProvider(3, -1);
verifyTaskManagerUsageMetricsProvider(2, 0);
}

@Test
Expand All @@ -85,7 +85,7 @@ void testReset() throws InterruptedException {
verify(taskManagerUsageMetricsProvider, times(1)).reset();
}

private void verifytaskManagerUsageMetricsProvider(final int times, final int sum) {
private void verifyTaskManagerUsageMetricsProvider(final int times, final int sum) {
verify(taskManagerUsageMetricsProvider, times(times)).register(taskManagerUsagerMetricsProviderCaptor.capture(),
anyInt());
assertEquals(sum, taskManagerUsagerMetricsProviderCaptor.getAllValues().stream().mapToInt(Integer::intValue).sum(),
Expand Down
Loading