Skip to content

Commit

Permalink
Make pipe write non blocking. The reasoning being that, if the write is
Browse files Browse the repository at this point in the history
blocking, we're already waiting for a process to read the other end.
  • Loading branch information
toots committed Sep 29, 2024
1 parent 574fc32 commit e6c8533
Show file tree
Hide file tree
Showing 2 changed files with 34 additions and 16 deletions.
4 changes: 4 additions & 0 deletions CHANGES
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
0.9.5 (unreleased)
=====
* Make pipe write unblocking.

0.9.4 (2024-03-18)
=====
* Fix poll segfault.
Expand Down
46 changes: 30 additions & 16 deletions src/duppy.ml
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ let clear_tasks s =

let create ?(on_error = Printexc.raise_with_backtrace) ?(compare = compare) () =
let out_pipe, in_pipe = Unix.pipe () in
Unix.set_nonblock in_pipe;
{
on_error;
out_pipe;
Expand All @@ -106,7 +107,13 @@ let create ?(on_error = Printexc.raise_with_backtrace) ?(compare = compare) () =
queue_stopped_c = Condition.create ();
}

let wake_up s = ignore (Unix.write s.in_pipe (Bytes.of_string "x") 0 1)
let wake_up s =
try ignore (Unix.write s.in_pipe (Bytes.of_string "x") 0 1)
with
| Unix.Unix_error (Unix.EAGAIN, _, _)
| Unix.Unix_error (Unix.EWOULDBLOCK, _, _)
->
()

module Task = struct
(** Events and tasks from the user's point-of-view. *)
Expand Down Expand Up @@ -217,15 +224,15 @@ let process s log =
(r, w, x)
with
| Unix.Unix_error (Unix.EINTR, _, _) ->
(* [EINTR] means that select was interrupted by
* a signal before any of the selected events
(* [EINTR] means that select was interrupted by
* a signal before any of the selected events
* occurred and before the timeout interval expired.
* We catch it and restart.. *)
log (Printf.sprintf "Select interrupted at %f." (time ()));
f ()
| e ->
(* Uncaught exception:
* 1) Discards all tasks currently in the loop (we do not know which
(* Uncaught exception:
* 1) Discards all tasks currently in the loop (we do not know which
* socket caused an error).
* 2) Re-Raise e *)
clear_tasks s;
Expand All @@ -239,7 +246,7 @@ let process s log =
(* For safety, we may absorb more than
* one write. This avoids bad situation
* when exceesive wake_up may fill up the
* pipe's write buffer, causing a wake_up
* pipe's write buffer, causing a wake_up
* to become blocking.. *)
ignore (Unix.read s.out_pipe tmp 0 1024)
in
Expand All @@ -266,7 +273,7 @@ let process s log =
* Returns true a task was found (and hence processed).
*
* s.ready_m *must* be locked before calling
* this function, and is freed *only*
* this function, and is freed *only*
* if some task was processed. *)
let exec s (priorities : 'a -> bool) =
(* This assertion does not work on
Expand Down Expand Up @@ -313,7 +320,7 @@ let queue ?log ?(priorities = fun _ -> true) s name =
Mutex.unlock s.stop_m;
if stop then raise Queue_stopped;
(* Lock the ready tasks until the queue has a task to proceed,
* *or* is really ready to restart on its condition, see the
* *or* is really ready to restart on its condition, see the
* Condition.wait call below for the atomic unlock and wait. *)
Mutex.lock s.ready_m;
log (Printf.sprintf "There are %d ready tasks." (List.length s.ready));
Expand Down Expand Up @@ -344,12 +351,12 @@ let queue ?log ?(priorities = fun _ -> true) s name =
(* We use s.ready_m mutex here.
* Hence, we avoid race conditions
* with any other queue being processing
* a task that would create a new task:
* without this mutex, the new task may not be
* a task that would create a new task:
* without this mutex, the new task may not be
* notified to this queue if it is going to sleep
* in concurrency..
* It also avoid race conditions when restarting
* queues since s.ready_m is locked until all
* It also avoid race conditions when restarting
* queues since s.ready_m is locked until all
* queues have been signaled. *)
Condition.wait c s.ready_m;
Mutex.unlock s.ready_m
Expand All @@ -376,7 +383,7 @@ let queue ?log ?(priorities = fun _ -> true) s name =
on_done ()
module Async = struct
(* m is used to make sure that
(* m is used to make sure that
* calls to [wake_up] and [stop]
* are thread-safe. *)
type t = { stop : bool ref; mutable fd : fd option; m : Mutex.t }
Expand All @@ -386,6 +393,7 @@ module Async = struct
let add ~priority (scheduler : 'a scheduler) f =
(* A pipe to wake up the task *)
let out_pipe, in_pipe = Unix.pipe () in
Unix.set_nonblock in_pipe;
let stop = ref false in
let tmp = Bytes.create 1024 in
let rec task l =
Expand Down Expand Up @@ -418,7 +426,13 @@ module Async = struct
try
begin
match t.fd with
| Some t -> ignore (Unix.write t (Bytes.of_string " ") 0 1)
| Some t -> (
try ignore (Unix.write t (Bytes.of_string " ") 0 1)
with
| Unix.Unix_error (Unix.EAGAIN, _, _)
| Unix.Unix_error (Unix.EWOULDBLOCK, _, _)
->
())
| None -> raise Stopped
end;
Mutex.unlock t.m
Expand Down Expand Up @@ -627,7 +641,7 @@ struct
[]
in
(* First one is without read,
* in case init contains the wanted match.
* in case init contains the wanted match.
* Unless the user sets timeout to 0., this
* should not interfer with user-defined timeout.. *)
let task =
Expand Down Expand Up @@ -895,7 +909,7 @@ module Monad = struct
let unlock m h' =
Mutex_o.lock ctl_m;
(* Here we allow inter-thread
(* Here we allow inter-thread
* and double unlock.. Double unlock
* is not necessarily a problem and
* inter-thread unlock well.. what is
Expand Down

0 comments on commit e6c8533

Please sign in to comment.