From 2599094b04d4a1d032f369222c556cbe60eb9c14 Mon Sep 17 00:00:00 2001 From: Hilbrand Bouwkamp Date: Thu, 23 Mar 2023 15:35:23 +0100 Subject: [PATCH] AER-1413 Improved stability to handle connection errors (#60) Both starting and the shutdownhandler can get a ShutdownSignalException. This could result in both the start trying to restart as well as via the TaskConsumer shutdownhandler to get a seconds call to start. This changes is intended to not get a second start call when a ShutdownSignalException is thrown during start of the consumer. --- source/taskmanager/pom.xml | 10 ++ .../nl/aerius/taskmanager/TaskConsumer.java | 17 ++- .../mq/RabbitMQMessageHandler.java | 67 ++++++----- .../mq/RabbitMQMessageHandlerTest.java | 109 +++++++++++++++++- 4 files changed, 174 insertions(+), 29 deletions(-) diff --git a/source/taskmanager/pom.xml b/source/taskmanager/pom.xml index b57799d..4719712 100644 --- a/source/taskmanager/pom.xml +++ b/source/taskmanager/pom.xml @@ -72,6 +72,16 @@ junit-jupiter test + + org.mockito + mockito-core + test + + + org.mockito + mockito-junit-jupiter + test + diff --git a/source/taskmanager/src/main/java/nl/aerius/taskmanager/TaskConsumer.java b/source/taskmanager/src/main/java/nl/aerius/taskmanager/TaskConsumer.java index 64ff798..3b9c9a4 100644 --- a/source/taskmanager/src/main/java/nl/aerius/taskmanager/TaskConsumer.java +++ b/source/taskmanager/src/main/java/nl/aerius/taskmanager/TaskConsumer.java @@ -17,7 +17,9 @@ package nl.aerius.taskmanager; import java.io.IOException; +import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -43,6 +45,8 @@ class TaskConsumer implements MessageReceivedHandler { private boolean running = true; + private Future messageHandlerFuture; + @SuppressWarnings("unchecked") public TaskConsumer(final ExecutorService executorService, final String taskQueueName, final boolean durable, final ForwardTaskHandler forwardTaskHandler, final AdaptorFactory factory) throws IOException { @@ -110,8 +114,17 @@ public void messageDeliveryAborted(final Message message, final taskMessageHandler.messageDeliveryAborted(message, exception); } - public void start() { - executorService.submit(() -> { + public synchronized void start() { + if (messageHandlerFuture != null && !messageHandlerFuture.isDone()) { + try { + messageHandlerFuture.get(); + } catch (final InterruptedException e) { + Thread.currentThread().interrupt(); + } catch (final ExecutionException e) { + LOG.info("TaskConsumer shutdown {} got exception.", taskQueueName, e); + } + } + messageHandlerFuture = executorService.submit(() -> { try { taskMessageHandler.start(); } catch (final IOException e) { diff --git a/source/taskmanager/src/main/java/nl/aerius/taskmanager/mq/RabbitMQMessageHandler.java b/source/taskmanager/src/main/java/nl/aerius/taskmanager/mq/RabbitMQMessageHandler.java index 66249d0..e3c451f 100644 --- a/source/taskmanager/src/main/java/nl/aerius/taskmanager/mq/RabbitMQMessageHandler.java +++ b/source/taskmanager/src/main/java/nl/aerius/taskmanager/mq/RabbitMQMessageHandler.java @@ -47,11 +47,19 @@ class RabbitMQMessageHandler implements TaskMessageHandler handleShutdownSignal(e)); + consumer.getChannel().addShutdownListener(this::handleShutdownSignal); consumer.startConsuming(); tryConnecting.set(false); + warned = false; } } - private void handleShutdownSignal(final ShutdownSignalException sig) { - if (tryConnecting.compareAndSet(false, true)) { - if (messageReceivedHandler != null) { - messageReceivedHandler.handleShutdownSignal(); - } + private void handleShutdownSignal(final ShutdownSignalException ssg) { + if (ssg != null && ssg.isInitiatedByApplication()) { + return; + } + if (!tryStartingConsuming.get() && tryConnecting.compareAndSet(false, true) && messageReceivedHandler != null) { + delayRetry(); + messageReceivedHandler.handleShutdownSignal(); } } } diff --git a/source/taskmanager/src/test/java/nl/aerius/taskmanager/mq/RabbitMQMessageHandlerTest.java b/source/taskmanager/src/test/java/nl/aerius/taskmanager/mq/RabbitMQMessageHandlerTest.java index 8c2624a..b18337a 100644 --- a/source/taskmanager/src/test/java/nl/aerius/taskmanager/mq/RabbitMQMessageHandlerTest.java +++ b/source/taskmanager/src/test/java/nl/aerius/taskmanager/mq/RabbitMQMessageHandlerTest.java @@ -17,15 +17,32 @@ package nl.aerius.taskmanager.mq; import static org.junit.jupiter.api.Assertions.assertArrayEquals; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.only; +import static org.mockito.Mockito.verify; import java.io.IOException; +import java.io.UncheckedIOException; import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Timeout; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.ArgumentCaptor; +import org.mockito.Captor; +import org.mockito.junit.jupiter.MockitoExtension; import com.rabbitmq.client.AMQP.BasicProperties; +import com.rabbitmq.client.ShutdownListener; +import com.rabbitmq.client.ShutdownSignalException; import nl.aerius.taskmanager.adaptor.TaskMessageHandler; import nl.aerius.taskmanager.adaptor.TaskMessageHandler.MessageReceivedHandler; @@ -34,12 +51,15 @@ /** * Test class for {@link RabbitMQMessageHandler}. */ +@ExtendWith(MockitoExtension.class) class RabbitMQMessageHandlerTest extends AbstractRabbitMQTest { + final String taskQueueName = "queue1"; + + private @Captor ArgumentCaptor shutdownListenerCaptor; @Test @Timeout(10000) void testMessageReceivedHandler() throws IOException, InterruptedException { - final String taskQueueName = "queue1"; final byte[] receivedBody = "4321".getBytes(); final TaskMessageHandler tmh = adapterFactory.createTaskMessageHandler(taskQueueName, false); final Semaphore lock = new Semaphore(0); @@ -63,4 +83,91 @@ public void handleShutdownSignal() { lock.tryAcquire(1, 5, TimeUnit.SECONDS); assertArrayEquals(receivedBody, data.getData(), "Test if body received"); } + + /** + * Unit test to test startup process with failing startup. + * Flow: + *
    + *
  1. Start and throw an IOException in basicConsume which is called on start. + *
  2. Should retry in start and end start without exceptions. + *
  3. Run start again, it should run without exceptions. + *
  4. Trigger a ShutdownSignalException, messageReceivedHandler#handleShutdownSignal. This method shouldn't have been called before. + *
+ */ + @Test + @Timeout(1000) + void testReStart() throws IOException, InterruptedException { + // Lock used in the tryStartConsuming method to wait for test to allow to continue running + final Semaphore tryStartConsumingLock = new Semaphore(0); + // Lock to let the test wait till the TaskMessageHandler thread runs has started the consumer + final Semaphore verifyTryStartConsumingLock = new Semaphore(0); + // Counter to keep track how many times basicConsume has been called. + final AtomicInteger throwCounter = new AtomicInteger(); + // Counter to keep track how many times shutdownCompleted method has been called. + final AtomicInteger shutdownCallsCounter = new AtomicInteger(); + + final MessageReceivedHandler mockMessageReceivedHandler = mock(MessageReceivedHandler.class); + final TaskMessageHandler tmh = adapterFactory.createTaskMessageHandler(taskQueueName, false); + + ((RabbitMQMessageHandler) tmh).setRetryTimeMilliseconds(1L); + doAnswer(invoke -> null).when(mockChannel).addShutdownListener(shutdownListenerCaptor.capture()); + doAnswer(invoke -> { + verifyTryStartConsumingLock.release(); + tryStartConsumingLock.acquire(); + // stop consumer one time with an exception. + if (throwCounter.incrementAndGet() < 2) { + shutdownCallsCounter.incrementAndGet(); + final ShutdownSignalException exception = new ShutdownSignalException(false, false, null, null); + + // This will mock the shutdown handler is called. + shutdownListenerCaptor.getValue().shutdownCompleted(exception); + // This will mock starting the consumer failed with an exception. + throw exception; + } + return null; + }).when(mockChannel).basicConsume(anyString(), eq(false), anyString(), any()); + tmh.addMessageReceivedHandler(mockMessageReceivedHandler); + final Thread thread = new Thread(() -> { + try { + // First start. Should retry once and then complete without error. + tmh.start(); + // Second start. Should complete without error. + tmh.start(); + } catch (final IOException e) { + throw new UncheckedIOException(e); + } + }); + + thread.setDaemon(true); + thread.start(); + // Wait till TaskMessageHandler has called basicConsume in the consumer. + verifyTryStartConsumingLock.acquire(); + // Release the consumer start lock, it should throw an IOException and not call the shutdown handler. + triggerRestartConsumer(tryStartConsumingLock, verifyTryStartConsumingLock, mockMessageReceivedHandler, 0); + // Release the second time, it should not throw an IOException this time, but just finish start without issue. + triggerRestartConsumer(tryStartConsumingLock, verifyTryStartConsumingLock, mockMessageReceivedHandler, 0); + // Release the second start call. It should just finished normally. + tryStartConsumingLock.release(); + // Wait for thread to finish. + thread.join(); + + // Trigger an ShutdownSignalException. + shutdownListenerCaptor.getValue().shutdownCompleted(new ShutdownSignalException(false, false, null, null)); + + verify(mockMessageReceivedHandler, only()).handleShutdownSignal(); + assertEquals(1, shutdownCallsCounter.get(), "Consumer basicConsume should have thrown the expected number of IOExceptions"); + assertEquals(3, throwCounter.get(), "Consumer basicConsume should have been called this of 3 times"); + + } + + private static void triggerRestartConsumer(final Semaphore tryStartConsumingLock, final Semaphore verifyTryStartConsumingLock, + final MessageReceivedHandler mockMessageReceivedHandler, final int expectedNumberMessageReceivedHandlerShutdownCalled) + throws InterruptedException { + // Let the consumer basicConsume continue. + tryStartConsumingLock.release(); + // Consumer should have restarted. + verifyTryStartConsumingLock.acquire(); + // handleShutdownSignal should not have been called. + verify(mockMessageReceivedHandler, never()).handleShutdownSignal(); + } }