Skip to content
This repository has been archived by the owner on Aug 3, 2024. It is now read-only.
/ ServerCommon Public archive

Commit

Permalink
[Service Bus] Add SubscriptionProcessor (#74)
Browse files Browse the repository at this point in the history
Added a `SubscriptionProcessor` to the `NuGet.Services.ServiceBus` project. Its purpose is to facilitate processing a specific message type from a given Service Bus subscription. This is similar to @agr's [`Orchestrator`](https://github.com/NuGet/NuGet.Jobs/pull/226/files#diff-0bcef9abb72b34c9e7c76d551f9ae9cd). The pattern to use this class should follow the Orchestrator’s code:

1. A job should call the `SubscriptionProcessor`’s `Start` method within its `Run` method
2. A job should “block” the `Run`’s thread
3. On shutdown, a job should:
  a. Call the `SubscriptionProcessor`’s `StartShutdownAsync` method
  b. Wait until the `SubscriptionProcessor`’s `NumberOfMessagesInProgress` goes to 0

This will be used by the Package Signing jobs to listen to Service Bus queues and process validation requests.
  • Loading branch information
loic-sharma authored Oct 18, 2017
1 parent e7ef8ee commit 1465c68
Show file tree
Hide file tree
Showing 16 changed files with 448 additions and 11 deletions.
10 changes: 2 additions & 8 deletions NuGet.Server.Common.sln
Original file line number Diff line number Diff line change
@@ -1,16 +1,14 @@

Microsoft Visual Studio Solution File, Format Version 12.00
# Visual Studio 15
VisualStudioVersion = 15.0.26730.0
VisualStudioVersion = 15.0.27005.2
MinimumVisualStudioVersion = 10.0.40219.1
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "src", "src", "{8415FED7-1BED-4227-8B4F-BB7C24E041CD}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "NuGet.Services.KeyVault", "src\NuGet.Services.KeyVault\NuGet.Services.KeyVault.csproj", "{C87D0EF1-54AA-4B0B-89DE-CFF2DC941D11}"
EndProject
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "tests", "tests", "{7783A106-0F4C-4055-9AB4-413FB2C7B8F0}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "NuGet.Services.KeyVault.Tests", "tests\NuGet.Services.KeyVault.Tests\NuGet.Services.KeyVault.Tests.csproj", "{BA1FB5F1-8F6B-4558-862B-F47C3995B06A}"
EndProject
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Solution Items", "Solution Items", "{39C4007F-90EB-49A9-A41E-21861F81869C}"
ProjectSection(SolutionItems) = preProject
global.json = global.json
Expand Down Expand Up @@ -62,10 +60,6 @@ Global
{C87D0EF1-54AA-4B0B-89DE-CFF2DC941D11}.Debug|Any CPU.Build.0 = Debug|Any CPU
{C87D0EF1-54AA-4B0B-89DE-CFF2DC941D11}.Release|Any CPU.ActiveCfg = Release|Any CPU
{C87D0EF1-54AA-4B0B-89DE-CFF2DC941D11}.Release|Any CPU.Build.0 = Release|Any CPU
{BA1FB5F1-8F6B-4558-862B-F47C3995B06A}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{BA1FB5F1-8F6B-4558-862B-F47C3995B06A}.Debug|Any CPU.Build.0 = Debug|Any CPU
{BA1FB5F1-8F6B-4558-862B-F47C3995B06A}.Release|Any CPU.ActiveCfg = Release|Any CPU
{BA1FB5F1-8F6B-4558-862B-F47C3995B06A}.Release|Any CPU.Build.0 = Release|Any CPU
{6597FCAE-81FE-44F1-A9C8-69BEA01DB094}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{6597FCAE-81FE-44F1-A9C8-69BEA01DB094}.Debug|Any CPU.Build.0 = Debug|Any CPU
{6597FCAE-81FE-44F1-A9C8-69BEA01DB094}.Release|Any CPU.ActiveCfg = Release|Any CPU
Expand Down Expand Up @@ -144,7 +138,6 @@ Global
EndGlobalSection
GlobalSection(NestedProjects) = preSolution
{C87D0EF1-54AA-4B0B-89DE-CFF2DC941D11} = {8415FED7-1BED-4227-8B4F-BB7C24E041CD}
{BA1FB5F1-8F6B-4558-862B-F47C3995B06A} = {7783A106-0F4C-4055-9AB4-413FB2C7B8F0}
{6597FCAE-81FE-44F1-A9C8-69BEA01DB094} = {8415FED7-1BED-4227-8B4F-BB7C24E041CD}
{088F2BF5-1220-4125-BC64-601C2F032C13} = {8415FED7-1BED-4227-8B4F-BB7C24E041CD}
{8C24FB60-456A-4570-8400-08970F421415} = {7783A106-0F4C-4055-9AB4-413FB2C7B8F0}
Expand All @@ -162,6 +155,7 @@ Global
{C1E36A2C-1C1B-4521-B256-AD42505D9EFB} = {8415FED7-1BED-4227-8B4F-BB7C24E041CD}
{E29F54DF-DFB8-4E27-940D-21ECCB9B6FC1} = {7783A106-0F4C-4055-9AB4-413FB2C7B8F0}
{79F72C83-E94D-4D04-B904-5A4DA161168E} = {7783A106-0F4C-4055-9AB4-413FB2C7B8F0}
{FF5CA51A-CD6A-463F-AE9A-5737FF0FCFA7} = {7783A106-0F4C-4055-9AB4-413FB2C7B8F0}
EndGlobalSection
GlobalSection(ExtensibilityGlobals) = postSolution
SolutionGuid = {AA413DB0-5475-4B5D-A3AF-6323DA8D538B}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
<Compile Include="ServiceBus\IBrokeredMessage.cs" />
<Compile Include="Properties\AssemblyInfo.cs" />
<Compile Include="Properties\AssemblyInfo.*.cs" />
<Compile Include="ServiceBus\IOnMessageOptions.cs" />
<Compile Include="ServiceBus\ISubscriptionClient.cs" />
<Compile Include="ServiceBus\ITopicClient.cs" />
<Compile Include="Validation\IValidationRequest.cs" />
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ public interface IBrokeredMessage : IDisposable
IDictionary<string, object> Properties { get; }
DateTimeOffset ScheduledEnqueueTimeUtc { get; set; }
Task CompleteAsync();
Task AbandonAsync();
string GetBody();
IBrokeredMessage Clone();
}
Expand Down
10 changes: 10 additions & 0 deletions src/NuGet.Services.Contracts/ServiceBus/IOnMessageOptions.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.

