Skip to content

Commit

Permalink
enforcer everywhere
Browse files Browse the repository at this point in the history
  • Loading branch information
xificurC committed Sep 12, 2024
1 parent 9f3aa31 commit 6f613da
Show file tree
Hide file tree
Showing 6 changed files with 144 additions and 129 deletions.
9 changes: 5 additions & 4 deletions src/contrib/debug.cljc
Original file line number Diff line number Diff line change
Expand Up @@ -46,20 +46,21 @@
(defn instrument* [nm flow]
(fn [n t]
(let [id (swap! !id inc)
it (flow #(do (prn nm id :notified) (n)) #(do (prn nm id :terminated) (t)))]
_ (prn nm id 'spawned)
it (flow #(do (prn nm id 'notified) (n)) #(do (prn nm id 'terminated) (t)))]
(reify
IFn (#?(:clj invoke :cljs -invoke) [_] (prn nm id :cancelled) (it))
IFn (#?(:clj invoke :cljs -invoke) [_] (prn nm id 'cancelled) (it))
IDeref (#?(:clj deref :cljs -deref) [_]
(let [v (try @it (catch #?(:clj Throwable :cljs :default) e [::ex e]))]
(prn nm id :transferred
(prn nm id 'transferred
(if false #_(instance? Failure v) ; FIXME Update to electric v3
(let [e (.-error v)]
[(type e) (ex-message e)])
v))
(if (and (vector? v) (= ::ex (first v)))
(throw (second v))
v)))))))
(defmacro instrument [nm & body] `(new (instrument* ~nm (hyperfiddle.electric/fn [] ~@body))))
(defmacro instrument [nm v] `(hyperfiddle.electric3/input (instrument* ~nm (hyperfiddle.electric3/pure ~v))))

(defmacro js-measure [nm & body]
(if (:js-globals &env)
Expand Down
21 changes: 15 additions & 6 deletions src/hyperfiddle/electric/impl/mount_point.cljc
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,13 @@ On call step event :
Unmounting a block generates a shrink for each active item having this block's frame as an ancestor.
Mounting a block generates a grow for each active item having this block's frame as an ancestor.
" (:require [hyperfiddle.kvs :refer [KVS]]
" (:require [hyperfiddle.kvs :as kvs :refer [KVS]]
[hyperfiddle.incseq.arrays-impl :as a]
[hyperfiddle.incseq.fixed-impl :as f]
[hyperfiddle.incseq.diff-impl :as d]
[hyperfiddle.incseq.perm-impl :as p]
[contrib.debug :as dbg]
[hyperfiddle.incseq.flow-protocol-enforcer :as fpe]
[hyperfiddle.electric.impl.runtime3 :as r])
#?(:clj (:import (clojure.lang IFn IDeref)
(java.util.concurrent.locks ReentrantLock))))
Expand Down Expand Up @@ -792,9 +794,16 @@ Mounting a block generates a grow for each active item having this block's frame
(#?(:clj invoke :cljs -invoke) [_ step done]
(reader-spawn state step done)))

(defn fpe [^MountPoint mp]
(let [enforced (fpe/flow "mount-point" mp)]
(reify
KVS (insert! [_ t i] (kvs/insert! mp t i)) (update! [_ t f] (kvs/update! mp t f)) (remove! [_ t] (kvs/remove! mp t))
IFn (#?(:clj invoke :cljs -invoke) [_ step done]
(enforced step done)))))

(defn create [peer]
(->MountPoint
(doto (object-array slots)
(aset slot-lock #?(:clj (ReentrantLock.) :cljs false))
(aset slot-peer peer)
(aset slot-items {}))))
(fpe (->MountPoint
(doto (object-array slots)
(aset slot-lock #?(:clj (ReentrantLock.) :cljs false))
(aset slot-peer peer)
(aset slot-items {})))))
152 changes: 79 additions & 73 deletions src/hyperfiddle/electric/impl/runtime3.cljc
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@
(:refer-clojure :exclude [resolve])
(:require [hyperfiddle.incseq :as i]
[missionary.core :as m]
[contrib.debug :as dbg]
[hyperfiddle.incseq.flow-protocol-enforcer :as fpe]
[cognitect.transit :as t])
(:import missionary.Cancelled
#?(:clj (clojure.lang IFn IDeref))
Expand Down Expand Up @@ -105,6 +107,8 @@
(deps [_ rf r site]) ;; emits ports
(flow [_])) ;; returns incseq

(defn flow! [expr] (fpe/flow (str expr) (flow expr)))

(defn expr-deps [rf r site expr]
(deps expr rf r site))

Expand All @@ -116,7 +120,7 @@
(defn failure? [x]
(instance? Failure x))

(defn invariant [x] (m/cp x))
(defn invariant [x] (fpe/flow "r/invariant" (m/cp x)))

(deftype Pure [value
^:unsynchronized-mutable ^:mutable hash-memo]
Expand All @@ -132,9 +136,9 @@
Expr
(deps [_ _ r _] r)
(flow [_]
(if (failure? value)
(m/latest #(throw (ex-info "Illegal access." {:info (failure-info value)})))
(i/fixed (invariant value)))))
(fpe/flow "r/pure" (if (failure? value)
(m/latest #(throw (ex-info "Illegal access." {:info (failure-info value)})))
(i/fixed (invariant value))))))

(defn pure "
-> (EXPR VOID)
Expand Down Expand Up @@ -176,7 +180,7 @@ T T T -> (EXPR T)
(deps [_ rf r site]
(reduce (fn [r x] (deps x rf r site)) r inputs))
(flow [_]
(apply i/latest-product invoke (map flow inputs))))
(fpe/flow "r/ap" (apply i/latest-product invoke (map flow! inputs)))))

(defn ap "
(EXPR (-> T)) -> (EXPR T)
Expand All @@ -200,7 +204,7 @@ T T T -> (EXPR T)
(= input (.-input ^Join other))))
Expr
(deps [_ rf r site] (deps input rf r site))
(flow [_] (i/latest-concat (flow input))))
(flow [_] (fpe/flow "r/join" (i/latest-concat (flow! input)))))

(defn join "
(EXPR (IS T)) -> (EXPR T)
Expand Down Expand Up @@ -254,8 +258,8 @@ T T T -> (EXPR T)
Expr
(deps [_ _ r _] r)
(flow [_]
(fn [step done]
(step) (->Failer done (error (str "Unbound electric var lookup - " (pr-str key)))))))
(fpe/flow "r/unbound" (fn [step done]
(step) (->Failer done (error (str "Unbound electric var lookup - " (pr-str key))))))))

(deftype Cdef [frees nodes calls result build])

Expand Down Expand Up @@ -517,7 +521,7 @@ T T T -> (EXPR T)
(port-deps rf r port)
(rf r port))))
(flow [this]
(port-flow (slot-port this))))
(fpe/flow "r/slot" (port-flow (slot-port this)))))

