Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

metrics-collector - split D2C messages greater than limit #6904

Open
wants to merge 8 commits into
base: main
Choose a base branch
from
3 changes: 2 additions & 1 deletion edge-modules/metrics-collector/src/Constants.cs
Original file line number Diff line number Diff line change
Expand Up @@ -13,5 +13,6 @@ public static class Constants
public const string DefaultLogAnalyticsWorkspaceDomainPrefixOds = ".ods.opinsights.";
public const string DefaultLogAnalyticsWorkspaceDomainPrefixOms = ".oms.opinsights.";
public const string ProductInfo = "IoTEdgeMetricsCollectorModule";
public static readonly int MaxMessageSize = 225000;
}
}
}
73 changes: 57 additions & 16 deletions edge-modules/metrics-collector/src/IotHubUpload/IotHubUpload.cs
Original file line number Diff line number Diff line change
Expand Up @@ -31,25 +31,14 @@ public async Task<bool> PublishAsync(IEnumerable<Metric> metrics, CancellationTo
{
Preconditions.CheckNotNull(metrics, nameof(metrics));
IEnumerable<ExportMetric> outputMetrics = metrics.Select(m => new ExportMetric(m));
List<IEnumerable<ExportMetric>> splitMetrics = new List<IEnumerable<ExportMetric>> { outputMetrics };
IEnumerable<Message> messagesToSend = BatchAndBuildMessages(splitMetrics);

string outputString = JsonConvert.SerializeObject(outputMetrics);

if (Settings.Current.TransformForIoTCentral)
{
outputString = Transform(outputMetrics);
}

byte[] metricsData = Encoding.UTF8.GetBytes(outputString);
if (Settings.Current.CompressForUpload)
foreach (Message metricsMessage in messagesToSend)
{
metricsData = Compression.CompressToGzip(metricsData);
await this.ModuleClientWrapper.SendMessageAsync("metricOutput", metricsMessage);
}

Message metricsMessage = new Message(metricsData);
metricsMessage.Properties[IdentifierPropertyName] = Constants.IoTUploadMessageIdentifier;

await this.ModuleClientWrapper.SendMessageAsync("metricOutput", metricsMessage);

return true;
}
catch (Exception e)
Expand All @@ -59,6 +48,58 @@ public async Task<bool> PublishAsync(IEnumerable<Metric> metrics, CancellationTo
}
}

private byte[] serializeMetrics(IEnumerable<ExportMetric> outputMetrics)
{
string outputString = JsonConvert.SerializeObject(outputMetrics);

if (Settings.Current.TransformForIoTCentral)
{
outputString = Transform(outputMetrics);
}

byte[] metricsData = Encoding.UTF8.GetBytes(outputString);

if (Settings.Current.CompressForUpload)
{
metricsData = Compression.CompressToGzip(metricsData);
}

return metricsData;
}

private IEnumerable<Message> BatchAndBuildMessages(List<IEnumerable<ExportMetric>> splitMetrics)
{

List<Message> messageList = new List<Message>();
byte[] metricsData = serializeMetrics(splitMetrics.Last());
if (metricsData.Length > Constants.MaxMessageSize)
{
LoggerUtil.Writer.LogInformation($"IoT message is {metricsData.Length} bytes, splitting messages...");
List<IEnumerable<ExportMetric>> splitAgainMetrics = new List<IEnumerable<ExportMetric>>();
foreach (IEnumerable<ExportMetric> metrics in splitMetrics)
{
ExportMetric[] metricsarray = metrics.ToArray();
ExportMetric[] firstArray = metricsarray.Take(metricsarray.Length / 2).ToArray();
splitAgainMetrics.Add(firstArray);
ExportMetric[] secondArray = metricsarray.Skip(metricsarray.Length / 2).ToArray();
splitAgainMetrics.Add(secondArray);
}

return BatchAndBuildMessages(splitAgainMetrics);
}
else
{
foreach (IEnumerable<ExportMetric> metrics in splitMetrics)
{
metricsData = serializeMetrics(metrics);
Message metricsMessage = new Message(metricsData);
metricsMessage.Properties[IdentifierPropertyName] = Constants.IoTUploadMessageIdentifier;
messageList.Add(metricsMessage);
}
return messageList;
}
}

private string Transform(IEnumerable<ExportMetric> metrics)
{
DateTime timeGeneratedUtc = DateTime.MaxValue;
Expand Down Expand Up @@ -267,4 +308,4 @@ public ExportMetric(Metric baseMetric)
}
}
}
}
}