Skip to content

Commit

Permalink
always publish peer outbound events
Browse files Browse the repository at this point in the history
  • Loading branch information
leonoel committed Sep 18, 2024
1 parent a9b14dd commit 7bc2376
Show file tree
Hide file tree
Showing 7 changed files with 266 additions and 275 deletions.
2 changes: 1 addition & 1 deletion src/hyperfiddle/electric/impl/mount_point.cljc
Original file line number Diff line number Diff line change
Expand Up @@ -312,7 +312,7 @@ Mounting a block generates a grow for each active item having this block's frame
(inc (aget reader reader-slot-alive)))
(aset call call-slot-process
((if-some [slot (call-slot call)]
(r/incseq (r/peer-root-frame (aget state slot-peer)) slot)
(r/incseq (r/peer-root (aget state slot-peer)) slot)
(f/flow (r/invariant (aget ^objects (aget call call-slot-children) block-slot-frame))))
#(let [^objects reader (aget call call-slot-reader)
^objects state (aget reader reader-slot-state)
Expand Down
168 changes: 76 additions & 92 deletions src/hyperfiddle/electric/impl/runtime3.cljc
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
(def peer-slots 6)

(def remote-slot-peer 0)
(def remote-slot-events 1)
(def remote-slot-input 1)
(def remote-slot-channel 2)
(def remote-slot-inputs 3) ;; hash map of remote ports currently pushed to local peer, indexed by port slot
(def remote-slot-outputs 4) ;; hash map of local port subscriptions pushed to remote peer, indexed by port slot
Expand All @@ -40,7 +40,8 @@
(def remote-slot-freeze 7)
(def remote-slot-acks 8)
(def remote-slot-toggle 9)
(def remote-slots 10)
(def remote-slot-events 10)
(def remote-slots 11)

(def ack-slot-prev 0)
(def ack-slot-next 1)
Expand All @@ -58,18 +59,17 @@
(def output-slots 7)

(def channel-slot-remote 0)
(def channel-slot-events 1)
(def channel-slot-process 2)
(def channel-slot-busy 3)
(def channel-slot-over 4)
(def channel-slot-step 5)
(def channel-slot-done 6)
(def channel-slot-alive 7)
(def channel-slot-ready 8)
(def channel-slot-shared 9)
(def channel-slot-reader-opts 10)
(def channel-slot-writer-opts 11)
(def channel-slots 12)
(def channel-slot-process 1)
(def channel-slot-busy 2)
(def channel-slot-over 3)
(def channel-slot-step 4)
(def channel-slot-done 5)
(def channel-slot-alive 6)
(def channel-slot-ready 7)
(def channel-slot-shared 8)
(def channel-slot-reader-opts 9)
(def channel-slot-writer-opts 10)
(def channel-slots 11)

(def port-slot-slot 0)
(def port-slot-site 1)
Expand Down Expand Up @@ -313,7 +313,7 @@ T T T -> (EXPR T)
(defn peer-site [^objects peer]
(aget peer peer-slot-site))

(defn peer-root [^objects peer key]
(defn peer-resolve [^objects peer key]
(let [defs (peer-defs peer)]
(when-not (contains? defs key) (throw (error (str (pr-str key) " not defined"))))
(defs key)))
Expand All @@ -322,7 +322,7 @@ T T T -> (EXPR T)
"Returns the cdef of given constructor."
{:tag Cdef}
[^objects peer key idx]
((peer-root peer key) idx))
((peer-resolve peer key) idx))

(defn port-flow [^objects port]
(aget port port-slot-flow))
Expand Down Expand Up @@ -405,7 +405,7 @@ T T T -> (EXPR T)
(defn resolve
"Returns the root binding of electric var matching given keyword."
[^Frame frame key]
((peer-root (.-peer frame) key)))
((peer-resolve (.-peer frame) key)))

(defn frame-site
"Returns the site of given frame."
Expand Down Expand Up @@ -800,7 +800,9 @@ T T T -> (EXPR T)
(if-some [prev (aget s input-sub-slot-diff)]
(aset s input-sub-slot-diff (i/combine prev diff))
(let [step (aget s input-sub-slot-step)]
(aset s input-sub-slot-diff diff) (step)))
(aset s input-sub-slot-diff diff)
;; TODO this can nullify slot-next
(step)))
(let [n (aget s input-sub-slot-next)]
(when-not (identical? n sub) (recur n))))))
remote)
Expand Down Expand Up @@ -1105,38 +1107,12 @@ T T T -> (EXPR T)
(fn [_]
(->Failure :unserializable)))})})

