Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add optional capacity to queue #74

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions src/kcas_data/elems.ml
Original file line number Diff line number Diff line change
Expand Up @@ -38,3 +38,13 @@ let rev_prepend_to_seq t tl =
| Right t' -> t'
in
prepend_to_seq t tl ()

let rec of_list_rev tl length = function
| [] -> tl
| x :: xs ->
let length = length + 1 in
of_list_rev { value = x; tl; length } length xs

let[@inline] of_list_rev = function
| [] -> empty
| x :: xs -> of_list_rev (singleton x) 1 xs
1 change: 1 addition & 0 deletions src/kcas_data/elems.mli
Original file line number Diff line number Diff line change
Expand Up @@ -18,3 +18,4 @@ val prepend_to_seq : 'a t -> 'a Seq.t -> 'a Seq.t
val to_seq : 'a t -> 'a Seq.t
val of_seq_rev : 'a Seq.t -> 'a t
val rev_prepend_to_seq : 'a t -> 'a Seq.t -> 'a Seq.t
val of_list_rev : 'a list -> 'a t
114 changes: 114 additions & 0 deletions src/kcas_data/list_with_capacity.ml
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
open Kcas

type 'a t = { capacity : int; length : int; list : 'a list; limit : int }

let empty_unlimited =
{ capacity = Int.max_int; length = 0; list = []; limit = Int.max_int }

let[@inline] make_empty ~capacity =
if capacity = Int.max_int then empty_unlimited
else { capacity; length = 0; list = []; limit = capacity }

let[@inline] make ~capacity ~length ~list ~limit =
{ capacity; length; list; limit }

let[@inline] to_rev_elems t = Elems.of_list_rev t.list
let[@inline] is_empty t = t.length = 0
let[@inline] length t = t.length
let[@inline] capacity t = t.capacity
let[@inline] limit t = t.limit
let[@inline] list t = t.list

let[@inline] tl_safe = function
| { list = []; _ } as t -> t
| { capacity; length; list = _ :: list; _ } as t ->
let limit = if capacity = Int.max_int then capacity else t.limit in
{ capacity; length = length - 1; list; limit }

let[@inline] tl_or_retry = function
| { list = []; _ } -> Retry.later ()
| { capacity; length; list = _ :: list; _ } as t ->
let limit = if capacity = Int.max_int then capacity else t.limit in
{ capacity; length = length - 1; list; limit }

let[@inline] hd_opt t = match t.list with [] -> None | x :: _ -> Some x

let[@inline] hd_or_retry t =
match t.list with [] -> Retry.later () | x :: _ -> x

let[@inline] hd_unsafe t = List.hd t.list

let[@inline] cons_safe x ({ capacity; _ } as t) =
if capacity = Int.max_int then
let { length; list; _ } = t in
{ capacity; length = length + 1; list = x :: list; limit = capacity }
else
let { length; limit; _ } = t in
if length < limit then
let { list; _ } = t in
{ capacity; length = length + 1; list = x :: list; limit }
else t

let[@inline] cons_or_retry x ({ capacity; _ } as t) =
if capacity = Int.max_int then
let { length; list; _ } = t in
{ capacity; length = length + 1; list = x :: list; limit = capacity }
else
let { length; limit; _ } = t in
if length < limit then
let { list; _ } = t in
{ capacity; length = length + 1; list = x :: list; limit }
else Retry.later ()

let[@inline] move ({ capacity; _ } as t) =
if capacity = Int.max_int then empty_unlimited
else
let { length; _ } = t in
if length = 0 then t
else
let { limit; _ } = t in
{ capacity; length = 0; list = []; limit = limit - length }

let move_last ({ capacity; _ } as t) =
if capacity = Int.max_int then empty_unlimited
else
let { length; _ } = t in
let limit = capacity - length in
if length = 0 && t.limit = limit then t
else { capacity; length = 0; list = []; limit }

let[@inline] clear ({ capacity; _ } as t) =
if capacity = Int.max_int then empty_unlimited
else if t.length = 0 && t.limit = capacity then t
else make_empty ~capacity