(defn port-slot
{:tag Slot}
Expand Down Expand Up @@ -561,28 +565,30 @@ T T T -> (EXPR T)
input))))

(defn input-sub [^objects port]
(fn [step done]
(let [^Slot slot (port-slot port)
^objects peer (frame-peer (.-frame slot))
busy (enter peer)
^objects remote (aget peer peer-slot-remote)
^objects input (input-check-create remote port)
sub (object-array input-sub-slots)]
(aset sub input-sub-slot-input input)
(aset sub input-sub-slot-step step)
(aset sub input-sub-slot-done done)
(aset sub input-sub-slot-diff (aget input input-slot-diff))
(when-not (aget input input-slot-frozen)
(if-some [^objects prv (aget input input-slot-subs)]
(let [^objects nxt (aget prv input-sub-slot-next)]
(aset prv input-sub-slot-next sub)
(aset nxt input-sub-slot-prev sub)
(aset sub input-sub-slot-prev prv)
(aset sub input-sub-slot-next nxt))
(do (aset input input-slot-subs sub)
(aset sub input-sub-slot-prev sub)
(aset sub input-sub-slot-next sub))))
(exit peer busy) (step) (->InputSub sub))))
(dbg/instrument* 'input-sub
(fpe/flow "r/input-sub"
(fn [step done]
(let [^Slot slot (port-slot port)
^objects peer (frame-peer (.-frame slot))
busy (enter peer)
^objects remote (aget peer peer-slot-remote)
^objects input (input-check-create remote port)
sub (object-array input-sub-slots)]
(aset sub input-sub-slot-input input)
(aset sub input-sub-slot-step step)
(aset sub input-sub-slot-done done)
(aset sub input-sub-slot-diff (aget input input-slot-diff))
(when-not (aget input input-slot-frozen)
(if-some [^objects prv (aget input input-slot-subs)]
(let [^objects nxt (aget prv input-sub-slot-next)]
(aset prv input-sub-slot-next sub)
(aset nxt input-sub-slot-prev sub)
(aset sub input-sub-slot-prev prv)
(aset sub input-sub-slot-next nxt))
(do (aset input input-slot-subs sub)
(aset sub input-sub-slot-prev sub)
(aset sub input-sub-slot-next sub))))
(exit peer busy) (step) (->InputSub sub))))))

