Skip to content

Commit

Permalink
Add endpoint to move all replicas from a disk to another disk of the same broker (#7)
Browse files Browse the repository at this point in the history

Add endpoint to move replicas from a specified disk to other disks of the same broker
  • Loading branch information
ilievladiulian authored and amuraru committed Sep 13, 2022
1 parent 6ca9e73 commit fdef380
Show file tree
Hide file tree
Showing 15 changed files with 786 additions and 26 deletions.
3 changes: 3 additions & 0 deletions config/cruisecontrol.properties
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,9 @@ num.proposal.precompute.threads=1
# The impact of strictness on the relative balancedness score.
#goal.balancedness.strictness.weight

# The error margin between the removed disk size and the remaining disk size.
#remove.disks.remaining.size.error.margin=0.1

# Configurations for the executor
# =======================================

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,200 @@
/*
* Copyright 2022 LinkedIn Corp. Licensed under the BSD 2-Clause License (the "License"). See License in the project root for license information.
*/

package com.linkedin.kafka.cruisecontrol.analyzer.goals;

import com.linkedin.kafka.cruisecontrol.analyzer.OptimizationOptions;
import com.linkedin.kafka.cruisecontrol.analyzer.ActionAcceptance;
import com.linkedin.kafka.cruisecontrol.analyzer.BalancingAction;
import com.linkedin.kafka.cruisecontrol.analyzer.ProvisionResponse;
import com.linkedin.kafka.cruisecontrol.analyzer.ProvisionStatus;
import com.linkedin.kafka.cruisecontrol.common.Resource;
import com.linkedin.kafka.cruisecontrol.model.Broker;
import com.linkedin.kafka.cruisecontrol.model.ClusterModel;
import com.linkedin.kafka.cruisecontrol.model.ClusterModelStats;
import com.linkedin.kafka.cruisecontrol.model.Disk;
import com.linkedin.kafka.cruisecontrol.model.Replica;
import com.linkedin.kafka.cruisecontrol.monitor.ModelCompletenessRequirements;
import java.util.Set;
import java.util.Map;
import java.util.List;
import java.util.ArrayList;
import java.util.Comparator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static com.linkedin.kafka.cruisecontrol.analyzer.ActionAcceptance.ACCEPT;
import static com.linkedin.kafka.cruisecontrol.analyzer.goals.GoalUtils.MIN_NUM_VALID_WINDOWS_FOR_SELF_HEALING;


/**
* Soft goal to move the replicas to different log dir.
*/
public class DiskRemovalGoal implements Goal {
private static final Logger LOG = LoggerFactory.getLogger(DiskRemovalGoal.class);
private static final Double EPSILON = 0.0001;

private final ProvisionResponse _provisionResponse;
protected final Map<Integer, Set<String>> _brokerIdAndLogdirs;
protected final double _errorMargin;

public DiskRemovalGoal(Map<Integer, Set<String>> brokerIdAndLogdirs, double errorMargin) {
_provisionResponse = new ProvisionResponse(ProvisionStatus.UNDECIDED);
_brokerIdAndLogdirs = brokerIdAndLogdirs;
_errorMargin = errorMargin;
}

private void sanityCheckOptimizationOptions(OptimizationOptions optimizationOptions) {
if (optimizationOptions.isTriggeredByGoalViolation()) {
throw new IllegalArgumentException(String.format("%s goal does not support use by goal violation detector.", name()));
}
}

@Override
public boolean optimize(ClusterModel clusterModel, Set<Goal> optimizedGoals, OptimizationOptions optimizationOptions) {
sanityCheckOptimizationOptions(optimizationOptions);

for (Map.Entry<Integer, Set<String>> brokerIdLogDirs : _brokerIdAndLogdirs.entrySet()) {
Integer brokerId = brokerIdLogDirs.getKey();
Set<String> logDirsToRemove = brokerIdLogDirs.getValue();
relocateBrokerLogDirs(clusterModel, brokerId, logDirsToRemove);
}

return true;
}

/**
* This method relocates the replicas on the provided log dirs to other log dirs of the same broker.
*
* @param clusterModel the cluster model
* @param brokerId the id of the broker where the movement will take place
* @param logDirsToRemove the set of log dirs to be removed from the broker
*/
private void relocateBrokerLogDirs(ClusterModel clusterModel, Integer brokerId, Set<String> logDirsToRemove) {
Broker currentBroker = clusterModel.broker(brokerId);
List<Disk> remainingDisks = new ArrayList<>();
currentBroker.disks().stream().filter(disk -> !logDirsToRemove.contains(disk.logDir())).forEach(remainingDisks::add);
remainingDisks.sort(Comparator.comparing(Disk::logDir));
List<Replica> replicasToMove = getReplicasToMoveAsListSortedBySizeDesc(currentBroker, logDirsToRemove);

int usedDiskIdx = -1;
for (Replica replicaToMove : replicasToMove) {
usedDiskIdx = relocateReplicaIfPossible(clusterModel, brokerId, remainingDisks, replicaToMove, usedDiskIdx);
}
}

/**
* This method provides the list of replicas to be moved sorted in descending order by the disk utilization.
*
* @param broker the broker where the replicas are
* @param logDirs the log dirs where the replicas are
* @return the sorted list of replicas to be moved
*/
private List<Replica> getReplicasToMoveAsListSortedBySizeDesc(Broker broker, Set<String> logDirs) {
List<Replica> replicasToMove = new ArrayList<>();
for (String logDir : logDirs) {
Set<Replica> logDirReplicas = broker.disk(logDir).replicas();
replicasToMove.addAll(logDirReplicas);
}

replicasToMove.sort(Comparator.comparingDouble(o -> ((Replica) o).load().expectedUtilizationFor(Resource.DISK)).reversed());
return replicasToMove;
}

/**
* This method relocates the given replica on one of the candidate disks in a round-robin manner if there is enough space
*
* @param clusterModel the cluster model
* @param brokerId the broker id where the replica movement occurs
* @param remainingDisks the candidate disks on which to move the replica
* @param replica the replica to move
* @param usedDiskIdx the index of the last disk used to relocate replicas
* @return the index of the disk used to relocate the replica to
*/
private int relocateReplicaIfPossible(ClusterModel clusterModel, Integer brokerId, List<Disk> remainingDisks, Replica replica, int usedDiskIdx) {
int remainingDisksNumber = remainingDisks.size();
int diskIndex = (usedDiskIdx + 1) % remainingDisksNumber;
for (int i = 0; i < remainingDisksNumber; i++) {
Disk destinationDisk = remainingDisks.get(diskIndex);
if (isEnoughSpace(destinationDisk, replica)) {
clusterModel.relocateReplica(replica.topicPartition(), brokerId, destinationDisk.logDir());
return diskIndex;
}
diskIndex = (diskIndex + 1) % remainingDisksNumber;
}
LOG.info("Could not move replica {} to any of the remaining disks.", replica);
return usedDiskIdx;
}

/**
* This method checks if the usage on the disk that the replica will be moved to is lower than the disk capacity
* including the error margin.
*
* @param disk the disk on which the replica can be moved
* @param replica the replica to move
* @return boolean which reflects if there is enough disk space to move the replica
*/
private boolean isEnoughSpace(Disk disk, Replica replica) {
double futureUsage = disk.utilization() + replica.load().expectedUtilizationFor(Resource.DISK);
double remainingSpacePercentage = (1 - (futureUsage / disk.capacity()));
return remainingSpacePercentage > _errorMargin
|| (remainingSpacePercentage > 0 && Math.abs(remainingSpacePercentage - _errorMargin) < EPSILON);
}

@Override
public ActionAcceptance actionAcceptance(BalancingAction action, ClusterModel clusterModel) {
return ACCEPT;
}

@Override
public ClusterModelStatsComparator clusterModelStatsComparator() {
return new ClusterModelStatsComparator() {
@Override
public int compare(ClusterModelStats stats1, ClusterModelStats stats2) {
return 0;
}

@Override
public String explainLastComparison() {
return String.format("Comparison for the %s is irrelevant.", name());
}
};
}

@Override
public ModelCompletenessRequirements clusterModelCompletenessRequirements() {
return new ModelCompletenessRequirements(MIN_NUM_VALID_WINDOWS_FOR_SELF_HEALING, 0, true);
}

@Override
public String name() {
return DiskRemovalGoal.class.getSimpleName();
}

@Override
public void finish() {

}

@Override
public boolean isHardGoal() {
return false;
}

@Override
public ProvisionStatus provisionStatus() {
// Provision status computation is not relevant to PLE goal.
return provisionResponse().status();
}

@Override
public ProvisionResponse provisionResponse() {
return _provisionResponse;
}

@Override
public void configure(Map<String, ?> configs) {

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -448,6 +448,15 @@ public final class AnalyzerConfig {
String.format("The class implements %s interface and is used to generate replica to broker set mapping.",
ReplicaToBrokerSetMappingPolicy.class.getName());

/**
* <code>remove.disks.remaining.size.error.margin</code>
*/
public static final String REMOVE_DISKS_REMAINING_SIZE_ERROR_MARGIN = "remove.disks.remaining.size.error.margin";
public static final double DEFAULT_REMOVE_DISKS_REMAINING_SIZE_ERROR_MARGIN = 0.1;
public static final String REMOVE_DISKS_REMAINING_SIZE_ERROR_MARGIN_DOC = "The margin of error between the remaining and the "
+ "removed disk sizes. The ratio between the removed and the remaining size should be greater than this parameter. The minimum "
+ "value is 0.05 (5%).";

private AnalyzerConfig() {
}

Expand Down Expand Up @@ -683,6 +692,12 @@ public static ConfigDef define(ConfigDef configDef) {
.define(REPLICA_TO_BROKER_SET_MAPPING_POLICY_CLASS_CONFIG,
ConfigDef.Type.CLASS, DEFAULT_REPLICA_TO_BROKER_SET_MAPPING_POLICY_CLASS,
ConfigDef.Importance.LOW,
REPLICA_TO_BROKER_SET_MAPPING_POLICY_CLASS_DOC);
REPLICA_TO_BROKER_SET_MAPPING_POLICY_CLASS_DOC)
.define(REMOVE_DISKS_REMAINING_SIZE_ERROR_MARGIN,
ConfigDef.Type.DOUBLE,
DEFAULT_REMOVE_DISKS_REMAINING_SIZE_ERROR_MARGIN,
atLeast(0.05),
ConfigDef.Importance.LOW,
REMOVE_DISKS_REMAINING_SIZE_ERROR_MARGIN_DOC);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,26 +4,27 @@

package com.linkedin.kafka.cruisecontrol.config.constants;

import com.linkedin.kafka.cruisecontrol.servlet.parameters.AddBrokerParameters;
import com.linkedin.kafka.cruisecontrol.servlet.parameters.AdminParameters;
import com.linkedin.kafka.cruisecontrol.servlet.parameters.BootstrapParameters;
import com.linkedin.kafka.cruisecontrol.servlet.parameters.RemoveDisksParameters;
import com.linkedin.kafka.cruisecontrol.servlet.parameters.PauseResumeParameters;
import com.linkedin.kafka.cruisecontrol.servlet.parameters.StopProposalParameters;
import com.linkedin.kafka.cruisecontrol.servlet.parameters.TrainParameters;
import com.linkedin.kafka.cruisecontrol.servlet.parameters.ClusterLoadParameters;
import com.linkedin.kafka.cruisecontrol.servlet.parameters.CruiseControlStateParameters;
import com.linkedin.kafka.cruisecontrol.servlet.parameters.DemoteBrokerParameters;
import com.linkedin.kafka.cruisecontrol.servlet.parameters.FixOfflineReplicasParameters;
import com.linkedin.kafka.cruisecontrol.servlet.parameters.KafkaClusterStateParameters;
import com.linkedin.kafka.cruisecontrol.servlet.parameters.PartitionLoadParameters;
import com.linkedin.kafka.cruisecontrol.servlet.parameters.PauseResumeParameters;
import com.linkedin.kafka.cruisecontrol.servlet.parameters.BootstrapParameters;
import com.linkedin.kafka.cruisecontrol.servlet.parameters.ProposalsParameters;
import com.linkedin.kafka.cruisecontrol.servlet.parameters.RebalanceParameters;
import com.linkedin.kafka.cruisecontrol.servlet.parameters.RemoveBrokerParameters;
import com.linkedin.kafka.cruisecontrol.servlet.parameters.CruiseControlStateParameters;
import com.linkedin.kafka.cruisecontrol.servlet.parameters.KafkaClusterStateParameters;
import com.linkedin.kafka.cruisecontrol.servlet.parameters.UserTasksParameters;
import com.linkedin.kafka.cruisecontrol.servlet.parameters.ReviewBoardParameters;
import com.linkedin.kafka.cruisecontrol.servlet.parameters.ReviewParameters;
import com.linkedin.kafka.cruisecontrol.servlet.parameters.RightsizeParameters;
import com.linkedin.kafka.cruisecontrol.servlet.parameters.StopProposalParameters;
import com.linkedin.kafka.cruisecontrol.servlet.parameters.AddBrokerParameters;
import com.linkedin.kafka.cruisecontrol.servlet.parameters.RemoveBrokerParameters;
import com.linkedin.kafka.cruisecontrol.servlet.parameters.FixOfflineReplicasParameters;
import com.linkedin.kafka.cruisecontrol.servlet.parameters.DemoteBrokerParameters;
import com.linkedin.kafka.cruisecontrol.servlet.parameters.RebalanceParameters;
import com.linkedin.kafka.cruisecontrol.servlet.parameters.AdminParameters;
import com.linkedin.kafka.cruisecontrol.servlet.parameters.TopicConfigurationParameters;
import com.linkedin.kafka.cruisecontrol.servlet.parameters.TrainParameters;
import com.linkedin.kafka.cruisecontrol.servlet.parameters.UserTasksParameters;
import com.linkedin.kafka.cruisecontrol.servlet.parameters.RightsizeParameters;
import org.apache.kafka.common.config.ConfigDef;


Expand Down Expand Up @@ -179,6 +180,13 @@ public final class CruiseControlParametersConfig {
public static final String DEFAULT_RIGHTSIZE_PARAMETERS_CLASS = RightsizeParameters.class.getName();
public static final String RIGHTSIZE_PARAMETERS_CLASS_DOC = "The class for parameters of a provision rightsize request.";

/**
* <code>remove.disks.parameters.class</code>
*/
public static final String REMOVE_DISKS_PARAMETERS_CLASS_CONFIG = "remove.disks.parameters.class";
public static final String DEFAULT_REMOVE_DISKS_PARAMETERS_CLASS = RemoveDisksParameters.class.getName();
public static final String REMOVE_DISKS_PARAMETERS_CLASS_DOC = "The class for parameters of a disks removal request.";

private CruiseControlParametersConfig() {
}

Expand Down Expand Up @@ -293,6 +301,11 @@ public static ConfigDef define(ConfigDef configDef) {
ConfigDef.Type.CLASS,
DEFAULT_RIGHTSIZE_PARAMETERS_CLASS,
ConfigDef.Importance.MEDIUM,
RIGHTSIZE_PARAMETERS_CLASS_DOC);
RIGHTSIZE_PARAMETERS_CLASS_DOC)
.define(REMOVE_DISKS_PARAMETERS_CLASS_CONFIG,
ConfigDef.Type.CLASS,
DEFAULT_REMOVE_DISKS_PARAMETERS_CLASS,
ConfigDef.Importance.MEDIUM,
REMOVE_DISKS_PARAMETERS_CLASS_DOC);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,17 +4,18 @@

package com.linkedin.kafka.cruisecontrol.config.constants;

import com.linkedin.kafka.cruisecontrol.servlet.handler.async.AddBrokerRequest;
import com.linkedin.kafka.cruisecontrol.servlet.handler.async.ClusterLoadRequest;
import com.linkedin.kafka.cruisecontrol.servlet.handler.async.CruiseControlStateRequest;
import com.linkedin.kafka.cruisecontrol.servlet.handler.async.DemoteRequest;
import com.linkedin.kafka.cruisecontrol.servlet.handler.async.FixOfflineReplicasRequest;
import com.linkedin.kafka.cruisecontrol.servlet.handler.async.PartitionLoadRequest;
import com.linkedin.kafka.cruisecontrol.servlet.handler.async.ProposalsRequest;
import com.linkedin.kafka.cruisecontrol.servlet.handler.async.RebalanceRequest;
import com.linkedin.kafka.cruisecontrol.servlet.handler.async.CruiseControlStateRequest;
import com.linkedin.kafka.cruisecontrol.servlet.handler.async.TopicConfigurationRequest;
import com.linkedin.kafka.cruisecontrol.servlet.handler.async.AddBrokerRequest;
import com.linkedin.kafka.cruisecontrol.servlet.handler.async.RemoveBrokerRequest;
import com.linkedin.kafka.cruisecontrol.servlet.handler.async.RemoveDisksRequest;
import com.linkedin.kafka.cruisecontrol.servlet.handler.async.DemoteRequest;
import com.linkedin.kafka.cruisecontrol.servlet.handler.async.RebalanceRequest;
import com.linkedin.kafka.cruisecontrol.servlet.handler.async.FixOfflineReplicasRequest;
import com.linkedin.kafka.cruisecontrol.servlet.handler.sync.RightsizeRequest;
import com.linkedin.kafka.cruisecontrol.servlet.handler.async.TopicConfigurationRequest;
import com.linkedin.kafka.cruisecontrol.servlet.handler.sync.AdminRequest;
import com.linkedin.kafka.cruisecontrol.servlet.handler.sync.BootstrapRequest;
import com.linkedin.kafka.cruisecontrol.servlet.handler.sync.KafkaClusterStateRequest;
Expand Down Expand Up @@ -181,6 +182,13 @@ public final class CruiseControlRequestConfig {
public static final String DEFAULT_RIGHTSIZE_REQUEST_CLASS = RightsizeRequest.class.getName();
public static final String RIGHTSIZE_REQUEST_CLASS_DOC = "The class to handle a provision rightsize request.";

/**
* <code>remove.disks.request.class</code>
*/
public static final String REMOVE_DISKS_REQUEST_CLASS_CONFIG = "remove.disks.request.class";
public static final String DEFAULT_REMOVE_DISKS_REQUEST_CLASS = RemoveDisksRequest.class.getName();
public static final String REMOVE_DISKS_REQUEST_CLASS_DOC = "The class to handle a disks removal request.";

private CruiseControlRequestConfig() {
}

Expand Down Expand Up @@ -295,6 +303,11 @@ public static ConfigDef define(ConfigDef configDef) {
ConfigDef.Type.CLASS,
DEFAULT_RIGHTSIZE_REQUEST_CLASS,
ConfigDef.Importance.MEDIUM,
RIGHTSIZE_REQUEST_CLASS_DOC);
RIGHTSIZE_REQUEST_CLASS_DOC)
.define(REMOVE_DISKS_REQUEST_CLASS_CONFIG,
ConfigDef.Type.CLASS,
DEFAULT_REMOVE_DISKS_REQUEST_CLASS,
ConfigDef.Importance.MEDIUM,
REMOVE_DISKS_REQUEST_CLASS_DOC);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,8 @@ public enum CruiseControlEndPoint implements EndPoint {
ADMIN(CRUISE_CONTROL_ADMIN),
REVIEW(CRUISE_CONTROL_ADMIN),
TOPIC_CONFIGURATION(KAFKA_ADMIN),
RIGHTSIZE(KAFKA_ADMIN);
RIGHTSIZE(KAFKA_ADMIN),
REMOVE_DISKS(KAFKA_ADMIN);

private static final List<CruiseControlEndPoint> CACHED_VALUES = List.of(values());
private static final List<CruiseControlEndPoint> GET_ENDPOINTS = Arrays.asList(BOOTSTRAP,
Expand All @@ -57,7 +58,8 @@ public enum CruiseControlEndPoint implements EndPoint {
ADMIN,
REVIEW,
TOPIC_CONFIGURATION,
RIGHTSIZE);
RIGHTSIZE,
REMOVE_DISKS);

private final EndpointType _endpointType;

Expand Down
Loading

0 comments on commit fdef380

Please sign in to comment.