diff --git a/src/hyperfiddle/electric_ring_adapter_de.clj b/src/hyperfiddle/electric_ring_adapter_de.clj index 913372de6..2f4bb2d76 100644 --- a/src/hyperfiddle/electric_ring_adapter_de.clj +++ b/src/hyperfiddle/electric_ring_adapter_de.clj @@ -10,7 +10,8 @@ [hyperfiddle.electric.debug :as dbg] [missionary.core :as m] [ring.websocket :as ws]) - (:import missionary.Cancelled)) + (:import missionary.Cancelled + (java.util.concurrent.atomic AtomicInteger))) (def ELECTRIC-CONNECTION-TIMEOUT "Time after which the server will close the socket if it hasn't seen any websocket activity from the client." @@ -88,6 +89,35 @@ (catch Throwable e (f e))) #())) +(defn write-msgs + "Returns a task writing all messages emitted by flow on websocket." + [socket msgs] + (fn [s f] + (let [slot-ps 0 + slot-done 1 + slot-error 2 + slots (object-array 3) + state (AtomicInteger.)] + (letfn [(ready [] + (if (aget slots slot-done) + (if-some [e (aget slots slot-error)] (f e) (s nil)) + (if (nil? (aget slots slot-error)) + (try (send socket @(aget slots slot-ps) ack crash) + (catch Throwable e (crash e))) + (do (try @(aget slots slot-ps) (catch Throwable _)) + (ack))))) + (ack [] (when (zero? (.decrementAndGet state)) (ready))) + (crash [e] + (aset slots slot-done e) + (cancel) (ack)) + (cancel [] ((aget slots slot-ps)))] + (aset slots slot-done false) + (aset slots slot-ps + (msgs #(when (zero? (.incrementAndGet state)) (ready)) + #(do (aset slots slot-done true) + (when (zero? (.incrementAndGet state)) (ready))))) + (ack) cancel)))) + (defn timeout "Throw if `mailbox` haven't got any message after given `time` ms" [mailbox time] @@ -153,7 +183,7 @@ (aset state on-close-slot ((m/join (fn [& _]) (timeout keepalive-mailbox ELECTRIC-CONNECTION-TIMEOUT) - (m/reduce #(write-msg socket %2) nil ((boot-fn ring-req) (r/subject-at state on-message-slot))) + (write-msgs socket ((boot-fn ring-req) (r/subject-at state on-message-slot))) (send-hf-heartbeat ELECTRIC-HEARTBEAT-INTERVAL #(ping socket "HEARTBEAT"))) {} (partial failure socket)))) ; Start Electric process :on-close (fn on-close [_socket _status-code & [_reason]]