Skip to content

Commit

Permalink
enforcer everywhere
Browse files Browse the repository at this point in the history
  • Loading branch information
xificurC committed Sep 11, 2024
1 parent 9f3aa31 commit b7fb12a
Show file tree
Hide file tree
Showing 4 changed files with 114 additions and 118 deletions.
149 changes: 76 additions & 73 deletions src/hyperfiddle/electric/impl/runtime3.cljc
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
(:refer-clojure :exclude [resolve])
(:require [hyperfiddle.incseq :as i]
[missionary.core :as m]
[hyperfiddle.incseq.flow-protocol-enforcer :as fpe]
[cognitect.transit :as t])
(:import missionary.Cancelled
#?(:clj (clojure.lang IFn IDeref))
Expand Down Expand Up @@ -105,6 +106,8 @@
(deps [_ rf r site]) ;; emits ports
(flow [_])) ;; returns incseq

(defn flow! [expr] (fpe/flow (str expr) (flow expr)))

(defn expr-deps [rf r site expr]
(deps expr rf r site))

Expand All @@ -116,7 +119,7 @@
(defn failure? [x]
(instance? Failure x))

(defn invariant [x] (m/cp x))
(defn invariant [x] (fpe/flow "r/invariant" (m/cp x)))

(deftype Pure [value
^:unsynchronized-mutable ^:mutable hash-memo]
Expand All @@ -132,9 +135,9 @@
Expr
(deps [_ _ r _] r)
(flow [_]
(if (failure? value)
(m/latest #(throw (ex-info "Illegal access." {:info (failure-info value)})))
(i/fixed (invariant value)))))
(fpe/flow "r/pure" (if (failure? value)
(m/latest #(throw (ex-info "Illegal access." {:info (failure-info value)})))
(i/fixed (invariant value))))))

(defn pure "
-> (EXPR VOID)
Expand Down Expand Up @@ -176,7 +179,7 @@ T T T -> (EXPR T)
(deps [_ rf r site]
(reduce (fn [r x] (deps x rf r site)) r inputs))
(flow [_]
(apply i/latest-product invoke (map flow inputs))))
(fpe/flow "r/ap" (apply i/latest-product invoke (map flow! inputs)))))

(defn ap "
(EXPR (-> T)) -> (EXPR T)
Expand All @@ -200,7 +203,7 @@ T T T -> (EXPR T)
(= input (.-input ^Join other))))
Expr
(deps [_ rf r site] (deps input rf r site))
(flow [_] (i/latest-concat (flow input))))
(flow [_] (fpe/flow "r/join" (i/latest-concat (flow! input)))))

(defn join "
(EXPR (IS T)) -> (EXPR T)
Expand Down Expand Up @@ -254,8 +257,8 @@ T T T -> (EXPR T)
Expr
(deps [_ _ r _] r)
(flow [_]
(fn [step done]
(step) (->Failer done (error (str "Unbound electric var lookup - " (pr-str key)))))))
(fpe/flow "r/unbound" (fn [step done]
(step) (->Failer done (error (str "Unbound electric var lookup - " (pr-str key))))))))

(deftype Cdef [frees nodes calls result build])

Expand Down Expand Up @@ -517,7 +520,7 @@ T T T -> (EXPR T)
(port-deps rf r port)
(rf r port))))
(flow [this]
(port-flow (slot-port this))))
(fpe/flow "r/slot" (port-flow (slot-port this)))))

(defn port-slot
{:tag Slot}
Expand Down Expand Up @@ -561,28 +564,28 @@ T T T -> (EXPR T)
input))))

