Skip to content

Commit

Permalink
Reduce CellPipe heartbeat error log (#2245)
Browse files Browse the repository at this point in the history
* reduce heartbeat error log

* added default_request_timeout to prevent long-running requests

---------

Co-authored-by: Chester Chen <[email protected]>
  • Loading branch information
yanchengnv and chesterxgchen authored Dec 26, 2023
1 parent 46450db commit c5aea16
Show file tree
Hide file tree
Showing 3 changed files with 31 additions and 13 deletions.
15 changes: 11 additions & 4 deletions nvflare/fuel/utils/pipe/cell_pipe.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
from nvflare.fuel.utils.constants import Mode
from nvflare.fuel.utils.validation_utils import check_object_type, check_str

from .pipe import Message, Pipe
from .pipe import Message, Pipe, Topic

SSL_ROOT_CERT = "rootCA.pem"
_PREFIX = "cell_pipe."
Expand Down Expand Up @@ -215,21 +215,28 @@ def send(self, msg: Message, timeout=None) -> bool:
if self.closed:
raise BrokenPipeError("pipe closed")

optional = False
if msg.topic in [Topic.END, Topic.ABORT, Topic.HEARTBEAT]:
optional = True

reply = self.cell.send_request(
channel=self.channel,
topic=msg.topic,
target=self.peer_fqcn,
request=_to_cell_message(msg),
timeout=timeout,
optional=optional,
)
if reply:
rc = reply.get_header(MessageHeaderKey.RETURN_CODE)
if rc == ReturnCode.OK:
return True
else:
self.logger.error(
f"failed to send '{msg.topic}' to '{self.peer_fqcn}' in channel '{self.channel}': {rc}"
)
err = f"failed to send '{msg.topic}' to '{self.peer_fqcn}' in channel '{self.channel}': {rc}"
if optional:
self.logger.debug(err)
else:
self.logger.error(err)
return False
else:
return False
Expand Down
8 changes: 8 additions & 0 deletions nvflare/fuel/utils/pipe/pipe.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,14 @@
from nvflare.fuel.utils.validation_utils import check_str


class Topic:
    """Names of the special message topics shared by pipes and pipe handlers.

    Every attribute is a plain string constant; callers compare it against
    the ``topic`` of a ``Message`` to recognise these messages.
    """

    ABORT = "_ABORT_"
    END = "_END_"
    HEARTBEAT = "_HEARTBEAT_"
    PEER_GONE = "_PEER_GONE_"


class Message:

REQUEST = "REQ"
Expand Down
21 changes: 12 additions & 9 deletions nvflare/fuel/utils/pipe/pipe_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
from typing import Optional

from nvflare.apis.signal import Signal
from nvflare.fuel.utils.pipe.pipe import Message, Pipe
from nvflare.fuel.utils.pipe.pipe import Message, Pipe, Topic
from nvflare.fuel.utils.validation_utils import (
check_callable,
check_non_negative_number,
Expand All @@ -29,14 +29,6 @@
from nvflare.security.logging import secure_format_exception


class Topic(object):

ABORT = "_ABORT_"
END = "_END_"
HEARTBEAT = "_HEARTBEAT_"
PEER_GONE = "_PEER_GONE_"


class PipeHandler(object):
"""
PipeHandler monitors a pipe for messages from the peer. It reads the pipe periodically and puts received data
Expand Down Expand Up @@ -70,6 +62,7 @@ def __init__(
heartbeat_timeout=30.0,
resend_interval=2.0,
max_resends=None,
default_request_timeout=5.0,
):
"""
Constructor of the PipeHandler.
Expand All @@ -83,6 +76,7 @@ def __init__(
resend_interval: how often to resend a message if failing to send. None means no resend.
Note that if the pipe does not support resending, then no resend.
max_resends: max number of resends. None means no limit.
default_request_timeout: default timeout for request if timeout not specified
"""
check_positive_number("read_interval", read_interval)
check_positive_number("heartbeat_interval", heartbeat_interval)
Expand All @@ -97,6 +91,7 @@ def __init__(
self.read_interval = read_interval
self.heartbeat_interval = heartbeat_interval
self.heartbeat_timeout = heartbeat_timeout
self.default_request_timeout = default_request_timeout
self.resend_interval = resend_interval
self.max_resends = max_resends
self.messages = deque([])
Expand Down Expand Up @@ -172,6 +167,8 @@ def _send_to_pipe(self, msg: Message, timeout=None, abort_signal: Signal = None)
return False

if not timeout or not pipe.can_resend() or not self.resend_interval:
if not timeout:
timeout = self.default_request_timeout
return pipe.send(msg, timeout)

num_sends = 0
Expand All @@ -194,6 +191,12 @@ def _send_to_pipe(self, msg: Message, timeout=None, abort_signal: Signal = None)
self.logger.info(f"will resend '{msg.topic}' in {self.resend_interval} secs")
start_wait = time.time()
while True:
if self.asked_to_stop:
return False

if abort_signal and abort_signal.triggered:
return False

if time.time() - start_wait > self.resend_interval:
break
time.sleep(0.1)
Expand Down

0 comments on commit c5aea16

Please sign in to comment.