Skip to content

Commit

Permalink
Merge pull request #172 from inhabitedtype/refactor-request-queue
Browse files Browse the repository at this point in the history
refactor request queue mechanics
  • Loading branch information
seliopou authored May 22, 2021
2 parents ac88195 + 77f216b commit cc7478a
Show file tree
Hide file tree
Showing 2 changed files with 60 additions and 53 deletions.
9 changes: 0 additions & 9 deletions lib/reqd.ml
Original file line number Diff line number Diff line change
Expand Up @@ -243,15 +243,6 @@ let output_state t : Output_state.t =
| Waiting -> Waiting
;;

let is_complete t =
match input_state t with
| Ready -> false
| Complete ->
(match output_state t with
| Waiting | Ready -> false
| Complete -> true)
;;

let flush_request_body t =
let request_body = request_body t in
if Body.has_pending_output request_body
Expand Down
104 changes: 60 additions & 44 deletions lib/server_connection.ml
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ let current_reqd_exn t =

let yield_reader t k =
if is_closed t
then failwith "on_wakeup_reader on closed conn"
then failwith "yield_reader on closed conn"
else if Optional_thunk.is_some t.wakeup_reader
then failwith "yield_reader: only one callback can be registered at a time"
else t.wakeup_reader <- Optional_thunk.some k
Expand Down Expand Up @@ -182,49 +182,36 @@ let set_error_and_handle ?request t error =
let report_exn t exn =
set_error_and_handle t (`Exn exn)

let advance_request_queue_if_necessary t =
if is_active t then begin
let reqd = current_reqd_exn t in
if Reqd.persistent_connection reqd then begin
if Reqd.is_complete reqd then begin
ignore (Queue.take t.request_queue);
if not (Queue.is_empty t.request_queue)
then t.request_handler (current_reqd_exn t);
wakeup_reader t;
end
end else begin
(* Take the head of the queue, close the remaining request bodies, clear
* the queue, and push the head back on. We do not plan on processing any
* more requests after the current one. *)
ignore (Queue.take t.request_queue);
Queue.iter Reqd.close_request_body t.request_queue;
Queue.clear t.request_queue;
Queue.push reqd t.request_queue;
if Reqd.is_complete reqd
then shutdown t
else
match Reqd.input_state reqd with
| Ready -> ()
| Complete -> shutdown_reader t
end
end else if Reader.is_closed t.reader
then shutdown t

let _next_read_operation t =
advance_request_queue_if_necessary t;
if is_active t
let advance_request_queue t =
ignore (Queue.take t.request_queue);
if not (Queue.is_empty t.request_queue)
then t.request_handler (Queue.peek_exn t.request_queue);
;;

let rec _next_read_operation t =
if not (is_active t)
then (
if Reader.is_closed t.reader
then shutdown t;
Reader.next t.reader
) else (
let reqd = current_reqd_exn t in
match Reqd.input_state reqd with
| Ready -> Reader.next t.reader
| Complete -> _final_read_operation_for t reqd
)

and _final_read_operation_for t reqd =
if not (Reqd.persistent_connection reqd) then (
shutdown_reader t;
Reader.next t.reader;
) else (
match Reqd.output_state reqd with
| Waiting | Ready -> `Yield
| Complete ->
if Reqd.persistent_connection reqd
then `Yield
else (
shutdown_reader t;
Reader.next t.reader)
advance_request_queue t;
_next_read_operation t;
)
else Reader.next t.reader
;;

let next_read_operation t =
Expand Down Expand Up @@ -259,13 +246,42 @@ let read t bs ~off ~len =
let read_eof t bs ~off ~len =
read_with_more t bs ~off ~len Complete

let next_write_operation t =
advance_request_queue_if_necessary t;
if is_active t
then (
let rec _next_write_operation t =
if not (is_active t)
then Writer.next t.writer
else (
let reqd = current_reqd_exn t in
Reqd.flush_response_body reqd);
Writer.next t.writer
match Reqd.output_state reqd with
| Waiting ->
(* XXX(dpatti): I don't think we should need to call this, but it is
necessary in the case of a streaming, non-chunked body so that you can
set the appropriate flag. *)
Reqd.flush_response_body reqd;
Writer.next t.writer
| Ready ->
Reqd.flush_response_body reqd;
Writer.next t.writer
| Complete -> _final_write_operation_for t reqd
)

and _final_write_operation_for t reqd =
let next =
if not (Reqd.persistent_connection reqd) then (
shutdown_writer t;
Writer.next t.writer;
) else (
match Reqd.input_state reqd with
| Ready -> Writer.next t.writer;
| Complete ->
advance_request_queue t;
_next_write_operation t;
)
in
wakeup_reader t;
next
;;

let next_write_operation t = _next_write_operation t

let report_write_result t result =
Writer.report_result t.writer result

0 comments on commit cc7478a

Please sign in to comment.