Skip to content

Commit

Permalink
fix ring websocket writer
Browse files Browse the repository at this point in the history
  • Loading branch information
leonoel committed Apr 16, 2024
1 parent 7669736 commit 10f606e
Showing 1 changed file with 32 additions and 2 deletions.
34 changes: 32 additions & 2 deletions src/hyperfiddle/electric_ring_adapter_de.clj
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,8 @@
[hyperfiddle.electric.debug :as dbg]
[missionary.core :as m]
[ring.websocket :as ws])
(:import missionary.Cancelled))
(:import missionary.Cancelled
(java.util.concurrent.atomic AtomicInteger)))

(def ELECTRIC-CONNECTION-TIMEOUT
"Time after which the server will close the socket if it hasn't seen any websocket activity from the client."
Expand Down Expand Up @@ -88,6 +89,35 @@
(catch Throwable e (f e)))
#()))

(defn write-msgs
"Returns a task writing all messages emitted by flow on websocket."
[socket msgs]
(fn [s f]
(let [slot-ps 0
slot-done 1
slot-error 2
slots (object-array 3)
state (AtomicInteger.)]
(letfn [(ready []
(if (aget slots slot-done)
(if-some [e (aget slots slot-error)] (f e) (s nil))
(if (nil? (aget slots slot-error))
(try (send socket @(aget slots slot-ps) ack crash)
(catch Throwable e (crash e)))
(do (try @(aget slots slot-ps) (catch Throwable _))
(ack)))))
(ack [] (when (zero? (.decrementAndGet state)) (ready)))
(crash [e]
(aset slots slot-done e)
(cancel) (ack))
(cancel [] ((aget slots slot-ps)))]
(aset slots slot-done false)
(aset slots slot-ps
(msgs #(when (zero? (.incrementAndGet state)) (ready))
#(do (aset slots slot-done true)
(when (zero? (.incrementAndGet state)) (ready)))))
(ack) cancel))))

(defn timeout
"Throw if `mailbox` haven't got any message after given `time` ms"
[mailbox time]
Expand Down Expand Up @@ -153,7 +183,7 @@
(aset state on-close-slot
((m/join (fn [& _])
(timeout keepalive-mailbox ELECTRIC-CONNECTION-TIMEOUT)
(m/reduce #(write-msg socket %2) nil ((boot-fn ring-req) (r/subject-at state on-message-slot)))
(write-msgs socket ((boot-fn ring-req) (r/subject-at state on-message-slot)))
(send-hf-heartbeat ELECTRIC-HEARTBEAT-INTERVAL #(ping socket "HEARTBEAT")))
{} (partial failure socket)))) ; Start Electric process
:on-close (fn on-close [_socket _status-code & [_reason]]
Expand Down

0 comments on commit 10f606e

Please sign in to comment.