Skip to content

Commit

Permalink
reduce heartbeat error log
Browse files Browse the repository at this point in the history
  • Loading branch information
yanchengnv committed Dec 26, 2023
1 parent f4bd1b4 commit 3fedafd
Show file tree
Hide file tree
Showing 3 changed files with 20 additions and 13 deletions.
15 changes: 11 additions & 4 deletions nvflare/fuel/utils/pipe/cell_pipe.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
from nvflare.fuel.utils.constants import Mode
from nvflare.fuel.utils.validation_utils import check_object_type, check_str

from .pipe import Message, Pipe
from .pipe import Message, Pipe, Topic

SSL_ROOT_CERT = "rootCA.pem"
_PREFIX = "cell_pipe."
Expand Down Expand Up @@ -215,21 +215,28 @@ def send(self, msg: Message, timeout=None) -> bool:
if self.closed:
raise BrokenPipeError("pipe closed")

optional = False
if msg.topic in [Topic.END, Topic.ABORT, Topic.HEARTBEAT]:
optional = True

reply = self.cell.send_request(
channel=self.channel,
topic=msg.topic,
target=self.peer_fqcn,
request=_to_cell_message(msg),
timeout=timeout,
optional=optional,
)
if reply:
rc = reply.get_header(MessageHeaderKey.RETURN_CODE)
if rc == ReturnCode.OK:
return True
else:
self.logger.error(
f"failed to send '{msg.topic}' to '{self.peer_fqcn}' in channel '{self.channel}': {rc}"
)
err = f"failed to send '{msg.topic}' to '{self.peer_fqcn}' in channel '{self.channel}': {rc}"
if optional:
self.logger.debug(err)
else:
self.logger.error(err)
return False
else:
return False
Expand Down
8 changes: 8 additions & 0 deletions nvflare/fuel/utils/pipe/pipe.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,14 @@
from nvflare.fuel.utils.validation_utils import check_str


class Topic(object):

ABORT = "_ABORT_"
END = "_END_"
HEARTBEAT = "_HEARTBEAT_"
PEER_GONE = "_PEER_GONE_"


class Message:

REQUEST = "REQ"
Expand Down
10 changes: 1 addition & 9 deletions nvflare/fuel/utils/pipe/pipe_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
from typing import Optional

from nvflare.apis.signal import Signal
from nvflare.fuel.utils.pipe.pipe import Message, Pipe
from nvflare.fuel.utils.pipe.pipe import Message, Pipe, Topic
from nvflare.fuel.utils.validation_utils import (
check_callable,
check_non_negative_number,
Expand All @@ -29,14 +29,6 @@
from nvflare.security.logging import secure_format_exception


class Topic(object):

ABORT = "_ABORT_"
END = "_END_"
HEARTBEAT = "_HEARTBEAT_"
PEER_GONE = "_PEER_GONE_"


class PipeHandler(object):
"""
PipeHandler monitors a pipe for messages from the peer. It reads the pipe periodically and puts received data
Expand Down

0 comments on commit 3fedafd

Please sign in to comment.