Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Host.AzureServiceBus] Ability to specify SubscriptionName and message modifier defaults #201

Merged
merged 1 commit into from
Dec 14, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 36 additions & 3 deletions docs/provider_azure_servicebus.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,9 @@ Please read the [Introduction](intro.md) before reading this provider documentat
- [Configuration](#configuration)
- [Producing Messages](#producing-messages)
- [Message modifier](#message-modifier)
- [Global message modifier](#global-message-modifier)
- [Consuming Messages](#consuming-messages)
- [Default Subscription Name](#default-subscription-name)
- [Consumer context](#consumer-context)
- [Exception Handling for Consumers](#exception-handling-for-consumers)
- [Transport Specific Settings](#transport-specific-settings)
Expand Down Expand Up @@ -45,7 +47,7 @@ To produce a given `TMessage` to a Azure Serivce Bus queue (or topic) use:

```cs
// send TMessage to Azure SB queues
mbb.Produce<TMessage>(x => x.UseQueue());
mbb.Produce<TMessage>(x => x.UseQueue());

// OR

Expand Down Expand Up @@ -74,7 +76,7 @@ The second (`path`) parameter indicates a queue or topic name - depending on the
When the default queue (or topic) path is configured for a message type:

```cs
mbb.Produce<TMessage>(x => x.DefaultQueue("some-queue"));
mbb.Produce<TMessage>(x => x.DefaultQueue("some-queue"));

// OR

Expand Down Expand Up @@ -113,14 +115,31 @@ mbb.Produce<PingMessage>(x =>

> Since version 1.15.5 the Azure SB client was updated, so the native message type is now `Azure.Messaging.ServiceBus.ServiceBusMessage` (it used to be `Azure.ServiceBus.Message`).

#### Global message modifier

The message modifier can also be applied for all ASB producers:

```cs
mbb.WithProviderServiceBus(cfg =>
{
// producers will inherit this
cfg.WithModifier((message, sbMessage) =>
{
sbMessage.ApplicationProperties["Source"] = "customer-service";
});
});
```

> The global message modifier will be executed first, then the producer specific modifier next.

## Consuming Messages

To consume `TMessage` by `TConsumer` from `some-topic` Azure Service Bus topic use:

```cs
mbb.Consume<TMessage>(x => x
.Topic("some-topic")
.SubscriptionName("subscriber-name")
.SubscriptionName("subscriber-name")
.WithConsumer<TConsumer>()
.Instances(1));
```
Expand All @@ -136,6 +155,20 @@ mbb.Consume<TMessage>(x => x
.Instances(1));
```

### Default Subscription Name

A default subscription name can be provided that will be applied for all ASB topic consumers (subscribers):

```cs
mbb.WithProviderServiceBus(cfg =>
{
// consumers will inherit this
cfg.SubscriptionName("customer-service");
});
```

That way, the subscription name does not have to be repeated on each topic consumer (if it were to be the same).

### Consumer context

The consumer can implement the `IConsumerWithContext` interface to access the Azure Service Bus native message:
Expand Down
2 changes: 1 addition & 1 deletion src/Host.Plugin.Properties.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

<PropertyGroup>
<TargetFrameworks>netstandard2.1;net6.0;net8.0</TargetFrameworks>
<Version>2.2.0-rc3</Version>
<Version>2.2.0-rc4</Version>
</PropertyGroup>

</Project>
Original file line number Diff line number Diff line change
Expand Up @@ -2,20 +2,21 @@

public static class AsbAbstractConsumerSettingsExtensions
{
static internal void SetSubscriptionName(this AbstractConsumerSettings consumerSettings, string subscriptionName)
static internal void SetSubscriptionName(this HasProviderExtensions consumerSettings, string subscriptionName)
{
if (subscriptionName is null) throw new ArgumentNullException(nameof(subscriptionName));

consumerSettings.Properties[AsbProperties.SubscriptionNameKey] = subscriptionName;
}

static internal string GetSubscriptionName(this AbstractConsumerSettings consumerSettings, bool required = true)
static internal string GetSubscriptionName(this AbstractConsumerSettings consumerSettings, ServiceBusMessageBusSettings providerSettings)
{
if (!consumerSettings.Properties.ContainsKey(AsbProperties.SubscriptionNameKey) && !required)
if (consumerSettings.PathKind == PathKind.Topic)
{
return null;
return consumerSettings.GetOrDefault<string>(AsbProperties.SubscriptionNameKey, providerSettings, null)
?? throw new ConfigurationMessageBusException($"SubscriptionName was not configured for topic {consumerSettings.Path}");
}
return consumerSettings.Properties[AsbProperties.SubscriptionNameKey] as string;
return null;
}

static internal void SetMaxAutoLockRenewalDuration(this AbstractConsumerSettings consumerSettings, TimeSpan duration)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,19 +1,18 @@
namespace SlimMessageBus.Host.AzureServiceBus;

using Azure.Messaging.ServiceBus;
using Azure.Messaging.ServiceBus.Administration;

static internal class AsbHasProviderExtensionsExtensions
{
static internal HasProviderExtensions SetMessageModifier(this HasProviderExtensions producerSettings, Action<object, ServiceBusMessage> messageModifierAction)
static internal HasProviderExtensions SetMessageModifier(this HasProviderExtensions producerSettings, AsbMessageModifier<object> messageModifierAction)
{
producerSettings.Properties[nameof(SetMessageModifier)] = messageModifierAction;
return producerSettings;
}

static internal Action<object, ServiceBusMessage> GetMessageModifier(this HasProviderExtensions producerSettings)
static internal AsbMessageModifier<object> GetMessageModifier(this HasProviderExtensions producerSettings)
{
return producerSettings.GetOrDefault<Action<object, ServiceBusMessage>>(nameof(SetMessageModifier), null);
return producerSettings.GetOrDefault<AsbMessageModifier<object>>(nameof(SetMessageModifier), null);
}

static internal HasProviderExtensions SetQueueOptions(this HasProviderExtensions producerSettings, Action<CreateQueueOptions> optionsAction)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
namespace SlimMessageBus.Host.AzureServiceBus;

using Azure.Messaging.ServiceBus;
using Azure.Messaging.ServiceBus.Administration;

public static class AsbProducerBuilderExtensions
Expand Down Expand Up @@ -44,21 +43,18 @@ public static ProducerBuilder<T> ToQueue<T>(this ProducerBuilder<T> producerBuil
}

/// <summary>
/// Allows to set additional properties to the native <see cref="Message"/> when producing the <see cref="T"/> message.
/// Allows to set additional properties to the native <see cref="ServiceBusMessage"/> when producing the <see cref="T"/> message.
/// </summary>
/// <typeparam name="T"></typeparam>
/// <param name="producerBuilder"></param>
/// <param name="modifierAction"></param>
/// <param name="modifier"></param>
/// <returns></returns>
public static ProducerBuilder<T> WithModifier<T>(this ProducerBuilder<T> producerBuilder, Action<T, ServiceBusMessage> modifierAction)
public static ProducerBuilder<T> WithModifier<T>(this ProducerBuilder<T> producerBuilder, AsbMessageModifier<T> modifier)
{
if (producerBuilder is null) throw new ArgumentNullException(nameof(producerBuilder));
if (modifierAction is null) throw new ArgumentNullException(nameof(modifierAction));
if (modifier is null) throw new ArgumentNullException(nameof(modifier));

producerBuilder.Settings.SetMessageModifier((e, m) =>
{
modifierAction((T)e, m);
});
producerBuilder.Settings.SetMessageModifier((e, m) => modifier((T)e, m));
return producerBuilder;
}

Expand Down
3 changes: 3 additions & 0 deletions src/SlimMessageBus.Host.AzureServiceBus/Config/Delegates.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
namespace SlimMessageBus.Host.AzureServiceBus;

public delegate void AsbMessageModifier<in T>(T message, ServiceBusMessage transportMessage);
33 changes: 22 additions & 11 deletions src/SlimMessageBus.Host.AzureServiceBus/ServiceBusMessageBus.cs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ public override async Task ProvisionTopology()
await provisioningService.ProvisionTopology(); // provisining happens asynchronously
}


#region Overrides of MessageBusBase

protected override void Build()
Expand Down Expand Up @@ -62,7 +63,9 @@ void AddConsumerFrom(TopicSubscriptionParams topicSubscription, IMessageProcesso

static void InitConsumerContext(ServiceBusReceivedMessage m, ConsumerContext ctx) => ctx.SetTransportMessage(m);

foreach (var ((path, subscriptionName), consumerSettings) in Settings.Consumers.GroupBy(x => (x.Path, SubscriptionName: x.GetSubscriptionName(required: false))).ToDictionary(x => x.Key, x => x.ToList()))
foreach (var ((path, subscriptionName), consumerSettings) in Settings.Consumers
.GroupBy(x => (x.Path, SubscriptionName: x.GetSubscriptionName(ProviderSettings)))
.ToDictionary(x => x.Key, x => x.ToList()))
{
var topicSubscription = new TopicSubscriptionParams(path: path, subscriptionName: subscriptionName);
var messageProcessor = new MessageProcessor<ServiceBusReceivedMessage>(
Expand All @@ -78,7 +81,7 @@ void AddConsumerFrom(TopicSubscriptionParams topicSubscription, IMessageProcesso

if (Settings.RequestResponse != null)
{
var topicSubscription = new TopicSubscriptionParams(Settings.RequestResponse.Path, Settings.RequestResponse.GetSubscriptionName(required: false));
var topicSubscription = new TopicSubscriptionParams(Settings.RequestResponse.Path, Settings.RequestResponse.GetSubscriptionName(ProviderSettings));
var messageProcessor = new ResponseMessageProcessor<ServiceBusReceivedMessage>(
LoggerFactory,
Settings.RequestResponse,
Expand Down Expand Up @@ -130,18 +133,13 @@ protected override async Task ProduceToTransport(object message, string path, by
}
}

// global modifier first
InvokeMessageModifier(message, messageType, m, ProviderSettings);
if (messageType != null)
{
// local producer modifier second
var producerSettings = GetProducerSettings(messageType);
try
{
var messageModifier = producerSettings.GetMessageModifier();
messageModifier?.Invoke(message, m);
}
catch (Exception e)
{
_logger.LogWarning(e, "The configured message modifier failed for message type {MessageType} and message {Message}", messageType, message);
}
InvokeMessageModifier(message, messageType, m, producerSettings);
}

var senderClient = _producerByPath.GetOrAdd(path);
Expand All @@ -161,6 +159,19 @@ protected override async Task ProduceToTransport(object message, string path, by
}
}

private void InvokeMessageModifier(object message, Type messageType, ServiceBusMessage m, HasProviderExtensions settings)
{
try
{
var messageModifier = settings.GetMessageModifier();
messageModifier?.Invoke(message, m);
}
catch (Exception e)
{
_logger.LogWarning(e, "The configured message modifier failed for message type {MessageType} and message {Message}", messageType, message);
}
}

public override Task ProduceRequest(object request, IDictionary<string, object> requestHeaders, string path, ProducerSettings producerSettings)
{
if (requestHeaders is null) throw new ArgumentNullException(nameof(requestHeaders));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
using Azure.Messaging.ServiceBus;
using Azure.Messaging.ServiceBus.Administration;

public class ServiceBusMessageBusSettings
public class ServiceBusMessageBusSettings : HasProviderExtensions
{
public string ConnectionString { get; set; }
public Func<ServiceBusClient> ClientFactory { get; set; }
Expand Down Expand Up @@ -74,4 +74,54 @@ public ServiceBusMessageBusSettings(string serviceBusConnectionString)
{
ConnectionString = serviceBusConnectionString;
}

/// <summary>
/// Configures the default subscription name when consuming form Azure ServiceBus topic.
/// </summary>
/// <param name="subscriptionName"></param>
/// <returns></returns>
public ServiceBusMessageBusSettings SubscriptionName(string subscriptionName)
{
if (subscriptionName is null) throw new ArgumentNullException(nameof(subscriptionName));

this.SetSubscriptionName(subscriptionName);
return this;
}

/// <summary>
/// Allows to set additional properties to the native <see cref="ServiceBusMessage"/> when producing the any message.
/// </summary>
/// <param name="modifier"></param>
/// <param name="executePrevious">Should the previously set modifier be executed as well?</param>
/// <returns></returns>
public ServiceBusMessageBusSettings WithModifier(AsbMessageModifier<object> modifier, bool executePrevious = true)
{
if (modifier is null) throw new ArgumentNullException(nameof(modifier));

var previousModifier = executePrevious ? this.GetMessageModifier() : null;
this.SetMessageModifier(previousModifier == null
? modifier
: (message, transportMessage) =>
{
previousModifier(message, transportMessage);
modifier(message, transportMessage);
});
return this;
}

/// <summary>
/// Allows to set additional properties to the native <see cref="ServiceBusMessage"/> when producing the any message.
/// </summary>
/// <param name="modifier"></param>
/// <param name="executePrevious">Should the previously set modifier be executed as well?</param>
/// <returns></returns>
public ServiceBusMessageBusSettings WithModifier<T>(AsbMessageModifier<T> modifier, bool executePrevious = true)
=> WithModifier((message, transportMessage) =>
{
if (message is T typedMessage)
{
modifier(typedMessage, transportMessage);
}
},
executePrevious);
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ protected override void AssertConsumer(ConsumerSettings consumerSettings)
{
base.AssertConsumer(consumerSettings);

if (consumerSettings.PathKind == PathKind.Topic && string.IsNullOrEmpty(consumerSettings.GetSubscriptionName(required: false)))
if (consumerSettings.PathKind == PathKind.Topic && string.IsNullOrEmpty(consumerSettings.GetSubscriptionName(ProviderSettings)))
{
ThrowConsumerFieldNotSet(consumerSettings, nameof(AsbConsumerBuilderExtensions.SubscriptionName));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,7 @@ await TryCreateQueue(path, topologyProvisioning.CanConsumerCreateQueue, options
if ((topicStatus & TopologyCreationStatus.Exists) != 0)
{
var consumerSettingsBySubscription = consumerSettingsList
.Select(x => (ConsumerSettings: x, SubscriptionName: x.GetSubscriptionName(required: false)))
.Select(x => (ConsumerSettings: x, SubscriptionName: x.GetSubscriptionName(providerSettings)))
.Where(x => x.SubscriptionName != null)
.ToDictionary(x => x.SubscriptionName, x => x.ConsumerSettings);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,4 +18,10 @@
<ProjectReference Include="..\SlimMessageBus.Host\SlimMessageBus.Host.csproj" />
</ItemGroup>

<ItemGroup>
<AssemblyAttribute Include="System.Runtime.CompilerServices.InternalsVisibleToAttribute">
<_Parameter1>SlimMessageBus.Host.AzureServiceBus.Test</_Parameter1>
</AssemblyAttribute>
</ItemGroup>

</Project>
39 changes: 25 additions & 14 deletions src/SlimMessageBus.Host.Configuration/Builders/MessageBusBuilder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -264,27 +264,38 @@
public MessageBusBuilder WithMessageTypeResolver<T>() => WithMessageTypeResolver(typeof(T));

/// <summary>
/// Hook called whenver message is being produced. Can be used to add (or mutate) message headers.
/// Hook called whenver message is being produced. Can be used to change message headers.
/// </summary>
public MessageBusBuilder WithHeaderModifier(MessageHeaderModifier<object> headerModifier) => WithHeaderModifier<object>(headerModifier);

/// <summary>
/// Hook called whenver message is being produced. Can be used to add (or mutate) message headers.
/// </summary>
public MessageBusBuilder WithHeaderModifier<T>(MessageHeaderModifier<T> headerModifier)
/// <param name="executePrevious">Should the previously set modifier be executed as well?</param>
public MessageBusBuilder WithHeaderModifier(MessageHeaderModifier<object> headerModifier, bool executePrevious = true)
{
if (headerModifier == null) throw new ArgumentNullException(nameof(headerModifier));

Settings.HeaderModifier = (headers, message) =>
{
if (message is T typedMessage)
{
headerModifier(headers, typedMessage);
}
};
var previousHeaderModifier = executePrevious ? Settings.HeaderModifier : null;
Settings.HeaderModifier = previousHeaderModifier == null
? headerModifier
: (headers, message) =>
{
previousHeaderModifier(headers, message);
headerModifier(headers, message);
};
return this;
}

/// <summary>
/// Hook called whenver message is being produced. Can be used to change message headers.
/// </summary>
/// <param name="executePrevious">Should the previously set modifier be executed as well?</param>
public MessageBusBuilder WithHeaderModifier<T>(MessageHeaderModifier<T> headerModifier, bool executePrevious = true)
=> WithHeaderModifier((headers, message) =>
{
if (message is T typedMessage)
{
headerModifier(headers, typedMessage);
}
},
executePrevious);

/// <summary>
/// Enables or disabled the auto statrt of message consumption upon bus creation. If false, then you need to call the .Start() on the bus to start consuming messages.
/// </summary>
Expand Down Expand Up @@ -314,7 +325,7 @@
child.MergeFrom(Settings);
}

builderAction?.Invoke(child);

Check warning on line 328 in src/SlimMessageBus.Host.Configuration/Builders/MessageBusBuilder.cs

View workflow job for this annotation

GitHub Actions / build

Change this expression which always evaluates to the same result. (https://rules.sonarsource.com/csharp/RSPEC-2589)

Check warning on line 328 in src/SlimMessageBus.Host.Configuration/Builders/MessageBusBuilder.cs

View workflow job for this annotation

GitHub Actions / build

Change this expression which always evaluates to the same result. (https://rules.sonarsource.com/csharp/RSPEC-2589)

Check warning on line 328 in src/SlimMessageBus.Host.Configuration/Builders/MessageBusBuilder.cs

View workflow job for this annotation

GitHub Actions / build

Change this expression which always evaluates to the same result. (https://rules.sonarsource.com/csharp/RSPEC-2589)

Check warning on line 328 in src/SlimMessageBus.Host.Configuration/Builders/MessageBusBuilder.cs

View workflow job for this annotation

GitHub Actions / build

Change this expression which always evaluates to the same result. (https://rules.sonarsource.com/csharp/RSPEC-2589)

return this;
}
Expand Down
Loading
Loading