-
Notifications
You must be signed in to change notification settings - Fork 101
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Fix port already in use when running inference #2064
base: develop
Are you sure you want to change the base?
Conversation
WalkthroughThe changes in the Changes
Poem
Thank you for using CodeRabbit. We offer it for free to the OSS community and would appreciate your support in helping us grow. If you find it useful, would you consider giving us a shout-out on your favorite social media? 🪧 TipsChatThere are 3 ways to chat with CodeRabbit:
Note: Be mindful of the bot's finite context window. It's strongly recommended to break down tasks such as reading entire modules into smaller chunks. For a focused discussion, use review comments to chat about specific files and their changes, instead of using the PR comments. CodeRabbit Commands (Invoked using PR comments)
Other keywords and placeholders
CodeRabbit Configuration File (
|
Codecov ReportAttention: Patch coverage is
Additional details and impacted files@@ Coverage Diff @@
## develop #2064 +/- ##
===========================================
+ Coverage 75.43% 76.14% +0.71%
===========================================
Files 134 134
Lines 24749 24783 +34
===========================================
+ Hits 18670 18872 +202
+ Misses 6079 5911 -168 ☔ View full report in Codecov by Sentry. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 1
🧹 Nitpick comments (2)
sleap/gui/widgets/monitor.py (2)
Line range hint
780-830
: Refactor _setup_zmq for better separation of concerns.The method handles multiple responsibilities: context management, port selection, socket setup, and timer initialization. Consider breaking it down into smaller, focused methods.
Suggested refactoring:
def _setup_zmq(self, zmq_context: Optional[zmq.Context] = None): """Connect to ZMQ ports that listen to commands and updates.""" self._initialize_context(zmq_context) self._setup_subscriber() if self.show_controller: self._setup_controller() self._setup_message_timer() def _initialize_context(self, zmq_context: Optional[zmq.Context] = None): """Initialize ZMQ context.""" self.ctx_given = zmq_context is not None self.ctx = zmq.Context() if zmq_context is None else zmq_context def _setup_subscriber(self): """Set up the subscriber socket.""" self.sub = self.ctx.socket(zmq.SUB) self.sub.subscribe("") publish_port = self._find_free_port(self.zmq_ports["publish_port"]) self.zmq_ports["publish_port"] = publish_port publish_address = f"tcp://127.0.0.1:{publish_port}" self._bind_with_retry(self.sub, publish_address) def _setup_controller(self): """Set up the controller socket.""" self.zmq_ctrl = self.ctx.socket(zmq.PUB) controller_port = self._find_free_port(self.zmq_ports["controller_port"]) self.zmq_ports["controller_port"] = controller_port controller_address = f"tcp://127.0.0.1:{controller_port}" self.zmq_ctrl.bind(controller_address) def _setup_message_timer(self): """Set up the message polling timer.""" self.timer = QtCore.QTimer() self.timer.timeout.connect(self._check_messages) self.timer.start(self.message_poll_time_ms)
Line range hint
1012-1031
: Enhance socket cleanup robustness.Consider implementing the context manager protocol and adding error handling for socket cleanup operations.
Suggested implementation:
+ def __enter__(self): + return self + + def __exit__(self, exc_type, exc_val, exc_tb): + self._unbind() + def _unbind(self): """Disconnect from all ZMQ sockets.""" if self.sub is not None: - self.sub.unbind(self.sub.LAST_ENDPOINT) - self.sub.close() + try: + if self.sub.LAST_ENDPOINT: + self.sub.unbind(self.sub.LAST_ENDPOINT) + self.sub.close() + except zmq.ZMQError as e: + logger.warning(f"Error during subscriber socket cleanup: {e}") self.sub = None if self.zmq_ctrl is not None: - url = self.zmq_ctrl.LAST_ENDPOINT - self.zmq_ctrl.unbind(url) - self.zmq_ctrl.close() + try: + if self.zmq_ctrl.LAST_ENDPOINT: + self.zmq_ctrl.unbind(self.zmq_ctrl.LAST_ENDPOINT) + self.zmq_ctrl.close() + except zmq.ZMQError as e: + logger.warning(f"Error during controller socket cleanup: {e}") self.zmq_ctrl = None
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If this is a race condition issue, and it can this tie-up can last several minutes
TIME_WAIT is the state that typically ties up the port for several minutes after the process has completed.
, then an automated test that quickly makes back-to-back calls to the code we use for finding a free port should be able to consistently catch this issue.
I know that the code for find_free_port
is a nested function at the moment, but perhaps it is worth breaking out the logic inside _setup_zmq
to a more test-able file-level function.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 2
🧹 Nitpick comments (2)
sleap/gui/utils.py (2)
33-42
: Add return type hint and improve documentationThe function's type hints and documentation could be enhanced:
- Add return type hint
- Document possible exceptions
- Add example usage
-def find_free_port(port: int, zmq_context: zmq.Context): +def find_free_port(port: int, zmq_context: zmq.Context) -> int: """Find free port to bind to. Args: port: The port to start searching from. zmq_context: The ZMQ context to use. Returns: The free port. + + Raises: + RuntimeError: If no free port is found after max_attempts. + ValueError: If port number is invalid. + + Example: + >>> ctx = zmq.Context() + >>> free_port = find_free_port(5555, ctx) """
33-55
: Consider implementing port reservation mechanismThe current implementation has a potential race condition: another process could take the port between the time we check it's free and when it's actually used. Consider these architectural improvements:
- Implement a port reservation system using file locks
- Keep the socket bound until actual use
- Use a dedicated port range for the application
This would provide more reliable port allocation in multi-process scenarios.
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (2)
sleap/gui/utils.py
(3 hunks)sleap/gui/widgets/monitor.py
(2 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
- sleap/gui/widgets/monitor.py
🔇 Additional comments (1)
sleap/gui/utils.py (1)
4-4
: LGTM: Import added correctly
The time
import is necessary for the new sleep functionality and follows PEP 8 guidelines.
attempts = 0 | ||
max_attempts = 10 | ||
while not is_port_free(port=port, zmq_context=zmq_context): | ||
if attempts >= max_attempts: | ||
raise RuntimeError( | ||
f"Could not find free port to display training progress after " | ||
f"{max_attempts} attempts. Please check your network settings " | ||
"or use the CLI `sleap-train` command." | ||
) | ||
port = select_zmq_port(zmq_context=zmq_context) | ||
attempts += 1 | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🛠️ Refactor suggestion
Add input validation and improve error handling
The function should validate the port number and provide more specific error messages about TIME_WAIT states.
+ if not isinstance(port, int) or port < 1 or port > 65535:
+ raise ValueError(f"Invalid port number: {port}. Must be between 1 and 65535.")
+
attempts = 0
max_attempts = 10
while not is_port_free(port=port, zmq_context=zmq_context):
if attempts >= max_attempts:
raise RuntimeError(
- f"Could not find free port to display training progress after "
- f"{max_attempts} attempts. Please check your network settings "
- "or use the CLI `sleap-train` command."
+ f"Could not find free port after {max_attempts} attempts. "
+ "This might be due to ports in TIME_WAIT state. "
+ "Please wait a few minutes and try again, or use a different "
+ "port range. Alternatively, use the CLI `sleap-train` command."
)
port = select_zmq_port(zmq_context=zmq_context)
attempts += 1
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
attempts = 0 | |
max_attempts = 10 | |
while not is_port_free(port=port, zmq_context=zmq_context): | |
if attempts >= max_attempts: | |
raise RuntimeError( | |
f"Could not find free port to display training progress after " | |
f"{max_attempts} attempts. Please check your network settings " | |
"or use the CLI `sleap-train` command." | |
) | |
port = select_zmq_port(zmq_context=zmq_context) | |
attempts += 1 | |
if not isinstance(port, int) or port < 1 or port > 65535: | |
raise ValueError(f"Invalid port number: {port}. Must be between 1 and 65535.") | |
attempts = 0 | |
max_attempts = 10 | |
while not is_port_free(port=port, zmq_context=zmq_context): | |
if attempts >= max_attempts: | |
raise RuntimeError( | |
f"Could not find free port after {max_attempts} attempts. " | |
"This might be due to ports in TIME_WAIT state. " | |
"Please wait a few minutes and try again, or use a different " | |
"port range. Alternatively, use the CLI `sleap-train` command." | |
) | |
port = select_zmq_port(zmq_context=zmq_context) | |
attempts += 1 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 1
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (2)
sleap/gui/utils.py
(3 hunks)sleap/gui/widgets/monitor.py
(2 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
- sleap/gui/utils.py
🧰 Additional context used
🪛 Ruff (0.8.2)
sleap/gui/widgets/monitor.py
1093-1093: Undefined name time
(F821)
1100-1100: Undefined name time
(F821)
🔇 Additional comments (1)
sleap/gui/widgets/monitor.py (1)
1092-1094
: LGTM: Socket cleanup delay addresses port reuse issue.
The added sleep delays after unbinding sockets allow proper cleanup and address the TIME_WAIT state issue mentioned in the PR objectives.
Also applies to: 1099-1101
🧰 Tools
🪛 Ruff (0.8.2)
1093-1093: Undefined name time
(F821)
sleap/gui/widgets/monitor.py
Outdated
@@ -1114,12 +1090,14 @@ def _unbind(self): | |||
if self.sub is not None: | |||
self.sub.unbind(self.sub.LAST_ENDPOINT) | |||
self.sub.close() | |||
time.sleep(0.1) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Add missing import for the time module.
The time
module is used but not imported.
Add this import at the beginning of the file:
from time import perf_counter
+import time
Also applies to: 1100-1100
🧰 Tools
🪛 Ruff (0.8.2)
1093-1093: Undefined name time
(F821)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 1
🔭 Outside diff range comments (1)
sleap/gui/widgets/monitor.py (1)
Line range hint
447-448
: Fix inconsistency in log scale minimum value.The comment mentions setting minimum to 0.001 but the code uses 1e-8. This could lead to numerical stability issues with very small values.
- y_min = max(y_min, 1e-8) + y_min = max(y_min, 0.001) # Minimum value for log scale to maintain numerical stability
🧹 Nitpick comments (1)
sleap/gui/widgets/monitor.py (1)
Line range hint
1052-1059
: Add missing type hints and improve documentation.Consider adding return type hints and improving parameter documentation for better code maintainability.
- def _toggle_ignore_outliers(self): + def _toggle_ignore_outliers(self) -> None: """Toggles whether to ignore outliers in chart scaling.""" - def _toggle_log_scale(self): + def _toggle_log_scale(self) -> None: """Toggle whether to use log-scaled y-axis."""
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (2)
sleap/gui/utils.py
(3 hunks)sleap/gui/widgets/monitor.py
(1 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
- sleap/gui/utils.py
🔇 Additional comments (1)
sleap/gui/widgets/monitor.py (1)
Line range hint 1-1124
: Overall changes look good!
The implementation effectively addresses the port usage issue while also improving the plotting functionality and code quality. The changes are well-documented and maintain good separation of concerns.
@@ -12,7 +12,7 @@ | |||
import matplotlib.transforms as mtransforms | |||
from qtpy import QtCore, QtWidgets | |||
|
|||
from sleap.gui.utils import is_port_free, select_zmq_port | |||
from sleap.gui.utils import find_free_port |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🛠️ Refactor suggestion
Consider adding retry logic for port binding.
While using find_free_port
helps, there's still a potential race condition between finding a free port and binding to it. Another process could take the port in this window.
Consider implementing a retry mechanism:
def _setup_zmq(self, zmq_context: Optional[zmq.Context] = None):
# ... existing setup code ...
# Find a free port and bind to it.
self.zmq_ports["publish_port"] = find_free_port(
port=self.zmq_ports["publish_port"], zmq_context=self.ctx
)
publish_address = f"tcp://127.0.0.1:{self.zmq_ports['publish_port']}"
- self.sub.bind(publish_address)
+ max_retries = 3
+ for attempt in range(max_retries):
+ try:
+ self.sub.bind(publish_address)
+ break
+ except zmq.error.ZMQError as e:
+ if attempt == max_retries - 1:
+ raise RuntimeError(f"Failed to bind to {publish_address} after {max_retries} attempts: {e}")
+ self.zmq_ports["publish_port"] = find_free_port(
+ port=self.zmq_ports["publish_port"], zmq_context=self.ctx
+ )
+ publish_address = f"tcp://127.0.0.1:{self.zmq_ports['publish_port']}"
Also applies to: 1093-1100
Description
This PR solves the port bug mentioned in the Issue #2056.
The cause of the bug is that the port after checking if it is free becomes TIME_WAIT state where it remains bound after being recently closed. When binding to the TIME_STATE ports, there will be an port already in use error. To solve this issue, I added a wait for 0.1 seconds in order to wait until the port is out of TIME_WAIT state after the port is bound to check if it is free to bind because I cannot directly add the
SO_REUSEADDR
that binds to other addresses without conflicts to the socket options (Source).Types of changes
Does this address any currently open issues?
#2056
Outside contributors checklist
Thank you for contributing to SLEAP!
❤️
Summary by CodeRabbit
New Features
Bug Fixes
Refactor