Skip to content

Commit

Permalink
Migrated to ValueOption, IOptions<> and simplified WebSocket handling…
Browse files Browse the repository at this point in the history
… logic
  • Loading branch information
xperiandri committed Mar 17, 2024
1 parent a6ee3e2 commit 38fed3c
Show file tree
Hide file tree
Showing 8 changed files with 90 additions and 102 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ open System.Runtime.CompilerServices
open System.Text.Json
open Microsoft.AspNetCore.Http
open Microsoft.Extensions.DependencyInjection
open Microsoft.Extensions.Options

open FSharp.Core
open FsToolkit.ErrorHandling
Expand All @@ -26,7 +27,7 @@ type HttpContext with
/// </returns>
[<Extension>]
member ctx.TryBindJsonAsync<'T>(expectedJson) = taskResult {
let serializerOptions = ctx.RequestServices.GetRequiredService<IGraphQLOptions>().SerializerOptions
let serializerOptions = ctx.RequestServices.GetRequiredService<IOptions<IGraphQLOptions>>().Value.SerializerOptions
let request = ctx.Request

try
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ open System.Threading.Tasks
open Microsoft.AspNetCore.Http
open Microsoft.Extensions.DependencyInjection
open Microsoft.Extensions.Logging
open Microsoft.Extensions.Options

open FsToolkit.ErrorHandling
open Giraffe
Expand All @@ -20,6 +21,8 @@ type HttpHandler = HttpFunc -> HttpContext -> HttpFuncResult

module HttpHandlers =

let [<Literal>] internal IdentedOptionsName = "Idented"

let rec private moduleType = getModuleType <@ moduleType @>

