Skip to content

Commit

Permalink
added default_request_timeout to prevent long request
Browse files Browse the repository at this point in the history
  • Loading branch information
yanchengnv committed Dec 26, 2023
1 parent 3fedafd commit 5e01c6c
Showing 1 changed file with 11 additions and 0 deletions.
11 changes: 11 additions & 0 deletions nvflare/fuel/utils/pipe/pipe_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ def __init__(
heartbeat_timeout=30.0,
resend_interval=2.0,
max_resends=None,
default_request_timeout=5.0,
):
"""
Constructor of the PipeHandler.
Expand All @@ -75,6 +76,7 @@ def __init__(
resend_interval: how often to resend a message if failing to send. None means no resend.
Note that if the pipe does not support resending, then no resend.
max_resends: max number of resends. None means no limit.
default_request_timeout: default timeout for request if timeout not specified
"""
check_positive_number("read_interval", read_interval)
check_positive_number("heartbeat_interval", heartbeat_interval)
Expand All @@ -89,6 +91,7 @@ def __init__(
self.read_interval = read_interval
self.heartbeat_interval = heartbeat_interval
self.heartbeat_timeout = heartbeat_timeout
self.default_request_timeout = default_request_timeout
self.resend_interval = resend_interval
self.max_resends = max_resends
self.messages = deque([])
Expand Down Expand Up @@ -164,6 +167,8 @@ def _send_to_pipe(self, msg: Message, timeout=None, abort_signal: Signal = None)
return False

if not timeout or not pipe.can_resend() or not self.resend_interval:
if not timeout:
timeout = self.default_request_timeout
return pipe.send(msg, timeout)

num_sends = 0
Expand All @@ -186,6 +191,12 @@ def _send_to_pipe(self, msg: Message, timeout=None, abort_signal: Signal = None)
self.logger.info(f"will resend '{msg.topic}' in {self.resend_interval} secs")
start_wait = time.time()
while True:
if self.asked_to_stop:
return False

if abort_signal and abort_signal.triggered:
return False

if time.time() - start_wait > self.resend_interval:
break
time.sleep(0.1)
Expand Down

0 comments on commit 5e01c6c

Please sign in to comment.