(defn input-sub [^objects port]
(fn [step done]
(let [^Slot slot (port-slot port)
^objects peer (frame-peer (.-frame slot))
busy (enter peer)
^objects remote (aget peer peer-slot-remote)
^objects input (input-check-create remote port)
sub (object-array input-sub-slots)]
(aset sub input-sub-slot-input input)
(aset sub input-sub-slot-step step)
(aset sub input-sub-slot-done done)
(aset sub input-sub-slot-diff (aget input input-slot-diff))
(when-not (aget input input-slot-frozen)
(if-some [^objects prv (aget input input-slot-subs)]
(let [^objects nxt (aget prv input-sub-slot-next)]
(aset prv input-sub-slot-next sub)
(aset nxt input-sub-slot-prev sub)
(aset sub input-sub-slot-prev prv)
(aset sub input-sub-slot-next nxt))
(do (aset input input-slot-subs sub)
(aset sub input-sub-slot-prev sub)
(aset sub input-sub-slot-next sub))))
(exit peer busy) (step) (->InputSub sub))))
(fpe/flow "r/input-sub" (fn [step done]
(let [^Slot slot (port-slot port)
^objects peer (frame-peer (.-frame slot))
busy (enter peer)
^objects remote (aget peer peer-slot-remote)
^objects input (input-check-create remote port)
sub (object-array input-sub-slots)]
(aset sub input-sub-slot-input input)
(aset sub input-sub-slot-step step)
(aset sub input-sub-slot-done done)
(aset sub input-sub-slot-diff (aget input input-slot-diff))
(when-not (aget input input-slot-frozen)
(if-some [^objects prv (aget input input-slot-subs)]
(let [^objects nxt (aget prv input-sub-slot-next)]
(aset prv input-sub-slot-next sub)
(aset nxt input-sub-slot-prev sub)
(aset sub input-sub-slot-prev prv)
(aset sub input-sub-slot-next nxt))
(do (aset input input-slot-subs sub)
(aset sub input-sub-slot-prev sub)
(aset sub input-sub-slot-next sub))))
(exit peer busy) (step) (->InputSub sub)))))

(defn make-port [^Slot slot site deps flow]
(let [port (object-array port-slots)
Expand Down Expand Up @@ -1108,36 +1111,36 @@ T T T -> (EXPR T)

(defn remote-handler [opts ^objects peer]
(fn [events]
(fn [step done]
(let [busy (enter peer)
^objects remote (aget peer peer-slot-remote)]
(if (nil? (aget remote remote-slot-channel))
(let [channel (object-array channel-slots)]
(aset remote remote-slot-channel channel)
(aset channel channel-slot-remote remote)
(aset channel channel-slot-step step)
(aset channel channel-slot-done done)
(aset channel channel-slot-busy true)
(aset channel channel-slot-over false)
(aset channel channel-slot-events events)
(aset channel channel-slot-alive (identity 1))
(aset channel channel-slot-shared {})
(aset channel channel-slot-writer-opts (channel-writer-opts opts channel))
(aset channel channel-slot-reader-opts (channel-reader-opts opts channel))
(aset channel channel-slot-ready (aget peer peer-slot-channel-ready))
(aset peer peer-slot-channel-ready channel)
(aset channel channel-slot-process
((aget remote remote-slot-events)
#(let [^objects remote (aget channel channel-slot-remote)]
(channel-ready channel (enter (aget remote remote-slot-peer))))
#(let [^objects remote (aget channel channel-slot-remote)]
(aset channel channel-slot-over true)
(channel-ready channel (enter (aget remote remote-slot-peer))))))
(reduce channel-output-sub channel (vals (aget remote remote-slot-outputs)))
(channel-ready channel busy)
(->Channel channel))
(do (exit peer busy) (step)
(->Failer done (error "Can't connect - remote already up."))))))))
(fpe/flow "r/remote-handler" (fn [step done]
(let [busy (enter peer)
^objects remote (aget peer peer-slot-remote)]
(if (nil? (aget remote remote-slot-channel))
(let [channel (object-array channel-slots)]
(aset remote remote-slot-channel channel)
(aset channel channel-slot-remote remote)
(aset channel channel-slot-step step)
(aset channel channel-slot-done done)
(aset channel channel-slot-busy true)
(aset channel channel-slot-over false)
(aset channel channel-slot-events events)
(aset channel channel-slot-alive (identity 1))
(aset channel channel-slot-shared {})
(aset channel channel-slot-writer-opts (channel-writer-opts opts channel))
(aset channel channel-slot-reader-opts (channel-reader-opts opts channel))
(aset channel channel-slot-ready (aget peer peer-slot-channel-ready))
(aset peer peer-slot-channel-ready channel)
(aset channel channel-slot-process
((aget remote remote-slot-events)
#(let [^objects remote (aget channel channel-slot-remote)]
(channel-ready channel (enter (aget remote remote-slot-peer))))
#(let [^objects remote (aget channel channel-slot-remote)]
(aset channel channel-slot-over true)
(channel-ready channel (enter (aget remote remote-slot-peer))))))
(reduce channel-output-sub channel (vals (aget remote remote-slot-outputs)))
(channel-ready channel busy)
(->Channel channel))
(do (exit peer busy) (step)
(->Failer done (error "Can't connect - remote already up.")))))))))