let ofTaskIResult ctx (taskRes: Task<IResult>) : HttpFuncResult = task {
Expand All @@ -38,12 +41,12 @@ module HttpHandlers =

let logger = sp.CreateLogger moduleType

let options = sp.GetRequiredService<GraphQLOptions<'Root>>()
let options = sp.GetRequiredService<IOptionsMonitor<GraphQLOptions<'Root>>>()

let toResponse { DocumentId = documentId; Content = content; Metadata = metadata } =

let serializeIdented value =
let jsonSerializerOptions = options.GetSerializerOptionsIdented()
let jsonSerializerOptions = options.Get(IdentedOptionsName).SerializerOptions
JsonSerializer.Serialize(value, jsonSerializerOptions)

match content with
Expand Down Expand Up @@ -243,7 +246,7 @@ module HttpHandlers =
variables
|> Option.iter (fun v -> logger.LogTrace($"GraphQL variables:{Environment.NewLine}{{variables}}", v))

let root = options.RootFactory ctx
let root = options.CurrentValue.RootFactory ctx

let! result =
Async.StartAsTask(
Expand All @@ -259,7 +262,7 @@ module HttpHandlers =
Task.FromResult None
else
taskResult {
let executor = options.SchemaExecutor
let executor = options.CurrentValue.SchemaExecutor
match! checkOperationType ctx with
| IntrospectionQuery optionalAstDocument -> return! executeIntrospectionQuery executor optionalAstDocument
| OperationQuery content -> return! executeOperation executor content
Expand Down
11 changes: 2 additions & 9 deletions src/FSharp.Data.GraphQL.Server.AspNetCore/GraphQLOptions.fs
Original file line number Diff line number Diff line change
Expand Up @@ -6,18 +6,17 @@ open System.Text.Json
open System.Threading.Tasks
open Microsoft.AspNetCore.Http

type PingHandler = IServiceProvider -> JsonDocument option -> Task<JsonDocument option>
type PingHandler = IServiceProvider -> JsonDocument voption -> Task<JsonDocument voption>

type GraphQLTransportWSOptions = {
EndpointUrl : string
ConnectionInitTimeoutInMs : int
CustomPingHandler : PingHandler option
CustomPingHandler : PingHandler voption
}

type IGraphQLOptions =
abstract member SerializerOptions : JsonSerializerOptions
abstract member WebsocketOptions : GraphQLTransportWSOptions
abstract member GetSerializerOptionsIdented : unit -> JsonSerializerOptions

type GraphQLOptions<'Root> = {
SchemaExecutor : Executor<'Root>
Expand All @@ -26,12 +25,6 @@ type GraphQLOptions<'Root> = {
WebsocketOptions : GraphQLTransportWSOptions
} with

member options.GetSerializerOptionsIdented () =
let options = JsonSerializerOptions (options.SerializerOptions)
options.WriteIndented <- true
options

interface IGraphQLOptions with
member this.SerializerOptions = this.SerializerOptions
member this.WebsocketOptions = this.WebsocketOptions
member this.GetSerializerOptionsIdented () = this.GetSerializerOptionsIdented ()
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ open System.Threading.Tasks
open Microsoft.AspNetCore.Http
open Microsoft.Extensions.Hosting
open Microsoft.Extensions.Logging
open Microsoft.Extensions.Options
open FsToolkit.ErrorHandling

open FSharp.Data.GraphQL
Expand All @@ -22,18 +23,24 @@ type GraphQLWebSocketMiddleware<'Root>
applicationLifetime : IHostApplicationLifetime,
serviceProvider : IServiceProvider,
logger : ILogger<GraphQLWebSocketMiddleware<'Root>>,
options : GraphQLOptions<'Root>
options : IOptions<GraphQLOptions<'Root>>
) =

let options = options.Value
let serializerOptions = options.SerializerOptions
let pingHandler = options.WebsocketOptions.CustomPingHandler
let endpointUrl = PathString options.WebsocketOptions.EndpointUrl
let connectionInitTimeout = options.WebsocketOptions.ConnectionInitTimeoutInMs

let serializeServerMessage (jsonSerializerOptions : JsonSerializerOptions) (serverMessage : ServerMessage) = task {
let raw =
match serverMessage with
| ConnectionAck -> { Id = None; Type = "connection_ack"; Payload = None }
| ServerPing -> { Id = None; Type = "ping"; Payload = None }
| ServerPong p -> { Id = None; Type = "pong"; Payload = p |> Option.map CustomResponse }
| Next (id, payload) -> { Id = Some id; Type = "next"; Payload = Some <| ExecutionResult payload }
| Complete id -> { Id = Some id; Type = "complete"; Payload = None }
| Error (id, errMsgs) -> { Id = Some id; Type = "error"; Payload = Some <| ErrorMessages errMsgs }
| ConnectionAck -> { Id = ValueNone; Type = "connection_ack"; Payload = ValueNone }
| ServerPing -> { Id = ValueNone; Type = "ping"; Payload = ValueNone }
| ServerPong p -> { Id = ValueNone; Type = "pong"; Payload = p |> ValueOption.map CustomResponse }
| Next (id, payload) -> { Id = ValueSome id; Type = "next"; Payload = ValueSome <| ExecutionResult payload }
| Complete id -> { Id = ValueSome id; Type = "complete"; Payload = ValueNone }
| Error (id, errMsgs) -> { Id = ValueSome id; Type = "error"; Payload = ValueSome <| ErrorMessages errMsgs }
return JsonSerializer.Serialize (raw, jsonSerializerOptions)
}

Expand Down Expand Up @@ -83,10 +90,10 @@ type GraphQLWebSocketMiddleware<'Root>
|> Array.ofSeq
|> System.Text.Encoding.UTF8.GetString
if String.IsNullOrWhiteSpace message then
return None
return ValueNone
else
let! result = message |> deserializeClientMessage serializerOptions
return Some result
return ValueSome result
}

