Skip to content

Commit

Permalink
Fix for declaring worker queue (#78)
Browse files Browse the repository at this point in the history
Changes made to declaring worker queue incorrectly created worker queue always are quorum, and also missed a call to one declare. With this change it's backward compatible with older RabbitMQ (one without queue types)
  • Loading branch information
Hilbrand authored Jan 10, 2025
1 parent f5792fc commit c4a1378
Show file tree
Hide file tree
Showing 6 changed files with 24 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ public boolean updateTaskScheduler(final TaskSchedule<T> schedule) throws Interr
final String workerQueueName = schedule.getWorkerQueueName();
if (!buckets.containsKey(workerQueueName)) {
LOG.info("Added scheduler for worker queue {}", workerQueueName);
buckets.put(workerQueueName, new TaskScheduleBucket(workerQueueName, schedule.isDurable()));
buckets.put(workerQueueName, new TaskScheduleBucket(new QueueConfig(workerQueueName, schedule.isDurable(), schedule.getQueueType())));
}
final TaskScheduleBucket taskScheduleBucket = buckets.get(workerQueueName);

Expand Down Expand Up @@ -110,11 +110,11 @@ private class TaskScheduleBucket {
private final TaskScheduler<T> taskScheduler;
private final String workerQueueName;

public TaskScheduleBucket(final String workerQueueName, final boolean durable) throws InterruptedException {
this.workerQueueName = workerQueueName;
public TaskScheduleBucket(final QueueConfig queueConfig) throws InterruptedException {
this.workerQueueName = queueConfig.queueName();
taskScheduler = schedulerFactory.createScheduler(workerQueueName);
LOG.info("Worker Queue Name:{} (durable:{})", workerQueueName, durable);
workerProducer = factory.createWorkerProducer(workerQueueName, durable);
LOG.info("Worker Queue Name:{} (durable:{}, queueType:{})", workerQueueName, queueConfig.durable(), queueConfig.queueType());
workerProducer = factory.createWorkerProducer(queueConfig);
final WorkerPool workerPool = new WorkerPool(workerQueueName, workerProducer, taskScheduler);
workerSizeObserverProxy.addObserver(workerQueueName, workerPool);
workerProducer.start();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,11 +33,10 @@ public interface AdaptorFactory {

/**
* Creates a new worker producer for the given worker type.
* @param workerQueueName name of queue of the worker
* @param durable true if the queue created should be persistent during server restart
* @param queueConfig Configuration parameters for the queue
* @return new worker producer object
*/
WorkerProducer createWorkerProducer(String workerQueueName, boolean durable);
WorkerProducer createWorkerProducer(QueueConfig queueConfig);

/**
* Creates a new TaksMessageHandler for the given worker type and queue.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ public WorkerSizeProviderProxy createWorkerSizeProvider() {
}

@Override
public WorkerProducer createWorkerProducer(final String workerQueueName, final boolean durable) {
return new RabbitMQWorkerProducer(factory, workerQueueName, durable);
public WorkerProducer createWorkerProducer(final QueueConfig queueConfig) {
return new RabbitMQWorkerProducer(factory, queueConfig);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

Expand All @@ -36,6 +37,7 @@
import nl.aerius.taskmanager.client.BrokerConnectionFactory;
import nl.aerius.taskmanager.client.QueueConstants;
import nl.aerius.taskmanager.domain.Message;
import nl.aerius.taskmanager.domain.QueueConfig;
import nl.aerius.taskmanager.domain.RabbitMQQueueType;

/**
Expand All @@ -51,16 +53,17 @@ class RabbitMQWorkerProducer implements WorkerProducer {

private final BrokerConnectionFactory factory;
private final String workerQueueName;
private final boolean durable;
private final RabbitMQQueueType queueType;

private WorkerFinishedHandler workerFinishedHandler;
private boolean isShutdown;
private final boolean durable;

public RabbitMQWorkerProducer(final BrokerConnectionFactory factory, final String workerQueueName,
final boolean durable) {
public RabbitMQWorkerProducer(final BrokerConnectionFactory factory, final QueueConfig queueConfig) {
this.factory = factory;
this.workerQueueName = workerQueueName;
this.durable = durable;
this.workerQueueName = queueConfig.queueName();
this.durable = queueConfig.durable();
this.queueType = queueConfig.queueType();
}

@Override
Expand All @@ -80,7 +83,7 @@ public void forwardMessage(final Message<?> message) throws IOException {
// or do we expect worker to send instead of CC the message?
final Channel channel = factory.getConnection().createChannel();
try {
channel.queueDeclare(workerQueueName, durable, false, false, null);
channel.queueDeclare(workerQueueName, durable, false, false, RabbitMQQueueUtil.queueDeclareArguments(durable, queueType));
final BasicProperties.Builder forwardBuilder = rabbitMQMessage.getProperties().builder();
// new header map (even in case of existing headers, original can be a UnmodifiableMap)
final Map<String, Object> headers = rabbitMQMessage.getProperties().getHeaders() == null ? new HashMap<>()
Expand Down Expand Up @@ -128,7 +131,8 @@ private void tryStartReplyConsumer() {
connection.removeShutdownListener(this::restartConnection);
}
if (warn) {
LOG.warn("(Re)starting reply consumer for queue {} failed, retrying in a while", workerQueueName);
LOG.warn("(Re)starting reply consumer for queue {} failed, retrying in a while: {}", workerQueueName,
Optional.ofNullable(e1.getMessage()).orElse(Optional.ofNullable(e1.getCause()).map(Throwable::getMessage).orElse("Unknown")));
LOG.trace("(Re)starting failed with exception:", e1);
warn = false;
}
Expand Down Expand Up @@ -165,7 +169,7 @@ private void startReplyConsumer(final Connection connection) throws IOException
replyChannel.queueDeclare(workerReplyQueue, false, true, true, null);
// ensure the worker queue is around as well (so we can retrieve number of customers later on).
// Worker queue is durable and non-exclusive with autodelete off.
replyChannel.queueDeclare(workerQueueName, durable, false, false, RabbitMQQueueUtil.queueDeclareArguments(durable, RabbitMQQueueType.QUORUM));
replyChannel.queueDeclare(workerQueueName, durable, false, false, RabbitMQQueueUtil.queueDeclareArguments(durable, queueType));
replyChannel.basicConsume(workerReplyQueue, true, workerReplyQueue, new DefaultConsumer(replyChannel) {
@Override
public void handleDelivery(final String consumerTag, final Envelope envelope, final BasicProperties properties, final byte[] body) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ public WorkerSizeProviderProxy createWorkerSizeProvider() {
}

@Override
public WorkerProducer createWorkerProducer(final String workerQueueName, final boolean durable) {
public WorkerProducer createWorkerProducer(final QueueConfig queueConfig) {
return mockWorkerProducer;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@

import nl.aerius.taskmanager.adaptor.WorkerProducer;
import nl.aerius.taskmanager.adaptor.WorkerSizeObserver;
import nl.aerius.taskmanager.domain.QueueConfig;

/**
* Test class for {@link RabbitMQWorkerProducer}.
Expand All @@ -47,7 +48,7 @@ class RabbitMQWorkerProducerTest extends AbstractRabbitMQTest {
void testForwardMessage() throws IOException, InterruptedException {
final byte[] sendBody = "4321".getBytes();

final WorkerProducer wp = adapterFactory.createWorkerProducer(WORKER_QUEUE_NAME, false);
final WorkerProducer wp = adapterFactory.createWorkerProducer(new QueueConfig(WORKER_QUEUE_NAME, false, null));
wp.start();
final BasicProperties bp = new BasicProperties();
wp.forwardMessage(new RabbitMQMessage(WORKER_QUEUE_NAME, null, 4321, bp, sendBody) {
Expand Down

0 comments on commit c4a1378

Please sign in to comment.