(defn input-toggle-event [^objects input]
(let [^objects remote (aget input input-slot-remote)]
Expand Down Expand Up @@ -1169,17 +1172,17 @@ T T T -> (EXPR T)
(exit peer busy)))

(defn with-dep [flow ^objects port]
(fn [step done]
(dep-attach port)
(flow step
#(do (dep-detach port)
(done)))))
(fpe/flow "r/with-dep" (fn [step done]
(dep-attach port)
((fpe/flow "r/with-dep input" flow) step
#(do (dep-detach port)
(done))))))

;; is expr always a slot ? if true, we can specialize to ports
(deftype Incseq [peer expr]
IFn
(#?(:clj invoke :cljs -invoke) [_ step done]
((deps expr with-dep (flow expr) (peer-site peer)) step done)))
((fpe/flow "r/incseq" (deps expr with-dep (flow expr) (peer-site peer))) step done)))

(defn incseq-expr [^Incseq incseq]
(.-expr incseq))
Expand Down Expand Up @@ -1211,12 +1214,12 @@ T T T -> (EXPR T)
(aset call call-slot-port
(make-port slot site
(deps expr update-inc {} site)
(i/latest-product
(fn [ctor]
(let [rank (aget call call-slot-rank)
frame (make-frame peer slot rank site ctor)]
(aset call call-slot-rank (inc rank)) frame))
(flow expr))))
(fpe/flow "r/create-call" (i/latest-product
(fn [ctor]
(let [rank (aget call call-slot-rank)
frame (make-frame peer slot rank site ctor)]
(aset call call-slot-rank (inc rank)) frame))
(flow expr)))))
(aset call call-slot-rank (identity 0))
call))

Expand Down
13 changes: 7 additions & 6 deletions src/hyperfiddle/electric3.cljc
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,8 @@
#?(:clj [fipp.edn])
[missionary.core :as m]
[contrib.missionary-contrib :as mx]
[clojure.math :as math])
[clojure.math :as math]
[hyperfiddle.incseq.flow-protocol-enforcer :as fpe])
(:import [missionary Cancelled])
#?(:cljs (:require-macros hyperfiddle.electric3)))

Expand Down Expand Up @@ -430,9 +431,9 @@ inhibiting all further reactive updates."
(hyperfiddle.electric3/defn SystemTimeSecs [] (math/floor-div (input system-time-ms) 1000))

(cc/defn uf->is [uf]
(m/ap (m/amb (i/empty-diff 0)
(let [!first (atom true) v (m/?> uf)]
(assoc (i/empty-diff 1) :grow (if @!first (do (swap! !first not) 1) 0), :change {0 v})))))
(fpe/flow "uf->is" (m/ap (m/amb (i/empty-diff 0)
(let [!first (atom true) v (m/?> uf)]
(assoc (i/empty-diff 1) :grow (if @!first (do (swap! !first not) 1) 0), :change {0 v}))))))

(cc/letfn [(task->is [t] (uf->is (m/ap (m/? t))))
(initialized [t init-v] (m/relieve {} (m/ap (m/amb= init-v (m/? t)))))]
Expand All @@ -441,8 +442,8 @@ inhibiting all further reactive updates."
([t init-v] (input (initialized t init-v)))))

#?(:clj (cc/defn -offload [tsk executor]
(uf->is (m/ap (try (m/? (m/via-call executor (m/?< (mx/poll-task tsk))))
(catch Cancelled _ (m/amb)))))))
(fpe/flow "offload" (uf->is (m/ap (try (m/? (m/via-call executor (m/?< (mx/poll-task tsk))))
(catch Cancelled _ (m/amb))))))))