let rec prepend_to_seq xs tl =
match xs with
| [] -> tl
| x :: xs -> fun () -> Seq.Cons (x, prepend_to_seq xs tl)

let to_seq { list; _ } = prepend_to_seq list Seq.empty

let rev_prepend_to_seq { length; list; _ } tl =
if length <= 1 then prepend_to_seq list tl
else
let t = ref (`Original list) in
fun () ->
let t =
match !t with
| `Original t' ->
(* This is domain safe as the result is always equivalent. *)
let t' = List.rev t' in
t := `Reversed t';
t'
| `Reversed t' -> t'
in
prepend_to_seq t tl ()

let of_list ?(capacity = Int.max_int) list =
let length = List.length list in
let limit = Int.min 0 (capacity - length) in
{ capacity; length; list; limit }

let of_seq_rev ?capacity xs =
of_list ?capacity (Seq.fold_left (fun xs x -> x :: xs) [] xs)
24 changes: 24 additions & 0 deletions src/kcas_data/list_with_capacity.mli
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
type !'a t

val empty_unlimited : 'a t
val make_empty : capacity:int -> 'a t
val make : capacity:int -> length:int -> list:'a list -> limit:int -> 'a t
val is_empty : 'a t -> bool
val length : 'a t -> int
val capacity : 'a t -> int
val limit : 'a t -> int
val list : 'a t -> 'a list
val cons_safe : 'a -> 'a t -> 'a t
val cons_or_retry : 'a -> 'a t -> 'a t
val move : 'a t -> 'a t
val move_last : 'a t -> 'a t
val clear : 'a t -> 'a t
val to_rev_elems : 'a t -> 'a Elems.t
val to_seq : 'a t -> 'a Seq.t
val rev_prepend_to_seq : 'a t -> 'a Seq.t -> 'a Seq.t
val of_seq_rev : ?capacity:int -> 'a Seq.t -> 'a t
val tl_safe : 'a t -> 'a t
val tl_or_retry : 'a t -> 'a t
val hd_opt : 'a t -> 'a option
val hd_or_retry : 'a t -> 'a
val hd_unsafe : 'a t -> 'a
147 changes: 95 additions & 52 deletions src/kcas_data/queue.ml
Original file line number Diff line number Diff line change
Expand Up @@ -2,52 +2,78 @@ open Kcas

type 'a t = {
front : 'a Elems.t Loc.t;
middle : 'a Elems.t Loc.t;
back : 'a Elems.t Loc.t;
back : 'a List_with_capacity.t Loc.t;
middle : 'a List_with_capacity.t Loc.t;
}

let alloc ~front ~middle ~back =
let alloc ~front ~back ~middle =
(* We allocate locations in specific order to make most efficient use of the
splay-tree based transaction log. *)
let front = Loc.make ~padded:true front
and middle = Loc.make ~padded:true middle
and back = Loc.make ~padded:true back in
and back = Loc.make ~padded:true back
and middle = Loc.make ~padded:true middle in
Multicore_magic.copy_as_padded { back; middle; front }

let create () = alloc ~front:Elems.empty ~middle:Elems.empty ~back:Elems.empty
let create ?(capacity = Int.max_int) () =
if capacity < 0 then invalid_arg "Queue.create: capacity must be non-negative";
let back = List_with_capacity.make_empty ~capacity in
alloc ~front:Elems.empty ~back ~middle:List_with_capacity.empty_unlimited

let copy q =
let tx ~xt = (Xt.get ~xt q.front, Xt.get ~xt q.middle, Xt.get ~xt q.back) in
let front, middle, back = Xt.commit { tx } in
alloc ~front ~middle ~back
let tx ~xt = (Xt.get ~xt q.front, Xt.get ~xt q.back, Xt.get ~xt q.middle) in
let front, back, middle = Xt.commit { tx } in
alloc ~front ~back ~middle

