Skip to content

Commit

Permalink
Retain the copy message dotnet#2113
Browse files Browse the repository at this point in the history
  • Loading branch information
xljiulang committed Nov 22, 2024
1 parent 63d8518 commit 6e49966
Show file tree
Hide file tree
Showing 2 changed files with 55 additions and 28 deletions.
78 changes: 50 additions & 28 deletions Source/MQTTnet.Server/Internal/MqttClientSessionsManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
// The .NET Foundation licenses this file to you under the MIT license.
// See the LICENSE file in the project root for more information.

using System.Buffers;
using System.Collections;
using System.Collections.Concurrent;
using MQTTnet.Adapter;
Expand Down Expand Up @@ -159,9 +160,10 @@ public async Task<DispatchApplicationMessageResult> DispatchApplicationMessage(
var matchingSubscribersCount = 0;
try
{
var clonedMessage = CloneApplicationMessage(applicationMessage);
if (applicationMessage.Retain)
{
await _retainedMessagesManager.UpdateMessage(senderId, applicationMessage).ConfigureAwait(false);
await _retainedMessagesManager.UpdateMessage(senderId, clonedMessage.Value).ConfigureAwait(false);
}

List<MqttSession> subscriberSessions;
Expand Down Expand Up @@ -204,7 +206,7 @@ public async Task<DispatchApplicationMessageResult> DispatchApplicationMessage(
}
}

var publishPacketCopy = MqttPublishPacketFactory.Create(applicationMessage);
var publishPacketCopy = MqttPublishPacketFactory.Create(clonedMessage.Value);
publishPacketCopy.QualityOfServiceLevel = checkSubscriptionsResult.QualityOfServiceLevel;
publishPacketCopy.SubscriptionIdentifiers = checkSubscriptionsResult.SubscriptionIdentifiers;

Expand Down Expand Up @@ -256,6 +258,26 @@ public async Task<DispatchApplicationMessageResult> DispatchApplicationMessage(
return new DispatchApplicationMessageResult(reasonCode, closeConnection, reasonString, userProperties);
}

static Lazy<MqttApplicationMessage> CloneApplicationMessage(MqttApplicationMessage m)
{
return new Lazy<MqttApplicationMessage>(() => new MqttApplicationMessage
{
ContentType = m.ContentType,
CorrelationData = m.CorrelationData?.ToArray(),
Dup = m.Dup,
MessageExpiryInterval = m.MessageExpiryInterval,
Payload = m.Payload.IsEmpty ? default : new ReadOnlySequence<byte>(m.Payload.ToArray()),
PayloadFormatIndicator = m.PayloadFormatIndicator,
QualityOfServiceLevel = m.QualityOfServiceLevel,
Retain = m.Retain,
ResponseTopic = m.ResponseTopic,
Topic = m.Topic,
UserProperties = m.UserProperties?.Select(u => u.Clone()).ToList(),
SubscriptionIdentifiers = m.SubscriptionIdentifiers?.ToList(),
TopicAlias = m.TopicAlias
});
}

public void Dispose()
{
_createConnectionSyncRoot.Dispose();
Expand Down Expand Up @@ -671,39 +693,39 @@ static bool ShouldPersistSession(MqttConnectedClient connectedClient)
switch (connectedClient.ChannelAdapter.PacketFormatterAdapter.ProtocolVersion)
{
case MqttProtocolVersion.V500:
{
// MQTT 5.0 section 3.1.2.11.2
// The Client and Server MUST store the Session State after the Network Connection is closed if the Session Expiry Interval is greater than 0 [MQTT-3.1.2-23].
//
// A Client that only wants to process messages while connected will set the Clean Start to 1 and set the Session Expiry Interval to 0.
// It will not receive Application Messages published before it connected and has to subscribe afresh to any topics that it is interested
// in each time it connects.

var effectiveSessionExpiryInterval = connectedClient.DisconnectPacket?.SessionExpiryInterval ?? 0U;
if (effectiveSessionExpiryInterval == 0U)
{
// From RFC: If the Session Expiry Interval is absent, the Session Expiry Interval in the CONNECT packet is used.
effectiveSessionExpiryInterval = connectedClient.ConnectPacket.SessionExpiryInterval;
}
// MQTT 5.0 section 3.1.2.11.2
// The Client and Server MUST store the Session State after the Network Connection is closed if the Session Expiry Interval is greater than 0 [MQTT-3.1.2-23].
//
// A Client that only wants to process messages while connected will set the Clean Start to 1 and set the Session Expiry Interval to 0.
// It will not receive Application Messages published before it connected and has to subscribe afresh to any topics that it is interested
// in each time it connects.

var effectiveSessionExpiryInterval = connectedClient.DisconnectPacket?.SessionExpiryInterval ?? 0U;
if (effectiveSessionExpiryInterval == 0U)
{
// From RFC: If the Session Expiry Interval is absent, the Session Expiry Interval in the CONNECT packet is used.
effectiveSessionExpiryInterval = connectedClient.ConnectPacket.SessionExpiryInterval;
}

return effectiveSessionExpiryInterval != 0U;
}
return effectiveSessionExpiryInterval != 0U;
}

case MqttProtocolVersion.V311:
{
// MQTT 3.1.1 section 3.1.2.4: persist only if 'not CleanSession'
//
// If CleanSession is set to 1, the Client and Server MUST discard any previous Session and start a new one.
// This Session lasts as long as the Network Connection. State data associated with this Session MUST NOT be
// reused in any subsequent Session [MQTT-3.1.2-6].
{
// MQTT 3.1.1 section 3.1.2.4: persist only if 'not CleanSession'
//
// If CleanSession is set to 1, the Client and Server MUST discard any previous Session and start a new one.
// This Session lasts as long as the Network Connection. State data associated with this Session MUST NOT be
// reused in any subsequent Session [MQTT-3.1.2-6].

return !connectedClient.ConnectPacket.CleanSession;
}
return !connectedClient.ConnectPacket.CleanSession;
}

case MqttProtocolVersion.V310:
{
return true;
}
{
return true;
}

default:
throw new NotSupportedException();
Expand Down
5 changes: 5 additions & 0 deletions Source/MQTTnet/Packets/MqttUserProperty.cs
Original file line number Diff line number Diff line change
Expand Up @@ -47,4 +47,9 @@ public override string ToString()
{
return $"{Name} = {Value}";
}

public MqttUserProperty Clone()
{
return new MqttUserProperty(Name, Value);
}
}

0 comments on commit 6e49966

Please sign in to comment.