diff --git a/src/contrib/data.cljc b/src/contrib/data.cljc index f1bec6f7e..2528a6a9c 100644 --- a/src/contrib/data.cljc +++ b/src/contrib/data.cljc @@ -353,3 +353,11 @@ ([f a b c d e] (fn [o] (f o a b c d e)))) (defn keep-if [v pred] (when (pred v) v)) + +(defn ->box + ([] (->box nil)) + ([init] (let [o (doto (object-array 1) (aset (int 0) init))] + (fn box + ([] (aget o (int 0))) + ([v] (aset o (int 0) v)) + ([retf swapf] (let [v (box), ret (retf v)] (box (swapf v)) ret)))))) diff --git a/src/contrib/triple_store.clj b/src/contrib/triple_store.cljc similarity index 59% rename from src/contrib/triple_store.clj rename to src/contrib/triple_store.cljc index 1c92e776d..a790fdd87 100644 --- a/src/contrib/triple_store.clj +++ b/src/contrib/triple_store.cljc @@ -1,8 +1,8 @@ (ns contrib.triple-store (:refer-clojure :exclude [find]) - (:require [dom-top.core :refer [loopr]] - [clojure.set :as set] - [contrib.assert :as ca])) + (:require [clojure.set :as set] + [contrib.assert :as ca] + [contrib.data :refer [->box]])) ;; ts - triple store ;; e - entity (id of entity) @@ -18,27 +18,25 @@ ;; ave :foo 1 -> (sorted-set 1 2) <- sorted so e.g. :parent e is well ordered ;; vea 1 1 -> #{:foo :bar} CURRENTLY NOT USED/FILLED -(defrecord TripleStore [o eav ave vea]) +(defrecord TripleStore [o eav ave]) -(defn ->ts ([] (->ts {})) ([o] (->TripleStore o {} {} {}))) +(defn ->ts ([] (->ts {})) ([o] (->TripleStore o {} {}))) (defn add [ts nd] (let [e (get nd :db/id) - [eav ave vea] - (loopr [eav (:eav ts), ave (:ave ts), vea (:vea ts)] - [[a v] nd] - (recur (update eav e assoc a v) - (update ave a update v (fnil conj (sorted-set)) e) - vea - #_(update vea v update e (fnil conj #{}) a)))] - (->TripleStore (:o ts) eav ave vea))) + -eav (->box (:eav ts)), -ave (->box (:ave ts))] + (reduce-kv (fn [_ a v] + (-eav (update (-eav) e assoc a v)) + (-ave (update (-ave) a update v (fnil conj (sorted-set)) e))) + nil nd) + (->TripleStore (:o ts) (-eav) (-ave)))) (defn del [ts e] (let [nd (-> ts :eav (get e)) - {:keys [o eav ave vea]} ts + {:keys [o eav ave]} ts eav (dissoc eav e) ave (reduce-kv (fn [ave a v] (update ave a update v disj e)) ave nd)] - (->TripleStore o eav ave vea))) + (->TripleStore o eav ave))) (defn upd [ts e a f] (let [v0 (-> ts :eav (get e) (get a)) @@ -48,26 +46,13 @@ (:ave ts) (let [ave (update (:ave ts) a update v1 (fnil conj (sorted-set)) e) ave (cond-> ave (contains? (get ave a) v0) (update a update v0 disj e))] - (cond-> ave (not (seq (-> ave (get a) (get v0)))) (update a dissoc v0)))) - vea (:vea ts) - ;; vea (update (:vea ts) v1 update e (fnil conj #{}) a) - ;; vea (cond-> vea (contains? (get vea v0) e) (update v0 update e disj a)) - ] - (->TripleStore (:o ts) eav ave vea))) + (cond-> ave (not (seq (-> ave (get a) (get v0)))) (update a dissoc v0))))] + (->TripleStore (:o ts) eav ave))) (defn asc ([ts e a v] (upd ts e a (fn [_] v))) ([ts e a v & avs] (apply asc (asc ts e a v) e avs))) -(defn get-entity [ts e] (get (:eav ts) e)) - -(defn ->datoms [ts] - (loopr [datoms (transient [])] - [[e av] (:eav ts) - [a v] av] - (recur (conj! datoms [e a v])) - (persistent! datoms))) - ;;;;;;;;;;;;;;; ;;; HELPERS ;;; ;;;;;;;;;;;;;;; diff --git a/src/hyperfiddle/electric/impl/array_fields.cljc b/src/hyperfiddle/electric/impl/array_fields.cljc index 4e35ed8a1..693cddf1a 100644 --- a/src/hyperfiddle/electric/impl/array_fields.cljc +++ b/src/hyperfiddle/electric/impl/array_fields.cljc @@ -2,29 +2,51 @@ (:refer-clojure :exclude [get set]) #?(:cljs (:require-macros hyperfiddle.electric.impl.array-fields)) (:require [hyperfiddle.rcf :as rcf :refer [tests]])) -;; #?(:clj (set! *warn-on-reflection* true)) +#?(:clj (set! *warn-on-reflection* true)) (defmacro deffields [& fields] `(do ~@(for [[fld idx] (mapv vector fields (range))] - `(def ~fld (int ~idx))))) + `(def ~fld (int ~idx))) + ~(count fields))) +(defn get [^objects a k] (aget a (int k))) +(defn set + ([^objects a i v] (aset a (int i) v)) + ([^objects a i v i2 v2] (aset a (int i) v) (aset a (int i2) v2)) + ([^objects a i v i2 v2 i3 v3] (aset a (int i) v) (aset a (int i2) v2) (aset a (int i3) v3)) + ([^objects a i v i2 v2 i3 v3 i4 v4] (aset a (int i) v) (aset a (int i2) v2) (aset a (int i3) v3) (aset a (int i4) v4)) + ([^objects a i v i2 v2 i3 v3 i4 v4 & more] (set a i v i2 v2 i3 v3 i4 v4) (apply set a more))) (defn swap - ([^objects a k f] (aset a k (f (aget a k)))) - ([^objects a k f x] (aset a k (f (aget a k) x))) - ([^objects a k f x y] (aset a k (f (aget a k) x y))) - ([^objects a k f x y z] (aset a k (f (aget a k) x y z))) - ([^objects a k f x y z & more] (aset a k (apply f (aget a k) x y z more)))) + ([^objects a k f] (set a k (f (get a k)))) + ([^objects a k f x] (set a k (f (get a k) x))) + ([^objects a k f x y] (set a k (f (get a k) x y))) + ([^objects a k f x y z] (set a k (f (get a k) x y z))) + ([^objects a k f x y z & more] (set a k (apply f (get a k) x y z more)))) (defmacro fswap [O k f & args] `(swap (.-state- ~O) ~k ~f ~@args)) -(defn get [^objects a k] (aget a k)) (defmacro fget [O k] `(get (.-state- ~O) ~k)) -(defmacro set [arr & kvs] - (let [ar (with-meta (gensym "arr") {:tag 'objects})] - `(let [~ar ~arr] - ~@(for [[k v] (partition 2 kvs)] - ;; FIXME better way to fix reflection warning than call `identity`? - `(aset ~ar ~k (identity ~v)))))) (defmacro fset [O & kvs] `(set (.-state- ~O) ~@kvs)) -(defn getset [^objects a k v] (let [ret (aget a k)] (aset a k v) ret)) +(defn getset [^objects a k v] (let [ret (get a k)] (when (not= ret v) (set a k v)) ret)) (defmacro fgetset [O k v] `(getset (.-state- ~O) ~k ~v)) -(defn getswap [^objects a k f] (let [ret (aget a k)] (swap a k f) ret)) +(defn getswap [^objects a k f] (let [ret (get a k)] (swap a k f) ret)) +(defn set= [^objects a i oldv newv] (if (= oldv (get a i)) (do (set a i newv) true) false)) +(defmacro fset= [O i oldv newv] `(set= (.-state- ~O) ~i ~oldv ~newv)) +(defn set-not= [^objects a i oldv newv] (if (not= oldv (get a i)) (do (set a i newv) true) false)) +(defmacro fset-not= [O i oldv newv] `(set-not= (.-state- ~O) ~i ~oldv ~newv)) + +(defn copy [x y n] #?(:clj (System/arraycopy x 0 y 0 n) :cljs (dotimes [i n] (aset y i (aget x i)))) y) +(defn overfit [k n] (loop [k (* 2 k)] (if (>= k n) k (recur (* 2 k))))) +(defn ensure-fits ^objects [^objects a n] (let [l (alength a)] (cond-> a (< l n) (copy (object-array (overfit l n)) l)))) + +(defn rot + ([^objects a i j] (let [tmp (get a i)] (set a i (get a j) j tmp))) + ([^objects a i j k] (let [tmp (get a i)] (set a i (get a j) j (get a k) k tmp))) + ([^objects a i j k l] (let [tmp (get a i)] (set a i (get a j) j (get a k) k (get a l) l tmp))) + ([^objects a i j k l & more] + (let [tmp (get a i)] + (rot a i j k l) + (loop [[i j :as more] (seq (cons l more))] + (if j + (do (set a i (get a j)) (recur (next more))) + (set a i tmp)))))) + ;;; TESTS ;;; (deftype P [state-]) @@ -42,3 +64,21 @@ (getswap (.-state- aP) x inc) := 100 (fget aP x) := 101 )) + +(tests + (let [a (object-array [:a :b])] + (rot a 0 1) + (vec a) := [:b :a]) + (let [a (object-array [:a :b :c])] + (rot a 0 2 1) + (vec a) := [:c :a :b]) + (let [a (object-array [:a :b :c :d])] + (rot a 0 2 1 3) + (vec a) := [:c :d :b :a]) + (let [a (object-array [:a :b :c :d :e :f :g])] + (apply rot a (range 7)) + (vec a) := [:b :c :d :e :f :g :a])) + +(tests + (alength (ensure-fits (object-array 2) 9)) := 16 + ) diff --git a/src/hyperfiddle/incseq.cljc b/src/hyperfiddle/incseq.cljc index 1a53602db..9a75c487d 100644 --- a/src/hyperfiddle/incseq.cljc +++ b/src/hyperfiddle/incseq.cljc @@ -40,7 +40,7 @@ successive sequence diffs. Incremental sequences are applicative functors with ` (:require [hyperfiddle.incseq.fixed-impl :as f] [hyperfiddle.incseq.perm-impl :as p] [hyperfiddle.incseq.diff-impl :as d] - [hyperfiddle.incseq.items-impl :as i] + [hyperfiddle.incseq.items-eager-impl :as i] [hyperfiddle.incseq.latest-product-impl :as lp] [hyperfiddle.incseq.latest-concat-impl :as lc] [hyperfiddle.rcf :refer [tests]] diff --git a/src/hyperfiddle/incseq/flow_protocol_enforcer.cljc b/src/hyperfiddle/incseq/flow_protocol_enforcer.cljc new file mode 100644 index 000000000..f8e91bfe8 --- /dev/null +++ b/src/hyperfiddle/incseq/flow_protocol_enforcer.cljc @@ -0,0 +1,30 @@ +(ns hyperfiddle.incseq.flow-protocol-enforcer + #?(:clj (:import [clojure.lang IDeref IFn])) + #?(:cljs (:require-macros [hyperfiddle.incseq.flow-protocol-enforcer :refer [cannot-throw]]))) + +(defn violated + ([nm msg] (println nm "flow protocol violation:" msg) #?(:cljs (.error js/console) :clj (prn (Throwable.)))) + ([nm msg e] + (println nm "flow protocol violation:" msg) + (#?(:clj prn :cljs js/console.error) e))) + +(defmacro cannot-throw [nm f] `(try (~f) (catch ~(if (:js-globals &env) :default 'Throwable) e# + (violated ~nm ~(str f " cannot throw") e#)))) + +(defn flow + ([input-flow] (flow "" input-flow)) + ([nm input-flow] + (fn [step done] + (let [!should-step? (atom ::init), !done? (atom false) + step (fn [] + (when @!done? (violated nm "step after done")) + (if (first (swap-vals! !should-step? not)) (cannot-throw nm step) (violated nm "double step"))) + done (fn [] (if (first (reset-vals! !done? true)) (violated nm "done called twice") (cannot-throw nm done))) + cancel (try (input-flow step done) + (catch #?(:clj Throwable :cljs :default) e (violated "flow process creation threw" e)))] + (reify + IFn (#?(:clj invoke :cljs -invoke) [_] (cannot-throw nm cancel)) + IDeref (#?(:clj deref :cljs -deref) [_] + (if-let [should-step (first (swap-vals! !should-step? not))] + (violated nm (if (= ::init should-step) "transfer without initial step" "double transfer")) + @cancel))))))) diff --git a/src/hyperfiddle/incseq/items_eager_impl.cljc b/src/hyperfiddle/incseq/items_eager_impl.cljc new file mode 100644 index 000000000..9648bec39 --- /dev/null +++ b/src/hyperfiddle/incseq/items_eager_impl.cljc @@ -0,0 +1,176 @@ +(ns hyperfiddle.incseq.items-eager-impl + (:require [contrib.data :refer [->box]] + [contrib.debug :as dbg] + [clojure.set :as set] + [hyperfiddle.electric.impl.array-fields :as a] + [hyperfiddle.incseq.diff-impl :as d] + [hyperfiddle.incseq.perm-impl :as p]) + (:import #?(:clj [clojure.lang IDeref IFn]) + #?(:clj [java.util.concurrent.atomic AtomicLong]) + #?(:clj [java.util.concurrent.locks ReentrantLock]) + [missionary Cancelled]) + #?(:cljs (:require-macros [hyperfiddle.incseq.items-eager-impl :refer [locked]]))) + +#?(:clj (set! *warn-on-reflection* true)) + +(def ps-field-count (a/deffields -stepped -cancelled -input-ps -diff -item*)) + +(defmacro locked [on & body] + (if (:js-globals &env) + `(do ~@body) + (let [l (with-meta (gensym "lock") {:tag `ReentrantLock})] + `(let [~l ~on] (.lock ~l) (let [v# (do ~@body)] (.unlock ~l) v#))))) + +(declare cleanup-then-done) +(defn call [f] (f)) +(deftype Ps [step done going indone state- #?(:clj lock)] + IFn (#?(:clj invoke :cljs -invoke) [this] + (let [step? (locked (.-lock this) + (swap! indone (fn [v] (if (= v ::yes) ::yes ::requested))) + (let [cancelled? (a/getset state- -cancelled true)] + (not (or (a/getset state- -stepped true) cancelled? (= ::yes @indone)))))] + (some-> (a/get state- -input-ps) call) + (when step? (step)))) + IDeref (#?(:clj deref :cljs -deref) [this] + (let [[cleanup? ?diff] (locked (.-lock this) + (a/set state- -stepped false) + [(= ::requested @indone) (a/getset state- -diff nil)])] + (when cleanup? (cleanup-then-done this)) + (cond (a/get state- -cancelled) (throw (Cancelled.)) + (map? ?diff) ?diff + :else (throw ?diff))))) + +(defn cleanup-then-done [^Ps ps] + (locked (.-lock ps) (a/fset ps -input-ps nil, -item* nil)) + (reset! (.-indone ps) ::yes) + ((.-done ps))) +(defn going [^Ps ps] #?(:clj (let [^AtomicLong i (.-going ps)] (.longValue i)) + :cljs (.-going ps))) +(defn ++going [^Ps ps] #?(:clj (let [^AtomicLong i (.-going ps)] (.incrementAndGet i)) + :cljs (set! (.-going ps) (inc (.-going ps))))) +(defn --going [^Ps ps] #?(:clj (let [^AtomicLong i (.-going ps)] (.decrementAndGet i)) + :cljs (set! (.-going ps) (dec (.-going ps))))) + +(def item-field-count (a/deffields -v -flow -ps* -dead)) +(deftype Item [state-]) + +(def item-ps-field-count (a/deffields _stepped _cancelled -cache -orphaned)) ; -stepped would warn of redefinition + +(defn remove-item-ps [^Item item ps] (swap! (a/fget item -ps*) disj ps)) + +(defn cleanup-item-ps [ps a done] (when-not (= ps (a/getset a -cache ps)) (done))) + +(defprotocol Orphanable (orphan [_])) + +(defn ->item-ps [^Item item step done] + (let [a (object-array item-ps-field-count)] + (a/set a -cache a, -cancelled false, -orphaned false) + (reify + IFn + (#?(:clj invoke :cljs -invoke) [this] + (remove-item-ps item this) + (let [cancelled? (a/getset a -cancelled true)] + (when (not (or (a/getset a -stepped true) cancelled?)) (step)))) + (#?(:clj invoke :cljs -invoke) [_ v] + (when-not (or (= v (a/getset a -cache v)) (a/getset a -stepped true)) + (step))) + Orphanable (orphan [this] + (a/set a -orphaned true) + (remove-item-ps item this) + (when-not (a/get a -stepped) (cleanup-item-ps this a done))) + IDeref + (#?(:clj deref :cljs -deref) [this] + (a/set a -stepped false) + (let [v (a/get a -cache)] + (when (a/get a -orphaned) (cleanup-item-ps this a done)) + (if (a/get a -cancelled) + (do (cleanup-item-ps this a done) (throw (Cancelled.))) + v)))))) + +(let [cancelled #?(:clj (Object.) :cljs (js/Object.))] + (defn ->dead-item-ps [step done -v] + (step) + (let [ (->box -v)] + (reify + IFn (#?(:clj invoke :cljs -invoke) [_] ( cancelled)) + Orphanable (orphan [_]) + IDeref (#?(:clj deref :cljs -deref) [this] + (done) + (if (identical? cancelled ()) (throw (Cancelled.)) (let [v ()] ( this) v))))))) + +(defn grow! [^Ps ps {d :degree, n :grow, p :permutation}] + (let [idx (set/map-invert p)] + (a/fgetset ps -item* (a/ensure-fits (a/fget ps -item*) d)) + (run! (fn [i] + (let [^Item item (->Item (object-array item-field-count))] + (a/fset item -ps* (atom #{})) + (a/set (a/fget ps -item*) i item) + (a/fswap ps -diff update :change assoc (idx i i) + (a/fset item -flow (fn [step done] + (if (a/fget item -dead) + (->dead-item-ps step done (a/fget item -v)) + (let [item-ps (->item-ps item step done)] + (swap! (a/fget item -ps*) conj item-ps) + (item-ps (a/fget item -v)) + item-ps))))))) + (range (- d n) d)))) + +(defn permute! [^Ps ps {p :permutation}] + (let [rot* (p/decompose conj #{} p) + item* (a/fget ps -item*)] + (run! (fn [rot] (apply a/rot item* rot)) rot*))) + +(defn shrink! [^Ps ps {d :degree, n :shrink}] + (let [item* (a/fget ps -item*)] + (run! (fn [i] + (let [^Item item (a/get item* i)] + (a/fset item -dead true) + (a/set item* i nil) + (run! orphan @(a/fget item -ps*)))) + (range (- d n) d)))) + +(defn change! [^Ps ps diff] + (let [item* (a/fget ps -item*)] + (reduce-kv (fn [_ i v] + (let [^Item item (a/get item* i)] + (a/fset item -v v) + (run! (fn [item-ps] (item-ps v)) @(a/fget item -ps*)))) + nil (:change diff)))) + +(defn needed-diff? [d] + (or (seq (:permutation d)) (pos? (:grow d)) (pos? (:shrink d)) (seq (:freeze d)))) + +(defn transfer-input [^Ps ps] + (let [step? + (locked (.-lock ps) + (loop [diff (a/fgetset ps -diff {:change {}})] + (let [?in-diff (try @(a/fget ps -input-ps) (catch #?(:clj Throwable :cljs :default) e e))] + (if (map? ?in-diff) + (do (grow! ps ?in-diff) (permute! ps ?in-diff) (shrink! ps ?in-diff) (change! ps ?in-diff) + (let [newdiff (a/fset ps -diff (cond->> (assoc ?in-diff :change (:change (a/fget ps -diff))) + diff (d/combine diff)))] + (if (= 1 (going ps)) + (case (a/fget ps -stepped) + false (when (needed-diff? newdiff) (a/fset ps -stepped true)) + true nil + nil (a/fset ps -stepped true)) + (do (--going ps) (recur newdiff))))) + (do (some-> (a/fget ps -input-ps) call) + (swap! (.-indone ps) (fn [v] (if (= ::yes v) ::yes ::requested))) + (a/fset ps -diff (if (zero? (going ps)) (ex-info "uninitialized input process" {}) ?in-diff)) + (not (a/fgetset ps -stepped true)))))))] + (--going ps) + (when step? ((.-step ps))))) + +(def +initial-item-size+ 8) +(defn flow [input] + (fn [step done] + (let [^Ps ps (->Ps step done #?(:clj (new AtomicLong -1) :cljs -1) (atom ::no) (object-array ps-field-count) + #?(:clj (new ReentrantLock)))] + (a/fset ps -item* (object-array +initial-item-size+), -stepped nil) + (a/fset ps -input-ps (input + #(when (= 1 (++going ps)) (transfer-input ps)) + #(if (or (pos? (going ps)) (a/fget ps -stepped)) + (reset! (.-indone ps) ::requested) + (cleanup-then-done ps)))) + (++going ps) (transfer-input ps) ps))) diff --git a/test/contrib/triple_store_test.clj b/test/contrib/triple_store_test.cljc similarity index 80% rename from test/contrib/triple_store_test.clj rename to test/contrib/triple_store_test.cljc index 7e31bc7c1..9548a29f5 100644 --- a/test/contrib/triple_store_test.clj +++ b/test/contrib/triple_store_test.cljc @@ -3,12 +3,12 @@ [hyperfiddle.rcf :as rcf :refer [tests]])) (tests - (-> (ts/->ts) (ts/add {:db/id 1, :foo 2}) (ts/get-entity 1) :foo) := 2 + (-> (ts/->ts) (ts/add {:db/id 1, :foo 2}) (ts/->node 1) :foo) := 2 (-> (ts/->ts) (ts/add {:db/id 1, :foo 1}) (ts/add {:db/id 2, :foo 1}) :ave :foo (get 1)) := #{1 2} ;; (-> (ts/->ts) (ts/add {:db/id 1, :foo 2, :bar 2}) :vea (get 2) (get 1)) := #{:foo :bar} - (-> (ts/->ts) (ts/add {:db/id 1, :foo 2, :bar 2}) (ts/get-entity 1) (select-keys [:foo :bar :baz])) := {:foo 2, :bar 2} + (-> (ts/->ts) (ts/add {:db/id 1, :foo 2, :bar 2}) (ts/->node 1) (select-keys [:foo :bar :baz])) := {:foo 2, :bar 2} - (-> (ts/->ts) (ts/add {:db/id '_}) (ts/upd '_ :x (fnil inc 0)) (ts/upd '_ :x (fnil inc 0)) (ts/get-entity '_) :x) := 2 + (-> (ts/->ts) (ts/add {:db/id '_}) (ts/upd '_ :x (fnil inc 0)) (ts/upd '_ :x (fnil inc 0)) (ts/->node '_) :x) := 2 (-> (ts/->ts) (ts/add {:db/id 1}) (ts/asc 1 :x 2) (ts/asc 1 :x 2) :ave :x (get 2)) := #{1} (-> (ts/->ts) (ts/add {:db/id 1}) (ts/asc 1 :x 2 :y 3) :eav (get 1)) := {:db/id 1, :x 2, :y 3} diff --git a/test/hyperfiddle/incseq/items_eager_impl_test.cljc b/test/hyperfiddle/incseq/items_eager_impl_test.cljc new file mode 100644 index 000000000..0005c0be6 --- /dev/null +++ b/test/hyperfiddle/incseq/items_eager_impl_test.cljc @@ -0,0 +1,689 @@ +(ns hyperfiddle.incseq.items-eager-impl-test + (:require + [clojure.test :as t] + [contrib.assert :as ca] + [contrib.data :refer [->box]] + [hyperfiddle.incseq.diff-impl :as d] + [hyperfiddle.incseq.items-eager-impl :as items] + [hyperfiddle.incseq.flow-protocol-enforcer :as fpe] + [missionary.core :as m]) + (:import #?(:clj [clojure.lang ExceptionInfo IDeref IFn]) + [missionary Cancelled])) + +(defn ->queue + ([] #?(:clj clojure.lang.PersistentQueue/EMPTY :cljs #queue [])) + ([& args] (into (->queue) args))) + +(defn ->mq [] + (let [box (->box (->queue))] + (fn + ([] (let [q (box)] (ca/is q seq "empty test queue") (box (pop q)) (peek q))) + ([v] (box (conj (box) v)))))) + +(t/deftest queue-test + (let [q (->mq)] + (q 1) (t/is (= 1 (q))) + (q 2) (q 3) (t/is (= 2 (q))) (t/is (= 3 (q))) + (t/is (thrown? ExceptionInfo (q))))) + +(defn spawn-ps + ([q] (spawn-ps q (->box (fn [_step _done] (q))))) + ([q ] (spawn-ps q (->box (fn [_step _done] (q :input-cancel))))) + ([q ] + ((fpe/flow "i/items" (items/flow (fn [step done] + (q [step done]) + (step) + (reify + IFn (#?(:clj invoke :cljs -invoke) [_] (() step done)) + IDeref (#?(:clj deref :cljs -deref) [_] (() step done)))))) + #(q :items-step) #(q :items-done)))) + +(t/deftest spawn + (let [q (->mq) + _ (q (d/empty-diff 0)) ; what input will return on transfer + ps (spawn-ps q) + [_in-step _in-done] (q) + _ (t/is (= :items-step (q))) + _ (t/is (= (d/empty-diff 0) @ps)) + _ (q ::none) + _ (t/is (= ::none (q)))])) + +(t/deftest one-item + (let [q (->mq) + _ (q (assoc (d/empty-diff 1) :grow 1 :change {0 :foo})) ; what input will return on transfer + items (spawn-ps q) + [_in-step _in-done] (q) + _ (t/is (= :items-step (q))) + diff @items + _ (t/is (= (assoc (d/empty-diff 1) :grow 1) (assoc diff :change {}))) + item0 ((-> diff :change (get 0)) #(q :item0-step) #(q :item0-done)) + _ (t/is (= :item0-step (q))) + _ (t/is (= :foo @item0)) + _ (q ::none) + _ (t/is (= ::none (q)))])) + +(t/deftest one-item-change + (let [q (->mq) + _ (q (assoc (d/empty-diff 1) :grow 1 :change {0 :foo})) ; what input will return on transfer + items (spawn-ps q) + [in-step _in-done] (q) + _ (t/is (= :items-step (q))) + diff @items + _ (t/is (= (assoc (d/empty-diff 1) :grow 1) (assoc diff :change {}))) + item0 ((-> diff :change (get 0)) #(q :item0-step) #(q :item0-done)) + _ (t/is (= :item0-step (q))) + _ (t/is (= :foo @item0)) + _ (q (assoc (d/empty-diff 1) :change {0 :bar})) + _ (in-step) + _ (t/is (= :item0-step (q))) + _ (t/is (= :bar @item0)) + _ (q ::none) + _ (t/is (= ::none (q)))])) + +(t/deftest one-item-dedupes + (let [q (->mq) + _ (q (assoc (d/empty-diff 1) :grow 1 :change {0 :foo})) ; what input will return on transfer + items (spawn-ps q) + [in-step _in-done] (q) + _ (t/is (= :items-step (q))) + diff @items + _ (t/is (= (assoc (d/empty-diff 1) :grow 1) (assoc diff :change {}))) + item0 ((-> diff :change (get 0)) #(q :item0-step) #(q :item0-done)) + _ (t/is (= :item0-step (q))) + _ (t/is (= :foo @item0)) + _ (q (assoc (d/empty-diff 1) :change {0 :foo})) + _ (in-step) + _ (q ::none) ; :foo = :foo, so we skipped + _ (t/is (= ::none (q)))])) + +(t/deftest two-items + (let [q (->mq) + _ (q (assoc (d/empty-diff 1) :grow 1 :change {0 :foo})) ; what input will return on transfer + items (spawn-ps q) + [in-step _in-done] (q) + _ (t/is (= :items-step (q))) + diff @items + _ (t/is (= (assoc (d/empty-diff 1) :grow 1) (assoc diff :change {}))) + item0 ((-> diff :change (get 0)) #(q :item0-step) #(q :item0-done)) + _ (t/is (= :item0-step (q))) + _ (t/is (= :foo @item0)) + _ (q {:grow 1, :degree 2, :shrink 0, :permutation {}, :freeze #{}, :change {1 :bar}}) + _ (in-step) + _ (t/is (= :items-step (q))) + diff @items + _ (t/is (= {:grow 1, :degree 2, :shrink 0, :permutation {}, :freeze #{}} (dissoc diff :change))) + item1 ((-> diff :change (get 1)) #(q :item1-step) #(q :item1-done)) + _ (t/is (= :item1-step (q))) + _ (t/is (= :bar @item1)) + _ (q ::none) + _ (t/is (= ::none (q)))])) + +(t/deftest item-is-latest + (let [q (->mq) + _ (q (assoc (d/empty-diff 1) :grow 1 :change {0 :foo})) ; what input will return on transfer + items (spawn-ps q) + [in-step _in-done] (q) + _ (t/is (= :items-step (q))) + diff @items + _ (t/is (= (assoc (d/empty-diff 1) :grow 1) (assoc diff :change {}))) + item0 ((-> diff :change (get 0)) #(q :item0-step) #(q :item0-done)) + _ (t/is (= :item0-step (q))) + _ (q (assoc (d/empty-diff 1) :change {0 :bar})) + _ (in-step) + _ (t/is (= :bar @item0)) + _ (q ::none) + _ (t/is (= ::none (q)))])) + +(t/deftest two-item-processes + (let [q (->mq) + _ (q (assoc (d/empty-diff 1) :grow 1 :change {0 :foo})) ; what input will return on transfer + items (spawn-ps q) + [in-step _in-done] (q) + _ (t/is (= :items-step (q))) + diff @items + _ (t/is (= (assoc (d/empty-diff 1) :grow 1) (assoc diff :change {}))) + item0-ps0 ((-> diff :change (get 0)) #(q :item0-ps0-step) #(q :item0-ps0-done)) + _ (t/is (= :item0-ps0-step (q))) + item0-ps1 ((-> diff :change (get 0)) #(q :item0-ps1-step) #(q :item0-ps1-done)) + _ (t/is (= :item0-ps1-step (q))) + _ (t/is (= :foo @item0-ps1)) ; ps1 reads, ps0 didn't + _ (q (assoc (d/empty-diff 1) :change {0 :bar})) + _ (in-step) + _ (t/is (= :item0-ps1-step (q))) ; ps1 steps because it already transferred + _ (t/is (= :bar @item0-ps0)) ; ps0 transfers latest + _ (t/is (= :bar @item0-ps1)) ; ps1 transfers + _ (q ::none) + _ (t/is (= ::none (q)))])) + +(t/deftest permutation + (let [q (->mq) + _ (q (assoc (d/empty-diff 2) :grow 2 :change {0 :foo, 1 :bar})) ; what input will return on transfer + items (spawn-ps q) + [in-step _in-done] (q) + _ (t/is (= :items-step (q))) + diff @items + _ (t/is (= (assoc (d/empty-diff 2) :grow 2) (assoc diff :change {}))) + item0 ((-> diff :change (get 0)) #(q :item0-step) #(q :item0-done)) + _ (t/is (= :item0-step (q))) + _ (t/is (= :foo @item0)) + item1 ((-> diff :change (get 1)) #(q :item1-step) #(q :item1-done)) + _ (t/is (= :item1-step (q))) + _ (t/is (= :bar @item1)) + perm (assoc (d/empty-diff 2) :permutation {0 1, 1 0}) + _ (q perm) + _ (in-step) + _ (t/is (= :items-step (q))) + diff @items + _ (t/is (= perm diff)) + _ (q (assoc (d/empty-diff 2) :change {0 :baz})) + _ (in-step) + _ (t/is (= :item1-step (q))) ; change on 0 means item1 after permutation + _ (t/is (= :baz @item1)) + _ (q ::none) + _ (t/is (= ::none (q)))])) + +(t/deftest shrink-idle-item-ps + (let [q (->mq) + _ (q (assoc (d/empty-diff 1) :grow 1 :change {0 :foo})) ; what input will return on transfer + items (spawn-ps q) + [in-step _in-done] (q) + _ (t/is (= :items-step (q))) + diff @items + _ (t/is (= (assoc (d/empty-diff 1) :grow 1) (assoc diff :change {}))) + item0 ((-> diff :change (get 0)) #(q :item0-step) #(q :item0-done)) + _ (t/is (= :item0-step (q))) + _ (t/is (= :foo @item0)) + shrink1 (assoc (d/empty-diff 1) :shrink 1) + _ (q shrink1) + _ (in-step) + _ (t/is (= :item0-done (q))) + _ (t/is (= :items-step (q))) + _ (t/is (= shrink1 @items)) + _ (q ::none) + _ (t/is (= ::none (q)))])) + +(t/deftest shrink-stepped-item-ps + (let [q (->mq) + _ (q (assoc (d/empty-diff 1) :grow 1 :change {0 :foo})) ; what input will return on transfer + items (spawn-ps q) + [in-step _in-done] (q) + _ (t/is (= :items-step (q))) + diff @items + _ (t/is (= (assoc (d/empty-diff 1) :grow 1) (assoc diff :change {}))) + item0 ((-> diff :change (get 0)) #(q :item0-step) #(q :item0-done)) + _ (t/is (= :item0-step (q))) + shrink1 (assoc (d/empty-diff 1) :shrink 1) + _ (q shrink1) + _ (in-step) + _ (t/is (= :items-step (q))) + _ (t/is (= shrink1 @items)) + _ (t/is (= :foo @item0)) + _ (t/is (= :item0-done (q))) + _ (q ::none) + _ (t/is (= ::none (q)))])) + +(t/deftest dead-item-ps-returns-last-value-and-terminates + (let [q (->mq) + _ (q (assoc (d/empty-diff 1) :grow 1 :change {0 :foo})) ; what input will return on transfer + items (spawn-ps q) + [in-step _in-done] (q) + _ (t/is (= :items-step (q))) + diff @items + _ (t/is (= (assoc (d/empty-diff 1) :grow 1) (assoc diff :change {}))) + item0-flow (-> diff :change (get 0)) + shrink1 (assoc (d/empty-diff 1) :shrink 1) + _ (q shrink1) + _ (in-step) + _ (t/is (= :items-step (q))) + _ (t/is (= shrink1 @items)) + item0 (item0-flow #(q :item0-step) #(q :item0-done)) + _ (t/is (= :item0-step (q))) + _ (t/is (= :foo @item0)) + _ (t/is (= :item0-done (q))) + _ (q ::none) + _ (t/is (= ::none (q)))])) + +(t/deftest dead-item-ps-cancelled-throws-and-terminates + (let [q (->mq) + _ (q (assoc (d/empty-diff 1) :grow 1 :change {0 :foo})) ; what input will return on transfer + items (spawn-ps q) + [in-step _in-done] (q) + _ (t/is (= :items-step (q))) + diff @items + _ (t/is (= (assoc (d/empty-diff 1) :grow 1) (assoc diff :change {}))) + item0-flow (-> diff :change (get 0)) + shrink1 (assoc (d/empty-diff 1) :shrink 1) + _ (q shrink1) + _ (in-step) + _ (t/is (= :items-step (q))) + _ (t/is (= shrink1 @items)) + item0 (item0-flow #(q :item0-step) #(q :item0-done)) + _ (t/is (= :item0-step (q))) + _ (item0) + _ (t/is (thrown? Cancelled @item0)) + _ (t/is (= :item0-done (q))) + _ (q ::none) + _ (t/is (= ::none (q)))])) + +(t/deftest item-ps-cancellation-idle + (let [q (->mq) + _ (q (assoc (d/empty-diff 1) :grow 1 :change {0 :foo})) ; what input will return on transfer + items (spawn-ps q) + [_in-step _in-done] (q) + _ (t/is (= :items-step (q))) + diff @items + _ (t/is (= (assoc (d/empty-diff 1) :grow 1) (assoc diff :change {}))) + item0 ((-> diff :change (get 0)) #(q :item0-step) #(q :item0-done)) + _ (t/is (= :item0-step (q))) + _ (t/is (= :foo @item0)) + _ (item0) + _ (t/is (= :item0-step (q))) + _ (t/is (thrown? Cancelled @item0)) + _ (t/is (= :item0-done (q))) + _ (q ::none) + _ (t/is (= ::none (q)))])) + +(t/deftest item-ps-cancellation-stepped + (let [q (->mq) + _ (q (assoc (d/empty-diff 1) :grow 1 :change {0 :foo})) ; what input will return on transfer + items (spawn-ps q) + [_in-step _in-done] (q) + _ (t/is (= :items-step (q))) + diff @items + _ (t/is (= (assoc (d/empty-diff 1) :grow 1) (assoc diff :change {}))) + item0 ((-> diff :change (get 0)) #(q :item0-step) #(q :item0-done)) + _ (t/is (= :item0-step (q))) + _ (item0) + _ (t/is (thrown? Cancelled @item0)) + _ (t/is (= :item0-done (q))) + _ (q ::none) + _ (t/is (= ::none (q)))])) + +(t/deftest cancellation-idle + (let [q (->mq) + _ (q (assoc (d/empty-diff 1) :grow 1 :change {0 :foo})) ; what input will return on transfer + items (spawn-ps q) + [_in-step _in-done] (q) + _ (t/is (= :items-step (q))) + diff @items + _ (t/is (= (assoc (d/empty-diff 1) :grow 1) (assoc diff :change {}))) + _ (items) + _ (t/is (= :input-cancel (q))) + _ (t/is (= :items-step (q))) + _ (t/is (thrown? Cancelled @items)) + _ (t/is (= :items-done (q))) + _ (q ::none) + _ (t/is (= ::none (q)))])) + +(t/deftest cancellation-stepped + (let [q (->mq) + _ (q (assoc (d/empty-diff 1) :grow 1 :change {0 :foo})) ; what input will return on transfer + items (spawn-ps q) + [_in-step _in-done] (q) + _ (t/is (= :items-step (q))) + _ (items) + _ (t/is (= :input-cancel (q))) + _ (t/is (thrown? Cancelled @items)) + _ (t/is (= :items-done (q))) + _ (q ::none) + _ (t/is (= ::none (q)))])) + +(t/deftest double-input-step + (let [q (->mq) + _ (q (assoc (d/empty-diff 1) :grow 1 :change {0 :foo})) ; what input will return on transfer + items (spawn-ps q) + [in-step _in-done] (q) + _ (t/is (= :items-step (q))) + _ (q (assoc (d/empty-diff 2) :grow 1 :change {1 :bar})) + _ (in-step) + diff @items + _ (t/is (= (assoc (d/empty-diff 2) :grow 2) (assoc diff :change {}))) + _ (t/is (= 2 (count (:change diff)))) + _ (q ::none) + _ (t/is (= ::none (q)))])) + +(t/deftest reentrant-transfer + (let [q (->mq) + items ((items/flow (m/seed [{:grow 1, :degree 1, :shrink 0, :change {0 :foo}, :permutation {}, :freeze #{}} + {:grow 1, :degree 2, :shrink 0, :change {1 :bar}, :permutation {}, :freeze #{}}])) + #(q :items-step) #(q :items-done)) + _ (t/is (= :items-step (q))) + diff @items + _ (t/is (= (assoc (d/empty-diff 2) :grow 2) (assoc diff :change {}))) + _ (t/is (= 2 (count (:change diff)))) + _ (t/is (= :items-done (q))) + _ (q ::none) + _ (t/is (= ::none (q)))])) + +(t/deftest input-terminate-during-transfer + (let [q (->mq) + items ((items/flow (m/seed [{:grow 1, :degree 1, :shrink 0, :change {0 :foo}, :permutation {}, :freeze #{}}])) + #(q :items-step) #(q :items-done)) + _ (t/is (= :items-step (q))) + diff @items + _ (t/is (= (assoc (d/empty-diff 1) :grow 1) (assoc diff :change {}))) + _ (t/is (= 1 (count (:change diff)))) + _ (t/is (= :items-done (q))) + _ (q ::none) + _ (t/is (= ::none (q)))])) + +(t/deftest input-terminate-when-idle + (let [q (->mq) + _ (q (assoc (d/empty-diff 1) :grow 1 :change {0 :foo})) ; what input will return on transfer + items (spawn-ps q) + [_in-step in-done] (q) + _ (t/is (= :items-step (q))) + diff @items + _ (t/is (= (assoc (d/empty-diff 1) :grow 1) (assoc diff :change {}))) + _ (t/is (= 1 (count (:change diff)))) + _ (in-done) + _ (t/is (= :items-done (q))) + _ (q ::none) + _ (t/is (= ::none (q)))])) + +(t/deftest input-terminate-when-stepped + (let [q (->mq) + _ (q (assoc (d/empty-diff 1) :grow 1 :change {0 :foo})) ; what input will return on transfer + items (spawn-ps q) + [in-step in-done] (q) + _ (t/is (= :items-step (q))) + diff @items + _ (t/is (= (assoc (d/empty-diff 1) :grow 1) (assoc diff :change {}))) + _ (t/is (= 1 (count (:change diff)))) + _ (q (assoc (d/empty-diff 1) :grow 1 :change {0 :bar})) + _ (in-step) + _ (t/is (= :items-step (q))) + _ (in-done) + _ (q ::none) + _ (t/is (= ::none (q))) + _diff @items + _ (t/is (= :items-done (q))) + _ (q ::none) + _ (t/is (= ::none (q)))])) + +(t/deftest failure-on-first-transfer + (let [q (->mq) + items (spawn-ps q (->box (fn [_step done] (done) (throw (ex-info "boom" {}))))) + [_in-step _in-done] (q) + _ (t/is (= :input-cancel (q))) + _ (t/is (= :items-step (q))) + _ (t/is (thrown? ExceptionInfo @items)) + _ (t/is (= :items-done (q))) + _ (q ::none) + _ (t/is (= ::none (q)))])) + +(t/deftest failure-on-non-first-transfer + (let [q (->mq) + (->box (fn [_step _done] (d/empty-diff 0))) + items (spawn-ps q ) + [in-step _in-done] (q) + _ (t/is (= :items-step (q))) + _ (t/is (= (d/empty-diff 0) @items)) + _ ( (fn [_step done] (done) (throw (ex-info "boom" {})))) + _ (in-step) + _ (t/is (= :input-cancel (q))) + _ (t/is (= :items-step (q))) + _ (t/is (thrown? ExceptionInfo @items)) + _ (t/is (= :items-done (q))) + _ (q ::none) + _ (t/is (= ::none (q)))])) + +(defn consume-calling [f*] + (let [ (->box (seq f*))] + (fn [step done] + ((ca/is ( first next) some? "overconsumed") step done)))) + +(t/deftest failure-on-reentrant-transfer + (let [q (->mq) + (->box (consume-calling [(fn [step _] (step) (d/empty-diff 0)) + (fn [_ done] (done) (throw (ex-info "boom" {})))])) + items (spawn-ps q ) + [_in-step _in-done] (q) + _ (t/is (= :input-cancel (q))) + _ (t/is (= :items-step (q))) + _ (t/is (thrown? ExceptionInfo @items)) + _ (t/is (= :items-done (q))) + _ (q ::none) + _ (t/is (= ::none (q)))])) + +(t/deftest failure-after-cancellation + (let [q (->mq) + (->box (consume-calling [(fn [_ _] (d/empty-diff 0)) + (fn [_ done] (done) (throw (ex-info "boom" {})))])) + (->box (fn [_step _done])) + items (spawn-ps q ) + [_in-step _in-done] (q) + _ (t/is (= :items-step (q))) + _ (t/is (= (d/empty-diff 0) @items)) + _ (items) + _ (t/is (= :items-step (q))) + _ (t/is (thrown? Cancelled @items)) ; is this OK or should the ExInfo come out + _ (t/is (= :items-done (q))) + _ (q ::none) + _ (t/is (= ::none (q)))])) + +(t/deftest grow + (let [q (->mq) + n (inc items/+initial-item-size+) + _ (q (assoc (d/empty-diff n) :grow n :change (zipmap (range n) (repeat :foo)))) ; what input will return on transfer + items (spawn-ps q) + [_in-step _in-done] (q) + _ (t/is (= :items-step (q))) + diff @items + _ (t/is (= 9 (count (:change diff)))) + _ (q ::none) + _ (t/is (= ::none (q)))])) + +(t/deftest double-cancellation-stepped + (let [q (->mq) + _ (q (assoc (d/empty-diff 1) :grow 1 :change {0 :foo})) ; what input will return on transfer + items (spawn-ps q) + [_in-step _in-done] (q) + _ (t/is (= :items-step (q))) + _ (items) + _ (t/is (= :input-cancel (q))) + _ (items) + _ (t/is (= :input-cancel (q))) + _ (t/is (thrown? Cancelled @items)) + _ (t/is (= :items-done (q))) + _ (q ::none) + _ (t/is (= ::none (q)))])) + +(t/deftest double-cancellation-idle + (let [q (->mq) + _ (q (assoc (d/empty-diff 1) :grow 1 :change {0 :foo})) ; what input will return on transfer + items (spawn-ps q) + [_in-step _in-done] (q) + _ (t/is (= :items-step (q))) + _diff @items + _ (items) + _ (t/is (= :input-cancel (q))) + _ (t/is (= :items-step (q))) + _ (items) + _ (t/is (= :input-cancel (q))) + _ (t/is (thrown? Cancelled @items)) + _ (t/is (= :items-done (q))) + _ (q ::none) + _ (t/is (= ::none (q)))])) + +(t/deftest cancel-after-done + (let [q (->mq) + _ (q (assoc (d/empty-diff 1) :grow 1 :change {0 :foo})) ; what input will return on transfer + items (spawn-ps q) + [_in-step _in-done] (q) + _ (t/is (= :items-step (q))) + _diff @items + _ (items) + _ (t/is (= :input-cancel (q))) + _ (t/is (= :items-step (q))) + _ (t/is (thrown? Cancelled @items)) + _ (t/is (= :items-done (q))) + _ (items) + _ (q ::none) + _ (t/is (= ::none (q)))])) + +(t/deftest cancel-after-done-normally + (let [q (->mq) + _ (q (assoc (d/empty-diff 1) :grow 1 :change {0 :foo})) ; what input will return on transfer + items (spawn-ps q) + [_in-step in-done] (q) + _ (t/is (= :items-step (q))) + _diff @items + _ (in-done) + _ (t/is (= :items-done (q))) + _ (items) + _ (q ::none) + _ (t/is (= ::none (q)))])) + +(t/deftest item-ps-double-cancellation-idle + (let [q (->mq) + _ (q (assoc (d/empty-diff 1) :grow 1 :change {0 :foo})) ; what input will return on transfer + items (spawn-ps q) + [_in-step _in-done] (q) + _ (t/is (= :items-step (q))) + diff @items + _ (t/is (= (assoc (d/empty-diff 1) :grow 1) (assoc diff :change {}))) + item0 ((-> diff :change (get 0)) #(q :item0-step) #(q :item0-done)) + _ (t/is (= :item0-step (q))) + _ (t/is (= :foo @item0)) + _ (item0) + _ (t/is (= :item0-step (q))) + _ (item0) + _ (t/is (thrown? Cancelled @item0)) + _ (t/is (= :item0-done (q))) + _ (q ::none) + _ (t/is (= ::none (q)))])) + +(t/deftest item-ps-double-cancellation-stepped + (let [q (->mq) + _ (q (assoc (d/empty-diff 1) :grow 1 :change {0 :foo})) ; what input will return on transfer + items (spawn-ps q) + [_in-step _in-done] (q) + _ (t/is (= :items-step (q))) + diff @items + _ (t/is (= (assoc (d/empty-diff 1) :grow 1) (assoc diff :change {}))) + item0 ((-> diff :change (get 0)) #(q :item0-step) #(q :item0-done)) + _ (t/is (= :item0-step (q))) + _ (item0) + _ (item0) + _ (t/is (thrown? Cancelled @item0)) + _ (t/is (= :item0-done (q))) + _ (q ::none) + _ (t/is (= ::none (q)))])) + +(t/deftest item-ps-cancel-after-done + (let [q (->mq) + _ (q (assoc (d/empty-diff 1) :grow 1 :change {0 :foo})) ; what input will return on transfer + items (spawn-ps q) + [_in-step _in-done] (q) + _ (t/is (= :items-step (q))) + diff @items + _ (t/is (= (assoc (d/empty-diff 1) :grow 1) (assoc diff :change {}))) + item0 ((-> diff :change (get 0)) #(q :item0-step) #(q :item0-done)) + _ (t/is (= :item0-step (q))) + _ (t/is (= :foo @item0)) + _ (item0) + _ (t/is (= :item0-step (q))) + _ (t/is (thrown? Cancelled @item0)) + _ (t/is (= :item0-done (q))) + _ (item0) + _ (q ::none) + _ (t/is (= ::none (q)))])) + +(t/deftest dead-item-ps-cancel-after-done + (let [q (->mq) + _ (q (assoc (d/empty-diff 1) :grow 1 :change {0 :foo})) ; what input will return on transfer + items (spawn-ps q) + [in-step _in-done] (q) + _ (t/is (= :items-step (q))) + diff @items + _ (t/is (= (assoc (d/empty-diff 1) :grow 1) (assoc diff :change {}))) + item0-flow (-> diff :change (get 0)) + shrink1 (assoc (d/empty-diff 1) :shrink 1) + _ (q shrink1) + _ (in-step) + _ (t/is (= :items-step (q))) + _ (t/is (= shrink1 @items)) + item0 (item0-flow #(q :item0-step) #(q :item0-done)) + _ (t/is (= :item0-step (q))) + _ (t/is (= :foo @item0)) + _ (t/is (= :item0-done (q))) + _ (item0) + _ (q ::none) + _ (t/is (= ::none (q)))])) + +(t/deftest dead-item-ps-cancel-after-throw + (let [q (->mq) + _ (q (assoc (d/empty-diff 1) :grow 1 :change {0 :foo})) ; what input will return on transfer + items (spawn-ps q) + [in-step _in-done] (q) + _ (t/is (= :items-step (q))) + diff @items + _ (t/is (= (assoc (d/empty-diff 1) :grow 1) (assoc diff :change {}))) + item0-flow (-> diff :change (get 0)) + shrink1 (assoc (d/empty-diff 1) :shrink 1) + _ (q shrink1) + _ (in-step) + _ (t/is (= :items-step (q))) + _ (t/is (= shrink1 @items)) + item0 (item0-flow #(q :item0-step) #(q :item0-done)) + _ (t/is (= :item0-step (q))) + _ (item0) + _ (t/is (thrown? Cancelled @item0)) + _ (t/is (= :item0-done (q))) + _ (item0) + _ (q ::none) + _ (t/is (= ::none (q)))])) + +(t/deftest change-index-respects-permutation + (let [q (->mq) + _ (q (assoc (d/empty-diff 1) :grow 1 :change {0 :foo})) ; what input will return on transfer + items (spawn-ps q) + [in-step _in-done] (q) + _ (t/is (= :items-step (q))) + diff @items + _ (t/is (= (assoc (d/empty-diff 1) :grow 1) (assoc diff :change {}))) + _ (q {:grow 1, :degree 2, :shrink 1, :permutation {0 1, 1 0}, :change {0 :bar}}) + _ (in-step) + _ (t/is (= :items-step (q))) + diff @items + _ (t/is (= 0 (-> diff :change keys first))) + _ (q ::none) + _ (t/is (= ::none (q)))])) + +(t/deftest input-must-be-initialized + (let [q (->mq) + items ((items/flow (fn [step done] + (q [step done]) + (reify + IFn (#?(:clj invoke :cljs -invoke) [_] (q :input-cancel)) + IDeref (#?(:clj deref :cljs -deref) [_] (q))))) + #(q :items-step) #(q :items-done)) + _ (t/is (= :input-cancel (q))) + _ (t/is (thrown? ExceptionInfo @items))])) + +(t/deftest input-transfer-decrements-on-non-needed-diff + (let [q (->mq) + _ (q (d/empty-diff 0)) ; what input will return on transfer + items (spawn-ps q) + [in-step _in-done] (q) + _ (t/is (= :items-step (q))) + _ (t/is (= (d/empty-diff 0) @items)) + _ (q (d/empty-diff 0)) + _ (in-step) + _ (q ::none) + _ (t/is (= ::none (q))) + _ (q (assoc (d/empty-diff 1) :grow 1 :change {0 :foo})) + _ (in-step) + _ (t/is (= (-> (d/empty-diff 1) (assoc :grow 1) (dissoc :change)) (dissoc @items :change))) + _ (t/is (= :items-step (q))) + _ (q ::none) + _ (t/is (= ::none (q)))])) + +;; missing tests +;; - double transfer (optional) +;; - item-ps +;; - dead-item-ps +;; - items +;; - thread safety +;; - freeze