Skip to content

Commit

Permalink
MQTT Plugin (#678)
Browse files Browse the repository at this point in the history
Co-authored-by: Andrey <[email protected]>
  • Loading branch information
AntyaDev and mangystx authored Apr 10, 2024
1 parent 73d4868 commit e346c41
Show file tree
Hide file tree
Showing 7 changed files with 50 additions and 168 deletions.
8 changes: 5 additions & 3 deletions examples/Demo/Demo.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -49,12 +49,14 @@
<PackageReference Include="Dapper.Contrib" Version="2.0.78" />
<PackageReference Include="LiteDB" Version="5.0.15" />
<PackageReference Include="MathNet.Numerics" Version="5.0.0" />
<PackageReference Include="NBomber" Version="5.6.0-beta.13" />

<PackageReference Include="NBomber" Version="5.6.0" />
<PackageReference Include="NBomber.Data" Version="5.0.0" />
<PackageReference Include="NBomber.Http" Version="5.1.0-beta.4" />
<PackageReference Include="NBomber.Http" Version="5.1.0" />
<PackageReference Include="NBomber.MQTT" Version="0.1.0" />
<PackageReference Include="NBomber.WebSockets" Version="0.1.0" />
<PackageReference Include="NBomber.Sinks.InfluxDB" Version="5.0.2" />
<PackageReference Include="MQTTnet" Version="3.1.2" />

<PackageReference Include="Serilog.Sinks.Elasticsearch" Version="9.0.0" />
<PackageReference Include="StackExchange.Redis" Version="2.6.122" />
<PackageReference Include="System.Data.SQLite.Core" Version="1.0.117" />
Expand Down
52 changes: 21 additions & 31 deletions examples/Demo/MQTT/ClientPool/ClientPoolMqttExample.cs
Original file line number Diff line number Diff line change
@@ -1,12 +1,10 @@
using System.Collections.Concurrent;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.Configuration;
using MQTTnet;
using MQTTnet.Client;
using MQTTnet.Client.Connecting;
using MQTTnet.Client.Options;
using NBomber;
using NBomber.CSharp;
using NBomber.Data;
using MqttClient = NBomber.MQTT.MqttClient;

namespace Demo.MQTT.ClientPool;

Expand All @@ -21,30 +19,33 @@ public class ClientPoolMqttExample
{
public void Run()
{
var clientPool = new ClientPool<IMqttClient>();
var responsePromises = new ConcurrentDictionary<IMqttClient, TaskCompletionSource<MqttApplicationMessage>>();
var clientPool = new ClientPool<MqttClient>();
var message = Array.Empty<byte>();

var scenario = Scenario.Create("mqtt_scenario", async ctx =>
{
var client = clientPool.GetClient(ctx.ScenarioInfo);
var promise = responsePromises[client];

var publish = await Step.Run("publish", ctx, async () =>
{
await client.PublishAsync(client.Options.ClientId, message);
return Response.Ok(sizeBytes: message.Length);
var msg = new MqttApplicationMessageBuilder()
.WithTopic(client.Client.Options.ClientId)
.WithPayload(message)
.Build();

var response = await client.Publish(msg);
return response;
});

var receive = await Step.Run("receive", ctx, async () =>
{
var response = await promise.Task;
return Response.Ok(sizeBytes: response.Payload.Length);
var response = await client.Receive();
return response;
});

return Response.Ok();
})
.WithoutWarmUp()
.WithWarmUpDuration(TimeSpan.FromSeconds(3))
.WithLoadSimulations(Simulation.KeepConstant(copies: 1, during: TimeSpan.FromSeconds(30)))
.WithInit(async context =>
{
Expand All @@ -58,29 +59,18 @@ public void Run()
{
counter++;

var client = mqttFactory.CreateMqttClient();
var client = new MqttClient(mqttFactory.CreateMqttClient());
var clientOptions = new MqttClientOptionsBuilder()
.WithWebSocketServer(config.MqttServerUrl)
.WithWebSocketServer(optionsBuilder => optionsBuilder.WithUri(config.MqttServerUrl))
.WithCleanSession()
.WithClientId($"client_{i}")
.Build();

var result = await client.ConnectAsync(clientOptions);
var result = await client.Connect(clientOptions);

if (result.ResultCode == MqttClientConnectResultCode.Success)
if (!result.IsError)
{
// register client and push response promise
responsePromises[client] = new TaskCompletionSource<MqttApplicationMessage>();

client.UseApplicationMessageReceivedHandler(msg =>
{
var promise = responsePromises[client];
responsePromises[client] = new TaskCompletionSource<MqttApplicationMessage>(); // set new promise
promise.TrySetResult(msg.ApplicationMessage);
});

await client.SubscribeAsync(client.Options.ClientId);

await client.Subscribe(client.Client.Options.ClientId);
clientPool.AddClient(client);
}
else
Expand All @@ -89,13 +79,13 @@ public void Run()
if (counter == 10)
{
counter = 0;
await Task.Delay(500);
await Task.Delay(500); // pause, to do not overload MQTT broker
}
}
})
.WithClean(context =>
.WithClean(ctx =>
{
clientPool.DisposeClients(client => client.DisconnectAsync().Wait());
clientPool.DisposeClients(client => client.Disconnect().Wait());
return Task.CompletedTask;
});

Expand Down
6 changes: 4 additions & 2 deletions examples/Demo/MQTT/ClientPool/config.json
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,15 @@
{
"ScenarioName": "mqtt_scenario",

"WarmUpDuration": "00:00:03",

"LoadSimulationsSettings": [
{ "KeepConstant": [500, "00:00:20"] }
{ "KeepConstant": [100, "00:00:20"] }
],

"CustomSettings": {
"MqttServerUrl": "ws://localhost:8083/mqtt",
"ClientCount": 500,
"ClientCount": 100,
"MsgSizeBytes": 200
}
}
Expand Down
86 changes: 0 additions & 86 deletions examples/Demo/MQTT/ConstantRate/ConstantPublishRate.cs

This file was deleted.

9 changes: 0 additions & 9 deletions examples/Demo/MQTT/ConstantRate/docker-compose.yaml

This file was deleted.

54 changes: 19 additions & 35 deletions examples/Demo/MQTT/PingPongMqttTest.cs
Original file line number Diff line number Diff line change
@@ -1,9 +1,8 @@
using MQTTnet;
using MQTTnet.Client;
using MQTTnet.Client.Connecting;
using MQTTnet.Client.Options;
using NBomber.CSharp;
using NBomber.Data;
using MqttClient = NBomber.MQTT.MqttClient;

namespace Demo.MQTT;

Expand All @@ -15,60 +14,45 @@ public void Run()

var scenario = Scenario.Create("ping_pong_mqtt_scenario", async ctx =>
{
using var mqttClient = new MqttFactory().CreateMqttClient();
var topic = $"/clients/{ctx.ScenarioInfo.ThreadId}";
var promise = new TaskCompletionSource<MqttApplicationMessage>();
using var client = new MqttClient(new MqttFactory().CreateMqttClient());
var topic = $"/clients/{ctx.ScenarioInfo.InstanceId}";

var connect = await Step.Run("connect", ctx, async () =>
{
var clientOptions = new MqttClientOptionsBuilder()
.WithWebSocketServer("ws://localhost:8083/mqtt")
.WithWebSocketServer(options => options.WithUri("ws://localhost:8083/mqtt"))
.WithCleanSession()
.WithClientId($"client_{ctx.ScenarioInfo.ThreadId}")
.WithClientId($"client_{ctx.ScenarioInfo.InstanceId}")
.Build();

var result = await mqttClient.ConnectAsync(clientOptions);
return result.ResultCode == MqttClientConnectResultCode.Success
? Response.Ok()
: Response.Fail(
statusCode: MqttClientConnectResultCode.Success.ToString(),
message: $"MQTT connection code is: {result.ResultCode}, reason: {result.ReasonString}"
);
var response = await client.Connect(clientOptions);
return response;
});

var subscribe = await Step.Run("subscribe", ctx, async () =>
{
mqttClient.UseApplicationMessageReceivedHandler(msg =>
{
promise.TrySetResult(msg.ApplicationMessage);
});

await mqttClient.SubscribeAsync(topic);

return Response.Ok();
});
var subscribe = await Step.Run("subscribe", ctx, () => client.Subscribe(topic));

var publish = await Step.Run("publish", ctx, async () =>
{
await mqttClient.PublishAsync(topic, payload);
return Response.Ok(sizeBytes: payload.Length);
var msg = new MqttApplicationMessageBuilder()
.WithTopic(topic)
.WithPayload(payload)
.Build();

var response = await client.Publish(msg);
return response;
});

var receive = await Step.Run("receive", ctx, async () =>
{
var msg = await promise.Task;
return Response.Ok(sizeBytes: msg.Payload.Length);
var response = await client.Receive();
return response;
});

var disconnect = await Step.Run("disconnect", ctx, async () =>
{
await mqttClient.DisconnectAsync();
return Response.Ok();
});
var disconnect = await Step.Run("disconnect", ctx, () => client.Disconnect());

return Response.Ok();
})
.WithoutWarmUp()
.WithWarmUpDuration(TimeSpan.FromSeconds(3))
.WithLoadSimulations(
Simulation.KeepConstant(1, TimeSpan.FromSeconds(30))
);
Expand Down
3 changes: 1 addition & 2 deletions examples/Demo/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
using Demo.MQTT;
using Demo.HTTP.WebAppSimulator;
using Demo.HTTP.SimpleBookstore;
using Demo.MQTT.ConstantRate;
using Demo.MQTT.ClientPool;
using Demo.WebSockets;

// -------------------------------
Expand Down Expand Up @@ -92,7 +92,6 @@
// ----- MQTT -----
// ----------------
// new PingPongMqttTest().Run();
// new ConstantPublishToMqtt().Run();
// new ClientPoolMqttExample().Run();

// ----------------
Expand Down

0 comments on commit e346c41

Please sign in to comment.