Skip to content

Commit

Permalink
Stream request middleware is ready for testing
Browse files Browse the repository at this point in the history
  • Loading branch information
aritchie committed Jun 19, 2024
1 parent 71fbe55 commit 46793be
Show file tree
Hide file tree
Showing 14 changed files with 159 additions and 15 deletions.
7 changes: 6 additions & 1 deletion Sample/Contracts/MyMessageContracts.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,11 @@
using Shiny.Mediator.Middleware;

namespace Sample.Contracts;

public record MyMessageRequest(string Arg, bool FireAndForgetEvents) : IRequest<MyMessageResponse>;
public record MyMessageRequest(string Arg, bool FireAndForgetEvents) : IRequest<MyMessageResponse>, ICacheItem
{
public string CacheKey { get; }
};

public record MyMessageResponse(string Response);

Expand Down
14 changes: 14 additions & 0 deletions Sample/Handlers/MyStreamRequestMiddleware.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
namespace Sample.Handlers;

public class MyStreamRequestMiddleware<TRequest, TResult> : IStreamRequestMiddleware<TRequest, TResult> where TRequest : IStreamRequest<TResult>
{
public IAsyncEnumerator<TResult> Process(
TRequest request,
StreamRequestDelegate<TResult> next,
IStreamRequestHandler<TRequest, TResult> requestHandler,
CancellationToken cancellationToken
)
{
return next();
}
}
2 changes: 2 additions & 0 deletions Sample/Handlers/SingletonRequestHandler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,10 @@ namespace Sample.Handlers;


