Skip to content

Commit

Permalink
[Bundle] Parallel bundle perf improvements (#3727)
Browse files Browse the repository at this point in the history
* Switch from Task.WaitAll to Task.WhenAll
* Removing calls to GC
  • Loading branch information
fhibf authored Mar 1, 2024
1 parent e34b3f6 commit 5a96218
Show file tree
Hide file tree
Showing 3 changed files with 4 additions and 38 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -316,7 +316,7 @@ private async Task<IDictionary<DataStoreOperationIdentifier, DataStoreOperationO
// The database will be responsible for handling it internally.
MergeOptions mergeOptions = new MergeOptions(enlistTransaction: false);

// 2 - Merge all resources in the data base.
// 2 - Merge all resources in the database.
IDictionary<DataStoreOperationIdentifier, DataStoreOperationOutcome> response = await _dataStore.MergeAsync(_resources.Values.ToList(), mergeOptions, cancellationToken);

SetStatusSafe(BundleOrchestratorOperationStatus.Completed);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -589,15 +589,8 @@ private static void AddHeaderIfNeeded(string headerKey, string headerValue, Http

private async Task<EntryComponent> ExecuteRequestsWithSingleHttpVerbInSequenceAsync(Hl7.Fhir.Model.Bundle responseBundle, HTTPVerb httpVerb, EntryComponent throttledEntryComponent, BundleHandlerStatistics statistics, CancellationToken cancellationToken)
{
const int GCCollectTrigger = 150;

foreach (ResourceExecutionContext resourceContext in _requests[httpVerb])
{
if (resourceContext.Index % GCCollectTrigger == 0 && resourceContext.Index > 0)
{
RunGarbageCollection();
}

EntryComponent entryComponent;

Stopwatch watch = Stopwatch.StartNew();
Expand Down Expand Up @@ -873,25 +866,6 @@ private void PopulateReferenceIdDictionary(IEnumerable<EntryComponent> bundleEnt
}
}

private void RunGarbageCollection()
{
try
{
_logger.LogInformation("{Origin} - MemoryWatch - Memory used before collection: {MemoryInUse:N0}", nameof(BundleHandler), GC.GetTotalMemory(forceFullCollection: false));

// Collecting memory up to Generation 2 using default collection mode.
// No blocking, allowing a collection to be performed as soon as possible, if another collection is not in progress.
// SOH compacting is set to true.
GC.Collect(GC.MaxGeneration, GCCollectionMode.Default, blocking: false, compacting: true);

_logger.LogInformation("{Origin} - MemoryWatch - Memory used after full collection: {MemoryInUse:N0}", nameof(BundleHandler), GC.GetTotalMemory(forceFullCollection: false));
}
catch (Exception ex)
{
_logger.LogInformation(ex, "{Origin} - MemoryWatch - Error running garbage collection.", nameof(BundleHandler));
}
}

private static OperationOutcome CreateOperationOutcome(OperationOutcome.IssueSeverity issueSeverity, OperationOutcome.IssueType issueType, string diagnostics)
{
return new OperationOutcome
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,8 +72,6 @@ private async Task<EntryComponent> ExecuteRequestsInParallelAsync(
return await Task.FromResult(throttledEntryComponent);
}

const int GCCollectTrigger = 150;

using (CancellationTokenSource requestCancellationToken = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken))
{
IAuditEventTypeMapping auditEventTypeMapping = _auditEventTypeMapping;
Expand All @@ -84,11 +82,6 @@ private async Task<EntryComponent> ExecuteRequestsInParallelAsync(
// Parallel Resource Handling Function.
Func<ResourceExecutionContext, CancellationToken, Task> handleRequestFunction = async (ResourceExecutionContext resourceExecutionContext, CancellationToken ct) =>
{
if (resourceExecutionContext.Index > 0 && resourceExecutionContext.Index % GCCollectTrigger == 0)
{
RunGarbageCollection();
}

_logger.LogInformation("BundleHandler - Running '{HttpVerb}' Request #{RequestNumber} out of {TotalNumberOfRequests}.", resourceExecutionContext.HttpVerb, resourceExecutionContext.Index, bundleOperation.OriginalExpectedNumberOfResources);

// Creating new instances per record in the bundle, and making their access thread-safe.
Expand Down Expand Up @@ -147,12 +140,11 @@ private async Task<EntryComponent> ExecuteRequestsInParallelAsync(

try
{
// The following Task.WaitAll should wait for all requests to finish.
// The following Task.WhenAll should wait for all requests to finish.

// Parallel requests are not supossed to raise exceptions, unless they are FhirTransactionFailedExceptions.
// Parallel requests are not supposed to raise exceptions, unless they are FhirTransactionFailedExceptions.
// FhirTransactionFailedExceptions are a special case to invalidate an entire bundle.

Task.WaitAll(requestsPerResource.ToArray(), cancellationToken);
await Task.WhenAll(requestsPerResource);
}
catch (AggregateException age)
{
Expand Down

0 comments on commit 5a96218

Please sign in to comment.