Skip to content

Commit

Permalink
Merge pull request hyperledger-archives#152 from jmitch4x/messagequeu…
Browse files Browse the repository at this point in the history
…e-deepcopy

Changes MessageQueue from Queue to a custom implementation wrapping deque
  • Loading branch information
Amundson, Shawn committed Jul 8, 2016
2 parents 102518d + 565e1d9 commit 7b27b8d
Show file tree
Hide file tree
Showing 2 changed files with 46 additions and 12 deletions.
54 changes: 44 additions & 10 deletions gossip/gossip_core.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,14 @@
This module defines the core gossip class for communication between nodes.
"""

import Queue
import errno
import logging
import socket
import time
import copy

from collections import deque
from threading import Condition

from twisted.internet import reactor, task
from twisted.internet.protocol import DatagramProtocol
Expand All @@ -45,6 +48,39 @@ def __init__(self, msg):
super(GossipException, self).__init__(msg)


class MessageQueue(object):
"""The message queue used internally by Gossip."""

def __init__(self):
self._queue = deque()
self._condition = Condition()

def pop(self):
self._condition.acquire()
try:
while len(self._queue) < 1:
self._condition.wait()
return self._queue.pop()
finally:
self._condition.release()

def __len__(self):
return len(self._queue)

def __deepcopy__(self, memo):
newmq = MessageQueue()
newmq._queue = copy.deepcopy(self._queue, memo)
return newmq

def appendleft(self, msg):
self._condition.acquire()
try:
self._queue.appendleft(msg)
self._condition.notify()
finally:
self._condition.release()


class Gossip(object, DatagramProtocol):
"""Defines the protocol for gossip communcation between nodes.
Expand Down Expand Up @@ -75,7 +111,7 @@ class Gossip(object, DatagramProtocol):
to call when a node becomes disconnected.
onHeartbeatTimer (EventHandler): An EventHandler for functions
to call when the heartbeat timer fires.
MessageQueue (Queue): The queue of incoming messages.
MessageQueue (MessageQueue): The queue of incoming messages.
ProcessIncomingMessages (bool): Whether or not to process incoming
messages.
Listener (Reactor.listenUDP): The UDP listener.
Expand Down Expand Up @@ -136,7 +172,7 @@ def __init__(self, node, **kwargs):
self._HeartbeatTimer = task.LoopingCall(self._heartbeat)
self._HeartbeatTimer.start(0.05)

self.MessageQueue = Queue.Queue()
self.MessageQueue = MessageQueue()

try:
self.ProcessIncomingMessages = True
Expand Down Expand Up @@ -576,20 +612,18 @@ def _keepalive(self, now):

def _dispatcher(self):
while self.ProcessIncomingMessages:
msg = self.MessageQueue.get()
msg = self.MessageQueue.pop()
try:
if msg and msg.MessageType in self.MessageHandlerMap:
self.MessageHandlerMap[msg.MessageType][1](msg, self)

# handle the attribute error specifically so that the message type
# can be used in the next exception
# handle the attribute error specifically so that the
# message type can be used in the next exception
except:
logger.exception(
'unexpected error handling message of type %s',
msg.MessageType)

self.MessageQueue.task_done()

# --------------------------------- ###
# Locally defined interface methods ###
# --------------------------------- ###
Expand All @@ -609,7 +643,7 @@ def shutdown(self):
# to leave the socket open long enough to send the disconnect messages
# that we just queued up
self.ProcessIncomingMessages = False
self.MessageQueue.put(None)
self.MessageQueue.appendleft(None)

def register_message_handler(self, msg, handler):
"""Register a function to handle incoming messages for the
Expand Down Expand Up @@ -761,7 +795,7 @@ def handle_message(self, msg):

self.MessageHandledMap[msg.Identifier] = time.time(
) + self.ExpireMessageTime
self.MessageQueue.put(msg)
self.MessageQueue.appendleft(msg)

# and now forward it on to the peers if it is marked for forwarding
if msg.IsForward and msg.TimeToLive > 0:
Expand Down
4 changes: 2 additions & 2 deletions tests/test_gossip_core.py
Original file line number Diff line number Diff line change
Expand Up @@ -363,7 +363,7 @@ def test_gossip_dispatcher(self):
# Test _dispatch will not loop if not processing messages
core = self._setup(8883)
msg = shutdown_message.ShutdownMessage({'__SIGNATURE__': "test"})
core.MessageQueue.put(msg)
core.MessageQueue.appendleft(msg)
# Should not run if ProcessIncomingMessages is False
# Otherwise it will loop
core.ProcessIncomingMessages = False
Expand Down Expand Up @@ -400,7 +400,7 @@ def test_gossip_shutdown(self):
core = self._setup(8888)
core.shutdown()
self.assertFalse(core.ProcessIncomingMessages)
self.assertFalse(core.MessageQueue.empty())
self.assertFalse(len(core.MessageQueue) == 0)

def test_gossip_register_message_handlers(self):
# Test that a message handler and type can be added and removed
Expand Down

0 comments on commit 7b27b8d

Please sign in to comment.