namespace NuGet.Services.ServiceBus
{
public interface IOnMessageOptions
{
bool AutoComplete { get; set; }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,9 @@ namespace NuGet.Services.ServiceBus
public interface ISubscriptionClient
{
void OnMessageAsync(Func<IBrokeredMessage, Task> onMessageAsync);

void OnMessageAsync(Func<IBrokeredMessage, Task> onMessageAsync, IOnMessageOptions options);

Task CloseAsync();
}
}
2 changes: 1 addition & 1 deletion src/NuGet.Services.ServiceBus/BrokeredMessageSerializer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ namespace NuGet.Services.ServiceBus
/// type and schema version.
/// </summary>
/// <typeparam name="TMessage">A type decorated with a <see cref="SchemaAttribute"/>.</typeparam>
public class BrokeredMessageSerializer<TMessage>
public class BrokeredMessageSerializer<TMessage> : IBrokeredMessageSerializer<TMessage>
{
private const string SchemaNameKey = "SchemaName";
private const string SchemaVersionKey = "SchemaVersion";
Expand Down
26 changes: 26 additions & 0 deletions src/NuGet.Services.ServiceBus/IBrokeredMessageSerializer.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.

namespace NuGet.Services.ServiceBus
{
/// <summary>
/// Serializes objects into Service Bus <see cref="IBrokeredMessage"/>.
/// </summary>
/// <typeparam name="TMessage">The type this serializer can serialize into <see cref="IBrokeredMessage"/>.</typeparam>
public interface IBrokeredMessageSerializer<TMessage>
{
/// <summary>
/// Serialize a <see cref="TMessage"/> into a <see cref="IBrokeredMessage"/>.
/// </summary>
/// <param name="message">The message to be serialized.</param>
/// <returns>The serialized message.</returns>
IBrokeredMessage Serialize(TMessage message);

/// <summary>
/// Deserialize a <see cref="IBrokeredMessage"/> into a <see cref="TMessage"/>.
/// </summary>
/// <param name="message">The message to be deserialized.</param>
/// <returns>The deserialized message.</returns>
TMessage Deserialize(IBrokeredMessage message);
}
}
21 changes: 21 additions & 0 deletions src/NuGet.Services.ServiceBus/IMessageHandler.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.

using System.Threading.Tasks;

namespace NuGet.Services.ServiceBus
{
/// <summary>
/// The class that handles messages received by a <see cref="ISubscriptionProcessor{TMessage}"/>
/// </summary>
/// <typeparam name="TMessage">The type of messages this handler handles.</typeparam>
public interface IMessageHandler<TMessage>
{
/// <summary>
/// Handle the message.
/// </summary>
/// <param name="message">The received message.</param>
/// <returns>Whether the message has been handled. If false, the message will be requeued to be handled again later.</returns>
Task<bool> HandleAsync(TMessage message);
}
}
35 changes: 35 additions & 0 deletions src/NuGet.Services.ServiceBus/ISubscriptionProcessor.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.

using System.Threading.Tasks;

namespace NuGet.Services.ServiceBus
{
/// <summary>
/// Processes messages that were received from a Service Bus subscription.
/// </summary>
/// <typeparam name="TMessage">The type of message listened by this listener.</typeparam>
public interface ISubscriptionProcessor<TMessage>
{
/// <summary>
/// The number of messages that are currently being handled.
/// </summary>
int NumberOfMessagesInProgress { get; }

/// <summary>
/// Start handling messages emitted to the Service Bus subscription.
/// </summary>
void Start();

/// <summary>
/// Deregisters the message handler.
/// </summary>
/// <remarks>
/// There may still be messages in progress after the returned <see cref="Task"/> has completed!
/// The <see cref="NumberOfMessagesInProgress"/> property should be polled to determine when all
/// messages have been completed.
/// </remarks>
/// <returns>A task that completes when the message handler has been deregistered.</returns>
Task StartShutdownAsync();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,11 @@
<ItemGroup>
<Compile Include="BrokeredMessageSerializer.cs" />
<Compile Include="BrokeredMessageWrapper.cs" />
<Compile Include="IBrokeredMessageSerializer.cs" />
<Compile Include="IMessageHandler.cs" />
<Compile Include="ISubscriptionProcessor.cs" />
<Compile Include="OnMessageOptionsWrapper.cs" />
<Compile Include="SubscriptionProcessor.cs" />
<Compile Include="Properties\AssemblyInfo.cs" />
<Compile Include="Properties\AssemblyInfo.*.cs" />
<Compile Include="SchemaAttribute.cs" />
Expand Down
24 changes: 24 additions & 0 deletions src/NuGet.Services.ServiceBus/OnMessageOptionsWrapper.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.

using System;
using Microsoft.ServiceBus.Messaging;

namespace NuGet.Services.ServiceBus
{
public sealed class OnMessageOptionsWrapper : IOnMessageOptions
{
public OnMessageOptions OnMessageOptions { get; set; }

public bool AutoComplete
{
get => OnMessageOptions.AutoComplete;
set => OnMessageOptions.AutoComplete = value;
}

public OnMessageOptionsWrapper(OnMessageOptions options = null)
{
OnMessageOptions = OnMessageOptions ?? new OnMessageOptions();
}
}
}
30 changes: 28 additions & 2 deletions src/NuGet.Services.ServiceBus/SubscriptionClientWrapper.cs
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,37 @@ public SubscriptionClientWrapper(string connectionString, string topicPath, stri

public void OnMessageAsync(Func<IBrokeredMessage, Task> onMessageAsync)
{
_client.OnMessageAsync(innerMessage =>
var callback = CreateOnMessageAsyncCallback(onMessageAsync);

_client.OnMessageAsync(callback);
}

public void OnMessageAsync(Func<IBrokeredMessage, Task> onMessageAsync, IOnMessageOptions options)
{
if (onMessageAsync == null) throw new ArgumentNullException(nameof(onMessageAsync));
if (options == null) throw new ArgumentNullException(nameof(options));

// For now, assume the only implementation is the wrapper type.
if (! (options is OnMessageOptionsWrapper optionsWrapper))
{
throw new ArgumentException(
$"Options must be of type {nameof(OnMessageOptionsWrapper)}",
nameof(options));
}

_client.OnMessageAsync(
CreateOnMessageAsyncCallback(onMessageAsync),
optionsWrapper.OnMessageOptions);
}

private Func<BrokeredMessage, Task> CreateOnMessageAsyncCallback(Func<IBrokeredMessage, Task> onMessageAsync)
{
return innerMessage =>
{
var message = new BrokeredMessageWrapper(innerMessage);

return onMessageAsync(message);
});
};
}

public Task CloseAsync()
Expand Down
95 changes: 95 additions & 0 deletions src/NuGet.Services.ServiceBus/SubscriptionProcessor.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.

using System;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.Logging;
using Microsoft.ServiceBus.Messaging;

namespace NuGet.Services.ServiceBus
{
public class SubscriptionProcessor<TMessage> : ISubscriptionProcessor<TMessage>
{
private readonly ISubscriptionClient _client;
private readonly IBrokeredMessageSerializer<TMessage> _serializer;
private readonly IMessageHandler<TMessage> _handler;
private readonly ILogger<SubscriptionProcessor<TMessage>> _logger;

private int _numberOfMessagesInProgress;

public int NumberOfMessagesInProgress => _numberOfMessagesInProgress;

/// <summary>
/// Constructs a new subscription processor.
/// </summary>
/// <param name="client">The client used to receive messages from the subscription.</param>
/// <param name="serializer">The serializer used to deserialize received messages.</param>
/// <param name="handler">The handler used to handle received messages.</param>
/// <param name="logger">The logger used to record debug information.</param>
public SubscriptionProcessor(
ISubscriptionClient client,
IBrokeredMessageSerializer<TMessage> serializer,
IMessageHandler<TMessage> handler,
ILogger<SubscriptionProcessor<TMessage>> logger)
{
_client = client ?? throw new ArgumentNullException(nameof(client));
_serializer = serializer ?? throw new ArgumentNullException(nameof(serializer));
_handler = handler ?? throw new ArgumentNullException(nameof(handler));
_logger = logger ?? throw new ArgumentNullException(nameof(logger));

_numberOfMessagesInProgress = 0;
}

public void Start()
{
_logger.LogInformation("Registering the handler to begin listening to the Service Bus subscription");

_client.OnMessageAsync(OnMessageAsync, new OnMessageOptionsWrapper
{
AutoComplete = false,
});
}

private async Task OnMessageAsync(IBrokeredMessage brokeredMessage)
{
Interlocked.Increment(ref _numberOfMessagesInProgress);

try
{
_logger.LogInformation("Received message from Service Bus subscription, processing");

var message = _serializer.Deserialize(brokeredMessage);

if (await _handler.HandleAsync(message))
{
_logger.LogInformation("Message was successfully handled, marking the brokered message as completed");

await brokeredMessage.CompleteAsync();
}
else
{
_logger.LogInformation("Handler did not finish processing message, requeueing message to be reprocessed");
}
}
catch (Exception e)
{
_logger.LogError("Requeueing message as it was unsuccessfully processed due to exception: {Exception}", e);
throw;
}
finally
{
Interlocked.Decrement(ref _numberOfMessagesInProgress);
}
}

public async Task StartShutdownAsync()
{
_logger.LogInformation(
"Shutting down the subscription listener with {NumberOfMessagesInProgress} messages in progress",
NumberOfMessagesInProgress);

await _client.CloseAsync();
}
}
}
1 change: 1 addition & 0 deletions src/NuGet.Services.ServiceBus/project.json
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
{
"dependencies": {
"Microsoft.Extensions.Logging": "1.1.2",
"Newtonsoft.Json": "9.0.1",
"WindowsAzure.ServiceBus": "4.1.3"
},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
<Compile Include="BrokeredMessageWrapperFacts.cs" />
<Compile Include="Properties\AssemblyInfo.cs" />
<Compile Include="BrokeredMessageSerializerFacts.cs" />
<Compile Include="SubscriptionProcessorFacts.cs" />
</ItemGroup>
<ItemGroup>
<None Include="project.json" />
Expand Down
Loading

0 comments on commit 1465c68

Please sign in to comment.