(hyperfiddle.electric3/defn Offload
Expand Down
52 changes: 21 additions & 31 deletions src/hyperfiddle/incseq.cljc
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,9 @@ successive sequence diffs. Incremental sequences are applicative functors with `
[hyperfiddle.incseq.latest-concat-impl :as lc]
[hyperfiddle.rcf :refer [tests]]
[clojure.core :as cc]
[missionary.core :as m])
[missionary.core :as m]
[hyperfiddle.incseq.flow-protocol-enforcer :as fpe]
[contrib.debug :as dbg])
(:import #?(:clj (clojure.lang IFn IDeref))
missionary.Cancelled))

Expand Down Expand Up @@ -224,19 +226,19 @@ Returns a flow producing the successive diffs of given continuous flow of collec
(if (aset state slot-busy (not (aget state slot-busy)))
(recur) nop))))) nop))))
(scan [ctor flow]
(fn [n t]
(let [state (object-array slots)]
(aset state slot-notifier n)
(aset state slot-terminator t)
(aset state slot-stepfn (ctor))
(aset state slot-busy false)
(aset state slot-done false)
(aset state slot-process
(flow #(ready state)
#(do (aset state slot-done true)
(ready state))))
(->Ps state cancel transfer))))]
(fn [kf flow] (scan #(->seq-differ kf) flow)))))
(fpe/flow "i/diff-by" (fn [n t]
(let [state (object-array slots)]
(aset state slot-notifier n)
(aset state slot-terminator t)
(aset state slot-stepfn (ctor))
(aset state slot-busy false)
(aset state slot-done false)
(aset state slot-process
(flow #(ready state)
#(do (aset state slot-done true)
(ready state))))
(->Ps state cancel transfer)))))]
(fn [kf flow] (scan #(->seq-differ kf) (fpe/flow "i/diff-by input" flow))))))


(def ^{:doc "
Expand All @@ -248,24 +250,12 @@ Returns the diff applying given diffs successively.
"} combine d/combine)


(def ^{:doc "
Returns the incremental sequence defined by the fixed collection of given continuous flows.
A collection is fixed iff its size is invariant and its items are immobile.
"} fixed f/flow)

(defn fixed [& flows] (fpe/flow "i/fixed" (apply f/flow (into [] (map-indexed (fn [i flow] (fpe/flow (str "i/fixed input" i) flow))) flows))))

(def ^{:arglists '([f & incseqs])
:doc "
Returns the incremental sequence defined by applying the cartesian product of items in given incremental sequences,
combined with given function.
"} latest-product lp/flow)
(defn latest-product [f & incseqs]
(fpe/flow (apply lp/flow f (into [] (map-indexed (fn [i flow] (fpe/flow (str "i/latest-product child " i) flow))) incseqs))))


(def ^{:arglists '([incseq-of-incseqs])
:doc "
Returns the incremental sequence defined by the concatenation of incremental sequences defined by given incremental
sequence.
"} latest-concat lc/flow)
(defn latest-concat [is-of-is] (fpe/flow "i/latest-concat" (lc/flow (fpe/flow "i/latest-concat child" is-of-is))))


(def ^{:arglists '([] [sentinel] [sentinel compare])
Expand Down Expand Up @@ -564,7 +554,7 @@ optional `compare` function, `clojure.core/compare` by default.
:change {0 curr}
:freeze #{}})))))))))))))))

(def ^{:arglists '([incseq])} items i/flow)
(defn items [incseq] (dbg/instrument* "i/items" (fpe/flow "i/items" (i/flow (dbg/instrument* "i/items input" (fpe/flow "i/items input" incseq))))))

(def ^{:arglists '([incseq])
:doc "
Expand Down
18 changes: 10 additions & 8 deletions src/hyperfiddle/incseq/items_eager_impl.cljc
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,8 @@
[clojure.set :as set]
[hyperfiddle.electric.impl.array-fields :as a]
[hyperfiddle.incseq.diff-impl :as d]
[hyperfiddle.incseq.perm-impl :as p])
[hyperfiddle.incseq.perm-impl :as p]
[hyperfiddle.incseq.flow-protocol-enforcer :as fpe])
(:import #?(:clj [clojure.lang IDeref IFn])
#?(:clj [java.util.concurrent.atomic AtomicLong])
#?(:clj [java.util.concurrent.locks ReentrantLock])
Expand Down Expand Up @@ -106,13 +107,14 @@
(a/fset item -ps* (atom #{}))
(a/set (a/fget ps -item*) i item)
(a/fswap ps -diff update :change assoc (idx i i)
(a/fset item -flow (fn [step done]
(if (a/fget item -dead)
(->dead-item-ps step done (a/fget item -v))
(let [item-ps (->item-ps item step done)]
(swap! (a/fget item -ps*) conj item-ps)
(item-ps (a/fget item -v))
item-ps)))))))
(a/fset item -flow (fpe/flow (str "i/items child " (idx i i))
(fn [step done]
(if (a/fget item -dead)
(->dead-item-ps step done (a/fget item -v))
(let [item-ps (->item-ps item step done)]
(swap! (a/fget item -ps*) conj item-ps)
(item-ps (a/fget item -v))
item-ps))))))))
(range (- d n) d))))

(defn permute! [^Ps ps {p :permutation}]
Expand Down

0 comments on commit b7fb12a

Please sign in to comment.