Skip to content

Commit

Permalink
[Host.Serialization] Introduce multi-targeting for net8, net6 and net…
Browse files Browse the repository at this point in the history
…standard #211

Signed-off-by: Tomasz Maruszak <[email protected]>
  • Loading branch information
zarusz committed Feb 4, 2024
1 parent c281980 commit f1b6bbc
Show file tree
Hide file tree
Showing 19 changed files with 146 additions and 52 deletions.
2 changes: 1 addition & 1 deletion src/Host.Plugin.Properties.xml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
<Import Project="Common.NuGet.Properties.xml" />

<PropertyGroup>
<TargetFrameworks>netstandard2.1;net6.0;net8.0</TargetFrameworks>
<TargetFrameworks>netstandard2.0;net6.0;net8.0</TargetFrameworks>
<Version>2.2.2</Version>
</PropertyGroup>

Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
namespace SlimMessageBus.Host.AzureEventHub;

using System.Diagnostics.CodeAnalysis;

using Azure.Messaging.EventHubs;
using Azure.Messaging.EventHubs.Processor;

Expand Down Expand Up @@ -85,7 +83,7 @@ private async Task Checkpoint(ProcessEventArgs args)
public Task TryCheckpoint()
=> Checkpoint(_lastMessage);

protected static IReadOnlyDictionary<string, object> GetHeadersFromTransportMessage([NotNull] EventData e)
protected static IReadOnlyDictionary<string, object> GetHeadersFromTransportMessage(EventData e)
// Note: Try to see if the Properties are already IReadOnlyDictionary or Dictionary prior allocating a new collection
=> e.Properties as IReadOnlyDictionary<string, object> ?? new Dictionary<string, object>(e.Properties);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
<Import Project="../Common.NuGet.Properties.xml" />

<PropertyGroup>
<TargetFramework>netstandard2.1</TargetFramework>
<TargetFramework>netstandard2.0</TargetFramework>
<Description>Core configuration interfaces of SlimMessageBus</Description>
<PackageTags>SlimMessageBus</PackageTags>
<RootNamespace>SlimMessageBus.Host</RootNamespace>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
<Import Project="../Common.NuGet.Properties.xml" />

<PropertyGroup>
<TargetFramework>netstandard2.1</TargetFramework>
<TargetFramework>netstandard2.0</TargetFramework>
<Version>2.0.2</Version>
<Description>Core interceptor interfaces of SlimMessageBus</Description>
<PackageTags>SlimMessageBus</PackageTags>
Expand Down
5 changes: 1 addition & 4 deletions src/SlimMessageBus.Host.Kafka/Consumer/KafkaExtensions.cs
Original file line number Diff line number Diff line change
@@ -1,10 +1,7 @@
namespace SlimMessageBus.Host.Kafka;

using System.Diagnostics.CodeAnalysis;

public static class KafkaExtensions
{
public static TopicPartitionOffset AddOffset([NotNull] this TopicPartitionOffset topicPartitionOffset, int addOffset)
public static TopicPartitionOffset AddOffset(this TopicPartitionOffset topicPartitionOffset, int addOffset)
=> new(topicPartitionOffset.TopicPartition, topicPartitionOffset.Offset + addOffset);

public static IReadOnlyDictionary<string, object> ToHeaders(this ConsumeResult<Ignore, byte[]> consumeResult, IMessageSerializer headerSerializer)
Expand Down
12 changes: 5 additions & 7 deletions src/SlimMessageBus.Host.Kafka/Consumer/KafkaGroupConsumer.cs
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
namespace SlimMessageBus.Host.Kafka;

using System.Diagnostics.CodeAnalysis;

using ConsumeResult = ConsumeResult<Ignore, byte[]>;
using IConsumer = IConsumer<Ignore, byte[]>;

Expand Down Expand Up @@ -175,7 +173,7 @@ protected override async Task OnStop()
}
}

protected virtual void OnPartitionAssigned([NotNull] ICollection<TopicPartition> partitions)
protected virtual void OnPartitionAssigned(ICollection<TopicPartition> partitions)
{
// Ensure processors exist for each assigned topic-partition
foreach (var partition in partitions)
Expand All @@ -187,7 +185,7 @@ protected virtual void OnPartitionAssigned([NotNull] ICollection<TopicPartition>
}
}

