Skip to content

Commit

Permalink
Cleanup
Browse files Browse the repository at this point in the history
Signed-off-by: Tomasz Maruszak <[email protected]>
  • Loading branch information
zarusz committed Jan 24, 2025
1 parent c653493 commit a7d529a
Show file tree
Hide file tree
Showing 21 changed files with 202 additions and 101 deletions.
3 changes: 3 additions & 0 deletions src/.editorconfig
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,8 @@ csharp_style_allow_embedded_statements_on_same_line_experimental = true:silent
csharp_style_allow_blank_lines_between_consecutive_braces_experimental = true:silent
csharp_style_allow_blank_line_after_colon_in_constructor_initializer_experimental = true:silent
csharp_style_prefer_primary_constructors = true:suggestion
csharp_prefer_system_threading_lock = true:suggestion
dotnet_diagnostic.xUnit1045.severity = silent

[*.{cs,vb}]
#### Naming styles ####
Expand Down Expand Up @@ -186,6 +188,7 @@ dotnet_style_qualification_for_event = false:suggestion
dotnet_diagnostic.VSTHRD200.severity = none
# not supported by .netstandard2.0
dotnet_diagnostic.CA1510.severity = none
dotnet_diagnostic.CA1512.severity = none

[*.{csproj,xml}]
indent_style = space
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,10 @@
/// <summary>
/// Circuit breaker to toggle consumer status on an external events.
/// </summary>
internal sealed class CircuitBreakerConsumerInterceptor(ILogger<CircuitBreakerConsumerInterceptor> logger) : IAbstractConsumerInterceptor
internal sealed partial class CircuitBreakerConsumerInterceptor(ILogger<CircuitBreakerConsumerInterceptor> logger) : IAbstractConsumerInterceptor
{
private readonly ILogger<CircuitBreakerConsumerInterceptor> _logger = logger;

public int Order => 100;

public async Task<bool> CanStart(AbstractConsumer consumer)
Expand Down Expand Up @@ -33,12 +35,12 @@ async Task BreakerChanged(Circuit state)
var bus = consumer.Settings[0].MessageBusSettings.Name ?? "default";
if (shouldPause)
{
logger.LogWarning("Circuit breaker tripped for '{Path}' on '{Bus}' bus. Consumer paused.", path, bus);
LogCircuitTripped(path, bus);
await consumer.DoStop().ConfigureAwait(false);
}
else
{
logger.LogInformation("Circuit breaker restored for '{Path}' on '{Bus}' bus. Consumer resumed.", path, bus);
LogCircuitRestored(path, bus);
await consumer.DoStart().ConfigureAwait(false);
}
consumer.SetIsPaused(shouldPause);
Expand Down Expand Up @@ -89,4 +91,32 @@ public async Task<bool> CanStop(AbstractConsumer consumer)
public Task Started(AbstractConsumer consumer) => Task.CompletedTask;

public Task Stopped(AbstractConsumer consumer) => Task.CompletedTask;

#region Logging

[LoggerMessage(
EventId = 0,
Level = LogLevel.Warning,
Message = "Circuit breaker tripped for '{Path}' on '{Bus}' bus. Consumer paused.")]
private partial void LogCircuitTripped(string path, string bus);

[LoggerMessage(
EventId = 1,
Level = LogLevel.Information,
Message = "Circuit breaker restored for '{Path}' on '{Bus}' bus. Consumer resumed.")]
private partial void LogCircuitRestored(string path, string bus);

#endregion
}

#if NETSTANDARD2_0

