Skip to content

Commit

Permalink
AER-1413 Improved stability to handle connection errors (#60)
Browse files Browse the repository at this point in the history
Both starting and the shutdownhandler can get a ShutdownSignalException.
This could result in both the start trying to restart as well as via the TaskConsumer shutdownhandler to get a seconds call to start.
This changes is intended to not get a second start call when a ShutdownSignalException is thrown during start of the consumer.
  • Loading branch information
Hilbrand authored Mar 23, 2023
1 parent b6b1f4b commit 2599094
Show file tree
Hide file tree
Showing 4 changed files with 174 additions and 29 deletions.
10 changes: 10 additions & 0 deletions source/taskmanager/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,16 @@
<artifactId>junit-jupiter</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-junit-jupiter</artifactId>
<scope>test</scope>
</dependency>
</dependencies>

<build>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,9 @@
package nl.aerius.taskmanager;

import java.io.IOException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -43,6 +45,8 @@ class TaskConsumer implements MessageReceivedHandler {

private boolean running = true;

private Future<?> messageHandlerFuture;

@SuppressWarnings("unchecked")
public TaskConsumer(final ExecutorService executorService, final String taskQueueName, final boolean durable,
final ForwardTaskHandler forwardTaskHandler, final AdaptorFactory factory) throws IOException {
Expand Down Expand Up @@ -110,8 +114,17 @@ public void messageDeliveryAborted(final Message<MessageMetaData> message, final
taskMessageHandler.messageDeliveryAborted(message, exception);
}

public void start() {
executorService.submit(() -> {
public synchronized void start() {
if (messageHandlerFuture != null && !messageHandlerFuture.isDone()) {
try {
messageHandlerFuture.get();
} catch (final InterruptedException e) {
Thread.currentThread().interrupt();
} catch (final ExecutionException e) {
LOG.info("TaskConsumer shutdown {} got exception.", taskQueueName, e);
}
}
messageHandlerFuture = executorService.submit(() -> {
try {
taskMessageHandler.start();
} catch (final IOException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,11 +47,19 @@ class RabbitMQMessageHandler implements TaskMessageHandler<RabbitMQMessageMetaDa
private MessageReceivedHandler messageReceivedHandler;
private RabbitMQMessageConsumer consumer;
private boolean isShutdown;
private boolean warned;

/**
* Set a boolean that is set as long as we're trying to (re)connect to RabbitMQ.
*/
private final AtomicBoolean tryConnecting = new AtomicBoolean();

private final AtomicBoolean tryStartingConsuming = new AtomicBoolean();

/**
* Time to wait before retrying connection.
*/
private long retryTimeMilliseconds = TimeUnit.SECONDS.toMillis(DEFAULT_RETRY_SECONDS);

/**
* Constructor.
Expand All @@ -74,13 +82,31 @@ public void addMessageReceivedHandler(final MessageReceivedHandler messageReceiv

@Override
public void start() throws IOException {
tryStartConsuming();
tryStartingConsuming.set(true);
while (!isShutdown) {
try {
stopAndStartConsumer();
LOG.info("Successfully (re)started consumer for {}", taskQueueName);
if (consumer.getChannel().isOpen()) {
tryStartingConsuming.set(false);
break;
}
} catch (final ShutdownSignalException | IOException e1) {
if (!warned) {
LOG.warn("(Re)starting consumer for {} failed, retrying in a while", taskQueueName, e1);
warned = true;
}
delayRetry();
}
}
}

@Override
public void shutDown() throws IOException {
isShutdown = true;
consumer.stopConsuming();
if (consumer != null) {
consumer.stopConsuming();
}
}

@Override
Expand All @@ -107,27 +133,13 @@ public void onMessageReceived(final RabbitMQMessage message) {
}
}


private void tryStartConsuming() {
boolean warn = true;
while (!isShutdown) {
try {
stopAndStartConsumer();
LOG.info("Successfully (re)started consumer for {}", taskQueueName);
break;
} catch (final ShutdownSignalException | IOException e1) {
if (warn) {
LOG.warn("(Re)starting consumer for {} failed, retrying in a while", taskQueueName, e1);
warn = false;
}
delayRetry(DEFAULT_RETRY_SECONDS);
}
}
public void setRetryTimeMilliseconds(final long retryTimeMilliseconds) {
this.retryTimeMilliseconds = retryTimeMilliseconds;
}

private void delayRetry(final int retryTime) {
private void delayRetry() {
try {
Thread.sleep(TimeUnit.SECONDS.toMillis(retryTime));
Thread.sleep(retryTimeMilliseconds);
} catch (final InterruptedException ex) {
LOG.debug("Waiting interrupted", ex);
Thread.currentThread().interrupt();
Expand All @@ -144,17 +156,20 @@ private void stopAndStartConsumer() throws IOException {
taskQueueName,
durable,
this);
consumer.getChannel().addShutdownListener(e -> handleShutdownSignal(e));
consumer.getChannel().addShutdownListener(this::handleShutdownSignal);
consumer.startConsuming();
tryConnecting.set(false);
warned = false;
}
}

private void handleShutdownSignal(final ShutdownSignalException sig) {
if (tryConnecting.compareAndSet(false, true)) {
if (messageReceivedHandler != null) {
messageReceivedHandler.handleShutdownSignal();
}
private void handleShutdownSignal(final ShutdownSignalException ssg) {
if (ssg != null && ssg.isInitiatedByApplication()) {
return;
}
if (!tryStartingConsuming.get() && tryConnecting.compareAndSet(false, true) && messageReceivedHandler != null) {
delayRetry();
messageReceivedHandler.handleShutdownSignal();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,32 @@
package nl.aerius.taskmanager.mq;

import static org.junit.jupiter.api.Assertions.assertArrayEquals;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.only;
import static org.mockito.Mockito.verify;

import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.ArgumentCaptor;
import org.mockito.Captor;
import org.mockito.junit.jupiter.MockitoExtension;

import com.rabbitmq.client.AMQP.BasicProperties;
import com.rabbitmq.client.ShutdownListener;
import com.rabbitmq.client.ShutdownSignalException;

import nl.aerius.taskmanager.adaptor.TaskMessageHandler;
import nl.aerius.taskmanager.adaptor.TaskMessageHandler.MessageReceivedHandler;
Expand All @@ -34,12 +51,15 @@
/**
* Test class for {@link RabbitMQMessageHandler}.
*/
@ExtendWith(MockitoExtension.class)
class RabbitMQMessageHandlerTest extends AbstractRabbitMQTest {
final String taskQueueName = "queue1";

private @Captor ArgumentCaptor<ShutdownListener> shutdownListenerCaptor;

@Test
@Timeout(10000)
void testMessageReceivedHandler() throws IOException, InterruptedException {
final String taskQueueName = "queue1";
final byte[] receivedBody = "4321".getBytes();
final TaskMessageHandler tmh = adapterFactory.createTaskMessageHandler(taskQueueName, false);
final Semaphore lock = new Semaphore(0);
Expand All @@ -63,4 +83,91 @@ public void handleShutdownSignal() {
lock.tryAcquire(1, 5, TimeUnit.SECONDS);
assertArrayEquals(receivedBody, data.getData(), "Test if body received");
}

/**
* Unit test to test startup process with failing startup.
* Flow:
* <ol>
* <li>Start and throw an IOException in basicConsume which is called on start.
* <li>Should retry in start and end start without exceptions.
* <li>Run start again, it should run without exceptions.
* <li>Trigger a ShutdownSignalException, messageReceivedHandler#handleShutdownSignal. This method shouldn't have been called before.
* </ol>
*/
@Test
@Timeout(1000)
void testReStart() throws IOException, InterruptedException {
// Lock used in the tryStartConsuming method to wait for test to allow to continue running
final Semaphore tryStartConsumingLock = new Semaphore(0);
// Lock to let the test wait till the TaskMessageHandler thread runs has started the consumer
final Semaphore verifyTryStartConsumingLock = new Semaphore(0);
// Counter to keep track how many times basicConsume has been called.
final AtomicInteger throwCounter = new AtomicInteger();
// Counter to keep track how many times shutdownCompleted method has been called.
final AtomicInteger shutdownCallsCounter = new AtomicInteger();

final MessageReceivedHandler mockMessageReceivedHandler = mock(MessageReceivedHandler.class);
final TaskMessageHandler tmh = adapterFactory.createTaskMessageHandler(taskQueueName, false);

((RabbitMQMessageHandler) tmh).setRetryTimeMilliseconds(1L);
doAnswer(invoke -> null).when(mockChannel).addShutdownListener(shutdownListenerCaptor.capture());
doAnswer(invoke -> {
verifyTryStartConsumingLock.release();
tryStartConsumingLock.acquire();
// stop consumer one time with an exception.
if (throwCounter.incrementAndGet() < 2) {
shutdownCallsCounter.incrementAndGet();
final ShutdownSignalException exception = new ShutdownSignalException(false, false, null, null);

// This will mock the shutdown handler is called.
shutdownListenerCaptor.getValue().shutdownCompleted(exception);
// This will mock starting the consumer failed with an exception.
throw exception;
}
return null;
}).when(mockChannel).basicConsume(anyString(), eq(false), anyString(), any());
tmh.addMessageReceivedHandler(mockMessageReceivedHandler);
final Thread thread = new Thread(() -> {
try {
// First start. Should retry once and then complete without error.
tmh.start();
// Second start. Should complete without error.
tmh.start();
} catch (final IOException e) {
throw new UncheckedIOException(e);
}
});

thread.setDaemon(true);
thread.start();
// Wait till TaskMessageHandler has called basicConsume in the consumer.
verifyTryStartConsumingLock.acquire();
// Release the consumer start lock, it should throw an IOException and not call the shutdown handler.
triggerRestartConsumer(tryStartConsumingLock, verifyTryStartConsumingLock, mockMessageReceivedHandler, 0);
// Release the second time, it should not throw an IOException this time, but just finish start without issue.
triggerRestartConsumer(tryStartConsumingLock, verifyTryStartConsumingLock, mockMessageReceivedHandler, 0);
// Release the second start call. It should just finished normally.
tryStartConsumingLock.release();
// Wait for thread to finish.
thread.join();

// Trigger an ShutdownSignalException.
shutdownListenerCaptor.getValue().shutdownCompleted(new ShutdownSignalException(false, false, null, null));

verify(mockMessageReceivedHandler, only()).handleShutdownSignal();
assertEquals(1, shutdownCallsCounter.get(), "Consumer basicConsume should have thrown the expected number of IOExceptions");
assertEquals(3, throwCounter.get(), "Consumer basicConsume should have been called this of 3 times");

}

private static void triggerRestartConsumer(final Semaphore tryStartConsumingLock, final Semaphore verifyTryStartConsumingLock,
final MessageReceivedHandler mockMessageReceivedHandler, final int expectedNumberMessageReceivedHandlerShutdownCalled)
throws InterruptedException {
// Let the consumer basicConsume continue.
tryStartConsumingLock.release();
// Consumer should have restarted.
verifyTryStartConsumingLock.acquire();
// handleShutdownSignal should not have been called.
verify(mockMessageReceivedHandler, never()).handleShutdownSignal();
}
}

0 comments on commit 2599094

Please sign in to comment.