From 885559f3b4f22b574f786a2cce3c34c44ee47092 Mon Sep 17 00:00:00 2001 From: Liam Davey Date: Mon, 13 Feb 2023 14:10:37 +1000 Subject: [PATCH 1/3] metrics-collector - split D2C messages before sending upstream if messages are greater than limit --- .../metrics-collector/src/Constants.cs | 3 +- .../src/IotHubUpload/IotHubUpload.cs | 73 +++++++++++++++---- 2 files changed, 59 insertions(+), 17 deletions(-) diff --git a/edge-modules/metrics-collector/src/Constants.cs b/edge-modules/metrics-collector/src/Constants.cs index 0a6543b10b0..364d83c2684 100755 --- a/edge-modules/metrics-collector/src/Constants.cs +++ b/edge-modules/metrics-collector/src/Constants.cs @@ -13,5 +13,6 @@ public static class Constants public const string DefaultLogAnalyticsWorkspaceDomainPrefixOds = ".ods.opinsights."; public const string DefaultLogAnalyticsWorkspaceDomainPrefixOms = ".oms.opinsights."; public const string ProductInfo = "IoTEdgeMetricsCollectorModule"; + public static readonly int MaxMessageSize = 255000; } -} +} \ No newline at end of file diff --git a/edge-modules/metrics-collector/src/IotHubUpload/IotHubUpload.cs b/edge-modules/metrics-collector/src/IotHubUpload/IotHubUpload.cs index 5d1c4e0c458..4c83edb52f3 100755 --- a/edge-modules/metrics-collector/src/IotHubUpload/IotHubUpload.cs +++ b/edge-modules/metrics-collector/src/IotHubUpload/IotHubUpload.cs @@ -31,25 +31,14 @@ public async Task PublishAsync(IEnumerable metrics, CancellationTo { Preconditions.CheckNotNull(metrics, nameof(metrics)); IEnumerable outputMetrics = metrics.Select(m => new ExportMetric(m)); + List> splitMetrics = new List> { outputMetrics }; + IEnumerable messagesToSend = BatchAndBuildMessages(splitMetrics); - string outputString = JsonConvert.SerializeObject(outputMetrics); - - if (Settings.Current.TransformForIoTCentral) - { - outputString = Transform(outputMetrics); - } - - byte[] metricsData = Encoding.UTF8.GetBytes(outputString); - if (Settings.Current.CompressForUpload) + foreach (Message metricsMessage in messagesToSend) { - metricsData = Compression.CompressToGzip(metricsData); + await this.ModuleClientWrapper.SendMessageAsync("metricOutput", metricsMessage); } - Message metricsMessage = new Message(metricsData); - metricsMessage.Properties[IdentifierPropertyName] = Constants.IoTUploadMessageIdentifier; - - await this.ModuleClientWrapper.SendMessageAsync("metricOutput", metricsMessage); - return true; } catch (Exception e) @@ -59,6 +48,58 @@ public async Task PublishAsync(IEnumerable metrics, CancellationTo } } + private byte[] serializeMetrics(IEnumerable outputMetrics) + { + string outputString = JsonConvert.SerializeObject(outputMetrics); + + if (Settings.Current.TransformForIoTCentral) + { + outputString = Transform(outputMetrics); + } + + byte[] metricsData = Encoding.UTF8.GetBytes(outputString); + + if (Settings.Current.CompressForUpload) + { + metricsData = Compression.CompressToGzip(metricsData); + } + + return metricsData; + } + + private IEnumerable BatchAndBuildMessages(List> splitMetrics) + { + + List messageList = new List(); + byte[] metricsData = serializeMetrics(splitMetrics.Last()); + if (metricsData.Length > Constants.MaxMessageSize) + { + LoggerUtil.Writer.LogInformation($"IoT message is {metricsData.Length} bytes, splitting messages..."); + List> splitAgainMetrics = new List>(); + foreach (IEnumerable metrics in splitMetrics) + { + ExportMetric[] metricsarray = metrics.ToArray(); + ExportMetric[] firstArray = metricsarray.Take(metricsarray.Length / 2).ToArray(); + splitAgainMetrics.Add(firstArray); + ExportMetric[] secondArray = metricsarray.Skip(metricsarray.Length / 2).ToArray(); + splitAgainMetrics.Add(secondArray); + } + + return BatchAndBuildMessages(splitAgainMetrics); + } + else + { + foreach (IEnumerable metrics in splitMetrics) + { + metricsData = serializeMetrics(metrics); + Message metricsMessage = new Message(metricsData); + metricsMessage.Properties[IdentifierPropertyName] = Constants.IoTUploadMessageIdentifier; + messageList.Add(metricsMessage); + } + return messageList; + } + } + private string Transform(IEnumerable metrics) { DateTime timeGeneratedUtc = DateTime.MaxValue; @@ -267,4 +308,4 @@ public ExportMetric(Metric baseMetric) } } } -} +} \ No newline at end of file From 73c741d03f3e661ac35d6f5800f48e7111959a2e Mon Sep 17 00:00:00 2001 From: Liam Davey <104245487+LiamDavey@users.noreply.github.com> Date: Thu, 14 Dec 2023 11:27:04 +1000 Subject: [PATCH 2/3] reduced metrics-collector max message size to prevent errors --- edge-modules/metrics-collector/src/Constants.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/edge-modules/metrics-collector/src/Constants.cs b/edge-modules/metrics-collector/src/Constants.cs index 364d83c2684..b92f238e35a 100755 --- a/edge-modules/metrics-collector/src/Constants.cs +++ b/edge-modules/metrics-collector/src/Constants.cs @@ -13,6 +13,6 @@ public static class Constants public const string DefaultLogAnalyticsWorkspaceDomainPrefixOds = ".ods.opinsights."; public const string DefaultLogAnalyticsWorkspaceDomainPrefixOms = ".oms.opinsights."; public const string ProductInfo = "IoTEdgeMetricsCollectorModule"; - public static readonly int MaxMessageSize = 255000; + public static readonly int MaxMessageSize = 250000; } } \ No newline at end of file From 8d618c8bd5795edbf2194309721cd4a44d62de29 Mon Sep 17 00:00:00 2001 From: LiamDavey <104245487+LiamDavey@users.noreply.github.com> Date: Wed, 20 Nov 2024 11:26:06 +1000 Subject: [PATCH 3/3] reduce split msg size to fix still breaching limit rarely --- edge-modules/metrics-collector/src/Constants.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/edge-modules/metrics-collector/src/Constants.cs b/edge-modules/metrics-collector/src/Constants.cs index b92f238e35a..e28f675fdbd 100755 --- a/edge-modules/metrics-collector/src/Constants.cs +++ b/edge-modules/metrics-collector/src/Constants.cs @@ -13,6 +13,6 @@ public static class Constants public const string DefaultLogAnalyticsWorkspaceDomainPrefixOds = ".ods.opinsights."; public const string DefaultLogAnalyticsWorkspaceDomainPrefixOms = ".oms.opinsights."; public const string ProductInfo = "IoTEdgeMetricsCollectorModule"; - public static readonly int MaxMessageSize = 250000; + public static readonly int MaxMessageSize = 225000; } } \ No newline at end of file