partial class CircuitBreakerConsumerInterceptor
{
private partial void LogCircuitTripped(string path, string bus)
=> _logger.LogWarning("Circuit breaker tripped for '{Path}' on '{Bus}' bus. Consumer paused.", path, bus);

private partial void LogCircuitRestored(string path, string bus)
=> _logger.LogInformation("Circuit breaker restored for '{Path}' on '{Bus}' bus. Consumer resumed.", path, bus);
}
#endif
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ public class OutboxLockRenewalTimerFactory<TOutboxMessage, TOutboxMessageKey>(IS
private bool _isDisposed = false;

public IOutboxLockRenewalTimer CreateRenewalTimer(TimeSpan lockDuration, TimeSpan interval, Action<Exception> lockLost, CancellationToken cancellationToken)
=> (OutboxLockRenewalTimer<TOutboxMessage, TOutboxMessageKey>)ActivatorUtilities.CreateInstance(_scope.ServiceProvider, typeof(OutboxLockRenewalTimer<TOutboxMessage, TOutboxMessageKey>), lockDuration, interval, lockLost, cancellationToken);
=> ActivatorUtilities.CreateInstance<OutboxLockRenewalTimer<TOutboxMessage, TOutboxMessageKey>>(_scope.ServiceProvider);

public async ValueTask DisposeAsync()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
<PackageReference Include="Microsoft.Extensions.Logging.Abstractions" Version="3.1.0" Condition="'$(TargetFramework)' == 'netstandard2.0'" />
<PackageReference Include="Microsoft.Extensions.Logging.Abstractions" Version="6.0.0" Condition="'$(TargetFramework)' == 'net6.0'" />
<PackageReference Include="Microsoft.Extensions.Logging.Abstractions" Version="8.0.2" Condition="'$(TargetFramework)' == 'net8.0'" />
<PackageReference Include="Microsoft.Extensions.Logging.Abstractions" Version="9.0.0-rc*" Condition="'$(TargetFramework)' == 'net9.0'" />
<PackageReference Include="Microsoft.Extensions.Logging.Abstractions" Version="9.0.0" Condition="'$(TargetFramework)' == 'net9.0'" />
</ItemGroup>

<ItemGroup>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,8 @@ public GoogleProtobufMessageSerializer(ILoggerFactory loggerFactory, IMessagePar
_messageParserFactory = messageParserFactory ?? new MessageParserFactory();
}

public byte[] Serialize(Type t, object message)
{
return ((IMessage)message).ToByteArray();
}
public byte[] Serialize(Type t, object message)
=> ((IMessage)message).ToByteArray();

public object Deserialize(Type t, byte[] payload)
{
Expand All @@ -31,7 +29,7 @@ public object Deserialize(Type t, byte[] payload)
BindingFlags.Instance,
null,
messageParser,
new object[] { payload });
[payload]);

return message;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
<PackageReference Include="Microsoft.Extensions.Logging.Abstractions" Version="3.1.0" Condition="'$(TargetFramework)' == 'netstandard2.0'" />
<PackageReference Include="Microsoft.Extensions.Logging.Abstractions" Version="6.0.0" Condition="'$(TargetFramework)' == 'net6.0'" />
<PackageReference Include="Microsoft.Extensions.Logging.Abstractions" Version="8.0.2" Condition="'$(TargetFramework)' == 'net8.0'" />
<PackageReference Include="Microsoft.Extensions.Logging.Abstractions" Version="9.0.0-rc*" Condition="'$(TargetFramework)' == 'net9.0'" />
<PackageReference Include="Microsoft.Extensions.Logging.Abstractions" Version="9.0.0" Condition="'$(TargetFramework)' == 'net9.0'" />
</ItemGroup>

<ItemGroup>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
<PackageReference Include="Microsoft.Extensions.Logging.Abstractions" Version="3.1.0" Condition="'$(TargetFramework)' == 'netstandard2.0'" />
<PackageReference Include="Microsoft.Extensions.Logging.Abstractions" Version="6.0.0" Condition="'$(TargetFramework)' == 'net6.0'" />
<PackageReference Include="Microsoft.Extensions.Logging.Abstractions" Version="8.0.2" Condition="'$(TargetFramework)' == 'net8.0'" />
<PackageReference Include="Microsoft.Extensions.Logging.Abstractions" Version="9.0.0-rc*" Condition="'$(TargetFramework)' == 'net9.0'" />
<PackageReference Include="Microsoft.Extensions.Logging.Abstractions" Version="9.0.0" Condition="'$(TargetFramework)' == 'net9.0'" />
</ItemGroup>

<ItemGroup>
Expand Down
74 changes: 64 additions & 10 deletions src/SlimMessageBus.Host.Serialization.Json/JsonMessageSerializer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ namespace SlimMessageBus.Host.Serialization.Json;

using Newtonsoft.Json;

public class JsonMessageSerializer : IMessageSerializer, IMessageSerializer<string>
public partial class JsonMessageSerializer : IMessageSerializer, IMessageSerializer<string>
{
private readonly ILogger _logger;
private readonly Encoding _encoding;
Expand All @@ -30,10 +30,10 @@ public JsonMessageSerializer()
public byte[] Serialize(Type t, object message)
{
var jsonPayload = JsonConvert.SerializeObject(message, t, _serializerSettings);
_logger.LogDebug("Type {MessageType} serialized from {Message} to JSON {MessageJson}", t, message, jsonPayload);
LogSerialized(t, message, jsonPayload);
return _encoding.GetBytes(jsonPayload);
}

}

