Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import com.rabbitmq.client.ShutdownListener;
import com.rabbitmq.client.ShutdownSignalException;

import nl.aerius.taskmanager.adaptor.WorkerProducer;
Expand All @@ -45,7 +46,7 @@
/**
* RabbitMQ implementation of a {@link WorkerProducer}.
*/
class RabbitMQWorkerProducer implements WorkerProducer {
class RabbitMQWorkerProducer implements WorkerProducer, ShutdownListener {

protected static final String WORKER_REPLY_AFFIX = ".reply";

Expand Down Expand Up @@ -76,15 +77,9 @@ public void addWorkerProducerHandler(final WorkerProducerHandler workerProducerH
this.workerProducerHandlers.add(workerProducerHandler);
}

@Override
public void start() {
tryStartReplyConsumer();
}

@Override
public void dispatchMessage(final Message message) throws IOException {
final RabbitMQMessage rabbitMQMessage = (RabbitMQMessage) message;
ensureChanne();
final BasicProperties.Builder forwardBuilder = rabbitMQMessage.getProperties().builder();
// new header map (even in case of existing headers, original can be a UnmodifiableMap)
final Map<String, Object> headers = rabbitMQMessage.getProperties().getHeaders() == null
Expand All @@ -110,13 +105,6 @@ private static void handleWorkDispatched(final Message message, final Map<String
}
}

private synchronized void ensureChanne() throws IOException {
if (channel == null || !channel.isOpen()) {
channel = factory.getConnection().createChannel();
channel.queueDeclare(workerQueueName, durable, false, false, RabbitMQQueueUtil.queueDeclareArguments(durable, queueType));
}
}

@Override
public void shutdown() {
isShutdown = true;
Expand All @@ -138,19 +126,21 @@ private String getWorkerReplyQueue() {
return workerQueueName + WORKER_REPLY_AFFIX;
}

private void tryStartReplyConsumer() {
@Override
public void start() {
boolean warn = true;
while (!isShutdown) {
Connection connection = null;
try {
connection = factory.getConnection();
startWorkerChannel(connection);
if (startReplyConsumer(connection)) {
LOG.info("Successfully (re)started reply consumer for queue {}", workerQueueName);
return;
}
} catch (final ShutdownSignalException | IOException e1) {
if (warn) {
LOG.warn("(Re)starting reply consumer for queue {} failed, retrying in a while: {}", workerQueueName,
LOG.warn("(Re)starting worker/reply consumer for queue {} failed, retrying in a while: {}", workerQueueName,
Optional.ofNullable(e1.getMessage()).orElse(Optional.ofNullable(e1.getCause()).map(Throwable::getMessage).orElse("Unknown")));
LOG.trace("(Re)starting failed with exception:", e1);
warn = false;
Expand All @@ -162,12 +152,11 @@ private void tryStartReplyConsumer() {
}
}

private static void delayRetry(final int retryTime) {
try {
Thread.sleep(TimeUnit.SECONDS.toMillis(retryTime));
} catch (final InterruptedException ex) {
LOG.debug("Waiting interrupted", ex);
Thread.currentThread().interrupt();
private synchronized void startWorkerChannel(final Connection connection) throws IOException {
if (channel == null || !channel.isOpen()) {
channel = connection.createChannel();
channel.addShutdownListener(this);
channel.queueDeclare(workerQueueName, durable, false, false, RabbitMQQueueUtil.queueDeclareArguments(durable, queueType));
}
}

Expand Down Expand Up @@ -196,11 +185,28 @@ public void handleDelivery(final String consumerTag, final Envelope envelope, fi
return true;
}

private static void delayRetry(final int retryTime) {
try {
Thread.sleep(TimeUnit.SECONDS.toMillis(retryTime));
} catch (final InterruptedException ex) {
LOG.debug("Waiting interrupted", ex);
Thread.currentThread().interrupt();
}
}

private static void handleWorkFinished(final WorkerProducerHandler handler, final BasicProperties properties) {
try {
handler.onWorkerFinished(properties.getMessageId(), properties.getHeaders());
} catch (final RuntimeException e) {
LOG.error("Runtime exception during handleWorkFinished of {}", handler.getClass(), e);
}
}

@Override
public void shutdownCompleted(final ShutdownSignalException cause) {
if (!cause.isInitiatedByApplication()) {
channel = null;
start();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,18 +17,25 @@
package nl.aerius.taskmanager.mq;

import static org.junit.jupiter.api.Assertions.assertArrayEquals;
import static org.junit.jupiter.api.Assertions.fail;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;

import java.io.IOException;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.CsvSource;
import org.mockito.Mock;

import com.rabbitmq.client.AMQP.BasicProperties;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import com.rabbitmq.client.ShutdownSignalException;

import nl.aerius.taskmanager.adaptor.WorkerProducer;
import nl.aerius.taskmanager.adaptor.WorkerSizeObserver;
Expand All @@ -48,7 +55,7 @@ class RabbitMQWorkerProducerTest extends AbstractRabbitMQTest {
void testForwardMessage() throws IOException, InterruptedException {
final byte[] sendBody = "4321".getBytes();

final WorkerProducer wp = adapterFactory.createWorkerProducer(new QueueConfig(WORKER_QUEUE_NAME, false, false, -1, null));
final WorkerProducer wp = createWorkerProducer();
wp.start();
final BasicProperties bp = new BasicProperties();
wp.dispatchMessage(new RabbitMQMessage(WORKER_QUEUE_NAME, null, 4321, bp, sendBody) {
Expand All @@ -70,4 +77,47 @@ public void handleDelivery(final String consumerTag, final Envelope envelope, fi
lock.tryAcquire(1, 5, TimeUnit.SECONDS);
assertArrayEquals(sendBody, data.getData(), "Test if body send");
}

@ParameterizedTest
@Timeout(value = 5, unit = TimeUnit.SECONDS)
@CsvSource({
// After shutdown completed signal not by the application channels should be recreated.
"false,false,2",
// After shutdown completed signal not by the application, but explicit shutdown was called channels should NOT be recreated.
"false,true,1",
// After shutdown completed signal by the application channels should NOT be recreated.
"true,false,1",
// After shutdown completed signal by the application and explicit shutdown was called channels should NOT be recreated.
"true,true,1"
})
/**
*
* @param initiatedByApplication if true as if shutdown initiated by the application
* @param shutdown if true if application called shutdown
* @param times number of times create/open channel should have been called
*/
void testRestart(final boolean initiatedByApplication, final boolean shutdown, final int times) throws IOException {
final Connection connection = factory.getConnection();
final WorkerProducer wp = createWorkerProducer();

if (wp instanceof final RabbitMQWorkerProducer rwp) {
// First start which should create the channels.
wp.start();
verify(connection, times(1)).createChannel(); // worker channel
verify(connection, times(1)).openChannel(); // worker reply channel

if (shutdown) {
wp.shutdown();
}
rwp.shutdownCompleted(new ShutdownSignalException(false, initiatedByApplication, null, wp));
verify(connection, times(times)).createChannel();
verify(connection, times(times)).openChannel();
} else {
fail("Expected worker producer to be of type RabbitMQWorkerProducer, but was: " + wp.getClass());
}
}

private WorkerProducer createWorkerProducer() {
return adapterFactory.createWorkerProducer(new QueueConfig(WORKER_QUEUE_NAME, false, false, -1, null));
}
}
Loading