Skip to content

Commit

Permalink
[Host.Hybrid] SlimMessageBus.Host.ConfigurationMessageBusException: C…
Browse files Browse the repository at this point in the history
…ould not find any bus that produces the message type #199

Signed-off-by: Tomasz Maruszak <[email protected]>
  • Loading branch information
zarusz committed Nov 25, 2023
1 parent 4f89c97 commit 5222819
Show file tree
Hide file tree
Showing 7 changed files with 217 additions and 29 deletions.
2 changes: 1 addition & 1 deletion src/Host.Plugin.Properties.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

<PropertyGroup>
<TargetFrameworks>netstandard2.1;net6.0</TargetFrameworks>
<Version>2.2.0-rc2</Version>
<Version>2.2.0-rc3</Version>
</PropertyGroup>

</Project>
61 changes: 51 additions & 10 deletions src/SlimMessageBus.Host/Hybrid/HybridMessageBus.cs
Original file line number Diff line number Diff line change
@@ -1,11 +1,14 @@
namespace SlimMessageBus.Host.Hybrid;

using System.Collections.Concurrent;

public class HybridMessageBus : IMasterMessageBus, ICompositeMessageBus, IDisposable, IAsyncDisposable
{
private readonly ILogger _logger;
private readonly Dictionary<string, MessageBusBase> _busByName;
private readonly ProducerByMessageTypeCache<MessageBusBase[]> _busesByMessageType;
private readonly RuntimeTypeCache _runtimeTypeCache;

Check warning on line 10 in src/SlimMessageBus.Host/Hybrid/HybridMessageBus.cs

View workflow job for this annotation

GitHub Actions / build

Remove the field '_runtimeTypeCache' and declare it as a local variable in the relevant methods. (https://rules.sonarsource.com/csharp/RSPEC-1450)
private readonly ConcurrentDictionary<Type, bool> _undeclaredMessageType;

public ILoggerFactory LoggerFactory { get; }
public MessageBusSettings Settings { get; }
Expand Down Expand Up @@ -49,6 +52,8 @@ public HybridMessageBus(MessageBusSettings settings, HybridMessageBusSettings pr

_busesByMessageType = new ProducerByMessageTypeCache<MessageBusBase[]>(_logger, busesByMessageType, _runtimeTypeCache);

_undeclaredMessageType = new();

// ToDo: defer start of busses until here

Check warning on line 57 in src/SlimMessageBus.Host/Hybrid/HybridMessageBus.cs

View workflow job for this annotation

GitHub Actions / build

Complete the task associated to this 'TODO' comment. (https://rules.sonarsource.com/csharp/RSPEC-1135)

Check warning on line 57 in src/SlimMessageBus.Host/Hybrid/HybridMessageBus.cs

View workflow job for this annotation

GitHub Actions / build

Complete the task associated to this 'TODO' comment. (https://rules.sonarsource.com/csharp/RSPEC-1135)
}

Expand Down Expand Up @@ -107,28 +112,49 @@ protected virtual MessageBusBase[] Route(object message, string path)
{
var messageType = message.GetType();

var buses = _busesByMessageType[messageType]
?? throw new ConfigurationMessageBusException($"Could not find any bus that produces the message type: {messageType}");
var buses = _busesByMessageType[messageType];
if (buses != null)
{
if (_logger.IsEnabled(LogLevel.Debug))
{
_logger.LogDebug("Resolved bus {BusName} for message type: {MessageType} and path {Path}", string.Join(",", buses.Select(x => x.Settings.Name)), messageType, path);
}
return buses;
}

if (_logger.IsEnabled(LogLevel.Debug))
if (ProviderSettings.UndeclaredMessageTypeMode == UndeclaredMessageTypeMode.RaiseException)
{
_logger.LogDebug("Resolved bus {BusName} for message type: {MessageType} and path {Path}", string.Join(",", buses.Select(x => x.Settings.Name)), messageType, path);
throw new ConfigurationMessageBusException($"Could not find any bus that produces the message type: {messageType}");
}

return buses;
// Add the message type, so that we only emit warn log once
if (ProviderSettings.UndeclaredMessageTypeMode == UndeclaredMessageTypeMode.RaiseOneTimeLog && _undeclaredMessageType.TryAdd(messageType, true))
{
_logger.LogInformation("Could not find any bus that produces the message type: {MessageType}. Messages of that type will not be delivered to any child bus. Double check the message bus configuration.", messageType);
}

return Array.Empty<MessageBusBase>();
}

#region Implementation of IMessageBusProducer

public Task<TResponseMessage> ProduceSend<TResponseMessage>(object request, TimeSpan? timeout, string path = null, IDictionary<string, object> headers = null, IServiceProvider currentServiceProvider = null, CancellationToken cancellationToken = default)

Check warning on line 141 in src/SlimMessageBus.Host/Hybrid/HybridMessageBus.cs

View workflow job for this annotation

GitHub Actions / build

Add the default parameter value defined in the overridden method. (https://rules.sonarsource.com/csharp/RSPEC-1006)
{
var buses = Route(request, path);
return buses[0].ProduceSend<TResponseMessage>(request, timeout, path, headers, currentServiceProvider, cancellationToken);
if (buses.Length > 0)
{
return buses[0].ProduceSend<TResponseMessage>(request, timeout, path, headers, currentServiceProvider, cancellationToken);
}
return Task.FromResult<TResponseMessage>(default);
}

public async Task ProducePublish(object message, string path = null, IDictionary<string, object> headers = null, IServiceProvider currentServiceProvider = null, CancellationToken cancellationToken = default)
{
var buses = Route(message, path);
if (buses.Length == 0)
{
return;
}

if (buses.Length == 1)
{
Expand Down Expand Up @@ -174,7 +200,10 @@ public IMessageBus GetChildBus(string name)
public async Task Publish<TMessage>(TMessage message, string path = null, IDictionary<string, object> headers = null, CancellationToken cancellationToken = default)
{
var buses = Route(message, path);

if (buses.Length == 0)
{
return;
}
if (buses.Length == 1)
{
await buses[0].Publish(message, path, headers, cancellationToken);
Expand All @@ -200,19 +229,31 @@ public async Task Publish<TMessage>(TMessage message, string path = null, IDicti
public Task<TResponse> Send<TResponse>(IRequest<TResponse> request, string path = null, IDictionary<string, object> headers = null, TimeSpan? timeout = null, CancellationToken cancellationToken = default)
{
var buses = Route(request, path);
return buses[0].Send(request, path, headers, timeout, cancellationToken);
if (buses.Length > 0)
{
return buses[0].Send(request, path, headers, timeout, cancellationToken);
}
return Task.FromResult<TResponse>(default);
}

public Task Send(IRequest request, string path = null, IDictionary<string, object> headers = null, TimeSpan? timeout = null, CancellationToken cancellationToken = default)
{
var buses = Route(request, path);
return buses[0].Send(request, path, headers, timeout, cancellationToken);
if (buses.Length > 0)
{
return buses[0].Send(request, path, headers, timeout, cancellationToken);
}
return Task.CompletedTask;
}

public Task<TResponse> Send<TResponse, TRequest>(TRequest request, string path = null, IDictionary<string, object> headers = null, TimeSpan? timeout = null, CancellationToken cancellationToken = default)
{
var buses = Route(request, path);
return buses[0].Send<TResponse, TRequest>(request, path, headers, timeout, cancellationToken);
if (buses.Length > 0)
{
return buses[0].Send<TResponse, TRequest>(request, path, headers, timeout, cancellationToken);
}
return Task.FromResult<TResponse>(default);
}

#endregion
Expand Down
25 changes: 24 additions & 1 deletion src/SlimMessageBus.Host/Hybrid/HybridMessageBusSettings.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,33 @@ public class HybridMessageBusSettings
/// <summary>
/// When there are multiple matching bus for a publish message type, defines the mode of execution.
/// </summary>
/// <remarks>The default is <see cref="PublishExecutionMode.Sequential"/>.</remarks>
public PublishExecutionMode PublishExecutionMode { get; set; } = PublishExecutionMode.Sequential;

/// <summary>
/// When a message type is being produced that has no matching child bus that can produce such a message, defines the mode of execution.
/// </summary>
/// <remarks>The default is <see cref="UndeclaredMessageTypeMode.RaiseOneTimeLog"/>.</remarks>
public UndeclaredMessageTypeMode UndeclaredMessageTypeMode { get; set; } = UndeclaredMessageTypeMode.RaiseOneTimeLog;
}

public enum UndeclaredMessageTypeMode
{
/// <summary>
/// Nothing happens (silent failure).
/// </summary>
DoNothing = 0,
/// <summary>
/// An INFO log is generated for every message type that was encountered. THe log happens only once for each message type.
/// </summary>
RaiseOneTimeLog = 1,
/// <summary>
/// Raises an exception (every time message is produced).
/// </summary>
RaiseException = 2,
}

public enum PublishExecutionMode : int
public enum PublishExecutionMode
{
/// <summary>
/// Execute the publish on the first bus, then on the next (one after another).
Expand Down
6 changes: 3 additions & 3 deletions src/SlimMessageBus.Host/MessageBusBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,7 @@ private Dictionary<Type, ProducerSettings> BuildProducerByBaseMessageType()

private async Task OnBusLifecycle(MessageBusLifecycleEventType eventType)
{
_lifecycleInterceptors ??= Settings.ServiceProvider?.GetServices<IMessageBusLifecycleInterceptor>();
_lifecycleInterceptors ??= Settings.ServiceProvider?.GetService<IEnumerable<IMessageBusLifecycleInterceptor>>();
if (_lifecycleInterceptors != null)
{
foreach (var i in _lifecycleInterceptors)
Expand Down Expand Up @@ -256,8 +256,8 @@ public async Task Stop()
}
}

protected virtual Task OnStart() => Task.CompletedTask;
protected virtual Task OnStop() => Task.CompletedTask;
protected internal virtual Task OnStart() => Task.CompletedTask;
protected internal virtual Task OnStop() => Task.CompletedTask;

protected void AssertActive()
{
Expand Down
11 changes: 11 additions & 0 deletions src/Tests/SlimMessageBus.Host.Test.Common/MoqMatchers.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
namespace SlimMessageBus.Host.Test.Common;

public static class MoqMatchers
{
public static bool LogMessageMatcher(object formattedLogValueObject, Func<string, bool> messageMatch)
{
var logValues = formattedLogValueObject as IReadOnlyList<KeyValuePair<string, object>>;
var originalFormat = logValues?.FirstOrDefault(logValue => logValue.Key == "{OriginalFormat}").Value.ToString();
return originalFormat != null && messageMatch(originalFormat);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
public class XunitLoggerFactory : ILoggerFactory
{
private readonly ITestOutputHelper _output;

public ITestOutputHelper Output => _output;

public XunitLoggerFactory(ITestOutputHelper output) => _output = output;
Expand Down
139 changes: 126 additions & 13 deletions src/Tests/SlimMessageBus.Host.Test/Hybrid/HybridMessageBusTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,17 +6,22 @@ namespace SlimMessageBus.Host.Test.Hybrid;

using SlimMessageBus.Host;
using SlimMessageBus.Host.Hybrid;
using SlimMessageBus.Host.Test.Common;

public class HybridMessageBusTest
{
private readonly Lazy<HybridMessageBus> _subject;
private readonly MessageBusBuilder _messageBusBuilder;
private readonly Mock<IServiceProvider> _serviceProviderMock = new();
private readonly Mock<IMessageSerializer> _messageSerializerMock = new();
private readonly Mock<ILoggerFactory> _loggerFactoryMock = new();
private readonly Mock<ILogger<HybridMessageBusSettings>> _loggerMock = new();

private Mock<MessageBusBase> _bus1Mock;
private Mock<MessageBusBase> _bus2Mock;

private readonly HybridMessageBusSettings _providerSettings;

public HybridMessageBusTest()
{
_messageBusBuilder = MessageBusBuilder.Create();
Expand All @@ -32,6 +37,9 @@ public HybridMessageBusTest()

_serviceProviderMock.Setup(x => x.GetService(typeof(IMessageSerializer))).Returns(_messageSerializerMock.Object);
_serviceProviderMock.Setup(x => x.GetService(typeof(IMessageTypeResolver))).Returns(new AssemblyQualifiedNameMessageTypeResolver());
_serviceProviderMock.Setup(x => x.GetService(typeof(ILoggerFactory))).Returns(_loggerFactoryMock.Object);

_loggerFactoryMock.Setup(x => x.CreateLogger(It.IsAny<string>())).Returns(_loggerMock.Object);

_messageBusBuilder.AddChildBus("bus1", (mbb) =>
{
Expand Down Expand Up @@ -64,7 +72,120 @@ public HybridMessageBusTest()
});
});

_subject = new Lazy<HybridMessageBus>(() => new HybridMessageBus(_messageBusBuilder.Settings, new HybridMessageBusSettings(), _messageBusBuilder));
_providerSettings = new();

_subject = new Lazy<HybridMessageBus>(() => new HybridMessageBus(_messageBusBuilder.Settings, _providerSettings, _messageBusBuilder));
}

[Fact]
public async Task When_Start_Then_StartChildBuses()
{
// act
await _subject.Value.Start();

// assert
_bus1Mock.Verify(x => x.OnStart(), Times.Once);
_bus2Mock.Verify(x => x.OnStart(), Times.Once);
}

[Fact]
public async Task When_Stop_Then_StopChildBuses()
{
await _subject.Value.Start();

// act
await _subject.Value.Stop();

// assert
_bus1Mock.Verify(x => x.OnStop(), Times.Once);
_bus2Mock.Verify(x => x.OnStop(), Times.Once);
}

[Fact]
public async Task When_ProvisionTopology_Then_CallsProvisionTopologyOnChildBuses()
{
// act
await _subject.Value.ProvisionTopology();

// assert
_bus1Mock.Verify(x => x.ProvisionTopology(), Times.Once);
_bus2Mock.Verify(x => x.ProvisionTopology(), Times.Once);
}

[Theory]
[InlineData(UndeclaredMessageTypeMode.DoNothing)]
[InlineData(UndeclaredMessageTypeMode.RaiseOneTimeLog)]
[InlineData(UndeclaredMessageTypeMode.RaiseException)]
public async Task Given_UndeclareMessageType_When_Publish_Then_FollowsSettingsMode(UndeclaredMessageTypeMode mode)
{
// arrange
_providerSettings.UndeclaredMessageTypeMode = mode;

var message = new object();

// act
Func<Task> act = () => _subject.Value.Publish(message);

// assert
if (mode == UndeclaredMessageTypeMode.RaiseException)
{
await act.Should().ThrowAsync<ConfigurationMessageBusException>();
}
else
{
await act.Should().NotThrowAsync();

_loggerMock.Verify(x => x.Log(
LogLevel.Information,
It.IsAny<EventId>(),
It.Is<It.IsAnyType>((x, _) => MoqMatchers.LogMessageMatcher(x, m => m.StartsWith("Could not find any bus that produces the message type: "))),
It.IsAny<Exception>(),
It.IsAny<Func<It.IsAnyType, Exception, string>>()), mode == UndeclaredMessageTypeMode.RaiseOneTimeLog ? Times.Once : Times.Never);
}

_bus1Mock.VerifyGet(x => x.Settings);
_bus1Mock.VerifyNoOtherCalls();

_bus2Mock.VerifyGet(x => x.Settings);
_bus2Mock.VerifyNoOtherCalls();
}

[Theory]
[InlineData(UndeclaredMessageTypeMode.DoNothing)]
[InlineData(UndeclaredMessageTypeMode.RaiseOneTimeLog)]
[InlineData(UndeclaredMessageTypeMode.RaiseException)]
public async Task Given_UndeclareMessageType_When_Send_Then_FollowsSettingsMode(UndeclaredMessageTypeMode mode)
{
// arrange
_providerSettings.UndeclaredMessageTypeMode = mode;

var message = new SomeUndeclaredRequest();

// act
Func<Task<SomeResponse>> act = () => _subject.Value.Send(message);

// assert
if (mode == UndeclaredMessageTypeMode.RaiseException)
{
await act.Should().ThrowAsync<ConfigurationMessageBusException>();
}
else
{
await act.Should().NotThrowAsync();

_loggerMock.Verify(x => x.Log(
LogLevel.Information,
It.IsAny<EventId>(),
It.Is<It.IsAnyType>((x, _) => MoqMatchers.LogMessageMatcher(x, m => m.StartsWith("Could not find any bus that produces the message type: "))),
It.IsAny<Exception>(),
It.IsAny<Func<It.IsAnyType, Exception, string>>()), mode == UndeclaredMessageTypeMode.RaiseOneTimeLog ? Times.Once : Times.Never);
}

_bus1Mock.VerifyGet(x => x.Settings);
_bus1Mock.VerifyNoOtherCalls();

_bus2Mock.VerifyGet(x => x.Settings);
_bus2Mock.VerifyNoOtherCalls();
}

[Fact]
Expand Down Expand Up @@ -155,18 +276,6 @@ public async Task Given_DeclaredRequestMessageTypeAndItsAncestors_When_Send_Then
_bus2Mock.Verify(x => x.Send(someDerivedRequest, null, null, null, default), Times.Once);
}

[Fact]
public async Task Given_NotDeclaredMessageType_When_Publish_Then_ThrowsException()
{
// arrange

// act
Func<Task> notDeclaredTypePublish = () => _subject.Value.Publish("Fake Message");

// assert
await notDeclaredTypePublish.Should().ThrowAsync<ConfigurationMessageBusException>();
}

[Fact]
public void Given_RequestMessageTypeDeclaredOnMoreThanOneBus_When_Constructor_Then_ThrowsException()
{
Expand Down Expand Up @@ -221,6 +330,10 @@ internal class SomeDerivedRequest : SomeRequest
internal class SomeResponse
{
}

internal class SomeUndeclaredRequest : IRequest<SomeResponse>
{
}
}


0 comments on commit 5222819

Please sign in to comment.