(defn remote-handler [opts ^objects peer]
(fn [events]
(fn [step done]
(let [busy (enter peer)
^objects remote (aget peer peer-slot-remote)]
(if (nil? (aget remote remote-slot-channel))
(let [channel (object-array channel-slots)]
(aset remote remote-slot-channel channel)
(aset channel channel-slot-remote remote)
(aset channel channel-slot-step step)
(aset channel channel-slot-done done)
(aset channel channel-slot-busy true)
(aset channel channel-slot-over false)
(aset channel channel-slot-events events)
(aset channel channel-slot-alive (identity 1))
(aset channel channel-slot-shared {})
(aset channel channel-slot-writer-opts (channel-writer-opts opts channel))
(aset channel channel-slot-reader-opts (channel-reader-opts opts channel))
(aset channel channel-slot-ready (aget peer peer-slot-channel-ready))
(aset peer peer-slot-channel-ready channel)
(aset channel channel-slot-process
((aget remote remote-slot-events)
#(let [^objects remote (aget channel channel-slot-remote)]
(channel-ready channel (enter (aget remote remote-slot-peer))))
#(let [^objects remote (aget channel channel-slot-remote)]
(aset channel channel-slot-over true)
(channel-ready channel (enter (aget remote remote-slot-peer))))))
(reduce channel-output-sub channel (vals (aget remote remote-slot-outputs)))
(channel-ready channel busy)
(->Channel channel))
(do (exit peer busy) (step)
(->Failer done (error "Can't connect - remote already up."))))))))
(defn peer-events [^objects peer]
(let [^objects remote (aget peer peer-slot-remote)]
(aget remote remote-slot-events)))

(defn peer-root [^objects peer]
(aget peer peer-slot-root))

(defn input-toggle-event [^objects input]
(let [^objects remote (aget input input-slot-remote)]
Expand Down Expand Up @@ -1250,41 +1226,59 @@ T T T -> (EXPR T)
(let [[_ _ free _] (frame-ctor frame)]
(free id)))

(defn make-remote [^objects peer]
(let [^objects remote (object-array remote-slots)]
(aset remote remote-slot-peer peer)
(aset remote remote-slot-inputs {})
(aset remote remote-slot-outputs {})
(aset remote remote-slot-acks (identity 0))
(aset remote remote-slot-freeze #{})
(aset remote remote-slot-events
(m/stream
(m/observe
(fn [!]
(let [^objects channel (aget remote remote-slot-channel)
events (aget channel channel-slot-events)]
(aset channel channel-slot-events nil)
(events !))))))
remote))

(defn make-peer "
Returns a new peer instance for given site, from given definitions and main key and optional extra arguments to the
entrypoint.
" [site defs main args]
" [site opts subject defs main args]
(let [^objects peer (object-array peer-slots)
^objects remote (make-remote peer)]
^objects remote (object-array remote-slots)
events (m/stream (m/observe subject))]
(aset peer peer-slot-busy #?(:clj (ReentrantLock.) :cljs false))
(aset peer peer-slot-remote remote)
(aset peer peer-slot-site site)
(aset peer peer-slot-defs defs)
(aset peer peer-slot-remote remote)
(aset remote remote-slot-peer peer)
(aset remote remote-slot-inputs {})
(aset remote remote-slot-outputs {})
(aset remote remote-slot-acks (identity 0))
(aset remote remote-slot-freeze #{})
(aset peer peer-slot-root
(->> args
(eduction (map pure))
(apply dispatch "<root>" ((defs main)))
(make-frame peer nil 0 :client))) peer))

(defn peer-root-frame [^objects peer]
(aget peer peer-slot-root))
(make-frame peer nil 0 :client)))
(aset remote remote-slot-events
(m/stream
(fn [step done]
(let [busy (enter peer)
^objects remote (aget peer peer-slot-remote)]
(if (nil? (aget remote remote-slot-channel))
(let [channel (object-array channel-slots)]
(aset remote remote-slot-channel channel)
(aset channel channel-slot-remote remote)
(aset channel channel-slot-step step)
(aset channel channel-slot-done done)
(aset channel channel-slot-busy true)
(aset channel channel-slot-over false)
(aset channel channel-slot-alive (identity 1))
(aset channel channel-slot-shared {})
(aset channel channel-slot-writer-opts (channel-writer-opts opts channel))
(aset channel channel-slot-reader-opts (channel-reader-opts opts channel))
(aset channel channel-slot-ready (aget peer peer-slot-channel-ready))
(aset peer peer-slot-channel-ready channel)
(aset channel channel-slot-process
(events
#(let [^objects remote (aget channel channel-slot-remote)]
(channel-ready channel (enter (aget remote remote-slot-peer))))
#(let [^objects remote (aget channel channel-slot-remote)]
(aset channel channel-slot-over true)
(channel-ready channel (enter (aget remote remote-slot-peer))))))
(reduce channel-output-sub channel (vals (aget remote remote-slot-outputs)))
(channel-ready channel busy)
(->Channel channel))
(do (exit peer busy) (step)
(->Failer done (error "Can't connect - remote already up."))))))))
peer))