(defn make-port [^Slot slot site deps flow]
(let [port (object-array port-slots)
Expand Down Expand Up @@ -1108,36 +1114,36 @@ T T T -> (EXPR T)

(defn remote-handler [opts ^objects peer]
(fn [events]
(fn [step done]
(let [busy (enter peer)
^objects remote (aget peer peer-slot-remote)]
(if (nil? (aget remote remote-slot-channel))
(let [channel (object-array channel-slots)]
(aset remote remote-slot-channel channel)
(aset channel channel-slot-remote remote)
(aset channel channel-slot-step step)
(aset channel channel-slot-done done)
(aset channel channel-slot-busy true)
(aset channel channel-slot-over false)
(aset channel channel-slot-events events)
(aset channel channel-slot-alive (identity 1))
(aset channel channel-slot-shared {})
(aset channel channel-slot-writer-opts (channel-writer-opts opts channel))
(aset channel channel-slot-reader-opts (channel-reader-opts opts channel))
(aset channel channel-slot-ready (aget peer peer-slot-channel-ready))
(aset peer peer-slot-channel-ready channel)
(aset channel channel-slot-process
((aget remote remote-slot-events)
#(let [^objects remote (aget channel channel-slot-remote)]
(channel-ready channel (enter (aget remote remote-slot-peer))))
#(let [^objects remote (aget channel channel-slot-remote)]
(aset channel channel-slot-over true)
(channel-ready channel (enter (aget remote remote-slot-peer))))))
(reduce channel-output-sub channel (vals (aget remote remote-slot-outputs)))
(channel-ready channel busy)
(->Channel channel))
(do (exit peer busy) (step)
(->Failer done (error "Can't connect - remote already up."))))))))
(fpe/flow "r/remote-handler" (fn [step done]
(let [busy (enter peer)
^objects remote (aget peer peer-slot-remote)]
(if (nil? (aget remote remote-slot-channel))
(let [channel (object-array channel-slots)]
(aset remote remote-slot-channel channel)
(aset channel channel-slot-remote remote)
(aset channel channel-slot-step step)
(aset channel channel-slot-done done)
(aset channel channel-slot-busy true)
(aset channel channel-slot-over false)
(aset channel channel-slot-events events)
(aset channel channel-slot-alive (identity 1))
(aset channel channel-slot-shared {})
(aset channel channel-slot-writer-opts (channel-writer-opts opts channel))
(aset channel channel-slot-reader-opts (channel-reader-opts opts channel))
(aset channel channel-slot-ready (aget peer peer-slot-channel-ready))
(aset peer peer-slot-channel-ready channel)
(aset channel channel-slot-process
((aget remote remote-slot-events)
#(let [^objects remote (aget channel channel-slot-remote)]
(channel-ready channel (enter (aget remote remote-slot-peer))))
#(let [^objects remote (aget channel channel-slot-remote)]
(aset channel channel-slot-over true)
(channel-ready channel (enter (aget remote remote-slot-peer))))))
(reduce channel-output-sub channel (vals (aget remote remote-slot-outputs)))
(channel-ready channel busy)
(->Channel channel))
(do (exit peer busy) (step)
(->Failer done (error "Can't connect - remote already up.")))))))))

