From 2f28f7dc99b202f7257c3e658f43f454717723c7 Mon Sep 17 00:00:00 2001 From: Seshu Yamajala Date: Wed, 8 Jan 2025 10:25:22 -0500 Subject: [PATCH] Use zmq.NOBLOCK for AsyncFetcher. --- ami/flowchart/library/DisplayWidgets.py | 29 +++++++++++++++++++++---- 1 file changed, 25 insertions(+), 4 deletions(-) diff --git a/ami/flowchart/library/DisplayWidgets.py b/ami/flowchart/library/DisplayWidgets.py index 0447667..13008f6 100644 --- a/ami/flowchart/library/DisplayWidgets.py +++ b/ami/flowchart/library/DisplayWidgets.py @@ -93,16 +93,37 @@ def update_topics(self, topics, terms): self.sockets[name] = (sock, count+1) def run(self): + while self.running: for sock, flag in self.poller.poll(): if flag != zmq.POLLIN: continue if sock == self.recv_interrupt and sock.recv_pyobj(): break - topic = sock.recv_string() - topic = topic.rstrip('\0') - heartbeat = sock.recv_pyobj() - reply = sock.recv_serialized(self.deserializer, copy=False) + + limit = 10 + + while True: + count = 0 + topic = None + heartbeat = None + reply = None + while count < limit: + try: + topic = sock.recv_string(flags=zmq.NOBLOCK) + topic = topic.rstrip('\0') + heartbeat = sock.recv_pyobj(flags=zmq.NOBLOCK) + reply = sock.recv_serialized(self.deserializer, copy=False, flags=zmq.NOBLOCK) + count += 1 + except zmq.ZMQError as e: + if e.errno == zmq.EAGAIN: + break + else: + raise + + if topic is not None: + break + self.data[self.view_subs[topic]] = reply self.timestamps[self.view_subs[topic]] = heartbeat # check if the data is ready