Skip to content

Commit

Permalink
Samples that show concurrent processing (#1803)
Browse files Browse the repository at this point in the history
Samples that show concurrent processing with a limit and concurrent processing without auto ACK
  • Loading branch information
ramonsmits authored Sep 1, 2023
1 parent e0660ba commit 0d3dd96
Showing 1 changed file with 58 additions and 2 deletions.
60 changes: 58 additions & 2 deletions Samples/Client/Client_Subscribe_Samples.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@
// ReSharper disable UnusedType.Global
// ReSharper disable UnusedMember.Global
// ReSharper disable InconsistentNaming
// ReSharper disable UnusedMember.Local

using MQTTnet;
using MQTTnet.Client;
using MQTTnet.Packets;
using MQTTnet.Protocol;
Expand Down Expand Up @@ -177,4 +177,60 @@ public static async Task Subscribe_Topic()
response.DumpToConsole();
}
}
}

static void ConcurrentProcessingDisableAutoAcknowledge(CancellationToken shutdownToken, IMqttClient mqttClient)
{
/*
* This sample shows how to achieve concurrent processing and not have message AutoAcknowledged
* This to have a proper QoS1 (at-least-once) experience for what at least MQTT specification can provide
*/
mqttClient.ApplicationMessageReceivedAsync += async ea =>
{
ea.AutoAcknowledge = false;

async Task ProcessAsync()
{
// DO YOUR WORK HERE!
await Task.Delay(1000, shutdownToken);
await ea.AcknowledgeAsync(shutdownToken);
// WARNING: If process failures are not transient the message will be retried on every restart of the client
// A failed message will not be dispatched again to the client as MQTT does not have a NACK packet to let
// the broker know processing failed
//
// Optionally: Use a framework like Polly to create a retry policy: https://github.com/App-vNext/Polly#retry
}

_ = Task.Run(ProcessAsync, shutdownToken);
};
}

static void ConcurrentProcessingWithLimit(CancellationToken shutdownToken, IMqttClient mqttClient)
{
/*
* This sample shows how to achieve concurrent processing, with:
* - a maximum concurrency limit based on Environment.ProcessorCount
*/

var concurrent = new SemaphoreSlim(Environment.ProcessorCount);

mqttClient.ApplicationMessageReceivedAsync += async ea =>
{
await concurrent.WaitAsync(shutdownToken).ConfigureAwait(false);

async Task ProcessAsync()
{
try
{
// DO YOUR WORK HERE!
await Task.Delay(1000, shutdownToken);
}
finally
{
concurrent.Release();
}
}

_ = Task.Run(ProcessAsync, shutdownToken);
};
}
}

0 comments on commit 0d3dd96

Please sign in to comment.