forked from Ericsson/ecchronos
-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Retry Policy for Jmx Connection Ericsson#700
sajid riaz
committed
Sep 3, 2024
1 parent
bdb23fd
commit d564424
Showing
12 changed files
with
487 additions
and
28 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
122 changes: 122 additions & 0 deletions
122
...com/ericsson/bss/cassandra/ecchronos/application/config/connection/RetryPolicyConfig.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,122 @@ | ||
/* | ||
* Copyright 2024 Telefonaktiebolaget LM Ericsson | ||
* | ||
* Licensed under the Apache License, Version 2.0 (the "License"); | ||
* you may not use this file except in compliance with the License. | ||
* You may obtain a copy of the License at | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
package com.ericsson.bss.cassandra.ecchronos.application.config.connection; | ||
|
||
import java.util.concurrent.TimeUnit; | ||
import com.fasterxml.jackson.annotation.JsonProperty; | ||
|
||
public final class RetryPolicyConfig | ||
{ | ||
|
||
private static final int DEFAULT_MAX_ATTEMPTS = 5; | ||
private static final long DEFAULT_DELAY = 5000; | ||
private static final long DEFAULT_MAX_DELAY = 30000; | ||
|
||
@JsonProperty ("maxAttempts") | ||
private Integer myMaxAttempts = DEFAULT_MAX_ATTEMPTS; | ||
|
||
@JsonProperty ("delay") | ||
private long myDelay = DEFAULT_DELAY; | ||
|
||
@JsonProperty ("maxDelay") | ||
private long myMaxDelay = DEFAULT_MAX_DELAY; | ||
|
||
@JsonProperty ("unit") | ||
private String myUnit = "seconds"; // Default to seconds | ||
|
||
public RetryPolicyConfig() | ||
{ | ||
} | ||
|
||
public RetryPolicyConfig(final Integer maxAttempts, final Integer delay, final Integer maxDelay, final String unit) | ||
{ | ||
this.myMaxAttempts = maxAttempts; | ||
this.myDelay = convertToMillis(delay, unit); | ||
this.myMaxDelay = convertToMillis(maxDelay, unit); | ||
this.myUnit = unit; | ||
} | ||
|
||
@JsonProperty ("maxAttempts") | ||
public Integer getMaxAttempts() | ||
{ | ||
return myMaxAttempts; | ||
} | ||
|
||
@JsonProperty ("maxAttempts") | ||
public void setMaxAttempts(final Integer maxAttempts) | ||
{ | ||
this.myMaxAttempts = maxAttempts; | ||
} | ||
|
||
@JsonProperty ("delay") | ||
public long getDelay() | ||
{ | ||
return myDelay; | ||
} | ||
|
||
@JsonProperty ("delay") | ||
public void setDelay(final Integer delay) | ||
{ | ||
this.myDelay = convertToMillis(delay, myUnit); | ||
} | ||
|
||
@JsonProperty ("maxDelay") | ||
public long getMaxDelay() | ||
{ | ||
return myMaxDelay; | ||
} | ||
|
||
@JsonProperty ("maxDelay") | ||
public void setMaxDelay(final Integer maxDelay) | ||
{ | ||
this.myMaxDelay = convertToMillis(maxDelay, myUnit); | ||
} | ||
|
||
@JsonProperty ("unit") | ||
public String getUnit() | ||
{ | ||
return myUnit; | ||
} | ||
|
||
@JsonProperty ("unit") | ||
public void setUnit(final String unit) | ||
{ | ||
this.myUnit = unit; | ||
// Recalculate delays with the new unit | ||
this.myDelay = convertToMillis((int) TimeUnit.MILLISECONDS.toSeconds(this.myDelay), unit); | ||
this.myMaxDelay = convertToMillis((int) TimeUnit.MILLISECONDS.toSeconds(this.myMaxDelay), unit); | ||
} | ||
|
||
private long convertToMillis(final Integer value, final String unit) | ||
{ | ||
return switch (unit.toLowerCase()) | ||
{ | ||
case "milliseconds" -> value; | ||
case "seconds" -> TimeUnit.SECONDS.toMillis(value); | ||
case "minutes" -> TimeUnit.MINUTES.toMillis(value); | ||
default -> throw new IllegalArgumentException("Unsupported time unit: " + unit); | ||
}; | ||
} | ||
|
||
public long currentDelay(final Integer count) | ||
{ | ||
long currentDelay = myDelay * count; | ||
if (currentDelay > myMaxDelay) | ||
{ | ||
currentDelay = myMaxDelay; | ||
} | ||
return currentDelay; | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
236 changes: 236 additions & 0 deletions
236
...n/java/com/ericsson/bss/cassandra/ecchronos/application/spring/RetrySchedulerService.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,236 @@ | ||
/* | ||
* Copyright 2024 Telefonaktiebolaget LM Ericsson | ||
* | ||
* Licensed under the Apache License, Version 2.0 (the "License"); | ||
* you may not use this file except in compliance with the License. | ||
* You may obtain a copy of the License at | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
package com.ericsson.bss.cassandra.ecchronos.application.spring; | ||
|
||
import com.datastax.oss.driver.api.core.cql.ResultSet; | ||
import com.datastax.oss.driver.api.core.cql.Row; | ||
import com.datastax.oss.driver.api.core.metadata.Node; | ||
import com.ericsson.bss.cassandra.ecchronos.application.config.Config; | ||
import com.ericsson.bss.cassandra.ecchronos.application.config.connection.RetryPolicyConfig; | ||
import com.ericsson.bss.cassandra.ecchronos.connection.DistributedJmxConnectionProvider; | ||
import com.ericsson.bss.cassandra.ecchronos.connection.DistributedNativeConnectionProvider; | ||
import com.ericsson.bss.cassandra.ecchronos.data.enums.NodeStatus; | ||
import com.ericsson.bss.cassandra.ecchronos.data.sync.EccNodesSync; | ||
import jakarta.annotation.PostConstruct; | ||
import org.slf4j.Logger; | ||
import org.slf4j.LoggerFactory; | ||
import org.springframework.beans.factory.DisposableBean; | ||
import org.springframework.stereotype.Service; | ||
|
||
import javax.management.remote.JMXConnector; | ||
import java.io.IOException; | ||
import java.util.*; | ||
import java.util.concurrent.ConcurrentHashMap; | ||
import java.util.concurrent.Executors; | ||
import java.util.concurrent.ScheduledExecutorService; | ||
import java.util.concurrent.TimeUnit; | ||
|
||
/** | ||
* Service responsible for managing and scheduling retry attempts to reconnect to Cassandra nodes that have become unavailable. | ||
* <p> | ||
* This service periodically checks the status of nodes and attempts to reconnect based on a configurable retry policy. | ||
* It uses a scheduled executor service to perform retries at fixed intervals, with the intervals and the retry logic | ||
* configurable via external configurations. | ||
* </p> | ||
* | ||
* <p> | ||
* The retry logic involves calculating the delay between attempts, which increases with each subsequent retry for a node. | ||
* If the maximum number of retry attempts is reached, the node is marked as unreachable. | ||
* </p> | ||
* | ||
* <p> | ||
* This service is designed to run continuously in the background, adjusting its behavior based on the state of the | ||
* Cassandra cluster and the provided configurations. It also ensures that resources are properly cleaned up on shutdown. | ||
* </p> | ||
*/ | ||
|
||
@Service | ||
public final class RetrySchedulerService implements DisposableBean | ||
{ | ||
private static final Logger LOG = LoggerFactory.getLogger(RetrySchedulerService.class); | ||
private static final String COLUMN_DC_NAME = "datacenter_name"; | ||
private static final String COLUMN_NODE_ID = "node_id"; | ||
private static final String COLUMN_NODE_ENDPOINT = "node_endpoint"; | ||
private static final String COLUMN_NODE_STATUS = "node_status"; | ||
private final EccNodesSync myEccNodesSync; | ||
private final DistributedJmxConnectionProvider myJmxConnectionProvider; | ||
private final DistributedNativeConnectionProvider myDistributedNativeConnectionProvider; | ||
private final RetryPolicyConfig myRetryPolicyConfig; | ||
private final Map<UUID, RetryAttempt> myRetryAttempts = new ConcurrentHashMap<>(); | ||
private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1); | ||
|
||
public RetrySchedulerService(final EccNodesSync eccNodesSync, | ||
final Config config, | ||
final DistributedJmxConnectionProvider jmxConnectionProvider, | ||
final DistributedNativeConnectionProvider distributedNativeConnectionProvider) | ||
{ | ||
myEccNodesSync = eccNodesSync; | ||
myJmxConnectionProvider = jmxConnectionProvider; | ||
myDistributedNativeConnectionProvider = distributedNativeConnectionProvider; | ||
myRetryPolicyConfig = config.getConnectionConfig().getJmxConnection().getRetryPolicyConfig(); | ||
} | ||
|
||
@PostConstruct | ||
public void startScheduler() | ||
{ | ||
long initialDelay = myRetryPolicyConfig.getDelay(); | ||
long fixedDelay = myRetryPolicyConfig.getMaxDelay(); | ||
|
||
LOG.info("Starting RetrySchedulerService with initialDelay={} ms and fixedDelay={} ms", initialDelay, fixedDelay); | ||
|
||
scheduler.scheduleWithFixedDelay(this::retryNodes, initialDelay, fixedDelay, TimeUnit.MILLISECONDS); | ||
} | ||
|
||
public void retryNodes() | ||
{ | ||
LOG.debug("Retrying unavailable nodes"); | ||
List<Node> unavailableNodes = new ArrayList<>(); | ||
ResultSet resultSet = myEccNodesSync.getResultSet(); | ||
|
||
// Process the results and filter out AVAILABLE nodes | ||
for (Row row : resultSet) | ||
{ | ||
UUID nodeId = row.getUuid(COLUMN_NODE_ID); | ||
String nodeEndpoint = row.getString(COLUMN_NODE_ENDPOINT); | ||
String datacenter = row.getString(COLUMN_DC_NAME); | ||
String status = row.getString(COLUMN_NODE_STATUS); | ||
|
||
// Only add nodes that are not AVAILABLE | ||
if (!NodeStatus.AVAILABLE.name().equals(status)) | ||
{ | ||
// Find the corresponding Node object in the existing nodes list | ||
myDistributedNativeConnectionProvider.getNodes() | ||
.stream() | ||
.filter(node -> Objects.equals(node.getHostId(), nodeId)) | ||
.findFirst() | ||
.ifPresent(unavailableNodes::add); | ||
} | ||
} | ||
|
||
if (unavailableNodes.isEmpty()) | ||
{ | ||
LOG.info("No unavailable nodes found."); | ||
return; | ||
} | ||
|
||
for (Node node : unavailableNodes) | ||
{ | ||
UUID nodeId = node.getHostId(); | ||
RetryAttempt retryAttempt = myRetryAttempts.getOrDefault(nodeId, new RetryAttempt(0, System.currentTimeMillis())); | ||
|
||
LOG.info("Processing node: {}, attempt: {}", node.getHostId(), retryAttempt.attempt()); | ||
|
||
if (retryAttempt.attempt() < myRetryPolicyConfig.getMaxAttempts()) | ||
{ | ||
long nextRetryTime = retryAttempt.lastAttemptTime() + calculateDelay(retryAttempt.attempt()); | ||
|
||
if (System.currentTimeMillis() >= nextRetryTime) | ||
{ | ||
LOG.info("Attempting to reconnect to node: {}", nodeId); | ||
boolean success = attemptConnection(node); | ||
|
||
if (success) | ||
{ | ||
LOG.info("Successfully reconnected to node: {}", nodeId); | ||
myEccNodesSync.updateNodeStatus(NodeStatus.AVAILABLE, node.getDatacenter(), nodeId); | ||
myRetryAttempts.remove(nodeId); // Reset retry attempts on success | ||
} | ||
else | ||
{ | ||
LOG.warn("Failed to reconnect to node: {}, incrementing retry attempt.", nodeId); | ||
myRetryAttempts.put(nodeId, new RetryAttempt(retryAttempt.attempt() + 1, System.currentTimeMillis())); | ||
} | ||
} | ||
} | ||
else | ||
{ | ||
LOG.error("Max retry attempts reached for node: {}. Marking as UNREACHABLE.", nodeId); | ||
myEccNodesSync.updateNodeStatus(NodeStatus.UNREACHABLE, node.getDatacenter(), nodeId); | ||
myRetryAttempts.remove(nodeId); // Remove entry after max attempts reached | ||
} | ||
} | ||
} | ||
|
||
@Override | ||
public void destroy() | ||
{ | ||
LOG.info("Shutting down RetrySchedulerService..."); | ||
|
||
scheduler.shutdown(); | ||
try | ||
{ | ||
if (!scheduler.awaitTermination(60, TimeUnit.SECONDS)) | ||
{ | ||
LOG.warn("Scheduler did not terminate within the timeout. Attempting to force shutdown..."); | ||
scheduler.shutdownNow(); | ||
if (!scheduler.awaitTermination(60, TimeUnit.SECONDS)) | ||
{ | ||
LOG.error("Scheduler did not terminate after force shutdown."); | ||
} | ||
} | ||
LOG.info("RetrySchedulerService shut down complete."); | ||
} | ||
catch (InterruptedException e) | ||
{ | ||
LOG.error("Interrupted during shutdown. Forcing shutdown now...", e); | ||
scheduler.shutdownNow(); | ||
Thread.currentThread().interrupt(); | ||
} | ||
} | ||
|
||
private long calculateDelay(int attempt) | ||
{ | ||
long calculatedDelay = myRetryPolicyConfig.getDelay() * (attempt + 1) * 2; | ||
LOG.debug("Calculated delay for attempt {}: {} ms", attempt, calculatedDelay); | ||
return Math.min(calculatedDelay, myRetryPolicyConfig.getMaxDelay()); | ||
} | ||
|
||
private boolean attemptConnection(final Node node) | ||
{ | ||
UUID nodeId = node.getHostId(); | ||
JMXConnector jmxConnector = myJmxConnectionProvider.getJmxConnector(nodeId); | ||
boolean isConnected = jmxConnector != null && isConnected(jmxConnector); | ||
|
||
if (isConnected) | ||
{ | ||
myJmxConnectionProvider.getJmxConnections().put(nodeId, jmxConnector); | ||
LOG.info("Node {} connected successfully.", nodeId); | ||
} | ||
else | ||
{ | ||
LOG.warn("Failed to connect to node {}.", nodeId); | ||
} | ||
|
||
return isConnected; | ||
} | ||
|
||
private boolean isConnected(final JMXConnector jmxConnector) | ||
{ | ||
try | ||
{ | ||
jmxConnector.getConnectionId(); | ||
return true; | ||
} | ||
catch (IOException e) | ||
{ | ||
LOG.error("Error while checking connection for JMX connector", e); | ||
return false; | ||
} | ||
} | ||
|
||
// Helper class to track retry attempts and last attempt time | ||
private record RetryAttempt(int attempt, long lastAttemptTime) { | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -20,5 +20,6 @@ | |
public enum NodeStatus | ||
{ | ||
UNAVAILABLE, | ||
AVAILABLE | ||
AVAILABLE, | ||
UNREACHABLE | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters