From 60487e2e3366063d81ed29d4c14482896e88c63b Mon Sep 17 00:00:00 2001 From: Hilbrand Bouwkamp Date: Wed, 8 Jan 2025 09:54:27 +0100 Subject: [PATCH] Adding support for different RabbitMQ queue types. Also updated RabbitMQ libraries to latest version available, and set TaskManager to java 17. --- README.md | 8 +++ source/pom.xml | 4 +- source/taskmanager/pom.xml | 4 ++ .../nl/aerius/taskmanager/TaskConsumer.java | 9 +-- .../nl/aerius/taskmanager/TaskManager.java | 32 +++++----- .../taskmanager/adaptor/AdaptorFactory.java | 7 ++- .../taskmanager/domain/QueueConfig.java | 20 +++++++ .../taskmanager/domain/RabbitMQQueueType.java | 29 +++++++++ .../taskmanager/domain/TaskSchedule.java | 11 ++++ .../mq/RabbitMQAdaptorFactory.java | 5 +- .../mq/RabbitMQMessageConsumer.java | 13 ++-- .../mq/RabbitMQMessageHandler.java | 16 +++-- .../taskmanager/mq/RabbitMQQueueUtil.java | 54 +++++++++++++++++ .../mq/RabbitMQWorkerProducer.java | 6 +- .../taskmanager/MockAdaptorFactory.java | 3 +- .../PriorityTaskSchedulerTest.java | 6 +- .../taskmanager/TaskDispatcherTest.java | 5 +- .../aerius/taskmanager/TaskManagerTest.java | 5 +- .../nl/aerius/taskmanager/WorkerPoolTest.java | 18 +++--- .../mq/RabbitMQMessageHandlerTest.java | 5 +- .../taskmanager/mq/RabbitMQQueueUtilTest.java | 60 +++++++++++++++++++ .../queue/priority-task-scheduler.ops.json | 1 + 22 files changed, 260 insertions(+), 61 deletions(-) create mode 100644 source/taskmanager/src/main/java/nl/aerius/taskmanager/domain/QueueConfig.java create mode 100644 source/taskmanager/src/main/java/nl/aerius/taskmanager/domain/RabbitMQQueueType.java create mode 100644 source/taskmanager/src/main/java/nl/aerius/taskmanager/mq/RabbitMQQueueUtil.java create mode 100644 source/taskmanager/src/test/java/nl/aerius/taskmanager/mq/RabbitMQQueueUtilTest.java diff --git a/README.md b/README.md index e66b79a..50f5199 100644 --- a/README.md +++ b/README.md @@ -88,6 +88,7 @@ The json format of the configuration files is as follows: { "workerQueueName": "", "durable" : + "queueType": "queues": [ { "queueName": "", @@ -109,6 +110,13 @@ Some queues can have derived messages on the queue, that are recreated by the pa For these queues it would make sense to not make them durable. RabbitMQ will require less storage space/IOPS and be faster as it won't need to depend on disk I/O for these queues. +The parameter `queueType` specifies the RabbitMQ type of the queues to be created. +If set it will set the argument `x-queue-type` when declaring queues. +If not set it will not set this arguments (used for backward compatibility, defaults to `classic`). +If `durable` is true queues will be set to `classic` (or default if not set) because other queue types are durable by default. +Changing the `queueType` configuration parameter if the queues are already created won't work. +If `queueType` needs to be changed, the queues need to be deleted first. + In `queues` there can be 1 or more queue configurations. Each queue configuration consists of 3 parameters: * `queueName`: The postfix of the client queue of which the task manager constructs the full queue name using the worker queue name. diff --git a/source/pom.xml b/source/pom.xml index 11627e2..1ae8a04 100644 --- a/source/pom.xml +++ b/source/pom.xml @@ -72,7 +72,7 @@ com.rabbitmq amqp-client - 5.16.0 + 5.24.0 @@ -287,7 +287,7 @@ org.apache.maven.plugins maven-javadoc-plugin - 3.4.1 + 3.6.3 attach-javadocs diff --git a/source/taskmanager/pom.xml b/source/taskmanager/pom.xml index 7b9926d..20f4338 100644 --- a/source/taskmanager/pom.xml +++ b/source/taskmanager/pom.xml @@ -30,6 +30,10 @@ Taskmanager :: Application AERIUS Taskmanager actual manager/scheduler (standalone) + + 17 + + nl.aerius diff --git a/source/taskmanager/src/main/java/nl/aerius/taskmanager/TaskConsumer.java b/source/taskmanager/src/main/java/nl/aerius/taskmanager/TaskConsumer.java index 3b9c9a4..2d55448 100644 --- a/source/taskmanager/src/main/java/nl/aerius/taskmanager/TaskConsumer.java +++ b/source/taskmanager/src/main/java/nl/aerius/taskmanager/TaskConsumer.java @@ -29,6 +29,7 @@ import nl.aerius.taskmanager.adaptor.TaskMessageHandler.MessageReceivedHandler; import nl.aerius.taskmanager.domain.Message; import nl.aerius.taskmanager.domain.MessageMetaData; +import nl.aerius.taskmanager.domain.QueueConfig; /** * Task manager part of retrieving tasks from the client queues and send them to the dispatcher, which in case will send them to the scheduler. @@ -48,12 +49,12 @@ class TaskConsumer implements MessageReceivedHandler { private Future messageHandlerFuture; @SuppressWarnings("unchecked") - public TaskConsumer(final ExecutorService executorService, final String taskQueueName, final boolean durable, - final ForwardTaskHandler forwardTaskHandler, final AdaptorFactory factory) throws IOException { + public TaskConsumer(final ExecutorService executorService, final QueueConfig queueConfig, final ForwardTaskHandler forwardTaskHandler, + final AdaptorFactory factory) throws IOException { this.executorService = executorService; - this.taskQueueName = taskQueueName; + this.taskQueueName = queueConfig.queueName(); this.forwardTaskHandler = forwardTaskHandler; - this.taskMessageHandler = factory.createTaskMessageHandler(taskQueueName, durable); + this.taskMessageHandler = factory.createTaskMessageHandler(queueConfig); taskMessageHandler.addMessageReceivedHandler(this); } diff --git a/source/taskmanager/src/main/java/nl/aerius/taskmanager/TaskManager.java b/source/taskmanager/src/main/java/nl/aerius/taskmanager/TaskManager.java index 8960d5a..41047f4 100644 --- a/source/taskmanager/src/main/java/nl/aerius/taskmanager/TaskManager.java +++ b/source/taskmanager/src/main/java/nl/aerius/taskmanager/TaskManager.java @@ -35,6 +35,8 @@ import nl.aerius.taskmanager.adaptor.AdaptorFactory; import nl.aerius.taskmanager.adaptor.WorkerProducer; import nl.aerius.taskmanager.adaptor.WorkerSizeProviderProxy; +import nl.aerius.taskmanager.domain.QueueConfig; +import nl.aerius.taskmanager.domain.RabbitMQQueueType; import nl.aerius.taskmanager.domain.TaskQueue; import nl.aerius.taskmanager.domain.TaskSchedule; @@ -63,10 +65,9 @@ public TaskManager(final ExecutorService executorService, final AdaptorFactory f * Add or Update a new task scheduler. * * @param schedule scheduler configuration - * @throws IOException * @throws InterruptedException */ - public boolean updateTaskScheduler(final TaskSchedule schedule) throws IOException, InterruptedException { + public boolean updateTaskScheduler(final TaskSchedule schedule) throws InterruptedException { // Set up scheduler with worker pool final String workerQueueName = schedule.getWorkerQueueName(); if (!buckets.containsKey(workerQueueName)) { @@ -75,7 +76,7 @@ public boolean updateTaskScheduler(final TaskSchedule schedule) throws IOExce } final TaskScheduleBucket taskScheduleBucket = buckets.get(workerQueueName); - taskScheduleBucket.updateQueues(schedule.getQueues(), schedule.isDurable()); + taskScheduleBucket.updateQueues(schedule.getQueues(), schedule.isDurable(), schedule.getQueueType()); return taskScheduleBucket.isRunning(); } @@ -109,7 +110,7 @@ private class TaskScheduleBucket { private final TaskScheduler taskScheduler; private final String workerQueueName; - public TaskScheduleBucket(final String workerQueueName, final boolean durable) throws IOException, InterruptedException { + public TaskScheduleBucket(final String workerQueueName, final boolean durable) throws InterruptedException { this.workerQueueName = workerQueueName; taskScheduler = schedulerFactory.createScheduler(workerQueueName); LOG.info("Worker Queue Name:{} (durable:{})", workerQueueName, durable); @@ -134,36 +135,33 @@ public boolean isRunning() { return dispatcher.isRunning(); } - private void updateQueues(final List newTaskQueues, final boolean durable) { + private void updateQueues(final List newTaskQueues, final boolean durable, final RabbitMQQueueType rabbitMQQueueType) { final Map newTaskQueuesMap = newTaskQueues.stream().filter(Objects::nonNull) .collect(Collectors.toMap(TaskQueue::getQueueName, Function.identity())); // Remove queues that are not in the new list final List> removedQueues = taskConsumers.entrySet().stream().filter(e -> !newTaskQueuesMap.containsKey(e.getKey())) - .collect(Collectors.toList()); + .toList(); removedQueues.forEach(e -> removeTaskConsumer(e.getKey())); // Add and Update existing queues - newTaskQueues.stream().filter(Objects::nonNull).forEach(tc -> addOrUpdateTaskQueue(tc, durable)); + newTaskQueues.stream().filter(Objects::nonNull).forEach(tc -> addOrUpdateTaskQueue(tc, durable, rabbitMQQueueType)); } - private void addOrUpdateTaskQueue(final T taskQueueConfiguration, final boolean durable) { - final String taskQueueName = taskQueueConfiguration.getQueueName(); - addTaskConsumerIfAbsent(taskQueueName, durable); + private void addOrUpdateTaskQueue(final T taskQueueConfiguration, final boolean durable, final RabbitMQQueueType rabbitMQQueueType) { + addTaskConsumerIfAbsent(new QueueConfig(taskQueueConfiguration.getQueueName(), durable, rabbitMQQueueType)); taskScheduler.updateQueue(taskQueueConfiguration); } /** * Adds a task consumer. * - * @param taskQueueName queue name of the task consumer - * @param durable true if consumer queue should be created durable - * @throws IOException + * @param queueConfig Configuration parameters for the queue */ - public void addTaskConsumerIfAbsent(final String taskQueueName, final boolean durable) { - taskConsumers.computeIfAbsent(taskQueueName, tqn -> { + public void addTaskConsumerIfAbsent(final QueueConfig queueConfig) { + taskConsumers.computeIfAbsent(queueConfig.queueName(), tqn -> { try { - final TaskConsumer taskConsumer = new TaskConsumer(executorService, taskQueueName, durable, dispatcher, factory); + final TaskConsumer taskConsumer = new TaskConsumer(executorService, queueConfig, dispatcher, factory); taskConsumer.start(); - LOG.info("Started task queue {} (durable:{})", taskQueueName, durable); + LOG.info("Started task queue {} (durable:{}, queueType:{})", queueConfig.queueName(), queueConfig.durable(), queueConfig.queueType()); return taskConsumer; } catch (final IOException e) { throw new UncheckedIOException(e); diff --git a/source/taskmanager/src/main/java/nl/aerius/taskmanager/adaptor/AdaptorFactory.java b/source/taskmanager/src/main/java/nl/aerius/taskmanager/adaptor/AdaptorFactory.java index 90bf9f2..85e5a80 100644 --- a/source/taskmanager/src/main/java/nl/aerius/taskmanager/adaptor/AdaptorFactory.java +++ b/source/taskmanager/src/main/java/nl/aerius/taskmanager/adaptor/AdaptorFactory.java @@ -18,6 +18,8 @@ import java.io.IOException; +import nl.aerius.taskmanager.domain.QueueConfig; + /** * Interface between the task manager and implementing communication service. */ @@ -39,10 +41,9 @@ public interface AdaptorFactory { /** * Creates a new TaksMessageHandler for the given worker type and queue. - * @param taskQueueName queue name - * @param durable true if the queue created should be persistent during server restart + * @param queueConfig Configuration parameters for the queue * @return new TaksMessageHandler object * @throws IOException error in case or connection problems */ - TaskMessageHandler createTaskMessageHandler(String taskQueueName, boolean durable) throws IOException; + TaskMessageHandler createTaskMessageHandler(QueueConfig queueConfig) throws IOException; } diff --git a/source/taskmanager/src/main/java/nl/aerius/taskmanager/domain/QueueConfig.java b/source/taskmanager/src/main/java/nl/aerius/taskmanager/domain/QueueConfig.java new file mode 100644 index 0000000..ad40f24 --- /dev/null +++ b/source/taskmanager/src/main/java/nl/aerius/taskmanager/domain/QueueConfig.java @@ -0,0 +1,20 @@ +/* + * Copyright the State of the Netherlands + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see http://www.gnu.org/licenses/. + */ +package nl.aerius.taskmanager.domain; + +public record QueueConfig(String queueName, boolean durable, RabbitMQQueueType queueType) { +} diff --git a/source/taskmanager/src/main/java/nl/aerius/taskmanager/domain/RabbitMQQueueType.java b/source/taskmanager/src/main/java/nl/aerius/taskmanager/domain/RabbitMQQueueType.java new file mode 100644 index 0000000..037e5bd --- /dev/null +++ b/source/taskmanager/src/main/java/nl/aerius/taskmanager/domain/RabbitMQQueueType.java @@ -0,0 +1,29 @@ +/* + * Copyright the State of the Netherlands + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see http://www.gnu.org/licenses/. + */ +package nl.aerius.taskmanager.domain; + +import java.util.Locale; + +public enum RabbitMQQueueType { + CLASSIC, + QUORUM, + STREAM; + + public String type() { + return name().toLowerCase(Locale.ROOT); + } +} diff --git a/source/taskmanager/src/main/java/nl/aerius/taskmanager/domain/TaskSchedule.java b/source/taskmanager/src/main/java/nl/aerius/taskmanager/domain/TaskSchedule.java index d15377a..8e18d1b 100644 --- a/source/taskmanager/src/main/java/nl/aerius/taskmanager/domain/TaskSchedule.java +++ b/source/taskmanager/src/main/java/nl/aerius/taskmanager/domain/TaskSchedule.java @@ -18,6 +18,7 @@ import java.util.ArrayList; import java.util.List; +import java.util.Locale; /** * Base class for a single schedule configuration. @@ -29,6 +30,8 @@ public class TaskSchedule { private Boolean durable; + private RabbitMQQueueType queueType; + private List queues = new ArrayList<>(); public String getWorkerQueueName() { @@ -54,4 +57,12 @@ public void setDurable(final boolean durable) { public boolean isDurable() { return !Boolean.FALSE.equals(durable); } + + public RabbitMQQueueType getQueueType() { + return queueType; + } + + public void setQueueType(final String queueType) { + this.queueType = RabbitMQQueueType.valueOf(queueType.toUpperCase(Locale.ROOT)); + } } diff --git a/source/taskmanager/src/main/java/nl/aerius/taskmanager/mq/RabbitMQAdaptorFactory.java b/source/taskmanager/src/main/java/nl/aerius/taskmanager/mq/RabbitMQAdaptorFactory.java index eb21b97..17eab49 100644 --- a/source/taskmanager/src/main/java/nl/aerius/taskmanager/mq/RabbitMQAdaptorFactory.java +++ b/source/taskmanager/src/main/java/nl/aerius/taskmanager/mq/RabbitMQAdaptorFactory.java @@ -24,6 +24,7 @@ import nl.aerius.taskmanager.adaptor.WorkerProducer; import nl.aerius.taskmanager.adaptor.WorkerSizeProviderProxy; import nl.aerius.taskmanager.client.BrokerConnectionFactory; +import nl.aerius.taskmanager.domain.QueueConfig; /** * RabbitMQ implementation of the {@link AdaptorFactory}. @@ -45,8 +46,8 @@ public RabbitMQAdaptorFactory(final ScheduledExecutorService executorService, fi } @Override - public TaskMessageHandler createTaskMessageHandler(final String taskQueueName, final boolean durable) throws IOException { - return new RabbitMQMessageHandler(factory, taskQueueName, durable); + public TaskMessageHandler createTaskMessageHandler(final QueueConfig queueConfig) throws IOException { + return new RabbitMQMessageHandler(factory, queueConfig); } @Override diff --git a/source/taskmanager/src/main/java/nl/aerius/taskmanager/mq/RabbitMQMessageConsumer.java b/source/taskmanager/src/main/java/nl/aerius/taskmanager/mq/RabbitMQMessageConsumer.java index fa8c38e..57bc18d 100644 --- a/source/taskmanager/src/main/java/nl/aerius/taskmanager/mq/RabbitMQMessageConsumer.java +++ b/source/taskmanager/src/main/java/nl/aerius/taskmanager/mq/RabbitMQMessageConsumer.java @@ -28,6 +28,8 @@ import com.rabbitmq.client.DefaultConsumer; import com.rabbitmq.client.Envelope; +import nl.aerius.taskmanager.domain.QueueConfig; + /** * Implementation of RabbitMQ's DefaultConsumer for the taskmanager. * @@ -42,14 +44,14 @@ interface ConsumerCallback { private static final Logger LOG = LoggerFactory.getLogger(RabbitMQMessageConsumer.class); + private final QueueConfig queueConfig; private final String queueName; private final ConsumerCallback callback; - private final boolean durable; - RabbitMQMessageConsumer(final Channel channel, final String queueName, final boolean durable, final ConsumerCallback callback) { + RabbitMQMessageConsumer(final Channel channel, final QueueConfig queueConfig, final ConsumerCallback callback) { super(channel); - this.queueName = queueName; - this.durable = durable; + this.queueConfig = queueConfig; + this.queueName = queueConfig.queueName(); this.callback = callback; } @@ -57,7 +59,8 @@ public void startConsuming() throws IOException { LOG.debug("Starting consumer {}.", queueName); final Channel taskChannel = getChannel(); // ensure a durable channel exists - taskChannel.queueDeclare(queueName, durable, false, false, null); + taskChannel.queueDeclare(queueName, queueConfig.durable(), false, false, + RabbitMQQueueUtil.queueDeclareArguments(queueConfig.durable(), queueConfig.queueType())); //ensure only one message gets delivered at a time. taskChannel.basicQos(1); diff --git a/source/taskmanager/src/main/java/nl/aerius/taskmanager/mq/RabbitMQMessageHandler.java b/source/taskmanager/src/main/java/nl/aerius/taskmanager/mq/RabbitMQMessageHandler.java index 8421445..e82bb31 100644 --- a/source/taskmanager/src/main/java/nl/aerius/taskmanager/mq/RabbitMQMessageHandler.java +++ b/source/taskmanager/src/main/java/nl/aerius/taskmanager/mq/RabbitMQMessageHandler.java @@ -28,6 +28,7 @@ import nl.aerius.taskmanager.adaptor.TaskMessageHandler; import nl.aerius.taskmanager.client.BrokerConnectionFactory; import nl.aerius.taskmanager.client.WorkerResultSender; +import nl.aerius.taskmanager.domain.QueueConfig; import nl.aerius.taskmanager.mq.RabbitMQMessageConsumer.ConsumerCallback; /** @@ -41,8 +42,8 @@ class RabbitMQMessageHandler implements TaskMessageHandler PERSISTENT_QUEUE_TYPES = EnumSet.of(RabbitMQQueueType.QUORUM, RabbitMQQueueType.STREAM); + + private static final String ARG_QUEUE_TYPE = "x-queue-type"; + + private RabbitMQQueueUtil() { + } + + /** + * Returns a map with queueDeclare arguments. + * If queueType is null assume backward compatibility and don't set x-queue-type. + * If durable always use classic type because other types are durable, and therefore not compatible with durable. + * + * @param durable if queue is durable + * @param queueType the queueType to be set + * @return + */ + public static Map queueDeclareArguments(final boolean durable, final RabbitMQQueueType queueType) { + if (queueType == null) { + return Map.of(); + } + final RabbitMQQueueType actualType = durable && PERSISTENT_QUEUE_TYPES.contains(queueType) ? RabbitMQQueueType.CLASSIC : queueType; + + return Map.of(ARG_QUEUE_TYPE, actualType.type()); + } +} diff --git a/source/taskmanager/src/main/java/nl/aerius/taskmanager/mq/RabbitMQWorkerProducer.java b/source/taskmanager/src/main/java/nl/aerius/taskmanager/mq/RabbitMQWorkerProducer.java index 580fc22..26261e2 100644 --- a/source/taskmanager/src/main/java/nl/aerius/taskmanager/mq/RabbitMQWorkerProducer.java +++ b/source/taskmanager/src/main/java/nl/aerius/taskmanager/mq/RabbitMQWorkerProducer.java @@ -36,6 +36,7 @@ import nl.aerius.taskmanager.client.BrokerConnectionFactory; import nl.aerius.taskmanager.client.QueueConstants; import nl.aerius.taskmanager.domain.Message; +import nl.aerius.taskmanager.domain.RabbitMQQueueType; /** * RabbitMQ implementation of a {@link WorkerProducer}. @@ -156,8 +157,7 @@ private static void delayRetry(final int retryTime) { private void startReplyConsumer(final Connection connection) throws IOException { final Channel replyChannel = connection.createChannel(); - // Create an exclusive reply queue with predefined name (so we can set - // a replyCC header). + // Create an exclusive reply queue with predefined name (so we can set a replyCC header). // Queue will be deleted once taskmanager is down. // reply queue is not durable because the system will 'reboot' after connection problems anyway. // Making it durable would only make sense if we'd keep track of tasks-in-progress during shutdown/startup. @@ -165,7 +165,7 @@ private void startReplyConsumer(final Connection connection) throws IOException replyChannel.queueDeclare(workerReplyQueue, false, true, true, null); // ensure the worker queue is around as well (so we can retrieve number of customers later on). // Worker queue is durable and non-exclusive with autodelete off. - replyChannel.queueDeclare(workerQueueName, durable, false, false, null); + replyChannel.queueDeclare(workerQueueName, durable, false, false, RabbitMQQueueUtil.queueDeclareArguments(durable, RabbitMQQueueType.QUORUM)); replyChannel.basicConsume(workerReplyQueue, true, workerReplyQueue, new DefaultConsumer(replyChannel) { @Override public void handleDelivery(final String consumerTag, final Envelope envelope, final BasicProperties properties, final byte[] body) { diff --git a/source/taskmanager/src/test/java/nl/aerius/taskmanager/MockAdaptorFactory.java b/source/taskmanager/src/test/java/nl/aerius/taskmanager/MockAdaptorFactory.java index 83b355a..ecf8364 100644 --- a/source/taskmanager/src/test/java/nl/aerius/taskmanager/MockAdaptorFactory.java +++ b/source/taskmanager/src/test/java/nl/aerius/taskmanager/MockAdaptorFactory.java @@ -24,6 +24,7 @@ import nl.aerius.taskmanager.adaptor.TaskMessageHandler; import nl.aerius.taskmanager.adaptor.WorkerProducer; import nl.aerius.taskmanager.adaptor.WorkerSizeProviderProxy; +import nl.aerius.taskmanager.domain.QueueConfig; /** * Mock implementation of {@link AdaptorFactory}. @@ -44,7 +45,7 @@ public WorkerProducer createWorkerProducer(final String workerQueueName, final b } @Override - public TaskMessageHandler createTaskMessageHandler(final String taskQueueName, final boolean durable) throws IOException { + public TaskMessageHandler createTaskMessageHandler(final QueueConfig queueConfig) throws IOException { return mockTaskMessageHandler; } diff --git a/source/taskmanager/src/test/java/nl/aerius/taskmanager/PriorityTaskSchedulerTest.java b/source/taskmanager/src/test/java/nl/aerius/taskmanager/PriorityTaskSchedulerTest.java index cea1411..b92a5df 100644 --- a/source/taskmanager/src/test/java/nl/aerius/taskmanager/PriorityTaskSchedulerTest.java +++ b/source/taskmanager/src/test/java/nl/aerius/taskmanager/PriorityTaskSchedulerTest.java @@ -44,6 +44,7 @@ import nl.aerius.taskmanager.domain.MessageMetaData; import nl.aerius.taskmanager.domain.PriorityTaskQueue; import nl.aerius.taskmanager.domain.PriorityTaskSchedule; +import nl.aerius.taskmanager.domain.QueueConfig; /** * Test class for {@link PriorityTaskScheduler}. @@ -65,7 +66,7 @@ class PriorityTaskSchedulerTest { private PriorityTaskScheduler scheduler; @BeforeEach - void setUp() throws IOException, InterruptedException { + void setUp() throws IOException { taskConsumer1 = createMockTaskConsumer(QUEUE1); taskConsumer2 = createMockTaskConsumer(QUEUE2); final TaskConsumer taskConsumer3 = createMockTaskConsumer(QUEUE3); @@ -273,7 +274,8 @@ public Task call() throws Exception { } private TaskConsumer createMockTaskConsumer(final String taskQueueName) throws IOException { - return new TaskConsumer(mock(ExecutorService.class), taskQueueName, false, mock(ForwardTaskHandler.class), new MockAdaptorFactory()) { + return new TaskConsumer(mock(ExecutorService.class), new QueueConfig(taskQueueName, false, null), mock(ForwardTaskHandler.class), + new MockAdaptorFactory()) { @Override public void messageDelivered(final MessageMetaData messageMetaData) { //no-op. diff --git a/source/taskmanager/src/test/java/nl/aerius/taskmanager/TaskDispatcherTest.java b/source/taskmanager/src/test/java/nl/aerius/taskmanager/TaskDispatcherTest.java index c002ba6..3ef0f56 100644 --- a/source/taskmanager/src/test/java/nl/aerius/taskmanager/TaskDispatcherTest.java +++ b/source/taskmanager/src/test/java/nl/aerius/taskmanager/TaskDispatcherTest.java @@ -35,6 +35,7 @@ import org.junit.jupiter.api.Timeout; import nl.aerius.taskmanager.TaskDispatcher.State; +import nl.aerius.taskmanager.domain.QueueConfig; /** * Test for {@link TaskDispatcher} class. @@ -51,14 +52,14 @@ class TaskDispatcherTest { private MockAdaptorFactory factory; @BeforeEach - void setUp() throws IOException, InterruptedException { + void setUp() throws IOException { executor = Executors.newCachedThreadPool(); final FIFOTaskScheduler scheduler = new FIFOTaskScheduler(); workerProducer = new MockWorkerProducer(); workerPool = new WorkerPool(WORKER_QUEUE_NAME_TEST, workerProducer, scheduler); dispatcher = new TaskDispatcher(WORKER_QUEUE_NAME_TEST, scheduler, workerPool); factory = new MockAdaptorFactory(); - taskConsumer = new TaskConsumer(executor, "testqueue", false, dispatcher, factory); + taskConsumer = new TaskConsumer(executor, new QueueConfig("testqueue", false, null), dispatcher, factory); } @AfterEach diff --git a/source/taskmanager/src/test/java/nl/aerius/taskmanager/TaskManagerTest.java b/source/taskmanager/src/test/java/nl/aerius/taskmanager/TaskManagerTest.java index b2a7062..d794a4b 100644 --- a/source/taskmanager/src/test/java/nl/aerius/taskmanager/TaskManagerTest.java +++ b/source/taskmanager/src/test/java/nl/aerius/taskmanager/TaskManagerTest.java @@ -16,6 +16,7 @@ */ package nl.aerius.taskmanager; +import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertTrue; import java.io.File; @@ -32,6 +33,7 @@ import nl.aerius.taskmanager.adaptor.AdaptorFactory; import nl.aerius.taskmanager.domain.PriorityTaskQueue; import nl.aerius.taskmanager.domain.PriorityTaskSchedule; +import nl.aerius.taskmanager.domain.RabbitMQQueueType; /** * Test class for {@link TaskManager}. @@ -44,7 +46,7 @@ class TaskManagerTest { private TaskManager taskManager; @BeforeEach - void setUp() throws IOException, InterruptedException { + void setUp() throws IOException { executor = Executors.newCachedThreadPool(); final AdaptorFactory factory = new MockAdaptorFactory(); final TaskSchedulerFactory schedulerFactory = new FIFOTaskScheduler.FIFOSchedulerFactory(); @@ -62,6 +64,7 @@ void after() throws InterruptedException { @Test void testAddScheduler() throws IOException, InterruptedException { assertTrue(taskManager.updateTaskScheduler(schedule), "TaskScheduler running"); + assertEquals(RabbitMQQueueType.STREAM, schedule.getQueueType(), "Should have queueType STREAM"); taskManager.removeTaskScheduler(schedule.getWorkerQueueName()); } diff --git a/source/taskmanager/src/test/java/nl/aerius/taskmanager/WorkerPoolTest.java b/source/taskmanager/src/test/java/nl/aerius/taskmanager/WorkerPoolTest.java index aa3cf38..26cdc5b 100644 --- a/source/taskmanager/src/test/java/nl/aerius/taskmanager/WorkerPoolTest.java +++ b/source/taskmanager/src/test/java/nl/aerius/taskmanager/WorkerPoolTest.java @@ -30,6 +30,7 @@ import org.junit.jupiter.api.Test; import nl.aerius.taskmanager.domain.MessageMetaData; +import nl.aerius.taskmanager.domain.QueueConfig; import nl.aerius.taskmanager.exception.NoFreeWorkersException; import nl.aerius.taskmanager.exception.TaskAlreadySentException; import nl.aerius.taskmanager.mq.RabbitMQMessageMetaData; @@ -48,7 +49,7 @@ class WorkerPoolTest { private int numberOfWorkers; @BeforeEach - void setUp() throws IOException, InterruptedException { + void setUp() throws IOException { numberOfWorkers = 0; workerUpdateHandler = new MockTaskFinishedHandler() { @Override @@ -57,7 +58,8 @@ public void onWorkerPoolSizeChange(final int numberOfWorkers) { } }; workerPool = new WorkerPool(WORKER_QUEUE_NAME_TEST, new MockWorkerProducer(), workerUpdateHandler); - taskConsumer = new TaskConsumer(mock(ExecutorService.class), "testqueue", false, mock(ForwardTaskHandler.class), new MockAdaptorFactory()) { + taskConsumer = new TaskConsumer(mock(ExecutorService.class), new QueueConfig("testqueue", false, null), mock(ForwardTaskHandler.class), + new MockAdaptorFactory()) { @Override public void messageDelivered(final MessageMetaData message) { WorkerPoolTest.this.message = (RabbitMQMessageMetaData) message; @@ -66,7 +68,7 @@ public void messageDelivered(final MessageMetaData message) { } @Test - void testWorkerPoolSizing() throws InterruptedException, IOException { + void testWorkerPoolSizing() throws IOException { assertSame(0, workerPool.getCurrentWorkerSize(), "Check if workerPool size is empty at start"); workerPool.onNumberOfWorkersUpdate(10, 0); assertSame(10, workerPool.getCurrentWorkerSize(), "Check if workerPool size is changed after sizing"); @@ -81,12 +83,12 @@ void testWorkerPoolSizing() throws InterruptedException, IOException { } @Test - void testNoFreeWorkers() throws IOException, InterruptedException { + void testNoFreeWorkers() { assertThrows(NoFreeWorkersException.class, () -> workerPool.sendTaskToWorker(createTask())); } @Test - void testWorkerPoolScaleDown() throws IOException, InterruptedException { + void testWorkerPoolScaleDown() throws IOException { workerPool.onNumberOfWorkersUpdate(5, 0); final Task task1 = createTask(); workerPool.sendTaskToWorker(task1); @@ -107,7 +109,7 @@ void testWorkerPoolScaleDown() throws IOException, InterruptedException { } @Test - void testReleaseTaskTwice() throws IOException, InterruptedException { + void testReleaseTaskTwice() throws IOException { workerPool.onNumberOfWorkersUpdate(2, 0); final Task task1 = createTask(); workerPool.sendTaskToWorker(task1); @@ -122,7 +124,7 @@ void testReleaseTaskTwice() throws IOException, InterruptedException { @Disabled("Exception is not thrown anymore, so test ignored for now") @Test - void testSendSameTaskTwice() throws IOException, InterruptedException { + void testSendSameTaskTwice() { assertThrows(TaskAlreadySentException.class, () -> { workerPool.onNumberOfWorkersUpdate(3, 0); final Task task1 = createTask(); @@ -132,7 +134,7 @@ void testSendSameTaskTwice() throws IOException, InterruptedException { } @Test - void testMessageDeliverd() throws IOException, InterruptedException { + void testMessageDeliverd() throws IOException { workerPool.onNumberOfWorkersUpdate(1, 0); final Task task1 = createTask(); workerPool.sendTaskToWorker(task1); diff --git a/source/taskmanager/src/test/java/nl/aerius/taskmanager/mq/RabbitMQMessageHandlerTest.java b/source/taskmanager/src/test/java/nl/aerius/taskmanager/mq/RabbitMQMessageHandlerTest.java index b18337a..e15b0f9 100644 --- a/source/taskmanager/src/test/java/nl/aerius/taskmanager/mq/RabbitMQMessageHandlerTest.java +++ b/source/taskmanager/src/test/java/nl/aerius/taskmanager/mq/RabbitMQMessageHandlerTest.java @@ -47,6 +47,7 @@ import nl.aerius.taskmanager.adaptor.TaskMessageHandler; import nl.aerius.taskmanager.adaptor.TaskMessageHandler.MessageReceivedHandler; import nl.aerius.taskmanager.domain.Message; +import nl.aerius.taskmanager.domain.QueueConfig; /** * Test class for {@link RabbitMQMessageHandler}. @@ -61,7 +62,7 @@ class RabbitMQMessageHandlerTest extends AbstractRabbitMQTest { @Timeout(10000) void testMessageReceivedHandler() throws IOException, InterruptedException { final byte[] receivedBody = "4321".getBytes(); - final TaskMessageHandler tmh = adapterFactory.createTaskMessageHandler(taskQueueName, false); + final TaskMessageHandler tmh = adapterFactory.createTaskMessageHandler(new QueueConfig(taskQueueName, false, null)); final Semaphore lock = new Semaphore(0); final DataDock data = new DataDock(); tmh.start(); @@ -107,7 +108,7 @@ void testReStart() throws IOException, InterruptedException { final AtomicInteger shutdownCallsCounter = new AtomicInteger(); final MessageReceivedHandler mockMessageReceivedHandler = mock(MessageReceivedHandler.class); - final TaskMessageHandler tmh = adapterFactory.createTaskMessageHandler(taskQueueName, false); + final TaskMessageHandler tmh = adapterFactory.createTaskMessageHandler(new QueueConfig(taskQueueName, false, null)); ((RabbitMQMessageHandler) tmh).setRetryTimeMilliseconds(1L); doAnswer(invoke -> null).when(mockChannel).addShutdownListener(shutdownListenerCaptor.capture()); diff --git a/source/taskmanager/src/test/java/nl/aerius/taskmanager/mq/RabbitMQQueueUtilTest.java b/source/taskmanager/src/test/java/nl/aerius/taskmanager/mq/RabbitMQQueueUtilTest.java new file mode 100644 index 0000000..c85e1ee --- /dev/null +++ b/source/taskmanager/src/test/java/nl/aerius/taskmanager/mq/RabbitMQQueueUtilTest.java @@ -0,0 +1,60 @@ +/* + * Copyright the State of the Netherlands + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see http://www.gnu.org/licenses/. + */ +package nl.aerius.taskmanager.mq; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import java.util.List; + +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.CsvSource; +import org.junit.jupiter.params.provider.MethodSource; + +import nl.aerius.taskmanager.domain.RabbitMQQueueType; + +/** + * Test class for {@link RabbitMQQueueUtil} + */ +class RabbitMQQueueUtilTest { + + @ParameterizedTest + @CsvSource({"true", "false"}) + void testQueueDeclareArgumentsLegacy(final boolean durable) { + assertTrue(RabbitMQQueueUtil.queueDeclareArguments(durable, null).isEmpty(), "Should return empty map if type is null, and durable: " + durable); + } + + @ParameterizedTest + @MethodSource("combinations") + void testQueueDeclareArguments(final boolean durable, final RabbitMQQueueType inputType, final RabbitMQQueueType expectedType) { + assertEquals(expectedType.type(), RabbitMQQueueUtil.queueDeclareArguments(durable, inputType).get("x-queue-type"), + "Should return the expected queue type"); + } + + private static List combinations() { + return List.of( + Arguments.of(false, RabbitMQQueueType.CLASSIC, RabbitMQQueueType.CLASSIC), + Arguments.of(false, RabbitMQQueueType.QUORUM, RabbitMQQueueType.QUORUM), + Arguments.of(false, RabbitMQQueueType.STREAM, RabbitMQQueueType.STREAM), + Arguments.of(true, RabbitMQQueueType.CLASSIC, RabbitMQQueueType.CLASSIC), + // when durable is true always return classic because only queue type compatible with classic + Arguments.of(true, RabbitMQQueueType.QUORUM, RabbitMQQueueType.CLASSIC), + Arguments.of(true, RabbitMQQueueType.STREAM, RabbitMQQueueType.CLASSIC) + ); + } +} diff --git a/source/taskmanager/src/test/resources/queue/priority-task-scheduler.ops.json b/source/taskmanager/src/test/resources/queue/priority-task-scheduler.ops.json index d6b50f8..626c7d6 100644 --- a/source/taskmanager/src/test/resources/queue/priority-task-scheduler.ops.json +++ b/source/taskmanager/src/test/resources/queue/priority-task-scheduler.ops.json @@ -1,5 +1,6 @@ { "workerQueueName": "ops", + "queueType": "stream", "queues": [ { "queueName": "calculation_ui",