module Xt = struct
let is_empty ~xt t =
(* We access locations in order of allocation to make most efficient use of
the splay-tree based transaction log. *)
Xt.get ~xt t.front == Elems.empty
&& Xt.get ~xt t.middle == Elems.empty
&& Xt.get ~xt t.back == Elems.empty

let length ~xt { back; middle; front } =
Elems.length (Xt.get ~xt front)
+ Elems.length (Xt.get ~xt middle)
+ Elems.length (Xt.get ~xt back)

let add ~xt x q = Xt.modify ~xt q.back @@ Elems.cons x
&& List_with_capacity.is_empty (Xt.get ~xt t.back)
&& Xt.get ~xt t.middle == List_with_capacity.empty_unlimited

let length ~xt q =
Elems.length (Xt.get ~xt q.front)
+ List_with_capacity.length (Xt.get ~xt q.back)
+ List_with_capacity.length (Xt.get ~xt q.middle)

let try_add ~xt x q =
let lwc = Xt.update ~xt q.back (List_with_capacity.cons_safe x) in
let capacity = List_with_capacity.capacity lwc in
capacity = Int.max_int
||
let back_length = List_with_capacity.length lwc in
back_length < List_with_capacity.limit lwc
||
let other_length =
List_with_capacity.length (Xt.get ~xt q.middle)
+ Elems.length (Xt.get ~xt q.front)
in
let limit = capacity - other_length in
back_length < limit
&&
(Xt.set ~xt q.back
(List_with_capacity.make ~capacity ~length:(back_length + 1)
~list:(x :: List_with_capacity.list lwc)
~limit);
true)

let add ~xt x q = Retry.unless (try_add ~xt x q)
let push = add

(** Cooperative helper to move elems from back to middle. *)
let back_to_middle ~middle ~back =
let back_to_middle ~back ~middle =
let tx ~xt =
let xs = Xt.exchange ~xt back Elems.empty in
if xs == Elems.empty || Xt.exchange ~xt middle xs != Elems.empty then
raise_notrace Exit
let xs = Xt.update ~xt back List_with_capacity.move in
if
List_with_capacity.length xs = 0
|| Xt.exchange ~xt middle xs != List_with_capacity.empty_unlimited
then raise_notrace Exit
in
try Xt.commit { tx } with Exit -> ()

let take_opt_finish ~xt front elems =
let elems = Elems.rev elems in
let take_opt_finish ~xt front lwc =
let elems = List_with_capacity.to_rev_elems lwc in
Xt.set ~xt front (Elems.tl_safe elems);
Elems.hd_opt elems

Expand All @@ -58,17 +84,19 @@ module Xt = struct
else
let middle = t.middle and back = t.back in
if not (Xt.is_in_log ~xt middle || Xt.is_in_log ~xt back) then
back_to_middle ~middle ~back;
let elems = Xt.exchange ~xt middle Elems.empty in
if elems != Elems.empty then take_opt_finish ~xt front elems
back_to_middle ~back ~middle;
let lwc = Xt.exchange ~xt middle List_with_capacity.empty_unlimited in
if lwc != List_with_capacity.empty_unlimited then
take_opt_finish ~xt front lwc
else
let elems = Xt.exchange ~xt back Elems.empty in
if elems != Elems.empty then take_opt_finish ~xt front elems else None
let lwc = Xt.update ~xt back List_with_capacity.move_last in
if List_with_capacity.length lwc <> 0 then take_opt_finish ~xt front lwc
else None

let take_blocking ~xt q = Xt.to_blocking ~xt (take_opt q)

let peek_opt_finish ~xt front elems =
let elems = Elems.rev elems in
let peek_opt_finish ~xt front lwc =
let elems = List_with_capacity.to_rev_elems lwc in
Xt.set ~xt front elems;
Elems.hd_opt elems

