Skip to content

Commit

Permalink
[i/items] thread safe counter
Browse files Browse the repository at this point in the history
  • Loading branch information
xificurC authored and dustingetz committed Sep 3, 2024
1 parent 19eb8ef commit 71b05cc
Show file tree
Hide file tree
Showing 2 changed files with 20 additions and 13 deletions.
31 changes: 19 additions & 12 deletions src/hyperfiddle/incseq/items_eager_impl.cljc
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,14 @@
[hyperfiddle.incseq.diff-impl :as d]
[hyperfiddle.incseq.perm-impl :as p])
(:import #?(:clj [clojure.lang IDeref IFn])
#?(:clj [java.util.concurrent.atomic AtomicInteger AtomicBoolean])
[missionary Cancelled]))

(def ps-field-count (a/deffields -stepped -cancelled -going -input-ps -done -diff -item*))
(def ps-field-count (a/deffields -stepped -cancelled -input-ps -done -diff -item*))

(declare cleanup-then-done)
(defn call [f] (f))
(deftype Ps [step done state-]
(deftype Ps [step done going state-]
IFn (#?(:clj invoke :cljs -invoke) [_]
(some-> (a/get state- -input-ps) call)
(a/set-not= state- -done ::yes ::requested)
Expand All @@ -28,6 +29,12 @@
(defn cleanup-then-done [^Ps ps]
(a/fset ps -input-ps nil, -done ::yes, -item* nil)
((.-done ps)))
(defn going [^Ps ps] #?(:clj (let [^AtomicInteger i (.-going ps)] (.longValue i))
:cljs (.-going ps)))
(defn ++going [^Ps ps] #?(:clj (let [^AtomicInteger i (.-going ps)] (.incrementAndGet i))
:cljs (set! (.-going ps) (inc (.-going ps)))))
(defn --going [^Ps ps] #?(:clj (let [^AtomicInteger i (.-going ps)] (.getAndDecrement i))
:cljs (set! (.-going ps) (dec (.-going ps)))))

(def item-field-count (a/deffields -v -flow -ps* -dead))
(deftype Item [state-])
Expand Down Expand Up @@ -117,34 +124,34 @@

(defn transfer-input [^Ps ps]
(loop [diff (a/fgetset ps -diff {:change {}})]
(let [going (a/fgetset ps -going true)
?in-diff (try @(a/fget ps -input-ps) (catch #?(:clj Throwable :cljs :default) e e))]
(if (and (map? ?in-diff) (not going))
(let [?in-diff (try @(a/fget ps -input-ps) (catch #?(:clj Throwable :cljs :default) e e))]
(--going ps)
(if (map? ?in-diff)
(do (grow! ps ?in-diff)
(permute! ps ?in-diff)
(shrink! ps ?in-diff)
(change! ps ?in-diff)
(let [newdiff (a/fset ps -diff (cond->> (assoc ?in-diff :change (:change (a/fget ps -diff)))
diff (d/combine diff)))]
(if (a/fgetset ps -going false)
(if (zero? (going ps))
(case (a/fget ps -stepped)
false (when (needed-diff? newdiff) (a/fset ps -stepped true) ((.-step ps)))
true nil
nil (do (a/fset ps -stepped true) ((.-step ps))))
(recur newdiff))))
(do (some-> (a/fget ps -input-ps) call)
(a/fset-not= ps -done ::yes ::requested)
(a/fset ps -diff (if going (ex-info "uninitialized input process" {}) ?in-diff))
(a/fset ps -diff (if (neg? (going ps)) (ex-info "uninitialized input process" {}) ?in-diff))
(when-not (a/fgetset ps -stepped true) ((.-step ps))))))))

(def +initial-item-size+ 8)
(defn flow [input]
(fn [step done]
(let [ps (->Ps step done (object-array ps-field-count))]
(a/fset ps -item* (object-array +initial-item-size+), -stepped nil, -going true, -done ::no)
(let [ps (->Ps step done #?(:clj (AtomicInteger. -1) :cljs -1) (object-array ps-field-count))]
(a/fset ps -item* (object-array +initial-item-size+), -stepped nil, -done ::no)
(a/fset ps -input-ps (input
#(when-not (a/fgetset ps -going false) (transfer-input ps))
#(if (or (a/fget ps -stepped) (a/fget ps -going))
#(when (= 1 (++going ps)) (transfer-input ps))
#(if (or (pos? (going ps)) (a/fget ps -stepped))
(a/fset ps -done ::requested)
(cleanup-then-done ps))))
(transfer-input ps) ps)))
(++going ps) (transfer-input ps) ps)))
2 changes: 1 addition & 1 deletion test/hyperfiddle/incseq/items_eager_impl_test.cljc
Original file line number Diff line number Diff line change
Expand Up @@ -450,7 +450,7 @@
(let [q (->mq)
<transfer-fn> (->box (consume-calling [(fn [_ _] (d/empty-diff 0))
(fn [_ done] (done) (throw (ex-info "boom" {})))]))
<cancel-fn> (->box (fn [step _done] (step)))
<cancel-fn> (->box (fn [_step _done]))
items (spawn-ps q <transfer-fn> <cancel-fn>)
[_in-step _in-done] (q)
_ (t/is (= :items-step (q)))
Expand Down

0 comments on commit 71b05cc

Please sign in to comment.