From 97c3784078c529b9430c4cede2c5db39801e6a90 Mon Sep 17 00:00:00 2001 From: Karthik Balasubramanian Date: Fri, 10 Nov 2023 14:46:06 -0800 Subject: [PATCH] Dicom bulk update improvements (#3175) * Dicom bulk update improvements * Fix merge conflict * Fix comment * fix Tostring() * Fix comment * Update doc * Fix logging metric * Fix comment * Add IsExternal --- docs/concepts/bulk-update.md | 53 +- .../Model/VersionedInstanceIdentifier.cs | 2 +- .../Features/Update/UpdateRequestValidator.cs | 3 - .../host.json | 2 +- .../UpdateDurableFunctionTests.Activity.cs | 129 +-- ...pdateDurableFunctionTests.Orchestration.cs | 934 +++--------------- .../Update/Models/CleanupBlobArguments.cs | 25 - .../Update/Models/UpdateInstanceResponse.cs | 23 + .../Update/UpdateDurableFunction.Activity.cs | 201 ++-- .../UpdateDurableFunction.Orchestration.cs | 120 ++- .../appsettings.json | 2 +- 11 files changed, 414 insertions(+), 1080 deletions(-) delete mode 100644 src/Microsoft.Health.Dicom.Functions/Update/Models/CleanupBlobArguments.cs create mode 100644 src/Microsoft.Health.Dicom.Functions/Update/Models/UpdateInstanceResponse.cs diff --git a/docs/concepts/bulk-update.md b/docs/concepts/bulk-update.md index 6ab2b44b96..aa4852c3f0 100644 --- a/docs/concepts/bulk-update.md +++ b/docs/concepts/bulk-update.md @@ -63,14 +63,21 @@ POST ...v1/partitions/{PartitionName}/studies/$bulkUpdate #### Request Body +Below `UpdateSpecification` is passed as the request body. The `UpdateSpecification` needs both `studyInstanceUids` and `changeDataset` to be specified. + ```json { - "studyInstanceUids": ["12.3.4.5"], - "changeDataset": { - "00100010": { - "vr": "LO", - "Value": ["New patient name"] - } + "studyInstanceUids": ["1.113654.3.13.1026"], + "changeDataset": { + "00100010": { + "vr": "PN", + "Value": + [ + { + "Alphabetic": "New Patient Name 1" + } + ] + } } } ``` @@ -89,7 +96,7 @@ Content-Type: application/json | Name | Type | Description | | ----------------- | ------------------------------------------- | ------------------------------------------------------------ | -| 202 (Accepted) | [Operation Reference](#operation-reference) | Extended query tag(s) have been added, and a long-running operation has been started to re-index existing DICOM instances | +| 202 (Accepted) | [Operation Reference](#operation-reference) | A long-running operation has been started to update DICOM attributes | | 400 (Bad Request) | | Request body has invalid data | ### Operation Status @@ -107,6 +114,8 @@ GET .../operations/{operationId} #### Responses +**Successful response** + ```json { "operationId": "1323c079a1b64efcb8943ef7707b5438", @@ -117,12 +126,34 @@ GET .../operations/{operationId} "percentComplete": 100, "results": { "studyUpdated": 1, - "instanceUpdated": 16, - // Errors will go here + "instanceUpdated": 16 + } +} +``` + +**Failure respose** +``` +{ + "operationId": "1323c079a1b64efcb8943ef7707b5438", + "type": "update", + "createdTime": "2023-05-08T05:01:30.1441374Z", + "lastUpdatedTime": "2023-05-08T05:01:42.9067335Z", + "status": "failed", + "percentComplete": 100, + "results": { + "studyUpdated": 0, + "studyFailed": 1, + "instanceUpdated": 0, + "errors": [ + "Failed to update instances for study 1.113654.3.13.1026" + ] } } ``` +If there are any instance specific exception, it will be added to the `errors` list. It will include all the UIDs of the instance like +`Instance UIDs - PartitionKey: 1, StudyInstanceUID: 1.113654.3.13.1026, SeriesInstanceUID: 1.113654.3.13.1035, SOPInstanceUID: 1.113654.3.13.1510` + | Name | Type | Description | | --------------- | ----------------------- | -------------------------------------------- | | 200 (OK) | [Operation](#operation) | The operation with the specified ID has completed | @@ -178,4 +209,6 @@ There is no change in other APIs. All the other APIs supports only latest versio > Only one update operation can be performed at a time. -> There is no way to delete only the latest version or revert back to original version. \ No newline at end of file +> There is no way to delete only the latest version or revert back to original version. + +> QIDO doesn't support querying extended query tag after it is updated. \ No newline at end of file diff --git a/src/Microsoft.Health.Dicom.Core/Features/Model/VersionedInstanceIdentifier.cs b/src/Microsoft.Health.Dicom.Core/Features/Model/VersionedInstanceIdentifier.cs index 82297dcde9..44988b43b3 100644 --- a/src/Microsoft.Health.Dicom.Core/Features/Model/VersionedInstanceIdentifier.cs +++ b/src/Microsoft.Health.Dicom.Core/Features/Model/VersionedInstanceIdentifier.cs @@ -36,5 +36,5 @@ public override int GetHashCode() => base.GetHashCode() ^ Version.GetHashCode(); public override string ToString() - => base.ToString() + $"Version: {Version}"; + => base.ToString() + $", Version: {Version}"; } diff --git a/src/Microsoft.Health.Dicom.Core/Features/Update/UpdateRequestValidator.cs b/src/Microsoft.Health.Dicom.Core/Features/Update/UpdateRequestValidator.cs index d238cfa9b1..bc840538c7 100644 --- a/src/Microsoft.Health.Dicom.Core/Features/Update/UpdateRequestValidator.cs +++ b/src/Microsoft.Health.Dicom.Core/Features/Update/UpdateRequestValidator.cs @@ -81,7 +81,4 @@ public static DicomDataset ValidateDicomDataset(DicomDataset dataset) } return failedSop; } - - private static string GetCommaSeparatedTags(List tags) - => string.Join(", ", tags.Select(x => $"'{x}'")); } diff --git a/src/Microsoft.Health.Dicom.Functions.App/host.json b/src/Microsoft.Health.Dicom.Functions.App/host.json index 1136e13dfb..06a4ded735 100644 --- a/src/Microsoft.Health.Dicom.Functions.App/host.json +++ b/src/Microsoft.Health.Dicom.Functions.App/host.json @@ -89,7 +89,7 @@ "Frequency": "0 0 * * *", "MinimumAgeDays": 7, "Statuses": [ "Completed" ], - "ExcludeFunctions": [ "MigratingFrameRangeFilesAsync", "UpdateInstancesV2Async", "UpdateInstancesV3Async" ] + "ExcludeFunctions": [ "MigratingFrameRangeFilesAsync", "UpdateInstancesV3Async", "UpdateInstancesV4Async" ] }, "SqlServer": { "Retry": { diff --git a/src/Microsoft.Health.Dicom.Functions.UnitTests/Update/UpdateDurableFunctionTests.Activity.cs b/src/Microsoft.Health.Dicom.Functions.UnitTests/Update/UpdateDurableFunctionTests.Activity.cs index 42af55f812..109533d96c 100644 --- a/src/Microsoft.Health.Dicom.Functions.UnitTests/Update/UpdateDurableFunctionTests.Activity.cs +++ b/src/Microsoft.Health.Dicom.Functions.UnitTests/Update/UpdateDurableFunctionTests.Activity.cs @@ -3,19 +3,20 @@ // Licensed under the MIT License (MIT). See LICENSE in the repo root for license information. // ------------------------------------------------------------------------------------------------- -using System; using System.Collections.Generic; using System.Linq; using System.Threading; using System.Threading.Tasks; using FellowOakDicom; using Microsoft.Extensions.Logging.Abstractions; +using Microsoft.Health.Dicom.Core.Exceptions; using Microsoft.Health.Dicom.Core.Features.Common; using Microsoft.Health.Dicom.Core.Features.Model; using Microsoft.Health.Dicom.Core.Features.Partitioning; using Microsoft.Health.Dicom.Functions.Update.Models; using Microsoft.Health.Dicom.Tests.Common; using NSubstitute; +using NSubstitute.ExceptionExtensions; using Xunit; namespace Microsoft.Health.Dicom.Functions.UnitTests.Update; @@ -81,7 +82,7 @@ public async Task GivenInstanceMetadata_WhenUpdatingBlobInBatches_ThenShouldUpda .Returns(DefaultFileProperties); } - await _updateDurableFunction.UpdateInstanceBlobsV2Async( + await _updateDurableFunction.UpdateInstanceBlobsV3Async( new UpdateInstanceBlobArgumentsV2(Partition.Default, expected, dataset), NullLogger.Instance); @@ -95,6 +96,41 @@ await _updateInstanceService } } + [Fact] + public async Task GivenInstanceMetadata_WhenUpdatingWithDataStoreException_ThenShouldReturnFailureCorrectly() + { + var studyInstanceUid = TestUidGenerator.Generate(); + var expected = GetInstanceIdentifiersList(studyInstanceUid); + + var dataset = "{\"00100010\":{\"vr\":\"PN\",\"Value\":[{\"Alphabetic\":\"Patient Name\"}]}}"; + + foreach (var instance in expected) + { + _updateInstanceService + .UpdateInstanceBlobAsync( + instance, + Arg.Is(x => x.GetSingleValue(DicomTag.PatientName) == "Patient Name"), + Partition.Default, + Arg.Any()) + .ThrowsAsync(new DataStoreException("Error")); + } + + var response = await _updateDurableFunction.UpdateInstanceBlobsV3Async( + new UpdateInstanceBlobArgumentsV2(Partition.Default, expected, dataset), + NullLogger.Instance); + + foreach (var instance in expected) + { + await _updateInstanceService + .Received(1) + .UpdateInstanceBlobAsync(Arg.Is(GetPredicate(instance)), Arg.Is(x => x.GetSingleValue(DicomTag.PatientName) == "Patient Name"), + Partition.Default, + Arg.Any()); + } + + Assert.Equal(expected.Count, response.Errors.Count); + } + [Fact] public async Task GivenCompleteInstanceArgument_WhenCompleting_ThenShouldComplete() { @@ -108,7 +144,7 @@ public async Task GivenCompleteInstanceArgument_WhenCompleting_ThenShouldComplet { DicomTag.PatientName, "Patient Name" } }; - await _updateDurableFunction.CompleteUpdateStudyV2Async( + await _updateDurableFunction.CompleteUpdateStudyV3Async( new CompleteStudyArgumentsV2( Partition.DefaultKey, studyInstanceUid, @@ -121,35 +157,6 @@ await _indexStore .EndUpdateInstanceAsync(Partition.DefaultKey, studyInstanceUid, Arg.Is(x => x.GetSingleValue(DicomTag.PatientName) == "Patient Name"), instanceMetadataList, CancellationToken.None); } - [Fact] - [Obsolete("Obsolete")] - public async Task GivenInstanceUpdateFails_WhenDeleteFileV2_ThenShouldDeleteSuccessfully() - { - var studyInstanceUid = TestUidGenerator.Generate(); - var identifiers = GetInstanceIdentifiersList(studyInstanceUid, instanceProperty: new InstanceProperties { NewVersion = 1 }); - IReadOnlyList expected = identifiers.Select(x => - new InstanceFileState - { - Version = x.VersionedInstanceIdentifier.Version, - OriginalVersion = x.InstanceProperties.OriginalVersion, - NewVersion = x.InstanceProperties.NewVersion - }).Take(1).ToList(); - - _updateInstanceService - .DeleteInstanceBlobAsync(Arg.Any(), Partition.Default, null, Arg.Any()) - .Returns(Task.CompletedTask); - - // Call the activity - await _updateDurableFunction.CleanupNewVersionBlobV2Async( - new CleanupBlobArguments(expected, Partition.Default), - NullLogger.Instance); - - // Assert behavior - await _updateInstanceService - .Received(1) - .DeleteInstanceBlobAsync(Arg.Any(), Partition.Default, null, Arg.Any()); - } - [Fact] public async Task GivenInstanceUpdateFails_WhenDeleteFileWithV3_ThenShouldDeleteNewVersionSuccessfullyWithoutFileProperties() { @@ -194,35 +201,6 @@ await _updateInstanceService .DeleteInstanceBlobAsync(Arg.Any(), Partition.Default, DefaultFileProperties, Arg.Any()); } - [Fact] - [Obsolete("Obsolete")] - public async Task GivenInstanceMetadataList_WhenDeleteFileV2_ThenShouldDeleteSuccessfully() - { - var studyInstanceUid = TestUidGenerator.Generate(); - var identifiers = GetInstanceIdentifiersList(studyInstanceUid, partition: Partition.Default); - IReadOnlyList expected = identifiers.Select(x => - new InstanceFileState - { - Version = x.VersionedInstanceIdentifier.Version, - OriginalVersion = x.InstanceProperties.OriginalVersion, - NewVersion = x.InstanceProperties.NewVersion - }).Take(1).ToList(); - - _updateInstanceService - .DeleteInstanceBlobAsync(expected[0].Version, identifiers[0].VersionedInstanceIdentifier.Partition, null, Arg.Any()) - .Returns(Task.CompletedTask); - - // Call the activity - await _updateDurableFunction.DeleteOldVersionBlobV2Async( - new CleanupBlobArguments(expected, identifiers[0].VersionedInstanceIdentifier.Partition), - NullLogger.Instance); - - // Assert behavior - await _updateInstanceService - .Received(1) - .DeleteInstanceBlobAsync(expected[0].Version, identifiers[0].VersionedInstanceIdentifier.Partition, null, Arg.Any()); - } - [Fact] public async Task GivenInstanceMetadataList_WhenDeleteFileV3_ThenShouldDeleteSuccessfullyWithoutFileProperties() { @@ -276,35 +254,6 @@ await _updateInstanceService .DeleteInstanceBlobAsync(identifiers[0].VersionedInstanceIdentifier.Version, identifiers[0].VersionedInstanceIdentifier.Partition, identifiers[0].InstanceProperties.FileProperties, Arg.Any()); } - [Fact] - [Obsolete("Should use SetOriginalBlobToColdAccessTierV2Async instead")] - public async Task GivenInstanceMetadataList_WhenChangeAccessTier_ThenShoulChangeSuccessfully() - { - var studyInstanceUid = TestUidGenerator.Generate(); - var identifiers = GetInstanceIdentifiersList(studyInstanceUid, Partition.Default, new InstanceProperties { NewVersion = 2 }); - IReadOnlyList expected = identifiers.Select(x => - new InstanceFileState - { - Version = x.VersionedInstanceIdentifier.Version, - OriginalVersion = x.InstanceProperties.OriginalVersion, - NewVersion = x.InstanceProperties.NewVersion - }).Take(1).ToList(); - - _fileStore - .SetBlobToColdAccessTierAsync(Arg.Any(), Partition.Default, null, Arg.Any()) - .Returns(Task.CompletedTask); - - // Call the activity - await _updateDurableFunction.SetOriginalBlobToColdAccessTierAsync( - new CleanupBlobArguments(expected, Partition.Default), - NullLogger.Instance); - - // Assert behavior - await _fileStore - .Received(1) - .SetBlobToColdAccessTierAsync(Arg.Any(), Partition.Default, null, Arg.Any()); - } - [Fact] public async Task GivenInstanceMetadataList_WhenChangeAccessTierV2_ThenShouldChangeSuccessfullyUsingFileProperties() { diff --git a/src/Microsoft.Health.Dicom.Functions.UnitTests/Update/UpdateDurableFunctionTests.Orchestration.cs b/src/Microsoft.Health.Dicom.Functions.UnitTests/Update/UpdateDurableFunctionTests.Orchestration.cs index 25e4a3f75f..afe8b559f4 100644 --- a/src/Microsoft.Health.Dicom.Functions.UnitTests/Update/UpdateDurableFunctionTests.Orchestration.cs +++ b/src/Microsoft.Health.Dicom.Functions.UnitTests/Update/UpdateDurableFunctionTests.Orchestration.cs @@ -28,8 +28,7 @@ namespace Microsoft.Health.Dicom.Functions.UnitTests.Update; public partial class UpdateDurableFunctionTests { [Fact] - [Obsolete("Obsolete")] - public async Task GivenV2OrchestrationWithInput_WhenUpdatingInstances_ThenComplete() + public async Task GivenV4OrchestrationWithInput_WhenUpdatingInstances_ThenComplete() { const int batchSize = 5; _options.BatchSize = batchSize; @@ -80,737 +79,15 @@ public async Task GivenV2OrchestrationWithInput_WhenUpdatingInstances_ThenComple Arg.Any()) .Returns(instanceMetadataList); context - .CallActivityWithRetryAsync>( - nameof(UpdateDurableFunction.UpdateInstanceBlobsV2Async), - _options.RetryOptions, - Arg.Is(GetPredicate(expectedInput.Partition, instanceMetadataList, expectedInput.ChangeDataset)) - ) - .Returns(instanceMetadataList); - context - .CallActivityWithRetryAsync( - nameof(UpdateDurableFunction.CompleteUpdateStudyV2Async), - _options.RetryOptions, - Arg.Any()) - .Returns(Task.CompletedTask); - context - .CallActivityWithRetryAsync( - nameof(UpdateDurableFunction.DeleteOldVersionBlobV2Async), - _options.RetryOptions, - Arg.Any()) - .Returns(Task.CompletedTask); - context - .CallActivityWithRetryAsync( - nameof(UpdateDurableFunction.SetOriginalBlobToColdAccessTierAsync), - _options.RetryOptions, - Arg.Any()) - .Returns(Task.CompletedTask); - - // Invoke the orchestration - await _updateDurableFunction.UpdateInstancesV2Async(context, NullLogger.Instance); - - // Assert behavior - context - .Received(1) - .GetInput(); - await context - .Received(1) - .CallActivityWithRetryAsync>( - nameof(UpdateDurableFunction.UpdateInstanceWatermarkV2Async), - _options.RetryOptions, - Arg.Any()); - await context - .Received(1) - .CallActivityWithRetryAsync>( - nameof(UpdateDurableFunction.UpdateInstanceBlobsV2Async), - _options.RetryOptions, - Arg.Is(GetPredicate(expectedInput.Partition, instanceMetadataList, expectedInput.ChangeDataset)) - ); - await context - .Received(1) - .CallActivityWithRetryAsync( - nameof(UpdateDurableFunction.CompleteUpdateStudyV2Async), - _options.RetryOptions, - Arg.Any()); - await context - .Received(1) - .CallActivityWithRetryAsync( - nameof(UpdateDurableFunction.DeleteOldVersionBlobV2Async), - _options.RetryOptions, - Arg.Any()); - await context - .Received(1) - .CallActivityWithRetryAsync( - nameof(UpdateDurableFunction.SetOriginalBlobToColdAccessTierAsync), - _options.RetryOptions, - Arg.Any()); - context - .Received(1) - .ContinueAsNew( - Arg.Is(x => x.NumberOfStudyCompleted == 1), - false); - } - - [Fact] - [Obsolete("Obsolete")] - public async Task GivenV2OrchestrationWithInputAndExternalStoreEnabled_WhenUpdatingInstances_ThenInstanceMetadataListWithFilePropertiesPassedInToCompleteUpdate() - { - const int batchSize = 5; - _options.BatchSize = batchSize; - - var expectedInput = GetUpdateCheckpoint(); - var studyInstanceUid = expectedInput.StudyInstanceUids[expectedInput.NumberOfStudyCompleted]; - - var expectedInstances = new List - { - new InstanceFileState - { - Version = 1 - }, - new InstanceFileState - { - Version = 2 - } - }; - - var expectedInstancesWithNewWatermark = new List - { - new InstanceFileState - { - Version = 1, - NewVersion = 3, - }, - new InstanceFileState - { - Version = 2, - NewVersion = 4, - } - }; - - // Arrange the input - string operationId = OperationId.Generate(); - IDurableOrchestrationContext context = CreateContext(operationId); - - List instanceMetadataList = CreateExpectedInstanceMetadataList(expectedInstancesWithNewWatermark, studyInstanceUid); - - context - .GetInput() - .Returns(expectedInput); - context - .CallActivityWithRetryAsync>( - nameof(UpdateDurableFunction.UpdateInstanceWatermarkV2Async), - _options.RetryOptions, - Arg.Any()) - .Returns(instanceMetadataList); - context - .CallActivityWithRetryAsync>( - nameof(UpdateDurableFunction.UpdateInstanceBlobsV2Async), - _options.RetryOptions, - Arg.Is(GetPredicate(expectedInput.Partition, instanceMetadataList, expectedInput.ChangeDataset)) - ) - .Returns(instanceMetadataList); - context - .CallActivityWithRetryAsync( - nameof(UpdateDurableFunction.CompleteUpdateStudyV2Async), - _options.RetryOptions, - Arg.Is(GetPredicate(expectedInput.Partition.Key, studyInstanceUid, expectedInput.ChangeDataset, instanceMetadataList))) - .Returns(Task.CompletedTask); - context - .CallActivityWithRetryAsync( - nameof(UpdateDurableFunction.DeleteOldVersionBlobV2Async), - _options.RetryOptions, - expectedInstances) - .Returns(Task.CompletedTask); - context - .CallActivityWithRetryAsync( - nameof(UpdateDurableFunction.SetOriginalBlobToColdAccessTierAsync), - _options.RetryOptions, - Arg.Any()) - .Returns(Task.CompletedTask); - - // Invoke the orchestration - await _updateDurableFunctionWithExternalStore.UpdateInstancesV2Async(context, NullLogger.Instance); - - // Assert behavior - context - .Received(1) - .GetInput(); - await context - .Received(1) - .CallActivityWithRetryAsync>( - nameof(UpdateDurableFunction.UpdateInstanceWatermarkV2Async), - _options.RetryOptions, - Arg.Any()); - await context - .Received(1) - .CallActivityWithRetryAsync>( - nameof(UpdateDurableFunction.UpdateInstanceBlobsV2Async), - _options.RetryOptions, - Arg.Is(GetPredicate(expectedInput.Partition, instanceMetadataList, expectedInput.ChangeDataset)) - ); - await context - .Received(1) - .CallActivityWithRetryAsync( - nameof(UpdateDurableFunction.CompleteUpdateStudyV2Async), - _options.RetryOptions, - Arg.Is(GetPredicate(expectedInput.Partition.Key, studyInstanceUid, expectedInput.ChangeDataset, instanceMetadataList))); - context - .Received(1) - .ContinueAsNew( - Arg.Is(x => x.NumberOfStudyCompleted == 1), - false); - await context - .Received(1) - .CallActivityWithRetryAsync( - nameof(UpdateDurableFunction.DeleteOldVersionBlobV2Async), - _options.RetryOptions, - Arg.Any()); - await context - .Received(1) - .CallActivityWithRetryAsync( - nameof(UpdateDurableFunction.SetOriginalBlobToColdAccessTierAsync), - _options.RetryOptions, - Arg.Any()); - } - - [Fact] - [Obsolete("Obsolete")] - public async Task GivenV2OrchestrationWithInputAndExternalStoreNotEnabled_WhenUpdatingInstances_ThenEmptyInstanceMetadataListPassedInToCompleteUpdate() - { - const int batchSize = 5; - _options.BatchSize = batchSize; - - var expectedInput = GetUpdateCheckpoint(); - var studyInstanceUid = expectedInput.StudyInstanceUids[expectedInput.NumberOfStudyCompleted]; - - var expectedInstances = new List - { - new InstanceFileState - { - Version = 1 - }, - new InstanceFileState - { - Version = 2 - } - }; - - var expectedInstancesWithNewWatermark = new List - { - new InstanceFileState - { - Version = 1, - NewVersion = 3, - }, - new InstanceFileState - { - Version = 2, - NewVersion = 4, - } - }; - - // Arrange the input - string operationId = OperationId.Generate(); - IDurableOrchestrationContext context = CreateContext(operationId); - - List instanceMetadataList = CreateExpectedInstanceMetadataList(expectedInstancesWithNewWatermark); - - context - .GetInput() - .Returns(expectedInput); - context - .CallActivityWithRetryAsync>( - nameof(UpdateDurableFunction.UpdateInstanceWatermarkV2Async), - _options.RetryOptions, - Arg.Any()) - .Returns(instanceMetadataList); - context - .CallActivityWithRetryAsync>( - nameof(UpdateDurableFunction.UpdateInstanceBlobsV2Async), - _options.RetryOptions, - Arg.Is(GetPredicate(expectedInput.Partition, instanceMetadataList, expectedInput.ChangeDataset)) - ) - .Returns(instanceMetadataList); - context - .CallActivityWithRetryAsync( - nameof(UpdateDurableFunction.CompleteUpdateStudyV2Async), - _options.RetryOptions, - Arg.Is(GetPredicate(expectedInput.Partition.Key, studyInstanceUid, expectedInput.ChangeDataset, new List()))) - .Returns(Task.CompletedTask); - context - .CallActivityWithRetryAsync( - nameof(UpdateDurableFunction.DeleteOldVersionBlobV2Async), - _options.RetryOptions, - expectedInstances) - .Returns(Task.CompletedTask); - context - .CallActivityWithRetryAsync( - nameof(UpdateDurableFunction.SetOriginalBlobToColdAccessTierAsync), - _options.RetryOptions, - Arg.Any()) - .Returns(Task.CompletedTask); - - // Invoke the orchestration - await _updateDurableFunction.UpdateInstancesV2Async(context, NullLogger.Instance); - - // Assert behavior - context - .Received(1) - .GetInput(); - await context - .Received(1) - .CallActivityWithRetryAsync>( - nameof(UpdateDurableFunction.UpdateInstanceWatermarkV2Async), - _options.RetryOptions, - Arg.Any()); - await context - .Received(1) - .CallActivityWithRetryAsync>( - nameof(UpdateDurableFunction.UpdateInstanceBlobsV2Async), - _options.RetryOptions, - Arg.Is(GetPredicate(expectedInput.Partition, instanceMetadataList, expectedInput.ChangeDataset)) - ); - await context - .Received(1) - .CallActivityWithRetryAsync( - nameof(UpdateDurableFunction.CompleteUpdateStudyV2Async), - _options.RetryOptions, - Arg.Is(GetPredicate(expectedInput.Partition.Key, studyInstanceUid, expectedInput.ChangeDataset, null, expectEmptyList: true))); - context - .Received(1) - .ContinueAsNew( - Arg.Is(x => x.NumberOfStudyCompleted == 1), - false); - } - - [Fact] - [Obsolete("Obsolete")] - public async Task GivenV2OrchestrationWithNoInstancesFound_WhenUpdatingInstances_ThenComplete() - { - const int batchSize = 5; - _options.BatchSize = batchSize; - - DateTime createdTime = DateTime.UtcNow; - - var expectedInput = new UpdateCheckpoint - { - Partition = Partition.Default, - ChangeDataset = string.Empty, - StudyInstanceUids = new List { - TestUidGenerator.Generate() - }, - CreatedTime = createdTime, - }; - - var expectedInstances = new List(); - - var expectedInstancesWithNewWatermark = new List(); - - // Arrange the input - string operationId = OperationId.Generate(); - IDurableOrchestrationContext context = CreateContext(operationId); - - List instanceMetadataList = CreateExpectedInstanceMetadataList(expectedInstancesWithNewWatermark); - - context - .GetInput() - .Returns(expectedInput); - context - .CallActivityWithRetryAsync>( - nameof(UpdateDurableFunction.UpdateInstanceWatermarkV2Async), - _options.RetryOptions, - Arg.Any()) - .Returns(instanceMetadataList); - context - .CallActivityWithRetryAsync( - nameof(UpdateDurableFunction.UpdateInstanceBlobsV2Async), - _options.RetryOptions, - Arg.Is(GetPredicate(Partition.Default, instanceMetadataList, expectedInput.ChangeDataset))) - .Returns(Task.CompletedTask); - context - .CallActivityWithRetryAsync( - nameof(UpdateDurableFunction.CompleteUpdateStudyV2Async), - _options.RetryOptions, - Arg.Any()) - .Returns(Task.CompletedTask); - context - .CallActivityWithRetryAsync( - nameof(UpdateDurableFunction.DeleteOldVersionBlobV2Async), - _options.RetryOptions, - expectedInstances) - .Returns(Task.CompletedTask); - context - .CallActivityWithRetryAsync( - nameof(UpdateDurableFunction.SetOriginalBlobToColdAccessTierAsync), - _options.RetryOptions, - Arg.Any()) - .Returns(Task.CompletedTask); - - // Invoke the orchestration - await _updateDurableFunction.UpdateInstancesV2Async(context, NullLogger.Instance); - - // Assert behavior - context - .Received(1) - .GetInput(); - await context - .Received(1) - .CallActivityWithRetryAsync>( - nameof(UpdateDurableFunction.UpdateInstanceWatermarkV2Async), - _options.RetryOptions, - Arg.Any()); - await context - .DidNotReceive() - .CallActivityWithRetryAsync( - nameof(UpdateDurableFunction.UpdateInstanceBlobsV2Async), - _options.RetryOptions, - Arg.Is(GetPredicate(Partition.Default, instanceMetadataList, expectedInput.ChangeDataset))); - await context - .DidNotReceive() - .CallActivityWithRetryAsync( - nameof(UpdateDurableFunction.CompleteUpdateStudyV2Async), - _options.RetryOptions, - Arg.Any()); - context - .Received(1) - .ContinueAsNew( - Arg.Any(), - false); - await context - .DidNotReceive() - .CallActivityWithRetryAsync( - nameof(UpdateDurableFunction.DeleteOldVersionBlobV2Async), - _options.RetryOptions, - Arg.Any()); - await context - .DidNotReceive() - .CallActivityWithRetryAsync( - nameof(UpdateDurableFunction.SetOriginalBlobToColdAccessTierAsync), - _options.RetryOptions, - Arg.Any()); - - _meterProvider.ForceFlush(); - Assert.Empty(_exportedItems.Where(item => item.Name.Equals(_updateMeter.UpdatedInstances.Name, StringComparison.Ordinal))); - } - - - [Fact] - [Obsolete("Obsolete")] - public async Task GivenV2OrchestrationWithInput_WhenUpdatingInstancesWithException_ThenFails() - { - const int batchSize = 5; - _options.BatchSize = batchSize; - - DateTime createdTime = DateTime.UtcNow; - - var expectedInput = new UpdateCheckpoint - { - Partition = Partition.Default, - ChangeDataset = string.Empty, - StudyInstanceUids = new List(), - CreatedTime = createdTime, - Errors = new List() - { - "Failed Study" - } - }; - - // Arrange the input - string operationId = OperationId.Generate(); - IDurableOrchestrationContext context = CreateContext(operationId); - - context - .GetInput() - .Returns(expectedInput); - - // Invoke the orchestration - await Assert.ThrowsAsync(() => _updateDurableFunction.UpdateInstancesV2Async(context, NullLogger.Instance)); - - // Assert behavior - context - .Received(1) - .GetInput(); - await context - .DidNotReceive() - .CallActivityWithRetryAsync>( - nameof(UpdateDurableFunction.UpdateInstanceWatermarkV2Async), - _options.RetryOptions, - Arg.Any()); - await context - .DidNotReceive() - .CallActivityWithRetryAsync>( - nameof(UpdateDurableFunction.UpdateInstanceBlobsV2Async), - _options.RetryOptions, - Arg.Any()); - await context - .DidNotReceive() - .CallActivityWithRetryAsync( - nameof(UpdateDurableFunction.CompleteUpdateStudyV2Async), - _options.RetryOptions, - Arg.Any()); - context - .DidNotReceive() - .ContinueAsNew( - Arg.Any(), - false); - - _meterProvider.ForceFlush(); - Assert.Empty(_exportedItems.Where(item => item.Name.Equals(_updateMeter.UpdatedInstances.Name, StringComparison.Ordinal))); - } - - [Fact] - [Obsolete("Obsolete")] - public async Task GivenV2OrchestrationWithInput_WhenUpdatingInstancesWithException_ThenCallCleanupActivity() - { - const int batchSize = 5; - _options.BatchSize = batchSize; - - DateTime createdTime = DateTime.UtcNow; - - List instanceMetadataList = new List { - new InstanceMetadata( - new VersionedInstanceIdentifier( - TestUidGenerator.Generate(), - TestUidGenerator.Generate(), - TestUidGenerator.Generate(), - version: 1, - Partition.Default), - new InstanceProperties - { - FileProperties = new FileProperties { ETag = $"etag-{1}", Path = $"path-{1}" }, - NewVersion = 3 - } - ), - new InstanceMetadata( - new VersionedInstanceIdentifier( - TestUidGenerator.Generate(), - TestUidGenerator.Generate(), - TestUidGenerator.Generate(), - version: 2, - Partition.Default), - new InstanceProperties - { - FileProperties = new FileProperties { ETag = $"etag-{2}", Path = $"path-{2}" }, - NewVersion = 4 - } - ) - }; - - var expectedInstancesWithNewWatermark = instanceMetadataList.Select(x => x.ToInstanceFileState()).ToList(); - - var expectedInput = new UpdateCheckpoint - { - Partition = Partition.Default, - ChangeDataset = string.Empty, - StudyInstanceUids = instanceMetadataList.Select(x => x.VersionedInstanceIdentifier.StudyInstanceUid).ToList(), - CreatedTime = createdTime, - }; - - // Arrange the input - string operationId = OperationId.Generate(); - IDurableOrchestrationContext context = CreateContext(operationId); - - context - .GetInput() - .Returns(expectedInput); - - context - .CallActivityWithRetryAsync>( - nameof(UpdateDurableFunction.UpdateInstanceWatermarkV2Async), - _options.RetryOptions, Arg.Any()).Returns(instanceMetadataList); - - context - .CallActivityWithRetryAsync>( - nameof(UpdateDurableFunction.UpdateInstanceBlobsV2Async), - _options.RetryOptions, - Arg.Is(GetPredicate(Partition.Default, instanceMetadataList, expectedInput.ChangeDataset))) - .ThrowsAsync(new FunctionFailedException("Function failed")); - - context - .CallActivityWithRetryAsync( - nameof(UpdateDurableFunction.CleanupNewVersionBlobV2Async), - _options.RetryOptions, - expectedInstancesWithNewWatermark) - .Returns(Task.CompletedTask); - - // Invoke the orchestration - await _updateDurableFunction.UpdateInstancesV2Async(context, NullLogger.Instance); - - // Assert behavior - await context - .Received(1) - .CallActivityWithRetryAsync( - nameof(UpdateDurableFunction.CleanupNewVersionBlobV2Async), - _options.RetryOptions, - Arg.Is(GetPredicate(expectedInstancesWithNewWatermark, Partition.Default))); - - _meterProvider.ForceFlush(); - Assert.Empty(_exportedItems.Where(item => item.Name.Equals(_updateMeter.UpdatedInstances.Name, StringComparison.Ordinal))); - } - - [Fact] - [Obsolete("Obsolete")] - public async Task GivenV2OrchestrationWithInput_WhenUpdatingInstances_ThenCompleteWithUpdateProgress() - { - const int batchSize = 5; - _options.BatchSize = batchSize; - - DateTime createdTime = DateTime.UtcNow; - - var expectedInput = GetUpdateCheckpoint(); - - var expectedInstances = new List - { - new InstanceFileState - { - Version = 1 - }, - new InstanceFileState - { - Version = 2 - } - }; - - var expectedInstancesWithNewWatermark = new List - { - new InstanceFileState - { - Version = 1, - NewVersion = 3, - }, - new InstanceFileState - { - Version = 2, - NewVersion = 4, - } - }; - - List instanceMetadataList = CreateExpectedInstanceMetadataList(expectedInstancesWithNewWatermark); - - // Arrange the input - string operationId = OperationId.Generate(); - IDurableOrchestrationContext context = CreateContext(operationId); - - context - .GetInput() - .Returns(expectedInput); - context - .CallActivityWithRetryAsync>( - nameof(UpdateDurableFunction.UpdateInstanceWatermarkV2Async), - _options.RetryOptions, - Arg.Any()) - .Returns(instanceMetadataList); - context - .CallActivityWithRetryAsync( - nameof(UpdateDurableFunction.UpdateInstanceBlobsV2Async), - _options.RetryOptions, - Arg.Is(GetPredicate(Partition.Default, instanceMetadataList, expectedInput.ChangeDataset))) - .Returns(Task.CompletedTask); - context - .CallActivityWithRetryAsync( - nameof(UpdateDurableFunction.CompleteUpdateStudyV2Async), - _options.RetryOptions, - Arg.Any()) - .Returns(Task.CompletedTask); - context - .CallActivityWithRetryAsync( - nameof(UpdateDurableFunction.DeleteOldVersionBlobV2Async), - _options.RetryOptions, - expectedInstances) - .Returns(Task.CompletedTask); - - // Invoke the orchestration - await _updateDurableFunction.UpdateInstancesV2Async(context, NullLogger.Instance); - - // Assert behavior - context - .Received(1) - .GetInput(); - await context - .Received(1) - .CallActivityWithRetryAsync>( - nameof(UpdateDurableFunction.UpdateInstanceWatermarkV2Async), - _options.RetryOptions, - Arg.Any()); - await context - .Received(1) - .CallActivityWithRetryAsync>( - nameof(UpdateDurableFunction.UpdateInstanceBlobsV2Async), - _options.RetryOptions, - Arg.Is(GetPredicate(Partition.Default, instanceMetadataList, expectedInput.ChangeDataset))); - await context - .Received(1) - .CallActivityWithRetryAsync( - nameof(UpdateDurableFunction.CompleteUpdateStudyV2Async), - _options.RetryOptions, - Arg.Any()); - context - .Received(1) - .ContinueAsNew( - Arg.Is(GetPredicate(expectedInstancesWithNewWatermark.Count, 1)), - false); - } - - [Fact] - public async Task GivenV3OrchestrationWithInput_WhenUpdatingInstances_ThenComplete() - { - const int batchSize = 5; - _options.BatchSize = batchSize; - - DateTime createdTime = DateTime.UtcNow; - - var expectedInput = GetUpdateCheckpoint(); - - var expectedInstances = new List - { - new InstanceFileState - { - Version = 1 - }, - new InstanceFileState - { - Version = 2 - } - }; - - var expectedInstancesWithNewWatermark = new List - { - new InstanceFileState - { - Version = 1, - NewVersion = 3, - }, - new InstanceFileState - { - Version = 2, - NewVersion = 4, - } - }; - - List instanceMetadataList = CreateExpectedInstanceMetadataList(expectedInstancesWithNewWatermark); - - // Arrange the input - string operationId = OperationId.Generate(); - IDurableOrchestrationContext context = CreateContext(operationId); - - context - .GetInput() - .Returns(expectedInput); - context - .CallActivityWithRetryAsync>( - nameof(UpdateDurableFunction.UpdateInstanceWatermarkV2Async), - _options.RetryOptions, - Arg.Any()) - .Returns(instanceMetadataList); - context - .CallActivityWithRetryAsync>( - nameof(UpdateDurableFunction.UpdateInstanceBlobsV2Async), + .CallActivityWithRetryAsync( + nameof(UpdateDurableFunction.UpdateInstanceBlobsV3Async), _options.RetryOptions, Arg.Is(GetPredicate(expectedInput.Partition, instanceMetadataList, expectedInput.ChangeDataset)) ) - .Returns(instanceMetadataList); + .Returns(new UpdateInstanceResponse(instanceMetadataList, new List())); context .CallActivityWithRetryAsync( - nameof(UpdateDurableFunction.CompleteUpdateStudyV2Async), + nameof(UpdateDurableFunction.CompleteUpdateStudyV3Async), _options.RetryOptions, Arg.Any()) .Returns(Task.CompletedTask); @@ -828,7 +105,7 @@ public async Task GivenV3OrchestrationWithInput_WhenUpdatingInstances_ThenComple .Returns(Task.CompletedTask); // Invoke the orchestration - await _updateDurableFunction.UpdateInstancesV3Async(context, NullLogger.Instance); + await _updateDurableFunction.UpdateInstancesV4Async(context, NullLogger.Instance); // Assert behavior context @@ -842,15 +119,15 @@ await context Arg.Any()); await context .Received(1) - .CallActivityWithRetryAsync>( - nameof(UpdateDurableFunction.UpdateInstanceBlobsV2Async), + .CallActivityWithRetryAsync( + nameof(UpdateDurableFunction.UpdateInstanceBlobsV3Async), _options.RetryOptions, Arg.Is(GetPredicate(expectedInput.Partition, instanceMetadataList, expectedInput.ChangeDataset)) ); await context .Received(1) .CallActivityWithRetryAsync( - nameof(UpdateDurableFunction.CompleteUpdateStudyV2Async), + nameof(UpdateDurableFunction.CompleteUpdateStudyV3Async), _options.RetryOptions, Arg.Any()); await context @@ -873,7 +150,7 @@ await context } [Fact] - public async Task GivenV3OrchestrationWithInputAndExternalStoreEnabled_WhenUpdatingInstances_ThenInstanceMetadataListWithFilePropertiesPassedInToCompleteUpdate() + public async Task GivenV4OrchestrationWithInputAndExternalStoreEnabled_WhenUpdatingInstances_ThenInstanceMetadataListWithFilePropertiesPassedInToCompleteUpdate() { const int batchSize = 5; _options.BatchSize = batchSize; @@ -923,15 +200,15 @@ public async Task GivenV3OrchestrationWithInputAndExternalStoreEnabled_WhenUpdat Arg.Any()) .Returns(instanceMetadataList); context - .CallActivityWithRetryAsync>( - nameof(UpdateDurableFunction.UpdateInstanceBlobsV2Async), + .CallActivityWithRetryAsync( + nameof(UpdateDurableFunction.UpdateInstanceBlobsV3Async), _options.RetryOptions, Arg.Is(GetPredicate(expectedInput.Partition, instanceMetadataList, expectedInput.ChangeDataset)) ) - .Returns(instanceMetadataList); + .Returns(new UpdateInstanceResponse(instanceMetadataList, new List())); context .CallActivityWithRetryAsync( - nameof(UpdateDurableFunction.CompleteUpdateStudyV2Async), + nameof(UpdateDurableFunction.CompleteUpdateStudyV3Async), _options.RetryOptions, Arg.Is(GetPredicate(expectedInput.Partition.Key, studyInstanceUid, expectedInput.ChangeDataset, instanceMetadataList))) .Returns(Task.CompletedTask); @@ -949,7 +226,7 @@ public async Task GivenV3OrchestrationWithInputAndExternalStoreEnabled_WhenUpdat .Returns(Task.CompletedTask); // Invoke the orchestration - await _updateDurableFunctionWithExternalStore.UpdateInstancesV3Async(context, NullLogger.Instance); + await _updateDurableFunctionWithExternalStore.UpdateInstancesV4Async(context, NullLogger.Instance); // Assert behavior context @@ -963,15 +240,15 @@ await context Arg.Any()); await context .Received(1) - .CallActivityWithRetryAsync>( - nameof(UpdateDurableFunction.UpdateInstanceBlobsV2Async), + .CallActivityWithRetryAsync( + nameof(UpdateDurableFunction.UpdateInstanceBlobsV3Async), _options.RetryOptions, Arg.Is(GetPredicate(expectedInput.Partition, instanceMetadataList, expectedInput.ChangeDataset)) ); await context .Received(1) .CallActivityWithRetryAsync( - nameof(UpdateDurableFunction.CompleteUpdateStudyV2Async), + nameof(UpdateDurableFunction.CompleteUpdateStudyV3Async), _options.RetryOptions, Arg.Is(GetPredicate(expectedInput.Partition.Key, studyInstanceUid, expectedInput.ChangeDataset, instanceMetadataList))); context @@ -994,7 +271,7 @@ await context } [Fact] - public async Task GivenV3OrchestrationWithInputAndExternalStoreNotEnabled_WhenUpdatingInstances_ThenEmptyInstanceMetadataListPassedInToCompleteUpdate() + public async Task GivenV4OrchestrationWithInputAndExternalStoreNotEnabled_WhenUpdatingInstances_ThenEmptyInstanceMetadataListPassedInToCompleteUpdate() { const int batchSize = 5; _options.BatchSize = batchSize; @@ -1044,15 +321,15 @@ public async Task GivenV3OrchestrationWithInputAndExternalStoreNotEnabled_WhenUp Arg.Any()) .Returns(instanceMetadataList); context - .CallActivityWithRetryAsync>( - nameof(UpdateDurableFunction.UpdateInstanceBlobsV2Async), + .CallActivityWithRetryAsync( + nameof(UpdateDurableFunction.UpdateInstanceBlobsV3Async), _options.RetryOptions, Arg.Is(GetPredicate(expectedInput.Partition, instanceMetadataList, expectedInput.ChangeDataset)) ) - .Returns(instanceMetadataList); + .Returns(new UpdateInstanceResponse(instanceMetadataList, new List())); context .CallActivityWithRetryAsync( - nameof(UpdateDurableFunction.CompleteUpdateStudyV2Async), + nameof(UpdateDurableFunction.CompleteUpdateStudyV3Async), _options.RetryOptions, Arg.Is(GetPredicate(expectedInput.Partition.Key, studyInstanceUid, expectedInput.ChangeDataset, new List()))) .Returns(Task.CompletedTask); @@ -1070,7 +347,7 @@ public async Task GivenV3OrchestrationWithInputAndExternalStoreNotEnabled_WhenUp .Returns(Task.CompletedTask); // Invoke the orchestration - await _updateDurableFunction.UpdateInstancesV3Async(context, NullLogger.Instance); + await _updateDurableFunction.UpdateInstancesV4Async(context, NullLogger.Instance); // Assert behavior context @@ -1084,15 +361,15 @@ await context Arg.Any()); await context .Received(1) - .CallActivityWithRetryAsync>( - nameof(UpdateDurableFunction.UpdateInstanceBlobsV2Async), + .CallActivityWithRetryAsync( + nameof(UpdateDurableFunction.UpdateInstanceBlobsV3Async), _options.RetryOptions, Arg.Is(GetPredicate(expectedInput.Partition, instanceMetadataList, expectedInput.ChangeDataset)) ); await context .Received(1) .CallActivityWithRetryAsync( - nameof(UpdateDurableFunction.CompleteUpdateStudyV2Async), + nameof(UpdateDurableFunction.CompleteUpdateStudyV3Async), _options.RetryOptions, Arg.Is(GetPredicate(expectedInput.Partition.Key, studyInstanceUid, expectedInput.ChangeDataset, null, expectEmptyList: true))); context @@ -1104,7 +381,7 @@ await context [Fact] - public async Task GivenV3OrchestrationWithNoInstancesFound_WhenUpdatingInstances_ThenComplete() + public async Task GivenV4OrchestrationWithNoInstancesFound_WhenUpdatingInstances_ThenComplete() { const int batchSize = 5; _options.BatchSize = batchSize; @@ -1141,14 +418,15 @@ public async Task GivenV3OrchestrationWithNoInstancesFound_WhenUpdatingInstances Arg.Any()) .Returns(instanceMetadataList); context - .CallActivityWithRetryAsync( - nameof(UpdateDurableFunction.UpdateInstanceBlobsV2Async), + .CallActivityWithRetryAsync( + nameof(UpdateDurableFunction.UpdateInstanceBlobsV3Async), _options.RetryOptions, - Arg.Is(GetPredicate(Partition.Default, instanceMetadataList, expectedInput.ChangeDataset))) - .Returns(Task.CompletedTask); + Arg.Is(GetPredicate(expectedInput.Partition, instanceMetadataList, expectedInput.ChangeDataset)) + ) + .Returns(new UpdateInstanceResponse(instanceMetadataList, new List())); context .CallActivityWithRetryAsync( - nameof(UpdateDurableFunction.CompleteUpdateStudyV2Async), + nameof(UpdateDurableFunction.CompleteUpdateStudyV3Async), _options.RetryOptions, Arg.Any()) .Returns(Task.CompletedTask); @@ -1166,7 +444,7 @@ public async Task GivenV3OrchestrationWithNoInstancesFound_WhenUpdatingInstances .Returns(Task.CompletedTask); // Invoke the orchestration - await _updateDurableFunction.UpdateInstancesV3Async(context, NullLogger.Instance); + await _updateDurableFunction.UpdateInstancesV4Async(context, NullLogger.Instance); // Assert behavior context @@ -1181,13 +459,13 @@ await context await context .DidNotReceive() .CallActivityWithRetryAsync( - nameof(UpdateDurableFunction.UpdateInstanceBlobsV2Async), + nameof(UpdateDurableFunction.UpdateInstanceBlobsV3Async), _options.RetryOptions, Arg.Is(GetPredicate(Partition.Default, instanceMetadataList, expectedInput.ChangeDataset))); await context .DidNotReceive() .CallActivityWithRetryAsync( - nameof(UpdateDurableFunction.CompleteUpdateStudyV2Async), + nameof(UpdateDurableFunction.CompleteUpdateStudyV3Async), _options.RetryOptions, Arg.Any()); context @@ -1214,7 +492,7 @@ await context [Fact] - public async Task GivenV3OrchestrationWithInput_WhenUpdatingInstancesWithException_ThenFails() + public async Task GivenV4OrchestrationWithInput_WhenUpdatingInstancesWithException_ThenFails() { const int batchSize = 5; _options.BatchSize = batchSize; @@ -1242,7 +520,7 @@ public async Task GivenV3OrchestrationWithInput_WhenUpdatingInstancesWithExcepti .Returns(expectedInput); // Invoke the orchestration - await Assert.ThrowsAsync(() => _updateDurableFunction.UpdateInstancesV3Async(context, NullLogger.Instance)); + await Assert.ThrowsAsync(() => _updateDurableFunction.UpdateInstancesV4Async(context, NullLogger.Instance)); // Assert behavior context @@ -1256,14 +534,14 @@ await context Arg.Any()); await context .DidNotReceive() - .CallActivityWithRetryAsync>( - nameof(UpdateDurableFunction.UpdateInstanceBlobsV2Async), + .CallActivityWithRetryAsync( + nameof(UpdateDurableFunction.UpdateInstanceBlobsV3Async), _options.RetryOptions, Arg.Any()); await context .DidNotReceive() .CallActivityWithRetryAsync( - nameof(UpdateDurableFunction.CompleteUpdateStudyV2Async), + nameof(UpdateDurableFunction.CompleteUpdateStudyV3Async), _options.RetryOptions, Arg.Any()); context @@ -1277,7 +555,7 @@ await context } [Fact] - public async Task GivenV3OrchestrationWithInput_WhenUpdatingInstancesWithException_ThenCallCleanupActivity() + public async Task GivenV4OrchestrationWithInput_WhenUpdatingInstancesWithException_ThenCallCleanupActivity() { const int batchSize = 5; _options.BatchSize = batchSize; @@ -1337,8 +615,8 @@ public async Task GivenV3OrchestrationWithInput_WhenUpdatingInstancesWithExcepti _options.RetryOptions, Arg.Any()).Returns(instanceMetadataList); context - .CallActivityWithRetryAsync>( - nameof(UpdateDurableFunction.UpdateInstanceBlobsV2Async), + .CallActivityWithRetryAsync( + nameof(UpdateDurableFunction.UpdateInstanceBlobsV3Async), _options.RetryOptions, Arg.Is(GetPredicate(Partition.Default, instanceMetadataList, expectedInput.ChangeDataset))) .ThrowsAsync(new FunctionFailedException("Function failed")); @@ -1351,7 +629,96 @@ public async Task GivenV3OrchestrationWithInput_WhenUpdatingInstancesWithExcepti .Returns(Task.CompletedTask); // Invoke the orchestration - await _updateDurableFunction.UpdateInstancesV3Async(context, NullLogger.Instance); + await _updateDurableFunction.UpdateInstancesV4Async(context, NullLogger.Instance); + + // Assert behavior + await context + .Received(1) + .CallActivityWithRetryAsync( + nameof(UpdateDurableFunction.CleanupNewVersionBlobV3Async), + _options.RetryOptions, + Arg.Is(GetPredicate(instanceMetadataList, Partition.Default))); + + _meterProvider.ForceFlush(); + Assert.Empty(_exportedItems.Where(item => item.Name.Equals(_updateMeter.UpdatedInstances.Name, StringComparison.Ordinal))); + } + + [Fact] + public async Task GivenV4OrchestrationWithInput_WhenUpdatingInstancesWithDataStoreFailure_ThenCallCleanupActivity() + { + const int batchSize = 5; + _options.BatchSize = batchSize; + + DateTime createdTime = DateTime.UtcNow; + + List instanceMetadataList = new List { + new InstanceMetadata( + new VersionedInstanceIdentifier( + TestUidGenerator.Generate(), + TestUidGenerator.Generate(), + TestUidGenerator.Generate(), + version: 1, + Partition.Default), + new InstanceProperties + { + FileProperties = new FileProperties { ETag = $"etag-{1}", Path = $"path-{1}" }, + NewVersion = 3 + } + ), + new InstanceMetadata( + new VersionedInstanceIdentifier( + TestUidGenerator.Generate(), + TestUidGenerator.Generate(), + TestUidGenerator.Generate(), + version: 2, + Partition.Default), + new InstanceProperties + { + FileProperties = new FileProperties { ETag = $"etag-{2}", Path = $"path-{2}" }, + NewVersion = 4 + } + ) + }; + + var expectedInstancesWithNewWatermark = instanceMetadataList.Select(x => x.ToInstanceFileState()).ToList(); + + var expectedInput = new UpdateCheckpoint + { + Partition = Partition.Default, + ChangeDataset = string.Empty, + StudyInstanceUids = instanceMetadataList.Select(x => x.VersionedInstanceIdentifier.StudyInstanceUid).ToList(), + CreatedTime = createdTime, + }; + + // Arrange the input + string operationId = OperationId.Generate(); + IDurableOrchestrationContext context = CreateContext(operationId); + + context + .GetInput() + .Returns(expectedInput); + + context + .CallActivityWithRetryAsync>( + nameof(UpdateDurableFunction.UpdateInstanceWatermarkV2Async), + _options.RetryOptions, Arg.Any()).Returns(instanceMetadataList); + + context + .CallActivityWithRetryAsync( + nameof(UpdateDurableFunction.UpdateInstanceBlobsV3Async), + _options.RetryOptions, + Arg.Is(GetPredicate(Partition.Default, instanceMetadataList, expectedInput.ChangeDataset))) + .Returns(new UpdateInstanceResponse(instanceMetadataList, new List { "Instance Error" })); + + context + .CallActivityWithRetryAsync( + nameof(UpdateDurableFunction.CleanupNewVersionBlobV3Async), + _options.RetryOptions, + expectedInstancesWithNewWatermark) + .Returns(Task.CompletedTask); + + // Invoke the orchestration + await _updateDurableFunction.UpdateInstancesV4Async(context, NullLogger.Instance); // Assert behavior await context @@ -1366,7 +733,7 @@ await context } [Fact] - public async Task GivenV3OrchestrationWithInput_WhenUpdatingInstances_ThenCompleteWithUpdateProgress() + public async Task GivenV4OrchestrationWithInput_WhenUpdatingInstances_ThenCompleteWithUpdateProgress() { const int batchSize = 5; _options.BatchSize = batchSize; @@ -1417,14 +784,15 @@ public async Task GivenV3OrchestrationWithInput_WhenUpdatingInstances_ThenComple Arg.Any()) .Returns(instanceMetadataList); context - .CallActivityWithRetryAsync( - nameof(UpdateDurableFunction.UpdateInstanceBlobsV2Async), + .CallActivityWithRetryAsync( + nameof(UpdateDurableFunction.UpdateInstanceBlobsV3Async), _options.RetryOptions, - Arg.Is(GetPredicate(Partition.Default, instanceMetadataList, expectedInput.ChangeDataset))) - .Returns(Task.CompletedTask); + Arg.Is(GetPredicate(expectedInput.Partition, instanceMetadataList, expectedInput.ChangeDataset)) + ) + .Returns(new UpdateInstanceResponse(instanceMetadataList, new List())); context .CallActivityWithRetryAsync( - nameof(UpdateDurableFunction.CompleteUpdateStudyV2Async), + nameof(UpdateDurableFunction.CompleteUpdateStudyV3Async), _options.RetryOptions, Arg.Any()) .Returns(Task.CompletedTask); @@ -1436,7 +804,7 @@ public async Task GivenV3OrchestrationWithInput_WhenUpdatingInstances_ThenComple .Returns(Task.CompletedTask); // Invoke the orchestration - await _updateDurableFunction.UpdateInstancesV3Async(context, NullLogger.Instance); + await _updateDurableFunction.UpdateInstancesV4Async(context, NullLogger.Instance); // Assert behavior context @@ -1450,14 +818,14 @@ await context Arg.Any()); await context .Received(1) - .CallActivityWithRetryAsync>( - nameof(UpdateDurableFunction.UpdateInstanceBlobsV2Async), + .CallActivityWithRetryAsync( + nameof(UpdateDurableFunction.UpdateInstanceBlobsV3Async), _options.RetryOptions, Arg.Is(GetPredicate(Partition.Default, instanceMetadataList, expectedInput.ChangeDataset))); await context .Received(1) .CallActivityWithRetryAsync( - nameof(UpdateDurableFunction.CompleteUpdateStudyV2Async), + nameof(UpdateDurableFunction.CompleteUpdateStudyV3Async), _options.RetryOptions, Arg.Any()); context @@ -1467,7 +835,6 @@ await context false); } - private static IDurableOrchestrationContext CreateContext() => CreateContext(OperationId.Generate()); @@ -1516,15 +883,6 @@ private static Expression> GetPredicate(int && expectEmptyList ? x.InstanceMetadataList.IsNullOrEmpty() : x.InstanceMetadataList == instanceMetadataList; } - [Obsolete("Obsolete")] - private static Expression> GetPredicate(IReadOnlyList instanceWatermarks, Partition partition) - { - return x => x.InstanceWatermarks.IsNullOrEmpty() == false - && x.InstanceWatermarks[0].Version == instanceWatermarks[0].Version - && x.InstanceWatermarks[1].Version == instanceWatermarks[1].Version - && x.Partition == partition; - } - private static Expression> GetPredicate(IReadOnlyList instances, Partition partition) { return x => x.Instances.IsNullOrEmpty() == false diff --git a/src/Microsoft.Health.Dicom.Functions/Update/Models/CleanupBlobArguments.cs b/src/Microsoft.Health.Dicom.Functions/Update/Models/CleanupBlobArguments.cs deleted file mode 100644 index b6544cbfac..0000000000 --- a/src/Microsoft.Health.Dicom.Functions/Update/Models/CleanupBlobArguments.cs +++ /dev/null @@ -1,25 +0,0 @@ -// ------------------------------------------------------------------------------------------------- -// Copyright (c) Microsoft Corporation. All rights reserved. -// Licensed under the MIT License (MIT). See LICENSE in the repo root for license information. -// ------------------------------------------------------------------------------------------------- - -using System; -using System.Collections.Generic; -using EnsureThat; -using Microsoft.Health.Dicom.Core.Features.Model; -using Microsoft.Health.Dicom.Core.Features.Partitioning; - -namespace Microsoft.Health.Dicom.Functions.Update.Models; - -[Obsolete("Use CleanupBlobArgumentsV2 instead")] -public sealed class CleanupBlobArguments -{ - public IReadOnlyList InstanceWatermarks { get; } - public Partition Partition { get; } - - public CleanupBlobArguments(IReadOnlyList instanceWatermarks, Partition partition) - { - InstanceWatermarks = EnsureArg.IsNotNull(instanceWatermarks, nameof(instanceWatermarks)); - Partition = EnsureArg.IsNotNull(partition, nameof(partition)); - } -} diff --git a/src/Microsoft.Health.Dicom.Functions/Update/Models/UpdateInstanceResponse.cs b/src/Microsoft.Health.Dicom.Functions/Update/Models/UpdateInstanceResponse.cs new file mode 100644 index 0000000000..03e2a64db5 --- /dev/null +++ b/src/Microsoft.Health.Dicom.Functions/Update/Models/UpdateInstanceResponse.cs @@ -0,0 +1,23 @@ +// ------------------------------------------------------------------------------------------------- +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License (MIT). See LICENSE in the repo root for license information. +// ------------------------------------------------------------------------------------------------- + +using System.Collections.Generic; +using EnsureThat; +using Microsoft.Health.Dicom.Core.Features.Model; + +namespace Microsoft.Health.Dicom.Functions.Update.Models; + +public class UpdateInstanceResponse +{ + public IReadOnlyList InstanceMetadataList { get; } + + public IReadOnlyList Errors { get; } + + public UpdateInstanceResponse(IReadOnlyList instanceMetadataList, IReadOnlyList errors) + { + InstanceMetadataList = EnsureArg.IsNotNull(instanceMetadataList, nameof(instanceMetadataList)); + Errors = errors; + } +} diff --git a/src/Microsoft.Health.Dicom.Functions/Update/UpdateDurableFunction.Activity.cs b/src/Microsoft.Health.Dicom.Functions/Update/UpdateDurableFunction.Activity.cs index 6463a8dc2a..4004cf7444 100644 --- a/src/Microsoft.Health.Dicom.Functions/Update/UpdateDurableFunction.Activity.cs +++ b/src/Microsoft.Health.Dicom.Functions/Update/UpdateDurableFunction.Activity.cs @@ -65,6 +65,7 @@ public async Task> UpdateInstanceWatermarkV2Async( /// or is . /// [FunctionName(nameof(UpdateInstanceBlobsV2Async))] + [Obsolete("This function is obsolete. Use UpdateInstanceBlobsV3Async instead.")] public async Task> UpdateInstanceBlobsV2Async( [ActivityTrigger] UpdateInstanceBlobArgumentsV2 arguments, ILogger logger) @@ -75,7 +76,7 @@ public async Task> UpdateInstanceBlobsV2As EnsureArg.IsNotNull(arguments.Partition, nameof(arguments.Partition)); EnsureArg.IsNotNull(logger, nameof(logger)); - DicomDataset datasetToUpdate = GetDeserialzedDataset(arguments.ChangeDataset); + DicomDataset datasetToUpdate = GetDeserializedDataset(arguments.ChangeDataset); int processed = 0; @@ -125,6 +126,74 @@ await Parallel.ForEachAsync( return updatedInstances; } + /// + /// Asynchronously batches the instance watermarks and calls the update instance. + /// + /// BatchUpdateArguments + /// A diagnostic logger. + /// + /// The result of the task contains the updated instances with file properties representing newly created blobs and any error. + /// + /// + /// or is . + /// + [FunctionName(nameof(UpdateInstanceBlobsV3Async))] + public async Task UpdateInstanceBlobsV3Async( + [ActivityTrigger] UpdateInstanceBlobArgumentsV2 arguments, + ILogger logger) + { + EnsureArg.IsNotNull(arguments, nameof(arguments)); + EnsureArg.IsNotNull(arguments.ChangeDataset, nameof(arguments.ChangeDataset)); + EnsureArg.IsNotNull(arguments.InstanceMetadataList, nameof(arguments.InstanceMetadataList)); + EnsureArg.IsNotNull(arguments.Partition, nameof(arguments.Partition)); + EnsureArg.IsNotNull(logger, nameof(logger)); + + DicomDataset datasetToUpdate = GetDeserializedDataset(arguments.ChangeDataset); + + logger.LogInformation("Beginning to update all instance blobs, Total count {TotalCount}", arguments.InstanceMetadataList.Count); + + var updatedInstances = new ConcurrentBag(); + var errors = new ConcurrentBag(); + + await Parallel.ForEachAsync( + arguments.InstanceMetadataList, + new ParallelOptions + { + CancellationToken = default, + MaxDegreeOfParallelism = _options.MaxParallelThreads, + }, + async (instance, token) => + { + try + { + FileProperties fileProperties = await _updateInstanceService.UpdateInstanceBlobAsync(instance, datasetToUpdate, arguments.Partition, token); + updatedInstances.Add( + new InstanceMetadata( + instance.VersionedInstanceIdentifier, + new InstanceProperties + { + FileProperties = fileProperties, + NewVersion = instance.InstanceProperties.NewVersion, + OriginalVersion = instance.InstanceProperties.OriginalVersion + })); + } + catch (DataStoreRequestFailedException ex) + { + logger.LogInformation("Failed to update instance with watermark {Watermark}, IsExternal {IsExternal}", instance.VersionedInstanceIdentifier.Version, ex.IsExternal); + errors.Add($"{ex.Message}. {ToInstanceString(instance.VersionedInstanceIdentifier)}"); + } + catch (DataStoreException ex) + { + logger.LogInformation("Failed to update instance with watermark {Watermark}, IsExternal {IsExternal}", instance.VersionedInstanceIdentifier.Version, ex.IsExternal); + errors.Add($"Failed to update instance. {ToInstanceString(instance.VersionedInstanceIdentifier)}"); + } + }); + + logger.LogInformation("Completed updating all instance blobs. Total instace count {TotalCount}. Total Failed {FailedCount}", arguments.InstanceMetadataList.Count, errors.Count); + + return new UpdateInstanceResponse(updatedInstances.ToList(), errors.ToList()); + } + /// /// Asynchronously commits all the instances in a study and creates new entries for changefeed. /// @@ -137,6 +206,7 @@ await Parallel.ForEachAsync( /// or is . /// [FunctionName(nameof(CompleteUpdateStudyV2Async))] + [Obsolete("This function is obsolete. Use CompleteUpdateStudyV3Async instead.")] public async Task CompleteUpdateStudyV2Async([ActivityTrigger] CompleteStudyArgumentsV2 arguments, ILogger logger) { EnsureArg.IsNotNull(arguments, nameof(arguments)); @@ -152,7 +222,7 @@ public async Task CompleteUpdateStudyV2Async([ActivityTrigger] CompleteStudyArgu await _indexStore.EndUpdateInstanceAsync( arguments.PartitionKey, arguments.StudyInstanceUid, - GetDeserialzedDataset(arguments.ChangeDataset), + GetDeserializedDataset(arguments.ChangeDataset), arguments.InstanceMetadataList, CancellationToken.None); @@ -166,43 +236,35 @@ await _indexStore.EndUpdateInstanceAsync( } /// - /// Asynchronously delete all the old blobs if it has more than 2 version. + /// Asynchronously commits all the instances in a study and creates new entries for changefeed. /// - /// Activity context which has list of watermarks to cleanup + /// CompleteInstanceArguments /// A diagnostic logger. /// - /// A task representing the operation. + /// A task representing the operation. /// /// /// or is . /// - [FunctionName(nameof(DeleteOldVersionBlobV2Async))] - [Obsolete("Use DeleteOldVersionBlobV3Async instead")] - public async Task DeleteOldVersionBlobV2Async([ActivityTrigger] CleanupBlobArguments arguments, ILogger logger) + [FunctionName(nameof(CompleteUpdateStudyV3Async))] + public async Task CompleteUpdateStudyV3Async([ActivityTrigger] CompleteStudyArgumentsV2 arguments, ILogger logger) { EnsureArg.IsNotNull(arguments, nameof(arguments)); - EnsureArg.IsNotNull(arguments.Partition, nameof(arguments.Partition)); + EnsureArg.IsNotNull(arguments.ChangeDataset, nameof(arguments.ChangeDataset)); + EnsureArg.IsNotNull(arguments.StudyInstanceUid, nameof(arguments.StudyInstanceUid)); + EnsureArg.IsNotNull(arguments.InstanceMetadataList, nameof(arguments.InstanceMetadataList)); EnsureArg.IsNotNull(logger, nameof(logger)); - IReadOnlyList fileIdentifiers = arguments.InstanceWatermarks; - Partition partition = arguments.Partition; - int fileCount = fileIdentifiers.Where(f => f.OriginalVersion.HasValue).Count(); + logger.LogInformation("Completing updating operation for study."); - logger.LogInformation("Begin deleting old blobs. Total size {TotalCount}", fileCount); + await _indexStore.EndUpdateInstanceAsync( + arguments.PartitionKey, + arguments.StudyInstanceUid, + GetDeserializedDataset(arguments.ChangeDataset), + arguments.InstanceMetadataList, + CancellationToken.None); - await Parallel.ForEachAsync( - fileIdentifiers.Where(f => f.OriginalVersion.HasValue), - new ParallelOptions - { - CancellationToken = default, - MaxDegreeOfParallelism = _options.MaxParallelThreads, - }, - async (fileIdentifier, token) => - { - await _updateInstanceService.DeleteInstanceBlobAsync(fileIdentifier.Version, partition, null, token); - }); - - logger.LogInformation("Old blobs deleted successfully. Total size {TotalCount}", fileCount); + logger.LogInformation("Updating study completed successfully."); } /// @@ -285,88 +347,6 @@ await Parallel.ForEachAsync( logger.LogInformation("New blobs deleted successfully. Total size {TotalCount}", fileCount); } - /// - /// Asynchronously delete the new blob when there is a failure while updating the study instances. - /// - /// arguments which have a list of watermarks to cleanup along with partition they belong to - /// A diagnostic logger. - /// - /// A task representing the operation. - /// - /// - /// or is . - /// - [FunctionName(nameof(CleanupNewVersionBlobV2Async))] - [Obsolete("Use CleanupNewVersionBlobV3Async instead")] - public async Task CleanupNewVersionBlobV2Async([ActivityTrigger] CleanupBlobArguments arguments, ILogger logger) - { - EnsureArg.IsNotNull(arguments, nameof(arguments)); - EnsureArg.IsNotNull(arguments.Partition, nameof(arguments.Partition)); - EnsureArg.IsNotNull(logger, nameof(logger)); - - IReadOnlyList fileIdentifiers = arguments.InstanceWatermarks; - Partition partition = arguments.Partition; - - int fileCount = fileIdentifiers.Where(f => f.NewVersion.HasValue).Count(); - logger.LogInformation("Begin cleaning up new blobs. Total size {TotalCount}", fileCount); - - await Parallel.ForEachAsync( - fileIdentifiers.Where(f => f.NewVersion.HasValue), - new ParallelOptions - { - CancellationToken = default, - MaxDegreeOfParallelism = _options.MaxParallelThreads, - }, - async (fileIdentifier, token) => - { - await _updateInstanceService.DeleteInstanceBlobAsync(fileIdentifier.NewVersion.Value, partition, null, token); - }); - - logger.LogInformation("New blobs deleted successfully. Total size {TotalCount}", fileCount); - } - - /// - /// Asynchronously move all the original version blobs to cold access tier. - /// - /// arguments which have a list of watermarks to move to cold access tier along with partition they belong to - /// A diagnostic logger. - /// - /// A task representing the operation. - /// - /// - /// or is . - /// - [FunctionName(nameof(SetOriginalBlobToColdAccessTierAsync))] - [Obsolete("Use SetOriginalBlobToColdAccessTierV2Async instead")] - public async Task SetOriginalBlobToColdAccessTierAsync([ActivityTrigger] CleanupBlobArguments arguments, ILogger logger) - { - EnsureArg.IsNotNull(arguments, nameof(arguments)); - EnsureArg.IsNotNull(arguments.Partition, nameof(arguments.Partition)); - EnsureArg.IsNotNull(logger, nameof(logger)); - - IReadOnlyList fileIdentifiers = arguments.InstanceWatermarks; - Partition partition = arguments.Partition; - - int fileCount = fileIdentifiers.Where(f => f.NewVersion.HasValue && !f.OriginalVersion.HasValue).Count(); - logger.LogInformation("Begin moving original version blob from hot to cold access tier. Total size {TotalCount}", fileCount); - - // Set to cold tier only for first time update, not for subsequent updates. This is to avoid moving the blob to cold tier multiple times. - // If the original version is set, then it means that the instance is updated already. - await Parallel.ForEachAsync( - fileIdentifiers.Where(f => f.NewVersion.HasValue && !f.OriginalVersion.HasValue), - new ParallelOptions - { - CancellationToken = default, - MaxDegreeOfParallelism = _options.MaxParallelThreads, - }, - async (fileIdentifier, token) => - { - await _fileStore.SetBlobToColdAccessTierAsync(fileIdentifier.Version, partition, null, token); - }); - - logger.LogInformation("Original version blob is moved to cold access tier successfully. Total size {TotalCount}", fileCount); - } - /// /// Asynchronously move all the original version blobs to cold access tier. /// @@ -408,5 +388,8 @@ await Parallel.ForEachAsync( logger.LogInformation("Original version blob is moved to cold access tier successfully. Total size {TotalCount}", fileCount); } - private DicomDataset GetDeserialzedDataset(string dataset) => JsonSerializer.Deserialize(dataset, _jsonSerializerOptions); + private static string ToInstanceString(VersionedInstanceIdentifier versionedInstanceIdentifier) + => $"PartitionKey: {versionedInstanceIdentifier.Partition.Name}, StudyInstanceUID: {versionedInstanceIdentifier.StudyInstanceUid}, SeriesInstanceUID: {versionedInstanceIdentifier.SeriesInstanceUid}, SOPInstanceUID: {versionedInstanceIdentifier.SopInstanceUid}"; + + private DicomDataset GetDeserializedDataset(string dataset) => JsonSerializer.Deserialize(dataset, _jsonSerializerOptions); } diff --git a/src/Microsoft.Health.Dicom.Functions/Update/UpdateDurableFunction.Orchestration.cs b/src/Microsoft.Health.Dicom.Functions/Update/UpdateDurableFunction.Orchestration.cs index d686123377..f26de8207c 100644 --- a/src/Microsoft.Health.Dicom.Functions/Update/UpdateDurableFunction.Orchestration.cs +++ b/src/Microsoft.Health.Dicom.Functions/Update/UpdateDurableFunction.Orchestration.cs @@ -5,6 +5,7 @@ using System; using System.Collections.Generic; +using System.Data; using System.Linq; using System.Threading.Tasks; using EnsureThat; @@ -30,14 +31,14 @@ public partial class UpdateDurableFunction /// /// The context for the orchestration instance. /// A diagnostic logger. - /// A task representing the operation. + /// A task representing the operation. /// /// or is . /// /// Orchestration instance ID is invalid. - [FunctionName(nameof(UpdateInstancesV2Async))] - [Obsolete("Use UpdateInstancesV3Async instead")] - public async Task UpdateInstancesV2Async( + [FunctionName(nameof(UpdateInstancesV3Async))] + [Obsolete("Use UpdateInstancesV4Async instead")] + public async Task UpdateInstancesV3Async( [OrchestrationTrigger] IDurableOrchestrationContext context, ILogger logger) { @@ -102,20 +103,20 @@ await context.CallActivityWithRetryAsync( numberofStudyFailed++; // Cleanup the new version when the update activity fails - await TryCleanupActivityV2(context, instanceWatermarks, input.Partition); + await TryCleanupActivityV3(context, instances, input.Partition); } if (!isFailedToUpdateStudy) { await context.CallActivityWithRetryAsync( - nameof(DeleteOldVersionBlobV2Async), + nameof(DeleteOldVersionBlobV3Async), _options.RetryOptions, - new CleanupBlobArguments(instanceWatermarks, input.Partition)); + new CleanupBlobArgumentsV2(instances, input.Partition)); await context.CallActivityWithRetryAsync( - nameof(SetOriginalBlobToColdAccessTierAsync), + nameof(SetOriginalBlobToColdAccessTierV2Async), _options.RetryOptions, - new CleanupBlobArguments(instanceWatermarks, input.Partition)); + new CleanupBlobArgumentsV2(instances, input.Partition)); } } @@ -175,13 +176,13 @@ await context.CallActivityWithRetryAsync( /// /// The context for the orchestration instance. /// A diagnostic logger. - /// A task representing the operation. + /// A task representing the operation. /// /// or is . /// /// Orchestration instance ID is invalid. - [FunctionName(nameof(UpdateInstancesV3Async))] - public async Task UpdateInstancesV3Async( + [FunctionName(nameof(UpdateInstancesV4Async))] + public async Task UpdateInstancesV4Async( [OrchestrationTrigger] IDurableOrchestrationContext context, ILogger logger) { @@ -208,7 +209,6 @@ public async Task UpdateInstancesV3Async( logger.LogInformation("Updated all instances new watermark in a study. Found {InstanceCount} instance for study", instances.Count); var totalNoOfInstances = input.TotalNumberOfInstanceUpdated; - int numberofStudyFailed = input.NumberOfStudyFailed; if (instances.Count > 0) { @@ -216,37 +216,36 @@ public async Task UpdateInstancesV3Async( try { - instanceMetadataList = await context.CallActivityWithRetryAsync>( - nameof(UpdateInstanceBlobsV2Async), + UpdateInstanceResponse response = await context.CallActivityWithRetryAsync( + nameof(UpdateInstanceBlobsV3Async), _options.RetryOptions, new UpdateInstanceBlobArgumentsV2(input.Partition, instances, input.ChangeDataset)); - await context.CallActivityWithRetryAsync( - nameof(CompleteUpdateStudyV2Async), - _options.RetryOptions, - new CompleteStudyArgumentsV2(input.Partition.Key, studyInstanceUid, input.ChangeDataset, GetInstanceMetadataList(instanceMetadataList))); + instanceMetadataList = response.InstanceMetadataList; - totalNoOfInstances += instances.Count; + if (response.Errors?.Count > 0) + { + isFailedToUpdateStudy = true; + logger.LogWarning("Failed to update instances for study. Total instance failed for study {TotalFailed}", response.Errors.Count); + await HandleException(context, input, studyInstanceUid, instances, response.Errors); + } + else + { + await context.CallActivityWithRetryAsync( + nameof(CompleteUpdateStudyV3Async), + _options.RetryOptions, + new CompleteStudyArgumentsV2(input.Partition.Key, studyInstanceUid, input.ChangeDataset, GetInstanceMetadataList(instanceMetadataList))); + + totalNoOfInstances += instances.Count; + } } catch (FunctionFailedException ex) { isFailedToUpdateStudy = true; logger.LogError(ex, "Failed to update instances for study", ex); - var errors = new List - { - $"Failed to update instances for study {studyInstanceUid}", - }; - - if (input.Errors != null) - errors.AddRange(errors); - - input.Errors = errors; - - numberofStudyFailed++; - // Cleanup the new version when the update activity fails - await TryCleanupActivityV3(context, instances, input.Partition); + await HandleException(context, input, studyInstanceUid, instances, null); } if (!isFailedToUpdateStudy) @@ -278,7 +277,7 @@ await context.CallActivityWithRetryAsync( Partition = input.Partition, PartitionKey = input.PartitionKey, NumberOfStudyCompleted = numberOfStudyCompleted, - NumberOfStudyFailed = numberofStudyFailed, + NumberOfStudyFailed = input.NumberOfStudyFailed, TotalNumberOfInstanceUpdated = totalNoOfInstances, Errors = input.Errors, CreatedTime = input.CreatedTime ?? await context.GetCreatedTimeAsync(_options.RetryOptions), @@ -286,6 +285,11 @@ await context.CallActivityWithRetryAsync( } else { + if (input.TotalNumberOfInstanceUpdated > 0) + { + replaySafeCounter.Add(input.TotalNumberOfInstanceUpdated); + } + if (input.Errors?.Count > 0) { logger.LogWarning("Update operation completed with errors. {NumberOfStudyUpdated}, {NumberOfStudyFailed}, {TotalNumberOfInstanceUpdated}.", @@ -302,12 +306,38 @@ await context.CallActivityWithRetryAsync( input.NumberOfStudyCompleted, input.TotalNumberOfInstanceUpdated); } + } + } - if (input.TotalNumberOfInstanceUpdated > 0) - { - replaySafeCounter.Add(input.TotalNumberOfInstanceUpdated); - } + private async Task HandleException( + IDurableOrchestrationContext context, + UpdateCheckpoint input, + string studyInstanceUid, + IReadOnlyList instances, + IReadOnlyList instanceErrors) + { + var errors = new List(); + + if (input.Errors != null) + { + errors.AddRange(input.Errors); } + + errors.Add($"Failed to update instances for study {studyInstanceUid}"); + + if (instanceErrors != null) + { + // We don't want to populate all the errors in Azure Table Storage, DTFx may attempt to compress the entry as needed using GZip and storing in blob storage + // But I think we should also be wary of what the user experience is for this via the response, so restricting to 5 errors for now. We can update based on feedback. + // TODO: Inform the user that the remaining failures can be found in the logs. + errors.AddRange(instanceErrors.Take(5)); + } + + input.Errors = errors; + input.NumberOfStudyFailed++; + + // Cleanup the new version when the update activity fails + await TryCleanupActivityV3(context, instances, input.Partition); } private IReadOnlyList GetInstanceMetadataList(IReadOnlyList instanceMetadataList) @@ -316,20 +346,6 @@ private IReadOnlyList GetInstanceMetadataList(IReadOnlyList(); } - [System.Diagnostics.CodeAnalysis.SuppressMessage("Design", "CA1031:Do not catch general exception types", Justification = "Using a generic exception to catch all scenarios.")] - [Obsolete("Use TryCleanupActivityV3 instead")] - private async Task TryCleanupActivityV2(IDurableOrchestrationContext context, IReadOnlyList instanceWatermarks, Partition partition) - { - try - { - await context.CallActivityWithRetryAsync( - nameof(CleanupNewVersionBlobV2Async), - _options.RetryOptions, - new CleanupBlobArguments(instanceWatermarks, partition)); - } - catch (Exception) { } - } - [System.Diagnostics.CodeAnalysis.SuppressMessage("Design", "CA1031:Do not catch general exception types", Justification = "Using a generic exception to catch all scenarios.")] private async Task TryCleanupActivityV3(IDurableOrchestrationContext context, IReadOnlyList instances, Partition partition) { diff --git a/src/Microsoft.Health.Dicom.Web/appsettings.json b/src/Microsoft.Health.Dicom.Web/appsettings.json index 3809bc1bdb..26dd5970ec 100644 --- a/src/Microsoft.Health.Dicom.Web/appsettings.json +++ b/src/Microsoft.Health.Dicom.Web/appsettings.json @@ -58,7 +58,7 @@ } }, "Update": { - "Name": "UpdateInstancesV3Async" + "Name": "UpdateInstancesV4Async" } }, "DicomServer": {