Skip to content

Commit

Permalink
Add cancellation tokens
Browse files Browse the repository at this point in the history
  • Loading branch information
mariusz96 committed Aug 13, 2024
1 parent bd73c5c commit 506e508
Show file tree
Hide file tree
Showing 15 changed files with 437 additions and 54 deletions.
65 changes: 56 additions & 9 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,16 @@ dotnet add package PipelineNet
<!-- DON'T EDIT THIS SECTION, INSTEAD RE-RUN doctoc TO UPDATE -->
**Table of Contents** *generated with [DocToc](https://github.com/thlorenz/doctoc)*

- [Simple example](#simple-example)
- [Pipeline vs Chain of responsibility](#pipeline-vs-chain-of-responsibility)
- [Middleware](#middleware)
- [Pipelines](#pipelines)
- [Chains of responsibility](#chains-of-responsibility)
- [Middleware resolver](#middleware-resolver)
- [License](#license)
- [Simple example](#simple-example)
- [Pipeline vs Chain of responsibility](#pipeline-vs-chain-of-responsibility)
- [Middleware](#middleware)
- [Pipelines](#pipelines)
- [Chains of responsibility](#chains-of-responsibility)
- [Cancellation tokens](#cancellation-tokens)
- [Middleware resolver](#middleware-resolver)
- [ServiceProvider implementation](#serviceprovider-implementation)
- [Unity implementation](#unity-implementation)
- [License](#license)

<!-- END doctoc generated TOC please keep comment here to allow auto update -->

Expand Down Expand Up @@ -174,8 +177,8 @@ As we already have an example of a chain of responsibility, here is an example u
If you want to, you can use the asynchronous version, using asynchronous middleware. Changing the instantiation to:
```C#
var exceptionHandlersChain = new AsyncResponsibilityChain<Exception, bool>(new ActivatorMiddlewareResolver())
.Chain<OutOfMemoryAsyncExceptionHandler>() // The order of middleware being chained matters
.Chain<ArgumentAsyncExceptionHandler>()
.Chain<OutOfMemoryExceptionAsyncHandler>() // The order of middleware being chained matters
.Chain<ArgumentExceptionAsyncHandler>()
.Finally((ex) =>
{
ex.Source = ExceptionSource;
Expand All @@ -194,6 +197,50 @@ result = await exceptionHandlersChain.Execute(new ArgumentException()); // Resul
result = await exceptionHandlersChain.Execute(new InvalidOperationException()); // Result will be false
```

## Cancellation tokens
If you want to pass the cancellation token to your asynchronous pipeline middleware, you can do so by implementing the `ICancellableAsyncMiddleware<TParameter>` interface
and passing the cancellation token argument to the `IAsyncPipeline<TParameter>.Execute` method:
```C#
var pipeline = new AsyncPipeline<Bitmap>(new ActivatorMiddlewareResolver())
.AddCancellable<RoudCornersCancellableAsyncMiddleware>()
.Add<AddTransparencyAsyncMiddleware>() // You can mix both kinds of asynchronous middleware
.AddCancellable<AddWatermarkCancellableAsyncMiddleware>();

Bitmap image = (Bitmap) Image.FromFile("party-photo.png");
CancellationToken cancellationToken = CancellationToken.None;

await pipeline.Execute(image, cancellationToken);

public class RoudCornersCancellableAsyncMiddleware : ICancellableAsyncMiddleware<Bitmap>
{
public async Task Run(Bitmap parameter, Func<Bitmap, Task> next, CancellationToken cancellationToken)
{
await RoundCournersAsync(parameter, cancellationToken);

await next(parameter);
}

private async Task RoudCournersAsync(Bitmap bitmap, CancellationToken cancellationToken)
{
// Handle somehow
await Task.CompletedTask;
}
}
```
To pass the cancellation token to your asynchronous chain of responsibility middleware instead, implement the `ICancellableAsyncMiddleware<TParameter, TReturn>` interface
and pass the cancellation token argument to the `IAsynchChainOfResponsibility<TParamete, TReturnr>.Execute` method. There is also the `CancellableFinally` method that accepts
the cancellation token parameter:
```C#
var exceptionHandlersChain = new ResponsibilityChain<Exception, bool>(new ActivatorMiddlewareResolver())
.ChainCancellable<OutOfMemoryExceptionCancellableAsyncHandler>()
.ChainCancellable<ArgumentExceptionCancellableAsyncHandler>()
.CancellableFinally((parameter, cancellationToken) =>
{
// Do something
return true;
});
```

## Middleware resolver
You may be wondering what is all this `ActivatorMiddlewareResolver` class being passed to every instance of pipeline and chain of responsibility.
This is a default implementation of the `IMiddlewareResolver`, which is used to create instances of the middleware types.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,15 @@ public async Task<bool> Run(Exception exception, Func<Exception, Task<bool>> exe
return await executeNext(exception);
}
}

public class ThrowIfCancellationRequestedMiddleware : ICancellableAsyncMiddleware<Exception, bool>
{
public async Task<bool> Run(Exception exception, Func<Exception, Task<bool>> executeNext, CancellationToken cancellationToken)
{
cancellationToken.ThrowIfCancellationRequested();
return await executeNext(exception);
}
}
#endregion

[Fact]
Expand Down Expand Up @@ -166,8 +175,6 @@ public void Chain_AddTypeThatIsNotAMiddleware_ThrowsException()
});
}



/// <summary>
/// Try to generate a deadlock in synchronous middleware.
/// </summary>
Expand All @@ -184,5 +191,51 @@ public void Execute_SynchronousChainOfResponsibility_SuccessfullyExecute()

Assert.Equal("Test with spaces and new lines", result);
}

[Fact]
public async Task Execute_ChainOfMiddlewareWithCancellableMiddleware_CancellableMiddlewareIsExecuted()
{
var responsibilityChain = new AsyncResponsibilityChain<Exception, bool>(new ActivatorMiddlewareResolver())
.Chain<UnavailableResourcesExceptionHandler>()
.Chain(typeof(InvalidateDataExceptionHandler))
.Chain<MyExceptionHandler>()
.ChainCancellable<ThrowIfCancellationRequestedMiddleware>();

// Creates an ArgumentNullException. The 'ThrowIfCancellationRequestedMiddleware'
// middleware should be the last one to execute.
var exception = new ArgumentNullException();

// Create the cancellation token in the canceled state.
var cancellationToken = new CancellationToken(canceled: true);

// The 'ThrowIfCancellationRequestedMiddleware' should throw 'OperationCanceledException'.
await Assert.ThrowsAsync<OperationCanceledException>(() => responsibilityChain.Execute(exception, cancellationToken));
}

[Fact]
public async Task Execute_ChainOfMiddlewareWithCancellableFinally_CancellableFinallyIsExecuted()
{
var responsibilityChain = new AsyncResponsibilityChain<Exception, bool>(new ActivatorMiddlewareResolver())
.Chain<UnavailableResourcesExceptionHandler>()
.Chain(typeof(InvalidateDataExceptionHandler))
.Chain<MyExceptionHandler>()
.CancellableFinally(async (ex, ct) =>
{
ct.ThrowIfCancellationRequested();

await Task.CompletedTask;
return true;
});

// Creates an ArgumentNullException. The 'finally'
// function should be the last one to execute.
var exception = new ArgumentNullException();

// Create the cancellation token in the canceled state.
var cancellationToken = new CancellationToken(canceled: true);

// The 'finally' function should throw 'OperationCanceledException'.
await Assert.ThrowsAsync<OperationCanceledException>(() => responsibilityChain.Execute(exception, cancellationToken));
}
}
}
37 changes: 37 additions & 0 deletions src/PipelineNet.Tests/Pipelines/AsyncPipelineTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,15 @@ public async Task Run(PersonModel context, Func<PersonModel, Task> executeNext)
await executeNext(context);
}
}

public class ThrowIfCancellationRequestedMiddleware : ICancellableAsyncMiddleware<PersonModel>
{
public async Task Run(PersonModel context, Func<PersonModel, Task> executeNext, CancellationToken cancellationToken)
{
cancellationToken.ThrowIfCancellationRequested();
await executeNext(context);
}
}
#endregion

[Fact]
Expand Down Expand Up @@ -162,5 +171,33 @@ public void Add_AddTypeThatIsNotAMiddleware_ThrowsException()
pipeline.Add(typeof(AsyncPipelineTests));
});
}

[Fact]
public async Task Execute_RunPipelineWithCancellableMiddleware_CancellableMiddlewareIsExecuted()
{
var pipeline = new AsyncPipeline<PersonModel>(new ActivatorMiddlewareResolver())
.Add<PersonWithEvenId>()
.Add<PersonWithOddId>()
.Add<PersonWithEmailName>()
.Add<PersonWithGenderProperty>()
.AddCancellable<ThrowIfCancellationRequestedMiddleware>();

// Create a new instance with a 'Gender' property. The 'ThrowIfCancellationRequestedMiddleware'
// middleware should be the last one to execute.
var personModel = new PersonModel
{
Name = "[email protected]",
Gender = Gender.Other
};

// Create the cancellation token in the canceled state.
var cancellationToken = new CancellationToken(canceled: true);

// Check if 'ThrowIfCancellationRequestedMiddleware' threw 'OperationCanceledException'.
await Assert.ThrowsAsync<OperationCanceledException>(() => pipeline.Execute(personModel, cancellationToken));

// Check if the level of 'personModel' is 4, which is configured by 'PersonWithGenderProperty' middleware.
Assert.Equal(4, personModel.Level);
}
}
}
64 changes: 64 additions & 0 deletions src/PipelineNet/AsyncBaseMiddlewareFlow.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
using PipelineNet.MiddlewareResolver;
using System;
using System.Collections.Generic;
using System.Reflection;

namespace PipelineNet
{
/// <summary>
/// Defines the base class for asynchronous middleware flows.
/// </summary>
/// <typeparam name="TMiddleware">The middleware type.</typeparam>
/// <typeparam name="TCancellableMiddleware">The cancellable middleware type.</typeparam>
public abstract class AsyncBaseMiddlewareFlow<TMiddleware, TCancellableMiddleware>
{
/// <summary>
/// The list of middleware types.
/// </summary>
protected IList<Type> MiddlewareTypes { get; private set; }

/// <summary>
/// The resolver used to create the middleware types.
/// </summary>
protected IMiddlewareResolver MiddlewareResolver { get; private set; }

internal AsyncBaseMiddlewareFlow(IMiddlewareResolver middlewareResolver)
{
MiddlewareResolver = middlewareResolver ?? throw new ArgumentNullException("middlewareResolver",
"An instance of IMiddlewareResolver must be provided. You can use ActivatorMiddlewareResolver.");
MiddlewareTypes = new List<Type>();
}

/// <summary>
/// Stores the <see cref="TypeInfo"/> of the middleware type.
/// </summary>
private static readonly TypeInfo MiddlewareTypeInfo = typeof(TMiddleware).GetTypeInfo();

/// <summary>
/// Stores the <see cref="TypeInfo"/> of the cancellable middleware type.
/// </summary>
private static readonly TypeInfo CancellableMiddlewareTypeInfo = typeof(TCancellableMiddleware).GetTypeInfo();


/// <summary>
/// Adds a new middleware type to the internal list of types.
/// Middleware will be executed in the same order they are added.
/// </summary>
/// <param name="middlewareType">The middleware type to be executed.</param>
/// <exception cref="ArgumentException">Thrown if the <paramref name="middlewareType"/> is
/// not an implementation of <typeparamref name="TMiddleware"/> or <see cref="TCancellableMiddleware"/>.</exception>
/// <exception cref="ArgumentNullException">Thrown if <paramref name="middlewareType"/> is null.</exception>
protected void AddMiddleware(Type middlewareType)
{
if (middlewareType == null) throw new ArgumentNullException("middlewareType");

bool isAssignableFromMiddleware = MiddlewareTypeInfo.IsAssignableFrom(middlewareType.GetTypeInfo())
|| CancellableMiddlewareTypeInfo.IsAssignableFrom(middlewareType.GetTypeInfo());
if (!isAssignableFromMiddleware)
throw new ArgumentException(
$"The middleware type must implement \"{typeof(TMiddleware)}\" or \"{typeof(TCancellableMiddleware)}\".");

this.MiddlewareTypes.Add(middlewareType);
}
}
}
Loading

0 comments on commit 506e508

Please sign in to comment.