Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Host.RabbitMq] Improve consumer concurrency #352

Merged
merged 1 commit into from
Dec 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/Host.Plugin.Properties.xml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
<Import Project="Common.NuGet.Properties.xml" />

<PropertyGroup>
<Version>2.6.1</Version>
<Version>2.6.2-rc13</Version>
</PropertyGroup>

</Project>
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,11 @@ public static RabbitMqMessageRoutingKeyProvider<object> GetMessageRoutingKeyProv
public static RabbitMqMessagePropertiesModifier<object> GetMessagePropertiesModifier(this HasProviderExtensions p, HasProviderExtensions settings = null)
=> p.GetOrDefault<RabbitMqMessagePropertiesModifier<object>>(RabbitMqProperties.MessagePropertiesModifier, settings, null);

public static string GetQueueName(this AbstractConsumerSettings p)
=> p.GetOrDefault<string>(RabbitMqProperties.QueueName, null);
public static string GetQueueName(this AbstractConsumerSettings c)
=> c.GetOrDefault<string>(RabbitMqProperties.QueueName, null);

public static string GetQueueName(this RequestResponseSettings p)
=> p.GetOrDefault<string>(RabbitMqProperties.QueueName, null);
public static string GetBindingRoutingKey(this AbstractConsumerSettings c, HasProviderExtensions settings = null)
=> c.GetOrDefault<string>(RabbitMqProperties.BindingRoutingKey, settings, null);

public static string GetExchageType(this ProducerSettings p, HasProviderExtensions settings = null)
=> p.GetOrDefault<string>(RabbitMqProperties.ExchangeType, settings, null);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
namespace SlimMessageBus.Host.RabbitMQ;

using System;