[RegisterHandler]
[TimedLogging(3000)]
public class SingletonRequestHandler(IMediator mediator, AppSqliteConnection data) : IRequestHandler<MyMessageRequest, MyMessageResponse>
{
[Cache(Storage = StoreType.File, MaxAgeSeconds = 30, OnlyForOffline = true)]
public async Task<MyMessageResponse> Handle(MyMessageRequest request, CancellationToken cancellationToken)
{
var e = new MyMessageEvent(
Expand Down
2 changes: 2 additions & 0 deletions Sample/TriggerViewModel.cs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ AppSqliteConnection data
this.FireAndForgetEvents
);
var result = await mediator.Request(request, this.cancelSource.Token);


await data.Log(
"TriggerViewModel-Response",
new MyMessageEvent(
Expand Down
4 changes: 2 additions & 2 deletions src/Shiny.Mediator.Maui/Attributes.cs
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,9 @@ public class MainThreadAttribute : Attribute {}


[AttributeUsage(AttributeTargets.Class, Inherited = false)]
public class TimedLoggingAttribute : Attribute
public class TimedLoggingAttribute(double errorThresholdMillis) : Attribute
{
public double ErrorThresholdMillis { get; set; } = 0;
public double ErrorThresholdMillis => errorThresholdMillis;
}


Expand Down
4 changes: 2 additions & 2 deletions src/Shiny.Mediator.Maui/Middleware/CacheRequestMiddleware.cs
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,9 @@ public virtual Task<TResult> Process(
TRequest request,
RequestHandlerDelegate<TResult> next
)
=> cacheManager.CacheOrGet<TResult>(
=> cacheManager.CacheOrGet(
cfg,
request,
request!,
async () => await next().ConfigureAwait(false)
);
}
Expand Down
10 changes: 8 additions & 2 deletions src/Shiny.Mediator/IRequestHandler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,18 @@ namespace Shiny.Mediator;

public interface IRequestHandler { }

public interface IRequestHandler<in TRequest> : IRequestHandler where TRequest : IRequest
public interface IRequestHandler<TRequest> : IRequestHandler where TRequest : IRequest
{
Task Handle(TRequest request, CancellationToken cancellationToken);
}

public interface IRequestHandler<in TRequest, TResult> : IRequestHandler where TRequest : IRequest<TResult>
public interface IRequestHandler<TRequest, TResult> : IRequestHandler where TRequest : IRequest<TResult>
{
Task<TResult> Handle(TRequest request, CancellationToken cancellationToken);
}


public interface IStreamRequestHandler<TRequest, TResult> : IRequestHandler where TRequest : IStreamRequest<TResult>
{
IAsyncEnumerable<TResult> Handle(TRequest request, CancellationToken cancellationToken);
}
7 changes: 0 additions & 7 deletions src/Shiny.Mediator/IStreamRequestHandler.cs

This file was deleted.

3 changes: 3 additions & 0 deletions src/Shiny.Mediator/IStreamRequestMiddleware.cs
Original file line number Diff line number Diff line change
@@ -1,11 +1,14 @@
namespace Shiny.Mediator;


public delegate IAsyncEnumerator<TResult> StreamRequestDelegate<TResult>();
public interface IStreamRequestMiddleware<TRequest, TResult> where TRequest : IStreamRequest<TResult>
{
// TODO: I want to be able to pump the async enumerable from the middleware as well
IAsyncEnumerator<TResult> Process(
TRequest request,
StreamRequestDelegate<TResult> next,
IStreamRequestHandler<TRequest, TResult> requestHandler,
CancellationToken cancellationToken
);
}
Expand Down
8 changes: 8 additions & 0 deletions src/Shiny.Mediator/MediatorExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
using Microsoft.Extensions.DependencyInjection.Extensions;
using Shiny.Mediator.Impl;
using Shiny.Mediator.Infrastructure;
using Shiny.Mediator.Middleware;

namespace Shiny.Mediator;

Expand Down Expand Up @@ -83,4 +84,11 @@ public static IServiceCollection AddScopedAsImplementedInterfaces<TImplementatio

return services;
}


public static ShinyConfigurator AddReplayStreamMiddleware(this ShinyConfigurator configurator)
{
configurator.AddOpenStreamMiddleware(typeof(ReplayStreamMiddleware<,>));
return configurator;
}
}
55 changes: 55 additions & 0 deletions src/Shiny.Mediator/Middleware/ReplayStreamMiddleware.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
using System.Text.Json;

namespace Shiny.Mediator.Middleware;

/// <summary>
/// Replays the last result before requesting a new one
/// </summary>
/// <typeparam name="TRequest"></typeparam>
/// <typeparam name="TResult"></typeparam>
public class ReplayStreamMiddleware<TRequest, TResult> : IStreamRequestMiddleware<TRequest, TResult>
where TRequest : IStreamRequest<TResult>
{
public IAsyncEnumerator<TResult> Process(
TRequest request,
StreamRequestDelegate<TResult> next,
IStreamRequestHandler<TRequest, TResult> requestHandler,
CancellationToken cancellationToken
)
{
var attribute = requestHandler.GetHandlerHandleMethodAttribute<TRequest, ReplayAttribute>();
if (attribute == null)
return next();

return this.Iterate(request, next, cancellationToken);
}


protected virtual async IAsyncEnumerator<TResult> Iterate(TRequest request, StreamRequestDelegate<TResult> next, CancellationToken ct)
{
var path = this.GetCacheFilePath(request);
if (File.Exists(path))
{
var json = File.ReadAllText(path);
var obj = JsonSerializer.Deserialize<TResult>(json);
yield return obj;
}

var nxt = next();
while (await nxt.MoveNextAsync() && !ct.IsCancellationRequested)
{
var json = JsonSerializer.Serialize(nxt.Current);
File.WriteAllText(path, json);
yield return nxt.Current;
}
}

protected virtual string GetCacheFilePath(TRequest request)
{
if (request is IReplayKey<TResult> key)
return key.Key;

var t = request.GetType();
return $"{t.Namespace}_{t.Name}.replay";
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
namespace Shiny.Mediator.Middleware;

public class TimerRefreshStreamRequestMiddleware<TRequest, TResult> : IStreamRequestMiddleware<TRequest, TResult>
where TRequest : IStreamRequest<TResult>
{
public IAsyncEnumerator<TResult> Process(
TRequest request,
StreamRequestDelegate<TResult> next,
IStreamRequestHandler<TRequest, TResult> requestHandler,
CancellationToken cancellationToken
)
{
var attribute = requestHandler.GetHandlerHandleMethodAttribute<TRequest, TimerRefreshAttribute>();
if (attribute == null)
return next();

return Iterate(attribute, next, cancellationToken);
}


async IAsyncEnumerator<TResult> Iterate(TimerRefreshAttribute attribute, StreamRequestDelegate<TResult> next, CancellationToken ct)
{
while (!ct.IsCancellationRequested)
{
await Task.Delay(attribute.RefreshSeconds, ct);

var nxt = next();
while (await nxt.MoveNextAsync() && !ct.IsCancellationRequested)
yield return nxt.Current;
}
}
}
17 changes: 17 additions & 0 deletions src/Shiny.Mediator/MiddlewareModels.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
namespace Shiny.Mediator;

[AttributeUsage(AttributeTargets.Method, Inherited = false, AllowMultiple = false)]
public class TimerRefreshAttribute(int refreshSeconds, bool ignoreErrors = true) : Attribute
{
public int RefreshSeconds => refreshSeconds;
public bool IgnoreErrors => ignoreErrors;
}


[AttributeUsage(AttributeTargets.Method, Inherited = false, AllowMultiple = false)]
public class ReplayAttribute : Attribute {}

public interface IReplayKey<TResult> : IStreamRequest<TResult>
{
string Key { get; }
}
9 changes: 8 additions & 1 deletion src/Shiny.Mediator/ShinyConfigurator.cs
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,14 @@ public ShinyConfigurator AddOpenRequestMiddleware(Type implementationType, Servi
services.Add(new ServiceDescriptor(typeof(IRequestMiddleware<,>), null, implementationType, lifetime));
return this;
}



public ShinyConfigurator AddOpenStreamMiddleware(Type implementationType, ServiceLifetime lifetime = ServiceLifetime.Scoped)
{
// TODO: validate open generic
services.Add(new ServiceDescriptor(typeof(IStreamRequestMiddleware<,>), null, implementationType, lifetime));
return this;
}

public ShinyConfigurator AddOpenEventMiddleware(Type implementationType, ServiceLifetime lifetime = ServiceLifetime.Scoped)
{
Expand Down

0 comments on commit 46793be

Please sign in to comment.