Skip to content

Commit

Permalink
Adding support for different RabbitMQ queue types. (#77)
Browse files Browse the repository at this point in the history
Also updated RabbitMQ libraries to latest version available, and set TaskManager to java 17.
  • Loading branch information
Hilbrand authored Jan 9, 2025
1 parent be5d40d commit f5792fc
Show file tree
Hide file tree
Showing 23 changed files with 263 additions and 65 deletions.
8 changes: 8 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ The json format of the configuration files is as follows:
{
"workerQueueName": "<type of the queue>",
"durable" : <true|false>
"queueType": <classic|quorum|stream>
"queues": [
{
"queueName": "<client queue name>",
Expand All @@ -109,6 +110,13 @@ Some queues can have derived messages on the queue, that are recreated by the pa
For these queues it would make sense to not make them durable.
RabbitMQ will require less storage space/IOPS and be faster as it won't need to depend on disk I/O for these queues.

The parameter `queueType` specifies the RabbitMQ type of the queues to be created.
If set it will set the argument `x-queue-type` when declaring queues.
If not set it will not set this arguments (used for backward compatibility, defaults to `classic`).
If `durable` is true queues will be set to `classic` (or default if not set) because other queue types are durable by default.
Changing the `queueType` configuration parameter if the queues are already created won't work.
If `queueType` needs to be changed, the queues need to be deleted first.

In `queues` there can be 1 or more queue configurations.
Each queue configuration consists of 3 parameters:
* `queueName`: The postfix of the client queue of which the task manager constructs the full queue name using the worker queue name.
Expand Down
4 changes: 2 additions & 2 deletions source/Dockerfile
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
# Builder part for maven packaging.
FROM maven:3-openjdk-11 as builder
FROM maven:3-openjdk-17 as builder

WORKDIR /app

Expand All @@ -13,7 +13,7 @@ RUN mv -v "$(ls taskmanager/target/taskmanager-*.jar | grep -v -e '-javadoc' -e
&& mvn clean

# Final image containing only the application
FROM openjdk:11-jre-buster
FROM eclipse-temurin:17

# Set up a non root account to run containers as
RUN groupadd -r taskmanager \
Expand Down
6 changes: 2 additions & 4 deletions source/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,6 @@
</scm>

<properties>
<java.version>11</java.version>

<spotless.version>2.28.0</spotless.version>
<dropwizard.metrics.version>4.2.13</dropwizard.metrics.version>
<slf4j.version>2.0.5</slf4j.version>
Expand All @@ -72,7 +70,7 @@
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.16.0</version>
<version>5.24.0</version>
</dependency>

<!-- CLI -->
Expand Down Expand Up @@ -287,7 +285,7 @@
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-javadoc-plugin</artifactId>
<version>3.4.1</version>
<version>3.6.3</version>
<executions>
<execution>
<id>attach-javadocs</id>
Expand Down
4 changes: 4 additions & 0 deletions source/taskmanager-client/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,10 @@
<name>Taskmanager :: Client Library</name>
<description>The Library for Taskmanager Client code</description>

<properties>
<java.version>11</java.version>
</properties>

<dependencies>
<!-- RabbitMQ -->
<dependency>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import nl.aerius.taskmanager.adaptor.TaskMessageHandler.MessageReceivedHandler;
import nl.aerius.taskmanager.domain.Message;
import nl.aerius.taskmanager.domain.MessageMetaData;
import nl.aerius.taskmanager.domain.QueueConfig;

/**
* Task manager part of retrieving tasks from the client queues and send them to the dispatcher, which in case will send them to the scheduler.
Expand All @@ -48,12 +49,12 @@ class TaskConsumer implements MessageReceivedHandler {
private Future<?> messageHandlerFuture;

@SuppressWarnings("unchecked")
public TaskConsumer(final ExecutorService executorService, final String taskQueueName, final boolean durable,
final ForwardTaskHandler forwardTaskHandler, final AdaptorFactory factory) throws IOException {
public TaskConsumer(final ExecutorService executorService, final QueueConfig queueConfig, final ForwardTaskHandler forwardTaskHandler,
final AdaptorFactory factory) throws IOException {
this.executorService = executorService;
this.taskQueueName = taskQueueName;
this.taskQueueName = queueConfig.queueName();
this.forwardTaskHandler = forwardTaskHandler;
this.taskMessageHandler = factory.createTaskMessageHandler(taskQueueName, durable);
this.taskMessageHandler = factory.createTaskMessageHandler(queueConfig);
taskMessageHandler.addMessageReceivedHandler(this);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@
import nl.aerius.taskmanager.adaptor.AdaptorFactory;
import nl.aerius.taskmanager.adaptor.WorkerProducer;
import nl.aerius.taskmanager.adaptor.WorkerSizeProviderProxy;
import nl.aerius.taskmanager.domain.QueueConfig;
import nl.aerius.taskmanager.domain.RabbitMQQueueType;
import nl.aerius.taskmanager.domain.TaskQueue;
import nl.aerius.taskmanager.domain.TaskSchedule;

Expand Down Expand Up @@ -63,10 +65,9 @@ public TaskManager(final ExecutorService executorService, final AdaptorFactory f
* Add or Update a new task scheduler.
*
* @param schedule scheduler configuration
* @throws IOException
* @throws InterruptedException
*/
public boolean updateTaskScheduler(final TaskSchedule<T> schedule) throws IOException, InterruptedException {
public boolean updateTaskScheduler(final TaskSchedule<T> schedule) throws InterruptedException {
// Set up scheduler with worker pool
final String workerQueueName = schedule.getWorkerQueueName();
if (!buckets.containsKey(workerQueueName)) {
Expand All @@ -75,7 +76,7 @@ public boolean updateTaskScheduler(final TaskSchedule<T> schedule) throws IOExce
}
final TaskScheduleBucket taskScheduleBucket = buckets.get(workerQueueName);

taskScheduleBucket.updateQueues(schedule.getQueues(), schedule.isDurable());
taskScheduleBucket.updateQueues(schedule.getQueues(), schedule.isDurable(), schedule.getQueueType());
return taskScheduleBucket.isRunning();
}

Expand Down Expand Up @@ -109,7 +110,7 @@ private class TaskScheduleBucket {
private final TaskScheduler<T> taskScheduler;
private final String workerQueueName;

public TaskScheduleBucket(final String workerQueueName, final boolean durable) throws IOException, InterruptedException {
public TaskScheduleBucket(final String workerQueueName, final boolean durable) throws InterruptedException {
this.workerQueueName = workerQueueName;
taskScheduler = schedulerFactory.createScheduler(workerQueueName);
LOG.info("Worker Queue Name:{} (durable:{})", workerQueueName, durable);
Expand All @@ -134,36 +135,33 @@ public boolean isRunning() {
return dispatcher.isRunning();
}

private void updateQueues(final List<T> newTaskQueues, final boolean durable) {
private void updateQueues(final List<T> newTaskQueues, final boolean durable, final RabbitMQQueueType rabbitMQQueueType) {
final Map<String, ? extends TaskQueue> newTaskQueuesMap = newTaskQueues.stream().filter(Objects::nonNull)
.collect(Collectors.toMap(TaskQueue::getQueueName, Function.identity()));
// Remove queues that are not in the new list
final List<Entry<String, TaskConsumer>> removedQueues = taskConsumers.entrySet().stream().filter(e -> !newTaskQueuesMap.containsKey(e.getKey()))
.collect(Collectors.toList());
.toList();
removedQueues.forEach(e -> removeTaskConsumer(e.getKey()));
// Add and Update existing queues
newTaskQueues.stream().filter(Objects::nonNull).forEach(tc -> addOrUpdateTaskQueue(tc, durable));
newTaskQueues.stream().filter(Objects::nonNull).forEach(tc -> addOrUpdateTaskQueue(tc, durable, rabbitMQQueueType));
}

private void addOrUpdateTaskQueue(final T taskQueueConfiguration, final boolean durable) {
final String taskQueueName = taskQueueConfiguration.getQueueName();
addTaskConsumerIfAbsent(taskQueueName, durable);
private void addOrUpdateTaskQueue(final T taskQueueConfiguration, final boolean durable, final RabbitMQQueueType rabbitMQQueueType) {
addTaskConsumerIfAbsent(new QueueConfig(taskQueueConfiguration.getQueueName(), durable, rabbitMQQueueType));
taskScheduler.updateQueue(taskQueueConfiguration);
}

/**
* Adds a task consumer.
*
* @param taskQueueName queue name of the task consumer
* @param durable true if consumer queue should be created durable
* @throws IOException
* @param queueConfig Configuration parameters for the queue
*/
public void addTaskConsumerIfAbsent(final String taskQueueName, final boolean durable) {
taskConsumers.computeIfAbsent(taskQueueName, tqn -> {
public void addTaskConsumerIfAbsent(final QueueConfig queueConfig) {
taskConsumers.computeIfAbsent(queueConfig.queueName(), tqn -> {
try {
final TaskConsumer taskConsumer = new TaskConsumer(executorService, taskQueueName, durable, dispatcher, factory);
final TaskConsumer taskConsumer = new TaskConsumer(executorService, queueConfig, dispatcher, factory);
taskConsumer.start();
LOG.info("Started task queue {} (durable:{})", taskQueueName, durable);
LOG.info("Started task queue {} (durable:{}, queueType:{})", queueConfig.queueName(), queueConfig.durable(), queueConfig.queueType());
return taskConsumer;
} catch (final IOException e) {
throw new UncheckedIOException(e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@

import java.io.IOException;

import nl.aerius.taskmanager.domain.QueueConfig;

/**
* Interface between the task manager and implementing communication service.
*/
Expand All @@ -39,10 +41,9 @@ public interface AdaptorFactory {

/**
* Creates a new TaksMessageHandler for the given worker type and queue.
* @param taskQueueName queue name
* @param durable true if the queue created should be persistent during server restart
* @param queueConfig Configuration parameters for the queue
* @return new TaksMessageHandler object
* @throws IOException error in case or connection problems
*/
TaskMessageHandler createTaskMessageHandler(String taskQueueName, boolean durable) throws IOException;
TaskMessageHandler createTaskMessageHandler(QueueConfig queueConfig) throws IOException;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
/*
* Copyright the State of the Netherlands
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see http://www.gnu.org/licenses/.
*/
package nl.aerius.taskmanager.domain;

public record QueueConfig(String queueName, boolean durable, RabbitMQQueueType queueType) {
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
/*
* Copyright the State of the Netherlands
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see http://www.gnu.org/licenses/.
*/
package nl.aerius.taskmanager.domain;

import java.util.Locale;

public enum RabbitMQQueueType {
CLASSIC,
QUORUM,
STREAM;

public String type() {
return name().toLowerCase(Locale.ROOT);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import java.util.ArrayList;
import java.util.List;
import java.util.Locale;

/**
* Base class for a single schedule configuration.
Expand All @@ -29,6 +30,8 @@ public class TaskSchedule<T extends TaskQueue> {

private Boolean durable;

private RabbitMQQueueType queueType;

private List<T> queues = new ArrayList<>();

public String getWorkerQueueName() {
Expand All @@ -54,4 +57,12 @@ public void setDurable(final boolean durable) {
public boolean isDurable() {
return !Boolean.FALSE.equals(durable);
}

public RabbitMQQueueType getQueueType() {
return queueType;
}

public void setQueueType(final String queueType) {
this.queueType = RabbitMQQueueType.valueOf(queueType.toUpperCase(Locale.ROOT));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import nl.aerius.taskmanager.adaptor.WorkerProducer;
import nl.aerius.taskmanager.adaptor.WorkerSizeProviderProxy;
import nl.aerius.taskmanager.client.BrokerConnectionFactory;
import nl.aerius.taskmanager.domain.QueueConfig;

/**
* RabbitMQ implementation of the {@link AdaptorFactory}.
Expand All @@ -45,8 +46,8 @@ public RabbitMQAdaptorFactory(final ScheduledExecutorService executorService, fi
}

@Override
public TaskMessageHandler createTaskMessageHandler(final String taskQueueName, final boolean durable) throws IOException {
return new RabbitMQMessageHandler(factory, taskQueueName, durable);
public TaskMessageHandler createTaskMessageHandler(final QueueConfig queueConfig) throws IOException {
return new RabbitMQMessageHandler(factory, queueConfig);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;

import nl.aerius.taskmanager.domain.QueueConfig;

/**
* Implementation of RabbitMQ's DefaultConsumer for the taskmanager.
*
Expand All @@ -42,22 +44,23 @@ interface ConsumerCallback {

private static final Logger LOG = LoggerFactory.getLogger(RabbitMQMessageConsumer.class);

private final QueueConfig queueConfig;
private final String queueName;
private final ConsumerCallback callback;
private final boolean durable;

RabbitMQMessageConsumer(final Channel channel, final String queueName, final boolean durable, final ConsumerCallback callback) {
RabbitMQMessageConsumer(final Channel channel, final QueueConfig queueConfig, final ConsumerCallback callback) {
super(channel);
this.queueName = queueName;
this.durable = durable;
this.queueConfig = queueConfig;
this.queueName = queueConfig.queueName();
this.callback = callback;
}

public void startConsuming() throws IOException {
LOG.debug("Starting consumer {}.", queueName);
final Channel taskChannel = getChannel();
// ensure a durable channel exists
taskChannel.queueDeclare(queueName, durable, false, false, null);
taskChannel.queueDeclare(queueName, queueConfig.durable(), false, false,
RabbitMQQueueUtil.queueDeclareArguments(queueConfig.durable(), queueConfig.queueType()));
//ensure only one message gets delivered at a time.
taskChannel.basicQos(1);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import nl.aerius.taskmanager.adaptor.TaskMessageHandler;
import nl.aerius.taskmanager.client.BrokerConnectionFactory;
import nl.aerius.taskmanager.client.WorkerResultSender;
import nl.aerius.taskmanager.domain.QueueConfig;
import nl.aerius.taskmanager.mq.RabbitMQMessageConsumer.ConsumerCallback;

/**
Expand All @@ -41,8 +42,8 @@ class RabbitMQMessageHandler implements TaskMessageHandler<RabbitMQMessageMetaDa
private static final int DEFAULT_RETRY_SECONDS = 10;

private final BrokerConnectionFactory factory;
private final QueueConfig queueConfig;
private final String taskQueueName;
private final boolean durable;

private MessageReceivedHandler messageReceivedHandler;
private RabbitMQMessageConsumer consumer;
Expand All @@ -65,14 +66,12 @@ class RabbitMQMessageHandler implements TaskMessageHandler<RabbitMQMessageMetaDa
* Constructor.
*
* @param factory the factory to get the a RabbitMQ connection from
* @param taskQueueName the name of the task queue
* @param durable if true the queue will be created persistent
* @throws IOException
* @param queueConfig the configuration parameters of the queue
*/
public RabbitMQMessageHandler(final BrokerConnectionFactory factory, final String taskQueueName, final boolean durable) throws IOException {
public RabbitMQMessageHandler(final BrokerConnectionFactory factory, final QueueConfig queueConfig) {
this.factory = factory;
this.taskQueueName = taskQueueName;
this.durable = durable;
this.queueConfig = queueConfig;
this.taskQueueName = queueConfig.queueName();
}

@Override
Expand Down Expand Up @@ -155,8 +154,7 @@ private void stopAndStartConsumer() throws IOException {
}
consumer = new RabbitMQMessageConsumer(
factory.getConnection().createChannel(),
taskQueueName,
durable,
queueConfig,
this);
consumer.getChannel().addShutdownListener(this::handleShutdownSignal);
consumer.startConsuming();
Expand Down
Loading

0 comments on commit f5792fc

Please sign in to comment.