public object Deserialize(Type t, byte[] payload)
{
var jsonPayload = string.Empty;
Expand All @@ -44,7 +44,11 @@ public object Deserialize(Type t, byte[] payload)
}
catch (Exception e)
{
_logger.LogError(e, "Type {MessageType} could not been deserialized, payload: {MessagePayload}, JSON: {MessageJson}", t, _logger.IsEnabled(LogLevel.Debug) ? Convert.ToBase64String(payload) : "(...)", jsonPayload);
var base64Payload = _logger.IsEnabled(LogLevel.Debug)
? Convert.ToBase64String(payload)
: "(...)";

LogDeserializationFailed(t, jsonPayload, base64Payload, e);
throw;
}
}
Expand All @@ -56,16 +60,66 @@ public object Deserialize(Type t, byte[] payload)
string IMessageSerializer<string>.Serialize(Type t, object message)
{
var payload = JsonConvert.SerializeObject(message, t, _serializerSettings);
_logger.LogDebug("Type {MessageType} serialized from {Message} to JSON {MessageJson}", t, message, payload);
LogSerialized(t, message, payload);
return payload;
}

public object Deserialize(Type t, string payload)
{
var message = JsonConvert.DeserializeObject(payload, t, _serializerSettings);
_logger.LogDebug("Type {MessageType} deserialized from JSON {MessageJson} to {Message}", t, payload, message);
return message;
try
{
var message = JsonConvert.DeserializeObject(payload, t, _serializerSettings);
LogDeserializedFromString(t, payload, message);
return message;
}
catch (Exception e)
{
LogDeserializationFailed(t, payload, string.Empty, e);
throw;
}
}

#endregion

#region Logging

#if !NETSTANDARD2_0

[LoggerMessage(
EventId = 0,
Level = LogLevel.Debug,
Message = "Type {MessageType} serialized from {Message} to JSON {MessageJson}")]
private partial void LogSerialized(Type messageType, object message, string messageJson);

[LoggerMessage(
EventId = 1,
Level = LogLevel.Debug,
Message = "Type {MessageType} deserialized from JSON {MessageJson} to {Message}")]
private partial void LogDeserializedFromString(Type messageType, string messageJson, object message);

[LoggerMessage(
EventId = 2,
Level = LogLevel.Error,
Message = "Type {MessageType} could not been deserialized, payload: {MessagePayload}, JSON: {MessageJson}")]
private partial void LogDeserializationFailed(Type messageType, string messageJson, string messagePayload, Exception e);

#endif

#endregion
}
}

#if NETSTANDARD2_0

public partial class JsonMessageSerializer
{
private void LogSerialized(Type messageType, object message, string messageJson)
=> _logger.LogDebug("Type {MessageType} serialized from {Message} to JSON {MessageJson}", messageType, message, messageJson);

private void LogDeserializedFromString(Type messageType, string messageJson, object message)
=> _logger.LogDebug("Type {MessageType} deserialized from JSON {MessageJson} to {Message}", messageType, messageJson, message);

private void LogDeserializationFailed(Type messageType, string messageJson, string messagePayload, Exception e)
=> _logger.LogError(e, "Type {MessageType} could not been deserialized, payload: {MessagePayload}, JSON: {MessageJson}", messageType, messagePayload, messageJson);
}

#endif
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
<PackageReference Include="Microsoft.Extensions.Logging.Abstractions" Version="3.1.0" Condition="'$(TargetFramework)' == 'netstandard2.0'" />
<PackageReference Include="Microsoft.Extensions.Logging.Abstractions" Version="6.0.0" Condition="'$(TargetFramework)' == 'net6.0'" />
<PackageReference Include="Microsoft.Extensions.Logging.Abstractions" Version="8.0.2" Condition="'$(TargetFramework)' == 'net8.0'" />
<PackageReference Include="Microsoft.Extensions.Logging.Abstractions" Version="9.0.0-rc*" Condition="'$(TargetFramework)' == 'net9.0'" />
<PackageReference Include="Microsoft.Extensions.Logging.Abstractions" Version="9.0.0" Condition="'$(TargetFramework)' == 'net9.0'" />

<PackageReference Include="Newtonsoft.Json" Version="13.0.3" />
</ItemGroup>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
<PackageReference Include="System.Text.Json" Version="6.0.10" Condition="'$(TargetFramework)' == 'netstandard2.0'" />
<PackageReference Include="System.Text.Json" Version="6.0.10" Condition="'$(TargetFramework)' == 'net6.0'" />
<PackageReference Include="System.Text.Json" Version="8.0.5" Condition="'$(TargetFramework)' == 'net8.0'" />
<PackageReference Include="System.Text.Json" Version="9.0.0-rc*" Condition="'$(TargetFramework)' == 'net9.0'" />
<PackageReference Include="System.Text.Json" Version="9.0.0" Condition="'$(TargetFramework)' == 'net9.0'" />
</ItemGroup>

