From 6f613da8ab2fa0011ecfe91bcd41c8ef1d3cdd62 Mon Sep 17 00:00:00 2001 From: xificurC Date: Tue, 10 Sep 2024 17:55:29 +0200 Subject: [PATCH] enforcer everywhere --- src/contrib/debug.cljc | 9 +- .../electric/impl/mount_point.cljc | 21 ++- src/hyperfiddle/electric/impl/runtime3.cljc | 152 +++++++++--------- src/hyperfiddle/electric3.cljc | 13 +- src/hyperfiddle/incseq.cljc | 60 ++++--- src/hyperfiddle/incseq/items_eager_impl.cljc | 18 ++- 6 files changed, 144 insertions(+), 129 deletions(-) diff --git a/src/contrib/debug.cljc b/src/contrib/debug.cljc index eb560370a..a06ec5d03 100644 --- a/src/contrib/debug.cljc +++ b/src/contrib/debug.cljc @@ -46,12 +46,13 @@ (defn instrument* [nm flow] (fn [n t] (let [id (swap! !id inc) - it (flow #(do (prn nm id :notified) (n)) #(do (prn nm id :terminated) (t)))] + _ (prn nm id 'spawned) + it (flow #(do (prn nm id 'notified) (n)) #(do (prn nm id 'terminated) (t)))] (reify - IFn (#?(:clj invoke :cljs -invoke) [_] (prn nm id :cancelled) (it)) + IFn (#?(:clj invoke :cljs -invoke) [_] (prn nm id 'cancelled) (it)) IDeref (#?(:clj deref :cljs -deref) [_] (let [v (try @it (catch #?(:clj Throwable :cljs :default) e [::ex e]))] - (prn nm id :transferred + (prn nm id 'transferred (if false #_(instance? Failure v) ; FIXME Update to electric v3 (let [e (.-error v)] [(type e) (ex-message e)]) @@ -59,7 +60,7 @@ (if (and (vector? v) (= ::ex (first v))) (throw (second v)) v))))))) -(defmacro instrument [nm & body] `(new (instrument* ~nm (hyperfiddle.electric/fn [] ~@body)))) +(defmacro instrument [nm v] `(hyperfiddle.electric3/input (instrument* ~nm (hyperfiddle.electric3/pure ~v)))) (defmacro js-measure [nm & body] (if (:js-globals &env) diff --git a/src/hyperfiddle/electric/impl/mount_point.cljc b/src/hyperfiddle/electric/impl/mount_point.cljc index 85ab6b5a7..59c03382e 100644 --- a/src/hyperfiddle/electric/impl/mount_point.cljc +++ b/src/hyperfiddle/electric/impl/mount_point.cljc @@ -34,11 +34,13 @@ On call step event : Unmounting a block generates a shrink for each active item having this block's frame as an ancestor. Mounting a block generates a grow for each active item having this block's frame as an ancestor. -" (:require [hyperfiddle.kvs :refer [KVS]] +" (:require [hyperfiddle.kvs :as kvs :refer [KVS]] [hyperfiddle.incseq.arrays-impl :as a] [hyperfiddle.incseq.fixed-impl :as f] [hyperfiddle.incseq.diff-impl :as d] [hyperfiddle.incseq.perm-impl :as p] + [contrib.debug :as dbg] + [hyperfiddle.incseq.flow-protocol-enforcer :as fpe] [hyperfiddle.electric.impl.runtime3 :as r]) #?(:clj (:import (clojure.lang IFn IDeref) (java.util.concurrent.locks ReentrantLock)))) @@ -792,9 +794,16 @@ Mounting a block generates a grow for each active item having this block's frame (#?(:clj invoke :cljs -invoke) [_ step done] (reader-spawn state step done))) +(defn fpe [^MountPoint mp] + (let [enforced (fpe/flow "mount-point" mp)] + (reify + KVS (insert! [_ t i] (kvs/insert! mp t i)) (update! [_ t f] (kvs/update! mp t f)) (remove! [_ t] (kvs/remove! mp t)) + IFn (#?(:clj invoke :cljs -invoke) [_ step done] + (enforced step done))))) + (defn create [peer] - (->MountPoint - (doto (object-array slots) - (aset slot-lock #?(:clj (ReentrantLock.) :cljs false)) - (aset slot-peer peer) - (aset slot-items {})))) \ No newline at end of file + (fpe (->MountPoint + (doto (object-array slots) + (aset slot-lock #?(:clj (ReentrantLock.) :cljs false)) + (aset slot-peer peer) + (aset slot-items {}))))) diff --git a/src/hyperfiddle/electric/impl/runtime3.cljc b/src/hyperfiddle/electric/impl/runtime3.cljc index 204f3b315..616a4ea3a 100644 --- a/src/hyperfiddle/electric/impl/runtime3.cljc +++ b/src/hyperfiddle/electric/impl/runtime3.cljc @@ -2,6 +2,8 @@ (:refer-clojure :exclude [resolve]) (:require [hyperfiddle.incseq :as i] [missionary.core :as m] + [contrib.debug :as dbg] + [hyperfiddle.incseq.flow-protocol-enforcer :as fpe] [cognitect.transit :as t]) (:import missionary.Cancelled #?(:clj (clojure.lang IFn IDeref)) @@ -105,6 +107,8 @@ (deps [_ rf r site]) ;; emits ports (flow [_])) ;; returns incseq +(defn flow! [expr] (fpe/flow (str expr) (flow expr))) + (defn expr-deps [rf r site expr] (deps expr rf r site)) @@ -116,7 +120,7 @@ (defn failure? [x] (instance? Failure x)) -(defn invariant [x] (m/cp x)) +(defn invariant [x] (fpe/flow "r/invariant" (m/cp x))) (deftype Pure [value ^:unsynchronized-mutable ^:mutable hash-memo] @@ -132,9 +136,9 @@ Expr (deps [_ _ r _] r) (flow [_] - (if (failure? value) - (m/latest #(throw (ex-info "Illegal access." {:info (failure-info value)}))) - (i/fixed (invariant value))))) + (fpe/flow "r/pure" (if (failure? value) + (m/latest #(throw (ex-info "Illegal access." {:info (failure-info value)}))) + (i/fixed (invariant value)))))) (defn pure " -> (EXPR VOID) @@ -176,7 +180,7 @@ T T T -> (EXPR T) (deps [_ rf r site] (reduce (fn [r x] (deps x rf r site)) r inputs)) (flow [_] - (apply i/latest-product invoke (map flow inputs)))) + (fpe/flow "r/ap" (apply i/latest-product invoke (map flow! inputs))))) (defn ap " (EXPR (-> T)) -> (EXPR T) @@ -200,7 +204,7 @@ T T T -> (EXPR T) (= input (.-input ^Join other)))) Expr (deps [_ rf r site] (deps input rf r site)) - (flow [_] (i/latest-concat (flow input)))) + (flow [_] (fpe/flow "r/join" (i/latest-concat (flow! input))))) (defn join " (EXPR (IS T)) -> (EXPR T) @@ -254,8 +258,8 @@ T T T -> (EXPR T) Expr (deps [_ _ r _] r) (flow [_] - (fn [step done] - (step) (->Failer done (error (str "Unbound electric var lookup - " (pr-str key))))))) + (fpe/flow "r/unbound" (fn [step done] + (step) (->Failer done (error (str "Unbound electric var lookup - " (pr-str key)))))))) (deftype Cdef [frees nodes calls result build]) @@ -517,7 +521,7 @@ T T T -> (EXPR T) (port-deps rf r port) (rf r port)))) (flow [this] - (port-flow (slot-port this)))) + (fpe/flow "r/slot" (port-flow (slot-port this))))) (defn port-slot {:tag Slot} @@ -561,28 +565,30 @@ T T T -> (EXPR T) input)))) (defn input-sub [^objects port] - (fn [step done] - (let [^Slot slot (port-slot port) - ^objects peer (frame-peer (.-frame slot)) - busy (enter peer) - ^objects remote (aget peer peer-slot-remote) - ^objects input (input-check-create remote port) - sub (object-array input-sub-slots)] - (aset sub input-sub-slot-input input) - (aset sub input-sub-slot-step step) - (aset sub input-sub-slot-done done) - (aset sub input-sub-slot-diff (aget input input-slot-diff)) - (when-not (aget input input-slot-frozen) - (if-some [^objects prv (aget input input-slot-subs)] - (let [^objects nxt (aget prv input-sub-slot-next)] - (aset prv input-sub-slot-next sub) - (aset nxt input-sub-slot-prev sub) - (aset sub input-sub-slot-prev prv) - (aset sub input-sub-slot-next nxt)) - (do (aset input input-slot-subs sub) - (aset sub input-sub-slot-prev sub) - (aset sub input-sub-slot-next sub)))) - (exit peer busy) (step) (->InputSub sub)))) + (dbg/instrument* 'input-sub + (fpe/flow "r/input-sub" + (fn [step done] + (let [^Slot slot (port-slot port) + ^objects peer (frame-peer (.-frame slot)) + busy (enter peer) + ^objects remote (aget peer peer-slot-remote) + ^objects input (input-check-create remote port) + sub (object-array input-sub-slots)] + (aset sub input-sub-slot-input input) + (aset sub input-sub-slot-step step) + (aset sub input-sub-slot-done done) + (aset sub input-sub-slot-diff (aget input input-slot-diff)) + (when-not (aget input input-slot-frozen) + (if-some [^objects prv (aget input input-slot-subs)] + (let [^objects nxt (aget prv input-sub-slot-next)] + (aset prv input-sub-slot-next sub) + (aset nxt input-sub-slot-prev sub) + (aset sub input-sub-slot-prev prv) + (aset sub input-sub-slot-next nxt)) + (do (aset input input-slot-subs sub) + (aset sub input-sub-slot-prev sub) + (aset sub input-sub-slot-next sub)))) + (exit peer busy) (step) (->InputSub sub)))))) (defn make-port [^Slot slot site deps flow] (let [port (object-array port-slots) @@ -1108,36 +1114,36 @@ T T T -> (EXPR T) (defn remote-handler [opts ^objects peer] (fn [events] - (fn [step done] - (let [busy (enter peer) - ^objects remote (aget peer peer-slot-remote)] - (if (nil? (aget remote remote-slot-channel)) - (let [channel (object-array channel-slots)] - (aset remote remote-slot-channel channel) - (aset channel channel-slot-remote remote) - (aset channel channel-slot-step step) - (aset channel channel-slot-done done) - (aset channel channel-slot-busy true) - (aset channel channel-slot-over false) - (aset channel channel-slot-events events) - (aset channel channel-slot-alive (identity 1)) - (aset channel channel-slot-shared {}) - (aset channel channel-slot-writer-opts (channel-writer-opts opts channel)) - (aset channel channel-slot-reader-opts (channel-reader-opts opts channel)) - (aset channel channel-slot-ready (aget peer peer-slot-channel-ready)) - (aset peer peer-slot-channel-ready channel) - (aset channel channel-slot-process - ((aget remote remote-slot-events) - #(let [^objects remote (aget channel channel-slot-remote)] - (channel-ready channel (enter (aget remote remote-slot-peer)))) - #(let [^objects remote (aget channel channel-slot-remote)] - (aset channel channel-slot-over true) - (channel-ready channel (enter (aget remote remote-slot-peer)))))) - (reduce channel-output-sub channel (vals (aget remote remote-slot-outputs))) - (channel-ready channel busy) - (->Channel channel)) - (do (exit peer busy) (step) - (->Failer done (error "Can't connect - remote already up.")))))))) + (fpe/flow "r/remote-handler" (fn [step done] + (let [busy (enter peer) + ^objects remote (aget peer peer-slot-remote)] + (if (nil? (aget remote remote-slot-channel)) + (let [channel (object-array channel-slots)] + (aset remote remote-slot-channel channel) + (aset channel channel-slot-remote remote) + (aset channel channel-slot-step step) + (aset channel channel-slot-done done) + (aset channel channel-slot-busy true) + (aset channel channel-slot-over false) + (aset channel channel-slot-events events) + (aset channel channel-slot-alive (identity 1)) + (aset channel channel-slot-shared {}) + (aset channel channel-slot-writer-opts (channel-writer-opts opts channel)) + (aset channel channel-slot-reader-opts (channel-reader-opts opts channel)) + (aset channel channel-slot-ready (aget peer peer-slot-channel-ready)) + (aset peer peer-slot-channel-ready channel) + (aset channel channel-slot-process + ((aget remote remote-slot-events) + #(let [^objects remote (aget channel channel-slot-remote)] + (channel-ready channel (enter (aget remote remote-slot-peer)))) + #(let [^objects remote (aget channel channel-slot-remote)] + (aset channel channel-slot-over true) + (channel-ready channel (enter (aget remote remote-slot-peer)))))) + (reduce channel-output-sub channel (vals (aget remote remote-slot-outputs))) + (channel-ready channel busy) + (->Channel channel)) + (do (exit peer busy) (step) + (->Failer done (error "Can't connect - remote already up."))))))))) (defn input-toggle-event [^objects input] (let [^objects remote (aget input input-slot-remote)] @@ -1169,17 +1175,17 @@ T T T -> (EXPR T) (exit peer busy))) (defn with-dep [flow ^objects port] - (fn [step done] - (dep-attach port) - (flow step - #(do (dep-detach port) - (done))))) + (fpe/flow "r/with-dep" (fn [step done] + (dep-attach port) + ((fpe/flow "r/with-dep input" flow) step + #(do (dep-detach port) + (done)))))) ;; is expr always a slot ? if true, we can specialize to ports (deftype Incseq [peer expr] IFn (#?(:clj invoke :cljs -invoke) [_ step done] - ((deps expr with-dep (flow expr) (peer-site peer)) step done))) + ((fpe/flow "r/incseq" (deps expr with-dep (flow expr) (peer-site peer))) step done))) (defn incseq-expr [^Incseq incseq] (.-expr incseq)) @@ -1211,12 +1217,12 @@ T T T -> (EXPR T) (aset call call-slot-port (make-port slot site (deps expr update-inc {} site) - (i/latest-product - (fn [ctor] - (let [rank (aget call call-slot-rank) - frame (make-frame peer slot rank site ctor)] - (aset call call-slot-rank (inc rank)) frame)) - (flow expr)))) + (fpe/flow "r/create-call" (i/latest-product + (fn [ctor] + (let [rank (aget call call-slot-rank) + frame (make-frame peer slot rank site ctor)] + (aset call call-slot-rank (inc rank)) frame)) + (flow expr))))) (aset call call-slot-rank (identity 0)) call)) diff --git a/src/hyperfiddle/electric3.cljc b/src/hyperfiddle/electric3.cljc index dd9000256..e88ec359d 100644 --- a/src/hyperfiddle/electric3.cljc +++ b/src/hyperfiddle/electric3.cljc @@ -12,7 +12,8 @@ #?(:clj [fipp.edn]) [missionary.core :as m] [contrib.missionary-contrib :as mx] - [clojure.math :as math]) + [clojure.math :as math] + [hyperfiddle.incseq.flow-protocol-enforcer :as fpe]) (:import [missionary Cancelled]) #?(:cljs (:require-macros hyperfiddle.electric3))) @@ -430,9 +431,9 @@ inhibiting all further reactive updates." (hyperfiddle.electric3/defn SystemTimeSecs [] (math/floor-div (input system-time-ms) 1000)) (cc/defn uf->is [uf] - (m/ap (m/amb (i/empty-diff 0) - (let [!first (atom true) v (m/?> uf)] - (assoc (i/empty-diff 1) :grow (if @!first (do (swap! !first not) 1) 0), :change {0 v}))))) + (fpe/flow "uf->is" (m/ap (m/amb (i/empty-diff 0) + (let [!first (atom true) v (m/?> uf)] + (assoc (i/empty-diff 1) :grow (if @!first (do (swap! !first not) 1) 0), :change {0 v})))))) (cc/letfn [(task->is [t] (uf->is (m/ap (m/? t)))) (initialized [t init-v] (m/relieve {} (m/ap (m/amb= init-v (m/? t)))))] @@ -441,8 +442,8 @@ inhibiting all further reactive updates." ([t init-v] (input (initialized t init-v))))) #?(:clj (cc/defn -offload [tsk executor] - (uf->is (m/ap (try (m/? (m/via-call executor (m/?< (mx/poll-task tsk)))) - (catch Cancelled _ (m/amb))))))) + (fpe/flow "offload" (uf->is (m/ap (try (m/? (m/via-call executor (m/?< (mx/poll-task tsk)))) + (catch Cancelled _ (m/amb)))))))) (hyperfiddle.electric3/defn Offload diff --git a/src/hyperfiddle/incseq.cljc b/src/hyperfiddle/incseq.cljc index 9a75c487d..c08133b29 100644 --- a/src/hyperfiddle/incseq.cljc +++ b/src/hyperfiddle/incseq.cljc @@ -45,7 +45,9 @@ successive sequence diffs. Incremental sequences are applicative functors with ` [hyperfiddle.incseq.latest-concat-impl :as lc] [hyperfiddle.rcf :refer [tests]] [clojure.core :as cc] - [missionary.core :as m]) + [missionary.core :as m] + [hyperfiddle.incseq.flow-protocol-enforcer :as fpe] + [contrib.debug :as dbg]) (:import #?(:clj (clojure.lang IFn IDeref)) missionary.Cancelled)) @@ -224,19 +226,19 @@ Returns a flow producing the successive diffs of given continuous flow of collec (if (aset state slot-busy (not (aget state slot-busy))) (recur) nop))))) nop)))) (scan [ctor flow] - (fn [n t] - (let [state (object-array slots)] - (aset state slot-notifier n) - (aset state slot-terminator t) - (aset state slot-stepfn (ctor)) - (aset state slot-busy false) - (aset state slot-done false) - (aset state slot-process - (flow #(ready state) - #(do (aset state slot-done true) - (ready state)))) - (->Ps state cancel transfer))))] - (fn [kf flow] (scan #(->seq-differ kf) flow))))) + (fpe/flow "i/diff-by" (fn [n t] + (let [state (object-array slots)] + (aset state slot-notifier n) + (aset state slot-terminator t) + (aset state slot-stepfn (ctor)) + (aset state slot-busy false) + (aset state slot-done false) + (aset state slot-process + (flow #(ready state) + #(do (aset state slot-done true) + (ready state)))) + (->Ps state cancel transfer)))))] + (fn [kf flow] (scan #(->seq-differ kf) (fpe/flow "i/diff-by input" flow)))))) (def ^{:doc " @@ -248,24 +250,12 @@ Returns the diff applying given diffs successively. "} combine d/combine) -(def ^{:doc " -Returns the incremental sequence defined by the fixed collection of given continuous flows. -A collection is fixed iff its size is invariant and its items are immobile. -"} fixed f/flow) - +(defn fixed [& flows] (fpe/flow "i/fixed" (apply f/flow (into [] (map-indexed (fn [i flow] (fpe/flow (str "i/fixed input" i) flow))) flows)))) -(def ^{:arglists '([f & incseqs]) - :doc " -Returns the incremental sequence defined by applying the cartesian product of items in given incremental sequences, -combined with given function. -"} latest-product lp/flow) +(defn latest-product [f & incseqs] + (fpe/flow (apply lp/flow f (into [] (map-indexed (fn [i flow] (fpe/flow (str "i/latest-product child " i) flow))) incseqs)))) - -(def ^{:arglists '([incseq-of-incseqs]) - :doc " -Returns the incremental sequence defined by the concatenation of incremental sequences defined by given incremental -sequence. -"} latest-concat lc/flow) +(defn latest-concat [is-of-is] (fpe/flow "i/latest-concat" (lc/flow (fpe/flow "i/latest-concat child" is-of-is)))) (def ^{:arglists '([] [sentinel] [sentinel compare]) @@ -277,7 +267,7 @@ The map is mutated by calling the port with 3 arguments : a key, a reducing func associated with the key will be updated with the result of calling the reducing function with the current state and the argument. Absence of entry is indicated by optional `sentinel` value, `nil` by default. Keys must be comparable with optional `compare` function, `clojure.core/compare` by default. -"} spine +"} spine- (let [slot-root 0 slot-readers 1 slots 2 @@ -564,7 +554,13 @@ optional `compare` function, `clojure.core/compare` by default. :change {0 curr} :freeze #{}}))))))))))))))) -(def ^{:arglists '([incseq])} items i/flow) +(defn spine [& args] + (let [s (apply spine- args), fpe (fpe/flow "i/spine" s)] + (fn + ([n t] (fpe n t)) + ([k f arg] (s k f arg))))) + +(defn items [incseq] (dbg/instrument* "i/items" (fpe/flow "i/items" (i/flow (dbg/instrument* "i/items input" (fpe/flow "i/items input" incseq)))))) (def ^{:arglists '([incseq]) :doc " diff --git a/src/hyperfiddle/incseq/items_eager_impl.cljc b/src/hyperfiddle/incseq/items_eager_impl.cljc index 8a1447ea8..dac9f3806 100644 --- a/src/hyperfiddle/incseq/items_eager_impl.cljc +++ b/src/hyperfiddle/incseq/items_eager_impl.cljc @@ -4,7 +4,8 @@ [clojure.set :as set] [hyperfiddle.electric.impl.array-fields :as a] [hyperfiddle.incseq.diff-impl :as d] - [hyperfiddle.incseq.perm-impl :as p]) + [hyperfiddle.incseq.perm-impl :as p] + [hyperfiddle.incseq.flow-protocol-enforcer :as fpe]) (:import #?(:clj [clojure.lang IDeref IFn]) #?(:clj [java.util.concurrent.atomic AtomicLong]) #?(:clj [java.util.concurrent.locks ReentrantLock]) @@ -106,13 +107,14 @@ (a/fset item -ps* (atom #{})) (a/set (a/fget ps -item*) i item) (a/fswap ps -diff update :change assoc (idx i i) - (a/fset item -flow (fn [step done] - (if (a/fget item -dead) - (->dead-item-ps step done (a/fget item -v)) - (let [item-ps (->item-ps item step done)] - (swap! (a/fget item -ps*) conj item-ps) - (item-ps (a/fget item -v)) - item-ps))))))) + (a/fset item -flow (fpe/flow (str "i/items child " (idx i i)) + (fn [step done] + (if (a/fget item -dead) + (->dead-item-ps step done (a/fget item -v)) + (let [item-ps (->item-ps item step done)] + (swap! (a/fget item -ps*) conj item-ps) + (item-ps (a/fget item -v)) + item-ps)))))))) (range (- d n) d)))) (defn permute! [^Ps ps {p :permutation}]