let sendMessageViaSocket (jsonSerializerOptions) (socket : WebSocket) (message : ServerMessage) : Task = task {
Expand Down Expand Up @@ -137,15 +144,7 @@ type GraphQLWebSocketMiddleware<'Root>
let tryToGracefullyCloseSocketWithDefaultBehavior =
tryToGracefullyCloseSocket (WebSocketCloseStatus.NormalClosure, "Normal Closure")

let handleMessages
(cancellationToken : CancellationToken)
(httpContext : HttpContext)
(serializerOptions : JsonSerializerOptions)
(executor : Executor<'Root>)
(root : HttpContext -> 'Root)
(pingHandler : PingHandler option)
(socket : WebSocket)
=
let handleMessages (cancellationToken : CancellationToken) (httpContext : HttpContext) (socket : WebSocket) : Task =
let subscriptions = new Dictionary<SubscriptionId, SubscriptionUnsubscriber * OnUnsubscribeAction> ()
// ---------->
// Helpers -->
Expand Down Expand Up @@ -204,8 +203,8 @@ type GraphQLWebSocketMiddleware<'Root>

let getStrAddendumOfOptionalPayload optionalPayload =
optionalPayload
|> Option.map (fun payloadStr -> $" with payload: %A{payloadStr}")
|> Option.defaultWith (fun () -> "")
|> ValueOption.map (fun payloadStr -> $" with payload: %A{payloadStr}")
|> ValueOption.defaultWith (fun () -> "")

let logMsgReceivedWithOptionalPayload optionalPayload (msgAsStr : string) =
logger.LogTrace ("{message}{messageaddendum}", msgAsStr, (optionalPayload |> getStrAddendumOfOptionalPayload))
Expand All @@ -226,13 +225,13 @@ type GraphQLWebSocketMiddleware<'Root>
let! receivedMessage = rcv ()
match receivedMessage with
| Result.Error failureMsgs ->
"InvalidMessage" |> logMsgReceivedWithOptionalPayload None
"InvalidMessage" |> logMsgReceivedWithOptionalPayload ValueNone
match failureMsgs with
| InvalidMessage (code, explanation) -> do! socket.CloseAsync (enum code, explanation, CancellationToken.None)
| Ok maybeMsg ->
match maybeMsg with
| None -> logger.LogTrace ("Websocket socket received empty message! (socket state = {socketstate})", socket.State)
| Some msg ->
| ValueNone -> logger.LogTrace ("Websocket socket received empty message! (socket state = {socketstate})", socket.State)
| ValueSome msg ->
match msg with
| ConnectionInit p ->
"ConnectionInit" |> logMsgReceivedWithOptionalPayload p
Expand All @@ -245,10 +244,10 @@ type GraphQLWebSocketMiddleware<'Root>
| ClientPing p ->
"ClientPing" |> logMsgReceivedWithOptionalPayload p
match pingHandler with
| Some func ->
| ValueSome func ->
let! customP = p |> func serviceProvider
do! ServerPong customP |> sendMsg
| None -> do! ServerPong p |> sendMsg
| ValueNone -> do! ServerPong p |> sendMsg
| ClientPong p -> "ClientPong" |> logMsgReceivedWithOptionalPayload p
| Subscribe (id, query) ->
"Subscribe" |> logMsgWithIdReceived id
Expand All @@ -262,7 +261,8 @@ type GraphQLWebSocketMiddleware<'Root>
else
let variables = query.Variables |> Skippable.toOption
let! planExecutionResult =
executor.AsyncExecute (query.Query, root (httpContext), ?variables = variables)
let root = options.RootFactory httpContext
options.SchemaExecutor.AsyncExecute (query.Query, root, ?variables = variables)
do! planExecutionResult |> applyPlanExecutionResult id socket
| ClientComplete id ->
"ClientComplete" |> logMsgWithIdReceived id
Expand All @@ -282,14 +282,10 @@ type GraphQLWebSocketMiddleware<'Root>
// <-- Main
// <--------

let waitForConnectionInitAndRespondToClient
(serializerOptions : JsonSerializerOptions)
(connectionInitTimeoutInMs : int)
(socket : WebSocket)
: TaskResult<unit, string> =
taskResult {
let waitForConnectionInitAndRespondToClient (socket : WebSocket) : TaskResult<unit, string> =
task {
let timerTokenSource = new CancellationTokenSource ()
timerTokenSource.CancelAfter (connectionInitTimeoutInMs)
timerTokenSource.CancelAfter connectionInitTimeout
let detonationRegistration =
timerTokenSource.Token.Register (fun _ ->
socket
Expand All @@ -302,14 +298,14 @@ type GraphQLWebSocketMiddleware<'Root>
logger.LogDebug ("Waiting for ConnectionInit...")
let! receivedMessage = receiveMessageViaSocket (CancellationToken.None) serializerOptions socket
match receivedMessage with
| Ok (Some (ConnectionInit _)) ->
| Ok (ValueSome (ConnectionInit _)) ->
logger.LogDebug ("Valid connection_init received! Responding with ACK!")
detonationRegistration.Unregister () |> ignore
do!
ConnectionAck
|> sendMessageViaSocket serializerOptions socket
return true
| Ok (Some (Subscribe _)) ->
| Ok (ValueSome (Subscribe _)) ->
do!
socket
|> tryToGracefullyCloseSocket (enum CustomWebSocketStatus.Unauthorized, "Unauthorized")
Expand All @@ -327,46 +323,30 @@ type GraphQLWebSocketMiddleware<'Root>
)
if (not timerTokenSource.Token.IsCancellationRequested) then
if connectionInitSucceeded then
return ()
return Ok ()
else
return!
Result.Error
<| "ConnectionInit failed (not because of timeout)"
return Result.Error ("ConnectionInit failed (not because of timeout)")
else
return! Result.Error <| "ConnectionInit timeout"
return Result.Error <| "ConnectionInit timeout"
}

member __.InvokeAsync (ctx : HttpContext) = task {
if not (ctx.Request.Path = PathString (options.WebsocketOptions.EndpointUrl)) then
if not (ctx.Request.Path = endpointUrl) then
do! next.Invoke (ctx)
else if ctx.WebSockets.IsWebSocketRequest then
use! socket = ctx.WebSockets.AcceptWebSocketAsync ("graphql-transport-ws")
let! connectionInitResult =
socket
|> waitForConnectionInitAndRespondToClient options.SerializerOptions options.WebsocketOptions.ConnectionInitTimeoutInMs
socket |> waitForConnectionInitAndRespondToClient
match connectionInitResult with
| Result.Error errMsg -> logger.LogWarning ("{warningmsg}", ($"%A{errMsg}"))
| Result.Error errMsg -> logger.LogWarning ("{warningmsg}", errMsg)
| Ok _ ->
let longRunningCancellationToken =
(CancellationTokenSource
.CreateLinkedTokenSource(ctx.RequestAborted, applicationLifetime.ApplicationStopping)
.Token)
longRunningCancellationToken.Register (fun _ ->
socket
|> tryToGracefullyCloseSocketWithDefaultBehavior
|> Async.AwaitTask
|> Async.RunSynchronously)
|> ignore
let safe_HandleMessages = handleMessages longRunningCancellationToken
longRunningCancellationToken.Register (fun _ -> (socket |> tryToGracefullyCloseSocketWithDefaultBehavior).Wait()) |> ignore
try
do!
socket
|> safe_HandleMessages
ctx
options.SerializerOptions
options.SchemaExecutor
options.RootFactory
options.WebsocketOptions.CustomPingHandler
do! socket |> handleMessages longRunningCancellationToken ctx
with ex ->
logger.LogError (ex, "Cannot handle Websocket message.")
else
Expand Down
12 changes: 6 additions & 6 deletions src/FSharp.Data.GraphQL.Server.AspNetCore/Messages.fs
Original file line number Diff line number Diff line change
Expand Up @@ -11,19 +11,19 @@ type SubscriptionUnsubscriber = IDisposable
type OnUnsubscribeAction = SubscriptionId -> unit
type SubscriptionsDict = IDictionary<SubscriptionId, SubscriptionUnsubscriber * OnUnsubscribeAction>

type RawMessage = { Id : string option; Type : string; Payload : JsonDocument option }
type RawMessage = { Id : string voption; Type : string; Payload : JsonDocument voption }

type ServerRawPayload =
| ExecutionResult of Output
| ErrorMessages of NameValueLookup list
| CustomResponse of JsonDocument

type RawServerMessage = { Id : string option; Type : string; Payload : ServerRawPayload option }
type RawServerMessage = { Id : string voption; Type : string; Payload : ServerRawPayload voption }

type ClientMessage =
| ConnectionInit of payload : JsonDocument option
| ClientPing of payload : JsonDocument option
| ClientPong of payload : JsonDocument option
| ConnectionInit of payload : JsonDocument voption
| ClientPing of payload : JsonDocument voption
| ClientPong of payload : JsonDocument voption
| Subscribe of id : string * query : GQLRequestContent
| ClientComplete of id : string

Expand All @@ -32,7 +32,7 @@ type ClientMessageProtocolFailure = InvalidMessage of code : int * explanation :
type ServerMessage =
| ConnectionAck
| ServerPing
| ServerPong of JsonDocument option
| ServerPong of JsonDocument voption
| Next of id : string * payload : Output
| Error of id : string * err : NameValueLookup list
| Complete of id : string
Expand Down
Loading

0 comments on commit 38fed3c

Please sign in to comment.