Skip to content

Commit

Permalink
Streaming requests
Browse files Browse the repository at this point in the history
  • Loading branch information
aritchie committed Jun 17, 2024
1 parent 575e2f4 commit 61be54f
Show file tree
Hide file tree
Showing 12 changed files with 109 additions and 34 deletions.
3 changes: 3 additions & 0 deletions Sample/Contracts/TickerRequest.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
namespace Sample.Contracts;

public record TickerRequest(int Repeat, int Multiplier, int GapSeconds) : IStreamRequest<string>;
18 changes: 18 additions & 0 deletions Sample/Handlers/TickerStreamRequestHandler.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
using Sample.Contracts;

namespace Sample.Handlers;


[RegisterHandler]
public class TickerStreamRequestHandler : IStreamRequestHandler<TickerRequest, string>
{
public async IAsyncEnumerable<string> Handle(TickerRequest request, CancellationToken cancellationToken)
{
for (var i = 0; i < request.Repeat; i++)
{
await Task.Delay(TimeSpan.FromSeconds(request.GapSeconds));
var value = i * request.Multiplier;
yield return ($"{i} : {value}");
}
}
}
2 changes: 1 addition & 1 deletion Sample/MauiProgram.cs
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ public static MauiApp CreateMauiApp()
})
);
builder.Services.AddDiscoveredMediatorHandlersFromSample();
builder.Services.AddSingletonAsImplementedInterfaces<ErrorRequestHandler>();
// builder.Services.AddSingletonAsImplementedInterfaces<ErrorRequestHandler>();

builder.Services.AddSingleton<AppSqliteConnection>();
builder.Services.AddMauiBlazorWebView();
Expand Down
20 changes: 20 additions & 0 deletions Sample/TriggerPage.xaml
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,26 @@
<TextCell Text="Error Trap"
Command="{Binding ErrorTrap}" />
</TableSection>

<TableSection Title="Streaming">
<EntryCell Label="Repeat"
Text="{Binding StreamRepeat}"
Keyboard="Numeric" />

<EntryCell Label="Multiplier"
Text="{Binding StreamMultiplier}"
Keyboard="Numeric" />

<EntryCell Label="Gap Seconds"
Text="{Binding StreamGapSeconds}"
Keyboard="Numeric" />

<TextCell Text="Run Stream"
Command="{Binding Stream}"/>

<TextCell Text="Last Response"
Detail="{Binding StreamLastResponse}" />
</TableSection>
</TableRoot>
</TableView>
</ContentPage>
16 changes: 15 additions & 1 deletion Sample/TriggerViewModel.cs
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,15 @@ await data.Log(
);

this.ErrorTrap = ReactiveCommand.CreateFromTask(() => mediator.Send(new ErrorRequest()));


this.Stream = ReactiveCommand.CreateFromTask(async () =>
{
var stream = mediator.Request(new TickerRequest(this.StreamRepeat, this.StreamMultiplier, this.StreamGapSeconds));
await foreach (var item in stream)
{
this.StreamLastResponse = item;
}
});
this.sub = mediator.Subscribe((MyMessageEvent @event, CancellationToken _) =>
data.Log("TriggerViewModel-Subscribe", @event)
);
Expand All @@ -77,6 +85,12 @@ public Task Handle(MyMessageEvent @event, CancellationToken cancellationToken)
[Reactive] public string Arg { get; set; }
[Reactive] public bool FireAndForgetEvents { get; set; }

public ICommand Stream { get; }
[Reactive] public int StreamGapSeconds { get; set; } = 1;
[Reactive] public int StreamRepeat { get; set; } = 5;
[Reactive] public int StreamMultiplier { get; set; } = 2;
[Reactive] public string? StreamLastResponse { get; private set; }

