From 0ff55054ce65797cb1231897dfbd799718a03dc0 Mon Sep 17 00:00:00 2001 From: rhishikesh Date: Sat, 6 Mar 2021 16:32:15 +0530 Subject: [PATCH] Fault tolerant gossip implementation. --- demo/clojure/gossip.clj | 90 ++++++++++++++++++++++++++++++++--------- 1 file changed, 70 insertions(+), 20 deletions(-) diff --git a/demo/clojure/gossip.clj b/demo/clojure/gossip.clj index 2db31e1..9af9fb8 100755 --- a/demo/clojure/gossip.clj +++ b/demo/clojure/gossip.clj @@ -62,21 +62,46 @@ (println input))))) -(defn reply +(def node-id (atom "")) +(def node-nbrs (atom [])) +(def messages (atom #{})) +(def gossips (atom {})) +(def next-message-id (atom 0)) + + +(defn- reply ([src dest body] {:src src :dest dest :body body})) -(def node-id (atom "")) -(def node-nbrs (atom [])) -(def messages (atom #{})) +(defn- send! + ([input] + (-> input + generate-json + printout)) + ([src dest body] + (send! (reply src dest body)))) + + +(defn- gossip-loop + [] + (future + (try + (loop [] + (doseq [g @gossips] + (send! (second g))) + (Thread/sleep 1000) + (recur)) + (catch Exception e + (printerr e))))) + (defn- process-request [input] (let [body (:body input) - r-body {:msg_id (rand-int 100) + r-body {:msg_id (swap! next-message-id inc) :in_reply_to (:msg_id body)} nid (:node_id body)] (case (:type body) @@ -93,29 +118,52 @@ :topology (keyword @node-id)])) (reply @node-id - (:src input) - (assoc r-body - :type "topology_ok"))) + (:src input) + (assoc r-body + :type "topology_ok"))) "broadcast" (do - ;; if we already have the message, it means - ;; this is a broadcast, dont rebroadcast (when-not (@messages (:message body)) - (doseq [n @node-nbrs] - ;; we have to explicitly print this out - ;; because just calling reply only generates a - ;; map which the outer loop prints out - (printout (generate-json (reply @node-id - n - {:type "broadcast" - :message (:message body)}))))) + (doseq [n @node-nbrs + ;; also don't broadcast to sender + :when (not= n (:src input))] + ;; here we need to distinguish between msg-id received + ;; directly from a broadcast message which will be :msg_id + ;; and msg-id received from a peer as part of gossip. + ;; We can receive gossip with msg-id 1 and message 1 + ;; and later receive broadcast with msg-id 1 and message 3 + ;; Also, the msg-id we send here has to be used to + ;; record the message for retries which is then removed + ;; when we receive gossip_ok message + (let [msg-id (or (:msg_id body) + (str "g:" (:g_id body))) + msg {:src @node-id + :dest n + :body {:type "broadcast" + :message (:message body) + :g_id msg-id}}] + (swap! gossips assoc (str n "::" msg-id) msg) + ;; we have to explicitly send this out + ;; because just calling reply only generates a + ;; map which the outer loop prints out + (send! msg)))) (swap! messages conj (:message body)) - (when (:msg_id body) + (if (:msg_id body) (reply @node-id (:src input) (assoc r-body - :type "broadcast_ok")))) + :type "broadcast_ok")) + (when (:g_id body) + (reply @node-id + (:src input) + {:type "gossip_ok" + :g_id (:g_id body)})))) + + "gossip_ok" + (do + (swap! gossips dissoc (str (:src input) "::" (:g_id body))) + nil) "read" (reply @node-id @@ -128,9 +176,11 @@ (defn -main "Read transactions from stdin and send output to stdout" [] + (gossip-loop) (process-stdin (comp printout generate-json process-request parse-json))) + (-main)