Skip to content

Commit

Permalink
Support ability to resume export (#544)
Browse files Browse the repository at this point in the history
  • Loading branch information
namadabu authored Jun 28, 2019
1 parent cdf4be1 commit c8cfcd0
Show file tree
Hide file tree
Showing 16 changed files with 387 additions and 93 deletions.
8 changes: 7 additions & 1 deletion Microsoft.Health.Fhir.sln
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,8 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Microsoft.Health.Fhir.R4.Te
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Microsoft.Health.Fhir.Azure", "src\Microsoft.Health.Fhir.Azure\Microsoft.Health.Fhir.Azure.csproj", "{6A804695-44D9-4335-B559-9E9A0DAB1102}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Microsoft.Health.Fhir.Azure.UnitTests", "src\Microsoft.Health.Fhir.Azure.UnitTests\Microsoft.Health.Fhir.Azure.UnitTests.csproj", "{4C4701AA-8DE4-44B2-8134-102F040C34F8}"
EndProject
Global
GlobalSection(SharedMSBuildProjectFiles) = preSolution
src\Microsoft.Health.Fhir.Shared.Tests\Microsoft.Health.Fhir.Shared.Tests.projitems*{1495bba7-07b8-47ca-b2ed-511003b57667}*SharedItemsImports = 13
Expand Down Expand Up @@ -261,6 +263,10 @@ Global
{6A804695-44D9-4335-B559-9E9A0DAB1102}.Debug|Any CPU.Build.0 = Debug|Any CPU
{6A804695-44D9-4335-B559-9E9A0DAB1102}.Release|Any CPU.ActiveCfg = Release|Any CPU
{6A804695-44D9-4335-B559-9E9A0DAB1102}.Release|Any CPU.Build.0 = Release|Any CPU
{4C4701AA-8DE4-44B2-8134-102F040C34F8}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{4C4701AA-8DE4-44B2-8134-102F040C34F8}.Debug|Any CPU.Build.0 = Debug|Any CPU
{4C4701AA-8DE4-44B2-8134-102F040C34F8}.Release|Any CPU.ActiveCfg = Release|Any CPU
{4C4701AA-8DE4-44B2-8134-102F040C34F8}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
Expand Down Expand Up @@ -313,7 +319,7 @@ Global
{7F2B9209-2C50-42ED-961B-A09CC8A26A07} = {BA0D5243-CFC7-4DEB-9836-657C50195DD1}
EndGlobalSection
GlobalSection(ExtensibilityGlobals) = postSolution
SolutionGuid = {E370FB31-CF95-47D1-B1E1-863A77973FF8}
RESX_SortFileContentOnSave = True
SolutionGuid = {E370FB31-CF95-47D1-B1E1-863A77973FF8}
EndGlobalSection
EndGlobal
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
// -------------------------------------------------------------------------------------------------
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License (MIT). See LICENSE in the repo root for license information.
// -------------------------------------------------------------------------------------------------

using System.Collections.Generic;
using Microsoft.Health.Fhir.Azure.ExportDestinationClient;
using Xunit;

namespace Microsoft.Health.Fhir.Azure.UnitTests.ExportDestinationClient
{
public class OrderedSetOfBlockIdsTests
{
[Fact]
public void GivenOrderedSetOfBlockIds_WhenAddingExistingItem_ThenItemDoesNotGetAdded()
{
string item1 = "item1";
string item2 = "item2";

var orderedSetOfBlockIds = new OrderedSetOfBlockIds();
orderedSetOfBlockIds.Add(item1);
orderedSetOfBlockIds.Add(item1);
orderedSetOfBlockIds.Add(item2);

var result = orderedSetOfBlockIds.ToList();

Assert.Equal(2, result.Count);
Assert.Contains(item1, result);
Assert.Contains(item2, result);
}

[Fact]
public void GivenListContainingDuplicateItems_WhenCreatingOrderedSetOfBlockIds_ThenOnlyDistinctItemsAreAdded()
{
string item1 = "item1";
string item2 = "item2";
var input = new List<string>() { item1, item2, item2, item2, item1 };

var orderedSetOfBlockIds = new OrderedSetOfBlockIds(input);

var result = orderedSetOfBlockIds.ToList();

Assert.Equal(2, result.Count);
Assert.Contains(item1, result);
Assert.Contains(item2, result);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
<Project Sdk="Microsoft.NET.Sdk">

<PropertyGroup>
<TargetFramework>netcoreapp2.2</TargetFramework>
</PropertyGroup>

<ItemGroup>
<PackageReference Include="Microsoft.NET.Test.Sdk" Version="16.0.1" />
<PackageReference Include="NSubstitute" Version="4.0.0" />
<PackageReference Include="xunit" Version="2.4.1" />
<PackageReference Include="xunit.runner.visualstudio" Version="2.4.1" />
</ItemGroup>

<ItemGroup>
<ProjectReference Include="..\Microsoft.Health.Fhir.Azure\Microsoft.Health.Fhir.Azure.csproj" />
</ItemGroup>

</Project>
2 changes: 2 additions & 0 deletions src/Microsoft.Health.Fhir.Azure/AssemblyInfo.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,5 +4,7 @@
// -------------------------------------------------------------------------------------------------

using System.Resources;
using System.Runtime.CompilerServices;

[assembly: NeutralResourcesLanguage("en-us")]
[assembly: InternalsVisibleTo("Microsoft.Health.Fhir.Azure.UnitTests")]
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,14 @@
using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using EnsureThat;
using Microsoft.Azure.Storage;
using Microsoft.Azure.Storage.Blob;
using Microsoft.Extensions.Logging;
using Microsoft.Health.Fhir.Core.Features.Operations.Export.ExportDestinationClient;

namespace Microsoft.Health.Fhir.Azure.ExportDestinationClient
Expand All @@ -24,6 +26,15 @@ public class AzureExportDestinationClient : IExportDestinationClient
private Dictionary<Uri, CloudBlockBlobWrapper> _uriToBlobMapping = new Dictionary<Uri, CloudBlockBlobWrapper>();
private Dictionary<(Uri FileUri, uint PartId), Stream> _streamMappings = new Dictionary<(Uri FileUri, uint PartId), Stream>();

private readonly ILogger _logger;

public AzureExportDestinationClient(ILogger<AzureExportDestinationClient> logger)
{
EnsureArg.IsNotNull(logger, nameof(logger));

_logger = logger;
}

public string DestinationType => "azure-block-blob";

public async Task ConnectAsync(string connectionSettings, CancellationToken cancellationToken, string containerId = null)
Expand Down Expand Up @@ -66,8 +77,12 @@ public Task<Uri> CreateFileAsync(string fileName, CancellationToken cancellation
CheckIfClientIsConnected();

CloudBlockBlob blockBlob = _blobContainer.GetBlockBlobReference(fileName);
blockBlob.Properties.ContentType = "application/fhir+ndjson";
_uriToBlobMapping.Add(blockBlob.Uri, new CloudBlockBlobWrapper(blockBlob));

if (!_uriToBlobMapping.ContainsKey(blockBlob.Uri))
{
blockBlob.Properties.ContentType = "application/fhir+ndjson";
_uriToBlobMapping.Add(blockBlob.Uri, new CloudBlockBlobWrapper(blockBlob));
}

return Task.FromResult(blockBlob.Uri);
}
Expand All @@ -93,35 +108,69 @@ public async Task CommitAsync(CancellationToken cancellationToken)
{
CheckIfClientIsConnected();

var uploadAndCommitTasks = new List<Task>();
// Upload all blocks for each blob that was modified.
Task[] uploadTasks = new Task[_streamMappings.Count];
CloudBlockBlobWrapper[] wrappersToCommit = new CloudBlockBlobWrapper[_streamMappings.Count];

int index = 0;
foreach (KeyValuePair<(Uri, uint), Stream> mapping in _streamMappings)
{
// Reset stream position.
Stream stream = mapping.Value;

// Reset the position.
stream.Position = 0;

CloudBlockBlobWrapper blobWrapper = _uriToBlobMapping[mapping.Key.Item1];

var blockId = Convert.ToBase64String(Encoding.ASCII.GetBytes(mapping.Key.Item2.ToString("d6")));
uploadAndCommitTasks.Add(Task.Run(async () =>
{
await blobWrapper.UploadBlockAsync(blockId, stream, md5Hash: null, cancellationToken);
await blobWrapper.CommitBlockListAsync(cancellationToken);

stream.Dispose();
}));
uploadTasks[index] = blobWrapper.UploadBlockAsync(blockId, stream, md5Hash: null, cancellationToken);
wrappersToCommit[index] = blobWrapper;
index++;
}

await Task.WhenAll(uploadAndCommitTasks);
await Task.WhenAll(uploadTasks);

// Commit all the blobs that were uploaded.
Task[] commitTasks = wrappersToCommit.Select(wrapper => wrapper.CommitBlockListAsync(cancellationToken)).ToArray();
await Task.WhenAll(commitTasks);

// We can clear the stream mappings once we commit everything.
foreach (Stream stream in _streamMappings.Values)
{
stream.Dispose();
}

// We can clear the stream mappings once we commit everything in memory.
_streamMappings.Clear();
}

public async Task OpenFileAsync(Uri fileUri, CancellationToken cancellationToken)
{
EnsureArg.IsNotNull(fileUri, nameof(fileUri));
CheckIfClientIsConnected();

if (_uriToBlobMapping.ContainsKey(fileUri))
{
_logger.LogInformation("Trying to open a file that the client already knows about.");
return;
}

var blob = new CloudBlockBlob(fileUri, _blobClient.Credentials);

// We are going to consider only committed blocks.
IEnumerable<ListBlockItem> result = await blob.DownloadBlockListAsync(
BlockListingFilter.Committed,
accessCondition: null,
options: null,
operationContext: null,
cancellationToken);

// Update the internal mapping with the block lists of the blob.
var wrapper = new CloudBlockBlobWrapper(blob, result.Select(x => x.Name).ToList());
_uriToBlobMapping.Add(fileUri, wrapper);
}

private void CheckIfClientIsConnected()
{
if (_blobClient == null)
if (_blobClient == null || _blobContainer == null)
{
throw new DestinationConnectionException(Resources.DestinationClientNotConnected);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,18 +20,21 @@ namespace Microsoft.Health.Fhir.Azure.ExportDestinationClient
/// </summary>
public class CloudBlockBlobWrapper
{
private readonly List<string> _existingBlockIds;
private readonly OrderedSetOfBlockIds _existingBlockIds;
private readonly CloudBlockBlob _cloudBlob;

public CloudBlockBlobWrapper(CloudBlockBlob blockBlob)
: this(blockBlob, new List<string>())
{
}

public CloudBlockBlobWrapper(CloudBlockBlob blockBlob, IEnumerable<string> blockList)
{
EnsureArg.IsNotNull(blockBlob, nameof(blockBlob));
EnsureArg.IsNotNull(blockList, nameof(blockList));

_cloudBlob = blockBlob;

// For now we assume that the blockblob we are using is always new. If it is an existing one,
// we will have to get the existing list of block ids.
_existingBlockIds = new List<string>();
_existingBlockIds = new OrderedSetOfBlockIds(blockList);
}

public async Task UploadBlockAsync(string blockId, Stream data, string md5Hash, CancellationToken cancellationToken)
Expand All @@ -46,7 +49,7 @@ public async Task UploadBlockAsync(string blockId, Stream data, string md5Hash,

public async Task CommitBlockListAsync(CancellationToken cancellationToken)
{
await _cloudBlob.PutBlockListAsync(_existingBlockIds, cancellationToken);
await _cloudBlob.PutBlockListAsync(_existingBlockIds.ToList(), cancellationToken);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
// -------------------------------------------------------------------------------------------------
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License (MIT). See LICENSE in the repo root for license information.
// -------------------------------------------------------------------------------------------------

using System.Collections.Generic;
using EnsureThat;

namespace Microsoft.Health.Fhir.Azure.ExportDestinationClient
{
/// <summary>
/// A modified implementation of an ordered hash set using a list and hashset. This class is specifically used
/// for maintaining the existing block ids that are part of a blob in <see cref="CloudBlockBlobWrapper"/>. We need to
/// maintain ordering of the block ids but also make sure that we don't have multiple copies of a block id. Hence
/// the need for an ordered hash set.
/// </summary>
internal class OrderedSetOfBlockIds
{
private readonly List<string> _itemList;
private readonly HashSet<string> _itemSet;

public OrderedSetOfBlockIds()
{
_itemList = new List<string>();
_itemSet = new HashSet<string>();
}

public OrderedSetOfBlockIds(IEnumerable<string> existingItems)
: this()
{
EnsureArg.IsNotNull(existingItems, nameof(existingItems));

foreach (string item in existingItems)
{
Add(item);
}
}

/// <summary>
/// Will add the given <see cref="item"/> if it is not present already.
/// </summary>
/// <param name="item">Item to be added to the <see cref="OrderedSetOfBlockIds"/></param>
public void Add(string item)
{
EnsureArg.IsNotNullOrWhiteSpace(item, nameof(item));

// Add item to list if the hashset does not contain it.
if (_itemSet.Add(item))
{
_itemList.Add(item);
}
}

/// <summary>
/// Returns a list of all items that were added to the <see cref="OrderedSetOfBlockIds"/>
/// </summary>
public List<string> ToList()
{
return new List<string>(_itemList);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ public async Task<SecretWrapper> DeleteSecretAsync(string secretName, Cancellati
{
EnsureArg.IsNotNullOrWhiteSpace(secretName, nameof(secretName));

SecretBundle result = await _keyVaultClient.DeleteSecretAsync(_keyVaultUri.AbsoluteUri, secretName, cancellationToken);
DeletedSecretBundle result = await _keyVaultClient.DeleteSecretAsync(_keyVaultUri.AbsoluteUri, secretName, cancellationToken);

return new SecretWrapper(result.Id, result.Value);
}
Expand Down
Loading

0 comments on commit c8cfcd0

Please sign in to comment.