public override void Destroy()
{
base.Destroy();
Expand Down
4 changes: 1 addition & 3 deletions src/Shiny.Mediator.Contracts/IRequest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,4 @@

public interface IRequest;
public interface IRequest<out TResult>;
// public interface IStreamRequest<out TResult> : IRequest<IAsyncEnumerable<TResult>>
// {
// }
public interface IStreamRequest<out TResult>;

This file was deleted.

14 changes: 7 additions & 7 deletions src/Shiny.Mediator/IStreamRequestHandler.cs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
// namespace Shiny.Mediator;
//
//
// public interface IStreamRequestHandler<in TRequest, TResult> where TRequest : IStreamRequest<TResult>
// {
// IAsyncEnumerable<TResult> Handle(TRequest request, CancellationToken cancellationToken);
// }
namespace Shiny.Mediator;


public interface IStreamRequestHandler<in TRequest, TResult> where TRequest : IStreamRequest<TResult>
{
IAsyncEnumerable<TResult> Handle(TRequest request, CancellationToken cancellationToken);
}
22 changes: 11 additions & 11 deletions src/Shiny.Mediator/IStreamRequestMiddleware.cs
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
// namespace Shiny.Mediator;
//
//
// public interface IStreamRequestMiddleware<TRequest, TResult> where TRequest : IStreamRequest<TResult>
// {
// // TODO: I want to be able to pump the async enumerable from the middleware as well
// IAsyncEnumerable<TResult> Process(
// TRequest request,
// CancellationToken cancellationToken
// );
// }
namespace Shiny.Mediator;


public interface IStreamRequestMiddleware<TRequest, TResult> where TRequest : IStreamRequest<TResult>
{
// TODO: I want to be able to pump the async enumerable from the middleware as well
IAsyncEnumerator<TResult> Process(
TRequest request,
CancellationToken cancellationToken
);
}

// public delegate Task<TResult> RequestHandlerDelegate<TResult>();
// public interface IRequestMiddleware<TRequest, TResult> where TRequest : IRequest<TResult>
Expand Down
19 changes: 17 additions & 2 deletions src/Shiny.Mediator/Impl/DefaultRequestSender.cs
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,23 @@ public async Task Send(IRequest request, CancellationToken cancellationToken)
var task = (Task)wrapperMethod.Invoke(wrapper, [scope.ServiceProvider, request, cancellationToken])!;
await task.ConfigureAwait(false);
}



public IAsyncEnumerable<TResult> Request<TResult>(IStreamRequest<TResult> request, CancellationToken cancellationToken = default)
{
using var scope = services.CreateScope();

// TODO: middleware
var wrapperType = typeof(IStreamRequestHandler<,>).MakeGenericType([request.GetType(), typeof(TResult)]);
var wrapper = scope.ServiceProvider.GetService(wrapperType);
if (wrapper == null)
throw new InvalidOperationException($"No Stream Request Handler for '{request.GetType().FullName}'");

var wrapperMethod = wrapperType.GetMethod("Handle", BindingFlags.Public | BindingFlags.Instance)!;
var enumerable = (IAsyncEnumerable<TResult>)wrapperMethod.Invoke(wrapper, [request, cancellationToken])!;
return enumerable;
}


// TODO: I want to prevent IRequest (void) from being callable here
public async Task<TResult> Request<TResult>(IRequest<TResult> request, CancellationToken cancellationToken = default)
{
Expand Down
3 changes: 3 additions & 0 deletions src/Shiny.Mediator/Impl/Mediator.cs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,9 @@ public Task<TResult> Request<TResult>(IRequest<TResult> request, CancellationTok
public Task Send(IRequest request, CancellationToken cancellationToken = default)
=> requestSender.Send(request, cancellationToken);

public IAsyncEnumerable<TResult> Request<TResult>(IStreamRequest<TResult> request, CancellationToken cancellationToken = default)
=> requestSender.Request(request, cancellationToken);

public Task Publish<TEvent>(TEvent @event, CancellationToken cancellationToken = default) where TEvent : IEvent
=> eventPublisher.Publish(@event, cancellationToken);

Expand Down
13 changes: 13 additions & 0 deletions src/Shiny.Mediator/Infrastructure/IRequestSender.cs
Original file line number Diff line number Diff line change
Expand Up @@ -27,4 +27,17 @@ Task Send(
IRequest request,
CancellationToken cancellationToken = default
);


/// <summary>
///
/// </summary>
/// <param name="request"></param>
/// <param name="cancellationToken"></param>
/// <typeparam name="TResult"></typeparam>
/// <returns></returns>
IAsyncEnumerable<TResult> Request<TResult>(
IStreamRequest<TResult> request,
CancellationToken cancellationToken = default
);
}

0 comments on commit 61be54f

Please sign in to comment.