(defn subject-at [^objects arr slot]
(fn [!] (aset arr slot !) #(aset arr slot nil)))
Expand Down Expand Up @@ -1369,21 +1363,11 @@ entrypoint.
(recur (assoc ret k f) (merge (dissoc left k) (f :get :deps))))
ret)))

(defn client "
Allocates a new client peer and returns a task consuming its return value using given connector as its server
communication channel. `connector` must be a function taking the remote handler as an argument and returning
a task managing the lifecycle of the channel.
The remote handler is a function taking a subject and returning a flow. The flow emits outgoing events and reads
incoming events on the subject.
" [opts connector defs main & args]
(let [peer (make-peer :client defs main args)]
(m/reduce (comp reduced {}) nil
(m/ap
(m/amb= (m/? (connector (remote-handler opts peer)))
(m/? (m/reduce (constantly nil) (peer-root-frame peer))))))))

(defn server "
Allocates a new server peer and returns its remote handler.
" [opts defs main & args]
(remote-handler opts (make-peer :server defs main args)))
(defn peer-sink [peer]
(m/reduce (constantly nil) (peer-root peer)))

(defn peer-boot [peer handler]
(m/reduce (comp reduced {}) nil
(m/ap
(m/amb= (m/? (handler (peer-events peer)))
(m/? (peer-sink peer))))))
15 changes: 8 additions & 7 deletions src/hyperfiddle/electric3.cljc
Original file line number Diff line number Diff line change
Expand Up @@ -329,21 +329,22 @@ A mount point can be :
(defmacro boot-server [opts Main & args]
(let [env (merge (lang/normalize-env &env) web-config opts)
source (lang/->source env ::Main `(fn [] ($ ~Main ~@args)))]
`(r/server ~(select-keys opts [:cognitect.transit/read-handlers :cognitect.transit/write-handlers])
(r/->defs {::Main ~source}) ::Main)))
`(clojure.core/fn [subject#]
(r/peer-events
(r/make-peer :server ~(select-keys opts [:cognitect.transit/read-handlers :cognitect.transit/write-handlers])
subject# (r/->defs {::Main ~source}) ::Main nil)))))

(defmacro boot-client [opts Main & args]
(let [env (merge (lang/normalize-env &env) web-config opts)
source (lang/->source env ::Main `(fn [] ($ ~Main ~@args)))]
`(r/client ~(select-keys opts [:cognitect.transit/read-handlers :cognitect.transit/write-handlers])
(hyperfiddle.electric-client3/connector hyperfiddle.electric-client3/*ws-server-url*)
(r/->defs {::Main ~source}) ::Main )))
`(let [[subject# handler#] (hyperfiddle.electric-client3/connector hyperfiddle.electric-client3/*ws-server-url*)]
(r/peer-boot (r/make-peer :client ~(select-keys opts [:cognitect.transit/read-handlers :cognitect.transit/write-handlers])
subject# (r/->defs {::Main ~source}) ::Main nil) handler#))))

(defmacro boot-single [opts Main & args]
(let [env (merge (lang/normalize-env &env) web-config opts)
source (lang/->source env ::Main `(fn [] ($ ~Main ~@args)))]
`(r/client {} (constantly m/never)
(r/->defs {::Main ~source}) ::Main)))
`(r/peer-sink (r/make-peer :client {} nil (r/->defs {::Main ~source}) ::Main nil))))

;; (cc/defn -snapshot [flow] (->> flow (m/eduction (contrib.data/take-upto (complement #{r/pending})))))
(cc/defn -snapshot [flow] (->> flow (m/eduction (contrib.data/take-upto (comp pos-int? :degree)))))
Expand Down
83 changes: 42 additions & 41 deletions src/hyperfiddle/electric_client3.cljs
Original file line number Diff line number Diff line change
Expand Up @@ -74,11 +74,6 @@
(send! ws "HEARTBEAT")
(cb msg))))

(defn ws-subject [ws]
(fn [cb]
(set! (.-onmessage ws) (comp (handle-hf-heartbeat ws cb) payload))
#(set! (.-onmessage ws) nil)))

(defn fib-iter [[a b]]
(case b
0 [1 1]
Expand Down Expand Up @@ -117,42 +112,48 @@
visible!))

(defn connector [url]
(fn [handler]
(m/sp
(loop [delays retry-delays]
(.log js/console "Connecting...")
(when-some [[delay & delays]
(if-some [ws (m/? (connect url))]
(when-some [{:keys [code] :as info}
(try
(m/? (m/race (send-all ws (handler (ws-subject ws)))
(wait-for-close ws)))
(finally
(when-not (= (.-CLOSED js/WebSocket) (.-readyState ws))
(.close ws) (m/? (m/compel wait-for-close)))))]
(when (case code ; https://www.rfc-editor.org/rfc/rfc6455#section-7.4.1
(1000 1001) (do (js/console.debug (str "Electric websocket disconnected - " code)) true)
(1005 1006) (do (js/console.log "Electric Websocket connection lost.") true)
(1008) (throw (ex-info "Stale Electric client" {:hyperfiddle.electric/type ::stale-client}))
(1012) ; Incompatible client. Do not attempt to reconnect (it would fail again)
(js/console.error (str "A mismatch between Electric client and server's programs was detected."
"\nThe connection was closed. Refresh the page to attempt a reconnect."
"\nCommonly, in local dev envs, this is a stale browser tab auto-reconnecting, or the clj and cljs REPLs are out of sync due to evaluating an Electric def in one process but not the other."
"\nThis should not happen in prod. See `https://github.com/hyperfiddle/electric-starter-app/` for a reference configuration."))
(1013) ; server timeout - The WS spec defines 1011 - arbitrary server error,
; and 1015 - TLS exception. 1012, 1013, and 1014 are undefined. We
; pick 1013 for "Server closed the connection because it didn't hear of
; this client for too long".
(do (js/console.log "Electric server timed out, considering this Electric client inactive.")
true)
; else
(do (js/console.log (str "Electric Websocket disconnected for an unexpected reason - " (pr-str info)))
true))
(m/? (wait-for-window-to-be-visible))
(seq retry-delays)))
(do (.log js/console "Electric client failed to connect to Electric server.") delays))]
(.log js/console (str "Next attempt in " (/ delay 1000) " seconds."))
(recur (m/? (m/sleep delay delays))))))))
(let [state (object-array 1)]
[(fn [cb]
(let [ws (aget state 0)]
(set! (.-onmessage ws) (comp (handle-hf-heartbeat ws cb) payload))
#(set! (.-onmessage ws) nil)))
(fn [events]
(m/sp
(loop [delays retry-delays]
(.log js/console "Connecting...")
(when-some [[delay & delays]
(if-some [ws (m/? (connect url))]
(when-some [{:keys [code] :as info}
(try
(aset state 0 ws)
(m/? (m/race (send-all ws events) (wait-for-close ws)))
(finally
(aset state 0 nil)
(when-not (= (.-CLOSED js/WebSocket) (.-readyState ws))
(.close ws) (m/? (m/compel (wait-for-close ws))))))]
(when (case code ; https://www.rfc-editor.org/rfc/rfc6455#section-7.4.1
(1000 1001) (do (js/console.debug (str "Electric websocket disconnected - " code)) true)
(1005 1006) (do (js/console.log "Electric Websocket connection lost.") true)
(1008) (throw (ex-info "Stale Electric client" {:hyperfiddle.electric/type ::stale-client}))
(1012) ; Incompatible client. Do not attempt to reconnect (it would fail again)
(js/console.error (str "A mismatch between Electric client and server's programs was detected."
"\nThe connection was closed. Refresh the page to attempt a reconnect."
"\nCommonly, in local dev envs, this is a stale browser tab auto-reconnecting, or the clj and cljs REPLs are out of sync due to evaluating an Electric def in one process but not the other."
"\nThis should not happen in prod. See `https://github.com/hyperfiddle/electric-starter-app/` for a reference configuration."))
(1013) ; server timeout - The WS spec defines 1011 - arbitrary server error,
; and 1015 - TLS exception. 1012, 1013, and 1014 are undefined. We
; pick 1013 for "Server closed the connection because it didn't hear of
; this client for too long".
(do (js/console.log "Electric server timed out, considering this Electric client inactive.")
true)
; else
(do (js/console.log (str "Electric Websocket disconnected for an unexpected reason - " (pr-str info)))
true))
(m/? (wait-for-window-to-be-visible))
(seq retry-delays)))
(do (.log js/console "Electric client failed to connect to Electric server.") delays))]
(.log js/console (str "Next attempt in " (/ delay 1000) " seconds."))
(recur (m/? (m/sleep delay delays)))))))]))

(defn reload-when-stale [task]
(fn [s f]
Expand Down
Loading

0 comments on commit 7bc2376

Please sign in to comment.