From 1365cb3c6022303481ac34e21a65c6d79c7d603b Mon Sep 17 00:00:00 2001 From: Tomasz Maruszak Date: Sun, 26 Nov 2023 22:31:19 -0500 Subject: [PATCH] [Host.AzureServiceBus] Ability to specify SubscriptionName and message modifier default on whole bus level #185 Signed-off-by: Tomasz Maruszak --- docs/provider_azure_servicebus.md | 39 +++++++- src/Host.Plugin.Properties.xml | 2 +- .../AsbAbstractConsumerSettingsExtensions.cs | 11 ++- .../AsbHasProviderExtensionsExtensions.cs | 7 +- .../Config/AsbProducerBuilderExtensions.cs | 14 +-- .../Config/Delegates.cs | 3 + .../ServiceBusMessageBus.cs | 33 ++++--- .../ServiceBusMessageBusSettings.cs | 52 ++++++++++- ...eBusMessageBusSettingsValidationService.cs | 2 +- .../ServiceBusTopologyService.cs | 2 +- ...SlimMessageBus.Host.AzureServiceBus.csproj | 6 ++ .../Builders/MessageBusBuilder.cs | 39 +++++--- .../Settings/MessageBusSettings.cs | 5 +- .../SlimMessageBus.Host.Configuration.csproj | 2 +- .../Consumers/RedisTopicConsumer.cs | 8 +- .../RedisMessageBus.cs | 29 ++---- src/SlimMessageBus.Host.Redis/RedisUtils.cs | 11 +++ .../Services/MessageHeaderService.cs | 16 ++-- .../ServiceBusMessageBusIt.cs | 12 +-- .../ServiceBusMessageBusSettingsTests.cs | 92 +++++++++++++++++++ .../ServiceBusMessageBusTests.cs | 14 +-- .../MemoryMessageBusIt.cs | 6 +- .../IntegrationTests/RabbitMqMessageBusIt.cs | 2 +- .../RedisMessageBusIt.cs | 8 +- .../Collections/SafeDictionaryWrapperTest.cs | 4 +- .../Helpers/ReflectionUtilsTests.cs | 2 +- .../MessageBusBaseTests.cs | 8 +- .../Services/MessageHeaderServiceTests.cs | 2 +- 28 files changed, 316 insertions(+), 115 deletions(-) create mode 100644 src/SlimMessageBus.Host.AzureServiceBus/Config/Delegates.cs create mode 100644 src/SlimMessageBus.Host.Redis/RedisUtils.cs create mode 100644 src/Tests/SlimMessageBus.Host.AzureServiceBus.Test/ServiceBusMessageBusSettingsTests.cs diff --git a/docs/provider_azure_servicebus.md b/docs/provider_azure_servicebus.md index ba71e10b..68ca66e0 100644 --- a/docs/provider_azure_servicebus.md +++ b/docs/provider_azure_servicebus.md @@ -5,7 +5,9 @@ Please read the [Introduction](intro.md) before reading this provider documentat - [Configuration](#configuration) - [Producing Messages](#producing-messages) - [Message modifier](#message-modifier) + - [Global message modifier](#global-message-modifier) - [Consuming Messages](#consuming-messages) + - [Default Subscription Name](#default-subscription-name) - [Consumer context](#consumer-context) - [Exception Handling for Consumers](#exception-handling-for-consumers) - [Transport Specific Settings](#transport-specific-settings) @@ -45,7 +47,7 @@ To produce a given `TMessage` to a Azure Serivce Bus queue (or topic) use: ```cs // send TMessage to Azure SB queues -mbb.Produce(x => x.UseQueue()); +mbb.Produce(x => x.UseQueue()); // OR @@ -74,7 +76,7 @@ The second (`path`) parameter indicates a queue or topic name - depending on the When the default queue (or topic) path is configured for a message type: ```cs -mbb.Produce(x => x.DefaultQueue("some-queue")); +mbb.Produce(x => x.DefaultQueue("some-queue")); // OR @@ -113,6 +115,23 @@ mbb.Produce(x => > Since version 1.15.5 the Azure SB client was updated, so the native message type is now `Azure.Messaging.ServiceBus.ServiceBusMessage` (it used to be `Azure.ServiceBus.Message`). +#### Global message modifier + +The message modifier can also be applied for all ASB producers: + +```cs +mbb.WithProviderServiceBus(cfg => +{ + // producers will inherit this + cfg.WithModifier((message, sbMessage) => + { + sbMessage.ApplicationProperties["Source"] = "customer-service"; + }); +}); +``` + +> The global message modifier will be executed first, then the producer specific modifier next. + ## Consuming Messages To consume `TMessage` by `TConsumer` from `some-topic` Azure Service Bus topic use: @@ -120,7 +139,7 @@ To consume `TMessage` by `TConsumer` from `some-topic` Azure Service Bus topic u ```cs mbb.Consume(x => x .Topic("some-topic") - .SubscriptionName("subscriber-name") + .SubscriptionName("subscriber-name") .WithConsumer() .Instances(1)); ``` @@ -136,6 +155,20 @@ mbb.Consume(x => x .Instances(1)); ``` +### Default Subscription Name + +A default subscription name can be provided that will be applied for all ASB topic consumers (subscribers): + +```cs +mbb.WithProviderServiceBus(cfg => +{ + // consumers will inherit this + cfg.SubscriptionName("customer-service"); +}); +``` + +That way, the subscription name does not have to be repeated on each topic consumer (if it were to be the same). + ### Consumer context The consumer can implement the `IConsumerWithContext` interface to access the Azure Service Bus native message: diff --git a/src/Host.Plugin.Properties.xml b/src/Host.Plugin.Properties.xml index f3d08c80..776c34e9 100644 --- a/src/Host.Plugin.Properties.xml +++ b/src/Host.Plugin.Properties.xml @@ -5,7 +5,7 @@ netstandard2.1;net6.0;net8.0 - 2.2.0-rc3 + 2.2.0-rc4 \ No newline at end of file diff --git a/src/SlimMessageBus.Host.AzureServiceBus/Config/AsbAbstractConsumerSettingsExtensions.cs b/src/SlimMessageBus.Host.AzureServiceBus/Config/AsbAbstractConsumerSettingsExtensions.cs index 28206deb..eeaab5e1 100644 --- a/src/SlimMessageBus.Host.AzureServiceBus/Config/AsbAbstractConsumerSettingsExtensions.cs +++ b/src/SlimMessageBus.Host.AzureServiceBus/Config/AsbAbstractConsumerSettingsExtensions.cs @@ -2,20 +2,21 @@ public static class AsbAbstractConsumerSettingsExtensions { - static internal void SetSubscriptionName(this AbstractConsumerSettings consumerSettings, string subscriptionName) + static internal void SetSubscriptionName(this HasProviderExtensions consumerSettings, string subscriptionName) { if (subscriptionName is null) throw new ArgumentNullException(nameof(subscriptionName)); consumerSettings.Properties[AsbProperties.SubscriptionNameKey] = subscriptionName; } - static internal string GetSubscriptionName(this AbstractConsumerSettings consumerSettings, bool required = true) + static internal string GetSubscriptionName(this AbstractConsumerSettings consumerSettings, ServiceBusMessageBusSettings providerSettings) { - if (!consumerSettings.Properties.ContainsKey(AsbProperties.SubscriptionNameKey) && !required) + if (consumerSettings.PathKind == PathKind.Topic) { - return null; + return consumerSettings.GetOrDefault(AsbProperties.SubscriptionNameKey, providerSettings, null) + ?? throw new ConfigurationMessageBusException($"SubscriptionName was not configured for topic {consumerSettings.Path}"); } - return consumerSettings.Properties[AsbProperties.SubscriptionNameKey] as string; + return null; } static internal void SetMaxAutoLockRenewalDuration(this AbstractConsumerSettings consumerSettings, TimeSpan duration) diff --git a/src/SlimMessageBus.Host.AzureServiceBus/Config/AsbHasProviderExtensionsExtensions.cs b/src/SlimMessageBus.Host.AzureServiceBus/Config/AsbHasProviderExtensionsExtensions.cs index 4d3fcfea..d1dd4063 100644 --- a/src/SlimMessageBus.Host.AzureServiceBus/Config/AsbHasProviderExtensionsExtensions.cs +++ b/src/SlimMessageBus.Host.AzureServiceBus/Config/AsbHasProviderExtensionsExtensions.cs @@ -1,19 +1,18 @@ namespace SlimMessageBus.Host.AzureServiceBus; -using Azure.Messaging.ServiceBus; using Azure.Messaging.ServiceBus.Administration; static internal class AsbHasProviderExtensionsExtensions { - static internal HasProviderExtensions SetMessageModifier(this HasProviderExtensions producerSettings, Action messageModifierAction) + static internal HasProviderExtensions SetMessageModifier(this HasProviderExtensions producerSettings, AsbMessageModifier messageModifierAction) { producerSettings.Properties[nameof(SetMessageModifier)] = messageModifierAction; return producerSettings; } - static internal Action GetMessageModifier(this HasProviderExtensions producerSettings) + static internal AsbMessageModifier GetMessageModifier(this HasProviderExtensions producerSettings) { - return producerSettings.GetOrDefault>(nameof(SetMessageModifier), null); + return producerSettings.GetOrDefault>(nameof(SetMessageModifier), null); } static internal HasProviderExtensions SetQueueOptions(this HasProviderExtensions producerSettings, Action optionsAction) diff --git a/src/SlimMessageBus.Host.AzureServiceBus/Config/AsbProducerBuilderExtensions.cs b/src/SlimMessageBus.Host.AzureServiceBus/Config/AsbProducerBuilderExtensions.cs index 8d389ed3..70fbec0d 100644 --- a/src/SlimMessageBus.Host.AzureServiceBus/Config/AsbProducerBuilderExtensions.cs +++ b/src/SlimMessageBus.Host.AzureServiceBus/Config/AsbProducerBuilderExtensions.cs @@ -1,6 +1,5 @@ namespace SlimMessageBus.Host.AzureServiceBus; -using Azure.Messaging.ServiceBus; using Azure.Messaging.ServiceBus.Administration; public static class AsbProducerBuilderExtensions @@ -44,21 +43,18 @@ public static ProducerBuilder ToQueue(this ProducerBuilder producerBuil } /// - /// Allows to set additional properties to the native when producing the message. + /// Allows to set additional properties to the native when producing the message. /// /// /// - /// + /// /// - public static ProducerBuilder WithModifier(this ProducerBuilder producerBuilder, Action modifierAction) + public static ProducerBuilder WithModifier(this ProducerBuilder producerBuilder, AsbMessageModifier modifier) { if (producerBuilder is null) throw new ArgumentNullException(nameof(producerBuilder)); - if (modifierAction is null) throw new ArgumentNullException(nameof(modifierAction)); + if (modifier is null) throw new ArgumentNullException(nameof(modifier)); - producerBuilder.Settings.SetMessageModifier((e, m) => - { - modifierAction((T)e, m); - }); + producerBuilder.Settings.SetMessageModifier((e, m) => modifier((T)e, m)); return producerBuilder; } diff --git a/src/SlimMessageBus.Host.AzureServiceBus/Config/Delegates.cs b/src/SlimMessageBus.Host.AzureServiceBus/Config/Delegates.cs new file mode 100644 index 00000000..9f5edd3d --- /dev/null +++ b/src/SlimMessageBus.Host.AzureServiceBus/Config/Delegates.cs @@ -0,0 +1,3 @@ +namespace SlimMessageBus.Host.AzureServiceBus; + +public delegate void AsbMessageModifier(T message, ServiceBusMessage transportMessage); diff --git a/src/SlimMessageBus.Host.AzureServiceBus/ServiceBusMessageBus.cs b/src/SlimMessageBus.Host.AzureServiceBus/ServiceBusMessageBus.cs index 0c0e6c65..608a8a70 100644 --- a/src/SlimMessageBus.Host.AzureServiceBus/ServiceBusMessageBus.cs +++ b/src/SlimMessageBus.Host.AzureServiceBus/ServiceBusMessageBus.cs @@ -26,6 +26,7 @@ public override async Task ProvisionTopology() await provisioningService.ProvisionTopology(); // provisining happens asynchronously } + #region Overrides of MessageBusBase protected override void Build() @@ -62,7 +63,9 @@ void AddConsumerFrom(TopicSubscriptionParams topicSubscription, IMessageProcesso static void InitConsumerContext(ServiceBusReceivedMessage m, ConsumerContext ctx) => ctx.SetTransportMessage(m); - foreach (var ((path, subscriptionName), consumerSettings) in Settings.Consumers.GroupBy(x => (x.Path, SubscriptionName: x.GetSubscriptionName(required: false))).ToDictionary(x => x.Key, x => x.ToList())) + foreach (var ((path, subscriptionName), consumerSettings) in Settings.Consumers + .GroupBy(x => (x.Path, SubscriptionName: x.GetSubscriptionName(ProviderSettings))) + .ToDictionary(x => x.Key, x => x.ToList())) { var topicSubscription = new TopicSubscriptionParams(path: path, subscriptionName: subscriptionName); var messageProcessor = new MessageProcessor( @@ -78,7 +81,7 @@ void AddConsumerFrom(TopicSubscriptionParams topicSubscription, IMessageProcesso if (Settings.RequestResponse != null) { - var topicSubscription = new TopicSubscriptionParams(Settings.RequestResponse.Path, Settings.RequestResponse.GetSubscriptionName(required: false)); + var topicSubscription = new TopicSubscriptionParams(Settings.RequestResponse.Path, Settings.RequestResponse.GetSubscriptionName(ProviderSettings)); var messageProcessor = new ResponseMessageProcessor( LoggerFactory, Settings.RequestResponse, @@ -130,18 +133,13 @@ protected override async Task ProduceToTransport(object message, string path, by } } + // global modifier first + InvokeMessageModifier(message, messageType, m, ProviderSettings); if (messageType != null) { + // local producer modifier second var producerSettings = GetProducerSettings(messageType); - try - { - var messageModifier = producerSettings.GetMessageModifier(); - messageModifier?.Invoke(message, m); - } - catch (Exception e) - { - _logger.LogWarning(e, "The configured message modifier failed for message type {MessageType} and message {Message}", messageType, message); - } + InvokeMessageModifier(message, messageType, m, producerSettings); } var senderClient = _producerByPath.GetOrAdd(path); @@ -161,6 +159,19 @@ protected override async Task ProduceToTransport(object message, string path, by } } + private void InvokeMessageModifier(object message, Type messageType, ServiceBusMessage m, HasProviderExtensions settings) + { + try + { + var messageModifier = settings.GetMessageModifier(); + messageModifier?.Invoke(message, m); + } + catch (Exception e) + { + _logger.LogWarning(e, "The configured message modifier failed for message type {MessageType} and message {Message}", messageType, message); + } + } + public override Task ProduceRequest(object request, IDictionary requestHeaders, string path, ProducerSettings producerSettings) { if (requestHeaders is null) throw new ArgumentNullException(nameof(requestHeaders)); diff --git a/src/SlimMessageBus.Host.AzureServiceBus/ServiceBusMessageBusSettings.cs b/src/SlimMessageBus.Host.AzureServiceBus/ServiceBusMessageBusSettings.cs index f0777c8f..6fa4387a 100644 --- a/src/SlimMessageBus.Host.AzureServiceBus/ServiceBusMessageBusSettings.cs +++ b/src/SlimMessageBus.Host.AzureServiceBus/ServiceBusMessageBusSettings.cs @@ -3,7 +3,7 @@ using Azure.Messaging.ServiceBus; using Azure.Messaging.ServiceBus.Administration; -public class ServiceBusMessageBusSettings +public class ServiceBusMessageBusSettings : HasProviderExtensions { public string ConnectionString { get; set; } public Func ClientFactory { get; set; } @@ -74,4 +74,54 @@ public ServiceBusMessageBusSettings(string serviceBusConnectionString) { ConnectionString = serviceBusConnectionString; } + + /// + /// Configures the default subscription name when consuming form Azure ServiceBus topic. + /// + /// + /// + public ServiceBusMessageBusSettings SubscriptionName(string subscriptionName) + { + if (subscriptionName is null) throw new ArgumentNullException(nameof(subscriptionName)); + + this.SetSubscriptionName(subscriptionName); + return this; + } + + /// + /// Allows to set additional properties to the native when producing the any message. + /// + /// + /// Should the previously set modifier be executed as well? + /// + public ServiceBusMessageBusSettings WithModifier(AsbMessageModifier modifier, bool executePrevious = true) + { + if (modifier is null) throw new ArgumentNullException(nameof(modifier)); + + var previousModifier = executePrevious ? this.GetMessageModifier() : null; + this.SetMessageModifier(previousModifier == null + ? modifier + : (message, transportMessage) => + { + previousModifier(message, transportMessage); + modifier(message, transportMessage); + }); + return this; + } + + /// + /// Allows to set additional properties to the native when producing the any message. + /// + /// + /// Should the previously set modifier be executed as well? + /// + public ServiceBusMessageBusSettings WithModifier(AsbMessageModifier modifier, bool executePrevious = true) + => WithModifier((message, transportMessage) => + { + if (message is T typedMessage) + { + modifier(typedMessage, transportMessage); + } + }, + executePrevious); } \ No newline at end of file diff --git a/src/SlimMessageBus.Host.AzureServiceBus/ServiceBusMessageBusSettingsValidationService.cs b/src/SlimMessageBus.Host.AzureServiceBus/ServiceBusMessageBusSettingsValidationService.cs index 96679fef..1dd79690 100644 --- a/src/SlimMessageBus.Host.AzureServiceBus/ServiceBusMessageBusSettingsValidationService.cs +++ b/src/SlimMessageBus.Host.AzureServiceBus/ServiceBusMessageBusSettingsValidationService.cs @@ -24,7 +24,7 @@ protected override void AssertConsumer(ConsumerSettings consumerSettings) { base.AssertConsumer(consumerSettings); - if (consumerSettings.PathKind == PathKind.Topic && string.IsNullOrEmpty(consumerSettings.GetSubscriptionName(required: false))) + if (consumerSettings.PathKind == PathKind.Topic && string.IsNullOrEmpty(consumerSettings.GetSubscriptionName(ProviderSettings))) { ThrowConsumerFieldNotSet(consumerSettings, nameof(AsbConsumerBuilderExtensions.SubscriptionName)); } diff --git a/src/SlimMessageBus.Host.AzureServiceBus/ServiceBusTopologyService.cs b/src/SlimMessageBus.Host.AzureServiceBus/ServiceBusTopologyService.cs index c23afaa9..decaf2f8 100644 --- a/src/SlimMessageBus.Host.AzureServiceBus/ServiceBusTopologyService.cs +++ b/src/SlimMessageBus.Host.AzureServiceBus/ServiceBusTopologyService.cs @@ -173,7 +173,7 @@ await TryCreateQueue(path, topologyProvisioning.CanConsumerCreateQueue, options if ((topicStatus & TopologyCreationStatus.Exists) != 0) { var consumerSettingsBySubscription = consumerSettingsList - .Select(x => (ConsumerSettings: x, SubscriptionName: x.GetSubscriptionName(required: false))) + .Select(x => (ConsumerSettings: x, SubscriptionName: x.GetSubscriptionName(providerSettings))) .Where(x => x.SubscriptionName != null) .ToDictionary(x => x.SubscriptionName, x => x.ConsumerSettings); diff --git a/src/SlimMessageBus.Host.AzureServiceBus/SlimMessageBus.Host.AzureServiceBus.csproj b/src/SlimMessageBus.Host.AzureServiceBus/SlimMessageBus.Host.AzureServiceBus.csproj index 1924695a..e6253c4d 100644 --- a/src/SlimMessageBus.Host.AzureServiceBus/SlimMessageBus.Host.AzureServiceBus.csproj +++ b/src/SlimMessageBus.Host.AzureServiceBus/SlimMessageBus.Host.AzureServiceBus.csproj @@ -18,4 +18,10 @@ + + + <_Parameter1>SlimMessageBus.Host.AzureServiceBus.Test + + + diff --git a/src/SlimMessageBus.Host.Configuration/Builders/MessageBusBuilder.cs b/src/SlimMessageBus.Host.Configuration/Builders/MessageBusBuilder.cs index 3e53dbfd..0124b395 100644 --- a/src/SlimMessageBus.Host.Configuration/Builders/MessageBusBuilder.cs +++ b/src/SlimMessageBus.Host.Configuration/Builders/MessageBusBuilder.cs @@ -264,27 +264,38 @@ public MessageBusBuilder WithMessageTypeResolver(Type messageTypeResolverType) public MessageBusBuilder WithMessageTypeResolver() => WithMessageTypeResolver(typeof(T)); /// - /// Hook called whenver message is being produced. Can be used to add (or mutate) message headers. + /// Hook called whenver message is being produced. Can be used to change message headers. /// - public MessageBusBuilder WithHeaderModifier(MessageHeaderModifier headerModifier) => WithHeaderModifier(headerModifier); - - /// - /// Hook called whenver message is being produced. Can be used to add (or mutate) message headers. - /// - public MessageBusBuilder WithHeaderModifier(MessageHeaderModifier headerModifier) + /// Should the previously set modifier be executed as well? + public MessageBusBuilder WithHeaderModifier(MessageHeaderModifier headerModifier, bool executePrevious = true) { if (headerModifier == null) throw new ArgumentNullException(nameof(headerModifier)); - Settings.HeaderModifier = (headers, message) => - { - if (message is T typedMessage) - { - headerModifier(headers, typedMessage); - } - }; + var previousHeaderModifier = executePrevious ? Settings.HeaderModifier : null; + Settings.HeaderModifier = previousHeaderModifier == null + ? headerModifier + : (headers, message) => + { + previousHeaderModifier(headers, message); + headerModifier(headers, message); + }; return this; } + /// + /// Hook called whenver message is being produced. Can be used to change message headers. + /// + /// Should the previously set modifier be executed as well? + public MessageBusBuilder WithHeaderModifier(MessageHeaderModifier headerModifier, bool executePrevious = true) + => WithHeaderModifier((headers, message) => + { + if (message is T typedMessage) + { + headerModifier(headers, typedMessage); + } + }, + executePrevious); + /// /// Enables or disabled the auto statrt of message consumption upon bus creation. If false, then you need to call the .Start() on the bus to start consuming messages. /// diff --git a/src/SlimMessageBus.Host.Configuration/Settings/MessageBusSettings.cs b/src/SlimMessageBus.Host.Configuration/Settings/MessageBusSettings.cs index bcae8dc2..e9a68bf7 100644 --- a/src/SlimMessageBus.Host.Configuration/Settings/MessageBusSettings.cs +++ b/src/SlimMessageBus.Host.Configuration/Settings/MessageBusSettings.cs @@ -22,7 +22,7 @@ public IServiceProvider ServiceProvider public IList Consumers { get; } public RequestResponseSettings RequestResponse { get; set; } public Type SerializerType { get; set; } - public Type MessageTypeResolverType { get; set; } + public Type MessageTypeResolverType { get; set; } /// /// Determines if a child scope is created for the message consumption. The consumer instance is then derived from that scope. @@ -30,9 +30,8 @@ public IServiceProvider ServiceProvider public bool? IsMessageScopeEnabled { get; set; } /// - /// Hook called whenver message is being produced. Can be used to add (or mutate) message headers. + /// Hook called whenever message is being produced. Can be used to change message headers. /// - // ToDo: Support many modifiers public MessageHeaderModifier HeaderModifier { get; set; } /// diff --git a/src/SlimMessageBus.Host.Configuration/SlimMessageBus.Host.Configuration.csproj b/src/SlimMessageBus.Host.Configuration/SlimMessageBus.Host.Configuration.csproj index 760cdaa2..382d39b2 100644 --- a/src/SlimMessageBus.Host.Configuration/SlimMessageBus.Host.Configuration.csproj +++ b/src/SlimMessageBus.Host.Configuration/SlimMessageBus.Host.Configuration.csproj @@ -7,7 +7,7 @@ Core configuration interfaces of SlimMessageBus SlimMessageBus SlimMessageBus.Host - 2.0.5-rc2 + 2.0.5-rc3 diff --git a/src/SlimMessageBus.Host.Redis/Consumers/RedisTopicConsumer.cs b/src/SlimMessageBus.Host.Redis/Consumers/RedisTopicConsumer.cs index 81529142..0388fa7d 100644 --- a/src/SlimMessageBus.Host.Redis/Consumers/RedisTopicConsumer.cs +++ b/src/SlimMessageBus.Host.Redis/Consumers/RedisTopicConsumer.cs @@ -5,7 +5,7 @@ public class RedisTopicConsumer : AbstractConsumer, IRedisConsumer private readonly ISubscriber _subscriber; private readonly IMessageSerializer _envelopeSerializer; private ChannelMessageQueue _channelMessageQueue; - private IMessageProcessor _messageProcessor; + private readonly IMessageProcessor _messageProcessor; public string Path { get; } @@ -20,7 +20,9 @@ public RedisTopicConsumer(ILogger logger, string topic, ISub protected override async Task OnStart() { - _channelMessageQueue = await _subscriber.SubscribeAsync(Path).ConfigureAwait(false); + // detect if wildcard is used + var channel = RedisUtils.ToRedisChannel(Path); + _channelMessageQueue = await _subscriber.SubscribeAsync(channel).ConfigureAwait(false); _channelMessageQueue.OnMessage(OnMessage); } @@ -53,4 +55,4 @@ private async Task OnMessage(ChannelMessage m) Logger.LogError(exception, "Error occured while processing the redis channel {Topic}", Path); } } -} \ No newline at end of file +} diff --git a/src/SlimMessageBus.Host.Redis/RedisMessageBus.cs b/src/SlimMessageBus.Host.Redis/RedisMessageBus.cs index 507155a2..76fd1462 100644 --- a/src/SlimMessageBus.Host.Redis/RedisMessageBus.cs +++ b/src/SlimMessageBus.Host.Redis/RedisMessageBus.cs @@ -113,11 +113,7 @@ void AddTopicConsumer(string topic, ISubscriber subscriber, IMessageProcessor(instances, this, processor); } - _logger.LogInformation( - pathKind == PathKind.Topic - ? "Creating consumer for redis channel {Path}" - : "Creating consumer for redis list {Path}", - path); + _logger.LogInformation("Creating consumer for redis {PathKind} {Path}", GetPathKindString(pathKind), path); if (pathKind == PathKind.Topic) { AddTopicConsumer(path, subscriber, processor); @@ -130,12 +126,7 @@ void AddTopicConsumer(string topic, ISubscriber subscriber, IMessageProcessor(LoggerFactory, Settings.RequestResponse, this, messagePayloadProvider: m => m.Payload)); @@ -152,6 +143,8 @@ void AddTopicConsumer(string topic, ISubscriber subscriber, IMessageProcessor pathKind == PathKind.Topic ? "channel" : "list"; + #region Overrides of MessageBusBase protected override Task ProduceToTransport(object message, string path, byte[] messagePayload, IDictionary messageHeaders = null, CancellationToken cancellationToken = default) @@ -177,19 +170,15 @@ protected async virtual Task ProduceToTransport(Type messageType, object message var messageWithHeadersBytes = ProviderSettings.EnvelopeSerializer.Serialize(typeof(MessageWithHeaders), messageWithHeaders); _logger.LogDebug( - kind == PathKind.Topic - ? "Producing message {Message} of type {MessageType} to redis channel {Path} with size {MessageSize}" - : "Producing message {Message} of type {MessageType} to redis list {Path} with size {MessageSize}", - message, messageType.Name, path, messageWithHeadersBytes.Length); + "Producing message {Message} of type {MessageType} to redis {PathKind} {Path} with size {MessageSize}", + message, messageType.Name, GetPathKindString(kind), path, messageWithHeadersBytes.Length); var result = kind == PathKind.Topic - ? await Database.PublishAsync(path, messageWithHeadersBytes).ConfigureAwait(false) // Use Redis Pub/Sub + ? await Database.PublishAsync(RedisUtils.ToRedisChannel(path), messageWithHeadersBytes).ConfigureAwait(false) // Use Redis Pub/Sub : await Database.ListRightPushAsync(path, messageWithHeadersBytes).ConfigureAwait(false); // Use Redis List Type (append on the right side/end of list) _logger.LogDebug( - kind == PathKind.Topic - ? "Produced message {Message} of type {MessageType} to redis channel {Path} with result {RedisResult}" - : "Produced message {Message} of type {MessageType} to redis list {Path} with result {RedisResult}", - message, messageType, path, result); + "Produced message {Message} of type {MessageType} to redis channel {PathKind} {Path} with result {RedisResult}", + message, messageType, GetPathKindString(kind), path, result); } } \ No newline at end of file diff --git a/src/SlimMessageBus.Host.Redis/RedisUtils.cs b/src/SlimMessageBus.Host.Redis/RedisUtils.cs new file mode 100644 index 00000000..a304f417 --- /dev/null +++ b/src/SlimMessageBus.Host.Redis/RedisUtils.cs @@ -0,0 +1,11 @@ +namespace SlimMessageBus.Host.Redis; + +static internal class RedisUtils +{ + /// + /// Detect if wildcard is used + /// + /// + /// + static internal RedisChannel ToRedisChannel(string path) => path.Contains('*') ? RedisChannel.Pattern(path) : RedisChannel.Literal(path); +} \ No newline at end of file diff --git a/src/SlimMessageBus.Host/Services/MessageHeaderService.cs b/src/SlimMessageBus.Host/Services/MessageHeaderService.cs index c646399c..f6877245 100644 --- a/src/SlimMessageBus.Host/Services/MessageHeaderService.cs +++ b/src/SlimMessageBus.Host/Services/MessageHeaderService.cs @@ -1,7 +1,5 @@ namespace SlimMessageBus.Host.Services; -using System.Runtime; - internal interface IMessageHeaderService { void AddMessageHeaders(IDictionary messageHeaders, IDictionary headers, object message, ProducerSettings producerSettings); @@ -34,19 +32,19 @@ public void AddMessageHeaders(IDictionary messageHeaders, IDicti AddMessageTypeHeader(message, messageHeaders); - if (_settings.HeaderModifier != null) - { - // Call header hook - _logger.LogTrace($"Executing bus {nameof(MessageBusSettings.HeaderModifier)}"); - _settings.HeaderModifier(messageHeaders, message); - } - if (producerSettings.HeaderModifier != null) { // Call header hook _logger.LogTrace($"Executing producer {nameof(ProducerSettings.HeaderModifier)}"); producerSettings.HeaderModifier(messageHeaders, message); } + + if (_settings.HeaderModifier != null) + { + // Call header hook + _logger.LogTrace($"Executing bus {nameof(MessageBusSettings.HeaderModifier)}"); + _settings.HeaderModifier(messageHeaders, message); + } } public void AddMessageTypeHeader(object message, IDictionary headers) diff --git a/src/Tests/SlimMessageBus.Host.AzureServiceBus.Test/ServiceBusMessageBusIt.cs b/src/Tests/SlimMessageBus.Host.AzureServiceBus.Test/ServiceBusMessageBusIt.cs index 1e7955d3..fa011f9c 100644 --- a/src/Tests/SlimMessageBus.Host.AzureServiceBus.Test/ServiceBusMessageBusIt.cs +++ b/src/Tests/SlimMessageBus.Host.AzureServiceBus.Test/ServiceBusMessageBusIt.cs @@ -75,7 +75,7 @@ public async Task BasicPubSubOnTopic() })); }); - await BasicPubSub(concurrency, subscribers, subscribers).ConfigureAwait(false); + await BasicPubSub(concurrency, subscribers, subscribers); } [Fact] @@ -94,7 +94,7 @@ public async Task BasicPubSubOnQueue() .WithConsumer() .Instances(concurrency)); }); - await BasicPubSub(concurrency, 1, 1).ConfigureAwait(false); + await BasicPubSub(concurrency, 1, 1); } private static string GetMessageId(PingMessage message) => $"ID_{message.Counter}"; @@ -187,7 +187,7 @@ public async Task BasicReqRespOnTopic() }); }); - await BasicReqResp().ConfigureAwait(false); + await BasicReqResp(); } [Fact] @@ -210,7 +210,7 @@ public async Task BasicReqRespOnQueue() x.DefaultTimeout(TimeSpan.FromSeconds(60)); }); }); - await BasicReqResp().ConfigureAwait(false); + await BasicReqResp(); } private async Task BasicReqResp() @@ -262,7 +262,7 @@ public async Task FIFOUsingSessionsOnQueue() .Instances(concurrency) .EnableSession(x => x.MaxConcurrentSessions(10).SessionIdleTimeout(TimeSpan.FromSeconds(5)))); }); - await BasicPubSub(concurrency, 1, 1, CheckMessagesWithinSameSessionAreInOrder).ConfigureAwait(false); + await BasicPubSub(concurrency, 1, 1, CheckMessagesWithinSameSessionAreInOrder); } private static void CheckMessagesWithinSameSessionAreInOrder(TestData testData) @@ -295,7 +295,7 @@ public async Task FIFOUsingSessionsOnTopic() .EnableSession(x => x.MaxConcurrentSessions(10).SessionIdleTimeout(TimeSpan.FromSeconds(5)))); }); - await BasicPubSub(concurrency, 1, 1, CheckMessagesWithinSameSessionAreInOrder).ConfigureAwait(false); + await BasicPubSub(concurrency, 1, 1, CheckMessagesWithinSameSessionAreInOrder); } } diff --git a/src/Tests/SlimMessageBus.Host.AzureServiceBus.Test/ServiceBusMessageBusSettingsTests.cs b/src/Tests/SlimMessageBus.Host.AzureServiceBus.Test/ServiceBusMessageBusSettingsTests.cs new file mode 100644 index 00000000..66fb2a8a --- /dev/null +++ b/src/Tests/SlimMessageBus.Host.AzureServiceBus.Test/ServiceBusMessageBusSettingsTests.cs @@ -0,0 +1,92 @@ +namespace SlimMessageBus.Host.AzureServiceBus.Test; + +using Azure.Messaging.ServiceBus; + +using SlimMessageBus.Host; + +public class ServiceBusMessageBusSettingsTests +{ + private readonly ServiceBusMessageBusSettings _subject = new(); + + [Fact] + public void When_SetSubscriptionName_Given_SubscriptionNameIsNull_Then_ThrowsException() + { + // arrange + + // act + var act = () => _subject.SubscriptionName(null); + + // assert + act.Should().Throw(); + } + + [Theory] + [InlineData(true)] + [InlineData(false)] + public void When_GetSubscriptionName_Given_SubscriptionNameAtTheBusLevelOrConsumerLevel_Then_DefaultToBusLevelOrTakesConsumerValue(bool consumerSubscriptionNameApplied) + { + // arrange + var busSubscriptionName = "global-sub-name"; + _subject.SubscriptionName(busSubscriptionName); + + var consumerBuilder = new ConsumerBuilder(new MessageBusSettings()); + + var consumerSubscriptionName = "consumer-sub-name"; + if (consumerSubscriptionNameApplied) + { + consumerBuilder.SubscriptionName(consumerSubscriptionName); + } + + // act + var subscriptionName = consumerBuilder.ConsumerSettings.GetSubscriptionName(_subject); + + // assert + subscriptionName.Should().Be(consumerSubscriptionNameApplied ? consumerSubscriptionName : busSubscriptionName); + } + + [Fact] + public void When_WithModifier_Given_NullValue_Then_ThrowsException() + { + // arrange + + // act + var act = () => _subject.WithModifier(null); + + // assert + act.Should().Throw(); + } + + [Theory] + [InlineData(true)] + [InlineData(false)] + public void When_WithModifier_Given_SeveralModifiers_Then_ExecutesThemAll(bool executePrevious) + { + // arrange + var someMessage = new SomeMessage(); + var transportMessage = new Mock(); + + var modifier1 = new Mock>(); + var modifier2 = new Mock>(); + var modifier3 = new Mock>(); + var modifier4 = new Mock>(); + + _subject.WithModifier(modifier1.Object, executePrevious: executePrevious); + _subject.WithModifier(modifier2.Object, executePrevious: executePrevious); + _subject.WithModifier(modifier3.Object, executePrevious: executePrevious); + _subject.WithModifier(modifier4.Object, executePrevious: executePrevious); + + var modifier = _subject.GetMessageModifier(); + + // act + modifier(someMessage, transportMessage.Object); + + // assert + if (executePrevious) + { + modifier1.Verify(x => x(someMessage, transportMessage.Object), Times.Once); + modifier2.Verify(x => x(someMessage, transportMessage.Object), Times.Once); + } + modifier3.Verify(x => x(It.IsAny(), transportMessage.Object), Times.Never); + modifier4.Verify(x => x(someMessage, transportMessage.Object), Times.Once); + } +} \ No newline at end of file diff --git a/src/Tests/SlimMessageBus.Host.AzureServiceBus.Test/ServiceBusMessageBusTests.cs b/src/Tests/SlimMessageBus.Host.AzureServiceBus.Test/ServiceBusMessageBusTests.cs index aef62b7c..e053036f 100644 --- a/src/Tests/SlimMessageBus.Host.AzureServiceBus.Test/ServiceBusMessageBusTests.cs +++ b/src/Tests/SlimMessageBus.Host.AzureServiceBus.Test/ServiceBusMessageBusTests.cs @@ -82,8 +82,8 @@ public async Task WhenPublishGivenModifierConfiguredForMessageTypeThenModifierEx var m2 = new SomeMessage { Id = "2", Value = 3 }; // act - await ProviderBus.Value.Publish(m1).ConfigureAwait(false); - await ProviderBus.Value.Publish(m2).ConfigureAwait(false); + await ProviderBus.Value.Publish(m1); + await ProviderBus.Value.Publish(m2); // assert var topicClient = SenderMockByPath["default-topic"]; @@ -104,7 +104,7 @@ public async Task When_Publish_Given_ModifierConfiguredForMessageTypeThatThrowsE var m = new SomeMessage { Id = "1", Value = 10 }; // act - await ProviderBus.Value.Publish(m).ConfigureAwait(false); + await ProviderBus.Value.Publish(m); // assert SenderMockByPath["default-topic"].Verify(x => x.SendMessageAsync(It.IsAny(), It.IsAny()), Times.Once); @@ -154,10 +154,10 @@ public async Task When_Publish_Then_TopicClientOrQueueClientIsCreatedForTopicNam var om2 = new OtherMessage { Id = "2" }; // act - await ProviderBus.Value.Publish(sm1, "some-topic").ConfigureAwait(false); - await ProviderBus.Value.Publish(sm2, "some-topic").ConfigureAwait(false); - await ProviderBus.Value.Publish(om1, "some-queue").ConfigureAwait(false); - await ProviderBus.Value.Publish(om2, "some-queue").ConfigureAwait(false); + await ProviderBus.Value.Publish(sm1, "some-topic"); + await ProviderBus.Value.Publish(sm2, "some-topic"); + await ProviderBus.Value.Publish(om1, "some-queue"); + await ProviderBus.Value.Publish(om2, "some-queue"); // assert SenderMockByPath.Should().HaveCount(2); diff --git a/src/Tests/SlimMessageBus.Host.Memory.Test/MemoryMessageBusIt.cs b/src/Tests/SlimMessageBus.Host.Memory.Test/MemoryMessageBusIt.cs index bfb34cc0..fca409c1 100644 --- a/src/Tests/SlimMessageBus.Host.Memory.Test/MemoryMessageBusIt.cs +++ b/src/Tests/SlimMessageBus.Host.Memory.Test/MemoryMessageBusIt.cs @@ -58,7 +58,7 @@ public async Task BasicPubSubOnTopic(bool enableSerialization) })); }); - await BasicPubSub(subscribers).ConfigureAwait(false); + await BasicPubSub(subscribers); } private async Task BasicPubSub(int subscribers) @@ -119,7 +119,7 @@ public async Task BasicReqRespOnTopic(bool enableSerialization) mbb.Handle(x => x.Topic(topic).Instances(2)); }); - await BasicReqResp().ConfigureAwait(false); + await BasicReqResp(); } [Theory] @@ -137,7 +137,7 @@ public async Task BasicReqRespWithoutRespOnTopic(bool enableSerialization) mbb.Handle(x => x.Topic(topic).Instances(2)); }); - await BasicReqRespWithoutResp().ConfigureAwait(false); + await BasicReqRespWithoutResp(); } private async Task BasicReqResp() diff --git a/src/Tests/SlimMessageBus.Host.RabbitMQ.Test/IntegrationTests/RabbitMqMessageBusIt.cs b/src/Tests/SlimMessageBus.Host.RabbitMQ.Test/IntegrationTests/RabbitMqMessageBusIt.cs index b968879b..105359f6 100644 --- a/src/Tests/SlimMessageBus.Host.RabbitMQ.Test/IntegrationTests/RabbitMqMessageBusIt.cs +++ b/src/Tests/SlimMessageBus.Host.RabbitMQ.Test/IntegrationTests/RabbitMqMessageBusIt.cs @@ -213,7 +213,7 @@ public async Task BasicReqRespOnTopic() }); }); - await BasicReqResp().ConfigureAwait(false); + await BasicReqResp(); } private async Task BasicReqResp() diff --git a/src/Tests/SlimMessageBus.Host.Redis.Test/RedisMessageBusIt.cs b/src/Tests/SlimMessageBus.Host.Redis.Test/RedisMessageBusIt.cs index e8167a46..597851db 100644 --- a/src/Tests/SlimMessageBus.Host.Redis.Test/RedisMessageBusIt.cs +++ b/src/Tests/SlimMessageBus.Host.Redis.Test/RedisMessageBusIt.cs @@ -70,7 +70,7 @@ public async Task BasicPubSubOnTopic() })); }); - await BasicPubSub(consumers).ConfigureAwait(false); + await BasicPubSub(consumers); } [Fact] @@ -96,7 +96,7 @@ public async Task BasicPubSubOnQueue() })); }); - await BasicPubSub(consumers).ConfigureAwait(false); + await BasicPubSub(consumers); } private async Task BasicPubSub(int expectedMessageCopies) @@ -172,7 +172,7 @@ public async Task BasicReqRespOnTopic() }); }); - await BasicReqResp().ConfigureAwait(false); + await BasicReqResp(); } [Fact] @@ -198,7 +198,7 @@ public async Task BasicReqRespOnQueue() }); }); - await BasicReqResp().ConfigureAwait(false); + await BasicReqResp(); } private async Task BasicReqResp() diff --git a/src/Tests/SlimMessageBus.Host.Test/Collections/SafeDictionaryWrapperTest.cs b/src/Tests/SlimMessageBus.Host.Test/Collections/SafeDictionaryWrapperTest.cs index ded1b480..7f3fb5c5 100644 --- a/src/Tests/SlimMessageBus.Host.Test/Collections/SafeDictionaryWrapperTest.cs +++ b/src/Tests/SlimMessageBus.Host.Test/Collections/SafeDictionaryWrapperTest.cs @@ -43,7 +43,7 @@ public void ClearWorks() } [Fact] - public void CheckThreadSafety() + public async Task CheckThreadSafetyAsync() { // arrange var w = new SafeDictionaryWrapper(); @@ -74,7 +74,7 @@ public void CheckThreadSafety() w.GetOrAdd($"c_{i}", k => $"v_{i}"); } }, CancellationToken.None, TaskCreationOptions.None, TaskScheduler.Default); - Task.WaitAll(task1, task2, task3); + await Task.WhenAll(task1, task2, task3); // assert w.Dictionary.Count.Should().Be(3 * count); diff --git a/src/Tests/SlimMessageBus.Host.Test/Helpers/ReflectionUtilsTests.cs b/src/Tests/SlimMessageBus.Host.Test/Helpers/ReflectionUtilsTests.cs index e58417f8..b8a49cc3 100644 --- a/src/Tests/SlimMessageBus.Host.Test/Helpers/ReflectionUtilsTests.cs +++ b/src/Tests/SlimMessageBus.Host.Test/Helpers/ReflectionUtilsTests.cs @@ -6,7 +6,7 @@ public class ReflectionUtilsTests public void When_GenerateGetterFunc_Given_TaskOfT_Then_ResultOfTaskIsObtained() { // arrange - Task taskWithResult = Task.FromResult(1); + var taskWithResult = Task.FromResult(1); var resultPropertyInfo = typeof(Task).GetProperty(nameof(Task.Result)); // act diff --git a/src/Tests/SlimMessageBus.Host.Test/MessageBusBaseTests.cs b/src/Tests/SlimMessageBus.Host.Test/MessageBusBaseTests.cs index c7e65494..e317b58d 100644 --- a/src/Tests/SlimMessageBus.Host.Test/MessageBusBaseTests.cs +++ b/src/Tests/SlimMessageBus.Host.Test/MessageBusBaseTests.cs @@ -399,8 +399,8 @@ public async Task When_Publish_Given_Disposed_Then_ThrowsException() Bus.Dispose(); // act - Func act = async () => await Bus.Publish(new SomeMessage()).ConfigureAwait(false); - Func actWithTopic = async () => await Bus.Publish(new SomeMessage(), "some-topic").ConfigureAwait(false); + Func act = async () => await Bus.Publish(new SomeMessage()); + Func actWithTopic = async () => await Bus.Publish(new SomeMessage(), "some-topic"); // assert await act.Should().ThrowAsync(); @@ -414,8 +414,8 @@ public async Task When_Send_Given_Disposed_Then_ThrowsException() Bus.Dispose(); // act - Func act = async () => await Bus.Send(new SomeRequest()).ConfigureAwait(false); - Func actWithTopic = async () => await Bus.Send(new SomeRequest(), "some-topic").ConfigureAwait(false); + Func act = async () => await Bus.Send(new SomeRequest()); + Func actWithTopic = async () => await Bus.Send(new SomeRequest(), "some-topic"); // assert await act.Should().ThrowAsync(); diff --git a/src/Tests/SlimMessageBus.Host.Test/Services/MessageHeaderServiceTests.cs b/src/Tests/SlimMessageBus.Host.Test/Services/MessageHeaderServiceTests.cs index 340a46fd..cb398c3a 100644 --- a/src/Tests/SlimMessageBus.Host.Test/Services/MessageHeaderServiceTests.cs +++ b/src/Tests/SlimMessageBus.Host.Test/Services/MessageHeaderServiceTests.cs @@ -70,7 +70,7 @@ public void When_AddMessageHeaders_Given_ExistingHeader_And_ProducerModifier_And // assert messageHeaders.Should().HaveCount(5); messageHeaders.Should().Contain(x => x.Key == MessageHeaders.MessageType && (string)x.Value == nameof(SomeMessage)); - messageHeaders.Should().Contain(x => x.Key == "order" && (int)x.Value == 2); + messageHeaders.Should().Contain(x => x.Key == "order" && (int)x.Value == 1); messageHeaders.Should().Contain(x => x.Key == "bus-header" && (bool)x.Value); messageHeaders.Should().Contain(x => x.Key == "producer-header" && (bool)x.Value); messageHeaders.Should().Contain(x => x.Key == "existing-header" && (bool)x.Value);