Skip to content

Commit

Permalink
Implemented a parallel request resolver.
Browse files Browse the repository at this point in the history
Working towards resolving kapetan#39
  • Loading branch information
jasoncouture committed Jul 6, 2019
1 parent 27cc83a commit 9949214
Show file tree
Hide file tree
Showing 2 changed files with 98 additions and 10 deletions.
70 changes: 70 additions & 0 deletions DNS/Client/RequestResolver/ParallelRequestResolver.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
using System;
using System.Threading;
using System.Threading.Tasks;
using DNS.Protocol;
using DNS.Protocol.Utils;
using System.Collections.Generic;
using System.Linq;

namespace DNS.Client.RequestResolver
{
/// <summary>
/// Resolve requests using multiple IRequestResolvers, taking the first result.
/// </summary>
public class ParallelRequestResolver : IRequestResolver
{
private List<IRequestResolver> resolvers;
/// <summary>
/// Create a new instance of ParallelRequestResolver
/// </summary>
/// <param name="innerResolvers"></param>
/// <exception cref="System.ArgumentException">Thrown when <paramref name="innerResolvers">innerResolvers</paramref> does not contain at least 1 resolver.</exception>
public ParallelRequestResolver(IEnumerable<IRequestResolver> innerResolvers)
{
resolvers = innerResolvers.ToList();
if (resolvers.Count == 0) throw new ArgumentException("No inner DNS resolvers were provided!", nameof(innerResolvers));
}

public async Task<IResponse> Resolve(IRequest request, CancellationToken cancellationToken = default)
{
CancellationTokenSource requestCompletedCancellationSource = new CancellationTokenSource();
var linkedSource = CancellationTokenSource.CreateLinkedTokenSource(requestCompletedCancellationSource.Token, cancellationToken);
List<Exception> exceptions = new List<Exception>();
var tasks = resolvers.Select(i => i.Resolve(request, linkedSource.Token)).ToList();
bool done = false;
IResponse response = null;
while (response == null)
{
if (tasks.Count == 0)
break;
var completedTask = await Task.WhenAny(tasks).ConfigureAwait(false);
try
{
// We could check the task manually, but this way will handle edge cases.
response = await completedTask.ConfigureAwait(false);
}
catch (Exception ex)
{
exceptions.Add(ex);
}
tasks.Remove(completedTask);
}

if (tasks.Any())
{
tasks = tasks.Select(i => i.SwallowExceptions(null)).ToList();
}

requestCompletedCancellationSource.Cancel();
if (response == null)
{
throw new AggregateException(exceptions);
}

// Should response be wrapped with something that exposes exceptions?
// IE: public class ResponseWithExceptions : IResponse
// new ResponseWithExceptions(response, exceptions)
return response;
}
}
}
38 changes: 28 additions & 10 deletions DNS/Protocol/Utils/TaskExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,26 +2,44 @@
using System.Threading;
using System.Threading.Tasks;

namespace DNS.Protocol.Utils {
public static class TaskExtensions {
public static async Task<T> WithCancellation<T>(this Task<T> task, CancellationToken token) {
namespace DNS.Protocol.Utils
{
public static class TaskExtensions
{
public static async Task<T> WithCancellation<T>(this Task<T> task, CancellationToken token)
{
TaskCompletionSource<bool> tcs = new TaskCompletionSource<bool>();
CancellationTokenRegistration registration = token.Register(src => {
((TaskCompletionSource<bool>) src).TrySetResult(true);
CancellationTokenRegistration registration = token.Register(src =>
{
((TaskCompletionSource<bool>)src).TrySetResult(true);
}, tcs);

using (registration) {
if (await Task.WhenAny(task, tcs.Task) != task) {
using (registration)
{
if (await Task.WhenAny(task, tcs.Task) != task)
{
throw new OperationCanceledException(token);
}
}

return await task;
}

public static async Task<T> WithCancellationTimeout<T>(this Task<T> task, TimeSpan timeout, CancellationToken cancellationToken = default(CancellationToken)) {
public static async Task<T> SwallowExceptions<T>(this Task<T> task, T defaultValue = default)
{
try
{
return await task;
}
catch
{
return defaultValue;
}
}
public static async Task<T> WithCancellationTimeout<T>(this Task<T> task, TimeSpan timeout, CancellationToken cancellationToken = default(CancellationToken))
{
using (CancellationTokenSource timeoutSource = new CancellationTokenSource(timeout))
using (CancellationTokenSource linkSource = CancellationTokenSource.CreateLinkedTokenSource(timeoutSource.Token, cancellationToken)) {
using (CancellationTokenSource linkSource = CancellationTokenSource.CreateLinkedTokenSource(timeoutSource.Token, cancellationToken))
{
return await task.WithCancellation(linkSource.Token);
}
}
Expand Down

0 comments on commit 9949214

Please sign in to comment.