Skip to content

Commit

Permalink
[i/items] thread safety
Browse files Browse the repository at this point in the history
  • Loading branch information
xificurC committed Sep 3, 2024
1 parent d0eadef commit 7795e87
Show file tree
Hide file tree
Showing 3 changed files with 83 additions and 48 deletions.
13 changes: 5 additions & 8 deletions src/hyperfiddle/incseq/flow_protocol_enforcer.cljc
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
(ns hyperfiddle.incseq.flow-protocol-enforcer
(:require [hyperfiddle.electric.impl.array-fields :as a])
#?(:clj (:import [clojure.lang IDeref IFn]))
#?(:cljs (:require-macros [hyperfiddle.incseq.flow-protocol-enforcer :refer [cannot-throw]])))

Expand All @@ -12,22 +11,20 @@
(defmacro cannot-throw [nm f] `(try (~f) (catch ~(if (:js-globals &env) :default 'Throwable) e#
(violated ~nm ~(str f " cannot throw") e#))))

(def field-count (a/deffields -should-step -is-done))
(defn flow
([input-flow] (flow "" input-flow))
([nm input-flow]
(fn [step done]
(let [o (object-array field-count)
_ (a/set o -should-step ::init, -is-done false)
(let [!should-step? (atom ::init), !done? (atom false)
step (fn []
(when (a/get o -is-done) (violated nm "step after done"))
(if (a/getswap o -should-step not) (cannot-throw nm step) (violated nm "double step")))
done (fn [] (if (a/getset o -is-done true) (violated nm "done called twice") (cannot-throw nm done)))
(when @!done? (violated nm "step after done"))
(if (first (swap-vals! !should-step? not)) (cannot-throw nm step) (violated nm "double step")))
done (fn [] (if (first (reset-vals! !done? true)) (violated nm "done called twice") (cannot-throw nm done)))
cancel (try (input-flow step done)
(catch #?(:clj Throwable :cljs :default) e (violated "flow process creation threw" e)))]
(reify
IFn (#?(:clj invoke :cljs -invoke) [_] (cannot-throw nm cancel))
IDeref (#?(:clj deref :cljs -deref) [_]
(if-let [should-step (a/getswap o -should-step not)]
(if-let [should-step (first (swap-vals! !should-step? not))]
(violated nm (if (= ::init should-step) "transfer without initial step" "double transfer"))
@cancel)))))))
100 changes: 60 additions & 40 deletions src/hyperfiddle/incseq/items_eager_impl.cljc
Original file line number Diff line number Diff line change
Expand Up @@ -6,34 +6,49 @@
[hyperfiddle.incseq.diff-impl :as d]
[hyperfiddle.incseq.perm-impl :as p])
(:import #?(:clj [clojure.lang IDeref IFn])
#?(:clj [java.util.concurrent.atomic AtomicLong AtomicBoolean])
[missionary Cancelled]))
#?(:clj [java.util.concurrent.atomic AtomicLong])
#?(:clj [java.util.concurrent.locks ReentrantLock])
[missionary Cancelled])
#?(:cljs (:require-macros [hyperfiddle.incseq.items-eager-impl :refer [locked]])))

(def ps-field-count (a/deffields -stepped -cancelled -input-ps -done -diff -item*))
#?(:clj (set! *warn-on-reflection* true))

(def ps-field-count (a/deffields -stepped -cancelled -input-ps -diff -item*))

(defmacro locked [on & body]
(if (:js-globals &env)
`(do ~@body)
(let [l (with-meta (gensym "lock") {:tag `ReentrantLock})]
`(let [~l ~on] (.lock ~l) (let [v# (do ~@body)] (.unlock ~l) v#)))))

(declare cleanup-then-done)
(defn call [f] (f))
(deftype Ps [step done going stepped state-]
IFn (#?(:clj invoke :cljs -invoke) [_]
(some-> (a/get state- -input-ps) call)
(a/set-not= state- -done ::yes ::requested)
(let [cancelled? (a/getset state- -cancelled true)]
(when (not (or (a/getset state- -stepped true) cancelled? (= ::yes (a/get state- -done)))) (step))))
(deftype Ps [step done going indone state- #?(:clj lock)]
IFn (#?(:clj invoke :cljs -invoke) [this]
(let [step? (locked (.-lock this)
(swap! indone (fn [v] (if (= v ::yes) ::yes ::requested)))
(let [cancelled? (a/getset state- -cancelled true)]
(not (or (a/getset state- -stepped true) cancelled? (= ::yes @indone)))))]
(some-> (a/get state- -input-ps) call)
(when step? (step))))
IDeref (#?(:clj deref :cljs -deref) [this]
(a/set state- -stepped false)
(when (= ::requested (a/get state- -done)) (cleanup-then-done this))
(let [?diff (a/getset state- -diff nil)]
(let [[cleanup? ?diff] (locked (.-lock this)
(a/set state- -stepped false)
[(= ::requested @indone) (a/getset state- -diff nil)])]
(when cleanup? (cleanup-then-done this))
(cond (a/get state- -cancelled) (throw (Cancelled.))
(map? ?diff) ?diff
:else (throw ?diff)))))
(map? ?diff) ?diff
:else (throw ?diff)))))

(defn cleanup-then-done [^Ps ps]
(a/fset ps -input-ps nil, -done ::yes, -item* nil)
(locked (.-lock ps) (a/fset ps -input-ps nil, -item* nil))
(reset! (.-indone ps) ::yes)
((.-done ps)))
(defn going [^Ps ps] #?(:clj (let [^AtomicLong i (.-going ps)] (.longValue i))
:cljs (.-going ps)))
(defn ++going [^Ps ps] #?(:clj (let [^AtomicLong i (.-going ps)] (.incrementAndGet i))
:cljs (set! (.-going ps) (inc (.-going ps)))))
(defn --going [^Ps ps] #?(:clj (let [^AtomicLong i (.-going ps)] (.getAndDecrement i))
(defn --going [^Ps ps] #?(:clj (let [^AtomicLong i (.-going ps)] (.decrementAndGet i))
:cljs (set! (.-going ps) (dec (.-going ps)))))

(def item-field-count (a/deffields -v -flow -ps* -dead))
Expand All @@ -59,7 +74,10 @@
(#?(:clj invoke :cljs -invoke) [_ v]
(when-not (or (= v (a/getset a -cache v)) (a/getset a -stepped true))
(step)))
Orphanable (orphan [this] (a/set a -orphaned true) (when-not (a/get a -stepped) (cleanup-item-ps this a done)))
Orphanable (orphan [this]
(a/set a -orphaned true)
(remove-item-ps item this)
(when-not (a/get a -stepped) (cleanup-item-ps this a done)))
IDeref
(#?(:clj deref :cljs -deref) [this]
(a/set a -stepped false)
Expand Down Expand Up @@ -120,37 +138,39 @@
nil (:change diff))))

(defn needed-diff? [d]
(or (= (d/empty-diff 0) d) (seq (:permutation d)) (pos? (:grow d)) (pos? (:shrink d)) (seq (:freeze d))))
(or (seq (:permutation d)) (pos? (:grow d)) (pos? (:shrink d)) (seq (:freeze d))))

(defn transfer-input [^Ps ps]
(loop [diff (a/fgetset ps -diff {:change {}})]
(let [?in-diff (try @(a/fget ps -input-ps) (catch #?(:clj Throwable :cljs :default) e e))]
(--going ps)
(if (map? ?in-diff)
(do (grow! ps ?in-diff)
(permute! ps ?in-diff)
(shrink! ps ?in-diff)
(change! ps ?in-diff)
(let [newdiff (a/fset ps -diff (cond->> (assoc ?in-diff :change (:change (a/fget ps -diff)))
diff (d/combine diff)))]
(if (zero? (going ps))
(when (and (not (a/fget ps -stepped)) (needed-diff? newdiff))
(a/fset ps -stepped true) ((.-step ps)))
(recur newdiff))))
(do (some-> (a/fget ps -input-ps) call)
(a/fset-not= ps -done ::yes ::requested)
(a/fset ps -diff (if (neg? (going ps)) (ex-info "uninitialized input process" {}) ?in-diff))
(when-not (a/fgetset ps -stepped true) ((.-step ps))))))))
(let [step?
(locked (.-lock ps)
(loop [diff (a/fgetset ps -diff {:change {}})]
(let [?in-diff (try @(a/fget ps -input-ps) (catch #?(:clj Throwable :cljs :default) e e))]
(if (map? ?in-diff)
(do (grow! ps ?in-diff) (permute! ps ?in-diff) (shrink! ps ?in-diff) (change! ps ?in-diff)
(let [newdiff (a/fset ps -diff (cond->> (assoc ?in-diff :change (:change (a/fget ps -diff)))
diff (d/combine diff)))]
(if (= 1 (going ps))
(case (a/fget ps -stepped)
false (when (needed-diff? newdiff) (a/fset ps -stepped true))
true nil
nil (a/fset ps -stepped true))
(do (--going ps) (recur newdiff)))))
(do (some-> (a/fget ps -input-ps) call)
(swap! (.-indone ps) (fn [v] (if (= ::yes v) ::yes ::requested)))
(a/fset ps -diff (if (zero? (going ps)) (ex-info "uninitialized input process" {}) ?in-diff))
(not (a/fgetset ps -stepped true)))))))]
(--going ps)
(when step? ((.-step ps)))))

(def +initial-item-size+ 8)
(defn flow [input]
(fn [step done]
(let [ps (->Ps step done #?(:clj (new AtomicLong -1) :cljs -1) #?(:clj (new AtomicBoolean false) :cljs false)
(object-array ps-field-count))]
(a/fset ps -item* (object-array +initial-item-size+), -stepped false, -done ::no)
(let [^Ps ps (->Ps step done #?(:clj (new AtomicLong -1) :cljs -1) (atom ::no) (object-array ps-field-count)
#?(:clj (new ReentrantLock)))]
(a/fset ps -item* (object-array +initial-item-size+), -stepped nil)
(a/fset ps -input-ps (input
#(when (= 1 (++going ps)) (transfer-input ps))
#(if (or (pos? (going ps)) (a/fget ps -stepped))
(a/fset ps -done ::requested)
(reset! (.-indone ps) ::requested)
(cleanup-then-done ps))))
(++going ps) (transfer-input ps) ps)))
18 changes: 18 additions & 0 deletions test/hyperfiddle/incseq/items_eager_impl_test.cljc
Original file line number Diff line number Diff line change
Expand Up @@ -662,6 +662,24 @@
_ (t/is (= :input-cancel (q)))
_ (t/is (thrown? ExceptionInfo @items))]))

(t/deftest input-transfer-decrements-on-non-needed-diff
(let [q (->mq)
_ (q (d/empty-diff 0)) ; what input will return on transfer
items (spawn-ps q)
[in-step _in-done] (q)
_ (t/is (= :items-step (q)))
_ (t/is (= (d/empty-diff 0) @items))
_ (q (d/empty-diff 0))
_ (in-step)
_ (q ::none)
_ (t/is (= ::none (q)))
_ (q (assoc (d/empty-diff 1) :grow 1 :change {0 :foo}))
_ (in-step)
_ (t/is (= (-> (d/empty-diff 1) (assoc :grow 1) (dissoc :change)) (dissoc @items :change)))
_ (t/is (= :items-step (q)))
_ (q ::none)
_ (t/is (= ::none (q)))]))

;; missing tests
;; - double transfer (optional)
;; - item-ps
Expand Down

0 comments on commit 7795e87

Please sign in to comment.