From 1465c684175c470c81ea9d24d8f51b96a28fced3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Lo=C3=AFc=20Sharma?= Date: Wed, 18 Oct 2017 15:21:59 -0700 Subject: [PATCH] [Service Bus] Add SubscriptionProcessor (#74) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Added a `SubscriptionProcessor` to the `NuGet.Services.ServiceBus` project. Its purpose is to facilitate processing a specific message type from a given Service Bus subscription. This is similar to @agr's [`Orchestrator`](https://github.com/NuGet/NuGet.Jobs/pull/226/files#diff-0bcef9abb72b34c9e7c76d551f9ae9cd). The pattern to use this class should follow the Orchestrator’s code: 1. A job should call the `SubscriptionProcessor`’s `Start` method within its `Run` method 2. A job should “block” the `Run`’s thread 3. On shutdown, a job should: a. Call the `SubscriptionProcessor`’s `StartShutdownAsync` method b. Wait until the `SubscriptionProcessor`’s `NumberOfMessagesInProgress` goes to 0 This will be used by the Package Signing jobs to listen to Service Bus queues and process validation requests. --- NuGet.Server.Common.sln | 10 +- .../NuGet.Services.Contracts.csproj | 1 + .../ServiceBus/IBrokeredMessage.cs | 1 + .../ServiceBus/IOnMessageOptions.cs | 10 + .../ServiceBus/ISubscriptionClient.cs | 3 + .../BrokeredMessageSerializer.cs | 2 +- .../IBrokeredMessageSerializer.cs | 26 +++ .../IMessageHandler.cs | 21 ++ .../ISubscriptionProcessor.cs | 35 ++++ .../NuGet.Services.ServiceBus.csproj | 5 + .../OnMessageOptionsWrapper.cs | 24 +++ .../SubscriptionClientWrapper.cs | 30 ++- .../SubscriptionProcessor.cs | 95 +++++++++ src/NuGet.Services.ServiceBus/project.json | 1 + .../NuGet.Services.ServiceBus.Tests.csproj | 1 + .../SubscriptionProcessorFacts.cs | 194 ++++++++++++++++++ 16 files changed, 448 insertions(+), 11 deletions(-) create mode 100644 src/NuGet.Services.Contracts/ServiceBus/IOnMessageOptions.cs create mode 100644 src/NuGet.Services.ServiceBus/IBrokeredMessageSerializer.cs create mode 100644 src/NuGet.Services.ServiceBus/IMessageHandler.cs create mode 100644 src/NuGet.Services.ServiceBus/ISubscriptionProcessor.cs create mode 100644 src/NuGet.Services.ServiceBus/OnMessageOptionsWrapper.cs create mode 100644 src/NuGet.Services.ServiceBus/SubscriptionProcessor.cs create mode 100644 tests/NuGet.Services.ServiceBus.Tests/SubscriptionProcessorFacts.cs diff --git a/NuGet.Server.Common.sln b/NuGet.Server.Common.sln index 003ad1c4..3c2422a6 100644 --- a/NuGet.Server.Common.sln +++ b/NuGet.Server.Common.sln @@ -1,7 +1,7 @@  Microsoft Visual Studio Solution File, Format Version 12.00 # Visual Studio 15 -VisualStudioVersion = 15.0.26730.0 +VisualStudioVersion = 15.0.27005.2 MinimumVisualStudioVersion = 10.0.40219.1 Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "src", "src", "{8415FED7-1BED-4227-8B4F-BB7C24E041CD}" EndProject @@ -9,8 +9,6 @@ Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "NuGet.Services.KeyVault", " EndProject Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "tests", "tests", "{7783A106-0F4C-4055-9AB4-413FB2C7B8F0}" EndProject -Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "NuGet.Services.KeyVault.Tests", "tests\NuGet.Services.KeyVault.Tests\NuGet.Services.KeyVault.Tests.csproj", "{BA1FB5F1-8F6B-4558-862B-F47C3995B06A}" -EndProject Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Solution Items", "Solution Items", "{39C4007F-90EB-49A9-A41E-21861F81869C}" ProjectSection(SolutionItems) = preProject global.json = global.json @@ -62,10 +60,6 @@ Global {C87D0EF1-54AA-4B0B-89DE-CFF2DC941D11}.Debug|Any CPU.Build.0 = Debug|Any CPU {C87D0EF1-54AA-4B0B-89DE-CFF2DC941D11}.Release|Any CPU.ActiveCfg = Release|Any CPU {C87D0EF1-54AA-4B0B-89DE-CFF2DC941D11}.Release|Any CPU.Build.0 = Release|Any CPU - {BA1FB5F1-8F6B-4558-862B-F47C3995B06A}.Debug|Any CPU.ActiveCfg = Debug|Any CPU - {BA1FB5F1-8F6B-4558-862B-F47C3995B06A}.Debug|Any CPU.Build.0 = Debug|Any CPU - {BA1FB5F1-8F6B-4558-862B-F47C3995B06A}.Release|Any CPU.ActiveCfg = Release|Any CPU - {BA1FB5F1-8F6B-4558-862B-F47C3995B06A}.Release|Any CPU.Build.0 = Release|Any CPU {6597FCAE-81FE-44F1-A9C8-69BEA01DB094}.Debug|Any CPU.ActiveCfg = Debug|Any CPU {6597FCAE-81FE-44F1-A9C8-69BEA01DB094}.Debug|Any CPU.Build.0 = Debug|Any CPU {6597FCAE-81FE-44F1-A9C8-69BEA01DB094}.Release|Any CPU.ActiveCfg = Release|Any CPU @@ -144,7 +138,6 @@ Global EndGlobalSection GlobalSection(NestedProjects) = preSolution {C87D0EF1-54AA-4B0B-89DE-CFF2DC941D11} = {8415FED7-1BED-4227-8B4F-BB7C24E041CD} - {BA1FB5F1-8F6B-4558-862B-F47C3995B06A} = {7783A106-0F4C-4055-9AB4-413FB2C7B8F0} {6597FCAE-81FE-44F1-A9C8-69BEA01DB094} = {8415FED7-1BED-4227-8B4F-BB7C24E041CD} {088F2BF5-1220-4125-BC64-601C2F032C13} = {8415FED7-1BED-4227-8B4F-BB7C24E041CD} {8C24FB60-456A-4570-8400-08970F421415} = {7783A106-0F4C-4055-9AB4-413FB2C7B8F0} @@ -162,6 +155,7 @@ Global {C1E36A2C-1C1B-4521-B256-AD42505D9EFB} = {8415FED7-1BED-4227-8B4F-BB7C24E041CD} {E29F54DF-DFB8-4E27-940D-21ECCB9B6FC1} = {7783A106-0F4C-4055-9AB4-413FB2C7B8F0} {79F72C83-E94D-4D04-B904-5A4DA161168E} = {7783A106-0F4C-4055-9AB4-413FB2C7B8F0} + {FF5CA51A-CD6A-463F-AE9A-5737FF0FCFA7} = {7783A106-0F4C-4055-9AB4-413FB2C7B8F0} EndGlobalSection GlobalSection(ExtensibilityGlobals) = postSolution SolutionGuid = {AA413DB0-5475-4B5D-A3AF-6323DA8D538B} diff --git a/src/NuGet.Services.Contracts/NuGet.Services.Contracts.csproj b/src/NuGet.Services.Contracts/NuGet.Services.Contracts.csproj index 7274c6a0..5b452f30 100644 --- a/src/NuGet.Services.Contracts/NuGet.Services.Contracts.csproj +++ b/src/NuGet.Services.Contracts/NuGet.Services.Contracts.csproj @@ -43,6 +43,7 @@ + diff --git a/src/NuGet.Services.Contracts/ServiceBus/IBrokeredMessage.cs b/src/NuGet.Services.Contracts/ServiceBus/IBrokeredMessage.cs index f953bed4..dee98368 100644 --- a/src/NuGet.Services.Contracts/ServiceBus/IBrokeredMessage.cs +++ b/src/NuGet.Services.Contracts/ServiceBus/IBrokeredMessage.cs @@ -12,6 +12,7 @@ public interface IBrokeredMessage : IDisposable IDictionary Properties { get; } DateTimeOffset ScheduledEnqueueTimeUtc { get; set; } Task CompleteAsync(); + Task AbandonAsync(); string GetBody(); IBrokeredMessage Clone(); } diff --git a/src/NuGet.Services.Contracts/ServiceBus/IOnMessageOptions.cs b/src/NuGet.Services.Contracts/ServiceBus/IOnMessageOptions.cs new file mode 100644 index 00000000..573b12b9 --- /dev/null +++ b/src/NuGet.Services.Contracts/ServiceBus/IOnMessageOptions.cs @@ -0,0 +1,10 @@ +// Copyright (c) .NET Foundation. All rights reserved. +// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information. + +namespace NuGet.Services.ServiceBus +{ + public interface IOnMessageOptions + { + bool AutoComplete { get; set; } + } +} \ No newline at end of file diff --git a/src/NuGet.Services.Contracts/ServiceBus/ISubscriptionClient.cs b/src/NuGet.Services.Contracts/ServiceBus/ISubscriptionClient.cs index b3f078f3..68be564b 100644 --- a/src/NuGet.Services.Contracts/ServiceBus/ISubscriptionClient.cs +++ b/src/NuGet.Services.Contracts/ServiceBus/ISubscriptionClient.cs @@ -9,6 +9,9 @@ namespace NuGet.Services.ServiceBus public interface ISubscriptionClient { void OnMessageAsync(Func onMessageAsync); + + void OnMessageAsync(Func onMessageAsync, IOnMessageOptions options); + Task CloseAsync(); } } \ No newline at end of file diff --git a/src/NuGet.Services.ServiceBus/BrokeredMessageSerializer.cs b/src/NuGet.Services.ServiceBus/BrokeredMessageSerializer.cs index fc0d155b..ad11dd40 100644 --- a/src/NuGet.Services.ServiceBus/BrokeredMessageSerializer.cs +++ b/src/NuGet.Services.ServiceBus/BrokeredMessageSerializer.cs @@ -12,7 +12,7 @@ namespace NuGet.Services.ServiceBus /// type and schema version. /// /// A type decorated with a . - public class BrokeredMessageSerializer + public class BrokeredMessageSerializer : IBrokeredMessageSerializer { private const string SchemaNameKey = "SchemaName"; private const string SchemaVersionKey = "SchemaVersion"; diff --git a/src/NuGet.Services.ServiceBus/IBrokeredMessageSerializer.cs b/src/NuGet.Services.ServiceBus/IBrokeredMessageSerializer.cs new file mode 100644 index 00000000..2269c83f --- /dev/null +++ b/src/NuGet.Services.ServiceBus/IBrokeredMessageSerializer.cs @@ -0,0 +1,26 @@ +// Copyright (c) .NET Foundation. All rights reserved. +// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information. + +namespace NuGet.Services.ServiceBus +{ + /// + /// Serializes objects into Service Bus . + /// + /// The type this serializer can serialize into . + public interface IBrokeredMessageSerializer + { + /// + /// Serialize a into a . + /// + /// The message to be serialized. + /// The serialized message. + IBrokeredMessage Serialize(TMessage message); + + /// + /// Deserialize a into a . + /// + /// The message to be deserialized. + /// The deserialized message. + TMessage Deserialize(IBrokeredMessage message); + } +} diff --git a/src/NuGet.Services.ServiceBus/IMessageHandler.cs b/src/NuGet.Services.ServiceBus/IMessageHandler.cs new file mode 100644 index 00000000..1c05319a --- /dev/null +++ b/src/NuGet.Services.ServiceBus/IMessageHandler.cs @@ -0,0 +1,21 @@ +// Copyright (c) .NET Foundation. All rights reserved. +// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information. + +using System.Threading.Tasks; + +namespace NuGet.Services.ServiceBus +{ + /// + /// The class that handles messages received by a + /// + /// The type of messages this handler handles. + public interface IMessageHandler + { + /// + /// Handle the message. + /// + /// The received message. + /// Whether the message has been handled. If false, the message will be requeued to be handled again later. + Task HandleAsync(TMessage message); + } +} diff --git a/src/NuGet.Services.ServiceBus/ISubscriptionProcessor.cs b/src/NuGet.Services.ServiceBus/ISubscriptionProcessor.cs new file mode 100644 index 00000000..47baaf9b --- /dev/null +++ b/src/NuGet.Services.ServiceBus/ISubscriptionProcessor.cs @@ -0,0 +1,35 @@ +// Copyright (c) .NET Foundation. All rights reserved. +// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information. + +using System.Threading.Tasks; + +namespace NuGet.Services.ServiceBus +{ + /// + /// Processes messages that were received from a Service Bus subscription. + /// + /// The type of message listened by this listener. + public interface ISubscriptionProcessor + { + /// + /// The number of messages that are currently being handled. + /// + int NumberOfMessagesInProgress { get; } + + /// + /// Start handling messages emitted to the Service Bus subscription. + /// + void Start(); + + /// + /// Deregisters the message handler. + /// + /// + /// There may still be messages in progress after the returned has completed! + /// The property should be polled to determine when all + /// messages have been completed. + /// + /// A task that completes when the message handler has been deregistered. + Task StartShutdownAsync(); + } +} diff --git a/src/NuGet.Services.ServiceBus/NuGet.Services.ServiceBus.csproj b/src/NuGet.Services.ServiceBus/NuGet.Services.ServiceBus.csproj index ace33025..f49cab35 100644 --- a/src/NuGet.Services.ServiceBus/NuGet.Services.ServiceBus.csproj +++ b/src/NuGet.Services.ServiceBus/NuGet.Services.ServiceBus.csproj @@ -42,6 +42,11 @@ + + + + + diff --git a/src/NuGet.Services.ServiceBus/OnMessageOptionsWrapper.cs b/src/NuGet.Services.ServiceBus/OnMessageOptionsWrapper.cs new file mode 100644 index 00000000..1c7ded53 --- /dev/null +++ b/src/NuGet.Services.ServiceBus/OnMessageOptionsWrapper.cs @@ -0,0 +1,24 @@ +// Copyright (c) .NET Foundation. All rights reserved. +// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information. + +using System; +using Microsoft.ServiceBus.Messaging; + +namespace NuGet.Services.ServiceBus +{ + public sealed class OnMessageOptionsWrapper : IOnMessageOptions + { + public OnMessageOptions OnMessageOptions { get; set; } + + public bool AutoComplete + { + get => OnMessageOptions.AutoComplete; + set => OnMessageOptions.AutoComplete = value; + } + + public OnMessageOptionsWrapper(OnMessageOptions options = null) + { + OnMessageOptions = OnMessageOptions ?? new OnMessageOptions(); + } + } +} diff --git a/src/NuGet.Services.ServiceBus/SubscriptionClientWrapper.cs b/src/NuGet.Services.ServiceBus/SubscriptionClientWrapper.cs index 5148c56c..404ab00c 100644 --- a/src/NuGet.Services.ServiceBus/SubscriptionClientWrapper.cs +++ b/src/NuGet.Services.ServiceBus/SubscriptionClientWrapper.cs @@ -18,11 +18,37 @@ public SubscriptionClientWrapper(string connectionString, string topicPath, stri public void OnMessageAsync(Func onMessageAsync) { - _client.OnMessageAsync(innerMessage => + var callback = CreateOnMessageAsyncCallback(onMessageAsync); + + _client.OnMessageAsync(callback); + } + + public void OnMessageAsync(Func onMessageAsync, IOnMessageOptions options) + { + if (onMessageAsync == null) throw new ArgumentNullException(nameof(onMessageAsync)); + if (options == null) throw new ArgumentNullException(nameof(options)); + + // For now, assume the only implementation is the wrapper type. + if (! (options is OnMessageOptionsWrapper optionsWrapper)) + { + throw new ArgumentException( + $"Options must be of type {nameof(OnMessageOptionsWrapper)}", + nameof(options)); + } + + _client.OnMessageAsync( + CreateOnMessageAsyncCallback(onMessageAsync), + optionsWrapper.OnMessageOptions); + } + + private Func CreateOnMessageAsyncCallback(Func onMessageAsync) + { + return innerMessage => { var message = new BrokeredMessageWrapper(innerMessage); + return onMessageAsync(message); - }); + }; } public Task CloseAsync() diff --git a/src/NuGet.Services.ServiceBus/SubscriptionProcessor.cs b/src/NuGet.Services.ServiceBus/SubscriptionProcessor.cs new file mode 100644 index 00000000..03b06535 --- /dev/null +++ b/src/NuGet.Services.ServiceBus/SubscriptionProcessor.cs @@ -0,0 +1,95 @@ +// Copyright (c) .NET Foundation. All rights reserved. +// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information. + +using System; +using System.Threading; +using System.Threading.Tasks; +using Microsoft.Extensions.Logging; +using Microsoft.ServiceBus.Messaging; + +namespace NuGet.Services.ServiceBus +{ + public class SubscriptionProcessor : ISubscriptionProcessor + { + private readonly ISubscriptionClient _client; + private readonly IBrokeredMessageSerializer _serializer; + private readonly IMessageHandler _handler; + private readonly ILogger> _logger; + + private int _numberOfMessagesInProgress; + + public int NumberOfMessagesInProgress => _numberOfMessagesInProgress; + + /// + /// Constructs a new subscription processor. + /// + /// The client used to receive messages from the subscription. + /// The serializer used to deserialize received messages. + /// The handler used to handle received messages. + /// The logger used to record debug information. + public SubscriptionProcessor( + ISubscriptionClient client, + IBrokeredMessageSerializer serializer, + IMessageHandler handler, + ILogger> logger) + { + _client = client ?? throw new ArgumentNullException(nameof(client)); + _serializer = serializer ?? throw new ArgumentNullException(nameof(serializer)); + _handler = handler ?? throw new ArgumentNullException(nameof(handler)); + _logger = logger ?? throw new ArgumentNullException(nameof(logger)); + + _numberOfMessagesInProgress = 0; + } + + public void Start() + { + _logger.LogInformation("Registering the handler to begin listening to the Service Bus subscription"); + + _client.OnMessageAsync(OnMessageAsync, new OnMessageOptionsWrapper + { + AutoComplete = false, + }); + } + + private async Task OnMessageAsync(IBrokeredMessage brokeredMessage) + { + Interlocked.Increment(ref _numberOfMessagesInProgress); + + try + { + _logger.LogInformation("Received message from Service Bus subscription, processing"); + + var message = _serializer.Deserialize(brokeredMessage); + + if (await _handler.HandleAsync(message)) + { + _logger.LogInformation("Message was successfully handled, marking the brokered message as completed"); + + await brokeredMessage.CompleteAsync(); + } + else + { + _logger.LogInformation("Handler did not finish processing message, requeueing message to be reprocessed"); + } + } + catch (Exception e) + { + _logger.LogError("Requeueing message as it was unsuccessfully processed due to exception: {Exception}", e); + throw; + } + finally + { + Interlocked.Decrement(ref _numberOfMessagesInProgress); + } + } + + public async Task StartShutdownAsync() + { + _logger.LogInformation( + "Shutting down the subscription listener with {NumberOfMessagesInProgress} messages in progress", + NumberOfMessagesInProgress); + + await _client.CloseAsync(); + } + } +} diff --git a/src/NuGet.Services.ServiceBus/project.json b/src/NuGet.Services.ServiceBus/project.json index 458aa054..8ec43fa3 100644 --- a/src/NuGet.Services.ServiceBus/project.json +++ b/src/NuGet.Services.ServiceBus/project.json @@ -1,5 +1,6 @@ { "dependencies": { + "Microsoft.Extensions.Logging": "1.1.2", "Newtonsoft.Json": "9.0.1", "WindowsAzure.ServiceBus": "4.1.3" }, diff --git a/tests/NuGet.Services.ServiceBus.Tests/NuGet.Services.ServiceBus.Tests.csproj b/tests/NuGet.Services.ServiceBus.Tests/NuGet.Services.ServiceBus.Tests.csproj index 776a8ac8..48a7d773 100644 --- a/tests/NuGet.Services.ServiceBus.Tests/NuGet.Services.ServiceBus.Tests.csproj +++ b/tests/NuGet.Services.ServiceBus.Tests/NuGet.Services.ServiceBus.Tests.csproj @@ -43,6 +43,7 @@ + diff --git a/tests/NuGet.Services.ServiceBus.Tests/SubscriptionProcessorFacts.cs b/tests/NuGet.Services.ServiceBus.Tests/SubscriptionProcessorFacts.cs new file mode 100644 index 00000000..a6812982 --- /dev/null +++ b/tests/NuGet.Services.ServiceBus.Tests/SubscriptionProcessorFacts.cs @@ -0,0 +1,194 @@ +// Copyright (c) .NET Foundation. All rights reserved. +// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information. + +using System; +using System.Threading.Tasks; +using Microsoft.Extensions.Logging; +using Moq; +using Xunit; + +namespace NuGet.Services.ServiceBus.Tests +{ + public class SubscriptionProcessorFacts + { + public class TestMessage + { + } + + public class TheStartMethod : Base + { + [Fact] + public async Task DoesntCallHandlerOnDeserializationException() + { + // Arrange + // Retrieve the OnMessageAsync callback that is registered to Service Bus's subscription client. + Func onMessageAsync = null; + + _client + .Setup(c => c.OnMessageAsync( + It.IsAny>(), + It.IsAny())) + .Callback, IOnMessageOptions>((callback, options) => onMessageAsync = callback); + + _serializer + .Setup(s => s.Deserialize(It.IsAny())) + .Throws(new Exception()); + + // Act + // Start processing messages and trigger the OnMessageAsync callback. + _target.Start(); + + await Assert.ThrowsAsync(() => onMessageAsync(_brokeredMessage.Object)); + + // Assert + Assert.Equal(0, _target.NumberOfMessagesInProgress); + + _serializer.Verify(s => s.Deserialize(It.IsAny()), Times.Once); + _handler.Verify(h => h.HandleAsync(It.IsAny()), Times.Never); + _brokeredMessage.Verify(m => m.CompleteAsync(), Times.Never); + } + + [Fact] + public async Task CallsHandlerWhenMessageIsReceived() + { + // Arrange + // Retrieve the OnMessageAsync callback that is registered to Service Bus's subscription client. + Func onMessageAsync = null; + int? messagesInProgressDuringHandler = null; + + _client + .Setup(c => c.OnMessageAsync( + It.IsAny>(), + It.IsAny())) + .Callback, IOnMessageOptions>((callback, options) => onMessageAsync = callback); + + _handler + .Setup(h => h.HandleAsync(It.IsAny())) + .Callback(() => messagesInProgressDuringHandler = _target.NumberOfMessagesInProgress) + .Returns(Task.FromResult(true)); + + // Act + // Start processing messages and trigger the OnMessageAsync callback. + _target.Start(); + + await onMessageAsync(_brokeredMessage.Object); + + // Assert + Assert.Equal(1, messagesInProgressDuringHandler); + Assert.Equal(0, _target.NumberOfMessagesInProgress); + + _serializer.Verify(s => s.Deserialize(It.IsAny()), Times.Once); + _handler.Verify(h => h.HandleAsync(It.IsAny()), Times.Once); + _brokeredMessage.Verify(m => m.CompleteAsync(), Times.Once); + } + + [Fact] + public async Task DoesNotCompleteMessageIfHandlerReturnsFalse() + { + // Arrange + // Retrieve the OnMessageAsync callback that is registered to Service Bus's subscription client. + Func onMessageAsync = null; + int? messagesInProgressDuringHandler = null; + + _client + .Setup(c => c.OnMessageAsync( + It.IsAny>(), + It.IsAny())) + .Callback, IOnMessageOptions>((callback, options) => onMessageAsync = callback); + + _handler + .Setup(h => h.HandleAsync(It.IsAny())) + .Callback(() => messagesInProgressDuringHandler = _target.NumberOfMessagesInProgress) + .Returns(Task.FromResult(false)); + + // Act + // Start processing messages and trigger the OnMessageAsync callback. + _target.Start(); + + await onMessageAsync(_brokeredMessage.Object); + + // Assert + Assert.Equal(1, messagesInProgressDuringHandler); + Assert.Equal(0, _target.NumberOfMessagesInProgress); + + _serializer.Verify(s => s.Deserialize(It.IsAny()), Times.Once); + _handler.Verify(h => h.HandleAsync(It.IsAny()), Times.Once); + _brokeredMessage.Verify(m => m.CompleteAsync(), Times.Never); + } + + [Fact] + public async Task BrokedMessageIsntCompletedIfHandlerThrows() + { + // Arrange + // Retrieve the OnMessageAsync callback that is registered to Service Bus's subscription client. + Func onMessageAsync = null; + int? messagesInProgressDuringHandler = null; + + _client + .Setup(c => c.OnMessageAsync( + It.IsAny>(), + It.IsAny())) + .Callback, IOnMessageOptions>((callback, options) => onMessageAsync = callback); + + _handler + .Setup(h => h.HandleAsync(It.IsAny())) + .Callback(() => messagesInProgressDuringHandler = _target.NumberOfMessagesInProgress) + .Throws(new Exception()); + + // Act + // Start processing messages and trigger the OnMessageAsync callback. + _target.Start(); + + await Assert.ThrowsAsync(() => onMessageAsync(_brokeredMessage.Object)); + + // Assert + Assert.Equal(1, messagesInProgressDuringHandler); + Assert.Equal(0, _target.NumberOfMessagesInProgress); + + _serializer.Verify(s => s.Deserialize(It.IsAny()), Times.Once); + _handler.Verify(h => h.HandleAsync(It.IsAny()), Times.Once); + _brokeredMessage.Verify(m => m.CompleteAsync(), Times.Never); + } + } + + public class TheStartShutdownAsyncMethod : Base + { + [Fact] + public async Task StopCallsTheClientsCloseAsyncMethod() + { + // Act + await _target.StartShutdownAsync(); + + // Assert + _client.Verify(c => c.CloseAsync(), Times.Once); + } + } + + public abstract class Base + { + protected readonly Mock _client; + protected readonly Mock> _serializer; + protected readonly Mock> _handler; + protected readonly SubscriptionProcessor _target; + + protected readonly Mock _brokeredMessage; + + public Base() + { + _client = new Mock(); + _serializer = new Mock>(); + _handler = new Mock>(); + + _brokeredMessage = new Mock(); + + var logger = new Mock>>(); + + _target = new SubscriptionProcessor( + _client.Object, + _serializer.Object, + _handler.Object, + logger.Object); + } + } + } +}