diff --git a/src/hyperfiddle/electric/impl/runtime_de.cljc b/src/hyperfiddle/electric/impl/runtime_de.cljc index c4d49ac87..8d32608ae 100644 --- a/src/hyperfiddle/electric/impl/runtime_de.cljc +++ b/src/hyperfiddle/electric/impl/runtime_de.cljc @@ -324,6 +324,8 @@ T T T -> (EXPR T) [^Frame frame] (.-site frame)) +(declare port-slot) + (defn peer-push [^Peer peer offset item] (let [^objects state (.-state peer) ^objects queues (.-queues peer) @@ -367,8 +369,6 @@ T T T -> (EXPR T) (defn port-site [^objects port] (aget port port-slot-site)) -(declare port-slot) - (deftype Remote [port step done] IFn (#?(:clj invoke :cljs -invoke) [_] @@ -552,60 +552,60 @@ T T T -> (EXPR T) tap-pull 0 untap-pull 0 toggle-pull 0 - change-pull 0] + ready-pull 0] (let [^objects tap-queue (aget queues peer-queue-tap) ^objects untap-queue (aget queues peer-queue-untap) ^objects toggle-queue (aget queues peer-queue-toggle) ^objects ready-queue (aget queues peer-queue-ready)] (if-some [^objects remote-port (aget tap-queue tap-pull)] (do (aset tap-queue tap-pull nil) - (let [prev (aget remote-port port-slot-requested)] + (let [tap-pull (rem (unchecked-inc-int tap-pull) + (alength tap-queue)) + prev (aget remote-port port-slot-requested)] (aset remote-port port-slot-requested (inc prev)) (reduce-kv local-port-tap nil (port-deps remote-port)) (recur (if (zero? (+ prev (aget remote-port port-slot-refcount))) (conj toggle (port-slot remote-port)) toggle) change freeze - (rem (unchecked-inc-int tap-pull) - (alength tap-queue)) untap-pull toggle-pull change-pull))) + tap-pull untap-pull toggle-pull ready-pull))) (if-some [^objects remote-port (aget untap-queue untap-pull)] (do (aset untap-queue untap-pull nil) - (let [curr (dec (aget remote-port port-slot-requested))] + (let [untap-pull (rem (unchecked-inc-int untap-pull) + (alength untap-queue)) + curr (dec (aget remote-port port-slot-requested))] (aset remote-port port-slot-requested curr) (reduce-kv local-port-untap nil (port-deps remote-port)) (recur (if (zero? (+ curr (aget remote-port port-slot-refcount))) (conj toggle (port-slot remote-port)) toggle) change freeze - tap-pull (rem (unchecked-inc-int untap-pull) - (alength untap-queue)) toggle-pull change-pull))) + tap-pull untap-pull toggle-pull ready-pull))) (if-some [^objects local-port (aget toggle-queue toggle-pull)] (do (aset toggle-queue toggle-pull nil) - (if (zero? (aget local-port port-slot-requested)) - (do (aset local-port port-slot-requested (identity 1)) - (reduce-kv remote-port-tap nil (port-deps local-port)) - (when (zero? (aget local-port port-slot-refcount)) - (enable local-port))) - (do (aset local-port port-slot-requested (identity 0)) - (reduce-kv remote-port-untap nil (port-deps local-port)) - (when (zero? (aget local-port port-slot-refcount)) - (disable local-port)))) - (recur toggle change freeze tap-pull untap-pull - (rem (unchecked-inc-int toggle-pull) - (alength toggle-queue)) change-pull)) - (if-some [^objects local-port (aget ready-queue change-pull)] - (do (aset ready-queue change-pull nil) - (if-some [ps (port-process local-port)] - (if (aget local-port port-slot-state) - (recur toggle change (conj freeze (port-slot local-port)) - tap-pull untap-pull toggle-pull - (rem (unchecked-inc-int change-pull) - (alength ready-queue))) - (let [diff @ps - slot (port-slot local-port)] - (recur toggle (assoc change - slot (if-some [p (change slot)] - (i/combine p diff) diff)) - freeze tap-pull untap-pull toggle-pull - (rem (unchecked-inc-int change-pull) - (alength ready-queue))))) - (recur toggle change freeze tap-pull untap-pull toggle-pull change-pull))) + (let [toggle-pull (rem (unchecked-inc-int toggle-pull) + (alength toggle-queue))] + (if (zero? (aget local-port port-slot-requested)) + (do (aset local-port port-slot-requested (identity 1)) + (reduce-kv remote-port-tap nil (port-deps local-port)) + (when (zero? (aget local-port port-slot-refcount)) + (enable local-port))) + (do (aset local-port port-slot-requested (identity 0)) + (reduce-kv remote-port-untap nil (port-deps local-port)) + (when (zero? (aget local-port port-slot-refcount)) + (disable local-port)))) + (recur toggle change freeze tap-pull untap-pull toggle-pull ready-pull))) + (if-some [^objects local-port (aget ready-queue ready-pull)] + (do (aset ready-queue ready-pull nil) + (let [ready-pull (rem (unchecked-inc-int ready-pull) + (alength ready-queue))] + (if-some [ps (port-process local-port)] + (if (aget local-port port-slot-state) + (recur toggle change (conj freeze (port-slot local-port)) + tap-pull untap-pull toggle-pull ready-pull) + (let [diff @ps + slot (port-slot local-port)] + (recur toggle (assoc change + slot (if-some [p (change slot)] + (i/combine p diff) diff)) + freeze tap-pull untap-pull toggle-pull ready-pull))) + (recur toggle change freeze tap-pull untap-pull toggle-pull ready-pull)))) (let [acks (aget state peer-slot-output-acks)] (aset state peer-slot-output-acks (identity 0)) (aset state peer-slot-output-pending true) diff --git a/test/hyperfiddle/electric_de_test.cljc b/test/hyperfiddle/electric_de_test.cljc index d9f7e18ef..6e38715bb 100644 --- a/test/hyperfiddle/electric_de_test.cljc +++ b/test/hyperfiddle/electric_de_test.cljc @@ -2175,3 +2175,15 @@ (binding [Self (e/fn [] 111)] (tap (= Bar (e/$ Bar)))))) tap tap) % := false)) + +(tests + (def !offset (atom 0)) + (with ((l/local {} + (e/cursor [j (let [o (e/watch !offset)] + (e/diff-by identity + (range o (+ o 2))))] + (e/server (tap j)))) + tap tap) + (hash-set % %) := #{0 1} + (swap! !offset inc) + % := 2)) \ No newline at end of file