protected virtual void OnPartitionRevoked([NotNull] ICollection<TopicPartitionOffset> partitions)
protected virtual void OnPartitionRevoked(ICollection<TopicPartitionOffset> partitions)
{
foreach (var partition in partitions)
{
Expand All @@ -198,23 +196,23 @@ protected virtual void OnPartitionRevoked([NotNull] ICollection<TopicPartitionOf
}
}

protected virtual void OnPartitionEndReached([NotNull] TopicPartitionOffset offset)
protected virtual void OnPartitionEndReached(TopicPartitionOffset offset)
{
Logger.LogDebug("Group [{Group}]: Reached end of partition, Topic: {Topic}, Partition: {Partition}, Offset: {Offset}", Group, offset.Topic, offset.Partition, offset.Offset);

var processor = _processors[offset.TopicPartition];
processor.OnPartitionEndReached(offset);
}

protected async virtual ValueTask OnMessage([NotNull] ConsumeResult message)
protected async virtual ValueTask OnMessage(ConsumeResult message)
{
Logger.LogDebug("Group [{Group}]: Received message with Topic: {Topic}, Partition: {Partition}, Offset: {Offset}, payload size: {MessageSize}", Group, message.Topic, message.Partition, message.Offset, message.Message.Value?.Length ?? 0);

var processor = _processors[message.TopicPartition];
await processor.OnMessage(message).ConfigureAwait(false);
}

protected virtual void OnOffsetsCommitted([NotNull] CommittedOffsets e)
protected virtual void OnOffsetsCommitted(CommittedOffsets e)
{
if (e.Error.IsError || e.Error.IsFatal)
{
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
namespace SlimMessageBus.Host.Kafka;

using System.Diagnostics.CodeAnalysis;

using ConsumeResult = ConsumeResult<Ignore, byte[]>;

public abstract class KafkaPartitionConsumer : IKafkaPartitionConsumer
Expand Down Expand Up @@ -30,7 +28,7 @@ protected KafkaPartitionConsumer(ILoggerFactory loggerFactory, AbstractConsumerS
_logger = loggerFactory.CreateLogger<KafkaPartitionConsumer>();

_logger.LogInformation("Creating consumer for Group: {Group}, Topic: {Topic}, Partition: {Partition}", group, topicPartition.Topic, topicPartition.Partition);

ConsumerSettings = consumerSettings ?? throw new ArgumentNullException(nameof(consumerSettings));
Group = group;
TopicPartition = topicPartition;
Expand Down Expand Up @@ -76,7 +74,7 @@ public void Dispose()

#region Implementation of IKafkaTopicPartitionProcessor

public void OnPartitionAssigned([NotNull] TopicPartition partition)
public void OnPartitionAssigned(TopicPartition partition)
{
_lastCheckpointOffset = null;
_lastOffset = null;
Expand All @@ -91,7 +89,7 @@ public void OnPartitionAssigned([NotNull] TopicPartition partition)
}
}

public async Task OnMessage([NotNull] ConsumeResult message)
public async Task OnMessage(ConsumeResult message)
{
if (_cancellationTokenSource.IsCancellationRequested)
{
Expand Down
6 changes: 2 additions & 4 deletions src/SlimMessageBus.Host.Kafka/KafkaMessageBus.cs
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
namespace SlimMessageBus.Host.Kafka;

using System.Diagnostics.CodeAnalysis;

using IProducer = Confluent.Kafka.IProducer<byte[], byte[]>;
using Message = Confluent.Kafka.Message<byte[], byte[]>;

Expand Down Expand Up @@ -164,7 +162,7 @@ protected override async Task ProduceToTransport(object message, string path, by
message, messageType?.Name, deliveryResult.Topic, deliveryResult.Partition, deliveryResult.Offset);
}

protected byte[] GetMessageKey(ProducerSettings producerSettings, [NotNull] Type messageType, object message, string topic)
protected byte[] GetMessageKey(ProducerSettings producerSettings, Type messageType, object message, string topic)
{
var keyProvider = producerSettings?.GetKeyProvider();
if (keyProvider != null)
Expand All @@ -183,7 +181,7 @@ protected byte[] GetMessageKey(ProducerSettings producerSettings, [NotNull] Type

private const int NoPartition = -1;

protected int GetMessagePartition(ProducerSettings producerSettings, [NotNull] Type messageType, object message, string topic)
protected int GetMessagePartition(ProducerSettings producerSettings, Type messageType, object message, string topic)
{
var partitionProvider = producerSettings.GetPartitionProvider();
if (partitionProvider != null)
Expand Down
2 changes: 1 addition & 1 deletion src/SlimMessageBus.Host.Mqtt/MqttMessageBus.cs
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ protected override async Task ProduceToTransport(object message, string path, by
{
var m = new MqttApplicationMessage
{
PayloadSegment = messagePayload,
PayloadSegment = new ArraySegment<byte>(messagePayload),
Topic = path
};

Expand Down
16 changes: 15 additions & 1 deletion src/SlimMessageBus.Host.Outbox.Sql/SqlOutboxRepository.cs
Original file line number Diff line number Diff line change
Expand Up @@ -54,24 +54,38 @@ protected virtual SqlCommand CreateCommand()
public async virtual ValueTask BeginTransaction()
{
ValidateNoTransactionStarted();
#if NETSTANDARD2_0
_transaction = Connection.BeginTransaction(Settings.TransactionIsolationLevel);
#else
_transaction = (SqlTransaction)await Connection.BeginTransactionAsync(Settings.TransactionIsolationLevel);
#endif
}

public async virtual ValueTask CommitTransaction()
{
ValidateTransactionStarted();

#if NETSTANDARD2_0
_transaction.Commit();
_transaction.Dispose();
#else
await _transaction.CommitAsync();
await _transaction.DisposeAsync();
#endif
_transaction = null;
}

public async virtual ValueTask RollbackTransaction()
{
ValidateTransactionStarted();

#if NETSTANDARD2_0
_transaction.Rollback();
_transaction.Dispose();
#else
await _transaction.RollbackAsync();
await _transaction.DisposeAsync();
#endif
_transaction = null;
}

Expand Down Expand Up @@ -186,7 +200,7 @@ await ExecuteNonQuery(Settings.SchemaCreationRetry,
BEGIN
CREATE NONCLUSTERED INDEX [{indexName}] ON {_sqlTemplate.TableNameQualified}
(
{string.Join(',', columns.Select(c => $"{c} ASC"))}
{string.Join(",", columns.Select(c => $"{c} ASC"))}
)
END", token: token);
}
Expand Down
4 changes: 1 addition & 3 deletions src/SlimMessageBus.Host.RabbitMQ/RabbitMqTopologyService.cs
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
namespace SlimMessageBus.Host.RabbitMQ;

using System.Runtime;

using global::RabbitMQ.Client;

public class RabbitMqTopologyService
Expand Down Expand Up @@ -117,7 +115,7 @@ private string DeclareQueue(HasProviderExtensions settings, string queueName, Ac
_logger.LogInformation("Declaring queue {QueueName}, Durable: {Durable}, AutoDelete: {AutoDelete}, Exclusive: {Exclusive}", queueName, queueDurable, queueAutoDelete, queueExclusive);
try
{
var arguments = new Dictionary<string, object>(queueArguments ?? Enumerable.Empty<KeyValuePair<string, object>>());
var arguments = new Dictionary<string, object>(queueArguments) ?? [];
argumentModifier?.Invoke(arguments);

_channel.QueueDeclare(queueName, durable: queueDurable, exclusive: queueExclusive, autoDelete: queueAutoDelete, arguments: arguments);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
<Import Project="../Common.NuGet.Properties.xml" />

<PropertyGroup>
<TargetFramework>netstandard2.1</TargetFramework>
<TargetFramework>netstandard2.0</TargetFramework>
<Version>2.0.2</Version>
<Description>Core serialization interfaces of SlimMessageBus</Description>
<PackageTags>SlimMessageBus</PackageTags>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,8 @@ public CheckpointTrigger(HasProviderExtensions settings, ILoggerFactory loggerFa
public static bool IsConfigured(HasProviderExtensions settings)
=> settings.GetOrDefault<int?>(CheckpointSettings.CheckpointCount, null) != null || settings.GetOrDefault<TimeSpan?>(CheckpointSettings.CheckpointDuration, null) != null;

public static (int CheckpontCount, TimeSpan CheckpointDuration) GetConfiguration(HasProviderExtensions settings)
=> (settings.GetOrDefault(CheckpointSettings.CheckpointCount, CheckpointSettings.CheckpointCountDefault),
public static CheckpointValue GetConfiguration(HasProviderExtensions settings)
=> new(settings.GetOrDefault(CheckpointSettings.CheckpointCount, CheckpointSettings.CheckpointCountDefault),
settings.GetOrDefault(CheckpointSettings.CheckpointDuration, CheckpointSettings.CheckpointDurationDefault));


Expand Down Expand Up @@ -68,4 +68,4 @@ public void Reset()
}

#endregion
}
}
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
namespace SlimMessageBus.Host;
namespace SlimMessageBus.Host;

public class CheckpointTriggerFactory : ICheckpointTriggerFactory
{
private readonly ILoggerFactory _loggerFactory;
private readonly Func<IReadOnlyCollection<(int, TimeSpan)>, string> _exceptionMessageFactory;
private readonly Func<IReadOnlyCollection<CheckpointValue>, string> _exceptionMessageFactory;

public CheckpointTriggerFactory(ILoggerFactory loggerFactory, Func<IReadOnlyCollection<(int, TimeSpan)>, string> exceptionMessageFactory)
public CheckpointTriggerFactory(ILoggerFactory loggerFactory, Func<IReadOnlyCollection<CheckpointValue>, string> exceptionMessageFactory)
{
_loggerFactory = loggerFactory;
_exceptionMessageFactory = exceptionMessageFactory;
Expand All @@ -21,7 +21,7 @@ public ICheckpointTrigger Create(IEnumerable<AbstractConsumerSettings> consumerS
if (consumerSettingsWithConfiguredCheckpoints.Count > 1)
{
// Check if checkpoint settings across all the configured consumers is all the same.
var configuredCheckpoints = consumerSettingsWithConfiguredCheckpoints.Select(x => CheckpointTrigger.GetConfiguration(x)).ToHashSet();
var configuredCheckpoints = consumerSettingsWithConfiguredCheckpoints.Select(CheckpointTrigger.GetConfiguration).ToHashSet();
if (configuredCheckpoints.Count > 1)
{
var msg = _exceptionMessageFactory(configuredCheckpoints);
Expand Down
17 changes: 17 additions & 0 deletions src/SlimMessageBus.Host/Consumer/Checkpointing/CheckpointValue.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
namespace SlimMessageBus.Host;

public class CheckpointValue
{
public CheckpointValue()
{
}

public CheckpointValue(int checkpointCount, TimeSpan checkpointDuration)
{
CheckpointCount = checkpointCount;
CheckpointDuration = checkpointDuration;
}

public int CheckpointCount { get; }
public TimeSpan CheckpointDuration { get; }
};
52 changes: 52 additions & 0 deletions src/SlimMessageBus.Host/Helpers/CompatMethods.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
namespace SlimMessageBus.Host;

Check warning on line 1 in src/SlimMessageBus.Host/Helpers/CompatMethods.cs

View workflow job for this annotation

GitHub Actions / build

Remove this empty namespace. (https://rules.sonarsource.com/csharp/RSPEC-3261)

#if NETSTANDARD2_0

/// <summary>
/// Helper for netstandard2.0
/// </summary>
public static class DictionaryExtensions
{
public static void Deconstruct<TKey, TValue>(this KeyValuePair<TKey, TValue> keyValuePair, out TKey key, out TValue value)
{
key = keyValuePair.Key;
value = keyValuePair.Value;
}

public static bool TryAdd<K, V>(this IDictionary<K, V> dict, K key, V value)
{
if (!dict.ContainsKey(key))
{
dict.Add(key, value);
return true;
}
return false;
}
}

static internal class HashCode
{
public static int Combine(object value1, object value2)
{
unchecked // Overflow is fine, just wrap
{
var hash = 17;
hash = hash * 23 + value1.GetHashCode();
hash = hash * 23 + value2.GetHashCode();
return hash;
}
}
}

public static class TimeSpanExtensions
{
public static TimeSpan Multiply(this TimeSpan timeSpan, double factor)
=> TimeSpan.FromMilliseconds(timeSpan.TotalMilliseconds * factor);
}

public static class CollectionExtensions
{
public static HashSet<T> ToHashSet<T>(this IEnumerable<T> items) => new HashSet<T>(items);
}

#endif
10 changes: 6 additions & 4 deletions src/SlimMessageBus.Host/MessageWithHeaders.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
namespace SlimMessageBus.Host;

public struct MessageWithHeaders : IEquatable<MessageWithHeaders>
public readonly struct MessageWithHeaders : IEquatable<MessageWithHeaders>
{
// ToDo: Change to ReadOnlyMemory<byte>
public byte[] Payload { get; }
Expand All @@ -23,19 +23,21 @@ public MessageWithHeaders(byte[] payload, Dictionary<string, object> headers)
Payload = payload;
Headers = headers;
}
public override int GetHashCode()
{
return HashCode.Combine(Payload, Headers);
}

public override bool Equals(object obj) =>
obj is MessageWithHeaders headers &&
EqualityComparer<byte[]>.Default.Equals(Payload, headers.Payload) &&
EqualityComparer<IReadOnlyDictionary<string, object>>.Default.Equals(Headers, headers.Headers);

public override int GetHashCode() => HashCode.Combine(Payload, Headers);

public static bool operator ==(MessageWithHeaders left, MessageWithHeaders right) => left.Equals(right);

public static bool operator !=(MessageWithHeaders left, MessageWithHeaders right) => !(left == right);

public bool Equals(MessageWithHeaders other) =>
EqualityComparer<byte[]>.Default.Equals(Payload, other.Payload) &&
EqualityComparer<IReadOnlyDictionary<string, object>>.Default.Equals(Headers, other.Headers);
}
}
Loading

0 comments on commit f1b6bbc

Please sign in to comment.