Skip to content

Commit

Permalink
Fault tolerant gossip implementation.
Browse files Browse the repository at this point in the history
  • Loading branch information
rhishikesh-helpshift committed Mar 6, 2021
1 parent a66390c commit 0ff5505
Showing 1 changed file with 70 additions and 20 deletions.
90 changes: 70 additions & 20 deletions demo/clojure/gossip.clj
Original file line number Diff line number Diff line change
Expand Up @@ -62,21 +62,46 @@
(println input)))))


(defn reply
(def node-id (atom ""))
(def node-nbrs (atom []))
(def messages (atom #{}))
(def gossips (atom {}))
(def next-message-id (atom 0))


(defn- reply
([src dest body]
{:src src
:dest dest
:body body}))


(def node-id (atom ""))
(def node-nbrs (atom []))
(def messages (atom #{}))
(defn- send!
([input]
(-> input
generate-json
printout))
([src dest body]
(send! (reply src dest body))))


(defn- gossip-loop
[]
(future
(try
(loop []
(doseq [g @gossips]
(send! (second g)))
(Thread/sleep 1000)
(recur))
(catch Exception e
(printerr e)))))


(defn- process-request
[input]
(let [body (:body input)
r-body {:msg_id (rand-int 100)
r-body {:msg_id (swap! next-message-id inc)
:in_reply_to (:msg_id body)}
nid (:node_id body)]
(case (:type body)
Expand All @@ -93,29 +118,52 @@
:topology
(keyword @node-id)]))
(reply @node-id
(:src input)
(assoc r-body
:type "topology_ok")))
(:src input)
(assoc r-body
:type "topology_ok")))

"broadcast"
(do
;; if we already have the message, it means
;; this is a broadcast, dont rebroadcast
(when-not (@messages (:message body))
(doseq [n @node-nbrs]
;; we have to explicitly print this out
;; because just calling reply only generates a
;; map which the outer loop prints out
(printout (generate-json (reply @node-id
n
{:type "broadcast"
:message (:message body)})))))
(doseq [n @node-nbrs
;; also don't broadcast to sender
:when (not= n (:src input))]
;; here we need to distinguish between msg-id received
;; directly from a broadcast message which will be :msg_id
;; and msg-id received from a peer as part of gossip.
;; We can receive gossip with msg-id 1 and message 1
;; and later receive broadcast with msg-id 1 and message 3
;; Also, the msg-id we send here has to be used to
;; record the message for retries which is then removed
;; when we receive gossip_ok message
(let [msg-id (or (:msg_id body)
(str "g:" (:g_id body)))
msg {:src @node-id
:dest n
:body {:type "broadcast"
:message (:message body)
:g_id msg-id}}]
(swap! gossips assoc (str n "::" msg-id) msg)
;; we have to explicitly send this out
;; because just calling reply only generates a
;; map which the outer loop prints out
(send! msg))))
(swap! messages conj (:message body))
(when (:msg_id body)
(if (:msg_id body)
(reply @node-id
(:src input)
(assoc r-body
:type "broadcast_ok"))))
:type "broadcast_ok"))
(when (:g_id body)
(reply @node-id
(:src input)
{:type "gossip_ok"
:g_id (:g_id body)}))))

"gossip_ok"
(do
(swap! gossips dissoc (str (:src input) "::" (:g_id body)))
nil)

"read"
(reply @node-id
Expand All @@ -128,9 +176,11 @@
(defn -main
"Read transactions from stdin and send output to stdout"
[]
(gossip-loop)
(process-stdin (comp printout
generate-json
process-request
parse-json)))


(-main)

0 comments on commit 0ff5505

Please sign in to comment.