diff --git a/Samples/Client/Client_Subscribe_Samples.cs b/Samples/Client/Client_Subscribe_Samples.cs index a1bcd680e..36e6b61f2 100644 --- a/Samples/Client/Client_Subscribe_Samples.cs +++ b/Samples/Client/Client_Subscribe_Samples.cs @@ -5,8 +5,8 @@ // ReSharper disable UnusedType.Global // ReSharper disable UnusedMember.Global // ReSharper disable InconsistentNaming +// ReSharper disable UnusedMember.Local -using MQTTnet; using MQTTnet.Client; using MQTTnet.Packets; using MQTTnet.Protocol; @@ -177,4 +177,60 @@ public static async Task Subscribe_Topic() response.DumpToConsole(); } } -} + + static void ConcurrentProcessingDisableAutoAcknowledge(CancellationToken shutdownToken, IMqttClient mqttClient) + { + /* + * This sample shows how to achieve concurrent processing and not have message AutoAcknowledged + * This to have a proper QoS1 (at-least-once) experience for what at least MQTT specification can provide + */ + mqttClient.ApplicationMessageReceivedAsync += async ea => + { + ea.AutoAcknowledge = false; + + async Task ProcessAsync() + { + // DO YOUR WORK HERE! + await Task.Delay(1000, shutdownToken); + await ea.AcknowledgeAsync(shutdownToken); + // WARNING: If process failures are not transient the message will be retried on every restart of the client + // A failed message will not be dispatched again to the client as MQTT does not have a NACK packet to let + // the broker know processing failed + // + // Optionally: Use a framework like Polly to create a retry policy: https://github.com/App-vNext/Polly#retry + } + + _ = Task.Run(ProcessAsync, shutdownToken); + }; + } + + static void ConcurrentProcessingWithLimit(CancellationToken shutdownToken, IMqttClient mqttClient) + { + /* + * This sample shows how to achieve concurrent processing, with: + * - a maximum concurrency limit based on Environment.ProcessorCount + */ + + var concurrent = new SemaphoreSlim(Environment.ProcessorCount); + + mqttClient.ApplicationMessageReceivedAsync += async ea => + { + await concurrent.WaitAsync(shutdownToken).ConfigureAwait(false); + + async Task ProcessAsync() + { + try + { + // DO YOUR WORK HERE! + await Task.Delay(1000, shutdownToken); + } + finally + { + concurrent.Release(); + } + } + + _ = Task.Run(ProcessAsync, shutdownToken); + }; + } +} \ No newline at end of file