Skip to content

Commit

Permalink
Fix Function Serialization and Input Validation (#1290)
Browse files Browse the repository at this point in the history
- Fix incorrect range check on max watermark for the GetInstanceBatches*Async activities
- Remove $type from durable function extension input/output representation
- Re-enable tests for PAAS to prevent future regressions
  • Loading branch information
wsugarman authored Jan 18, 2022
1 parent d69f3a6 commit 39f5a29
Show file tree
Hide file tree
Showing 11 changed files with 146 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
using Microsoft.Health.Dicom.Core.Features.Operations;
using Microsoft.Health.Dicom.Core.Registration;
using Microsoft.Health.Dicom.Operations.Client.DurableTask;
using Microsoft.Health.Dicom.Operations.Client.Serialization;

namespace Microsoft.Health.Dicom.Operations.Client
{
Expand Down Expand Up @@ -49,6 +50,7 @@ public static IDicomServerBuilder AddAzureFunctionsClient(
IServiceCollection services = dicomServerBuilder.Services;
services.TryAddSingleton(GuidFactory.Default);
services.AddDurableClientFactory(x => configuration.GetSection(ConfigSectionName).Bind(x));
services.Replace(ServiceDescriptor.Singleton<IMessageSerializerSettingsFactory, MessageSerializerSettingsFactory>());
services.TryAddScoped<IDicomOperationsClient, DicomAzureFunctionsClient>();

return dicomServerBuilder;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
// -------------------------------------------------------------------------------------------------
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License (MIT). See LICENSE in the repo root for license information.
// -------------------------------------------------------------------------------------------------

using Microsoft.Azure.WebJobs.Extensions.DurableTask;
using Newtonsoft.Json;

namespace Microsoft.Health.Dicom.Operations.Client.Serialization
{
// TODO: Migrate to common package
internal class MessageSerializerSettingsFactory : IMessageSerializerSettingsFactory
{
public JsonSerializerSettings CreateJsonSerializerSettings()
{
// Based on the framework settings:
// https://github.com/Azure/azure-functions-durable-extension/blob/v2.6.0/src/WebJobs.Extensions.DurableTask/MessageSerializerSettingsFactory.cs
return new JsonSerializerSettings
{
TypeNameHandling = TypeNameHandling.None,
DateParseHandling = DateParseHandling.None,
};
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ public class BatchCreationArgumentsTests
[Fact]
public void GivenBadValues_WhenContructing_ThenThrowExceptions()
{
Assert.Throws<ArgumentOutOfRangeException>(() => new BatchCreationArguments(-1, 2, 3));
Assert.Throws<ArgumentOutOfRangeException>(() => new BatchCreationArguments(1, -2, 3));
Assert.Throws<ArgumentOutOfRangeException>(() => new BatchCreationArguments(1, 2, -3));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -142,26 +142,28 @@ await _instanceStore
.GetInstanceBatchesAsync(batchSize, maxParallelBatches, IndexStatus.Created, null, CancellationToken.None);
}

[Fact]
public async Task GivenWatermark_WhenGettingInstanceBatches_ThenShouldInvokeCorrectMethod()
[Theory]
[InlineData(0)]
[InlineData(1)]
[InlineData(10)]
public async Task GivenWatermark_WhenGettingInstanceBatches_ThenShouldInvokeCorrectMethod(long max)
{
const long watermark = 12345L;
const int batchSize = 100;
const int maxParallelBatches = 3;

IReadOnlyList<WatermarkRange> expected = new List<WatermarkRange> { new WatermarkRange(10, 1000) };
IReadOnlyList<WatermarkRange> expected = new List<WatermarkRange> { new WatermarkRange(1, 2) }; // watermarks don't matter
_instanceStore
.GetInstanceBatchesAsync(batchSize, maxParallelBatches, IndexStatus.Created, watermark, CancellationToken.None)
.GetInstanceBatchesAsync(batchSize, maxParallelBatches, IndexStatus.Created, max, CancellationToken.None)
.Returns(expected);

IReadOnlyList<WatermarkRange> actual = await _reindexDurableFunction.GetInstanceBatchesV2Async(
new BatchCreationArguments(watermark, batchSize, maxParallelBatches),
new BatchCreationArguments(max, batchSize, maxParallelBatches),
NullLogger.Instance);

Assert.Same(expected, actual);
await _instanceStore
.Received(1)
.GetInstanceBatchesAsync(batchSize, maxParallelBatches, IndexStatus.Created, watermark, CancellationToken.None);
.GetInstanceBatchesAsync(batchSize, maxParallelBatches, IndexStatus.Created, max, CancellationToken.None);
}

[Fact]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ public async Task GivenNewOrchestrationWithWork_WhenReindexingInstances_ThenDivi
_options.MaxParallelBatches = 3;

IReadOnlyList<WatermarkRange> expectedBatches = CreateBatches(50);
int expectedPercentage = (int)((double)(50 - expectedBatches[^1].Start + 1) / 50 * 100);
var expectedInput = new ReindexInput { QueryTagKeys = new List<int> { 1, 2, 3, 4, 5 } };
var expectedTags = new List<ExtendedQueryTagStoreEntry>
{
Expand Down Expand Up @@ -121,7 +120,6 @@ public async Task GivenExistingOrchestrationWithWork_WhenReindexingInstances_The
_options.MaxParallelBatches = 2;

IReadOnlyList<WatermarkRange> expectedBatches = CreateBatches(35);
int expectedPercentage = (int)((double)(42 - expectedBatches[^1].Start + 1) / 42 * 100);
var expectedInput = new ReindexInput
{
QueryTagKeys = new List<int> { 1, 2, 3, 4, 5 },
Expand Down Expand Up @@ -285,14 +283,16 @@ await context
.ContinueAsNew(default, default);
}

[Fact]
public async Task GivenNoRemainingInstances_WhenReindexingInstances_ThenComplete()
[Theory]
[InlineData(1, 100)]
[InlineData(5, 1000)]
public async Task GivenNoRemainingInstances_WhenReindexingInstances_ThenComplete(long start, long end)
{
var expectedBatches = new List<WatermarkRange>();
var expectedInput = new ReindexInput
{
QueryTagKeys = new List<int> { 1, 2, 3, 4, 5 },
Completed = new WatermarkRange(5, 1000),
Completed = new WatermarkRange(start, end),
};
var expectedTags = new List<ExtendedQueryTagStoreEntry>
{
Expand All @@ -316,7 +316,7 @@ public async Task GivenNoRemainingInstances_WhenReindexingInstances_ThenComplete
.CallActivityWithRetryAsync<IReadOnlyList<WatermarkRange>>(
nameof(ReindexDurableFunction.GetInstanceBatchesV2Async),
_options.ActivityRetryOptions,
Arg.Is(GetPredicate(4L)))
Arg.Is(GetPredicate(start - 1)))
.Returns(expectedBatches);
context
.CallActivityWithRetryAsync<IReadOnlyList<int>>(
Expand Down Expand Up @@ -349,7 +349,7 @@ await context
.CallActivityWithRetryAsync<IReadOnlyList<WatermarkRange>>(
nameof(ReindexDurableFunction.GetInstanceBatchesV2Async),
_options.ActivityRetryOptions,
Arg.Is(GetPredicate(4L)));
Arg.Is(GetPredicate(start - 1)));
await context
.DidNotReceive()
.CallActivityWithRetryAsync(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
// -------------------------------------------------------------------------------------------------
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License (MIT). See LICENSE in the repo root for license information.
// -------------------------------------------------------------------------------------------------

using System.Collections.Generic;
using Microsoft.Health.Dicom.Core.Features.ExtendedQueryTag;
using Microsoft.Health.Dicom.Core.Features.Model;
using Microsoft.Health.Dicom.Operations.Indexing.Models;
using Microsoft.Health.Dicom.Operations.Serialization;
using Newtonsoft.Json;
using Xunit;

namespace Microsoft.Health.Dicom.Operations.UnitTests.Serialization
{
public class MessageSerializerSettingsFactoryTests
{
private readonly static JsonSerializerSettings JsonSerializerSettings = new MessageSerializerSettingsFactory().CreateJsonSerializerSettings();

[Fact]
public void GivenPreviouslySerializedMessage_WhenDeserializingWithNewSettings_ThenDeserializeSuccessfully()
{
var queryTags = new List<ExtendedQueryTagStoreEntry>
{
new ExtendedQueryTagStoreEntry(1, "01", "DT", "foo", QueryTagLevel.Instance, ExtendedQueryTagStatus.Adding, QueryStatus.Enabled, 0),
new ExtendedQueryTagStoreEntry(2, "02", "DT", "bar", QueryTagLevel.Study, ExtendedQueryTagStatus.Adding, QueryStatus.Enabled, 0),
};
var range = new WatermarkRange(5, 10);
const int threadCount = 7;

var before = new ReindexBatchArguments(queryTags, range, threadCount);
string json = JsonConvert.SerializeObject(
before,
new JsonSerializerSettings
{
TypeNameHandling = TypeNameHandling.Objects,
TypeNameAssemblyFormatHandling = TypeNameAssemblyFormatHandling.Simple,
});

ReindexBatchArguments actual = JsonConvert.DeserializeObject<ReindexBatchArguments>(json, JsonSerializerSettings);

Assert.Equal(before.QueryTags, actual.QueryTags, new TagEntryComparer());
Assert.Equal(before.ThreadCount, actual.ThreadCount);
Assert.Equal(before.WatermarkRange, actual.WatermarkRange);
}

private sealed class TagEntryComparer : IEqualityComparer<ExtendedQueryTagStoreEntry>
{
public bool Equals(ExtendedQueryTagStoreEntry x, ExtendedQueryTagStoreEntry y)
=> x.ErrorCount == y.ErrorCount
|| x.Key == y.Key
|| x.Level == y.Level
|| x.Path == y.Path
|| x.PrivateCreator == y.PrivateCreator
|| x.QueryStatus == y.QueryStatus
|| x.Status == y.Status
|| x.VR == y.VR;

public int GetHashCode(ExtendedQueryTagStoreEntry obj)
{
throw new System.NotImplementedException();
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,19 +35,12 @@ public sealed class BatchCreationArguments
/// <param name="batchSize">The number of DICOM instances processed by a single activity.</param>
/// <param name="maxParallelBatches">The maximum number of concurrent batches processed at a given time.</param>
/// <exception cref="ArgumentOutOfRangeException">
/// <para><paramref name="maxWatermark"/> is less than <c>1</c>.</para>
/// <para>-or-</para>
/// <para><paramref name="batchSize"/> is less than <c>1</c>.</para>
/// <para>-or-</para>
/// <para><paramref name="maxParallelBatches"/> is less than <c>1</c>.</para>
/// </exception>
public BatchCreationArguments(long? maxWatermark, int batchSize, int maxParallelBatches)
{
if (maxWatermark.HasValue)
{
EnsureArg.IsGte(maxWatermark.GetValueOrDefault(), 1, nameof(maxWatermark));
}

EnsureArg.IsGte(batchSize, 1, nameof(batchSize));
EnsureArg.IsGte(maxParallelBatches, 1, nameof(maxParallelBatches));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ public Task<IReadOnlyList<WatermarkRange>> GetInstanceBatchesV2Async(

if (arguments.MaxWatermark.HasValue)
{
logger.LogInformation("Dividing up the instances into batches starting from {Watermark}.", arguments.MaxWatermark);
logger.LogInformation("Dividing up the instances into batches starting from the largest watermark {Watermark}.", arguments.MaxWatermark);
}
else
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
using EnsureThat;
using FellowOakDicom;
using FellowOakDicom.Serialization;
using Microsoft.Azure.WebJobs.Extensions.DurableTask;
using Microsoft.Azure.WebJobs.Host.Config;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
Expand All @@ -21,6 +22,7 @@
using Microsoft.Health.Dicom.Operations.Indexing;
using Microsoft.Health.Dicom.Operations.Management;
using Microsoft.Health.Dicom.Operations.Registration;
using Microsoft.Health.Dicom.Operations.Serialization;
using Microsoft.Health.Extensions.DependencyInjection;
using Microsoft.Health.SqlServer.Configs;
using Microsoft.IO;
Expand Down Expand Up @@ -55,6 +57,7 @@ public static IDicomFunctionsBuilder ConfigureFunctions(
.AddFellowOakDicomExtension()
.AddFunctionsOptions<QueryTagIndexingOptions>(configuration, QueryTagIndexingOptions.SectionName, bindNonPublicProperties: true)
.AddFunctionsOptions<PurgeHistoryOptions>(configuration, PurgeHistoryOptions.SectionName)
.ConfigureDurableFunctionSerialization()
.AddJsonSerializerOptions(o =>
{
o.Converters.Add(new JsonStringEnumConverter());
Expand Down Expand Up @@ -160,6 +163,13 @@ private static IServiceCollection AddJsonSerializerOptions(this IServiceCollecti
return services.Configure(configure);
}

private static IServiceCollection ConfigureDurableFunctionSerialization(this IServiceCollection services)
{
EnsureArg.IsNotNull(services, nameof(services));

return services.Replace(ServiceDescriptor.Singleton<IMessageSerializerSettingsFactory, MessageSerializerSettingsFactory>());
}

private sealed class FellowOakExtensionConfiguration : IExtensionConfigProvider
{
private readonly IServiceProvider _serviceProvider;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
// -------------------------------------------------------------------------------------------------
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License (MIT). See LICENSE in the repo root for license information.
// -------------------------------------------------------------------------------------------------

using Microsoft.Azure.WebJobs.Extensions.DurableTask;
using Newtonsoft.Json;

namespace Microsoft.Health.Dicom.Operations.Serialization
{
// TODO: Migrate to common package
internal class MessageSerializerSettingsFactory : IMessageSerializerSettingsFactory
{
public JsonSerializerSettings CreateJsonSerializerSettings()
{
// Based on the framework settings:
// https://github.com/Azure/azure-functions-durable-extension/blob/v2.6.0/src/WebJobs.Extensions.DurableTask/MessageSerializerSettingsFactory.cs
return new JsonSerializerSettings
{
TypeNameHandling = TypeNameHandling.None,
DateParseHandling = DateParseHandling.None,
};
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ public ExtendedQueryTagTests(WebJobsIntegrationTestFixture<Startup> fixture)
}

[Fact]
// [Trait("Category", "bvt")] // TODO: Enable once functions are enabled in PAAS
[Trait("Category", "bvt")]
public async Task GivenExtendedQueryTag_WhenReindexing_ThenShouldSucceed()
{
DicomTag weightTag = DicomTag.PatientWeight;
Expand Down Expand Up @@ -97,6 +97,7 @@ public async Task GivenExtendedQueryTag_WhenReindexing_ThenShouldSucceed()
}

[Fact]
[Trait("Category", "bvt")]
public async Task GivenExtendedQueryTagWithErrors_WhenReindexing_ThenShouldSucceedWithErrors()
{
// Define tags
Expand Down

0 comments on commit 39f5a29

Please sign in to comment.