diff --git a/Microsoft.Health.Fhir.sln b/Microsoft.Health.Fhir.sln index 623ced9b6b..e815d3482a 100644 --- a/Microsoft.Health.Fhir.sln +++ b/Microsoft.Health.Fhir.sln @@ -110,6 +110,8 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Microsoft.Health.Fhir.R4.Te EndProject Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Microsoft.Health.Fhir.Azure", "src\Microsoft.Health.Fhir.Azure\Microsoft.Health.Fhir.Azure.csproj", "{6A804695-44D9-4335-B559-9E9A0DAB1102}" EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Microsoft.Health.Fhir.Azure.UnitTests", "src\Microsoft.Health.Fhir.Azure.UnitTests\Microsoft.Health.Fhir.Azure.UnitTests.csproj", "{4C4701AA-8DE4-44B2-8134-102F040C34F8}" +EndProject Global GlobalSection(SharedMSBuildProjectFiles) = preSolution src\Microsoft.Health.Fhir.Shared.Tests\Microsoft.Health.Fhir.Shared.Tests.projitems*{1495bba7-07b8-47ca-b2ed-511003b57667}*SharedItemsImports = 13 @@ -261,6 +263,10 @@ Global {6A804695-44D9-4335-B559-9E9A0DAB1102}.Debug|Any CPU.Build.0 = Debug|Any CPU {6A804695-44D9-4335-B559-9E9A0DAB1102}.Release|Any CPU.ActiveCfg = Release|Any CPU {6A804695-44D9-4335-B559-9E9A0DAB1102}.Release|Any CPU.Build.0 = Release|Any CPU + {4C4701AA-8DE4-44B2-8134-102F040C34F8}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {4C4701AA-8DE4-44B2-8134-102F040C34F8}.Debug|Any CPU.Build.0 = Debug|Any CPU + {4C4701AA-8DE4-44B2-8134-102F040C34F8}.Release|Any CPU.ActiveCfg = Release|Any CPU + {4C4701AA-8DE4-44B2-8134-102F040C34F8}.Release|Any CPU.Build.0 = Release|Any CPU EndGlobalSection GlobalSection(SolutionProperties) = preSolution HideSolutionNode = FALSE @@ -313,7 +319,7 @@ Global {7F2B9209-2C50-42ED-961B-A09CC8A26A07} = {BA0D5243-CFC7-4DEB-9836-657C50195DD1} EndGlobalSection GlobalSection(ExtensibilityGlobals) = postSolution - SolutionGuid = {E370FB31-CF95-47D1-B1E1-863A77973FF8} RESX_SortFileContentOnSave = True + SolutionGuid = {E370FB31-CF95-47D1-B1E1-863A77973FF8} EndGlobalSection EndGlobal diff --git a/src/Microsoft.Health.Fhir.Azure.UnitTests/ExportDestinationClient/OrderedSetOfBlockIdsTests.cs b/src/Microsoft.Health.Fhir.Azure.UnitTests/ExportDestinationClient/OrderedSetOfBlockIdsTests.cs new file mode 100644 index 0000000000..394a12dffe --- /dev/null +++ b/src/Microsoft.Health.Fhir.Azure.UnitTests/ExportDestinationClient/OrderedSetOfBlockIdsTests.cs @@ -0,0 +1,48 @@ +// ------------------------------------------------------------------------------------------------- +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License (MIT). See LICENSE in the repo root for license information. +// ------------------------------------------------------------------------------------------------- + +using System.Collections.Generic; +using Microsoft.Health.Fhir.Azure.ExportDestinationClient; +using Xunit; + +namespace Microsoft.Health.Fhir.Azure.UnitTests.ExportDestinationClient +{ + public class OrderedSetOfBlockIdsTests + { + [Fact] + public void GivenOrderedSetOfBlockIds_WhenAddingExistingItem_ThenItemDoesNotGetAdded() + { + string item1 = "item1"; + string item2 = "item2"; + + var orderedSetOfBlockIds = new OrderedSetOfBlockIds(); + orderedSetOfBlockIds.Add(item1); + orderedSetOfBlockIds.Add(item1); + orderedSetOfBlockIds.Add(item2); + + var result = orderedSetOfBlockIds.ToList(); + + Assert.Equal(2, result.Count); + Assert.Contains(item1, result); + Assert.Contains(item2, result); + } + + [Fact] + public void GivenListContainingDuplicateItems_WhenCreatingOrderedSetOfBlockIds_ThenOnlyDistinctItemsAreAdded() + { + string item1 = "item1"; + string item2 = "item2"; + var input = new List() { item1, item2, item2, item2, item1 }; + + var orderedSetOfBlockIds = new OrderedSetOfBlockIds(input); + + var result = orderedSetOfBlockIds.ToList(); + + Assert.Equal(2, result.Count); + Assert.Contains(item1, result); + Assert.Contains(item2, result); + } + } +} diff --git a/src/Microsoft.Health.Fhir.Azure.UnitTests/Microsoft.Health.Fhir.Azure.UnitTests.csproj b/src/Microsoft.Health.Fhir.Azure.UnitTests/Microsoft.Health.Fhir.Azure.UnitTests.csproj new file mode 100644 index 0000000000..c05091ef3c --- /dev/null +++ b/src/Microsoft.Health.Fhir.Azure.UnitTests/Microsoft.Health.Fhir.Azure.UnitTests.csproj @@ -0,0 +1,18 @@ + + + + netcoreapp2.2 + + + + + + + + + + + + + + diff --git a/src/Microsoft.Health.Fhir.Azure/AssemblyInfo.cs b/src/Microsoft.Health.Fhir.Azure/AssemblyInfo.cs index c7a051867e..0cca8a805a 100644 --- a/src/Microsoft.Health.Fhir.Azure/AssemblyInfo.cs +++ b/src/Microsoft.Health.Fhir.Azure/AssemblyInfo.cs @@ -4,5 +4,7 @@ // ------------------------------------------------------------------------------------------------- using System.Resources; +using System.Runtime.CompilerServices; [assembly: NeutralResourcesLanguage("en-us")] +[assembly: InternalsVisibleTo("Microsoft.Health.Fhir.Azure.UnitTests")] diff --git a/src/Microsoft.Health.Fhir.Azure/ExportDestinationClient/AzureExportDestinationClient.cs b/src/Microsoft.Health.Fhir.Azure/ExportDestinationClient/AzureExportDestinationClient.cs index 605101bf17..88e6e08403 100644 --- a/src/Microsoft.Health.Fhir.Azure/ExportDestinationClient/AzureExportDestinationClient.cs +++ b/src/Microsoft.Health.Fhir.Azure/ExportDestinationClient/AzureExportDestinationClient.cs @@ -6,12 +6,14 @@ using System; using System.Collections.Generic; using System.IO; +using System.Linq; using System.Text; using System.Threading; using System.Threading.Tasks; using EnsureThat; using Microsoft.Azure.Storage; using Microsoft.Azure.Storage.Blob; +using Microsoft.Extensions.Logging; using Microsoft.Health.Fhir.Core.Features.Operations.Export.ExportDestinationClient; namespace Microsoft.Health.Fhir.Azure.ExportDestinationClient @@ -24,6 +26,15 @@ public class AzureExportDestinationClient : IExportDestinationClient private Dictionary _uriToBlobMapping = new Dictionary(); private Dictionary<(Uri FileUri, uint PartId), Stream> _streamMappings = new Dictionary<(Uri FileUri, uint PartId), Stream>(); + private readonly ILogger _logger; + + public AzureExportDestinationClient(ILogger logger) + { + EnsureArg.IsNotNull(logger, nameof(logger)); + + _logger = logger; + } + public string DestinationType => "azure-block-blob"; public async Task ConnectAsync(string connectionSettings, CancellationToken cancellationToken, string containerId = null) @@ -66,8 +77,12 @@ public Task CreateFileAsync(string fileName, CancellationToken cancellation CheckIfClientIsConnected(); CloudBlockBlob blockBlob = _blobContainer.GetBlockBlobReference(fileName); - blockBlob.Properties.ContentType = "application/fhir+ndjson"; - _uriToBlobMapping.Add(blockBlob.Uri, new CloudBlockBlobWrapper(blockBlob)); + + if (!_uriToBlobMapping.ContainsKey(blockBlob.Uri)) + { + blockBlob.Properties.ContentType = "application/fhir+ndjson"; + _uriToBlobMapping.Add(blockBlob.Uri, new CloudBlockBlobWrapper(blockBlob)); + } return Task.FromResult(blockBlob.Uri); } @@ -93,35 +108,69 @@ public async Task CommitAsync(CancellationToken cancellationToken) { CheckIfClientIsConnected(); - var uploadAndCommitTasks = new List(); + // Upload all blocks for each blob that was modified. + Task[] uploadTasks = new Task[_streamMappings.Count]; + CloudBlockBlobWrapper[] wrappersToCommit = new CloudBlockBlobWrapper[_streamMappings.Count]; + + int index = 0; foreach (KeyValuePair<(Uri, uint), Stream> mapping in _streamMappings) { + // Reset stream position. Stream stream = mapping.Value; - - // Reset the position. stream.Position = 0; CloudBlockBlobWrapper blobWrapper = _uriToBlobMapping[mapping.Key.Item1]; - var blockId = Convert.ToBase64String(Encoding.ASCII.GetBytes(mapping.Key.Item2.ToString("d6"))); - uploadAndCommitTasks.Add(Task.Run(async () => - { - await blobWrapper.UploadBlockAsync(blockId, stream, md5Hash: null, cancellationToken); - await blobWrapper.CommitBlockListAsync(cancellationToken); - stream.Dispose(); - })); + uploadTasks[index] = blobWrapper.UploadBlockAsync(blockId, stream, md5Hash: null, cancellationToken); + wrappersToCommit[index] = blobWrapper; + index++; } - await Task.WhenAll(uploadAndCommitTasks); + await Task.WhenAll(uploadTasks); + + // Commit all the blobs that were uploaded. + Task[] commitTasks = wrappersToCommit.Select(wrapper => wrapper.CommitBlockListAsync(cancellationToken)).ToArray(); + await Task.WhenAll(commitTasks); + + // We can clear the stream mappings once we commit everything. + foreach (Stream stream in _streamMappings.Values) + { + stream.Dispose(); + } - // We can clear the stream mappings once we commit everything in memory. _streamMappings.Clear(); } + public async Task OpenFileAsync(Uri fileUri, CancellationToken cancellationToken) + { + EnsureArg.IsNotNull(fileUri, nameof(fileUri)); + CheckIfClientIsConnected(); + + if (_uriToBlobMapping.ContainsKey(fileUri)) + { + _logger.LogInformation("Trying to open a file that the client already knows about."); + return; + } + + var blob = new CloudBlockBlob(fileUri, _blobClient.Credentials); + + // We are going to consider only committed blocks. + IEnumerable result = await blob.DownloadBlockListAsync( + BlockListingFilter.Committed, + accessCondition: null, + options: null, + operationContext: null, + cancellationToken); + + // Update the internal mapping with the block lists of the blob. + var wrapper = new CloudBlockBlobWrapper(blob, result.Select(x => x.Name).ToList()); + _uriToBlobMapping.Add(fileUri, wrapper); + } + private void CheckIfClientIsConnected() { - if (_blobClient == null) + if (_blobClient == null || _blobContainer == null) { throw new DestinationConnectionException(Resources.DestinationClientNotConnected); } diff --git a/src/Microsoft.Health.Fhir.Azure/ExportDestinationClient/CloudBlockBlobWrapper.cs b/src/Microsoft.Health.Fhir.Azure/ExportDestinationClient/CloudBlockBlobWrapper.cs index dd9cd05959..e69eb73319 100644 --- a/src/Microsoft.Health.Fhir.Azure/ExportDestinationClient/CloudBlockBlobWrapper.cs +++ b/src/Microsoft.Health.Fhir.Azure/ExportDestinationClient/CloudBlockBlobWrapper.cs @@ -20,18 +20,21 @@ namespace Microsoft.Health.Fhir.Azure.ExportDestinationClient /// public class CloudBlockBlobWrapper { - private readonly List _existingBlockIds; + private readonly OrderedSetOfBlockIds _existingBlockIds; private readonly CloudBlockBlob _cloudBlob; public CloudBlockBlobWrapper(CloudBlockBlob blockBlob) + : this(blockBlob, new List()) + { + } + + public CloudBlockBlobWrapper(CloudBlockBlob blockBlob, IEnumerable blockList) { EnsureArg.IsNotNull(blockBlob, nameof(blockBlob)); + EnsureArg.IsNotNull(blockList, nameof(blockList)); _cloudBlob = blockBlob; - - // For now we assume that the blockblob we are using is always new. If it is an existing one, - // we will have to get the existing list of block ids. - _existingBlockIds = new List(); + _existingBlockIds = new OrderedSetOfBlockIds(blockList); } public async Task UploadBlockAsync(string blockId, Stream data, string md5Hash, CancellationToken cancellationToken) @@ -46,7 +49,7 @@ public async Task UploadBlockAsync(string blockId, Stream data, string md5Hash, public async Task CommitBlockListAsync(CancellationToken cancellationToken) { - await _cloudBlob.PutBlockListAsync(_existingBlockIds, cancellationToken); + await _cloudBlob.PutBlockListAsync(_existingBlockIds.ToList(), cancellationToken); } } } diff --git a/src/Microsoft.Health.Fhir.Azure/ExportDestinationClient/OrderedSetOfBlockIds.cs b/src/Microsoft.Health.Fhir.Azure/ExportDestinationClient/OrderedSetOfBlockIds.cs new file mode 100644 index 0000000000..31d7f047cb --- /dev/null +++ b/src/Microsoft.Health.Fhir.Azure/ExportDestinationClient/OrderedSetOfBlockIds.cs @@ -0,0 +1,62 @@ +// ------------------------------------------------------------------------------------------------- +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License (MIT). See LICENSE in the repo root for license information. +// ------------------------------------------------------------------------------------------------- + +using System.Collections.Generic; +using EnsureThat; + +namespace Microsoft.Health.Fhir.Azure.ExportDestinationClient +{ + /// + /// A modified implementation of an ordered hash set using a list and hashset. This class is specifically used + /// for maintaining the existing block ids that are part of a blob in . We need to + /// maintain ordering of the block ids but also make sure that we don't have multiple copies of a block id. Hence + /// the need for an ordered hash set. + /// + internal class OrderedSetOfBlockIds + { + private readonly List _itemList; + private readonly HashSet _itemSet; + + public OrderedSetOfBlockIds() + { + _itemList = new List(); + _itemSet = new HashSet(); + } + + public OrderedSetOfBlockIds(IEnumerable existingItems) + : this() + { + EnsureArg.IsNotNull(existingItems, nameof(existingItems)); + + foreach (string item in existingItems) + { + Add(item); + } + } + + /// + /// Will add the given if it is not present already. + /// + /// Item to be added to the + public void Add(string item) + { + EnsureArg.IsNotNullOrWhiteSpace(item, nameof(item)); + + // Add item to list if the hashset does not contain it. + if (_itemSet.Add(item)) + { + _itemList.Add(item); + } + } + + /// + /// Returns a list of all items that were added to the + /// + public List ToList() + { + return new List(_itemList); + } + } +} diff --git a/src/Microsoft.Health.Fhir.Azure/KeyVault/KeyVaultSecretStore.cs b/src/Microsoft.Health.Fhir.Azure/KeyVault/KeyVaultSecretStore.cs index df0aab379b..39373cc707 100644 --- a/src/Microsoft.Health.Fhir.Azure/KeyVault/KeyVaultSecretStore.cs +++ b/src/Microsoft.Health.Fhir.Azure/KeyVault/KeyVaultSecretStore.cs @@ -60,7 +60,7 @@ public async Task DeleteSecretAsync(string secretName, Cancellati { EnsureArg.IsNotNullOrWhiteSpace(secretName, nameof(secretName)); - SecretBundle result = await _keyVaultClient.DeleteSecretAsync(_keyVaultUri.AbsoluteUri, secretName, cancellationToken); + DeletedSecretBundle result = await _keyVaultClient.DeleteSecretAsync(_keyVaultUri.AbsoluteUri, secretName, cancellationToken); return new SecretWrapper(result.Id, result.Value); } diff --git a/src/Microsoft.Health.Fhir.Core.UnitTests/Features/Operations/Export/ExportJobTaskTests.cs b/src/Microsoft.Health.Fhir.Core.UnitTests/Features/Operations/Export/ExportJobTaskTests.cs index 11c9e87527..65e7f2aa2c 100644 --- a/src/Microsoft.Health.Fhir.Core.UnitTests/Features/Operations/Export/ExportJobTaskTests.cs +++ b/src/Microsoft.Health.Fhir.Core.UnitTests/Features/Operations/Export/ExportJobTaskTests.cs @@ -28,20 +28,17 @@ namespace Microsoft.Health.Fhir.Core.UnitTests.Features.Operations.Export { public class ExportJobTaskTests { - private static readonly ExportJobRecord _exportJobRecord = new ExportJobRecord( - new Uri("https://localhost/ExportJob/"), - "Patient", - "hash"); - private static readonly WeakETag _weakETag = WeakETag.FromVersionId("0"); + private ExportJobRecord _exportJobRecord; + private IExportDestinationClientFactory _exportDestinationClientFactory = Substitute.For(); + private InMemoryExportDestinationClient _inMemoryDestinationClient = new InMemoryExportDestinationClient(); + private readonly IFhirOperationDataStore _fhirOperationDataStore = Substitute.For(); private readonly ISecretStore _secretStore = Substitute.For(); private readonly ExportJobConfiguration _exportJobConfiguration = new ExportJobConfiguration(); private readonly ISearchService _searchService = Substitute.For(); private readonly IResourceToByteArraySerializer _resourceToByteArraySerializer = Substitute.For(); - private readonly IExportDestinationClientFactory _exportDestinationClientFactory = Substitute.For(); - private readonly InMemoryExportDestinationClient _inMemoryDestinationClient = new InMemoryExportDestinationClient(); private readonly ExportJobTask _exportJobTask; @@ -53,6 +50,10 @@ public class ExportJobTaskTests public ExportJobTaskTests() { _cancellationToken = _cancellationTokenSource.Token; + _exportJobRecord = new ExportJobRecord( + new Uri("https://localhost/ExportJob/"), + "Patient", + "hash"); _fhirOperationDataStore.UpdateExportJobAsync(_exportJobRecord, _weakETag, _cancellationToken).Returns(x => { @@ -301,6 +302,76 @@ public async Task GivenDeleteSecretFailed_WhenExecuted_ThenJobStatusShouldBeUpda Assert.Equal(OperationStatus.Completed, _lastExportJobOutcome.JobRecord.Status); } + [Fact] + public async Task GivenAnExportJobToResume_WhenExecuted_ThenItShouldExportAllRecordsAsExpected() + { + // We are using the SearchService to throw an exception in order to simulate the export job task + // "crashing" while in the middle of the process. + _exportJobConfiguration.NumberOfPagesPerCommit = 2; + + int numberOfCalls = 0; + int numberOfSuccessfulPages = 2; + + _searchService.SearchAsync( + Arg.Any(), + Arg.Any>>(), + _cancellationToken) + .Returns(x => + { + int count = numberOfCalls; + + if (count == numberOfSuccessfulPages) + { + throw new Exception(); + } + + numberOfCalls++; + return CreateSearchResult( + new[] + { + new ResourceWrapper( + count.ToString(CultureInfo.InvariantCulture), + "1", + "Patient", + new RawResource("data", Core.Models.FhirResourceFormat.Json), + null, + DateTimeOffset.MinValue, + false, + null, + null, + null), + }, + continuationToken: "ct"); + }); + + await _exportJobTask.ExecuteAsync(_exportJobRecord, _weakETag, _cancellationToken); + + string exportedIds = _inMemoryDestinationClient.GetExportedData(new Uri("Patient.ndjson", UriKind.Relative)); + + Assert.Equal("01", exportedIds); + Assert.NotNull(_exportJobRecord.Progress); + + // We create a new export job task here to simulate the worker picking up the "old" export job record + // and resuming the export process. The export destination client contains data that has + // been committed up until the "crash". + _inMemoryDestinationClient = new InMemoryExportDestinationClient(); + _exportDestinationClientFactory.Create("in-memory").Returns(_inMemoryDestinationClient); + var secondExportJobTask = new ExportJobTask( + _fhirOperationDataStore, + _secretStore, + Options.Create(_exportJobConfiguration), + _searchService, + _resourceToByteArraySerializer, + _exportDestinationClientFactory, + NullLogger.Instance); + + numberOfSuccessfulPages = 5; + await secondExportJobTask.ExecuteAsync(_exportJobRecord, _weakETag, _cancellationToken); + + exportedIds = _inMemoryDestinationClient.GetExportedData(new Uri("Patient.ndjson", UriKind.Relative)); + Assert.Equal("23", exportedIds); + } + private SearchResult CreateSearchResult(IEnumerable resourceWrappers = null, string continuationToken = null) { if (resourceWrappers == null) diff --git a/src/Microsoft.Health.Fhir.Core/Features/Operations/Export/ExportDestinationClient/IExportDestinationClient.cs b/src/Microsoft.Health.Fhir.Core/Features/Operations/Export/ExportDestinationClient/IExportDestinationClient.cs index fcac299b6e..92971bac70 100644 --- a/src/Microsoft.Health.Fhir.Core/Features/Operations/Export/ExportDestinationClient/IExportDestinationClient.cs +++ b/src/Microsoft.Health.Fhir.Core/Features/Operations/Export/ExportDestinationClient/IExportDestinationClient.cs @@ -4,6 +4,7 @@ // ------------------------------------------------------------------------------------------------- using System; +using System.Collections.Generic; using System.Threading; using System.Threading.Tasks; using Task = System.Threading.Tasks.Task; @@ -53,5 +54,13 @@ public interface IExportDestinationClient /// The cancellation token. /// A representing the asynchronous commit operation. Task CommitAsync(CancellationToken cancellationToken); + + /// + /// Opens an existing file from the destination. + /// + /// Uri of the file to be opened. + /// The cancellation token. + /// A representing the asynchronous initialize operation. + Task OpenFileAsync(Uri fileUri, CancellationToken cancellationToken); } } diff --git a/src/Microsoft.Health.Fhir.Core/Features/Operations/Export/ExportDestinationClient/InMemoryExportDestinationClient.cs b/src/Microsoft.Health.Fhir.Core/Features/Operations/Export/ExportDestinationClient/InMemoryExportDestinationClient.cs index b766043adc..123aa8ce87 100644 --- a/src/Microsoft.Health.Fhir.Core/Features/Operations/Export/ExportDestinationClient/InMemoryExportDestinationClient.cs +++ b/src/Microsoft.Health.Fhir.Core/Features/Operations/Export/ExportDestinationClient/InMemoryExportDestinationClient.cs @@ -32,7 +32,10 @@ public async Task CreateFileAsync(string fileName, CancellationToken cancel var fileUri = new Uri(fileName, UriKind.Relative); - _exportedData.Add(fileUri, new StringBuilder()); + if (!_exportedData.ContainsKey(fileUri)) + { + _exportedData.Add(fileUri, new StringBuilder()); + } return await Task.FromResult(fileUri); } @@ -74,6 +77,16 @@ public async Task CommitAsync(CancellationToken cancellationToken) _streamMappings.Clear(); } + public Task OpenFileAsync(Uri fileUri, CancellationToken cancellationToken) + { + if (!_exportedData.ContainsKey(fileUri)) + { + _exportedData.Add(fileUri, new StringBuilder()); + } + + return Task.CompletedTask; + } + public string GetExportedData(Uri fileUri) { if (_exportedData.TryGetValue(fileUri, out StringBuilder sb)) diff --git a/src/Microsoft.Health.Fhir.Core/Features/Operations/Export/ExportJobTask.cs b/src/Microsoft.Health.Fhir.Core/Features/Operations/Export/ExportJobTask.cs index d58d9ce617..797eb07cd6 100644 --- a/src/Microsoft.Health.Fhir.Core/Features/Operations/Export/ExportJobTask.cs +++ b/src/Microsoft.Health.Fhir.Core/Features/Operations/Export/ExportJobTask.cs @@ -7,7 +7,6 @@ using System.Collections.Generic; using System.Globalization; using System.Threading; -using System.Threading.Tasks; using EnsureThat; using Microsoft.Extensions.Logging; using Microsoft.Extensions.Options; @@ -35,7 +34,7 @@ public class ExportJobTask : IExportJobTask // Currently we will have only one file per resource type. In the future we will add the ability to split // individual files based on a max file size. This could result in a single resource having multiple files. // We will have to update the below mapping to support multiple ExportFileInfo per resource type. - private readonly Dictionary _resourceTypeToFileInfoMapping = new Dictionary(); + private readonly IDictionary _resourceTypeToFileInfoMapping = new Dictionary(); private ExportJobRecord _exportJobRecord; private WeakETag _weakETag; @@ -77,26 +76,25 @@ public async Task ExecuteAsync(ExportJobRecord exportJobRecord, WeakETag weakETa try { - // Get destination type from secret store. - DestinationInfo destinationInfo = await GetDestinationInfo(cancellationToken); + // Get destination type from secret store and connect to the destination using appropriate client. + await GetDestinationInfoAndConnect(cancellationToken); - // Connect to the destination using appropriate client. - _exportDestinationClient = _exportDestinationClientFactory.Create(destinationInfo.DestinationType); - - await _exportDestinationClient.ConnectAsync(destinationInfo.DestinationConnectionString, cancellationToken, _exportJobRecord.Id); - - // TODO: For now, always restart from the beginning. We will support resume in another work item. - _exportJobRecord.Progress = new ExportJobProgress(continuationToken: null, page: 0); + // If we are resuming a job, we can detect that by checking the progress info from the job record. + // If it is null, then we know we are processing a new job. + if (_exportJobRecord.Progress == null) + { + _exportJobRecord.Progress = new ExportJobProgress(continuationToken: null, page: 0); + } ExportJobProgress progress = _exportJobRecord.Progress; - // Current page will be used to organize a set of search results into a group so that they can be committed together. + // Current batch will be used to organize a set of search results into a group so that they can be committed together. uint currentBatchId = progress.Page; // The first item is placeholder for continuation token so that it can be updated efficiently later. var queryParameters = new Tuple[] { - null, + Tuple.Create(KnownQueryParameterNames.ContinuationToken, progress.ContinuationToken), Tuple.Create(KnownQueryParameterNames.Count, _exportJobConfiguration.MaximumNumberOfResourcesPerQuery.ToString(CultureInfo.InvariantCulture)), Tuple.Create(KnownQueryParameterNames.LastUpdated, $"le{_exportJobRecord.QueuedTime.ToString("o", CultureInfo.InvariantCulture)}"), }; @@ -106,26 +104,9 @@ public async Task ExecuteAsync(ExportJobRecord exportJobRecord, WeakETag weakETa // 2. There is no continuation token but the page is 0, which means it's the initial export. while (progress.ContinuationToken != null || progress.Page == 0) { - // Commit the changes if necessary. - if (progress.Page != 0 && progress.Page % _exportJobConfiguration.NumberOfPagesPerCommit == 0) - { - await _exportDestinationClient.CommitAsync(cancellationToken); - - // Update the job record. - await UpdateJobRecord(_exportJobRecord, cancellationToken); - - currentBatchId = progress.Page; - } - - // Set the continuation token. - queryParameters[0] = Tuple.Create(KnownQueryParameterNames.ContinuationToken, progress.ContinuationToken); - + // Search and process the results. SearchResult searchResult = await _searchService.SearchAsync(_exportJobRecord.ResourceType, queryParameters, cancellationToken); - - foreach (ResourceWrapper resourceWrapper in searchResult.Results) - { - await ProcessResourceWrapperAsync(resourceWrapper, currentBatchId, cancellationToken); - } + await ProcessSearchResults(searchResult.Results, currentBatchId, cancellationToken); if (searchResult.ContinuationToken == null) { @@ -133,19 +114,29 @@ public async Task ExecuteAsync(ExportJobRecord exportJobRecord, WeakETag weakETa break; } - // Update the job record. + // Update the continuation token (local cache). progress.UpdateContinuationToken(searchResult.ContinuationToken); + queryParameters[0] = Tuple.Create(KnownQueryParameterNames.ContinuationToken, progress.ContinuationToken); + + // Commit the changes if necessary. + if (progress.Page % _exportJobConfiguration.NumberOfPagesPerCommit == 0) + { + await _exportDestinationClient.CommitAsync(cancellationToken); + + // Update the job record. + await UpdateAndCommitJobRecord(_exportJobRecord, cancellationToken); + + currentBatchId = progress.Page; + } } // Commit one last time for any pending changes. await _exportDestinationClient.CommitAsync(cancellationToken); - _exportJobRecord.Output.AddRange(_resourceTypeToFileInfoMapping.Values); + await UpdateAndCommitJobStatus(OperationStatus.Completed, updateEndTimestamp: true, cancellationToken); _logger.LogTrace("Successfully completed the job."); - await UpdateJobStatus(OperationStatus.Completed, updateEndTimestamp: true, cancellationToken); - try { // Best effort to delete the secret. If it fails to delete, then move on. @@ -170,11 +161,11 @@ public async Task ExecuteAsync(ExportJobRecord exportJobRecord, WeakETag weakETa // Try to update the job to failed state. _logger.LogError(ex, "Encountered an unhandled exception. The job will be marked as failed."); - await UpdateJobStatus(OperationStatus.Failed, updateEndTimestamp: true, cancellationToken); + await UpdateAndCommitJobStatus(OperationStatus.Failed, updateEndTimestamp: true, cancellationToken); } } - private async Task UpdateJobStatus(OperationStatus operationStatus, bool updateEndTimestamp, CancellationToken cancellationToken) + private async Task UpdateAndCommitJobStatus(OperationStatus operationStatus, bool updateEndTimestamp, CancellationToken cancellationToken) { _exportJobRecord.Status = operationStatus; @@ -183,10 +174,10 @@ private async Task UpdateJobStatus(OperationStatus operationStatus, bool updateE _exportJobRecord.EndTime = Clock.UtcNow; } - await UpdateJobRecord(_exportJobRecord, cancellationToken); + await UpdateAndCommitJobRecord(_exportJobRecord, cancellationToken); } - private async Task UpdateJobRecord(ExportJobRecord jobRecord, CancellationToken cancellationToken) + private async Task UpdateAndCommitJobRecord(ExportJobRecord jobRecord, CancellationToken cancellationToken) { ExportJobOutcome updatedExportJobOutcome = await _fhirOperationDataStore.UpdateExportJobAsync(jobRecord, _weakETag, cancellationToken); @@ -194,37 +185,56 @@ private async Task UpdateJobRecord(ExportJobRecord jobRecord, CancellationToken _weakETag = updatedExportJobOutcome.ETag; } - private async Task GetDestinationInfo(CancellationToken cancellationToken) + // Get destination info from secret store, create appropriate export client and connect to destination. + private async Task GetDestinationInfoAndConnect(CancellationToken cancellationToken) { SecretWrapper secret = await _secretStore.GetSecretAsync(_exportJobRecord.SecretName, cancellationToken); DestinationInfo destinationInfo = JsonConvert.DeserializeObject(secret.SecretValue); - return destinationInfo; + + _exportDestinationClient = _exportDestinationClientFactory.Create(destinationInfo.DestinationType); + + await _exportDestinationClient.ConnectAsync(destinationInfo.DestinationConnectionString, cancellationToken, _exportJobRecord.Id); } - private async Task ProcessResourceWrapperAsync(ResourceWrapper resourceWrapper, uint partId, CancellationToken cancellationToken) + private async Task ProcessSearchResults(IEnumerable searchResults, uint partId, CancellationToken cancellationToken) { - string resourceType = resourceWrapper.ResourceTypeName; - - // Check whether we already have an existing file for the current resource type. - if (!_resourceTypeToFileInfoMapping.TryGetValue(resourceType, out ExportFileInfo exportFileInfo)) + foreach (ResourceWrapper resourceWrapper in searchResults) { - string fileName = resourceType + ".ndjson"; + string resourceType = resourceWrapper.ResourceTypeName; + + // Check whether we already have an existing file for the current resource type. + if (!_resourceTypeToFileInfoMapping.TryGetValue(resourceType, out ExportFileInfo exportFileInfo)) + { + // Check whether we have seen this file previously (in situations where we are resuming an export) + if (_exportJobRecord.Output.TryGetValue(resourceType, out exportFileInfo)) + { + // A file already exists for this resource type. Let us open the file on the client. + await _exportDestinationClient.OpenFileAsync(exportFileInfo.FileUri, cancellationToken); + } + else + { + // File does not exist. Create it. + string fileName = resourceType + ".ndjson"; + Uri fileUri = await _exportDestinationClient.CreateFileAsync(fileName, cancellationToken); - Uri fileUri = await _exportDestinationClient.CreateFileAsync(fileName, cancellationToken); + exportFileInfo = new ExportFileInfo(resourceType, fileUri, sequence: 0); - exportFileInfo = new ExportFileInfo(resourceType, fileUri, sequence: 0); + // Since we created a new file the JobRecord Output also needs to know about it. + _exportJobRecord.Output.TryAdd(resourceType, exportFileInfo); + } - _resourceTypeToFileInfoMapping.Add(resourceType, exportFileInfo); - } + _resourceTypeToFileInfoMapping.Add(resourceType, exportFileInfo); + } - // Serialize into NDJson and write to the file. - byte[] bytesToWrite = _resourceToByteArraySerializer.Serialize(resourceWrapper); + // Serialize into NDJson and write to the file. + byte[] bytesToWrite = _resourceToByteArraySerializer.Serialize(resourceWrapper); - await _exportDestinationClient.WriteFilePartAsync(exportFileInfo.FileUri, partId, bytesToWrite, cancellationToken); + await _exportDestinationClient.WriteFilePartAsync(exportFileInfo.FileUri, partId, bytesToWrite, cancellationToken); - // Increment the file information. - exportFileInfo.IncrementCount(bytesToWrite.Length); + // Increment the file information. + exportFileInfo.IncrementCount(bytesToWrite.Length); + } } } } diff --git a/src/Microsoft.Health.Fhir.Core/Features/Operations/Export/GetExportRequestHandler.cs b/src/Microsoft.Health.Fhir.Core/Features/Operations/Export/GetExportRequestHandler.cs index c8f22411d6..77a8c516e7 100644 --- a/src/Microsoft.Health.Fhir.Core/Features/Operations/Export/GetExportRequestHandler.cs +++ b/src/Microsoft.Health.Fhir.Core/Features/Operations/Export/GetExportRequestHandler.cs @@ -3,6 +3,8 @@ // Licensed under the MIT License (MIT). See LICENSE in the repo root for license information. // ------------------------------------------------------------------------------------------------- +using System; +using System.Linq; using System.Net; using System.Threading; using System.Threading.Tasks; @@ -38,7 +40,7 @@ public async Task Handle(GetExportRequest request, Cancellati outcome.JobRecord.QueuedTime, outcome.JobRecord.RequestUri, requiresAccessToken: false, - outcome.JobRecord.Output, + outcome.JobRecord.Output.Values.OrderBy(x => x.Type, StringComparer.Ordinal).ToList(), outcome.JobRecord.Error); exportResponse = new GetExportResponse(HttpStatusCode.OK, jobResult); diff --git a/src/Microsoft.Health.Fhir.Core/Features/Operations/Export/Models/ExportJobRecord.cs b/src/Microsoft.Health.Fhir.Core/Features/Operations/Export/Models/ExportJobRecord.cs index d1bdd73ab9..3ef488ded1 100644 --- a/src/Microsoft.Health.Fhir.Core/Features/Operations/Export/Models/ExportJobRecord.cs +++ b/src/Microsoft.Health.Fhir.Core/Features/Operations/Export/Models/ExportJobRecord.cs @@ -66,10 +66,10 @@ protected ExportJobRecord() public int SchemaVersion { get; private set; } [JsonProperty(JobRecordProperties.Output)] - public List Output { get; private set; } = new List(); + public IDictionary Output { get; private set; } = new Dictionary(); [JsonProperty(JobRecordProperties.Error)] - public List Error { get; private set; } = new List(); + public IList Error { get; private set; } = new List(); [JsonProperty(JobRecordProperties.Status)] public OperationStatus Status { get; set; } diff --git a/src/Microsoft.Health.Fhir.Core/Features/SecretStore/SecretWrapper.cs b/src/Microsoft.Health.Fhir.Core/Features/SecretStore/SecretWrapper.cs index 4f79838f37..8062692939 100644 --- a/src/Microsoft.Health.Fhir.Core/Features/SecretStore/SecretWrapper.cs +++ b/src/Microsoft.Health.Fhir.Core/Features/SecretStore/SecretWrapper.cs @@ -16,7 +16,7 @@ public class SecretWrapper public SecretWrapper(string secretName, string secretValue) { EnsureArg.IsNotNullOrWhiteSpace(secretName, nameof(secretName)); - EnsureArg.IsNotNullOrWhiteSpace(secretValue, nameof(secretValue)); + EnsureArg.IsNotEmptyOrWhitespace(secretValue, nameof(secretValue)); SecretName = secretName; SecretValue = secretValue; diff --git a/src/Microsoft.Health.Fhir.Stu3.Web/appsettings.json b/src/Microsoft.Health.Fhir.Stu3.Web/appsettings.json index 5e9037635e..9f5c2a38fa 100644 --- a/src/Microsoft.Health.Fhir.Stu3.Web/appsettings.json +++ b/src/Microsoft.Health.Fhir.Stu3.Web/appsettings.json @@ -39,7 +39,8 @@ "MaximumNumberOfConcurrentJobsAllowed": 1, "JobHeartbeatTimeoutThreshold": "00:10:00", "JobPollingFrequency": "00:00:10", - "SupportedDestinations": [ "" ] + "MaximumNumberOfResourcesPerQuery": 100, + "NumberOfPagesPerCommit": 10 } } },