(defn input-toggle-event [^objects input]
(let [^objects remote (aget input input-slot-remote)]
Expand Down Expand Up @@ -1169,17 +1175,17 @@ T T T -> (EXPR T)
(exit peer busy)))

(defn with-dep [flow ^objects port]
(fn [step done]
(dep-attach port)
(flow step
#(do (dep-detach port)
(done)))))
(fpe/flow "r/with-dep" (fn [step done]
(dep-attach port)
((fpe/flow "r/with-dep input" flow) step
#(do (dep-detach port)
(done))))))

;; is expr always a slot ? if true, we can specialize to ports
(deftype Incseq [peer expr]
IFn
(#?(:clj invoke :cljs -invoke) [_ step done]
((deps expr with-dep (flow expr) (peer-site peer)) step done)))
((fpe/flow "r/incseq" (deps expr with-dep (flow expr) (peer-site peer))) step done)))

(defn incseq-expr [^Incseq incseq]
(.-expr incseq))
Expand Down Expand Up @@ -1211,12 +1217,12 @@ T T T -> (EXPR T)
(aset call call-slot-port
(make-port slot site
(deps expr update-inc {} site)
(i/latest-product
(fn [ctor]
(let [rank (aget call call-slot-rank)
frame (make-frame peer slot rank site ctor)]
(aset call call-slot-rank (inc rank)) frame))
(flow expr))))
(fpe/flow "r/create-call" (i/latest-product
(fn [ctor]
(let [rank (aget call call-slot-rank)
frame (make-frame peer slot rank site ctor)]
(aset call call-slot-rank (inc rank)) frame))
(flow expr)))))
(aset call call-slot-rank (identity 0))
call))

Expand Down
13 changes: 7 additions & 6 deletions src/hyperfiddle/electric3.cljc
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,8 @@
#?(:clj [fipp.edn])
[missionary.core :as m]
[contrib.missionary-contrib :as mx]
[clojure.math :as math])
[clojure.math :as math]
[hyperfiddle.incseq.flow-protocol-enforcer :as fpe])
(:import [missionary Cancelled])
#?(:cljs (:require-macros hyperfiddle.electric3)))

Expand Down Expand Up @@ -430,9 +431,9 @@ inhibiting all further reactive updates."
(hyperfiddle.electric3/defn SystemTimeSecs [] (math/floor-div (input system-time-ms) 1000))

(cc/defn uf->is [uf]
(m/ap (m/amb (i/empty-diff 0)
(let [!first (atom true) v (m/?> uf)]
(assoc (i/empty-diff 1) :grow (if @!first (do (swap! !first not) 1) 0), :change {0 v})))))
(fpe/flow "uf->is" (m/ap (m/amb (i/empty-diff 0)
(let [!first (atom true) v (m/?> uf)]
(assoc (i/empty-diff 1) :grow (if @!first (do (swap! !first not) 1) 0), :change {0 v}))))))

(cc/letfn [(task->is [t] (uf->is (m/ap (m/? t))))
(initialized [t init-v] (m/relieve {} (m/ap (m/amb= init-v (m/? t)))))]
Expand All @@ -441,8 +442,8 @@ inhibiting all further reactive updates."
([t init-v] (input (initialized t init-v)))))

#?(:clj (cc/defn -offload [tsk executor]
(uf->is (m/ap (try (m/? (m/via-call executor (m/?< (mx/poll-task tsk))))
(catch Cancelled _ (m/amb)))))))
(fpe/flow "offload" (uf->is (m/ap (try (m/? (m/via-call executor (m/?< (mx/poll-task tsk))))
(catch Cancelled _ (m/amb))))))))


(hyperfiddle.electric3/defn Offload
Expand Down
Loading

0 comments on commit 6f613da

Please sign in to comment.