<ItemGroup>
Expand Down
35 changes: 32 additions & 3 deletions src/SlimMessageBus.Host.Sql.Common/SqlHelper.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
using Microsoft.Data.SqlClient;
using Microsoft.Extensions.Logging;

public static class SqlHelper
public static partial class SqlHelper
{
private static readonly HashSet<int> TransientErrorNumbers =
[
Expand All @@ -20,7 +20,7 @@ public static async Task<TResult> RetryIfError<TResult>(ILogger logger, SqlRetry
{
if (tries > 1)
{
logger.LogInformation("SQL error encountered. Will begin attempt number {SqlRetryNumber} of {SqlRetryCount} max...", tries, retrySettings.RetryCount);
LogSqlError(logger, retrySettings.RetryCount, tries);
await Task.Delay(nextRetryInterval, token);
nextRetryInterval = nextRetryInterval.Multiply(retrySettings.RetryIntervalFactor);
}
Expand All @@ -36,7 +36,7 @@ public static async Task<TResult> RetryIfError<TResult>(ILogger logger, SqlRetry
}
// transient SQL error - continue trying
lastTransientException = sqlEx;
logger.LogDebug(sqlEx, "SQL error occurred {SqlErrorCode}. Will retry operation", sqlEx.Number);
LogWillRetry(logger, sqlEx.Number, sqlEx);
}
}
throw lastTransientException;
Expand All @@ -47,4 +47,33 @@ public static Task<TResult> RetryIfTransientError<TResult>(ILogger logger, SqlRe

public static Task RetryIfTransientError(ILogger logger, SqlRetrySettings retrySettings, Func<Task> operation, CancellationToken token) =>
RetryIfTransientError<object>(logger, retrySettings, async () => { await operation(); return null; }, token);

#region Logging

[LoggerMessage(
EventId = 0,
Level = LogLevel.Information,
Message = "SQL error encountered. Will begin attempt number {SqlRetryNumber} of {SqlRetryCount} max...")]
private static partial void LogSqlError(ILogger logger, int sqlRetryCount, int sqlRetryNumber);

[LoggerMessage(
EventId = 1,
Level = LogLevel.Debug,
Message = "SQL error occurred {SqlErrorCode}. Will retry operation")]
private static partial void LogWillRetry(ILogger logger, int sqlErrorCode, SqlException e);

#endregion
}

#if NETSTANDARD2_0

partial class SqlHelper
{
private static partial void LogSqlError(ILogger logger, int sqlRetryCount, int sqlRetryNumber)
=> logger.LogInformation("SQL error encountered. Will begin attempt number {SqlRetryNumber} of {SqlRetryCount} max...", sqlRetryNumber, sqlRetryCount);

private static partial void LogWillRetry(ILogger logger, int sqlErrorCode, SqlException e)
=> logger.LogDebug(e, "SQL error occurred {SqlErrorCode}. Will retry operation", sqlErrorCode);
}

#endif
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ public MessageHandler(
catch (Exception ex)
{
attempts++;
var handleErrorResult = await DoHandleError(message, messageType, messageScope, consumerContext, ex, attempts, cancellationToken).ConfigureAwait(false);
var handleErrorResult = await DoHandleError(message, messageType, messageScope, consumerContext, ex, attempts).ConfigureAwait(false);
if (handleErrorResult is ProcessResult.RetryState)
{
continue;
Expand Down Expand Up @@ -144,7 +144,7 @@ private async Task<object> DoHandleInternal(object message, IMessageTypeConsumer
return await ExecuteConsumer(message, consumerContext, consumerInvoker, responseType).ConfigureAwait(false);
}

private async Task<ProcessResult> DoHandleError(object message, Type messageType, IMessageScope messageScope, IConsumerContext consumerContext, Exception ex, int attempts, CancellationToken cancellationToken)
private async Task<ProcessResult> DoHandleError(object message, Type messageType, IMessageScope messageScope, IConsumerContext consumerContext, Exception ex, int attempts)
{
var errorHandlerResult = ProcessResult.Failure;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,6 @@ private Exception OnResponseArrived(TTransportMessage transportMessage, string p
if (requestState == null)
{
LogResponseWillBeDiscarded(path, requestId);
// ToDo: add and API hook to these kind of situation
return null;
}

Expand Down
1 change: 1 addition & 0 deletions src/SlimMessageBus.Host/MessageBusBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -760,6 +760,7 @@ public virtual IMessageScope CreateMessageScope(ConsumerSettings consumerSetting
}

#if NETSTANDARD2_0

public abstract partial class MessageBusBase
{
private partial void LogCouldNotStartConsumers(Exception ex)
Expand Down
Loading

0 comments on commit a7d529a

Please sign in to comment.