Skip to content

Commit

Permalink
[Host.AzureServiceBus] Ability to specify SubscriptionName and messag…
Browse files Browse the repository at this point in the history
…e modifier default on whole bus level #185

Signed-off-by: Tomasz Maruszak <[email protected]>
  • Loading branch information
zarusz committed Dec 14, 2023
1 parent a6ee111 commit 1365cb3
Show file tree
Hide file tree
Showing 28 changed files with 316 additions and 115 deletions.
39 changes: 36 additions & 3 deletions docs/provider_azure_servicebus.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,9 @@ Please read the [Introduction](intro.md) before reading this provider documentat
- [Configuration](#configuration)
- [Producing Messages](#producing-messages)
- [Message modifier](#message-modifier)
- [Global message modifier](#global-message-modifier)
- [Consuming Messages](#consuming-messages)
- [Default Subscription Name](#default-subscription-name)
- [Consumer context](#consumer-context)
- [Exception Handling for Consumers](#exception-handling-for-consumers)
- [Transport Specific Settings](#transport-specific-settings)
Expand Down Expand Up @@ -45,7 +47,7 @@ To produce a given `TMessage` to a Azure Serivce Bus queue (or topic) use:

```cs
// send TMessage to Azure SB queues
mbb.Produce<TMessage>(x => x.UseQueue());
mbb.Produce<TMessage>(x => x.UseQueue());

// OR
Expand Down Expand Up @@ -74,7 +76,7 @@ The second (`path`) parameter indicates a queue or topic name - depending on the
When the default queue (or topic) path is configured for a message type:

```cs
mbb.Produce<TMessage>(x => x.DefaultQueue("some-queue"));
mbb.Produce<TMessage>(x => x.DefaultQueue("some-queue"));

// OR
Expand Down Expand Up @@ -113,14 +115,31 @@ mbb.Produce<PingMessage>(x =>

> Since version 1.15.5 the Azure SB client was updated, so the native message type is now `Azure.Messaging.ServiceBus.ServiceBusMessage` (it used to be `Azure.ServiceBus.Message`).
#### Global message modifier

The message modifier can also be applied for all ASB producers:

```cs
mbb.WithProviderServiceBus(cfg =>
{
// producers will inherit this
cfg.WithModifier((message, sbMessage) =>
{
sbMessage.ApplicationProperties["Source"] = "customer-service";
});
});
```

> The global message modifier will be executed first, then the producer specific modifier next.
## Consuming Messages

To consume `TMessage` by `TConsumer` from `some-topic` Azure Service Bus topic use:

```cs
mbb.Consume<TMessage>(x => x
.Topic("some-topic")
.SubscriptionName("subscriber-name")
.SubscriptionName("subscriber-name")
.WithConsumer<TConsumer>()
.Instances(1));
```
Expand All @@ -136,6 +155,20 @@ mbb.Consume<TMessage>(x => x
.Instances(1));
```

### Default Subscription Name

A default subscription name can be provided that will be applied for all ASB topic consumers (subscribers):

```cs
mbb.WithProviderServiceBus(cfg =>
{
// consumers will inherit this
cfg.SubscriptionName("customer-service");
});
```

That way, the subscription name does not have to be repeated on each topic consumer (if it were to be the same).

### Consumer context

The consumer can implement the `IConsumerWithContext` interface to access the Azure Service Bus native message:
Expand Down
2 changes: 1 addition & 1 deletion src/Host.Plugin.Properties.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

<PropertyGroup>
<TargetFrameworks>netstandard2.1;net6.0;net8.0</TargetFrameworks>
<Version>2.2.0-rc3</Version>
<Version>2.2.0-rc4</Version>
</PropertyGroup>

</Project>
Original file line number Diff line number Diff line change
Expand Up @@ -2,20 +2,21 @@

public static class AsbAbstractConsumerSettingsExtensions
{
static internal void SetSubscriptionName(this AbstractConsumerSettings consumerSettings, string subscriptionName)
static internal void SetSubscriptionName(this HasProviderExtensions consumerSettings, string subscriptionName)
{
if (subscriptionName is null) throw new ArgumentNullException(nameof(subscriptionName));

consumerSettings.Properties[AsbProperties.SubscriptionNameKey] = subscriptionName;
}

static internal string GetSubscriptionName(this AbstractConsumerSettings consumerSettings, bool required = true)
static internal string GetSubscriptionName(this AbstractConsumerSettings consumerSettings, ServiceBusMessageBusSettings providerSettings)
{
if (!consumerSettings.Properties.ContainsKey(AsbProperties.SubscriptionNameKey) && !required)
if (consumerSettings.PathKind == PathKind.Topic)
{
return null;
return consumerSettings.GetOrDefault<string>(AsbProperties.SubscriptionNameKey, providerSettings, null)
?? throw new ConfigurationMessageBusException($"SubscriptionName was not configured for topic {consumerSettings.Path}");
}
return consumerSettings.Properties[AsbProperties.SubscriptionNameKey] as string;
return null;
}

static internal void SetMaxAutoLockRenewalDuration(this AbstractConsumerSettings consumerSettings, TimeSpan duration)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,19 +1,18 @@
namespace SlimMessageBus.Host.AzureServiceBus;

using Azure.Messaging.ServiceBus;
using Azure.Messaging.ServiceBus.Administration;

static internal class AsbHasProviderExtensionsExtensions
{
static internal HasProviderExtensions SetMessageModifier(this HasProviderExtensions producerSettings, Action<object, ServiceBusMessage> messageModifierAction)
static internal HasProviderExtensions SetMessageModifier(this HasProviderExtensions producerSettings, AsbMessageModifier<object> messageModifierAction)
{
producerSettings.Properties[nameof(SetMessageModifier)] = messageModifierAction;
return producerSettings;
}

static internal Action<object, ServiceBusMessage> GetMessageModifier(this HasProviderExtensions producerSettings)
static internal AsbMessageModifier<object> GetMessageModifier(this HasProviderExtensions producerSettings)
{
return producerSettings.GetOrDefault<Action<object, ServiceBusMessage>>(nameof(SetMessageModifier), null);
return producerSettings.GetOrDefault<AsbMessageModifier<object>>(nameof(SetMessageModifier), null);
}

static internal HasProviderExtensions SetQueueOptions(this HasProviderExtensions producerSettings, Action<CreateQueueOptions> optionsAction)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
namespace SlimMessageBus.Host.AzureServiceBus;

using Azure.Messaging.ServiceBus;
using Azure.Messaging.ServiceBus.Administration;

public static class AsbProducerBuilderExtensions
Expand Down Expand Up @@ -44,21 +43,18 @@ public static ProducerBuilder<T> ToQueue<T>(this ProducerBuilder<T> producerBuil
}

/// <summary>
/// Allows to set additional properties to the native <see cref="Message"/> when producing the <see cref="T"/> message.
/// Allows to set additional properties to the native <see cref="ServiceBusMessage"/> when producing the <see cref="T"/> message.
/// </summary>
/// <typeparam name="T"></typeparam>
/// <param name="producerBuilder"></param>
/// <param name="modifierAction"></param>
/// <param name="modifier"></param>
/// <returns></returns>
public static ProducerBuilder<T> WithModifier<T>(this ProducerBuilder<T> producerBuilder, Action<T, ServiceBusMessage> modifierAction)
public static ProducerBuilder<T> WithModifier<T>(this ProducerBuilder<T> producerBuilder, AsbMessageModifier<T> modifier)
{
if (producerBuilder is null) throw new ArgumentNullException(nameof(producerBuilder));
if (modifierAction is null) throw new ArgumentNullException(nameof(modifierAction));
if (modifier is null) throw new ArgumentNullException(nameof(modifier));

producerBuilder.Settings.SetMessageModifier((e, m) =>
{
modifierAction((T)e, m);
});
producerBuilder.Settings.SetMessageModifier((e, m) => modifier((T)e, m));
return producerBuilder;
}

Expand Down
3 changes: 3 additions & 0 deletions src/SlimMessageBus.Host.AzureServiceBus/Config/Delegates.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
namespace SlimMessageBus.Host.AzureServiceBus;

public delegate void AsbMessageModifier<in T>(T message, ServiceBusMessage transportMessage);
33 changes: 22 additions & 11 deletions src/SlimMessageBus.Host.AzureServiceBus/ServiceBusMessageBus.cs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ public override async Task ProvisionTopology()
await provisioningService.ProvisionTopology(); // provisining happens asynchronously
}


#region Overrides of MessageBusBase

protected override void Build()
Expand Down Expand Up @@ -62,7 +63,9 @@ void AddConsumerFrom(TopicSubscriptionParams topicSubscription, IMessageProcesso

static void InitConsumerContext(ServiceBusReceivedMessage m, ConsumerContext ctx) => ctx.SetTransportMessage(m);

foreach (var ((path, subscriptionName), consumerSettings) in Settings.Consumers.GroupBy(x => (x.Path, SubscriptionName: x.GetSubscriptionName(required: false))).ToDictionary(x => x.Key, x => x.ToList()))
foreach (var ((path, subscriptionName), consumerSettings) in Settings.Consumers
.GroupBy(x => (x.Path, SubscriptionName: x.GetSubscriptionName(ProviderSettings)))
.ToDictionary(x => x.Key, x => x.ToList()))
{
var topicSubscription = new TopicSubscriptionParams(path: path, subscriptionName: subscriptionName);
var messageProcessor = new MessageProcessor<ServiceBusReceivedMessage>(
Expand All @@ -78,7 +81,7 @@ void AddConsumerFrom(TopicSubscriptionParams topicSubscription, IMessageProcesso

if (Settings.RequestResponse != null)
{
var topicSubscription = new TopicSubscriptionParams(Settings.RequestResponse.Path, Settings.RequestResponse.GetSubscriptionName(required: false));
var topicSubscription = new TopicSubscriptionParams(Settings.RequestResponse.Path, Settings.RequestResponse.GetSubscriptionName(ProviderSettings));
var messageProcessor = new ResponseMessageProcessor<ServiceBusReceivedMessage>(
LoggerFactory,
Settings.RequestResponse,
Expand Down Expand Up @@ -130,18 +133,13 @@ protected override async Task ProduceToTransport(object message, string path, by
}
}

// global modifier first
InvokeMessageModifier(message, messageType, m, ProviderSettings);
if (messageType != null)
{
// local producer modifier second
var producerSettings = GetProducerSettings(messageType);
try
{
var messageModifier = producerSettings.GetMessageModifier();
messageModifier?.Invoke(message, m);
}
catch (Exception e)
{
_logger.LogWarning(e, "The configured message modifier failed for message type {MessageType} and message {Message}", messageType, message);
}
InvokeMessageModifier(message, messageType, m, producerSettings);
}

var senderClient = _producerByPath.GetOrAdd(path);
Expand All @@ -161,6 +159,19 @@ protected override async Task ProduceToTransport(object message, string path, by
}
}

private void InvokeMessageModifier(object message, Type messageType, ServiceBusMessage m, HasProviderExtensions settings)
{
try
{
var messageModifier = settings.GetMessageModifier();
messageModifier?.Invoke(message, m);
}
catch (Exception e)
{
_logger.LogWarning(e, "The configured message modifier failed for message type {MessageType} and message {Message}", messageType, message);
}
}

public override Task ProduceRequest(object request, IDictionary<string, object> requestHeaders, string path, ProducerSettings producerSettings)
{
if (requestHeaders is null) throw new ArgumentNullException(nameof(requestHeaders));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
using Azure.Messaging.ServiceBus;
using Azure.Messaging.ServiceBus.Administration;

public class ServiceBusMessageBusSettings
public class ServiceBusMessageBusSettings : HasProviderExtensions
{
public string ConnectionString { get; set; }
public Func<ServiceBusClient> ClientFactory { get; set; }
Expand Down Expand Up @@ -74,4 +74,54 @@ public ServiceBusMessageBusSettings(string serviceBusConnectionString)
{
ConnectionString = serviceBusConnectionString;
}

/// <summary>
/// Configures the default subscription name when consuming form Azure ServiceBus topic.
/// </summary>
/// <param name="subscriptionName"></param>
/// <returns></returns>
public ServiceBusMessageBusSettings SubscriptionName(string subscriptionName)
{
if (subscriptionName is null) throw new ArgumentNullException(nameof(subscriptionName));

this.SetSubscriptionName(subscriptionName);
return this;
}

/// <summary>
/// Allows to set additional properties to the native <see cref="ServiceBusMessage"/> when producing the any message.
/// </summary>
/// <param name="modifier"></param>
/// <param name="executePrevious">Should the previously set modifier be executed as well?</param>
/// <returns></returns>
public ServiceBusMessageBusSettings WithModifier(AsbMessageModifier<object> modifier, bool executePrevious = true)
{
if (modifier is null) throw new ArgumentNullException(nameof(modifier));

var previousModifier = executePrevious ? this.GetMessageModifier() : null;
this.SetMessageModifier(previousModifier == null
? modifier
: (message, transportMessage) =>
{
previousModifier(message, transportMessage);
modifier(message, transportMessage);
});
return this;
}

/// <summary>
/// Allows to set additional properties to the native <see cref="ServiceBusMessage"/> when producing the any message.
/// </summary>
/// <param name="modifier"></param>
/// <param name="executePrevious">Should the previously set modifier be executed as well?</param>
/// <returns></returns>
public ServiceBusMessageBusSettings WithModifier<T>(AsbMessageModifier<T> modifier, bool executePrevious = true)
=> WithModifier((message, transportMessage) =>
{
if (message is T typedMessage)
{
modifier(typedMessage, transportMessage);
}
},
executePrevious);
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ protected override void AssertConsumer(ConsumerSettings consumerSettings)
{
base.AssertConsumer(consumerSettings);

if (consumerSettings.PathKind == PathKind.Topic && string.IsNullOrEmpty(consumerSettings.GetSubscriptionName(required: false)))
if (consumerSettings.PathKind == PathKind.Topic && string.IsNullOrEmpty(consumerSettings.GetSubscriptionName(ProviderSettings)))
{
ThrowConsumerFieldNotSet(consumerSettings, nameof(AsbConsumerBuilderExtensions.SubscriptionName));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,7 @@ await TryCreateQueue(path, topologyProvisioning.CanConsumerCreateQueue, options
if ((topicStatus & TopologyCreationStatus.Exists) != 0)
{
var consumerSettingsBySubscription = consumerSettingsList
.Select(x => (ConsumerSettings: x, SubscriptionName: x.GetSubscriptionName(required: false)))
.Select(x => (ConsumerSettings: x, SubscriptionName: x.GetSubscriptionName(providerSettings)))
.Where(x => x.SubscriptionName != null)
.ToDictionary(x => x.SubscriptionName, x => x.ConsumerSettings);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,4 +18,10 @@
<ProjectReference Include="..\SlimMessageBus.Host\SlimMessageBus.Host.csproj" />
</ItemGroup>

<ItemGroup>
<AssemblyAttribute Include="System.Runtime.CompilerServices.InternalsVisibleToAttribute">
<_Parameter1>SlimMessageBus.Host.AzureServiceBus.Test</_Parameter1>
</AssemblyAttribute>
</ItemGroup>

</Project>
39 changes: 25 additions & 14 deletions src/SlimMessageBus.Host.Configuration/Builders/MessageBusBuilder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -264,27 +264,38 @@ public MessageBusBuilder WithMessageTypeResolver(Type messageTypeResolverType)
public MessageBusBuilder WithMessageTypeResolver<T>() => WithMessageTypeResolver(typeof(T));

/// <summary>
/// Hook called whenver message is being produced. Can be used to add (or mutate) message headers.
/// Hook called whenver message is being produced. Can be used to change message headers.
/// </summary>
public MessageBusBuilder WithHeaderModifier(MessageHeaderModifier<object> headerModifier) => WithHeaderModifier<object>(headerModifier);

/// <summary>
/// Hook called whenver message is being produced. Can be used to add (or mutate) message headers.
/// </summary>
public MessageBusBuilder WithHeaderModifier<T>(MessageHeaderModifier<T> headerModifier)
/// <param name="executePrevious">Should the previously set modifier be executed as well?</param>
public MessageBusBuilder WithHeaderModifier(MessageHeaderModifier<object> headerModifier, bool executePrevious = true)
{
if (headerModifier == null) throw new ArgumentNullException(nameof(headerModifier));

Settings.HeaderModifier = (headers, message) =>
{
if (message is T typedMessage)
{
headerModifier(headers, typedMessage);
}
};
var previousHeaderModifier = executePrevious ? Settings.HeaderModifier : null;
Settings.HeaderModifier = previousHeaderModifier == null
? headerModifier
: (headers, message) =>
{
previousHeaderModifier(headers, message);
headerModifier(headers, message);
};
return this;
}

/// <summary>
/// Hook called whenver message is being produced. Can be used to change message headers.
/// </summary>
/// <param name="executePrevious">Should the previously set modifier be executed as well?</param>
public MessageBusBuilder WithHeaderModifier<T>(MessageHeaderModifier<T> headerModifier, bool executePrevious = true)
=> WithHeaderModifier((headers, message) =>
{
if (message is T typedMessage)
{
headerModifier(headers, typedMessage);
}
},
executePrevious);

/// <summary>
/// Enables or disabled the auto statrt of message consumption upon bus creation. If false, then you need to call the .Start() on the bus to start consuming messages.
/// </summary>
Expand Down
Loading

0 comments on commit 1365cb3

Please sign in to comment.