public static class RabbitMqMessageBusSettingsExtensions
{
/// <summary>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ public abstract class AbstractRabbitMqConsumer : AbstractConsumer
private AsyncEventingBasicConsumer _consumer;
private string _consumerTag;

protected string QueueName { get; }
public string QueueName { get; }
protected abstract RabbitMqMessageAcknowledgementMode AcknowledgementMode { get; }

protected AbstractRabbitMqConsumer(ILogger logger, IRabbitMqChannel channel, string queueName, IHeaderValueConverter headerValueConverter)
Expand Down Expand Up @@ -82,7 +82,7 @@ protected async Task OnMessageReceived(object sender, BasicDeliverEventArgs @eve

protected abstract Task<Exception> OnMessageReceived(Dictionary<string, object> messageHeaders, BasicDeliverEventArgs transportMessage);

protected void NackMessage(BasicDeliverEventArgs @event, bool requeue)
public void NackMessage(BasicDeliverEventArgs @event, bool requeue)
{
lock (_channel.ChannelLock)
{
Expand All @@ -91,7 +91,7 @@ protected void NackMessage(BasicDeliverEventArgs @event, bool requeue)
}
}

protected void AckMessage(BasicDeliverEventArgs @event)
public void AckMessage(BasicDeliverEventArgs @event)
{
lock (_channel.ChannelLock)
{
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
namespace SlimMessageBus.Host.RabbitMQ;

/// <summary>
/// Decorator for see <see cref="IMessageProcessor{TMessage}"/> that automatically acknowledges the message after processing.
/// </summary>
/// <param name="target"></param>
/// <param name="logger"></param>
/// <param name="acknowledgementMode"></param>
/// <param name="consumer"></param>
internal sealed class RabbitMqAutoAcknowledgeMessageProcessor(IMessageProcessor<BasicDeliverEventArgs> target,
ILogger logger,
RabbitMqMessageAcknowledgementMode acknowledgementMode,
IRabbitMqConsumer consumer)
: IMessageProcessor<BasicDeliverEventArgs>, IDisposable
{
public IReadOnlyCollection<AbstractConsumerSettings> ConsumerSettings => target.ConsumerSettings;

public void Dispose()
{
if (target is IDisposable targetDisposable)
{
targetDisposable.Dispose();
}
}

public async Task<ProcessMessageResult> ProcessMessage(BasicDeliverEventArgs transportMessage, IReadOnlyDictionary<string, object> messageHeaders, IDictionary<string, object> consumerContextProperties = null, IServiceProvider currentServiceProvider = null, CancellationToken cancellationToken = default)
{
var r = await target.ProcessMessage(transportMessage, messageHeaders: messageHeaders, consumerContextProperties: consumerContextProperties, cancellationToken: cancellationToken);

if (acknowledgementMode == RabbitMqMessageAcknowledgementMode.ConfirmAfterMessageProcessingWhenNoManualConfirmMade)
{
// Acknowledge after processing
var confirmOption = r.Exception != null
? RabbitMqMessageConfirmOptions.Nack // NAck after processing when message fails (unless the user already acknowledged in any way).
: RabbitMqMessageConfirmOptions.Ack; // Acknowledge after processing

consumer.ConfirmMessage(transportMessage, confirmOption, consumerContextProperties);
}

if (r.Exception != null)
{
// We rely on the IMessageProcessor to execute the ConsumerErrorHandler<T>, but if it's not registered in the DI, it fails, or there is another fatal error then the message will be lost.
logger.LogError(r.Exception, "Exchange {Exchange} - Queue {Queue}: Error processing message {Message}, delivery tag {DeliveryTag}", transportMessage.Exchange, consumer.QueueName, transportMessage, transportMessage.DeliveryTag);
}
return r;
}
}
102 changes: 77 additions & 25 deletions src/SlimMessageBus.Host.RabbitMQ/Consumers/RabbitMqConsumer.cs
Original file line number Diff line number Diff line change
@@ -1,29 +1,74 @@
namespace SlimMessageBus.Host.RabbitMQ;

using Microsoft.Extensions.Logging;
public interface IRabbitMqConsumer
{
string QueueName { get; }
void ConfirmMessage(BasicDeliverEventArgs transportMessage, RabbitMqMessageConfirmOptions option, IDictionary<string, object> properties, bool warnIfAlreadyConfirmed = false);
}

public class RabbitMqConsumer : AbstractRabbitMqConsumer
public class RabbitMqConsumer : AbstractRabbitMqConsumer, IRabbitMqConsumer
{
public static readonly string ContextProperty_MessageConfirmed = "RabbitMq_MessageConfirmed";

private readonly RabbitMqMessageAcknowledgementMode _acknowledgementMode;
private readonly IMessageProcessor<BasicDeliverEventArgs> _messageProcessor;
private readonly IDictionary<string, IMessageProcessor<BasicDeliverEventArgs>> _messageProcessorByRoutingKey;

protected override RabbitMqMessageAcknowledgementMode AcknowledgementMode => _acknowledgementMode;

public RabbitMqConsumer(ILoggerFactory loggerFactory, IRabbitMqChannel channel, string queueName, IList<ConsumerSettings> consumers, IMessageSerializer serializer, MessageBusBase messageBus, IHeaderValueConverter headerValueConverter)
: base(loggerFactory.CreateLogger<RabbitMqConsumer>(), channel, queueName, headerValueConverter)
: base(loggerFactory.CreateLogger<RabbitMqConsumer>(), channel, queueName: queueName, headerValueConverter)
{
_acknowledgementMode = consumers.Select(x => x.GetOrDefault<RabbitMqMessageAcknowledgementMode?>(RabbitMqProperties.MessageAcknowledgementMode, messageBus.Settings)).FirstOrDefault(x => x != null)
?? RabbitMqMessageAcknowledgementMode.ConfirmAfterMessageProcessingWhenNoManualConfirmMade; // be default choose the safer acknowledgement mode
_messageProcessor = new MessageProcessor<BasicDeliverEventArgs>(
consumers,
messageBus,
path: queueName,
responseProducer: messageBus,
messageProvider: (messageType, m) => serializer.Deserialize(messageType, m.Body.ToArray()),
consumerContextInitializer: InitializeConsumerContext,
consumerErrorHandlerOpenGenericType: typeof(IRabbitMqConsumerErrorHandler<>));

IMessageProcessor<BasicDeliverEventArgs> CreateMessageProcessor(IEnumerable<ConsumerSettings> consumers)
{
IMessageProcessor<BasicDeliverEventArgs> messageProcessor = new MessageProcessor<BasicDeliverEventArgs>(
consumers,
messageBus,
path: queueName,
responseProducer: messageBus,
messageProvider: (messageType, m) => serializer.Deserialize(messageType, m.Body.ToArray()),
consumerContextInitializer: InitializeConsumerContext,
consumerErrorHandlerOpenGenericType: typeof(IRabbitMqConsumerErrorHandler<>));

messageProcessor = new RabbitMqAutoAcknowledgeMessageProcessor(messageProcessor, Logger, _acknowledgementMode, this);

// pick the maximum number of instances
var instances = consumers.Max(x => x.Instances);
// For a given rabbit channel, there is only 1 task that dispatches messages. We want to be be able to let each SMB consume process within its own task (1 or more)
messageProcessor = new ConcurrentMessageProcessorDecorator<BasicDeliverEventArgs>(instances, loggerFactory, messageProcessor);

return messageProcessor;
}

_messageProcessorByRoutingKey = consumers
.GroupBy(x => x.GetBindingRoutingKey() ?? string.Empty)
.ToDictionary(x => x.Key, CreateMessageProcessor);

_messageProcessor = _messageProcessorByRoutingKey.Count == 1 && _messageProcessorByRoutingKey.TryGetValue(string.Empty, out var value)
? value : null;
}

protected override async Task OnStop()
{
try
{
// Wait max 5 seconds for all background processing tasks to complete
using var taskCancellationSource = new CancellationTokenSource(TimeSpan.FromSeconds(5));
var backgrounProcessingTasks = _messageProcessorByRoutingKey.Values
.OfType<ConcurrentMessageProcessorDecorator<BasicDeliverEventArgs>>()
.Select(x => x.WaitAll(taskCancellationSource.Token));

await Task.WhenAll(backgrounProcessingTasks);
}
catch (Exception e)
{
Logger.LogError(e, "Error occurred while waiting for background processing tasks to complete");
}

await base.OnStop();
}

private void InitializeConsumerContext(BasicDeliverEventArgs transportMessage, ConsumerContext consumerContext)
Expand All @@ -40,14 +85,14 @@ private void InitializeConsumerContext(BasicDeliverEventArgs transportMessage, C
consumerContext.SetConfirmAction(option => ConfirmMessage(transportMessage, option, consumerContext.Properties, warnIfAlreadyConfirmed: true));
}

private void ConfirmMessage(BasicDeliverEventArgs transportMessage, RabbitMqMessageConfirmOptions option, IDictionary<string, object> properties, bool warnIfAlreadyConfirmed = false)
public void ConfirmMessage(BasicDeliverEventArgs transportMessage, RabbitMqMessageConfirmOptions option, IDictionary<string, object> properties, bool warnIfAlreadyConfirmed = false)
{
if (properties.TryGetValue(ContextProperty_MessageConfirmed, out var confirmed) && confirmed is true)
{
// Note: We want to makes sure the 1st message confirmation is handled
if (warnIfAlreadyConfirmed)
{
Logger.LogWarning("The message (delivery tag {MessageDeliveryTag}, queue name {QueueName}) was already confirmed, subsequent message confirmation will have no effect", transportMessage.DeliveryTag, QueueName);
Logger.LogWarning("Exchange {Exchange} - Queue {Queue}: The message (delivery tag {MessageDeliveryTag}) was already confirmed, subsequent message confirmation will have no effect", transportMessage.Exchange, QueueName, transportMessage.DeliveryTag);
}
return;
}
Expand Down Expand Up @@ -80,23 +125,30 @@ protected override async Task<Exception> OnMessageReceived(Dictionary<string, ob
ConfirmMessage(transportMessage, RabbitMqMessageConfirmOptions.Ack, consumerContextProperties);
}

var r = await _messageProcessor.ProcessMessage(transportMessage, messageHeaders: messageHeaders, consumerContextProperties: consumerContextProperties, cancellationToken: CancellationToken);

if (_acknowledgementMode == RabbitMqMessageAcknowledgementMode.ConfirmAfterMessageProcessingWhenNoManualConfirmMade)
var messageProcessor = _messageProcessor;
if (messageProcessor != null || _messageProcessorByRoutingKey.TryGetValue(transportMessage.RoutingKey, out messageProcessor))
{
// Acknowledge after processing
var confirmOption = r.Exception != null
? RabbitMqMessageConfirmOptions.Nack // NAck after processing when message fails (unless the user already acknowledged in any way).
: RabbitMqMessageConfirmOptions.Ack; // Acknowledge after processing
ConfirmMessage(transportMessage, confirmOption, consumerContextProperties);
await messageProcessor.ProcessMessage(transportMessage, messageHeaders: messageHeaders, consumerContextProperties: consumerContextProperties, cancellationToken: CancellationToken);
}

if (r.Exception != null)
else
{
// We rely on the IMessageProcessor to execute the ConsumerErrorHandler<T>, but if it's not registered in the DI, it fails, or there is another fatal error then the message will be lost.
Logger.LogError(r.Exception, "Error processing message {Message} from exchange {Exchange}, delivery tag {DeliveryTag}", transportMessage, transportMessage.Exchange, transportMessage.DeliveryTag);
Logger.LogDebug("Exchange {Exchange} - Queue {Queue}: No message processor found for routing key {RoutingKey}", transportMessage.Exchange, QueueName, transportMessage.RoutingKey);
}

// error handling happens in the message processor
return null;
}

protected override async ValueTask DisposeAsyncCore()
{
await base.DisposeAsyncCore();

foreach (var messageProcessor in _messageProcessorByRoutingKey.Values)
{
if (messageProcessor is IDisposable disposable)
{
disposable.Dispose();
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ public class RabbitMqResponseConsumer : AbstractRabbitMqConsumer
protected override RabbitMqMessageAcknowledgementMode AcknowledgementMode => RabbitMqMessageAcknowledgementMode.ConfirmAfterMessageProcessingWhenNoManualConfirmMade;

public RabbitMqResponseConsumer(ILoggerFactory loggerFactory, IRabbitMqChannel channel, string queueName, RequestResponseSettings requestResponseSettings, MessageBusBase messageBus, IHeaderValueConverter headerValueConverter)
: base(loggerFactory.CreateLogger<RabbitMqConsumer>(), channel, queueName, headerValueConverter)
: base(loggerFactory.CreateLogger<RabbitMqConsumer>(), channel, queueName: queueName, headerValueConverter)
{
_messageProcessor = new ResponseMessageProcessor<BasicDeliverEventArgs>(loggerFactory, requestResponseSettings, messageBus, m => m.Body.ToArray());
}
Expand Down
26 changes: 16 additions & 10 deletions src/SlimMessageBus.Host.RabbitMQ/RabbitMqMessageBus.cs
Original file line number Diff line number Diff line change
Expand Up @@ -62,24 +62,30 @@ private async Task CreateConnection()
{
try
{
var retryCount = 3;
for (var retry = 0; _connection == null && retry < retryCount; retry++)
{
try
const int retryCount = 3;
#pragma warning disable CS1998 // Async method lacks 'await' operators and will run synchronously
await Retry.WithDelay(operation: async (cancellationTask) =>
{
// See https://www.rabbitmq.com/client-libraries/dotnet-api-guide#connection-recovery
ProviderSettings.ConnectionFactory.AutomaticRecoveryEnabled = true;
ProviderSettings.ConnectionFactory.DispatchConsumersAsync = true;

_connection = ProviderSettings.Endpoints != null && ProviderSettings.Endpoints.Count > 0
? ProviderSettings.ConnectionFactory.CreateConnection(ProviderSettings.Endpoints)
: ProviderSettings.ConnectionFactory.CreateConnection();
}
catch (global::RabbitMQ.Client.Exceptions.BrokerUnreachableException e)
},
shouldRetry: (ex, attempt) =>
{
_logger.LogInformation(e, "Retrying {Retry} of {RetryCount} connection to RabbitMQ...", retry, retryCount);
await Task.Delay(ProviderSettings.ConnectionFactory.NetworkRecoveryInterval);
}
}
if (ex is global::RabbitMQ.Client.Exceptions.BrokerUnreachableException && attempt < retryCount)
{
_logger.LogInformation(ex, "Retrying {Retry} of {RetryCount} connection to RabbitMQ...", attempt, retryCount);
return true;
}
return false;
},
delay: ProviderSettings.ConnectionFactory.NetworkRecoveryInterval
);
#pragma warning restore CS1998 // Async method lacks 'await' operators and will run synchronously

lock (_channelLock)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,12 @@ public string ConnectionString

public ConnectionFactory ConnectionFactory { get; set; } = new()
{
NetworkRecoveryInterval = TimeSpan.FromSeconds(5)
NetworkRecoveryInterval = TimeSpan.FromSeconds(5),
// By default the consumer dispatch is single threaded, we can increase it to the number of consumers by applying the .Instances(10) setting
ConsumerDispatchConcurrency = 1
};

public IList<AmqpTcpEndpoint> Endpoints { get; set; } = new List<AmqpTcpEndpoint>();
public IList<AmqpTcpEndpoint> Endpoints { get; set; } = [];

/// <summary>
/// Allows to set a custom header values converter between SMB and the underlying RabbitMq client.
Expand Down
4 changes: 2 additions & 2 deletions src/SlimMessageBus.Host.RabbitMQ/RabbitMqTopologyService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -76,9 +76,9 @@ private void ProvisionConsumers()
}
}

private void DeclareQueueBinding(HasProviderExtensions settings, string bindingExchangeName, string queueName)
private void DeclareQueueBinding(AbstractConsumerSettings settings, string bindingExchangeName, string queueName)
{
var bindingRoutingKey = settings.GetOrDefault(RabbitMqProperties.BindingRoutingKey, _providerSettings, string.Empty);
var bindingRoutingKey = settings.GetBindingRoutingKey(_providerSettings) ?? string.Empty;

_logger.LogInformation("Binding queue {QueueName} to exchange {ExchangeName} using routing key {RoutingKey}", queueName, bindingExchangeName, bindingRoutingKey);
try
Expand Down
2 changes: 1 addition & 1 deletion src/SlimMessageBus.Host.Redis/RedisMessageBus.cs
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ void AddTopicConsumer(string topic, ISubscriber subscriber, IMessageProcessor<Me
}

// When it was requested to have more than once concurrent instances working then we need to fan out the incoming Redis consumption tasks
processor = new ConcurrencyIncreasingMessageProcessorDecorator<MessageWithHeaders>(instances, this, processor);
processor = new ConcurrentMessageProcessorDecorator<MessageWithHeaders>(instances, LoggerFactory, processor);
}

_logger.LogInformation("Creating consumer for redis {PathKind} {Path}", GetPathKindString(pathKind), path);
Expand Down
Loading
Loading