From 5a96218eff599cf513c3e7506dd52e1e3a19ab15 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Fernando=20Henrique=20Inoc=C3=AAncio=20Borba=20Ferreira?= Date: Fri, 1 Mar 2024 09:55:59 -0800 Subject: [PATCH] [Bundle] Parallel bundle perf improvements (#3727) * Switch from Task.WaitAll to Task.WhenAll * Removing calls to GC --- .../BundleOrchestratorOperation.cs | 2 +- .../Resources/Bundle/BundleHandler.cs | 26 ------------------- .../Bundle/BundleHandlerParallelOperations.cs | 14 +++------- 3 files changed, 4 insertions(+), 38 deletions(-) diff --git a/src/Microsoft.Health.Fhir.Core/Features/Persistence/Orchestration/BundleOrchestratorOperation.cs b/src/Microsoft.Health.Fhir.Core/Features/Persistence/Orchestration/BundleOrchestratorOperation.cs index 804a10e707..f64614925b 100644 --- a/src/Microsoft.Health.Fhir.Core/Features/Persistence/Orchestration/BundleOrchestratorOperation.cs +++ b/src/Microsoft.Health.Fhir.Core/Features/Persistence/Orchestration/BundleOrchestratorOperation.cs @@ -316,7 +316,7 @@ private async Task response = await _dataStore.MergeAsync(_resources.Values.ToList(), mergeOptions, cancellationToken); SetStatusSafe(BundleOrchestratorOperationStatus.Completed); diff --git a/src/Microsoft.Health.Fhir.Shared.Api/Features/Resources/Bundle/BundleHandler.cs b/src/Microsoft.Health.Fhir.Shared.Api/Features/Resources/Bundle/BundleHandler.cs index 9165d2bed6..78f682474a 100644 --- a/src/Microsoft.Health.Fhir.Shared.Api/Features/Resources/Bundle/BundleHandler.cs +++ b/src/Microsoft.Health.Fhir.Shared.Api/Features/Resources/Bundle/BundleHandler.cs @@ -589,15 +589,8 @@ private static void AddHeaderIfNeeded(string headerKey, string headerValue, Http private async Task ExecuteRequestsWithSingleHttpVerbInSequenceAsync(Hl7.Fhir.Model.Bundle responseBundle, HTTPVerb httpVerb, EntryComponent throttledEntryComponent, BundleHandlerStatistics statistics, CancellationToken cancellationToken) { - const int GCCollectTrigger = 150; - foreach (ResourceExecutionContext resourceContext in _requests[httpVerb]) { - if (resourceContext.Index % GCCollectTrigger == 0 && resourceContext.Index > 0) - { - RunGarbageCollection(); - } - EntryComponent entryComponent; Stopwatch watch = Stopwatch.StartNew(); @@ -873,25 +866,6 @@ private void PopulateReferenceIdDictionary(IEnumerable bundleEnt } } - private void RunGarbageCollection() - { - try - { - _logger.LogInformation("{Origin} - MemoryWatch - Memory used before collection: {MemoryInUse:N0}", nameof(BundleHandler), GC.GetTotalMemory(forceFullCollection: false)); - - // Collecting memory up to Generation 2 using default collection mode. - // No blocking, allowing a collection to be performed as soon as possible, if another collection is not in progress. - // SOH compacting is set to true. - GC.Collect(GC.MaxGeneration, GCCollectionMode.Default, blocking: false, compacting: true); - - _logger.LogInformation("{Origin} - MemoryWatch - Memory used after full collection: {MemoryInUse:N0}", nameof(BundleHandler), GC.GetTotalMemory(forceFullCollection: false)); - } - catch (Exception ex) - { - _logger.LogInformation(ex, "{Origin} - MemoryWatch - Error running garbage collection.", nameof(BundleHandler)); - } - } - private static OperationOutcome CreateOperationOutcome(OperationOutcome.IssueSeverity issueSeverity, OperationOutcome.IssueType issueType, string diagnostics) { return new OperationOutcome diff --git a/src/Microsoft.Health.Fhir.Shared.Api/Features/Resources/Bundle/BundleHandlerParallelOperations.cs b/src/Microsoft.Health.Fhir.Shared.Api/Features/Resources/Bundle/BundleHandlerParallelOperations.cs index eb0072e7f5..fbeabf35cb 100644 --- a/src/Microsoft.Health.Fhir.Shared.Api/Features/Resources/Bundle/BundleHandlerParallelOperations.cs +++ b/src/Microsoft.Health.Fhir.Shared.Api/Features/Resources/Bundle/BundleHandlerParallelOperations.cs @@ -72,8 +72,6 @@ private async Task ExecuteRequestsInParallelAsync( return await Task.FromResult(throttledEntryComponent); } - const int GCCollectTrigger = 150; - using (CancellationTokenSource requestCancellationToken = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken)) { IAuditEventTypeMapping auditEventTypeMapping = _auditEventTypeMapping; @@ -84,11 +82,6 @@ private async Task ExecuteRequestsInParallelAsync( // Parallel Resource Handling Function. Func handleRequestFunction = async (ResourceExecutionContext resourceExecutionContext, CancellationToken ct) => { - if (resourceExecutionContext.Index > 0 && resourceExecutionContext.Index % GCCollectTrigger == 0) - { - RunGarbageCollection(); - } - _logger.LogInformation("BundleHandler - Running '{HttpVerb}' Request #{RequestNumber} out of {TotalNumberOfRequests}.", resourceExecutionContext.HttpVerb, resourceExecutionContext.Index, bundleOperation.OriginalExpectedNumberOfResources); // Creating new instances per record in the bundle, and making their access thread-safe. @@ -147,12 +140,11 @@ private async Task ExecuteRequestsInParallelAsync( try { - // The following Task.WaitAll should wait for all requests to finish. + // The following Task.WhenAll should wait for all requests to finish. - // Parallel requests are not supossed to raise exceptions, unless they are FhirTransactionFailedExceptions. + // Parallel requests are not supposed to raise exceptions, unless they are FhirTransactionFailedExceptions. // FhirTransactionFailedExceptions are a special case to invalidate an entire bundle. - - Task.WaitAll(requestsPerResource.ToArray(), cancellationToken); + await Task.WhenAll(requestsPerResource); } catch (AggregateException age) {