diff --git a/src/hyperfiddle/electric/impl/mount_point.cljc b/src/hyperfiddle/electric/impl/mount_point.cljc index 85ab6b5a7..3f224a2b9 100644 --- a/src/hyperfiddle/electric/impl/mount_point.cljc +++ b/src/hyperfiddle/electric/impl/mount_point.cljc @@ -312,7 +312,7 @@ Mounting a block generates a grow for each active item having this block's frame (inc (aget reader reader-slot-alive))) (aset call call-slot-process ((if-some [slot (call-slot call)] - (r/incseq (r/peer-root-frame (aget state slot-peer)) slot) + (r/incseq (r/peer-root (aget state slot-peer)) slot) (f/flow (r/invariant (aget ^objects (aget call call-slot-children) block-slot-frame)))) #(let [^objects reader (aget call call-slot-reader) ^objects state (aget reader reader-slot-state) diff --git a/src/hyperfiddle/electric/impl/runtime3.cljc b/src/hyperfiddle/electric/impl/runtime3.cljc index c701695f0..eba81ad94 100644 --- a/src/hyperfiddle/electric/impl/runtime3.cljc +++ b/src/hyperfiddle/electric/impl/runtime3.cljc @@ -31,7 +31,7 @@ (def peer-slots 6) (def remote-slot-peer 0) -(def remote-slot-events 1) +(def remote-slot-input 1) (def remote-slot-channel 2) (def remote-slot-inputs 3) ;; hash map of remote ports currently pushed to local peer, indexed by port slot (def remote-slot-outputs 4) ;; hash map of local port subscriptions pushed to remote peer, indexed by port slot @@ -40,7 +40,8 @@ (def remote-slot-freeze 7) (def remote-slot-acks 8) (def remote-slot-toggle 9) -(def remote-slots 10) +(def remote-slot-events 10) +(def remote-slots 11) (def ack-slot-prev 0) (def ack-slot-next 1) @@ -58,18 +59,17 @@ (def output-slots 7) (def channel-slot-remote 0) -(def channel-slot-events 1) -(def channel-slot-process 2) -(def channel-slot-busy 3) -(def channel-slot-over 4) -(def channel-slot-step 5) -(def channel-slot-done 6) -(def channel-slot-alive 7) -(def channel-slot-ready 8) -(def channel-slot-shared 9) -(def channel-slot-reader-opts 10) -(def channel-slot-writer-opts 11) -(def channel-slots 12) +(def channel-slot-process 1) +(def channel-slot-busy 2) +(def channel-slot-over 3) +(def channel-slot-step 4) +(def channel-slot-done 5) +(def channel-slot-alive 6) +(def channel-slot-ready 7) +(def channel-slot-shared 8) +(def channel-slot-reader-opts 9) +(def channel-slot-writer-opts 10) +(def channel-slots 11) (def port-slot-slot 0) (def port-slot-site 1) @@ -313,7 +313,7 @@ T T T -> (EXPR T) (defn peer-site [^objects peer] (aget peer peer-slot-site)) -(defn peer-root [^objects peer key] +(defn peer-resolve [^objects peer key] (let [defs (peer-defs peer)] (when-not (contains? defs key) (throw (error (str (pr-str key) " not defined")))) (defs key))) @@ -322,7 +322,7 @@ T T T -> (EXPR T) "Returns the cdef of given constructor." {:tag Cdef} [^objects peer key idx] - ((peer-root peer key) idx)) + ((peer-resolve peer key) idx)) (defn port-flow [^objects port] (aget port port-slot-flow)) @@ -405,7 +405,7 @@ T T T -> (EXPR T) (defn resolve "Returns the root binding of electric var matching given keyword." [^Frame frame key] - ((peer-root (.-peer frame) key))) + ((peer-resolve (.-peer frame) key))) (defn frame-site "Returns the site of given frame." @@ -800,7 +800,9 @@ T T T -> (EXPR T) (if-some [prev (aget s input-sub-slot-diff)] (aset s input-sub-slot-diff (i/combine prev diff)) (let [step (aget s input-sub-slot-step)] - (aset s input-sub-slot-diff diff) (step))) + (aset s input-sub-slot-diff diff) + ;; TODO this can nullify slot-next + (step))) (let [n (aget s input-sub-slot-next)] (when-not (identical? n sub) (recur n)))))) remote) @@ -1105,38 +1107,12 @@ T T T -> (EXPR T) (fn [_] (->Failure :unserializable)))})}) -(defn remote-handler [opts ^objects peer] - (fn [events] - (fn [step done] - (let [busy (enter peer) - ^objects remote (aget peer peer-slot-remote)] - (if (nil? (aget remote remote-slot-channel)) - (let [channel (object-array channel-slots)] - (aset remote remote-slot-channel channel) - (aset channel channel-slot-remote remote) - (aset channel channel-slot-step step) - (aset channel channel-slot-done done) - (aset channel channel-slot-busy true) - (aset channel channel-slot-over false) - (aset channel channel-slot-events events) - (aset channel channel-slot-alive (identity 1)) - (aset channel channel-slot-shared {}) - (aset channel channel-slot-writer-opts (channel-writer-opts opts channel)) - (aset channel channel-slot-reader-opts (channel-reader-opts opts channel)) - (aset channel channel-slot-ready (aget peer peer-slot-channel-ready)) - (aset peer peer-slot-channel-ready channel) - (aset channel channel-slot-process - ((aget remote remote-slot-events) - #(let [^objects remote (aget channel channel-slot-remote)] - (channel-ready channel (enter (aget remote remote-slot-peer)))) - #(let [^objects remote (aget channel channel-slot-remote)] - (aset channel channel-slot-over true) - (channel-ready channel (enter (aget remote remote-slot-peer)))))) - (reduce channel-output-sub channel (vals (aget remote remote-slot-outputs))) - (channel-ready channel busy) - (->Channel channel)) - (do (exit peer busy) (step) - (->Failer done (error "Can't connect - remote already up.")))))))) +(defn peer-events [^objects peer] + (let [^objects remote (aget peer peer-slot-remote)] + (aget remote remote-slot-events))) + +(defn peer-root [^objects peer] + (aget peer peer-slot-root)) (defn input-toggle-event [^objects input] (let [^objects remote (aget input input-slot-remote)] @@ -1250,41 +1226,59 @@ T T T -> (EXPR T) (let [[_ _ free _] (frame-ctor frame)] (free id))) -(defn make-remote [^objects peer] - (let [^objects remote (object-array remote-slots)] - (aset remote remote-slot-peer peer) - (aset remote remote-slot-inputs {}) - (aset remote remote-slot-outputs {}) - (aset remote remote-slot-acks (identity 0)) - (aset remote remote-slot-freeze #{}) - (aset remote remote-slot-events - (m/stream - (m/observe - (fn [!] - (let [^objects channel (aget remote remote-slot-channel) - events (aget channel channel-slot-events)] - (aset channel channel-slot-events nil) - (events !)))))) - remote)) - (defn make-peer " Returns a new peer instance for given site, from given definitions and main key and optional extra arguments to the entrypoint. -" [site defs main args] +" [site opts subject defs main args] (let [^objects peer (object-array peer-slots) - ^objects remote (make-remote peer)] + ^objects remote (object-array remote-slots) + events (m/stream (m/observe subject))] (aset peer peer-slot-busy #?(:clj (ReentrantLock.) :cljs false)) + (aset peer peer-slot-remote remote) (aset peer peer-slot-site site) (aset peer peer-slot-defs defs) - (aset peer peer-slot-remote remote) + (aset remote remote-slot-peer peer) + (aset remote remote-slot-inputs {}) + (aset remote remote-slot-outputs {}) + (aset remote remote-slot-acks (identity 0)) + (aset remote remote-slot-freeze #{}) (aset peer peer-slot-root (->> args (eduction (map pure)) (apply dispatch "" ((defs main))) - (make-frame peer nil 0 :client))) peer)) - -(defn peer-root-frame [^objects peer] - (aget peer peer-slot-root)) + (make-frame peer nil 0 :client))) + (aset remote remote-slot-events + (m/stream + (fn [step done] + (let [busy (enter peer) + ^objects remote (aget peer peer-slot-remote)] + (if (nil? (aget remote remote-slot-channel)) + (let [channel (object-array channel-slots)] + (aset remote remote-slot-channel channel) + (aset channel channel-slot-remote remote) + (aset channel channel-slot-step step) + (aset channel channel-slot-done done) + (aset channel channel-slot-busy true) + (aset channel channel-slot-over false) + (aset channel channel-slot-alive (identity 1)) + (aset channel channel-slot-shared {}) + (aset channel channel-slot-writer-opts (channel-writer-opts opts channel)) + (aset channel channel-slot-reader-opts (channel-reader-opts opts channel)) + (aset channel channel-slot-ready (aget peer peer-slot-channel-ready)) + (aset peer peer-slot-channel-ready channel) + (aset channel channel-slot-process + (events + #(let [^objects remote (aget channel channel-slot-remote)] + (channel-ready channel (enter (aget remote remote-slot-peer)))) + #(let [^objects remote (aget channel channel-slot-remote)] + (aset channel channel-slot-over true) + (channel-ready channel (enter (aget remote remote-slot-peer)))))) + (reduce channel-output-sub channel (vals (aget remote remote-slot-outputs))) + (channel-ready channel busy) + (->Channel channel)) + (do (exit peer busy) (step) + (->Failer done (error "Can't connect - remote already up.")))))))) + peer)) (defn subject-at [^objects arr slot] (fn [!] (aset arr slot !) #(aset arr slot nil))) @@ -1369,21 +1363,11 @@ entrypoint. (recur (assoc ret k f) (merge (dissoc left k) (f :get :deps)))) ret))) -(defn client " -Allocates a new client peer and returns a task consuming its return value using given connector as its server -communication channel. `connector` must be a function taking the remote handler as an argument and returning -a task managing the lifecycle of the channel. - -The remote handler is a function taking a subject and returning a flow. The flow emits outgoing events and reads -incoming events on the subject. -" [opts connector defs main & args] - (let [peer (make-peer :client defs main args)] - (m/reduce (comp reduced {}) nil - (m/ap - (m/amb= (m/? (connector (remote-handler opts peer))) - (m/? (m/reduce (constantly nil) (peer-root-frame peer)))))))) - -(defn server " -Allocates a new server peer and returns its remote handler. -" [opts defs main & args] - (remote-handler opts (make-peer :server defs main args))) +(defn peer-sink [peer] + (m/reduce (constantly nil) (peer-root peer))) + +(defn peer-boot [peer handler] + (m/reduce (comp reduced {}) nil + (m/ap + (m/amb= (m/? (handler (peer-events peer))) + (m/? (peer-sink peer)))))) diff --git a/src/hyperfiddle/electric3.cljc b/src/hyperfiddle/electric3.cljc index cca72ec77..d2222a7e9 100644 --- a/src/hyperfiddle/electric3.cljc +++ b/src/hyperfiddle/electric3.cljc @@ -329,21 +329,22 @@ A mount point can be : (defmacro boot-server [opts Main & args] (let [env (merge (lang/normalize-env &env) web-config opts) source (lang/->source env ::Main `(fn [] ($ ~Main ~@args)))] - `(r/server ~(select-keys opts [:cognitect.transit/read-handlers :cognitect.transit/write-handlers]) - (r/->defs {::Main ~source}) ::Main))) + `(clojure.core/fn [subject#] + (r/peer-events + (r/make-peer :server ~(select-keys opts [:cognitect.transit/read-handlers :cognitect.transit/write-handlers]) + subject# (r/->defs {::Main ~source}) ::Main nil))))) (defmacro boot-client [opts Main & args] (let [env (merge (lang/normalize-env &env) web-config opts) source (lang/->source env ::Main `(fn [] ($ ~Main ~@args)))] - `(r/client ~(select-keys opts [:cognitect.transit/read-handlers :cognitect.transit/write-handlers]) - (hyperfiddle.electric-client3/connector hyperfiddle.electric-client3/*ws-server-url*) - (r/->defs {::Main ~source}) ::Main ))) + `(let [[subject# handler#] (hyperfiddle.electric-client3/connector hyperfiddle.electric-client3/*ws-server-url*)] + (r/peer-boot (r/make-peer :client ~(select-keys opts [:cognitect.transit/read-handlers :cognitect.transit/write-handlers]) + subject# (r/->defs {::Main ~source}) ::Main nil) handler#)))) (defmacro boot-single [opts Main & args] (let [env (merge (lang/normalize-env &env) web-config opts) source (lang/->source env ::Main `(fn [] ($ ~Main ~@args)))] - `(r/client {} (constantly m/never) - (r/->defs {::Main ~source}) ::Main))) + `(r/peer-sink (r/make-peer :client {} nil (r/->defs {::Main ~source}) ::Main nil)))) ;; (cc/defn -snapshot [flow] (->> flow (m/eduction (contrib.data/take-upto (complement #{r/pending}))))) (cc/defn -snapshot [flow] (->> flow (m/eduction (contrib.data/take-upto (comp pos-int? :degree))))) diff --git a/src/hyperfiddle/electric_client3.cljs b/src/hyperfiddle/electric_client3.cljs index 7cd973362..9d8714564 100644 --- a/src/hyperfiddle/electric_client3.cljs +++ b/src/hyperfiddle/electric_client3.cljs @@ -74,11 +74,6 @@ (send! ws "HEARTBEAT") (cb msg)))) -(defn ws-subject [ws] - (fn [cb] - (set! (.-onmessage ws) (comp (handle-hf-heartbeat ws cb) payload)) - #(set! (.-onmessage ws) nil))) - (defn fib-iter [[a b]] (case b 0 [1 1] @@ -117,42 +112,48 @@ visible!)) (defn connector [url] - (fn [handler] - (m/sp - (loop [delays retry-delays] - (.log js/console "Connecting...") - (when-some [[delay & delays] - (if-some [ws (m/? (connect url))] - (when-some [{:keys [code] :as info} - (try - (m/? (m/race (send-all ws (handler (ws-subject ws))) - (wait-for-close ws))) - (finally - (when-not (= (.-CLOSED js/WebSocket) (.-readyState ws)) - (.close ws) (m/? (m/compel wait-for-close)))))] - (when (case code ; https://www.rfc-editor.org/rfc/rfc6455#section-7.4.1 - (1000 1001) (do (js/console.debug (str "Electric websocket disconnected - " code)) true) - (1005 1006) (do (js/console.log "Electric Websocket connection lost.") true) - (1008) (throw (ex-info "Stale Electric client" {:hyperfiddle.electric/type ::stale-client})) - (1012) ; Incompatible client. Do not attempt to reconnect (it would fail again) - (js/console.error (str "A mismatch between Electric client and server's programs was detected." - "\nThe connection was closed. Refresh the page to attempt a reconnect." - "\nCommonly, in local dev envs, this is a stale browser tab auto-reconnecting, or the clj and cljs REPLs are out of sync due to evaluating an Electric def in one process but not the other." - "\nThis should not happen in prod. See `https://github.com/hyperfiddle/electric-starter-app/` for a reference configuration.")) - (1013) ; server timeout - The WS spec defines 1011 - arbitrary server error, - ; and 1015 - TLS exception. 1012, 1013, and 1014 are undefined. We - ; pick 1013 for "Server closed the connection because it didn't hear of - ; this client for too long". - (do (js/console.log "Electric server timed out, considering this Electric client inactive.") - true) - ; else - (do (js/console.log (str "Electric Websocket disconnected for an unexpected reason - " (pr-str info))) - true)) - (m/? (wait-for-window-to-be-visible)) - (seq retry-delays))) - (do (.log js/console "Electric client failed to connect to Electric server.") delays))] - (.log js/console (str "Next attempt in " (/ delay 1000) " seconds.")) - (recur (m/? (m/sleep delay delays)))))))) + (let [state (object-array 1)] + [(fn [cb] + (let [ws (aget state 0)] + (set! (.-onmessage ws) (comp (handle-hf-heartbeat ws cb) payload)) + #(set! (.-onmessage ws) nil))) + (fn [events] + (m/sp + (loop [delays retry-delays] + (.log js/console "Connecting...") + (when-some [[delay & delays] + (if-some [ws (m/? (connect url))] + (when-some [{:keys [code] :as info} + (try + (aset state 0 ws) + (m/? (m/race (send-all ws events) (wait-for-close ws))) + (finally + (aset state 0 nil) + (when-not (= (.-CLOSED js/WebSocket) (.-readyState ws)) + (.close ws) (m/? (m/compel (wait-for-close ws))))))] + (when (case code ; https://www.rfc-editor.org/rfc/rfc6455#section-7.4.1 + (1000 1001) (do (js/console.debug (str "Electric websocket disconnected - " code)) true) + (1005 1006) (do (js/console.log "Electric Websocket connection lost.") true) + (1008) (throw (ex-info "Stale Electric client" {:hyperfiddle.electric/type ::stale-client})) + (1012) ; Incompatible client. Do not attempt to reconnect (it would fail again) + (js/console.error (str "A mismatch between Electric client and server's programs was detected." + "\nThe connection was closed. Refresh the page to attempt a reconnect." + "\nCommonly, in local dev envs, this is a stale browser tab auto-reconnecting, or the clj and cljs REPLs are out of sync due to evaluating an Electric def in one process but not the other." + "\nThis should not happen in prod. See `https://github.com/hyperfiddle/electric-starter-app/` for a reference configuration.")) + (1013) ; server timeout - The WS spec defines 1011 - arbitrary server error, + ; and 1015 - TLS exception. 1012, 1013, and 1014 are undefined. We + ; pick 1013 for "Server closed the connection because it didn't hear of + ; this client for too long". + (do (js/console.log "Electric server timed out, considering this Electric client inactive.") + true) + ; else + (do (js/console.log (str "Electric Websocket disconnected for an unexpected reason - " (pr-str info))) + true)) + (m/? (wait-for-window-to-be-visible)) + (seq retry-delays))) + (do (.log js/console "Electric client failed to connect to Electric server.") delays))] + (.log js/console (str "Next attempt in " (/ delay 1000) " seconds.")) + (recur (m/? (m/sleep delay delays)))))))])) (defn reload-when-stale [task] (fn [s f] diff --git a/src/hyperfiddle/electric_local_def3.cljc b/src/hyperfiddle/electric_local_def3.cljc index be093e454..48305aea5 100644 --- a/src/hyperfiddle/electric_local_def3.cljc +++ b/src/hyperfiddle/electric_local_def3.cljc @@ -42,27 +42,27 @@ (ca/is conf map? "provide config map as first argument") `(r/->defs {::Main ~(lang/->source (->env &env conf) ::Main `(e/fn [] (do ~@body)))})) -(defn half-conn [in-cb out-cb handler] +(defn half-conn [out-cb latency events] (m/reduce (constantly nil) - (m/ap (let [v (m/?> (m/stream (handler (fn [!] (in-cb !) #()))))] + (m/ap (let [v (m/?> (m/zip {} latency events))] ((m/? out-cb) v))))) -(defn full-conn [server-handler client-handler] +(defn run-local [[inbound-latency outbound-latency] defs main] (let [s->c (m/dfv) - c->s (m/dfv)] + c->s (m/dfv) + client (r/make-peer :client {} (fn [!] (s->c !) #()) defs main nil) + server (r/make-peer :server {} (fn [!] (c->s !) #()) defs main nil)] (m/join {} - (half-conn c->s s->c server-handler) - (half-conn s->c c->s client-handler)))) + (half-conn s->c inbound-latency (r/peer-events server)) + (r/peer-boot client (partial half-conn c->s outbound-latency))))) -(defn run-local [defs main] - (r/client {} - (fn [handler] - (full-conn (r/server {} defs main) handler)) defs main)) +(defn run-single [defs main] + (r/peer-sink (r/make-peer :client {} nil defs main nil))) -(def run-single (partial r/client {} (fn [_] m/never))) +(def no-latency [(m/seed (repeat nil)) (m/seed (repeat nil))]) (defmacro local {:style/indent 1} [conf & body] - `(run-local (main ~conf ~@body) ::Main)) + `(run-local ~(::lang/remote-latency conf `no-latency) (main ~conf ~@body) ::Main)) (defmacro single {:style/indent 1} [conf & body] `(run-single (main ~conf ~@body) ::Main)) \ No newline at end of file diff --git a/test/hyperfiddle/electric/impl/mount_point_test.cljc b/test/hyperfiddle/electric/impl/mount_point_test.cljc index 5c8953246..c4a08f278 100644 --- a/test/hyperfiddle/electric/impl/mount_point_test.cljc +++ b/test/hyperfiddle/electric/impl/mount_point_test.cljc @@ -35,7 +35,7 @@ (deftest sibling-tags (let [q (queue) - _ (r/make-peer :client + _ (r/make-peer :client {} nil {:root (fn ([] {0 (r/ctor :root 0)}) ([idx] (case idx @@ -79,7 +79,7 @@ (deftest sibling-tags-insert-after-read (let [q (queue) - _ (r/make-peer :client + _ (r/make-peer :client {} nil {:root (fn ([] {0 (r/ctor :root 0)}) ([idx] (case idx @@ -104,8 +104,8 @@ (deftest cousin-tags-insert-after-read (let [q (queue) _ ((m/reduce (constantly nil) - (r/peer-root-frame - (r/make-peer :client + (r/peer-root + (r/make-peer :client {} nil {:root (fn ([] {0 (r/ctor :root 0)}) ([idx] (case idx diff --git a/test/hyperfiddle/electric/impl/runtime3_test.cljc b/test/hyperfiddle/electric/impl/runtime3_test.cljc index f2d449a8a..909c9bfe1 100644 --- a/test/hyperfiddle/electric/impl/runtime3_test.cljc +++ b/test/hyperfiddle/electric/impl/runtime3_test.cljc @@ -11,22 +11,22 @@ cb (fn [e] #?(:clj (.printStackTrace ^Throwable e) :cljs (.error js/console e))))) -(defmacro peer [site form] - `(r/make-peer ~site +(defmacro peer [site events form] + `(r/make-peer ~site {} ~events {::Main ~(l/compile ::Main form (assoc (l/normalize-env &env) ::l/peers {:client :clj, :server :clj}))} ::Main nil)) (tests - (on-diff! rcf/tap (r/peer-root-frame (peer :client "hello electric"))) + (on-diff! rcf/tap (r/peer-root (peer :client nil "hello electric"))) % := {:grow 1, :degree 1, :shrink 0, :permutation {}, :change {0 "hello electric"}, :freeze #{0}} % := nil) #?(:clj ; FIXME fails in cljs (tests (def !x (atom :foo)) - (on-diff! rcf/tap (r/peer-root-frame (peer :client (e/watch !x)))) + (on-diff! rcf/tap (r/peer-root (peer :client nil (e/watch !x)))) % := {:degree 1, :permutation {}, :grow 1, :shrink 0, :change {0 :foo}, :freeze #{}} (reset! !x :bar) % := {:degree 1, :permutation {}, :grow 0, :shrink 0, :change {0 :bar}, :freeze #{}})) @@ -35,8 +35,8 @@ (tests (def !x (atom false)) (on-diff! rcf/tap - (r/peer-root-frame - (peer :client + (r/peer-root + (peer :client nil (if (e/watch !x) "foo" "bar")))) % := {:degree 1, :permutation {}, :grow 1, :shrink 0, :change {0 "bar"}, :freeze #{0}} (swap! !x not) @@ -46,8 +46,8 @@ (tests (def !bar (atom :bar)) (on-diff! rcf/tap - (r/peer-root-frame - (peer :client + (r/peer-root + (peer :client nil (e/amb :foo (e/watch !bar) :baz)))) % := {:degree 3, :permutation {}, :grow 3, :shrink 0, :change {0 :foo, 1 :bar, 2 :baz}, :freeze #{0 2}} (reset! !bar :BAR) @@ -57,8 +57,8 @@ (tests (def !xs (atom [0 1 2])) (on-diff! rcf/tap - (r/peer-root-frame - (peer :client + (r/peer-root + (peer :client nil (e/diff-by identity (e/watch !xs))))) % := {:degree 3, :permutation {}, :grow 3, :shrink 0, :change {0 0, 1 1, 2 2}, :freeze #{}} (swap! !xs conj 3) @@ -68,8 +68,8 @@ (tests (def !xs (atom [0 1 2])) (on-diff! rcf/tap - (r/peer-root-frame - (peer :client + (r/peer-root + (peer :client nil (e/cursor [x (e/diff-by identity (e/watch !xs))] (+ x x))))) % := {:degree 3, :permutation {}, :grow 3, :shrink 0, :change {0 0, 1 2, 2 4}, :freeze #{}} (swap! !xs conj 3) @@ -81,8 +81,8 @@ (def !fizz (atom "Fizz")) (def !buzz (atom "Buzz")) (on-diff! rcf/tap - (r/peer-root-frame - (peer :client + (r/peer-root + (peer :client nil (e/client (let [fizz (e/watch !fizz) ; i/fixed + m/watch + e/join buzz (e/watch !buzz) @@ -109,8 +109,8 @@ (atom [{:kind "cow" :personality "stoic"} {:kind "horse" :personality "skittish"}])) (on-diff! rcf/tap - (r/peer-root-frame - (peer :client + (r/peer-root + (peer :client nil (let [ks #{:kind}] (e/cursor [animal (e/diff-by identity (e/watch !animals)) personality (e/diff-by identity (e/watch !personalities))] @@ -131,8 +131,8 @@ (def !x (atom "hello")) (def !y (atom "electric")) (on-diff! rcf/tap - (r/peer-root-frame - (peer :client + (r/peer-root + (peer :client nil (e/as-vec (e/amb (e/watch !x) (e/watch !y)))))) % := {:degree 1, :permutation {}, :grow 1, :shrink 0, :change {0 ["hello" "electric"]}, :freeze #{}} (reset! !y "world") @@ -142,8 +142,8 @@ (tests (def !n (atom 3)) (on-diff! rcf/tap - (r/peer-root-frame - (peer :client + (r/peer-root + (peer :client nil (e/for-by identity [x (range (e/watch !n)) y (range x)] [x y])))) @@ -155,8 +155,8 @@ (tests (def !x (atom 0)) (on-diff! rcf/tap - (r/peer-root-frame - (peer :client + (r/peer-root + (peer :client nil (e/drain (rcf/tap (e/watch !x)))))) % := 0 % := {:degree 0, :permutation {}, :grow 0, :shrink 0, :change {}, :freeze #{}} @@ -166,8 +166,8 @@ (tests (def !x (atom 0)) (on-diff! rcf/tap - (r/peer-root-frame - (peer :client + (r/peer-root + (peer :client nil (let [Foo (e/fn [x] (e/fn [] x)) x (e/watch !x)] (= (e/$ Foo x) (e/$ Foo x)))))) @@ -175,108 +175,113 @@ % := nil) (tests - (def client (peer :client (rcf/tap (e/server :foo)))) - (def server (peer :server (rcf/tap (e/server :foo)))) - (def r-ps ((m/reduce (constantly nil) (r/peer-root-frame client)) {} {})) - (def c-ps - (((r/remote-handler {} client) - (fn [!] - (def s->c !) - #(prn :dispose))) - #(rcf/tap :step-c) #(prn :done-c))) + (def client (peer :client + (fn [!] + (def s->c !) + #(prn :dispose)) + (rcf/tap (e/server :foo)))) + (def server (peer :server + (fn [!] + (def c->s !) + #(prn :dispose)) + (rcf/tap (e/server :foo)))) + (def r-ps ((m/reduce (constantly nil) (r/peer-root client)) {} {})) + (def c-ps ((r/peer-events client) #(rcf/tap :step-c) #(prn :done-c))) % := :step-c - (def s-ps - (((r/remote-handler {} server) - (fn [!] - (def c->s !) - #(prn :dispose))) - #(rcf/tap :step-s) #(prn :done-s))) + (def s-ps ((r/peer-events server) #(rcf/tap :step-s) #(prn :done-s))) % := :step-s (c->s @c-ps) (s->c @s-ps) - % := :foo - % := :step-c - (c->s @c-ps) - % := :step-s) - -(tests - (def client (peer :client (rcf/tap (e/client (e/$ (e/server (e/fn [] :foo))))))) - (def server (peer :server (rcf/tap (e/client (e/$ (e/server (e/fn [] :foo))))))) - (def r-ps ((m/reduce (constantly nil) (r/peer-root-frame client)) {} {})) - (def c-ps - (((r/remote-handler {} client) - (fn [!] - (def s->c !) - #(prn :dispose))) - #(rcf/tap :step-c) #(prn :done-c))) - % := :step-c - (def s-ps - (((r/remote-handler {} server) - (fn [!] - (def c->s !) - #(prn :dispose))) - #(rcf/tap :step-s) #(prn :done-s))) - % := :step-s - (c->s @c-ps) + (hash-set % % %) := #{:foo :step-c :step-s} + ;; TODO investigate why two consecutive messages (s->c @s-ps) - % := :foo - % := :step-c - (c->s @c-ps) - % := :step-s) - -(tests - (def client (peer :client (rcf/tap (e/client (e/$ (e/server (let [foo :foo] (e/fn [] foo)))))))) - (def server (peer :server (rcf/tap (e/client (e/$ (e/server (let [foo :foo] (e/fn [] foo)))))))) - (def r-ps ((m/reduce (constantly nil) (r/peer-root-frame client)) {} {})) - (def c-ps - (((r/remote-handler {} client) - (fn [!] - (def s->c !) - #(prn :dispose))) - #(rcf/tap :step-c) #(prn :done-c))) - % := :step-c - (def s-ps - (((r/remote-handler {} server) - (fn [!] - (def c->s !) - #(prn :dispose))) - #(rcf/tap :step-s) #(prn :done-s))) - % := :step-s - (c->s @c-ps) - (s->c @s-ps) - % := :step-c (c->s @c-ps) % := :step-s - (s->c @s-ps) - % := :foo - % := :step-c - (c->s @c-ps) - % := :step-s) + (s->c @s-ps)) -(tests - (def client (peer :client (rcf/tap (e/join (e/pure (let [x (e/server 2)] x)))))) - (def server (peer :server (rcf/tap (e/join (e/pure (let [x (e/server 2)] x)))))) - (def r-ps ((m/reduce (constantly nil) (r/peer-root-frame client)) {} {})) - (def c-ps - (((r/remote-handler {} client) - (fn [!] - (def s->c !) - #(prn :dispose))) - #(rcf/tap :step-c) #(rcf/tap :done-c))) - % := :step-c - (def s-ps - (((r/remote-handler {} server) - (fn [!] - (def c->s !) - #(prn :dispose))) - #(rcf/tap :step-s) #(rcf/tap :done-s))) - % := :step-s - (c->s @c-ps) - (s->c @s-ps) - % := 2 - % := :step-c - (c->s @c-ps) - % := :step-s) +#?(:clj ; FIXME fails in cljs + (tests + (def client (peer :client + (fn [!] + (def s->c !) + #(prn :dispose)) + (rcf/tap (e/client (e/$ (e/server (e/fn [] :foo))))))) + (def server (peer :server + (fn [!] + (def c->s !) + #(prn :dispose)) + (rcf/tap (e/client (e/$ (e/server (e/fn [] :foo))))))) + (def r-ps ((m/reduce (constantly nil) (r/peer-root client)) {} {})) + (def c-ps ((r/peer-events client) #(rcf/tap :step-c) #(prn :done-c))) + % := :step-c + (def s-ps ((r/peer-events server) #(rcf/tap :step-s) #(prn :done-s))) + % := :step-s + (c->s @c-ps) + (s->c @s-ps) + (hash-set % % %) := #{:foo :step-c :step-s} + ;; TODO investigate why two consecutive messages + (s->c @s-ps) + (c->s @c-ps) + % := :step-s + (s->c @s-ps))) + +#?(:clj ; FIXME fails in cljs + (tests + (def client (peer :client + (fn [!] + (def s->c !) + #(prn :dispose)) + (rcf/tap (e/client (e/$ (e/server (let [foo :foo] (e/fn [] foo)))))))) + (def server (peer :server + (fn [!] + (def c->s !) + #(prn :dispose)) + (rcf/tap (e/client (e/$ (e/server (let [foo :foo] (e/fn [] foo)))))))) + (def r-ps ((m/reduce (constantly nil) (r/peer-root client)) {} {})) + (def c-ps ((r/peer-events client) #(rcf/tap :step-c) #(prn :done-c))) + % := :step-c + (def s-ps ((r/peer-events server) #(rcf/tap :step-s) #(prn :done-s))) + % := :step-s + (c->s @c-ps) + (s->c @s-ps) + (hash-set % %) := #{:step-c :step-s} + ;; TODO investigate why two consecutive messages + (s->c @s-ps) + (c->s @c-ps) + % := :step-s + (s->c @s-ps) + (hash-set % % %) := #{:foo :step-s :step-c} + ;; TODO investigate why two consecutive messages + (s->c @s-ps) + (c->s @c-ps) + % := :step-s + (s->c @s-ps))) + +#?(:clj ; FIXME fails in cljs + (tests + (def client (peer :client + (fn [!] + (def s->c !) + #(prn :dispose)) + (rcf/tap (e/join (e/pure (let [x (e/server 2)] x)))))) + (def server (peer :server + (fn [!] + (def c->s !) + #(prn :dispose)) + (rcf/tap (e/join (e/pure (let [x (e/server 2)] x)))))) + (def r-ps ((m/reduce (constantly nil) (r/peer-root client)) {} {})) + (def c-ps ((r/peer-events client) #(rcf/tap :step-c) #(rcf/tap :done-c))) + % := :step-c + (def s-ps ((r/peer-events server) #(rcf/tap :step-s) #(rcf/tap :done-s))) + % := :step-s + (c->s @c-ps) + (s->c @s-ps) + (hash-set % % %) := #{2 :step-c :step-s} + ;; TODO investigate why two consecutive messages + (s->c @s-ps) + (c->s @c-ps) + % := :step-s + (s->c @s-ps))) (tests (set (keys (r/->defs {:a (fn [_ _] {:b (fn [_ _] {:a (fn [_ _])})})}))) := #{:a :b})