Skip to content
This repository has been archived by the owner on Aug 3, 2024. It is now read-only.
/ ServerCommon Public archive

Commit

Permalink
[Package Signing] Added a Scoped Service Bus Message Handler (#82)
Browse files Browse the repository at this point in the history
Often times, Service Bus message handlers will need to use some sort of Entity Framework context. This requires that the handlers' callbacks are executed within their own dependency injection scope. This change creates a new generic Service Bus message handler `ScopedMessageHandler` that, upon receiving a message, creates a new dependency injection scope.

This change also makes `CachingSecretReader` thread-safe, as otherwise, the `ScopedMessageHandler` would need a lock.
  • Loading branch information
loic-sharma authored Nov 8, 2017
1 parent 972413a commit b84a8d8
Show file tree
Hide file tree
Showing 8 changed files with 165 additions and 65 deletions.
9 changes: 8 additions & 1 deletion NuGet.Server.Common.sln
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@

Microsoft Visual Studio Solution File, Format Version 12.00
# Visual Studio 15
VisualStudioVersion = 15.0.27005.2
VisualStudioVersion = 15.0.27004.2006
MinimumVisualStudioVersion = 10.0.40219.1
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "src", "src", "{8415FED7-1BED-4227-8B4F-BB7C24E041CD}"
EndProject
Expand Down Expand Up @@ -50,6 +50,8 @@ Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "NuGet.Services.Contracts.Te
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "NuGet.Services.ServiceBus.Tests", "tests\NuGet.Services.ServiceBus.Tests\NuGet.Services.ServiceBus.Tests.csproj", "{FF5CA51A-CD6A-463F-AE9A-5737FF0FCFA7}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "NuGet.Services.KeyVault.Tests", "tests\NuGet.Services.KeyVault.Tests\NuGet.Services.KeyVault.Tests.csproj", "{BA1FB5F1-8F6B-4558-862B-F47C3995B06A}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
Expand Down Expand Up @@ -132,6 +134,10 @@ Global
{FF5CA51A-CD6A-463F-AE9A-5737FF0FCFA7}.Debug|Any CPU.Build.0 = Debug|Any CPU
{FF5CA51A-CD6A-463F-AE9A-5737FF0FCFA7}.Release|Any CPU.ActiveCfg = Release|Any CPU
{FF5CA51A-CD6A-463F-AE9A-5737FF0FCFA7}.Release|Any CPU.Build.0 = Release|Any CPU
{BA1FB5F1-8F6B-4558-862B-F47C3995B06A}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{BA1FB5F1-8F6B-4558-862B-F47C3995B06A}.Debug|Any CPU.Build.0 = Debug|Any CPU
{BA1FB5F1-8F6B-4558-862B-F47C3995B06A}.Release|Any CPU.ActiveCfg = Release|Any CPU
{BA1FB5F1-8F6B-4558-862B-F47C3995B06A}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
Expand All @@ -156,6 +162,7 @@ Global
{E29F54DF-DFB8-4E27-940D-21ECCB9B6FC1} = {7783A106-0F4C-4055-9AB4-413FB2C7B8F0}
{79F72C83-E94D-4D04-B904-5A4DA161168E} = {7783A106-0F4C-4055-9AB4-413FB2C7B8F0}
{FF5CA51A-CD6A-463F-AE9A-5737FF0FCFA7} = {7783A106-0F4C-4055-9AB4-413FB2C7B8F0}
{BA1FB5F1-8F6B-4558-862B-F47C3995B06A} = {7783A106-0F4C-4055-9AB4-413FB2C7B8F0}
EndGlobalSection
GlobalSection(ExtensibilityGlobals) = postSolution
SolutionGuid = {AA413DB0-5475-4B5D-A3AF-6323DA8D538B}
Expand Down
58 changes: 42 additions & 16 deletions src/NuGet.Services.KeyVault/CachingSecretReader.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,47 +2,73 @@
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.

using System;
using System.Collections.Generic;
using System.Collections.Concurrent;
using System.Threading.Tasks;

namespace NuGet.Services.KeyVault
{
public class CachingSecretReader : ISecretReader
{
public const int DefaultRefreshIntervalSec = 60 * 60 * 24; // 1 day
private readonly int _refreshIntervalSec;

private readonly ISecretReader _internalReader;
private readonly Dictionary<string, Tuple<string, DateTime>> _cache;
private readonly ConcurrentDictionary<string, CachedSecret> _cache;
private readonly TimeSpan _refreshInterval;

public CachingSecretReader(ISecretReader secretReader, int refreshIntervalSec = DefaultRefreshIntervalSec)
{
if (secretReader == null)
_internalReader = secretReader ?? throw new ArgumentNullException(nameof(secretReader));
_cache = new ConcurrentDictionary<string, CachedSecret>();

_refreshInterval = TimeSpan.FromSeconds(refreshIntervalSec);
}

public async Task<string> GetSecretAsync(string secretName)
{
if (string.IsNullOrEmpty(secretName))
{
throw new ArgumentNullException(nameof(secretReader));
throw new ArgumentException("Null or empty secret name", nameof(secretName));
}

_internalReader = secretReader;
_cache = new Dictionary<string, Tuple<string, DateTime>>();
// If the cache contains the secret and it is not expired, return the cached value.
if (_cache.TryGetValue(secretName, out CachedSecret result)
&& !IsSecretOutdated(result))
{
return result.Value;
}

// The cache does not contain a fresh copy of the secret. Fetch and cache the secret.
var updatedValue = new CachedSecret(await _internalReader.GetSecretAsync(secretName));

_refreshIntervalSec = refreshIntervalSec;
return _cache.AddOrUpdate(secretName, updatedValue, (key, old) => updatedValue)
.Value;
}

public virtual bool IsSecretOutdated(Tuple<string, DateTime> cachedSecret)
private bool IsSecretOutdated(CachedSecret secret)
{
return DateTime.UtcNow.Subtract(cachedSecret.Item2).TotalSeconds >= _refreshIntervalSec;
return (DateTime.UtcNow - secret.CacheTime) >= _refreshInterval;
}

public async Task<string> GetSecretAsync(string secretName)
/// <summary>
/// A cached secret.
/// </summary>
private class CachedSecret
{
if (!_cache.ContainsKey(secretName) || IsSecretOutdated(_cache[secretName]))
public CachedSecret(string value)
{
// Get the secret if it is not yet in the cache or it is outdated.
var secretValue = await _internalReader.GetSecretAsync(secretName);
_cache[secretName] = Tuple.Create(secretValue, DateTime.UtcNow);
Value = value;
CacheTime = DateTimeOffset.UtcNow;
}

return _cache[secretName].Item1;
/// <summary>
/// The value of the cached secret.
/// </summary>
public string Value { get; }

/// <summary>
/// The time at which the secret was cached.
/// </summary>
public DateTimeOffset CacheTime { get; }
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
<Compile Include="IMessageHandler.cs" />
<Compile Include="ISubscriptionProcessor.cs" />
<Compile Include="OnMessageOptionsWrapper.cs" />
<Compile Include="ScopedMessageHandler.cs" />
<Compile Include="SubscriptionProcessor.cs" />
<Compile Include="Properties\AssemblyInfo.cs" />
<Compile Include="Properties\AssemblyInfo.*.cs" />
Expand Down
52 changes: 52 additions & 0 deletions src/NuGet.Services.ServiceBus/ScopedMessageHandler.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.

using System;
using System.Threading.Tasks;
using Microsoft.Extensions.DependencyInjection;

namespace NuGet.Services.ServiceBus
{
/// <summary>
/// Handles messages received by a <see cref="ISubscriptionProcessor{TMessage}"/>.
/// Each message will be handled within its own dependency injection scope.
/// </summary>
/// <typeparam name="TMessage">The type of messages this handler handles.</typeparam>
public class ScopedMessageHandler<TMessage>
{
/// <summary>
/// The factory used to create independent dependency injection scopes for each message.
/// </summary>
private readonly IServiceScopeFactory _scopeFactory;

public ScopedMessageHandler(IServiceScopeFactory scopeFactory)
{
_scopeFactory = scopeFactory ?? throw new ArgumentNullException(nameof(scopeFactory));
}

/// <summary>
/// Handle the message in its own dependency injection scope.
/// </summary>
/// <param name="message">The received message.</param>
/// <returns>Whether the message has been handled. If false, the message will be requeued to be handled again later.</returns>
public async Task<bool> HandleAsync(TMessage message)
{
// Create a new scope for this message.
using (var scope = _scopeFactory.CreateScope())
{
// Resolve a new message handler for the newly created scope and let it handle the message.
return await ResolveMessageHandler(scope).HandleAsync(message);
}
}

/// <summary>
/// Resolve the message handler given a specific dependency injection scope.
/// </summary>
/// <param name="scope">The dependency injection scope that should be used to resolve services.</param>
/// <returns>The resolved message handler service from the given scope.</returns>
private IMessageHandler<TMessage> ResolveMessageHandler(IServiceScope scope)
{
return scope.ServiceProvider.GetRequiredService<IMessageHandler<TMessage>>();
}
}
}
1 change: 1 addition & 0 deletions src/NuGet.Services.ServiceBus/project.json
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
{
"dependencies": {
"Microsoft.Extensions.DependencyInjection.Abstractions": "1.1.1",
"Microsoft.Extensions.Logging": "1.1.2",
"Newtonsoft.Json": "9.0.1",
"WindowsAzure.ServiceBus": "4.1.3"
Expand Down
61 changes: 13 additions & 48 deletions tests/NuGet.Services.KeyVault.Tests/CachingSecretReaderFacts.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
using Moq;
using Xunit;
using System.Threading;
using System.Diagnostics;

namespace NuGet.Services.KeyVault.Tests
{
Expand All @@ -31,27 +32,6 @@ public async Task WhenGetSecretIsCalledCacheIsUsed()
Assert.Equal(value1, value2);
}

[Theory]
// Secret was refreshed more recently than the refresh interval and is not outdated.
[InlineData(2, 1, false)]
// Secret was refreshed after the refresh interval is outdated
[InlineData(2, 3, true)]
// Secret was refreshed exactly on the refresh interval and is outdated.
[InlineData(2, 2, true)]
public void CorrectlyIdentifiesOutdatedSecrets(int refreshIntervalSec, int secretLastRefreshedSec, bool isOutdated)
{
// Arrange
var cachingSecretReaderMock = new Mock<CachingSecretReader>(new Mock<ISecretReader>().Object, refreshIntervalSec) {CallBase = true};
var secretToCheck = Tuple.Create("secretName",
DateTime.UtcNow.Add(new TimeSpan(0, 0, -secretLastRefreshedSec)));

// Act
var result = cachingSecretReaderMock.Object.IsSecretOutdated(secretToCheck);

// Assert
Assert.Equal(isOutdated, result);
}

[Fact]
public async Task WhenGetSecretIsCalledCacheIsRefreshedIfPastInterval()
{
Expand All @@ -62,50 +42,35 @@ public async Task WhenGetSecretIsCalledCacheIsRefreshedIfPastInterval()
const int refreshIntervalSec = 1;

var mockSecretReader = new Mock<ISecretReader>();
mockSecretReader.Setup(x => x.GetSecretAsync(It.IsAny<string>())).Returns(Task.FromResult(firstSecret));

var cachingSecretReaderMock = new Mock<CachingSecretReader>(mockSecretReader.Object, refreshIntervalSec)
{
CallBase = true
};

var hasIntervalPassed = false;
cachingSecretReaderMock.Setup(x => x.IsSecretOutdated(It.IsAny<Tuple<string, DateTime>>())).Returns(() =>
{
// If the interval hasn't passed, the secret we have stored is not outdated.
if (!hasIntervalPassed)
{
return false;
}
mockSecretReader
.SetupSequence(x => x.GetSecretAsync(It.IsAny<string>()))
.Returns(Task.FromResult(firstSecret))
.Returns(Task.FromResult(secondSecret));

// If the interval has passed, the secret is outdated.
// It will be refreshed and then the interval will not have passed again.
hasIntervalPassed = false;
return true;
});
var cachingSecretReader = new CachingSecretReader(mockSecretReader.Object, refreshIntervalSec);

// Act
var firstValue1 = await cachingSecretReaderMock.Object.GetSecretAsync(secretName);
var firstValue2 = await cachingSecretReaderMock.Object.GetSecretAsync(secretName);
var firstValue1 = await cachingSecretReader.GetSecretAsync(secretName);
var firstValue2 = await cachingSecretReader.GetSecretAsync(secretName);

// Assert
mockSecretReader.Verify(x => x.GetSecretAsync(It.IsAny<string>()), Times.Once);
Assert.Equal(firstSecret, firstValue1);
Assert.Equal(firstValue1, firstValue2);
Assert.Equal(firstSecret, firstValue2);

// Arrange 2
// We are now x seconds later after refreshIntervalSec has passed.
hasIntervalPassed = true;
mockSecretReader.Setup(x => x.GetSecretAsync(It.IsAny<string>())).Returns(Task.FromResult(secondSecret));
await Task.Delay(TimeSpan.FromSeconds(refreshIntervalSec * 2));

// Act 2
var secondValue1 = await cachingSecretReaderMock.Object.GetSecretAsync(secretName);
var secondValue2 = await cachingSecretReaderMock.Object.GetSecretAsync(secretName);
var secondValue1 = await cachingSecretReader.GetSecretAsync(secretName);
var secondValue2 = await cachingSecretReader.GetSecretAsync(secretName);

// Assert 2
mockSecretReader.Verify(x => x.GetSecretAsync(It.IsAny<string>()), Times.Exactly(2));
Assert.Equal(secondSecret, secondValue1);
Assert.Equal(secondValue1, secondValue2);
Assert.Equal(secondSecret, secondValue2);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
<Compile Include="BrokeredMessageWrapperFacts.cs" />
<Compile Include="Properties\AssemblyInfo.cs" />
<Compile Include="BrokeredMessageSerializerFacts.cs" />
<Compile Include="ScopedMessageHandlerFacts.cs" />
<Compile Include="SubscriptionProcessorFacts.cs" />
</ItemGroup>
<ItemGroup>
Expand Down
47 changes: 47 additions & 0 deletions tests/NuGet.Services.ServiceBus.Tests/ScopedMessageHandlerFacts.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.

using System;
using System.Threading.Tasks;
using Microsoft.Extensions.DependencyInjection;
using Moq;
using Xunit;

namespace NuGet.Services.ServiceBus.Tests
{
public class ScopedMessageHandlerFacts
{
[Fact]
public async Task CreatesMessageHandlerUsingScope()
{
// Arrange
var scopeFactoryMock = new Mock<IServiceScopeFactory>();
var scopeMock = new Mock<IServiceScope>();
var serviceProviderMock = new Mock<IServiceProvider>();
var handlerMock = new Mock<IMessageHandler<object>>();

scopeFactoryMock
.Setup(f => f.CreateScope())
.Returns(scopeMock.Object);

scopeMock
.SetupGet(s => s.ServiceProvider)
.Returns(serviceProviderMock.Object);

serviceProviderMock
.Setup(p => p.GetService(typeof(IMessageHandler<object>)))
.Returns(handlerMock.Object);

var target = new ScopedMessageHandler<object>(scopeFactoryMock.Object);

// Act - send two empty messages.
await target.HandleAsync(new object());
await target.HandleAsync(new object());

// Assert
scopeFactoryMock.Verify(f => f.CreateScope(), Times.Exactly(2));
serviceProviderMock.Verify(p => p.GetService(typeof(IMessageHandler<object>)), Times.Exactly(2));
handlerMock.Verify(h => h.HandleAsync(It.IsAny<object>()), Times.Exactly(2));
}
}
}

0 comments on commit b84a8d8

Please sign in to comment.