diff --git a/src/hyperfiddle/electric/impl/runtime_de.cljc b/src/hyperfiddle/electric/impl/runtime_de.cljc index 68c9ba143..39836616b 100644 --- a/src/hyperfiddle/electric/impl/runtime_de.cljc +++ b/src/hyperfiddle/electric/impl/runtime_de.cljc @@ -27,46 +27,50 @@ (def peer-slot-site 2) (def peer-slot-defs 3) (def peer-slot-remote 4) -(def peer-slot-sub-ready 7) -(def peer-slot-channel-ready 8) -(def peer-slots 9) +(def peer-slot-sub-ready 5) +(def peer-slot-channel-ready 6) +(def peer-slots 7) (def remote-slot-peer 0) (def remote-slot-events 1) (def remote-slot-channel 2) (def remote-slot-inputs 3) ;; hash map of remote ports currently pushed to local peer, indexed by port slot (def remote-slot-outputs 4) ;; hash map of local port subscriptions pushed to remote peer, indexed by port slot -(def remote-slots 5) +(def remote-slot-ready 5) +(def remote-slot-ackq 6) +(def remote-slot-freeze 7) +(def remote-slot-acks 8) +(def remote-slot-toggle 9) +(def remote-slots 10) + +(def ack-slot-prev 0) +(def ack-slot-next 1) +(def ack-slot-convicted-inputs 2) +(def ack-slot-convicted-outputs 3) +(def ack-slots 4) (def output-slot-remote 0) (def output-slot-port 1) -(def output-slot-sub 2) -(def output-slot-requested 3) ;; true iff remote input has at least one incseq depending on it - updated by remote toggle events -(def output-slot-refcount 4) ;; count of active inputs depending on this output - updated by local incseqs -(def output-slots 5) - -(def output-sub-slot-output 0) -(def output-sub-slot-process 1) -(def output-sub-slot-ready 2) -(def output-sub-slots 3) +(def output-slot-requested 2) ;; true iff remote input has at least one incseq depending on it - updated by remote toggle events +(def output-slot-refcount 3) ;; count of active inputs depending on this output - updated by local incseqs +(def output-slot-convicted 4) +(def output-slot-process 5) +(def output-slot-ready 6) +(def output-slots 7) (def channel-slot-remote 0) (def channel-slot-events 1) (def channel-slot-process 2) (def channel-slot-busy 3) (def channel-slot-over 4) -(def channel-slot-sub-ready 5) -(def channel-slot-step 6) -(def channel-slot-done 7) -(def channel-slot-alive 8) -(def channel-slot-freeze 9) -(def channel-slot-acks 10) -(def channel-slot-toggle 11) -(def channel-slot-ready 12) -(def channel-slot-shared 13) -(def channel-slot-reader-opts 14) -(def channel-slot-writer-opts 15) -(def channel-slots 16) +(def channel-slot-step 5) +(def channel-slot-done 6) +(def channel-slot-alive 7) +(def channel-slot-ready 8) +(def channel-slot-shared 9) +(def channel-slot-reader-opts 10) +(def channel-slot-writer-opts 11) +(def channel-slots 12) (def port-slot-slot 0) (def port-slot-site 1) @@ -81,7 +85,10 @@ (def input-slot-subs 4) (def input-slot-refcount 5) (def input-slot-requested 6) -(def input-slots 7) +(def input-slot-convicted 7) +(def input-slot-prev 8) +(def input-slot-next 9) +(def input-slots 10) (def input-sub-slot-input 0) (def input-sub-slot-step 1) @@ -548,6 +555,10 @@ T T T -> (EXPR T) (conj path [(.-id slot) (.-rank ^Frame frame)])) (vec path)))) +(defn port-coordinates [^objects port] + (let [slot (port-slot port)] + [(frame-path (.-frame slot)) (.-id slot)])) + (defn input-check-create [^objects remote port] (let [slot (port-slot port)] (if-some [^objects input (get (aget remote remote-slot-inputs) slot)] @@ -558,6 +569,7 @@ T T T -> (EXPR T) (aset input input-slot-refcount (identity 0)) (aset input input-slot-diff (i/empty-diff 0)) (aset input input-slot-frozen false) + (aset input input-slot-convicted input) (aset remote remote-slot-inputs (assoc (aget remote remote-slot-inputs) slot input)) input)))) @@ -620,6 +632,37 @@ T T T -> (EXPR T) (flow expr)))] (aset ^objects (.-nodes frame) (- -1 id) port) nil)) +(defn node + "Returns the signal node id for given frame." + {:tag Slot} + [^Frame frame id] + (->Slot frame (- -1 id))) + +(defn call + "Returns the call site id for given frame." + {:tag Slot} + [^Frame frame id] + (->Slot frame id)) + +(defn define-node + "Defines signals node id for given frame." + [^Frame frame id expr] + (define-slot (node frame id) expr)) + +(defn slot-frame + "Returns the frame of given slot." + {:tag Frame} + [^Slot slot] + (.-frame slot)) + +(defn slot-id + "Returns the id of given slot." + [^Slot slot] + (.-id slot)) + +(defn frame-slot [^Frame frame] + (.-slot frame)) + (defn make-frame [^objects peer ^Slot slot rank site ctor] (let [[key idx _ _] ctor cdef (peer-cdef peer key idx) @@ -642,58 +685,71 @@ T T T -> (EXPR T) :cljs (t/write (t/writer :json opts) value))) -(defn remote-ack [^objects remote] - ;; TODO - ) +(defn input-dispose [^objects input] + (let [^objects remote (aget input input-slot-remote)] + (aset remote remote-slot-inputs + (dissoc (aget remote remote-slot-inputs) + (port-slot (aget input input-slot-port)))))) -(defn remote-port-tap [^objects remote ^objects port] - (let [^objects input (input-check-create remote port)] - (aset input input-slot-refcount (inc (aget input input-slot-refcount)))) - remote) +(defn channel-output-sub [^objects channel ^objects output] + (aset channel channel-slot-alive + (inc (aget channel channel-slot-alive))) + (aset output output-slot-process + ((port-flow (aget output output-slot-port)) + #(let [^objects remote (aget output output-slot-remote) + ^objects peer (aget remote remote-slot-peer) + busy (enter peer)] + (if (nil? (aget output output-slot-port)) + (try @(aget output output-slot-process) + (catch #?(:clj Throwable :cljs :default) _)) + (do (aset output output-slot-ready (aget remote remote-slot-ready)) + (aset remote remote-slot-ready output) + (channel-output-event channel))) + (exit peer busy)) + #(let [^objects remote (aget output output-slot-remote) + ^objects peer (aget remote remote-slot-peer) + busy (enter peer)] + (if (nil? (aget output output-slot-port)) + (channel-terminated channel) + (do (aset channel channel-slot-alive + (dec (aget channel channel-slot-alive))) + (aset remote remote-slot-freeze + (conj (aget remote remote-slot-freeze) + (port-slot (aget output output-slot-port)))) + (channel-output-event channel))) + (exit peer busy)))) + channel) -(defn remote-port-untap [^objects remote ^objects port] - (let [slot (port-slot port) - ^objects inputs (aget remote remote-slot-inputs) - ^objects input (get inputs slot) - refcount (dec (aget input input-slot-refcount))] - (aset input input-slot-refcount refcount) - (when (zero? refcount) - (when (zero? (aget input input-slot-requested)) - (aset remote remote-slot-inputs (dissoc inputs slot))))) - remote) +(defn output-dispose [^objects output] + (let [^objects remote (aget output output-slot-remote)] + (aset remote remote-slot-outputs + (dissoc (aget remote remote-slot-outputs) + (port-slot (aget output output-slot-port)))) + (aset output output-slot-port nil) + ((aget output output-slot-process)))) -(defn channel-output-sub [^objects channel ^objects output] - (let [sub (object-array output-sub-slots)] - (aset channel channel-slot-alive - (inc (aget channel channel-slot-alive))) - (aset output output-slot-sub sub) - (aset sub output-sub-slot-ready sub) - (aset sub output-sub-slot-output output) - (aset sub output-sub-slot-process - ((port-flow (aget output output-slot-port)) - #(let [^objects remote (aget output output-slot-remote) - ^objects peer (aget remote remote-slot-peer) - busy (enter peer)] - (if (identical? sub (aget output output-slot-sub)) - (do (aset sub output-sub-slot-ready (aget channel channel-slot-sub-ready)) - (aset channel channel-slot-sub-ready sub) - (channel-output-event channel)) - (try @(aget sub output-sub-slot-process) - (catch #?(:clj Throwable :cljs :default) _))) - (exit peer busy)) - #(let [^objects remote (aget output output-slot-remote) - ^objects peer (aget remote remote-slot-peer) - busy (enter peer)] - (if (identical? sub (aget output output-slot-sub)) - (do (aset channel channel-slot-alive - (dec (aget channel channel-slot-alive))) - (aset channel channel-slot-freeze - (conj (aget channel channel-slot-freeze) - (port-slot (aget output output-slot-port)))) - (channel-output-event channel)) - (channel-terminated channel)) - (exit peer busy)))) - channel)) +(defn reset-diff [n] + {:grow 0, + :degree n, + :shrink n, + :permutation {}, + :change {}, + :freeze #{}}) + +(defn input-reset [^objects input] + (when-some [^objects sub (aget input input-slot-subs)] + (loop [^objects s sub] + (if-some [{:keys [grow degree]} (aget s input-sub-slot-diff)] + (aset s input-sub-slot-diff (reset-diff (- degree grow))) + (let [^objects remote (aget input input-slot-remote) + ^objects peer (aget remote remote-slot-peer)] + (aset s input-sub-slot-diff (reset-diff (:degree (aget input input-slot-diff)))) + (when (identical? s (aget s input-sub-slot-ready)) + (aset s input-sub-slot-ready (aget peer peer-slot-sub-ready)) + (aset peer peer-slot-sub-ready s)))) + (let [n (aget s input-sub-slot-next)] + (when-not (identical? n sub) (recur n))))) + (aset input input-slot-diff (i/empty-diff 0))) (defn output-check-create [^objects remote ^objects local-port] (let [slot (port-slot local-port) @@ -704,44 +760,56 @@ T T T -> (EXPR T) (aset output output-slot-port local-port) (aset output output-slot-refcount (identity 0)) (aset output output-slot-requested false) + (aset output output-slot-ready output) + (aset output output-slot-convicted output) (aset remote remote-slot-outputs (assoc (aget remote remote-slot-outputs) slot output)) (when-some [channel (aget remote remote-slot-channel)] (channel-output-sub channel output)) output)))) -(defn output-reset [^objects output] - (when-some [^objects sub (aget output output-slot-sub)] - (let [^objects remote (aget output output-slot-remote)] - (aset output output-slot-sub nil) - (aset remote remote-slot-outputs - (dissoc (aget remote remote-slot-outputs) - (port-slot (aget output output-slot-port)))) - ((aget sub output-sub-slot-process))))) - -(defn reset-diff [n] - {:grow 0, - :degree n, - :shrink n, - :permutation {}, - :change {}, - :freeze #{}}) - -(defn input-reset [^objects input] - (let [^objects remote (aget input input-slot-remote) - ^objects peer (aget remote remote-slot-peer) - rd (reset-diff (:degree (aget input input-slot-diff)))] - (aset input input-slot-diff (i/empty-diff 0)) - (when-some [^objects sub (aget input input-slot-subs)] - (loop [^objects s sub] - (if-some [{:keys [grow degree]} (aget s input-sub-slot-diff)] - (aset s input-sub-slot-diff (reset-diff (- degree grow))) - (do (aset s input-sub-slot-diff rd) - (when (identical? s (aget s input-sub-slot-ready)) - (aset s input-sub-slot-ready (aget peer peer-slot-sub-ready)) - (aset peer peer-slot-sub-ready s)))) - (let [n (aget s input-sub-slot-next)] - (when-not (identical? n sub) (recur n))))))) +(defn remote-ack [^objects remote] + (let [^objects channel (aget remote remote-slot-channel) + ^objects ack (aget remote remote-slot-ackq) + ^objects nxt (aget ack ack-slot-next) + ^objects prv (aget ack ack-slot-prev)] + (aset remote remote-slot-ackq + (when-not (identical? ack prv) + (aset nxt ack-slot-prev prv) + (aset prv ack-slot-next nxt))) + (loop [^objects input (aget ack ack-slot-convicted-inputs)] + (when-not (nil? input) + (let [x (aget input input-slot-convicted)] + (aset input input-slot-convicted input) + (when (zero? (aget input input-slot-refcount)) + (if (zero? (aget input input-slot-requested)) + (input-dispose input) + (input-reset input))) + (recur x)))) + (loop [^objects output (aget ack ack-slot-convicted-outputs)] + (when-not (nil? output) + (let [x (aget output output-slot-convicted) + port (aget output output-slot-port) + refcount (aget output output-slot-refcount)] + (aset output output-slot-convicted output) + (when-not (aget output output-slot-requested) + (output-dispose output) + (when-not (zero? refcount) + (let [output (object-array output-slots)] + (aset output output-slot-remote remote) + (aset output output-slot-port port) + (aset output output-slot-refcount refcount) + (aset output output-slot-requested false) + (aset output output-slot-ready output) + (aset output output-slot-convicted output) + (aset remote remote-slot-outputs + (assoc (aget remote remote-slot-outputs) + (port-slot port) output)) + (channel-output-sub channel output)))) + (when (identical? channel (aget output output-slot-ready)) + (aset output output-slot-ready (aget remote remote-slot-ready)) + (aset remote remote-slot-ready output)) + (recur x)))))) (defn remote-change [^objects remote ^Slot slot diff] (let [^objects input (get (aget remote remote-slot-inputs) slot) @@ -778,18 +846,105 @@ T T T -> (EXPR T) (when-not (identical? n sub) (recur n)))))) remote) +(defn remote-port-tap [^objects remote ^objects port] + (let [^objects input (input-check-create remote port) + refcount (aget input input-slot-refcount)] + (aset input input-slot-refcount (inc refcount))) + remote) + +(defn remote-port-untap [^objects remote ^objects port] + (let [^objects input (get (aget remote remote-slot-inputs) (port-slot port)) + refcount (dec (aget input input-slot-refcount))] + (aset input input-slot-refcount refcount) + (when (zero? refcount) + (when (zero? (aget input input-slot-requested)) + (input-dispose input)))) + remote) + +(defn output-down [^objects output] + (aset output output-slot-requested false) + (port-deps remote-port-untap (aget output output-slot-remote) (aget output output-slot-port)) + (when (zero? (aget output output-slot-refcount)) + (output-dispose output))) + +(defn output-up [^objects output] + (aset output output-slot-requested true) + (port-deps remote-port-tap (aget output output-slot-remote) (aget output output-slot-port))) + +(defn input-dequeue [^objects input] + (let [^objects remote (aget input input-slot-remote) + ^objects prv (aget input input-slot-prev) + ^objects nxt (aget input input-slot-next)] + (aset input input-slot-prev nil) + (aset input input-slot-next nil) + (aset remote remote-slot-toggle + (when-not (identical? prv input) + (aset prv input-slot-next nxt) + (aset nxt input-slot-prev prv))))) + +(defn input-enqueue [^objects input] + (let [^objects remote (aget input input-slot-remote)] + (if-some [^objects prv (aget remote remote-slot-toggle)] + (let [^objects nxt (aget prv input-slot-next)] + (aset prv input-slot-next input) + (aset nxt input-slot-prev input) + (aset input input-slot-prev prv) + (aset input input-slot-next nxt)) + (do (aset remote remote-slot-toggle input) + (aset input input-slot-prev input) + (aset input input-slot-next input))))) + (defn channel-crash [^objects channel] (let [^objects remote (aget channel channel-slot-remote)] (aset remote remote-slot-channel nil) - (run! input-reset (vals (aget remote remote-slot-inputs))) - (run! output-reset (vals (aget remote remote-slot-outputs))) + (aset remote remote-slot-acks (identity 0)) + (aset remote remote-slot-freeze #{}) + (loop [] + (when-not (nil? (aget remote remote-slot-ackq)) + (remote-ack remote) + (recur))) + (loop [] + (when-some [^objects input (aget remote remote-slot-toggle)] + (input-dequeue input) + (recur))) (loop [] - (when-some [^objects sub (aget channel channel-slot-sub-ready)] - (aset channel channel-slot-sub-ready (aget sub output-sub-slot-ready)) - (aset sub output-sub-slot-ready sub) - (try @(aget sub output-sub-slot-process) + (when-some [^objects output (aget remote remote-slot-ready)] + (aset remote remote-slot-ready (aget output output-slot-ready)) + (aset output output-slot-ready output) + (try @(aget output output-slot-process) (catch #?(:clj Throwable :cljs :default) _)) - (recur))))) + (recur))) + (reduce-kv (fn [_ slot ^objects output] + (when (aget output output-slot-requested) + (output-down output))) + nil (aget remote remote-slot-outputs)) + (reduce-kv (fn [_ slot ^objects input] + (input-reset input) + (when-not (zero? (aget input input-slot-requested)) + (input-enqueue input))) + nil (aget remote remote-slot-inputs)))) + +(defn tap-output [^objects convicted ^objects port] + (let [slot (port-slot port) + ^objects peer (frame-peer (slot-frame slot)) + ^objects remote (aget peer peer-slot-remote) + ^objects output (output-check-create remote port) + refcount (aget output output-slot-refcount)] + (aset output output-slot-refcount (inc refcount)) + convicted)) + +(defn untap-output [^objects convicted ^objects port] + (let [slot (port-slot port) + ^objects peer (frame-peer (slot-frame slot)) + ^objects remote (aget peer peer-slot-remote) + ^objects output (get (aget remote remote-slot-outputs) slot) + refcount (dec (aget output output-slot-refcount))] + (aset output output-slot-refcount refcount) + (if (zero? refcount) + (if (aget output output-slot-requested) + convicted + (do (aset output output-slot-convicted convicted) output)) + convicted))) (defn channel-cancel [^objects channel] ((aget channel channel-slot-process))) @@ -799,29 +954,52 @@ T T T -> (EXPR T) ^objects peer (aget remote remote-slot-peer) busy (enter peer)] (try (if (identical? channel (aget remote remote-slot-channel)) - (loop [change {}] - (if-some [^objects sub (aget channel channel-slot-sub-ready)] - (let [^objects output (aget sub output-sub-slot-output) - ^objects port (aget output output-slot-port) - ps (aget sub output-sub-slot-process) - slot (port-slot port)] - (aset channel channel-slot-sub-ready (aget sub output-sub-slot-ready)) - (aset sub output-sub-slot-ready sub) - (recur (if (identical? sub (aget output output-slot-sub)) - (assoc change slot (let [diff @ps] - (if-some [p (change slot)] - (i/combine p diff) diff))) - (do (try @ps (catch #?(:clj Throwable :cljs :default) _)) change)))) - (let [acks (aget channel channel-slot-acks) - toggle (aget channel channel-slot-toggle) - freeze (aget channel channel-slot-freeze)] - (aset channel channel-slot-acks (identity 0)) - (aset channel channel-slot-toggle #{}) - (aset channel channel-slot-freeze #{}) - (encode [acks toggle change freeze] - (aget channel channel-slot-writer-opts))))) - (let [e (aget channel channel-slot-sub-ready)] - (aset channel channel-slot-sub-ready nil) + (let [^objects ack (object-array ack-slots)] + (loop [toggle #{} + change {}] + (if-some [^objects input (aget remote remote-slot-toggle)] + (let [^objects port (aget input input-slot-port)] + (input-dequeue input) + (if (zero? (aget input input-slot-requested)) + (do (aset ack ack-slot-convicted-outputs + (port-deps untap-output + (aget ack ack-slot-convicted-outputs) port)) + (when (zero? (aget input input-slot-refcount)) + (aset input input-slot-convicted (aget ack ack-slot-convicted-inputs)) + (aset ack ack-slot-convicted-inputs input))) + (aset ack ack-slot-convicted-outputs + (port-deps tap-output + (aget ack ack-slot-convicted-outputs) port))) + (recur (conj toggle (port-slot port)) change)) + (if-some [^objects output (aget remote remote-slot-ready)] + (do (aset remote remote-slot-ready (aget output output-slot-ready)) + (recur toggle + (if (identical? output (aget output output-slot-convicted)) + (let [ps (aget output output-slot-process)] + (aset output output-slot-ready output) + (if-some [port (aget output output-slot-port)] + (let [slot (port-slot port), diff @ps] + (assoc change slot (if-some [p (change slot)] (i/combine p diff) diff))) + (do (try @ps (catch #?(:clj Throwable :cljs :default) _)) change))) + (do (aset output output-slot-ready channel) change)))) + (let [acks (aget remote remote-slot-acks) + freeze (aget remote remote-slot-freeze)] + (aset remote remote-slot-acks (identity 0)) + (aset remote remote-slot-freeze #{}) + (when (pos? (+ (count toggle) (count change) (count freeze))) + (if-some [^objects nxt (aget remote remote-slot-ackq)] + (let [^objects prv (aget nxt ack-slot-prev)] + (aset ack ack-slot-next nxt) + (aset ack ack-slot-prev prv) + (aset prv ack-slot-next ack) + (aset nxt ack-slot-prev ack)) + (do (aset remote remote-slot-ackq ack) + (aset ack ack-slot-prev ack) + (aset ack ack-slot-next ack)))) + (encode [acks toggle change freeze] + (aget channel channel-slot-writer-opts))))))) + (let [e (aget channel channel-slot-shared)] + (aset channel channel-slot-shared nil) (throw e))) (catch #?(:clj Throwable :cljs :default) e (channel-crash channel) @@ -835,15 +1013,9 @@ T T T -> (EXPR T) (exit peer busy))))) (defn remote-toggle [^objects remote ^Slot slot] - (let [^objects local-port (slot-port slot) - ^objects output (output-check-create remote local-port) - requested (aget output output-slot-requested)] - (aset output output-slot-requested (not requested)) - (port-deps (if requested remote-port-untap remote-port-tap) remote local-port) - (when requested - (when (zero? (aget output output-slot-refcount)) - (output-reset output)))) - remote) + (let [^objects output (output-check-create remote (slot-port slot))] + ((if (aget output output-slot-requested) + output-down output-up) output) remote)) (defn channel-ready [^objects channel busy] (let [^objects remote (aget channel channel-slot-remote) @@ -857,17 +1029,17 @@ T T T -> (EXPR T) (let [[acks toggle change freeze] (decode @(aget channel channel-slot-process) (aget channel channel-slot-reader-opts))] - (dotimes [_ acks] (remote-ack remote)) (reduce remote-toggle remote toggle) (reduce-kv remote-change remote change) (reduce remote-freeze remote freeze) + (dotimes [_ acks] (remote-ack remote)) (when (pos? (+ (count toggle) (count change) (count freeze))) - (aset channel channel-slot-acks - (inc (aget channel channel-slot-acks))) + (aset remote remote-slot-acks + (inc (aget remote remote-slot-acks))) (channel-output-event channel))) (catch #?(:clj Throwable :cljs :default) e (channel-crash channel) - (aset channel channel-slot-sub-ready e) + (aset channel channel-slot-shared e) (channel-output-event channel))) (try @(aget channel channel-slot-process) (catch #?(:clj Throwable :cljs :default) _))))) @@ -957,105 +1129,45 @@ T T T -> (EXPR T) (fn [_] (->Failure :unserializable)))})}) -(defn remote-handler - ([^objects peer] (remote-handler {} peer)) - ([opts ^objects peer] - (fn [events] - (fn [step done] - (let [busy (enter peer) - ^objects remote (aget peer peer-slot-remote)] - (if (nil? (aget remote remote-slot-channel)) - (let [channel (object-array channel-slots)] - (aset remote remote-slot-channel channel) - (aset channel channel-slot-remote remote) - (aset channel channel-slot-step step) - (aset channel channel-slot-done done) - (aset channel channel-slot-acks (identity 0)) - (aset channel channel-slot-toggle - (into #{} - (comp - (filter #(aget ^objects % input-slot-requested)) - (map #(port-slot (aget ^objects % input-slot-port)))) - (vals (aget remote remote-slot-inputs)))) - (aset channel channel-slot-freeze #{}) - (aset channel channel-slot-busy true) - (aset channel channel-slot-over false) - (aset channel channel-slot-events events) - (aset channel channel-slot-alive (identity 1)) - (aset channel channel-slot-shared {}) - (aset channel channel-slot-writer-opts (channel-writer-opts opts channel)) - (aset channel channel-slot-reader-opts (channel-reader-opts opts channel)) - (aset channel channel-slot-ready (aget peer peer-slot-channel-ready)) - (aset peer peer-slot-channel-ready channel) - (aset channel channel-slot-process - ((aget remote remote-slot-events) - #(let [^objects remote (aget channel channel-slot-remote)] - (channel-ready channel (enter (aget remote remote-slot-peer)))) - #(let [^objects remote (aget channel channel-slot-remote)] - (aset channel channel-slot-over true) - (channel-ready channel (enter (aget remote remote-slot-peer)))))) - (reduce channel-output-sub channel (vals (aget remote remote-slot-outputs))) - (channel-ready channel busy) - (->Channel channel)) - (do (exit peer busy) (step) - (->Failer done (error "Can't connect - remote already up."))))))))) - -(defn local-port-tap [^objects remote ^objects local-port] - (let [^objects output (output-check-create remote local-port)] - (aset output output-slot-refcount (inc (aget output output-slot-refcount))) - remote)) - -(defn local-port-untap [^objects remote ^objects local-port] - (let [^objects output (get (aget remote remote-slot-outputs) (port-slot local-port)) - refcount (dec (aget output output-slot-refcount))] - (aset output output-slot-refcount refcount) - (when (zero? refcount) - (when-not (aget output output-slot-requested) - (output-reset output))) - remote)) - -(defn port-coordinates [^objects port] - (let [slot (port-slot port)] - [(frame-path (.-frame slot)) (.-id slot)])) - -(defn node - "Returns the signal node id for given frame." - {:tag Slot} - [^Frame frame id] - (->Slot frame (- -1 id))) - -(defn call - "Returns the call site id for given frame." - {:tag Slot} - [^Frame frame id] - (->Slot frame id)) - -(defn define-node - "Defines signals node id for given frame." - [^Frame frame id expr] - (define-slot (node frame id) expr)) - -(defn slot-frame - "Returns the frame of given slot." - {:tag Frame} - [^Slot slot] - (.-frame slot)) - -(defn slot-id - "Returns the id of given slot." - [^Slot slot] - (.-id slot)) - -(defn frame-slot [^Frame frame] - (.-slot frame)) - -(defn remote-toggle-event [^objects remote slot] - (when-some [^objects channel (aget remote remote-slot-channel)] - (let [toggle (aget channel channel-slot-toggle)] - (aset channel channel-slot-toggle - ((if (contains? toggle slot) disj conj) - toggle slot))) - (channel-output-event channel))) +(defn remote-handler [opts ^objects peer] + (fn [events] + (fn [step done] + (let [busy (enter peer) + ^objects remote (aget peer peer-slot-remote)] + (if (nil? (aget remote remote-slot-channel)) + (let [channel (object-array channel-slots)] + (aset remote remote-slot-channel channel) + (aset channel channel-slot-remote remote) + (aset channel channel-slot-step step) + (aset channel channel-slot-done done) + (aset channel channel-slot-busy true) + (aset channel channel-slot-over false) + (aset channel channel-slot-events events) + (aset channel channel-slot-alive (identity 1)) + (aset channel channel-slot-shared {}) + (aset channel channel-slot-writer-opts (channel-writer-opts opts channel)) + (aset channel channel-slot-reader-opts (channel-reader-opts opts channel)) + (aset channel channel-slot-ready (aget peer peer-slot-channel-ready)) + (aset peer peer-slot-channel-ready channel) + (aset channel channel-slot-process + ((aget remote remote-slot-events) + #(let [^objects remote (aget channel channel-slot-remote)] + (channel-ready channel (enter (aget remote remote-slot-peer)))) + #(let [^objects remote (aget channel channel-slot-remote)] + (aset channel channel-slot-over true) + (channel-ready channel (enter (aget remote remote-slot-peer)))))) + (reduce channel-output-sub channel (vals (aget remote remote-slot-outputs))) + (channel-ready channel busy) + (->Channel channel)) + (do (exit peer busy) (step) + (->Failer done (error "Can't connect - remote already up.")))))))) + +(defn input-toggle-event [^objects input] + (let [^objects remote (aget input input-slot-remote)] + ((if (nil? (aget input input-slot-prev)) + input-enqueue input-dequeue) input) + (when-some [^objects channel (aget remote remote-slot-channel)] + (channel-output-event channel)))) (defn dep-attach [^objects port] (let [slot (port-slot port) @@ -1065,9 +1177,7 @@ T T T -> (EXPR T) ^objects input (input-check-create remote port) requested (aget input input-slot-requested)] (aset input input-slot-requested (inc requested)) - (when (zero? requested) - (remote-toggle-event remote slot)) - (port-deps local-port-tap remote port) + (when (zero? requested) (input-toggle-event input)) (exit peer busy))) (defn dep-detach [^objects port] @@ -1078,12 +1188,7 @@ T T T -> (EXPR T) ^objects input (input-check-create remote port) requested (dec (aget input input-slot-requested))] (aset input input-slot-requested requested) - (when (zero? requested) - (remote-toggle-event remote slot) - (when (zero? (aget input input-slot-refcount)) - (aset remote remote-slot-inputs - (dissoc (aget remote remote-slot-inputs) slot)))) - (port-deps local-port-untap remote port) + (when (zero? requested) (input-toggle-event input)) (exit peer busy))) (defn with-dep [flow ^objects port] @@ -1209,6 +1314,8 @@ T T T -> (EXPR T) (aset remote remote-slot-peer peer) (aset remote remote-slot-inputs {}) (aset remote remote-slot-outputs {}) + (aset remote remote-slot-acks (identity 0)) + (aset remote remote-slot-freeze #{}) (aset remote remote-slot-events (m/stream (m/observe diff --git a/test/hyperfiddle/electric/impl/runtime_test.cljc b/test/hyperfiddle/electric/impl/runtime_test.cljc index d1dcc1344..c92f584b0 100644 --- a/test/hyperfiddle/electric/impl/runtime_test.cljc +++ b/test/hyperfiddle/electric/impl/runtime_test.cljc @@ -169,14 +169,14 @@ (def server (peer :server (rcf/tap (e/server :foo)))) (def r-ps ((m/reduce (constantly nil) (r/peer-root-frame client)) {} {})) (def c-ps - (((r/remote-handler client) + (((r/remote-handler {} client) (fn [!] (def s->c !) #(prn :dispose))) #(rcf/tap :step-c) #(prn :done-c))) % := :step-c (def s-ps - (((r/remote-handler server) + (((r/remote-handler {} server) (fn [!] (def c->s !) #(prn :dispose))) @@ -194,14 +194,14 @@ (def server (peer :server (rcf/tap (e/client (e/$ (e/server (e/fn [] :foo))))))) (def r-ps ((m/reduce (constantly nil) (r/peer-root-frame client)) {} {})) (def c-ps - (((r/remote-handler client) + (((r/remote-handler {} client) (fn [!] (def s->c !) #(prn :dispose))) #(rcf/tap :step-c) #(prn :done-c))) % := :step-c (def s-ps - (((r/remote-handler server) + (((r/remote-handler {} server) (fn [!] (def c->s !) #(prn :dispose))) @@ -219,14 +219,14 @@ (def server (peer :server (rcf/tap (e/client (e/$ (e/server (let [foo :foo] (e/fn [] foo)))))))) (def r-ps ((m/reduce (constantly nil) (r/peer-root-frame client)) {} {})) (def c-ps - (((r/remote-handler client) + (((r/remote-handler {} client) (fn [!] (def s->c !) #(prn :dispose))) #(rcf/tap :step-c) #(prn :done-c))) % := :step-c (def s-ps - (((r/remote-handler server) + (((r/remote-handler {} server) (fn [!] (def c->s !) #(prn :dispose))) @@ -248,14 +248,14 @@ (def server (peer :server (rcf/tap (e/join (e/pure (let [x (e/server 2)] x)))))) (def r-ps ((m/reduce (constantly nil) (r/peer-root-frame client)) {} {})) (def c-ps - (((r/remote-handler client) + (((r/remote-handler {} client) (fn [!] (def s->c !) #(prn :dispose))) #(rcf/tap :step-c) #(rcf/tap :done-c))) % := :step-c (def s-ps - (((r/remote-handler server) + (((r/remote-handler {} server) (fn [!] (def c->s !) #(prn :dispose)))