diff --git a/src/Host.Plugin.Properties.xml b/src/Host.Plugin.Properties.xml index 5645b21f..f9bea6cc 100644 --- a/src/Host.Plugin.Properties.xml +++ b/src/Host.Plugin.Properties.xml @@ -4,7 +4,7 @@ - 2.6.1 + 2.6.2-rc13 \ No newline at end of file diff --git a/src/SlimMessageBus.Host.RabbitMQ/Config/RabbitMqHasProviderExtensions.cs b/src/SlimMessageBus.Host.RabbitMQ/Config/RabbitMqHasProviderExtensions.cs index 312a8013..85e8c34e 100644 --- a/src/SlimMessageBus.Host.RabbitMQ/Config/RabbitMqHasProviderExtensions.cs +++ b/src/SlimMessageBus.Host.RabbitMQ/Config/RabbitMqHasProviderExtensions.cs @@ -8,11 +8,11 @@ public static RabbitMqMessageRoutingKeyProvider GetMessageRoutingKeyProv public static RabbitMqMessagePropertiesModifier GetMessagePropertiesModifier(this HasProviderExtensions p, HasProviderExtensions settings = null) => p.GetOrDefault>(RabbitMqProperties.MessagePropertiesModifier, settings, null); - public static string GetQueueName(this AbstractConsumerSettings p) - => p.GetOrDefault(RabbitMqProperties.QueueName, null); + public static string GetQueueName(this AbstractConsumerSettings c) + => c.GetOrDefault(RabbitMqProperties.QueueName, null); - public static string GetQueueName(this RequestResponseSettings p) - => p.GetOrDefault(RabbitMqProperties.QueueName, null); + public static string GetBindingRoutingKey(this AbstractConsumerSettings c, HasProviderExtensions settings = null) + => c.GetOrDefault(RabbitMqProperties.BindingRoutingKey, settings, null); public static string GetExchageType(this ProducerSettings p, HasProviderExtensions settings = null) => p.GetOrDefault(RabbitMqProperties.ExchangeType, settings, null); diff --git a/src/SlimMessageBus.Host.RabbitMQ/Config/RabbitMqMessageBusSettingsExtensions.cs b/src/SlimMessageBus.Host.RabbitMQ/Config/RabbitMqMessageBusSettingsExtensions.cs index e2634aa4..1eb66670 100644 --- a/src/SlimMessageBus.Host.RabbitMQ/Config/RabbitMqMessageBusSettingsExtensions.cs +++ b/src/SlimMessageBus.Host.RabbitMQ/Config/RabbitMqMessageBusSettingsExtensions.cs @@ -1,7 +1,5 @@ namespace SlimMessageBus.Host.RabbitMQ; -using System; - public static class RabbitMqMessageBusSettingsExtensions { /// diff --git a/src/SlimMessageBus.Host.RabbitMQ/Consumers/AbstractRabbitMqConsumer.cs b/src/SlimMessageBus.Host.RabbitMQ/Consumers/AbstractRabbitMqConsumer.cs index 3100dbaa..10161c51 100644 --- a/src/SlimMessageBus.Host.RabbitMQ/Consumers/AbstractRabbitMqConsumer.cs +++ b/src/SlimMessageBus.Host.RabbitMQ/Consumers/AbstractRabbitMqConsumer.cs @@ -10,7 +10,7 @@ public abstract class AbstractRabbitMqConsumer : AbstractConsumer private AsyncEventingBasicConsumer _consumer; private string _consumerTag; - protected string QueueName { get; } + public string QueueName { get; } protected abstract RabbitMqMessageAcknowledgementMode AcknowledgementMode { get; } protected AbstractRabbitMqConsumer(ILogger logger, IRabbitMqChannel channel, string queueName, IHeaderValueConverter headerValueConverter) @@ -82,7 +82,7 @@ protected async Task OnMessageReceived(object sender, BasicDeliverEventArgs @eve protected abstract Task OnMessageReceived(Dictionary messageHeaders, BasicDeliverEventArgs transportMessage); - protected void NackMessage(BasicDeliverEventArgs @event, bool requeue) + public void NackMessage(BasicDeliverEventArgs @event, bool requeue) { lock (_channel.ChannelLock) { @@ -91,7 +91,7 @@ protected void NackMessage(BasicDeliverEventArgs @event, bool requeue) } } - protected void AckMessage(BasicDeliverEventArgs @event) + public void AckMessage(BasicDeliverEventArgs @event) { lock (_channel.ChannelLock) { diff --git a/src/SlimMessageBus.Host.RabbitMQ/Consumers/RabbitMqAutoAcknowledgeMessageProcessor.cs b/src/SlimMessageBus.Host.RabbitMQ/Consumers/RabbitMqAutoAcknowledgeMessageProcessor.cs new file mode 100644 index 00000000..9cad553e --- /dev/null +++ b/src/SlimMessageBus.Host.RabbitMQ/Consumers/RabbitMqAutoAcknowledgeMessageProcessor.cs @@ -0,0 +1,47 @@ +namespace SlimMessageBus.Host.RabbitMQ; + +/// +/// Decorator for see that automatically acknowledges the message after processing. +/// +/// +/// +/// +/// +internal sealed class RabbitMqAutoAcknowledgeMessageProcessor(IMessageProcessor target, + ILogger logger, + RabbitMqMessageAcknowledgementMode acknowledgementMode, + IRabbitMqConsumer consumer) + : IMessageProcessor, IDisposable +{ + public IReadOnlyCollection ConsumerSettings => target.ConsumerSettings; + + public void Dispose() + { + if (target is IDisposable targetDisposable) + { + targetDisposable.Dispose(); + } + } + + public async Task ProcessMessage(BasicDeliverEventArgs transportMessage, IReadOnlyDictionary messageHeaders, IDictionary consumerContextProperties = null, IServiceProvider currentServiceProvider = null, CancellationToken cancellationToken = default) + { + var r = await target.ProcessMessage(transportMessage, messageHeaders: messageHeaders, consumerContextProperties: consumerContextProperties, cancellationToken: cancellationToken); + + if (acknowledgementMode == RabbitMqMessageAcknowledgementMode.ConfirmAfterMessageProcessingWhenNoManualConfirmMade) + { + // Acknowledge after processing + var confirmOption = r.Exception != null + ? RabbitMqMessageConfirmOptions.Nack // NAck after processing when message fails (unless the user already acknowledged in any way). + : RabbitMqMessageConfirmOptions.Ack; // Acknowledge after processing + + consumer.ConfirmMessage(transportMessage, confirmOption, consumerContextProperties); + } + + if (r.Exception != null) + { + // We rely on the IMessageProcessor to execute the ConsumerErrorHandler, but if it's not registered in the DI, it fails, or there is another fatal error then the message will be lost. + logger.LogError(r.Exception, "Exchange {Exchange} - Queue {Queue}: Error processing message {Message}, delivery tag {DeliveryTag}", transportMessage.Exchange, consumer.QueueName, transportMessage, transportMessage.DeliveryTag); + } + return r; + } +} diff --git a/src/SlimMessageBus.Host.RabbitMQ/Consumers/RabbitMqConsumer.cs b/src/SlimMessageBus.Host.RabbitMQ/Consumers/RabbitMqConsumer.cs index 2040822c..b2a94b4f 100644 --- a/src/SlimMessageBus.Host.RabbitMQ/Consumers/RabbitMqConsumer.cs +++ b/src/SlimMessageBus.Host.RabbitMQ/Consumers/RabbitMqConsumer.cs @@ -1,29 +1,74 @@ namespace SlimMessageBus.Host.RabbitMQ; -using Microsoft.Extensions.Logging; +public interface IRabbitMqConsumer +{ + string QueueName { get; } + void ConfirmMessage(BasicDeliverEventArgs transportMessage, RabbitMqMessageConfirmOptions option, IDictionary properties, bool warnIfAlreadyConfirmed = false); +} -public class RabbitMqConsumer : AbstractRabbitMqConsumer +public class RabbitMqConsumer : AbstractRabbitMqConsumer, IRabbitMqConsumer { public static readonly string ContextProperty_MessageConfirmed = "RabbitMq_MessageConfirmed"; private readonly RabbitMqMessageAcknowledgementMode _acknowledgementMode; private readonly IMessageProcessor _messageProcessor; + private readonly IDictionary> _messageProcessorByRoutingKey; protected override RabbitMqMessageAcknowledgementMode AcknowledgementMode => _acknowledgementMode; public RabbitMqConsumer(ILoggerFactory loggerFactory, IRabbitMqChannel channel, string queueName, IList consumers, IMessageSerializer serializer, MessageBusBase messageBus, IHeaderValueConverter headerValueConverter) - : base(loggerFactory.CreateLogger(), channel, queueName, headerValueConverter) + : base(loggerFactory.CreateLogger(), channel, queueName: queueName, headerValueConverter) { _acknowledgementMode = consumers.Select(x => x.GetOrDefault(RabbitMqProperties.MessageAcknowledgementMode, messageBus.Settings)).FirstOrDefault(x => x != null) ?? RabbitMqMessageAcknowledgementMode.ConfirmAfterMessageProcessingWhenNoManualConfirmMade; // be default choose the safer acknowledgement mode - _messageProcessor = new MessageProcessor( - consumers, - messageBus, - path: queueName, - responseProducer: messageBus, - messageProvider: (messageType, m) => serializer.Deserialize(messageType, m.Body.ToArray()), - consumerContextInitializer: InitializeConsumerContext, - consumerErrorHandlerOpenGenericType: typeof(IRabbitMqConsumerErrorHandler<>)); + + IMessageProcessor CreateMessageProcessor(IEnumerable consumers) + { + IMessageProcessor messageProcessor = new MessageProcessor( + consumers, + messageBus, + path: queueName, + responseProducer: messageBus, + messageProvider: (messageType, m) => serializer.Deserialize(messageType, m.Body.ToArray()), + consumerContextInitializer: InitializeConsumerContext, + consumerErrorHandlerOpenGenericType: typeof(IRabbitMqConsumerErrorHandler<>)); + + messageProcessor = new RabbitMqAutoAcknowledgeMessageProcessor(messageProcessor, Logger, _acknowledgementMode, this); + + // pick the maximum number of instances + var instances = consumers.Max(x => x.Instances); + // For a given rabbit channel, there is only 1 task that dispatches messages. We want to be be able to let each SMB consume process within its own task (1 or more) + messageProcessor = new ConcurrentMessageProcessorDecorator(instances, loggerFactory, messageProcessor); + + return messageProcessor; + } + + _messageProcessorByRoutingKey = consumers + .GroupBy(x => x.GetBindingRoutingKey() ?? string.Empty) + .ToDictionary(x => x.Key, CreateMessageProcessor); + + _messageProcessor = _messageProcessorByRoutingKey.Count == 1 && _messageProcessorByRoutingKey.TryGetValue(string.Empty, out var value) + ? value : null; + } + + protected override async Task OnStop() + { + try + { + // Wait max 5 seconds for all background processing tasks to complete + using var taskCancellationSource = new CancellationTokenSource(TimeSpan.FromSeconds(5)); + var backgrounProcessingTasks = _messageProcessorByRoutingKey.Values + .OfType>() + .Select(x => x.WaitAll(taskCancellationSource.Token)); + + await Task.WhenAll(backgrounProcessingTasks); + } + catch (Exception e) + { + Logger.LogError(e, "Error occurred while waiting for background processing tasks to complete"); + } + + await base.OnStop(); } private void InitializeConsumerContext(BasicDeliverEventArgs transportMessage, ConsumerContext consumerContext) @@ -40,14 +85,14 @@ private void InitializeConsumerContext(BasicDeliverEventArgs transportMessage, C consumerContext.SetConfirmAction(option => ConfirmMessage(transportMessage, option, consumerContext.Properties, warnIfAlreadyConfirmed: true)); } - private void ConfirmMessage(BasicDeliverEventArgs transportMessage, RabbitMqMessageConfirmOptions option, IDictionary properties, bool warnIfAlreadyConfirmed = false) + public void ConfirmMessage(BasicDeliverEventArgs transportMessage, RabbitMqMessageConfirmOptions option, IDictionary properties, bool warnIfAlreadyConfirmed = false) { if (properties.TryGetValue(ContextProperty_MessageConfirmed, out var confirmed) && confirmed is true) { // Note: We want to makes sure the 1st message confirmation is handled if (warnIfAlreadyConfirmed) { - Logger.LogWarning("The message (delivery tag {MessageDeliveryTag}, queue name {QueueName}) was already confirmed, subsequent message confirmation will have no effect", transportMessage.DeliveryTag, QueueName); + Logger.LogWarning("Exchange {Exchange} - Queue {Queue}: The message (delivery tag {MessageDeliveryTag}) was already confirmed, subsequent message confirmation will have no effect", transportMessage.Exchange, QueueName, transportMessage.DeliveryTag); } return; } @@ -80,23 +125,30 @@ protected override async Task OnMessageReceived(Dictionary, but if it's not registered in the DI, it fails, or there is another fatal error then the message will be lost. - Logger.LogError(r.Exception, "Error processing message {Message} from exchange {Exchange}, delivery tag {DeliveryTag}", transportMessage, transportMessage.Exchange, transportMessage.DeliveryTag); + Logger.LogDebug("Exchange {Exchange} - Queue {Queue}: No message processor found for routing key {RoutingKey}", transportMessage.Exchange, QueueName, transportMessage.RoutingKey); } + // error handling happens in the message processor return null; } + + protected override async ValueTask DisposeAsyncCore() + { + await base.DisposeAsyncCore(); + + foreach (var messageProcessor in _messageProcessorByRoutingKey.Values) + { + if (messageProcessor is IDisposable disposable) + { + disposable.Dispose(); + } + } + } } diff --git a/src/SlimMessageBus.Host.RabbitMQ/Consumers/RabbitMqResponseConsumer.cs b/src/SlimMessageBus.Host.RabbitMQ/Consumers/RabbitMqResponseConsumer.cs index a40bc40e..d68ee57d 100644 --- a/src/SlimMessageBus.Host.RabbitMQ/Consumers/RabbitMqResponseConsumer.cs +++ b/src/SlimMessageBus.Host.RabbitMQ/Consumers/RabbitMqResponseConsumer.cs @@ -7,7 +7,7 @@ public class RabbitMqResponseConsumer : AbstractRabbitMqConsumer protected override RabbitMqMessageAcknowledgementMode AcknowledgementMode => RabbitMqMessageAcknowledgementMode.ConfirmAfterMessageProcessingWhenNoManualConfirmMade; public RabbitMqResponseConsumer(ILoggerFactory loggerFactory, IRabbitMqChannel channel, string queueName, RequestResponseSettings requestResponseSettings, MessageBusBase messageBus, IHeaderValueConverter headerValueConverter) - : base(loggerFactory.CreateLogger(), channel, queueName, headerValueConverter) + : base(loggerFactory.CreateLogger(), channel, queueName: queueName, headerValueConverter) { _messageProcessor = new ResponseMessageProcessor(loggerFactory, requestResponseSettings, messageBus, m => m.Body.ToArray()); } diff --git a/src/SlimMessageBus.Host.RabbitMQ/RabbitMqMessageBus.cs b/src/SlimMessageBus.Host.RabbitMQ/RabbitMqMessageBus.cs index 528cd38f..febafe7a 100644 --- a/src/SlimMessageBus.Host.RabbitMQ/RabbitMqMessageBus.cs +++ b/src/SlimMessageBus.Host.RabbitMQ/RabbitMqMessageBus.cs @@ -62,24 +62,30 @@ private async Task CreateConnection() { try { - var retryCount = 3; - for (var retry = 0; _connection == null && retry < retryCount; retry++) - { - try + const int retryCount = 3; +#pragma warning disable CS1998 // Async method lacks 'await' operators and will run synchronously + await Retry.WithDelay(operation: async (cancellationTask) => { + // See https://www.rabbitmq.com/client-libraries/dotnet-api-guide#connection-recovery ProviderSettings.ConnectionFactory.AutomaticRecoveryEnabled = true; ProviderSettings.ConnectionFactory.DispatchConsumersAsync = true; _connection = ProviderSettings.Endpoints != null && ProviderSettings.Endpoints.Count > 0 ? ProviderSettings.ConnectionFactory.CreateConnection(ProviderSettings.Endpoints) : ProviderSettings.ConnectionFactory.CreateConnection(); - } - catch (global::RabbitMQ.Client.Exceptions.BrokerUnreachableException e) + }, + shouldRetry: (ex, attempt) => { - _logger.LogInformation(e, "Retrying {Retry} of {RetryCount} connection to RabbitMQ...", retry, retryCount); - await Task.Delay(ProviderSettings.ConnectionFactory.NetworkRecoveryInterval); - } - } + if (ex is global::RabbitMQ.Client.Exceptions.BrokerUnreachableException && attempt < retryCount) + { + _logger.LogInformation(ex, "Retrying {Retry} of {RetryCount} connection to RabbitMQ...", attempt, retryCount); + return true; + } + return false; + }, + delay: ProviderSettings.ConnectionFactory.NetworkRecoveryInterval + ); +#pragma warning restore CS1998 // Async method lacks 'await' operators and will run synchronously lock (_channelLock) { diff --git a/src/SlimMessageBus.Host.RabbitMQ/RabbitMqMessageBusSettings.cs b/src/SlimMessageBus.Host.RabbitMQ/RabbitMqMessageBusSettings.cs index 1c3559e9..5868b118 100644 --- a/src/SlimMessageBus.Host.RabbitMQ/RabbitMqMessageBusSettings.cs +++ b/src/SlimMessageBus.Host.RabbitMQ/RabbitMqMessageBusSettings.cs @@ -13,10 +13,12 @@ public string ConnectionString public ConnectionFactory ConnectionFactory { get; set; } = new() { - NetworkRecoveryInterval = TimeSpan.FromSeconds(5) + NetworkRecoveryInterval = TimeSpan.FromSeconds(5), + // By default the consumer dispatch is single threaded, we can increase it to the number of consumers by applying the .Instances(10) setting + ConsumerDispatchConcurrency = 1 }; - public IList Endpoints { get; set; } = new List(); + public IList Endpoints { get; set; } = []; /// /// Allows to set a custom header values converter between SMB and the underlying RabbitMq client. diff --git a/src/SlimMessageBus.Host.RabbitMQ/RabbitMqTopologyService.cs b/src/SlimMessageBus.Host.RabbitMQ/RabbitMqTopologyService.cs index 27a01faf..e9ddfe04 100644 --- a/src/SlimMessageBus.Host.RabbitMQ/RabbitMqTopologyService.cs +++ b/src/SlimMessageBus.Host.RabbitMQ/RabbitMqTopologyService.cs @@ -76,9 +76,9 @@ private void ProvisionConsumers() } } - private void DeclareQueueBinding(HasProviderExtensions settings, string bindingExchangeName, string queueName) + private void DeclareQueueBinding(AbstractConsumerSettings settings, string bindingExchangeName, string queueName) { - var bindingRoutingKey = settings.GetOrDefault(RabbitMqProperties.BindingRoutingKey, _providerSettings, string.Empty); + var bindingRoutingKey = settings.GetBindingRoutingKey(_providerSettings) ?? string.Empty; _logger.LogInformation("Binding queue {QueueName} to exchange {ExchangeName} using routing key {RoutingKey}", queueName, bindingExchangeName, bindingRoutingKey); try diff --git a/src/SlimMessageBus.Host.Redis/RedisMessageBus.cs b/src/SlimMessageBus.Host.Redis/RedisMessageBus.cs index 5f8656cb..6497b80e 100644 --- a/src/SlimMessageBus.Host.Redis/RedisMessageBus.cs +++ b/src/SlimMessageBus.Host.Redis/RedisMessageBus.cs @@ -122,7 +122,7 @@ void AddTopicConsumer(string topic, ISubscriber subscriber, IMessageProcessor(instances, this, processor); + processor = new ConcurrentMessageProcessorDecorator(instances, LoggerFactory, processor); } _logger.LogInformation("Creating consumer for redis {PathKind} {Path}", GetPathKindString(pathKind), path); diff --git a/src/SlimMessageBus.Host/Consumer/MessageProcessors/ConcurrencyIncreasingMessageProcessorDecorator.cs b/src/SlimMessageBus.Host/Consumer/MessageProcessors/ConcurrentMessageProcessorDecorator.cs similarity index 74% rename from src/SlimMessageBus.Host/Consumer/MessageProcessors/ConcurrencyIncreasingMessageProcessorDecorator.cs rename to src/SlimMessageBus.Host/Consumer/MessageProcessors/ConcurrentMessageProcessorDecorator.cs index 1c8572e6..b02b64fd 100644 --- a/src/SlimMessageBus.Host/Consumer/MessageProcessors/ConcurrencyIncreasingMessageProcessorDecorator.cs +++ b/src/SlimMessageBus.Host/Consumer/MessageProcessors/ConcurrentMessageProcessorDecorator.cs @@ -1,11 +1,11 @@ namespace SlimMessageBus.Host; /// -/// Decorator for that increases the amount of messages being concurrently processed. +/// Decorator profor that increases the amount of messages being concurrentlycessed. /// The expectation is that will be executed synchronously (in sequential order) by the caller on which we want to increase amount of concurrent transportMessage being processed. /// /// -public sealed class ConcurrencyIncreasingMessageProcessorDecorator : IMessageProcessor, IDisposable +public sealed class ConcurrentMessageProcessorDecorator : IMessageProcessor, IDisposable { private readonly ILogger _logger; private SemaphoreSlim _concurrentSemaphore; @@ -21,13 +21,13 @@ public sealed class ConcurrencyIncreasingMessageProcessorDecorator : I public IReadOnlyCollection ConsumerSettings => _target.ConsumerSettings; - public ConcurrencyIncreasingMessageProcessorDecorator(int concurrency, MessageBusBase messageBus, IMessageProcessor target) + public ConcurrentMessageProcessorDecorator(int concurrency, ILoggerFactory loggerFactory, IMessageProcessor target) { if (target is null) throw new ArgumentNullException(nameof(target)); - if (messageBus is null) throw new ArgumentNullException(nameof(messageBus)); - if (concurrency <= 1) throw new ArgumentOutOfRangeException(nameof(concurrency)); + if (loggerFactory is null) throw new ArgumentNullException(nameof(loggerFactory)); + if (concurrency <= 0) throw new ArgumentOutOfRangeException(nameof(concurrency)); - _logger = messageBus.LoggerFactory.CreateLogger>(); + _logger = loggerFactory.CreateLogger>(); _concurrentSemaphore = new SemaphoreSlim(concurrency); _target = target; } @@ -36,6 +36,11 @@ public ConcurrencyIncreasingMessageProcessorDecorator(int concurrency, MessageBu public void Dispose() { + if (_target is IDisposable targetDisposable) + { + targetDisposable.Dispose(); + } + _concurrentSemaphore?.Dispose(); _concurrentSemaphore = null; } @@ -65,21 +70,16 @@ public async Task ProcessMessage(TMessage transportMessage return new(null, null, null); } - public TMessage GetMessageWithException() - { - lock (_lastExceptionLock) - { - var m = _lastExceptionMessage; - _lastExceptionMessage = default; - return m; - } - } - - public async Task WaitAll() + /// + /// Waits on all processing tasks to finish. + /// + /// + /// + public async Task WaitAll(CancellationToken cancellationToken) { - while (_pendingCount > 0) + while (_pendingCount > 0 && !cancellationToken.IsCancellationRequested) { - await Task.Delay(200).ConfigureAwait(false); + await Task.Delay(200, cancellationToken).ConfigureAwait(false); } } @@ -106,7 +106,7 @@ private async Task ProcessInBackground(TMessage transportMessage, IReadOnlyDicti finally { _logger.LogDebug("Leaving ProcessMessages for message {MessageType}", typeof(TMessage)); - _concurrentSemaphore.Release(); + _concurrentSemaphore?.Release(); Interlocked.Decrement(ref _pendingCount); } diff --git a/src/SlimMessageBus.Host/Consumer/MessageProcessors/MessageProcessor.cs b/src/SlimMessageBus.Host/Consumer/MessageProcessors/MessageProcessor.cs index dcf7af8a..df74b264 100644 --- a/src/SlimMessageBus.Host/Consumer/MessageProcessors/MessageProcessor.cs +++ b/src/SlimMessageBus.Host/Consumer/MessageProcessors/MessageProcessor.cs @@ -1,7 +1,5 @@ namespace SlimMessageBus.Host; -using System; - /// /// Implementation of that performs orchestration around processing of a new message using an instance of the declared consumer ( or interface). /// diff --git a/src/SlimMessageBus.Host/Consumer/MessageProcessors/ResponseMessageProcessor.cs b/src/SlimMessageBus.Host/Consumer/MessageProcessors/ResponseMessageProcessor.cs index e5f8fb5d..85d96b9b 100644 --- a/src/SlimMessageBus.Host/Consumer/MessageProcessors/ResponseMessageProcessor.cs +++ b/src/SlimMessageBus.Host/Consumer/MessageProcessors/ResponseMessageProcessor.cs @@ -19,7 +19,7 @@ public ResponseMessageProcessor(ILoggerFactory loggerFactory, RequestResponseSet _logger = loggerFactory.CreateLogger>(); _requestResponseSettings = requestResponseSettings ?? throw new ArgumentNullException(nameof(requestResponseSettings)); _responseConsumer = responseConsumer ?? throw new ArgumentNullException(nameof(responseConsumer)); - _consumerSettings = new List { _requestResponseSettings }; + _consumerSettings = [_requestResponseSettings]; _messagePayloadProvider = messagePayloadProvider ?? throw new ArgumentNullException(nameof(messagePayloadProvider)); } diff --git a/src/Tests/SlimMessageBus.Host.RabbitMQ.Test/Consumer/RabbitMqAutoAcknowledgeMessageProcessorTests.cs b/src/Tests/SlimMessageBus.Host.RabbitMQ.Test/Consumer/RabbitMqAutoAcknowledgeMessageProcessorTests.cs new file mode 100644 index 00000000..5b1fd18b --- /dev/null +++ b/src/Tests/SlimMessageBus.Host.RabbitMQ.Test/Consumer/RabbitMqAutoAcknowledgeMessageProcessorTests.cs @@ -0,0 +1,60 @@ +namespace SlimMessageBus.Host.RabbitMQ.Test.Consumer; + +using global::RabbitMQ.Client.Events; + +public class RabbitMqAutoAcknowledgeMessageProcessorTests +{ + private readonly Mock> _targetMock; + private readonly Mock _targetDisposableMock; + private readonly Mock _consumerMock; + private readonly BasicDeliverEventArgs _transportMessage; + private readonly RabbitMqAutoAcknowledgeMessageProcessor _subject; + + public RabbitMqAutoAcknowledgeMessageProcessorTests() + { + _targetMock = new Mock>(); + _targetDisposableMock = _targetMock.As(); + _consumerMock = new Mock(); + + _transportMessage = new BasicDeliverEventArgs + { + Exchange = "exchange", + DeliveryTag = 1 + }; + + _subject = new RabbitMqAutoAcknowledgeMessageProcessor(_targetMock.Object, NullLogger.Instance, RabbitMqMessageAcknowledgementMode.ConfirmAfterMessageProcessingWhenNoManualConfirmMade, _consumerMock.Object); + } + + [Fact] + public void When_Dispose_Then_CallsDisposeOnTarget() + { + // arrange + + // act + _subject.Dispose(); + + // assert + _targetDisposableMock.Verify(x => x.Dispose(), Times.Once); + } + + [Fact] + public async Task When_ProcessMessage_Then_AutoAcknowledge() + { + // arrange + + // act + var result = await _subject.ProcessMessage( + _transportMessage, + new Dictionary(), + null, + null, + CancellationToken.None); + + // assert + _consumerMock.Verify(x => x.ConfirmMessage( + _transportMessage, + RabbitMqMessageConfirmOptions.Ack, + It.IsAny>(), + It.IsAny()), Times.Once); + } +} diff --git a/src/Tests/SlimMessageBus.Host.RabbitMQ.Test/IntegrationTests/RabbitMqMessageBusIt.cs b/src/Tests/SlimMessageBus.Host.RabbitMQ.Test/IntegrationTests/RabbitMqMessageBusIt.cs index ec62d3b5..e8807729 100644 --- a/src/Tests/SlimMessageBus.Host.RabbitMQ.Test/IntegrationTests/RabbitMqMessageBusIt.cs +++ b/src/Tests/SlimMessageBus.Host.RabbitMQ.Test/IntegrationTests/RabbitMqMessageBusIt.cs @@ -15,7 +15,7 @@ namespace SlimMessageBus.Host.RabbitMQ.Test.IntegrationTests; public class RabbitMqMessageBusIt(ITestOutputHelper testOutputHelper) : BaseIntegrationTest(testOutputHelper) { - private const int NumberOfMessages = 144; + private const int NumberOfMessages = 300; protected override void SetupServices(ServiceCollection services, IConfigurationRoot configuration) { @@ -69,10 +69,13 @@ protected override void SetupServices(ServiceCollection services, IConfiguration public IMessageBus MessageBus => ServiceProvider.GetRequiredService(); [Theory] - [InlineData(RabbitMqMessageAcknowledgementMode.ConfirmAfterMessageProcessingWhenNoManualConfirmMade)] - [InlineData(RabbitMqMessageAcknowledgementMode.AckAutomaticByRabbit)] - [InlineData(RabbitMqMessageAcknowledgementMode.AckMessageBeforeProcessing)] - public async Task PubSubOnFanoutExchange(RabbitMqMessageAcknowledgementMode acknowledgementMode) + [InlineData(RabbitMqMessageAcknowledgementMode.ConfirmAfterMessageProcessingWhenNoManualConfirmMade, 1)] + [InlineData(RabbitMqMessageAcknowledgementMode.ConfirmAfterMessageProcessingWhenNoManualConfirmMade, 10)] + [InlineData(RabbitMqMessageAcknowledgementMode.AckAutomaticByRabbit, 1)] + [InlineData(RabbitMqMessageAcknowledgementMode.AckAutomaticByRabbit, 10)] + [InlineData(RabbitMqMessageAcknowledgementMode.AckMessageBeforeProcessing, 1)] + [InlineData(RabbitMqMessageAcknowledgementMode.AckMessageBeforeProcessing, 10)] + public async Task PubSubOnFanoutExchange(RabbitMqMessageAcknowledgementMode acknowledgementMode, int consumerConcurrency) { var subscribers = 2; var topic = "test-ping"; @@ -102,13 +105,17 @@ public async Task PubSubOnFanoutExchange(RabbitMqMessageAcknowledgementMode ackn .DeadLetterExchange("subscriber-dlq") .AcknowledgementMode(acknowledgementMode) .WithConsumer() - .WithConsumer()); + .WithConsumer() + .Instances(consumerConcurrency)); })); }); await BasicPubSub(subscribers, additionalAssertion: testData => { testData.ConsumedMessages.Should().AllSatisfy(x => x.ContentType.Should().Be(MediaTypeNames.Application.Json)); + // In the RabbitMQ client there is only one task dispatching the messages to the consumers + // If we leverage SMB to increase concurrency (instances) then each subscriber (2) will be potentially processed in up to 10 tasks concurrently + testData.TestMetric.ProcessingCountMax.Should().Be(consumerConcurrency * subscribers); }); } @@ -118,6 +125,7 @@ public class TestData { public List ProducedMessages { get; set; } public IReadOnlyCollection ConsumedMessages { get; set; } + public TestMetric TestMetric { get; set; } } private async Task BasicPubSub(int expectedMessageCopies, Action additionalAssertion = null) @@ -138,14 +146,10 @@ private async Task BasicPubSub(int expectedMessageCopies, Action addit .Select(i => i % 2 == 0 ? new PingMessage { Counter = i } : new PingDerivedMessage { Counter = i }) .ToList(); - foreach (var producedMessage in producedMessages) - { - // Send them in order - await messageBus.Publish(producedMessage); - } + await Task.WhenAll(producedMessages.Select(x => messageBus.Publish(x))); stopwatch.Stop(); - Logger.LogInformation("Published {0} messages in {1}", producedMessages.Count, stopwatch.Elapsed); + Logger.LogInformation("Published {MessageCount} messages in {Elapsed}", producedMessages.Count, stopwatch.Elapsed); // consume stopwatch.Restart(); @@ -172,7 +176,12 @@ private async Task BasicPubSub(int expectedMessageCopies, Action addit messages.All(x => x.Message.Counter == (int)x.Headers["Counter"]).Should().BeTrue(); messages.All(x => x.Message.Counter % 2 == 0 == (bool)x.Headers["Even"]).Should().BeTrue(); - additionalAssertion?.Invoke(new TestData { ProducedMessages = producedMessages, ConsumedMessages = consumedMessages.Snapshot() }); + additionalAssertion?.Invoke(new TestData + { + ProducedMessages = producedMessages, + ConsumedMessages = consumedMessages.Snapshot(), + TestMetric = testMetric + }); } [Theory] @@ -246,7 +255,7 @@ private async Task BasicReqResp() await Task.WhenAll(responseTasks).ConfigureAwait(false); stopwatch.Stop(); - Logger.LogInformation("Published and received {0} messages in {1}", responses.Count, stopwatch.Elapsed); + Logger.LogInformation("Published and received {MessageCount} messages in {Elapsed}", responses.Count, stopwatch.Elapsed); // assert @@ -269,60 +278,55 @@ public record PingDerivedMessage : PingMessage { } -public class PingConsumer : IConsumer, IConsumerWithContext +public abstract class AbstractPingConsumer : IConsumer, IConsumerWithContext + where T : PingMessage { private readonly ILogger _logger; private readonly TestEventCollector _messages; + private readonly TestMetric _testMetric; - public PingConsumer(ILogger logger, TestEventCollector messages, TestMetric testMetric) + public AbstractPingConsumer(ILogger logger, TestEventCollector messages, TestMetric testMetric) { _logger = logger; _messages = messages; + _testMetric = testMetric; testMetric.OnCreatedConsumer(); } public IConsumerContext Context { get; set; } - public async Task OnHandle(PingMessage message) + public async Task OnHandle(T message) { - var transportMessage = Context.GetTransportMessage(); - - _messages.Add(new(message, transportMessage.BasicProperties.MessageId, transportMessage.BasicProperties.ContentType, Context.Headers)); + _testMetric.OnProcessingStart(); + try + { + var transportMessage = Context.GetTransportMessage(); - _logger.LogInformation("Got message {Counter:000} on path {Path}.", message.Counter, Context.Path); + _messages.Add(new(message, transportMessage.BasicProperties.MessageId, transportMessage.BasicProperties.ContentType, Context.Headers)); - await FakeExceptionUtil.SimulateFakeException(message.Counter); - } -} + _logger.LogInformation("Got message {Counter:000} on path {Path}.", message.Counter, Context.Path); -public class PingDerivedConsumer : IConsumer, IConsumerWithContext -{ - private readonly ILogger _logger; - private readonly TestEventCollector _messages; + // simulate work + await Task.Delay(20); - public PingDerivedConsumer(ILogger logger, TestEventCollector messages, TestMetric testMetric) - { - _logger = logger; - _messages = messages; - testMetric.OnCreatedConsumer(); + await FakeExceptionUtil.SimulateFakeException(message.Counter); + } + finally + { + _testMetric.OnProcessingFinish(); + } } +} - public IConsumerContext Context { get; set; } - - #region Implementation of IConsumer - - public async Task OnHandle(PingDerivedMessage message) - { - var transportMessage = Context.GetTransportMessage(); - - _messages.Add(new(message, transportMessage.BasicProperties.MessageId, transportMessage.BasicProperties.ContentType, Context.Headers)); - - _logger.LogInformation("Got message {Counter:000} on path {Path}.", message.Counter, Context.Path); - await FakeExceptionUtil.SimulateFakeException(message.Counter); - } +public class PingConsumer(ILogger logger, TestEventCollector messages, TestMetric testMetric) + : AbstractPingConsumer(logger, messages, testMetric) +{ +} - #endregion +public class PingDerivedConsumer(ILogger logger, TestEventCollector messages, TestMetric testMetric) + : AbstractPingConsumer(logger, messages, testMetric) +{ } public record EchoRequest : IRequest diff --git a/src/Tests/SlimMessageBus.Host.RabbitMQ.Test/Usings.cs b/src/Tests/SlimMessageBus.Host.RabbitMQ.Test/Usings.cs index 398d3543..e809db9c 100644 --- a/src/Tests/SlimMessageBus.Host.RabbitMQ.Test/Usings.cs +++ b/src/Tests/SlimMessageBus.Host.RabbitMQ.Test/Usings.cs @@ -5,6 +5,7 @@ global using Microsoft.Extensions.Configuration; global using Microsoft.Extensions.DependencyInjection; global using Microsoft.Extensions.Logging; +global using Microsoft.Extensions.Logging.Abstractions; global using Moq; diff --git a/src/Tests/SlimMessageBus.Host.Test.Common/IntegrationTest/TestMetric.cs b/src/Tests/SlimMessageBus.Host.Test.Common/IntegrationTest/TestMetric.cs index f46d39ce..b5d27009 100644 --- a/src/Tests/SlimMessageBus.Host.Test.Common/IntegrationTest/TestMetric.cs +++ b/src/Tests/SlimMessageBus.Host.Test.Common/IntegrationTest/TestMetric.cs @@ -3,8 +3,37 @@ public class TestMetric { private long _createdConsumerCount; + private long _processingCount; + private long _processingCountMax; + private readonly object _lock = new(); public long CreatedConsumerCount => Interlocked.Read(ref _createdConsumerCount); + public long ProcessingCountMax => Interlocked.Read(ref _processingCountMax); + public void OnCreatedConsumer() => Interlocked.Increment(ref _createdConsumerCount); + + public void OnProcessingStart() + { + Interlocked.Increment(ref _processingCount); + lock (_lock) + { + if (_processingCount > _processingCountMax) + { + _processingCountMax = _processingCount; + } + } + } + + public void OnProcessingFinish() + { + lock (_lock) + { + if (_processingCount > _processingCountMax) + { + _processingCountMax = _processingCount; + } + } + Interlocked.Decrement(ref _processingCount); + } } diff --git a/src/Tests/SlimMessageBus.Host.Test/Consumer/ConcurrencyIncreasingMessageProcessorDecoratorTest.cs b/src/Tests/SlimMessageBus.Host.Test/Consumer/ConcurrencyIncreasingMessageProcessorDecoratorTest.cs deleted file mode 100644 index c93f79f7..00000000 --- a/src/Tests/SlimMessageBus.Host.Test/Consumer/ConcurrencyIncreasingMessageProcessorDecoratorTest.cs +++ /dev/null @@ -1,74 +0,0 @@ -namespace SlimMessageBus.Host.Test.Consumer; - -public class ConcurrencyIncreasingMessageProcessorDecoratorTest -{ - private readonly MessageBusMock _busMock; - private readonly Mock> _messageProcessorMock; - private ConcurrencyIncreasingMessageProcessorDecorator _subject; - - public ConcurrencyIncreasingMessageProcessorDecoratorTest() - { - _busMock = new MessageBusMock(); - _messageProcessorMock = new Mock>(); - } - - [Theory] - [InlineData(10, 40)] - [InlineData(2, 40)] - public async Task When_ProcessMessage_Given_NMessagesAndConcurrencySetToC_Then_NMethodInvocationsHappenOnTargetWithCConcurrently(int concurrency, int expectedMessageCount) - { - // arrange - _subject = new ConcurrencyIncreasingMessageProcessorDecorator(concurrency, _busMock.Bus, _messageProcessorMock.Object); - - var currentSectionCount = 0; - var maxSectionCount = 0; - var maxSectionCountLock = new object(); - var messageCount = 0; - - _messageProcessorMock - .Setup(x => x.ProcessMessage(It.IsAny(), It.IsAny>(), It.IsAny>(), It.IsAny(), It.IsAny())) - .Returns(async () => - { - // Entering critical section - Interlocked.Increment(ref currentSectionCount); - - // Simulate work - await Task.Delay(50); - - Interlocked.Increment(ref messageCount); - - lock (maxSectionCountLock) - { - if (currentSectionCount > maxSectionCount) - { - maxSectionCount = currentSectionCount; - } - } - - // Simulate work - await Task.Delay(50); - - // Leaving critical section - Interlocked.Decrement(ref currentSectionCount); - return new(null, null, null); - }); - - // act - var msg = new SomeMessage(); - var msgHeaders = new Dictionary(); - for (var i = 0; i < expectedMessageCount; i++) - { - // executed in sequence - await _subject.ProcessMessage(msg, msgHeaders, default); - } - - // assert - while (_subject.PendingCount > 0) - { - await Task.Delay(100); - } - - messageCount.Should().Be(expectedMessageCount); - maxSectionCount.Should().Be(concurrency); - } -} diff --git a/src/Tests/SlimMessageBus.Host.Test/Consumer/ConcurrentMessageProcessorDecoratorTest.cs b/src/Tests/SlimMessageBus.Host.Test/Consumer/ConcurrentMessageProcessorDecoratorTest.cs new file mode 100644 index 00000000..fabd0748 --- /dev/null +++ b/src/Tests/SlimMessageBus.Host.Test/Consumer/ConcurrentMessageProcessorDecoratorTest.cs @@ -0,0 +1,162 @@ +namespace SlimMessageBus.Host.Test.Consumer; + +public class ConcurrentMessageProcessorDecoratorTest +{ + private readonly MessageBusMock _busMock; + private readonly Mock> _messageProcessorMock; + + public ConcurrentMessageProcessorDecoratorTest() + { + _busMock = new MessageBusMock(); + _messageProcessorMock = new Mock>(); + } + + [Fact] + public void When_Dispose_Then_CallsDisposeOnTarget() + { + // arrange + var targetDisposableMock = _messageProcessorMock.As(); + var subject = new ConcurrentMessageProcessorDecorator(1, NullLoggerFactory.Instance, _messageProcessorMock.Object); + + // act + subject.Dispose(); + + // assert + targetDisposableMock.Verify(x => x.Dispose(), Times.Once); + } + + [Theory] + [InlineData(false)] + [InlineData(true)] + public async Task When_WaitAll_Then_WaitsOnAllPendingMessageProcessToFinish(bool cancelAwait) + { + // arrange + _messageProcessorMock + .Setup(x => x.ProcessMessage( + It.IsAny(), + It.IsAny>(), + It.IsAny>(), + It.IsAny(), + It.IsAny())) + .Returns(async () => + { + await Task.Delay(TimeSpan.FromSeconds(1)); + return new(null, null, null); + }); + + var subject = new ConcurrentMessageProcessorDecorator(1, NullLoggerFactory.Instance, _messageProcessorMock.Object); + + await subject.ProcessMessage(new SomeMessage(), new Dictionary(), default); + + using var cts = new CancellationTokenSource(); + + if (cancelAwait) + { + cts.CancelAfter(TimeSpan.FromMilliseconds(100)); + } + + // act + var waitAll = () => subject.WaitAll(cts.Token); + + // assert + if (cancelAwait) + { + await waitAll.Should().ThrowAsync(); + } + else + { + await waitAll.Should().NotThrowAsync(); + } + _messageProcessorMock + .Verify(x => x.ProcessMessage( + It.IsAny(), + It.IsAny>(), + It.IsAny>(), + It.IsAny(), + It.IsAny()), + cancelAwait ? Times.Once : Times.Once); + } + + [Theory] + [InlineData(10, 40)] + [InlineData(2, 40)] + public async Task When_ProcessMessage_Given_NMessagesAndConcurrencySetToC_Then_NMethodInvocationsHappenOnTargetWithCConcurrently(int concurrency, int expectedMessageCount) + { + // arrange + var subject = new ConcurrentMessageProcessorDecorator(concurrency, NullLoggerFactory.Instance, _messageProcessorMock.Object); + + var currentSectionCount = 0; + var maxSectionCount = 0; + var maxSectionCountLock = new object(); + var messageCount = 0; + + _messageProcessorMock + .Setup(x => x.ProcessMessage(It.IsAny(), It.IsAny>(), It.IsAny>(), It.IsAny(), It.IsAny())) + .Returns(async () => + { + // Entering critical section + Interlocked.Increment(ref currentSectionCount); + + // Simulate work + await Task.Delay(50); + + Interlocked.Increment(ref messageCount); + + lock (maxSectionCountLock) + { + if (currentSectionCount > maxSectionCount) + { + maxSectionCount = currentSectionCount; + } + } + + // Simulate work + await Task.Delay(50); + + // Leaving critical section + Interlocked.Decrement(ref currentSectionCount); + return new(null, null, null); + }); + + // act + var msg = new SomeMessage(); + var msgHeaders = new Dictionary(); + for (var i = 0; i < expectedMessageCount; i++) + { + // executed in sequence + await subject.ProcessMessage(msg, msgHeaders, default); + } + + // assert + while (subject.PendingCount > 0) + { + await Task.Delay(100); + } + + messageCount.Should().Be(expectedMessageCount); + maxSectionCount.Should().Be(concurrency); + } + + [Fact] + public async Task When_ProcessMessage_Given_ExceptionHappensOnTarget_Then_ExceptionIsReportedOnSecondInvocation() + { + // arrange + var subject = new ConcurrentMessageProcessorDecorator(1, NullLoggerFactory.Instance, _messageProcessorMock.Object); + + var exception = new Exception("Boom!"); + + _messageProcessorMock + .Setup(x => x.ProcessMessage(It.IsAny(), It.IsAny>(), It.IsAny>(), It.IsAny(), It.IsAny())) + .ReturnsAsync(new ProcessMessageResult(exception, null, null)); + + var msg = new SomeMessage(); + var msgHeaders = new Dictionary(); + await subject.ProcessMessage(msg, msgHeaders, default); + + // act + var result = await subject.ProcessMessage(msg, msgHeaders, default); + + // assert + result.Exception.Should().Be(exception); + } +}