From cd4e2457ade224103b67e2197ac8a4f62a4c7718 Mon Sep 17 00:00:00 2001 From: xificurC Date: Tue, 10 Sep 2024 17:55:29 +0200 Subject: [PATCH] enforcer everywhere --- src/hyperfiddle/electric/impl/runtime3.cljc | 149 +++++----- src/hyperfiddle/electric3.cljc | 13 +- src/hyperfiddle/incseq.cljc | 274 +++++++++---------- src/hyperfiddle/incseq/items_eager_impl.cljc | 18 +- 4 files changed, 225 insertions(+), 229 deletions(-) diff --git a/src/hyperfiddle/electric/impl/runtime3.cljc b/src/hyperfiddle/electric/impl/runtime3.cljc index 204f3b315..8c07af01d 100644 --- a/src/hyperfiddle/electric/impl/runtime3.cljc +++ b/src/hyperfiddle/electric/impl/runtime3.cljc @@ -2,6 +2,7 @@ (:refer-clojure :exclude [resolve]) (:require [hyperfiddle.incseq :as i] [missionary.core :as m] + [hyperfiddle.incseq.flow-protocol-enforcer :as fpe] [cognitect.transit :as t]) (:import missionary.Cancelled #?(:clj (clojure.lang IFn IDeref)) @@ -105,6 +106,8 @@ (deps [_ rf r site]) ;; emits ports (flow [_])) ;; returns incseq +(defn flow! [expr] (fpe/flow (str expr) (flow expr))) + (defn expr-deps [rf r site expr] (deps expr rf r site)) @@ -116,7 +119,7 @@ (defn failure? [x] (instance? Failure x)) -(defn invariant [x] (m/cp x)) +(defn invariant [x] (fpe/flow "r/invariant" (m/cp x))) (deftype Pure [value ^:unsynchronized-mutable ^:mutable hash-memo] @@ -132,9 +135,9 @@ Expr (deps [_ _ r _] r) (flow [_] - (if (failure? value) - (m/latest #(throw (ex-info "Illegal access." {:info (failure-info value)}))) - (i/fixed (invariant value))))) + (fpe/flow "r/pure" (if (failure? value) + (m/latest #(throw (ex-info "Illegal access." {:info (failure-info value)}))) + (i/fixed (invariant value)))))) (defn pure " -> (EXPR VOID) @@ -176,7 +179,7 @@ T T T -> (EXPR T) (deps [_ rf r site] (reduce (fn [r x] (deps x rf r site)) r inputs)) (flow [_] - (apply i/latest-product invoke (map flow inputs)))) + (fpe/flow "r/ap" (apply i/latest-product invoke (map flow! inputs))))) (defn ap " (EXPR (-> T)) -> (EXPR T) @@ -200,7 +203,7 @@ T T T -> (EXPR T) (= input (.-input ^Join other)))) Expr (deps [_ rf r site] (deps input rf r site)) - (flow [_] (i/latest-concat (flow input)))) + (flow [_] (fpe/flow "r/join" (i/latest-concat (flow! input))))) (defn join " (EXPR (IS T)) -> (EXPR T) @@ -254,8 +257,8 @@ T T T -> (EXPR T) Expr (deps [_ _ r _] r) (flow [_] - (fn [step done] - (step) (->Failer done (error (str "Unbound electric var lookup - " (pr-str key))))))) + (fpe/flow "r/unbound" (fn [step done] + (step) (->Failer done (error (str "Unbound electric var lookup - " (pr-str key)))))))) (deftype Cdef [frees nodes calls result build]) @@ -517,7 +520,7 @@ T T T -> (EXPR T) (port-deps rf r port) (rf r port)))) (flow [this] - (port-flow (slot-port this)))) + (fpe/flow "r/slot" (port-flow (slot-port this))))) (defn port-slot {:tag Slot} @@ -561,28 +564,28 @@ T T T -> (EXPR T) input)))) (defn input-sub [^objects port] - (fn [step done] - (let [^Slot slot (port-slot port) - ^objects peer (frame-peer (.-frame slot)) - busy (enter peer) - ^objects remote (aget peer peer-slot-remote) - ^objects input (input-check-create remote port) - sub (object-array input-sub-slots)] - (aset sub input-sub-slot-input input) - (aset sub input-sub-slot-step step) - (aset sub input-sub-slot-done done) - (aset sub input-sub-slot-diff (aget input input-slot-diff)) - (when-not (aget input input-slot-frozen) - (if-some [^objects prv (aget input input-slot-subs)] - (let [^objects nxt (aget prv input-sub-slot-next)] - (aset prv input-sub-slot-next sub) - (aset nxt input-sub-slot-prev sub) - (aset sub input-sub-slot-prev prv) - (aset sub input-sub-slot-next nxt)) - (do (aset input input-slot-subs sub) - (aset sub input-sub-slot-prev sub) - (aset sub input-sub-slot-next sub)))) - (exit peer busy) (step) (->InputSub sub)))) + (fpe/flow "r/input-sub" (fn [step done] + (let [^Slot slot (port-slot port) + ^objects peer (frame-peer (.-frame slot)) + busy (enter peer) + ^objects remote (aget peer peer-slot-remote) + ^objects input (input-check-create remote port) + sub (object-array input-sub-slots)] + (aset sub input-sub-slot-input input) + (aset sub input-sub-slot-step step) + (aset sub input-sub-slot-done done) + (aset sub input-sub-slot-diff (aget input input-slot-diff)) + (when-not (aget input input-slot-frozen) + (if-some [^objects prv (aget input input-slot-subs)] + (let [^objects nxt (aget prv input-sub-slot-next)] + (aset prv input-sub-slot-next sub) + (aset nxt input-sub-slot-prev sub) + (aset sub input-sub-slot-prev prv) + (aset sub input-sub-slot-next nxt)) + (do (aset input input-slot-subs sub) + (aset sub input-sub-slot-prev sub) + (aset sub input-sub-slot-next sub)))) + (exit peer busy) (step) (->InputSub sub))))) (defn make-port [^Slot slot site deps flow] (let [port (object-array port-slots) @@ -1108,36 +1111,36 @@ T T T -> (EXPR T) (defn remote-handler [opts ^objects peer] (fn [events] - (fn [step done] - (let [busy (enter peer) - ^objects remote (aget peer peer-slot-remote)] - (if (nil? (aget remote remote-slot-channel)) - (let [channel (object-array channel-slots)] - (aset remote remote-slot-channel channel) - (aset channel channel-slot-remote remote) - (aset channel channel-slot-step step) - (aset channel channel-slot-done done) - (aset channel channel-slot-busy true) - (aset channel channel-slot-over false) - (aset channel channel-slot-events events) - (aset channel channel-slot-alive (identity 1)) - (aset channel channel-slot-shared {}) - (aset channel channel-slot-writer-opts (channel-writer-opts opts channel)) - (aset channel channel-slot-reader-opts (channel-reader-opts opts channel)) - (aset channel channel-slot-ready (aget peer peer-slot-channel-ready)) - (aset peer peer-slot-channel-ready channel) - (aset channel channel-slot-process - ((aget remote remote-slot-events) - #(let [^objects remote (aget channel channel-slot-remote)] - (channel-ready channel (enter (aget remote remote-slot-peer)))) - #(let [^objects remote (aget channel channel-slot-remote)] - (aset channel channel-slot-over true) - (channel-ready channel (enter (aget remote remote-slot-peer)))))) - (reduce channel-output-sub channel (vals (aget remote remote-slot-outputs))) - (channel-ready channel busy) - (->Channel channel)) - (do (exit peer busy) (step) - (->Failer done (error "Can't connect - remote already up.")))))))) + (fpe/flow "r/remote-handler" (fn [step done] + (let [busy (enter peer) + ^objects remote (aget peer peer-slot-remote)] + (if (nil? (aget remote remote-slot-channel)) + (let [channel (object-array channel-slots)] + (aset remote remote-slot-channel channel) + (aset channel channel-slot-remote remote) + (aset channel channel-slot-step step) + (aset channel channel-slot-done done) + (aset channel channel-slot-busy true) + (aset channel channel-slot-over false) + (aset channel channel-slot-events events) + (aset channel channel-slot-alive (identity 1)) + (aset channel channel-slot-shared {}) + (aset channel channel-slot-writer-opts (channel-writer-opts opts channel)) + (aset channel channel-slot-reader-opts (channel-reader-opts opts channel)) + (aset channel channel-slot-ready (aget peer peer-slot-channel-ready)) + (aset peer peer-slot-channel-ready channel) + (aset channel channel-slot-process + ((aget remote remote-slot-events) + #(let [^objects remote (aget channel channel-slot-remote)] + (channel-ready channel (enter (aget remote remote-slot-peer)))) + #(let [^objects remote (aget channel channel-slot-remote)] + (aset channel channel-slot-over true) + (channel-ready channel (enter (aget remote remote-slot-peer)))))) + (reduce channel-output-sub channel (vals (aget remote remote-slot-outputs))) + (channel-ready channel busy) + (->Channel channel)) + (do (exit peer busy) (step) + (->Failer done (error "Can't connect - remote already up."))))))))) (defn input-toggle-event [^objects input] (let [^objects remote (aget input input-slot-remote)] @@ -1169,17 +1172,17 @@ T T T -> (EXPR T) (exit peer busy))) (defn with-dep [flow ^objects port] - (fn [step done] - (dep-attach port) - (flow step - #(do (dep-detach port) - (done))))) + (fpe/flow "r/with-dep" (fn [step done] + (dep-attach port) + ((fpe/flow "r/with-dep input" flow) step + #(do (dep-detach port) + (done)))))) ;; is expr always a slot ? if true, we can specialize to ports (deftype Incseq [peer expr] IFn (#?(:clj invoke :cljs -invoke) [_ step done] - ((deps expr with-dep (flow expr) (peer-site peer)) step done))) + ((fpe/flow "r/incseq" (deps expr with-dep (flow expr) (peer-site peer))) step done))) (defn incseq-expr [^Incseq incseq] (.-expr incseq)) @@ -1211,12 +1214,12 @@ T T T -> (EXPR T) (aset call call-slot-port (make-port slot site (deps expr update-inc {} site) - (i/latest-product - (fn [ctor] - (let [rank (aget call call-slot-rank) - frame (make-frame peer slot rank site ctor)] - (aset call call-slot-rank (inc rank)) frame)) - (flow expr)))) + (fpe/flow "r/create-call" (i/latest-product + (fn [ctor] + (let [rank (aget call call-slot-rank) + frame (make-frame peer slot rank site ctor)] + (aset call call-slot-rank (inc rank)) frame)) + (flow expr))))) (aset call call-slot-rank (identity 0)) call)) diff --git a/src/hyperfiddle/electric3.cljc b/src/hyperfiddle/electric3.cljc index dd9000256..e88ec359d 100644 --- a/src/hyperfiddle/electric3.cljc +++ b/src/hyperfiddle/electric3.cljc @@ -12,7 +12,8 @@ #?(:clj [fipp.edn]) [missionary.core :as m] [contrib.missionary-contrib :as mx] - [clojure.math :as math]) + [clojure.math :as math] + [hyperfiddle.incseq.flow-protocol-enforcer :as fpe]) (:import [missionary Cancelled]) #?(:cljs (:require-macros hyperfiddle.electric3))) @@ -430,9 +431,9 @@ inhibiting all further reactive updates." (hyperfiddle.electric3/defn SystemTimeSecs [] (math/floor-div (input system-time-ms) 1000)) (cc/defn uf->is [uf] - (m/ap (m/amb (i/empty-diff 0) - (let [!first (atom true) v (m/?> uf)] - (assoc (i/empty-diff 1) :grow (if @!first (do (swap! !first not) 1) 0), :change {0 v}))))) + (fpe/flow "uf->is" (m/ap (m/amb (i/empty-diff 0) + (let [!first (atom true) v (m/?> uf)] + (assoc (i/empty-diff 1) :grow (if @!first (do (swap! !first not) 1) 0), :change {0 v})))))) (cc/letfn [(task->is [t] (uf->is (m/ap (m/? t)))) (initialized [t init-v] (m/relieve {} (m/ap (m/amb= init-v (m/? t)))))] @@ -441,8 +442,8 @@ inhibiting all further reactive updates." ([t init-v] (input (initialized t init-v))))) #?(:clj (cc/defn -offload [tsk executor] - (uf->is (m/ap (try (m/? (m/via-call executor (m/?< (mx/poll-task tsk)))) - (catch Cancelled _ (m/amb))))))) + (fpe/flow "offload" (uf->is (m/ap (try (m/? (m/via-call executor (m/?< (mx/poll-task tsk)))) + (catch Cancelled _ (m/amb)))))))) (hyperfiddle.electric3/defn Offload diff --git a/src/hyperfiddle/incseq.cljc b/src/hyperfiddle/incseq.cljc index 9a75c487d..1009dc0d7 100644 --- a/src/hyperfiddle/incseq.cljc +++ b/src/hyperfiddle/incseq.cljc @@ -45,7 +45,9 @@ successive sequence diffs. Incremental sequences are applicative functors with ` [hyperfiddle.incseq.latest-concat-impl :as lc] [hyperfiddle.rcf :refer [tests]] [clojure.core :as cc] - [missionary.core :as m]) + [missionary.core :as m] + [hyperfiddle.incseq.flow-protocol-enforcer :as fpe] + [contrib.debug :as dbg]) (:import #?(:clj (clojure.lang IFn IDeref)) missionary.Cancelled)) @@ -224,19 +226,19 @@ Returns a flow producing the successive diffs of given continuous flow of collec (if (aset state slot-busy (not (aget state slot-busy))) (recur) nop))))) nop)))) (scan [ctor flow] - (fn [n t] - (let [state (object-array slots)] - (aset state slot-notifier n) - (aset state slot-terminator t) - (aset state slot-stepfn (ctor)) - (aset state slot-busy false) - (aset state slot-done false) - (aset state slot-process - (flow #(ready state) - #(do (aset state slot-done true) - (ready state)))) - (->Ps state cancel transfer))))] - (fn [kf flow] (scan #(->seq-differ kf) flow))))) + (fpe/flow "i/diff-by" (fn [n t] + (let [state (object-array slots)] + (aset state slot-notifier n) + (aset state slot-terminator t) + (aset state slot-stepfn (ctor)) + (aset state slot-busy false) + (aset state slot-done false) + (aset state slot-process + (flow #(ready state) + #(do (aset state slot-done true) + (ready state)))) + (->Ps state cancel transfer)))))] + (fn [kf flow] (scan #(->seq-differ kf) (fpe/flow "i/diff-by input" flow)))))) (def ^{:doc " @@ -248,24 +250,12 @@ Returns the diff applying given diffs successively. "} combine d/combine) -(def ^{:doc " -Returns the incremental sequence defined by the fixed collection of given continuous flows. -A collection is fixed iff its size is invariant and its items are immobile. -"} fixed f/flow) - - -(def ^{:arglists '([f & incseqs]) - :doc " -Returns the incremental sequence defined by applying the cartesian product of items in given incremental sequences, -combined with given function. -"} latest-product lp/flow) +(defn fixed [& flows] (fpe/flow "i/fixed" (apply f/flow (into [] (map-indexed (fn [i flow] (fpe/flow (str "i/fixed input" i) flow))) flows)))) +(defn latest-product [f & incseqs] + (fpe/flow (apply lp/flow f (into [] (map-indexed (fn [i flow] (fpe/flow (str "i/latest-product child " i) flow))) incseqs)))) -(def ^{:arglists '([incseq-of-incseqs]) - :doc " -Returns the incremental sequence defined by the concatenation of incremental sequences defined by given incremental -sequence. -"} latest-concat lc/flow) +(defn latest-concat [is-of-is] (fpe/flow "i/latest-concat" (lc/flow (fpe/flow "i/latest-concat child" is-of-is)))) (def ^{:arglists '([] [sentinel] [sentinel compare]) @@ -451,120 +441,120 @@ optional `compare` function, `clojure.core/compare` by default. ([sentinel] (spine sentinel compare)) ([sentinel compare] (let [state (object-array slots)] - (fn - ([n t] - (let [ps (locking state - (let [c (traverse assoc-count {} (aget state slot-root)) - r (object-array reader-slots)] - (if-some [^objects p (aget state slot-readers)] - (let [^objects n (aget p reader-slot-next)] - (aset r reader-slot-prev p) - (aset p reader-slot-next r) - (aset r reader-slot-next n) - (aset n reader-slot-prev r)) - (do (aset r reader-slot-prev r) - (aset r reader-slot-next r) - (aset state slot-readers r))) - (aset r reader-slot-parent state) - (aset r reader-slot-notifier n) - (aset r reader-slot-terminator t) - (aset r reader-slot-diff - {:grow (cc/count c) - :degree (cc/count c) + (fpe/flow "i/spine" (fn + ([n t] + (let [ps (locking state + (let [c (traverse assoc-count {} (aget state slot-root)) + r (object-array reader-slots)] + (if-some [^objects p (aget state slot-readers)] + (let [^objects n (aget p reader-slot-next)] + (aset r reader-slot-prev p) + (aset p reader-slot-next r) + (aset r reader-slot-next n) + (aset n reader-slot-prev r)) + (do (aset r reader-slot-prev r) + (aset r reader-slot-next r) + (aset state slot-readers r))) + (aset r reader-slot-parent state) + (aset r reader-slot-notifier n) + (aset r reader-slot-terminator t) + (aset r reader-slot-diff + {:grow (cc/count c) + :degree (cc/count c) + :shrink 0 + :permutation {} + :freeze #{} + :change c}) + (->Ps r reader-cancel reader-transfer)))] + (n) ps)) + ([k f arg] + (propagate + (locking state + (if-some [root (aget state slot-root)] + (let [size (aget root node-slot-size)] + (loop [^objects node root + rank 0] + (let [c (compare k (aget node node-slot-key))] + (if (neg? c) + (if-some [l (aget node 0)] + (recur l rank) + (let [curr (f sentinel arg)] + (when-not (= curr sentinel) + (insert-fixup state node 0 k curr) + (when-some [readers (aget state slot-readers)] + (notify-readers readers + {:grow 1 + :degree (inc size) + :shrink 0 + :permutation (rotation size rank) + :change {rank curr} + :freeze #{}}))))) + (if (pos? c) + (let [rank (unchecked-add-int rank (aget node node-slot-size))] + (if-some [r (aget node 1)] + (recur r (unchecked-subtract-int rank (aget r node-slot-size))) + (let [curr (f sentinel arg)] + (when-not (= curr sentinel) + (insert-fixup state node 1 k curr) + (when-some [readers (aget state slot-readers)] + (notify-readers readers + {:grow 1 + :degree (inc size) + :shrink 0 + :permutation (rotation size rank) + :change {rank curr} + :freeze #{}})))))) + (let [^objects l (aget node 0) + rank (if (nil? l) rank (unchecked-add-int rank (aget l node-slot-size))) + prev (aget node node-slot-value) + curr (f prev arg)] + (if (= curr sentinel) + (do (if (nil? l) + (remove-noleft state node) + (if-some [y (aget node 1)] + (loop [^objects y y] + (if-some [y (aget y 0)] + (recur y) + (do (aset node node-slot-key (aget y node-slot-key)) + (aset node node-slot-value (aget y node-slot-value)) + (remove-noleft state y)))) + (remove-single state node l))) + (when-some [readers (aget state slot-readers)] + (notify-readers readers + {:grow 0 + :degree size + :shrink 1 + :permutation (rotation rank (dec size)) + :change {} + :freeze #{}}))) + (when-not (= prev curr) + (aset node node-slot-value curr) + (when-some [readers (aget state slot-readers)] + (notify-readers readers + {:grow 0 + :degree size + :shrink 0 + :permutation {} + :change {rank curr} + :freeze #{}})))))))))) + (let [curr (f sentinel arg)] + (when-not (= curr sentinel) + (aset state slot-root + (doto (object-array node-slots) + (aset node-slot-key k) + (aset node-slot-size 1) + (aset node-slot-red? false) + (aset node-slot-value curr))) + (when-some [readers (aget state slot-readers)] + (notify-readers readers + {:grow 1 + :degree 1 :shrink 0 :permutation {} - :freeze #{} - :change c}) - (->Ps r reader-cancel reader-transfer)))] - (n) ps)) - ([k f arg] - (propagate - (locking state - (if-some [root (aget state slot-root)] - (let [size (aget root node-slot-size)] - (loop [^objects node root - rank 0] - (let [c (compare k (aget node node-slot-key))] - (if (neg? c) - (if-some [l (aget node 0)] - (recur l rank) - (let [curr (f sentinel arg)] - (when-not (= curr sentinel) - (insert-fixup state node 0 k curr) - (when-some [readers (aget state slot-readers)] - (notify-readers readers - {:grow 1 - :degree (inc size) - :shrink 0 - :permutation (rotation size rank) - :change {rank curr} - :freeze #{}}))))) - (if (pos? c) - (let [rank (unchecked-add-int rank (aget node node-slot-size))] - (if-some [r (aget node 1)] - (recur r (unchecked-subtract-int rank (aget r node-slot-size))) - (let [curr (f sentinel arg)] - (when-not (= curr sentinel) - (insert-fixup state node 1 k curr) - (when-some [readers (aget state slot-readers)] - (notify-readers readers - {:grow 1 - :degree (inc size) - :shrink 0 - :permutation (rotation size rank) - :change {rank curr} - :freeze #{}})))))) - (let [^objects l (aget node 0) - rank (if (nil? l) rank (unchecked-add-int rank (aget l node-slot-size))) - prev (aget node node-slot-value) - curr (f prev arg)] - (if (= curr sentinel) - (do (if (nil? l) - (remove-noleft state node) - (if-some [y (aget node 1)] - (loop [^objects y y] - (if-some [y (aget y 0)] - (recur y) - (do (aset node node-slot-key (aget y node-slot-key)) - (aset node node-slot-value (aget y node-slot-value)) - (remove-noleft state y)))) - (remove-single state node l))) - (when-some [readers (aget state slot-readers)] - (notify-readers readers - {:grow 0 - :degree size - :shrink 1 - :permutation (rotation rank (dec size)) - :change {} - :freeze #{}}))) - (when-not (= prev curr) - (aset node node-slot-value curr) - (when-some [readers (aget state slot-readers)] - (notify-readers readers - {:grow 0 - :degree size - :shrink 0 - :permutation {} - :change {rank curr} - :freeze #{}})))))))))) - (let [curr (f sentinel arg)] - (when-not (= curr sentinel) - (aset state slot-root - (doto (object-array node-slots) - (aset node-slot-key k) - (aset node-slot-size 1) - (aset node-slot-red? false) - (aset node-slot-value curr))) - (when-some [readers (aget state slot-readers)] - (notify-readers readers - {:grow 1 - :degree 1 - :shrink 0 - :permutation {} - :change {0 curr} - :freeze #{}}))))))))))))))) - -(def ^{:arglists '([incseq])} items i/flow) + :change {0 curr} + :freeze #{}})))))))))))))))) + +(defn items [incseq] (fpe/flow "i/items" (i/flow (fpe/flow "i/items input" incseq)))) (def ^{:arglists '([incseq]) :doc " diff --git a/src/hyperfiddle/incseq/items_eager_impl.cljc b/src/hyperfiddle/incseq/items_eager_impl.cljc index 8a1447ea8..dac9f3806 100644 --- a/src/hyperfiddle/incseq/items_eager_impl.cljc +++ b/src/hyperfiddle/incseq/items_eager_impl.cljc @@ -4,7 +4,8 @@ [clojure.set :as set] [hyperfiddle.electric.impl.array-fields :as a] [hyperfiddle.incseq.diff-impl :as d] - [hyperfiddle.incseq.perm-impl :as p]) + [hyperfiddle.incseq.perm-impl :as p] + [hyperfiddle.incseq.flow-protocol-enforcer :as fpe]) (:import #?(:clj [clojure.lang IDeref IFn]) #?(:clj [java.util.concurrent.atomic AtomicLong]) #?(:clj [java.util.concurrent.locks ReentrantLock]) @@ -106,13 +107,14 @@ (a/fset item -ps* (atom #{})) (a/set (a/fget ps -item*) i item) (a/fswap ps -diff update :change assoc (idx i i) - (a/fset item -flow (fn [step done] - (if (a/fget item -dead) - (->dead-item-ps step done (a/fget item -v)) - (let [item-ps (->item-ps item step done)] - (swap! (a/fget item -ps*) conj item-ps) - (item-ps (a/fget item -v)) - item-ps))))))) + (a/fset item -flow (fpe/flow (str "i/items child " (idx i i)) + (fn [step done] + (if (a/fget item -dead) + (->dead-item-ps step done (a/fget item -v)) + (let [item-ps (->item-ps item step done)] + (swap! (a/fget item -ps*) conj item-ps) + (item-ps (a/fget item -v)) + item-ps)))))))) (range (- d n) d)))) (defn permute! [^Ps ps {p :permutation}]