diff --git a/src/hyperfiddle/incseq/flow_protocol_enforcer.cljc b/src/hyperfiddle/incseq/flow_protocol_enforcer.cljc index 5a9b186fa..f8e91bfe8 100644 --- a/src/hyperfiddle/incseq/flow_protocol_enforcer.cljc +++ b/src/hyperfiddle/incseq/flow_protocol_enforcer.cljc @@ -1,5 +1,4 @@ (ns hyperfiddle.incseq.flow-protocol-enforcer - (:require [hyperfiddle.electric.impl.array-fields :as a]) #?(:clj (:import [clojure.lang IDeref IFn])) #?(:cljs (:require-macros [hyperfiddle.incseq.flow-protocol-enforcer :refer [cannot-throw]]))) @@ -12,22 +11,20 @@ (defmacro cannot-throw [nm f] `(try (~f) (catch ~(if (:js-globals &env) :default 'Throwable) e# (violated ~nm ~(str f " cannot throw") e#)))) -(def field-count (a/deffields -should-step -is-done)) (defn flow ([input-flow] (flow "" input-flow)) ([nm input-flow] (fn [step done] - (let [o (object-array field-count) - _ (a/set o -should-step ::init, -is-done false) + (let [!should-step? (atom ::init), !done? (atom false) step (fn [] - (when (a/get o -is-done) (violated nm "step after done")) - (if (a/getswap o -should-step not) (cannot-throw nm step) (violated nm "double step"))) - done (fn [] (if (a/getset o -is-done true) (violated nm "done called twice") (cannot-throw nm done))) + (when @!done? (violated nm "step after done")) + (if (first (swap-vals! !should-step? not)) (cannot-throw nm step) (violated nm "double step"))) + done (fn [] (if (first (reset-vals! !done? true)) (violated nm "done called twice") (cannot-throw nm done))) cancel (try (input-flow step done) (catch #?(:clj Throwable :cljs :default) e (violated "flow process creation threw" e)))] (reify IFn (#?(:clj invoke :cljs -invoke) [_] (cannot-throw nm cancel)) IDeref (#?(:clj deref :cljs -deref) [_] - (if-let [should-step (a/getswap o -should-step not)] + (if-let [should-step (first (swap-vals! !should-step? not))] (violated nm (if (= ::init should-step) "transfer without initial step" "double transfer")) @cancel))))))) diff --git a/src/hyperfiddle/incseq/items_eager_impl.cljc b/src/hyperfiddle/incseq/items_eager_impl.cljc index 13bea5612..9648bec39 100644 --- a/src/hyperfiddle/incseq/items_eager_impl.cljc +++ b/src/hyperfiddle/incseq/items_eager_impl.cljc @@ -6,34 +6,49 @@ [hyperfiddle.incseq.diff-impl :as d] [hyperfiddle.incseq.perm-impl :as p]) (:import #?(:clj [clojure.lang IDeref IFn]) - #?(:clj [java.util.concurrent.atomic AtomicLong AtomicBoolean]) - [missionary Cancelled])) + #?(:clj [java.util.concurrent.atomic AtomicLong]) + #?(:clj [java.util.concurrent.locks ReentrantLock]) + [missionary Cancelled]) + #?(:cljs (:require-macros [hyperfiddle.incseq.items-eager-impl :refer [locked]]))) -(def ps-field-count (a/deffields -stepped -cancelled -input-ps -done -diff -item*)) +#?(:clj (set! *warn-on-reflection* true)) + +(def ps-field-count (a/deffields -stepped -cancelled -input-ps -diff -item*)) + +(defmacro locked [on & body] + (if (:js-globals &env) + `(do ~@body) + (let [l (with-meta (gensym "lock") {:tag `ReentrantLock})] + `(let [~l ~on] (.lock ~l) (let [v# (do ~@body)] (.unlock ~l) v#))))) (declare cleanup-then-done) (defn call [f] (f)) -(deftype Ps [step done going stepped state-] - IFn (#?(:clj invoke :cljs -invoke) [_] - (some-> (a/get state- -input-ps) call) - (a/set-not= state- -done ::yes ::requested) - (let [cancelled? (a/getset state- -cancelled true)] - (when (not (or (a/getset state- -stepped true) cancelled? (= ::yes (a/get state- -done)))) (step)))) +(deftype Ps [step done going indone state- #?(:clj lock)] + IFn (#?(:clj invoke :cljs -invoke) [this] + (let [step? (locked (.-lock this) + (swap! indone (fn [v] (if (= v ::yes) ::yes ::requested))) + (let [cancelled? (a/getset state- -cancelled true)] + (not (or (a/getset state- -stepped true) cancelled? (= ::yes @indone)))))] + (some-> (a/get state- -input-ps) call) + (when step? (step)))) IDeref (#?(:clj deref :cljs -deref) [this] - (a/set state- -stepped false) - (when (= ::requested (a/get state- -done)) (cleanup-then-done this)) - (let [?diff (a/getset state- -diff nil)] + (let [[cleanup? ?diff] (locked (.-lock this) + (a/set state- -stepped false) + [(= ::requested @indone) (a/getset state- -diff nil)])] + (when cleanup? (cleanup-then-done this)) (cond (a/get state- -cancelled) (throw (Cancelled.)) - (map? ?diff) ?diff - :else (throw ?diff))))) + (map? ?diff) ?diff + :else (throw ?diff))))) + (defn cleanup-then-done [^Ps ps] - (a/fset ps -input-ps nil, -done ::yes, -item* nil) + (locked (.-lock ps) (a/fset ps -input-ps nil, -item* nil)) + (reset! (.-indone ps) ::yes) ((.-done ps))) (defn going [^Ps ps] #?(:clj (let [^AtomicLong i (.-going ps)] (.longValue i)) :cljs (.-going ps))) (defn ++going [^Ps ps] #?(:clj (let [^AtomicLong i (.-going ps)] (.incrementAndGet i)) :cljs (set! (.-going ps) (inc (.-going ps))))) -(defn --going [^Ps ps] #?(:clj (let [^AtomicLong i (.-going ps)] (.getAndDecrement i)) +(defn --going [^Ps ps] #?(:clj (let [^AtomicLong i (.-going ps)] (.decrementAndGet i)) :cljs (set! (.-going ps) (dec (.-going ps))))) (def item-field-count (a/deffields -v -flow -ps* -dead)) @@ -59,7 +74,10 @@ (#?(:clj invoke :cljs -invoke) [_ v] (when-not (or (= v (a/getset a -cache v)) (a/getset a -stepped true)) (step))) - Orphanable (orphan [this] (a/set a -orphaned true) (when-not (a/get a -stepped) (cleanup-item-ps this a done))) + Orphanable (orphan [this] + (a/set a -orphaned true) + (remove-item-ps item this) + (when-not (a/get a -stepped) (cleanup-item-ps this a done))) IDeref (#?(:clj deref :cljs -deref) [this] (a/set a -stepped false) @@ -120,37 +138,39 @@ nil (:change diff)))) (defn needed-diff? [d] - (or (= (d/empty-diff 0) d) (seq (:permutation d)) (pos? (:grow d)) (pos? (:shrink d)) (seq (:freeze d)))) + (or (seq (:permutation d)) (pos? (:grow d)) (pos? (:shrink d)) (seq (:freeze d)))) (defn transfer-input [^Ps ps] - (loop [diff (a/fgetset ps -diff {:change {}})] - (let [?in-diff (try @(a/fget ps -input-ps) (catch #?(:clj Throwable :cljs :default) e e))] - (--going ps) - (if (map? ?in-diff) - (do (grow! ps ?in-diff) - (permute! ps ?in-diff) - (shrink! ps ?in-diff) - (change! ps ?in-diff) - (let [newdiff (a/fset ps -diff (cond->> (assoc ?in-diff :change (:change (a/fget ps -diff))) - diff (d/combine diff)))] - (if (zero? (going ps)) - (when (and (not (a/fget ps -stepped)) (needed-diff? newdiff)) - (a/fset ps -stepped true) ((.-step ps))) - (recur newdiff)))) - (do (some-> (a/fget ps -input-ps) call) - (a/fset-not= ps -done ::yes ::requested) - (a/fset ps -diff (if (neg? (going ps)) (ex-info "uninitialized input process" {}) ?in-diff)) - (when-not (a/fgetset ps -stepped true) ((.-step ps)))))))) + (let [step? + (locked (.-lock ps) + (loop [diff (a/fgetset ps -diff {:change {}})] + (let [?in-diff (try @(a/fget ps -input-ps) (catch #?(:clj Throwable :cljs :default) e e))] + (if (map? ?in-diff) + (do (grow! ps ?in-diff) (permute! ps ?in-diff) (shrink! ps ?in-diff) (change! ps ?in-diff) + (let [newdiff (a/fset ps -diff (cond->> (assoc ?in-diff :change (:change (a/fget ps -diff))) + diff (d/combine diff)))] + (if (= 1 (going ps)) + (case (a/fget ps -stepped) + false (when (needed-diff? newdiff) (a/fset ps -stepped true)) + true nil + nil (a/fset ps -stepped true)) + (do (--going ps) (recur newdiff))))) + (do (some-> (a/fget ps -input-ps) call) + (swap! (.-indone ps) (fn [v] (if (= ::yes v) ::yes ::requested))) + (a/fset ps -diff (if (zero? (going ps)) (ex-info "uninitialized input process" {}) ?in-diff)) + (not (a/fgetset ps -stepped true)))))))] + (--going ps) + (when step? ((.-step ps))))) (def +initial-item-size+ 8) (defn flow [input] (fn [step done] - (let [ps (->Ps step done #?(:clj (new AtomicLong -1) :cljs -1) #?(:clj (new AtomicBoolean false) :cljs false) - (object-array ps-field-count))] - (a/fset ps -item* (object-array +initial-item-size+), -stepped false, -done ::no) + (let [^Ps ps (->Ps step done #?(:clj (new AtomicLong -1) :cljs -1) (atom ::no) (object-array ps-field-count) + #?(:clj (new ReentrantLock)))] + (a/fset ps -item* (object-array +initial-item-size+), -stepped nil) (a/fset ps -input-ps (input #(when (= 1 (++going ps)) (transfer-input ps)) #(if (or (pos? (going ps)) (a/fget ps -stepped)) - (a/fset ps -done ::requested) + (reset! (.-indone ps) ::requested) (cleanup-then-done ps)))) (++going ps) (transfer-input ps) ps))) diff --git a/test/hyperfiddle/incseq/items_eager_impl_test.cljc b/test/hyperfiddle/incseq/items_eager_impl_test.cljc index 63fb63d1a..0005c0be6 100644 --- a/test/hyperfiddle/incseq/items_eager_impl_test.cljc +++ b/test/hyperfiddle/incseq/items_eager_impl_test.cljc @@ -662,6 +662,24 @@ _ (t/is (= :input-cancel (q))) _ (t/is (thrown? ExceptionInfo @items))])) +(t/deftest input-transfer-decrements-on-non-needed-diff + (let [q (->mq) + _ (q (d/empty-diff 0)) ; what input will return on transfer + items (spawn-ps q) + [in-step _in-done] (q) + _ (t/is (= :items-step (q))) + _ (t/is (= (d/empty-diff 0) @items)) + _ (q (d/empty-diff 0)) + _ (in-step) + _ (q ::none) + _ (t/is (= ::none (q))) + _ (q (assoc (d/empty-diff 1) :grow 1 :change {0 :foo})) + _ (in-step) + _ (t/is (= (-> (d/empty-diff 1) (assoc :grow 1) (dissoc :change)) (dissoc @items :change))) + _ (t/is (= :items-step (q))) + _ (q ::none) + _ (t/is (= ::none (q)))])) + ;; missing tests ;; - double transfer (optional) ;; - item-ps