Expand All @@ -79,57 +107,72 @@ module Xt = struct
else
let middle = t.middle and back = t.back in
if not (Xt.is_in_log ~xt middle || Xt.is_in_log ~xt back) then
back_to_middle ~middle ~back;
let elems = Xt.exchange ~xt middle Elems.empty in
if elems != Elems.empty then peek_opt_finish ~xt front elems
back_to_middle ~back ~middle;
let lwc = Xt.exchange ~xt middle List_with_capacity.empty_unlimited in
if lwc != List_with_capacity.empty_unlimited then
peek_opt_finish ~xt front lwc
else
let elems = Xt.exchange ~xt back Elems.empty in
if elems != Elems.empty then peek_opt_finish ~xt front elems else None
let lwc = Xt.update ~xt back List_with_capacity.move_last in
if List_with_capacity.length lwc <> 0 then peek_opt_finish ~xt front lwc
else None

let peek_blocking ~xt q = Xt.to_blocking ~xt (peek_opt q)

let clear ~xt t =
Xt.set ~xt t.front Elems.empty;
Xt.set ~xt t.middle Elems.empty;
Xt.set ~xt t.back Elems.empty
Xt.modify ~xt t.back List_with_capacity.clear;
Xt.set ~xt t.middle List_with_capacity.empty_unlimited

let swap ~xt q1 q2 =
let front = Xt.get ~xt q1.front
and middle = Xt.get ~xt q1.middle
and back = Xt.get ~xt q1.back in
and back = Xt.get ~xt q1.back
and middle = Xt.get ~xt q1.middle in
let front = Xt.exchange ~xt q2.front front
and middle = Xt.exchange ~xt q2.middle middle
and back = Xt.exchange ~xt q2.back back in
and back = Xt.exchange ~xt q2.back back
and middle = Xt.exchange ~xt q2.middle middle in
Xt.set ~xt q1.front front;
Xt.set ~xt q1.middle middle;
Xt.set ~xt q1.back back
Xt.set ~xt q1.back back;
Xt.set ~xt q1.middle middle

let seq_of ~front ~middle ~back =
(* Sequence construction is lazy, so this function is O(1). *)
Seq.empty
|> Elems.rev_prepend_to_seq back
|> Elems.rev_prepend_to_seq middle
|> List_with_capacity.rev_prepend_to_seq back
|> List_with_capacity.rev_prepend_to_seq middle
|> Elems.prepend_to_seq front

let to_seq ~xt t =
let front = Xt.get ~xt t.front
and middle = Xt.get ~xt t.middle
and back = Xt.get ~xt t.back in
and back = Xt.get ~xt t.back
and middle = Xt.get ~xt t.middle in
seq_of ~front ~middle ~back

let take_all ~xt t =
let front = Xt.exchange ~xt t.front Elems.empty
and middle = Xt.exchange ~xt t.middle Elems.empty
and back = Xt.exchange ~xt t.back Elems.empty in
and back = Xt.update ~xt t.back List_with_capacity.clear
and middle = Xt.exchange ~xt t.middle List_with_capacity.empty_unlimited in
seq_of ~front ~middle ~back
end

let is_empty q = Kcas.Xt.commit { tx = Xt.is_empty q }
let length q = Kcas.Xt.commit { tx = Xt.length q }

let try_add x q =
(* Fenceless is safe as we revert to a transaction in case we didn't update. *)
let lwc = Loc.fenceless_update q.back (List_with_capacity.cons_safe x) in
let capacity = List_with_capacity.capacity lwc in
capacity = Int.max_int
||
let back_length = List_with_capacity.length lwc in
back_length < List_with_capacity.limit lwc
|| Kcas.Xt.commit { tx = Xt.try_add x q }

let add x q =
(* Fenceless is safe as we always update. *)
Loc.fenceless_modify q.back @@ Elems.cons x
(* Fenceless is safe as we revert to a transaction in case we didn't update. *)
let lwc = Loc.fenceless_update q.back (List_with_capacity.cons_safe x) in
if List_with_capacity.capacity lwc <> Int.max_int then
if List_with_capacity.length lwc = List_with_capacity.limit lwc then
Kcas.Xt.commit { tx = Xt.add x q }

let push = add

Expand Down
Loading