From fdef38084e617eb9361660d2389e1323a712172b Mon Sep 17 00:00:00 2001
From: Vlad-Iulian Ilie
Date: Wed, 24 Aug 2022 15:53:53 +0300
Subject: [PATCH] Add endpoint to move all replicas from a disk to another disk of the same broker (#7)

Add endpoint to move replicas from a specified disk to other disks of the
same broker
---
 config/cruisecontrol.properties               |   3 +
 .../analyzer/goals/DiskRemovalGoal.java       | 200 ++++++++++++++++++
 .../config/constants/AnalyzerConfig.java      |  17 +-
 .../CruiseControlParametersConfig.java        |  43 ++--
 .../constants/CruiseControlRequestConfig.java |  27 ++-
 .../servlet/CruiseControlEndPoint.java        |   6 +-
 .../KafkaCruiseControlServletUtils.java       |   4 +
 .../handler/async/RemoveDisksRequest.java     |  46 ++++
 .../async/runnable/RemoveDisksRunnable.java   | 159 ++++++++++++++
 .../servlet/parameters/ParameterUtils.java    |   1 +
 .../parameters/RemoveDisksParameters.java     |  78 +++++++
 .../servlet/response/OptimizationResult.java  |   5 +-
 .../analyzer/DiskRemovalGoalTest.java         | 152 +++++++++++++
 cruise-control/src/yaml/base.yaml             |   2 +
 .../src/yaml/endpoints/removeDisks.yaml       |  69 ++++++
 15 files changed, 786 insertions(+), 26 deletions(-)
 create mode 100644 cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/analyzer/goals/DiskRemovalGoal.java
 create mode 100644 cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/servlet/handler/async/RemoveDisksRequest.java
 create mode 100644 cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/servlet/handler/async/runnable/RemoveDisksRunnable.java
 create mode 100644 cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/servlet/parameters/RemoveDisksParameters.java
 create mode 100644 cruise-control/src/test/java/com/linkedin/kafka/cruisecontrol/analyzer/DiskRemovalGoalTest.java
 create mode 100644 cruise-control/src/yaml/endpoints/removeDisks.yaml

diff --git a/config/cruisecontrol.properties b/config/cruisecontrol.properties
index 7bbe58217..056a802c3 100644
--- a/config/cruisecontrol.properties
+++ b/config/cruisecontrol.properties
@@ -170,6 +170,9 @@ num.proposal.precompute.threads=1
 # The impact of strictness on the relative balancedness score.
 #goal.balancedness.strictness.weight
 
+# The error margin between the removed disk size and the remaining disk size
+#remove.disks.remaining.size.error.margin=0.1
+
 # Configurations for the executor
 # =======================================
 
diff --git a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/analyzer/goals/DiskRemovalGoal.java b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/analyzer/goals/DiskRemovalGoal.java
new file mode 100644
index 000000000..1a5c13602
--- /dev/null
+++ b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/analyzer/goals/DiskRemovalGoal.java
@@ -0,0 +1,200 @@
+/*
+ * Copyright 2022 LinkedIn Corp. Licensed under the BSD 2-Clause License (the "License"). See License in the project root for license information.
+ */ + +package com.linkedin.kafka.cruisecontrol.analyzer.goals; + +import com.linkedin.kafka.cruisecontrol.analyzer.OptimizationOptions; +import com.linkedin.kafka.cruisecontrol.analyzer.ActionAcceptance; +import com.linkedin.kafka.cruisecontrol.analyzer.BalancingAction; +import com.linkedin.kafka.cruisecontrol.analyzer.ProvisionResponse; +import com.linkedin.kafka.cruisecontrol.analyzer.ProvisionStatus; +import com.linkedin.kafka.cruisecontrol.common.Resource; +import com.linkedin.kafka.cruisecontrol.model.Broker; +import com.linkedin.kafka.cruisecontrol.model.ClusterModel; +import com.linkedin.kafka.cruisecontrol.model.ClusterModelStats; +import com.linkedin.kafka.cruisecontrol.model.Disk; +import com.linkedin.kafka.cruisecontrol.model.Replica; +import com.linkedin.kafka.cruisecontrol.monitor.ModelCompletenessRequirements; +import java.util.Set; +import java.util.Map; +import java.util.List; +import java.util.ArrayList; +import java.util.Comparator; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import static com.linkedin.kafka.cruisecontrol.analyzer.ActionAcceptance.ACCEPT; +import static com.linkedin.kafka.cruisecontrol.analyzer.goals.GoalUtils.MIN_NUM_VALID_WINDOWS_FOR_SELF_HEALING; + + +/** + * Soft goal to move the replicas to different log dir. + */ +public class DiskRemovalGoal implements Goal { + private static final Logger LOG = LoggerFactory.getLogger(DiskRemovalGoal.class); + private static final Double EPSILON = 0.0001; + + private final ProvisionResponse _provisionResponse; + protected final Map> _brokerIdAndLogdirs; + protected final double _errorMargin; + + public DiskRemovalGoal(Map> brokerIdAndLogdirs, double errorMargin) { + _provisionResponse = new ProvisionResponse(ProvisionStatus.UNDECIDED); + _brokerIdAndLogdirs = brokerIdAndLogdirs; + _errorMargin = errorMargin; + } + + private void sanityCheckOptimizationOptions(OptimizationOptions optimizationOptions) { + if (optimizationOptions.isTriggeredByGoalViolation()) { + throw new IllegalArgumentException(String.format("%s goal does not support use by goal violation detector.", name())); + } + } + + @Override + public boolean optimize(ClusterModel clusterModel, Set optimizedGoals, OptimizationOptions optimizationOptions) { + sanityCheckOptimizationOptions(optimizationOptions); + + for (Map.Entry> brokerIdLogDirs : _brokerIdAndLogdirs.entrySet()) { + Integer brokerId = brokerIdLogDirs.getKey(); + Set logDirsToRemove = brokerIdLogDirs.getValue(); + relocateBrokerLogDirs(clusterModel, brokerId, logDirsToRemove); + } + + return true; + } + + /** + * This method relocates the replicas on the provided log dirs to other log dirs of the same broker. 
+   *
+   * @param clusterModel the cluster model
+   * @param brokerId the id of the broker where the movement will take place
+   * @param logDirsToRemove the set of log dirs to be removed from the broker
+   */
+  private void relocateBrokerLogDirs(ClusterModel clusterModel, Integer brokerId, Set<String> logDirsToRemove) {
+    Broker currentBroker = clusterModel.broker(brokerId);
+    List<Disk> remainingDisks = new ArrayList<>();
+    currentBroker.disks().stream().filter(disk -> !logDirsToRemove.contains(disk.logDir())).forEach(remainingDisks::add);
+    remainingDisks.sort(Comparator.comparing(Disk::logDir));
+    List<Replica> replicasToMove = getReplicasToMoveAsListSortedBySizeDesc(currentBroker, logDirsToRemove);
+
+    int usedDiskIdx = -1;
+    for (Replica replicaToMove : replicasToMove) {
+      usedDiskIdx = relocateReplicaIfPossible(clusterModel, brokerId, remainingDisks, replicaToMove, usedDiskIdx);
+    }
+  }
+
+  /**
+   * This method provides the list of replicas to be moved, sorted in descending order by disk utilization.
+   *
+   * @param broker the broker where the replicas are
+   * @param logDirs the log dirs where the replicas are
+   * @return the sorted list of replicas to be moved
+   */
+  private List<Replica> getReplicasToMoveAsListSortedBySizeDesc(Broker broker, Set<String> logDirs) {
+    List<Replica> replicasToMove = new ArrayList<>();
+    for (String logDir : logDirs) {
+      Set<Replica> logDirReplicas = broker.disk(logDir).replicas();
+      replicasToMove.addAll(logDirReplicas);
+    }
+
+    replicasToMove.sort(Comparator.comparingDouble(o -> ((Replica) o).load().expectedUtilizationFor(Resource.DISK)).reversed());
+    return replicasToMove;
+  }
+
+  /**
+   * This method relocates the given replica to one of the candidate disks in a round-robin manner if there is enough space.
+   *
+   * @param clusterModel the cluster model
+   * @param brokerId the broker id where the replica movement occurs
+   * @param remainingDisks the candidate disks on which to move the replica
+   * @param replica the replica to move
+   * @param usedDiskIdx the index of the last disk used to relocate replicas
+   * @return the index of the disk the replica was relocated to
+   */
+  private int relocateReplicaIfPossible(ClusterModel clusterModel, Integer brokerId, List<Disk> remainingDisks, Replica replica, int usedDiskIdx) {
+    int remainingDisksNumber = remainingDisks.size();
+    int diskIndex = (usedDiskIdx + 1) % remainingDisksNumber;
+    for (int i = 0; i < remainingDisksNumber; i++) {
+      Disk destinationDisk = remainingDisks.get(diskIndex);
+      if (isEnoughSpace(destinationDisk, replica)) {
+        clusterModel.relocateReplica(replica.topicPartition(), brokerId, destinationDisk.logDir());
+        return diskIndex;
+      }
+      diskIndex = (diskIndex + 1) % remainingDisksNumber;
+    }
+    LOG.info("Could not move replica {} to any of the remaining disks.", replica);
+    return usedDiskIdx;
+  }
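+
+  // Worked example of the round-robin selection above (illustrative numbers): with three
+  // surviving disks [d0, d1, d2] and usedDiskIdx == 0, the next replica probes d1, then d2,
+  // then d0, and lands on the first disk that passes isEnoughSpace(). If none fits, the
+  // method returns usedDiskIdx unchanged, so the following replica resumes from the same position.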
+
+  /**
+   * This method checks whether the disk usage after moving the replica to the destination disk stays below the disk
+   * capacity minus the error margin.
+   *
+   * @param disk the disk on which the replica can be moved
+   * @param replica the replica to move
+   * @return boolean which reflects if there is enough disk space to move the replica
+   */
+  private boolean isEnoughSpace(Disk disk, Replica replica) {
+    double futureUsage = disk.utilization() + replica.load().expectedUtilizationFor(Resource.DISK);
+    double remainingSpacePercentage = (1 - (futureUsage / disk.capacity()));
+    return remainingSpacePercentage > _errorMargin
+        || (remainingSpacePercentage > 0 && Math.abs(remainingSpacePercentage - _errorMargin) < EPSILON);
+  }
+
+  @Override
+  public ActionAcceptance actionAcceptance(BalancingAction action, ClusterModel clusterModel) {
+    return ACCEPT;
+  }
+
+  @Override
+  public ClusterModelStatsComparator clusterModelStatsComparator() {
+    return new ClusterModelStatsComparator() {
+      @Override
+      public int compare(ClusterModelStats stats1, ClusterModelStats stats2) {
+        return 0;
+      }
+
+      @Override
+      public String explainLastComparison() {
+        return String.format("Comparison for the %s is irrelevant.", name());
+      }
+    };
+  }
+
+  @Override
+  public ModelCompletenessRequirements clusterModelCompletenessRequirements() {
+    return new ModelCompletenessRequirements(MIN_NUM_VALID_WINDOWS_FOR_SELF_HEALING, 0, true);
+  }
+
+  @Override
+  public String name() {
+    return DiskRemovalGoal.class.getSimpleName();
+  }
+
+  @Override
+  public void finish() {
+
+  }
+
+  @Override
+  public boolean isHardGoal() {
+    return false;
+  }
+
+  @Override
+  public ProvisionStatus provisionStatus() {
+    // Provision status computation is not relevant to this goal.
+    return provisionResponse().status();
+  }
+
+  @Override
+  public ProvisionResponse provisionResponse() {
+    return _provisionResponse;
+  }
+
+  @Override
+  public void configure(Map<String, ?> configs) {
+
+  }
+}
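
The margin check above is plain arithmetic; the following self-contained sketch replays it with concrete numbers (the class and method names here are illustrative, not part of the patch):

  public final class MarginCheckExample {
    private static final double EPSILON = 0.0001;

    // Mirrors DiskRemovalGoal#isEnoughSpace(): accept the move when the post-move
    // free-space fraction exceeds the margin, or equals it within a small tolerance.
    static boolean isEnoughSpace(double diskUtilization, double diskCapacity,
                                 double replicaSize, double errorMargin) {
      double futureUsage = diskUtilization + replicaSize;
      double remainingSpacePercentage = 1 - (futureUsage / diskCapacity);
      return remainingSpacePercentage > errorMargin
          || (remainingSpacePercentage > 0 && Math.abs(remainingSpacePercentage - errorMargin) < EPSILON);
    }

    public static void main(String[] args) {
      // 100 GB disk at 75 GB usage: a 10 GB replica leaves 15% free, above the 0.1 default margin.
      System.out.println(isEnoughSpace(75.0, 100.0, 10.0, 0.1)); // true
      // The same disk at 85 GB usage would leave only 5% free, so the move is rejected.
      System.out.println(isEnoughSpace(85.0, 100.0, 10.0, 0.1)); // false
    }
  }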
diff --git a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/config/constants/AnalyzerConfig.java b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/config/constants/AnalyzerConfig.java
index 615362781..52be15a93 100644
--- a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/config/constants/AnalyzerConfig.java
+++ b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/config/constants/AnalyzerConfig.java
@@ -448,6 +448,15 @@ public final class AnalyzerConfig {
       String.format("The class implements %s interface and is used to generate replica to broker set mapping.",
                     ReplicaToBrokerSetMappingPolicy.class.getName());
 
+  /**
+   * <code>remove.disks.remaining.size.error.margin</code>
+   */
+  public static final String REMOVE_DISKS_REMAINING_SIZE_ERROR_MARGIN = "remove.disks.remaining.size.error.margin";
+  public static final double DEFAULT_REMOVE_DISKS_REMAINING_SIZE_ERROR_MARGIN = 0.1;
+  public static final String REMOVE_DISKS_REMAINING_SIZE_ERROR_MARGIN_DOC = "The error margin for the space remaining after a "
+      + "disk removal: once the replicas are moved, the fraction of free space left on the remaining disks must be at least "
+      + "this value. The minimum value is 0.05 (5%).";
+
   private AnalyzerConfig() {
   }
 
@@ -683,6 +692,12 @@ public static ConfigDef define(ConfigDef configDef) {
             .define(REPLICA_TO_BROKER_SET_MAPPING_POLICY_CLASS_CONFIG,
                     ConfigDef.Type.CLASS,
                     DEFAULT_REPLICA_TO_BROKER_SET_MAPPING_POLICY_CLASS,
                     ConfigDef.Importance.LOW,
-                    REPLICA_TO_BROKER_SET_MAPPING_POLICY_CLASS_DOC);
+                    REPLICA_TO_BROKER_SET_MAPPING_POLICY_CLASS_DOC)
+            .define(REMOVE_DISKS_REMAINING_SIZE_ERROR_MARGIN,
+                    ConfigDef.Type.DOUBLE,
+                    DEFAULT_REMOVE_DISKS_REMAINING_SIZE_ERROR_MARGIN,
+                    atLeast(0.05),
+                    ConfigDef.Importance.LOW,
+                    REMOVE_DISKS_REMAINING_SIZE_ERROR_MARGIN_DOC);
   }
 }
diff --git a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/config/constants/CruiseControlParametersConfig.java b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/config/constants/CruiseControlParametersConfig.java
index 5af3ec975..4f264e42d 100644
--- a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/config/constants/CruiseControlParametersConfig.java
+++ b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/config/constants/CruiseControlParametersConfig.java
@@ -4,26 +4,27 @@
 
 package com.linkedin.kafka.cruisecontrol.config.constants;
 
-import com.linkedin.kafka.cruisecontrol.servlet.parameters.AddBrokerParameters;
-import com.linkedin.kafka.cruisecontrol.servlet.parameters.AdminParameters;
-import com.linkedin.kafka.cruisecontrol.servlet.parameters.BootstrapParameters;
+import com.linkedin.kafka.cruisecontrol.servlet.parameters.RemoveDisksParameters;
+import com.linkedin.kafka.cruisecontrol.servlet.parameters.PauseResumeParameters;
+import com.linkedin.kafka.cruisecontrol.servlet.parameters.StopProposalParameters;
+import com.linkedin.kafka.cruisecontrol.servlet.parameters.TrainParameters;
 import com.linkedin.kafka.cruisecontrol.servlet.parameters.ClusterLoadParameters;
-import com.linkedin.kafka.cruisecontrol.servlet.parameters.CruiseControlStateParameters;
-import com.linkedin.kafka.cruisecontrol.servlet.parameters.DemoteBrokerParameters;
-import com.linkedin.kafka.cruisecontrol.servlet.parameters.FixOfflineReplicasParameters;
-import com.linkedin.kafka.cruisecontrol.servlet.parameters.KafkaClusterStateParameters;
 import com.linkedin.kafka.cruisecontrol.servlet.parameters.PartitionLoadParameters;
-import com.linkedin.kafka.cruisecontrol.servlet.parameters.PauseResumeParameters;
+import com.linkedin.kafka.cruisecontrol.servlet.parameters.BootstrapParameters;
 import com.linkedin.kafka.cruisecontrol.servlet.parameters.ProposalsParameters;
-import com.linkedin.kafka.cruisecontrol.servlet.parameters.RebalanceParameters;
-import com.linkedin.kafka.cruisecontrol.servlet.parameters.RemoveBrokerParameters;
+import com.linkedin.kafka.cruisecontrol.servlet.parameters.CruiseControlStateParameters;
+import com.linkedin.kafka.cruisecontrol.servlet.parameters.KafkaClusterStateParameters;
+import com.linkedin.kafka.cruisecontrol.servlet.parameters.UserTasksParameters;
 import com.linkedin.kafka.cruisecontrol.servlet.parameters.ReviewBoardParameters;
 import com.linkedin.kafka.cruisecontrol.servlet.parameters.ReviewParameters;
-import com.linkedin.kafka.cruisecontrol.servlet.parameters.RightsizeParameters;
-import com.linkedin.kafka.cruisecontrol.servlet.parameters.StopProposalParameters;
+import com.linkedin.kafka.cruisecontrol.servlet.parameters.AddBrokerParameters;
+import com.linkedin.kafka.cruisecontrol.servlet.parameters.RemoveBrokerParameters;
+import com.linkedin.kafka.cruisecontrol.servlet.parameters.FixOfflineReplicasParameters;
+import com.linkedin.kafka.cruisecontrol.servlet.parameters.DemoteBrokerParameters;
+import com.linkedin.kafka.cruisecontrol.servlet.parameters.RebalanceParameters;
+import com.linkedin.kafka.cruisecontrol.servlet.parameters.AdminParameters;
 import com.linkedin.kafka.cruisecontrol.servlet.parameters.TopicConfigurationParameters;
-import com.linkedin.kafka.cruisecontrol.servlet.parameters.TrainParameters;
-import com.linkedin.kafka.cruisecontrol.servlet.parameters.UserTasksParameters;
+import com.linkedin.kafka.cruisecontrol.servlet.parameters.RightsizeParameters;
 import org.apache.kafka.common.config.ConfigDef;
 
 
@@ -179,6 +180,13 @@ public final class CruiseControlParametersConfig {
   public static final String DEFAULT_RIGHTSIZE_PARAMETERS_CLASS = RightsizeParameters.class.getName();
   public static final String RIGHTSIZE_PARAMETERS_CLASS_DOC = "The class for parameters of a provision rightsize request.";
 
+  /**
+   * <code>remove.disks.parameters.class</code>
+   */
+  public static final String REMOVE_DISKS_PARAMETERS_CLASS_CONFIG = "remove.disks.parameters.class";
+  public static final String DEFAULT_REMOVE_DISKS_PARAMETERS_CLASS = RemoveDisksParameters.class.getName();
+  public static final String REMOVE_DISKS_PARAMETERS_CLASS_DOC = "The class for parameters of a disks removal request.";
+
   private CruiseControlParametersConfig() {
   }
 
@@ -293,6 +301,11 @@ public static ConfigDef define(ConfigDef configDef) {
                             ConfigDef.Type.CLASS,
                             DEFAULT_RIGHTSIZE_PARAMETERS_CLASS,
                             ConfigDef.Importance.MEDIUM,
-                            RIGHTSIZE_PARAMETERS_CLASS_DOC);
+                            RIGHTSIZE_PARAMETERS_CLASS_DOC)
+                    .define(REMOVE_DISKS_PARAMETERS_CLASS_CONFIG,
+                            ConfigDef.Type.CLASS,
+                            DEFAULT_REMOVE_DISKS_PARAMETERS_CLASS,
+                            ConfigDef.Importance.MEDIUM,
+                            REMOVE_DISKS_PARAMETERS_CLASS_DOC);
   }
 }
diff --git a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/config/constants/CruiseControlRequestConfig.java b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/config/constants/CruiseControlRequestConfig.java
index a879e4bf6..8ad0b28b1 100644
--- a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/config/constants/CruiseControlRequestConfig.java
+++ b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/config/constants/CruiseControlRequestConfig.java
@@ -4,17 +4,18 @@
 
 package com.linkedin.kafka.cruisecontrol.config.constants;
 
-import com.linkedin.kafka.cruisecontrol.servlet.handler.async.AddBrokerRequest;
 import com.linkedin.kafka.cruisecontrol.servlet.handler.async.ClusterLoadRequest;
-import com.linkedin.kafka.cruisecontrol.servlet.handler.async.CruiseControlStateRequest;
-import com.linkedin.kafka.cruisecontrol.servlet.handler.async.DemoteRequest;
-import com.linkedin.kafka.cruisecontrol.servlet.handler.async.FixOfflineReplicasRequest;
 import com.linkedin.kafka.cruisecontrol.servlet.handler.async.PartitionLoadRequest;
 import com.linkedin.kafka.cruisecontrol.servlet.handler.async.ProposalsRequest;
-import com.linkedin.kafka.cruisecontrol.servlet.handler.async.RebalanceRequest;
+import com.linkedin.kafka.cruisecontrol.servlet.handler.async.CruiseControlStateRequest;
+import com.linkedin.kafka.cruisecontrol.servlet.handler.async.TopicConfigurationRequest;
+import com.linkedin.kafka.cruisecontrol.servlet.handler.async.AddBrokerRequest;
 import com.linkedin.kafka.cruisecontrol.servlet.handler.async.RemoveBrokerRequest;
+import com.linkedin.kafka.cruisecontrol.servlet.handler.async.RemoveDisksRequest;
+import com.linkedin.kafka.cruisecontrol.servlet.handler.async.DemoteRequest;
+import com.linkedin.kafka.cruisecontrol.servlet.handler.async.RebalanceRequest;
+import com.linkedin.kafka.cruisecontrol.servlet.handler.async.FixOfflineReplicasRequest;
 import com.linkedin.kafka.cruisecontrol.servlet.handler.sync.RightsizeRequest;
-import com.linkedin.kafka.cruisecontrol.servlet.handler.async.TopicConfigurationRequest;
 import com.linkedin.kafka.cruisecontrol.servlet.handler.sync.AdminRequest;
 import com.linkedin.kafka.cruisecontrol.servlet.handler.sync.BootstrapRequest;
 import com.linkedin.kafka.cruisecontrol.servlet.handler.sync.KafkaClusterStateRequest;
@@ -181,6 +182,13 @@ public final class CruiseControlRequestConfig {
   public static final String DEFAULT_RIGHTSIZE_REQUEST_CLASS = RightsizeRequest.class.getName();
   public static final String RIGHTSIZE_REQUEST_CLASS_DOC = "The class to handle a provision rightsize request.";
 
+  /**
+   * <code>remove.disks.request.class</code>
+   */
+  public static final String REMOVE_DISKS_REQUEST_CLASS_CONFIG = "remove.disks.request.class";
+  public static final String DEFAULT_REMOVE_DISKS_REQUEST_CLASS = RemoveDisksRequest.class.getName();
+  public static final String REMOVE_DISKS_REQUEST_CLASS_DOC = "The class to handle a disks removal request.";
+
   private CruiseControlRequestConfig() {
   }
 
@@ -295,6 +303,11 @@ public static ConfigDef define(ConfigDef configDef) {
                             ConfigDef.Type.CLASS,
                             DEFAULT_RIGHTSIZE_REQUEST_CLASS,
                             ConfigDef.Importance.MEDIUM,
-                            RIGHTSIZE_REQUEST_CLASS_DOC);
+                            RIGHTSIZE_REQUEST_CLASS_DOC)
+                    .define(REMOVE_DISKS_REQUEST_CLASS_CONFIG,
+                            ConfigDef.Type.CLASS,
+                            DEFAULT_REMOVE_DISKS_REQUEST_CLASS,
+                            ConfigDef.Importance.MEDIUM,
+                            REMOVE_DISKS_REQUEST_CLASS_DOC);
   }
 }
diff --git a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/servlet/CruiseControlEndPoint.java b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/servlet/CruiseControlEndPoint.java
index 548d528c0..c9ffe5b10 100644
--- a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/servlet/CruiseControlEndPoint.java
+++ b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/servlet/CruiseControlEndPoint.java
@@ -34,7 +34,8 @@ public enum CruiseControlEndPoint implements EndPoint {
   ADMIN(CRUISE_CONTROL_ADMIN),
   REVIEW(CRUISE_CONTROL_ADMIN),
   TOPIC_CONFIGURATION(KAFKA_ADMIN),
-  RIGHTSIZE(KAFKA_ADMIN);
+  RIGHTSIZE(KAFKA_ADMIN),
+  REMOVE_DISKS(KAFKA_ADMIN);
 
   private static final List<CruiseControlEndPoint> CACHED_VALUES = List.of(values());
   private static final List<CruiseControlEndPoint> GET_ENDPOINTS = Arrays.asList(BOOTSTRAP,
@@ -57,7 +58,8 @@ public enum CruiseControlEndPoint implements EndPoint {
                                                                                   ADMIN,
                                                                                   REVIEW,
                                                                                   TOPIC_CONFIGURATION,
-                                                                                  RIGHTSIZE);
+                                                                                  RIGHTSIZE,
+                                                                                  REMOVE_DISKS);
 
   private final EndpointType _endpointType;
 
diff --git a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/servlet/KafkaCruiseControlServletUtils.java b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/servlet/KafkaCruiseControlServletUtils.java
index 87e2d46c2..0ee4430e1 100644
--- a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/servlet/KafkaCruiseControlServletUtils.java
+++ b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/servlet/KafkaCruiseControlServletUtils.java
@@ -114,6 +114,9 @@ public final class KafkaCruiseControlServletUtils {
     RequestParameterWrapper rightsize = new RequestParameterWrapper(RIGHTSIZE_PARAMETERS_CLASS_CONFIG,
                                                                     RIGHTSIZE_PARAMETER_OBJECT_CONFIG,
                                                                     RIGHTSIZE_REQUEST_CLASS_CONFIG);
+    RequestParameterWrapper removeDisks = new RequestParameterWrapper(REMOVE_DISKS_PARAMETERS_CLASS_CONFIG,
+                                                                      REMOVE_DISKS_PARAMETER_OBJECT_CONFIG,
+                                                                      REMOVE_DISKS_REQUEST_CLASS_CONFIG);
 
     requestParameterConfigs.put(BOOTSTRAP, bootstrap);
     requestParameterConfigs.put(TRAIN, train);
@@ -136,6 +139,7 @@ public final class KafkaCruiseControlServletUtils {
     requestParameterConfigs.put(REVIEW_BOARD, reviewBoard);
     requestParameterConfigs.put(TOPIC_CONFIGURATION, topicConfiguration);
     requestParameterConfigs.put(RIGHTSIZE, rightsize);
+    requestParameterConfigs.put(REMOVE_DISKS, removeDisks);
 
     REQUEST_PARAMETER_CONFIGS = Collections.unmodifiableMap(requestParameterConfigs);
   }
diff --git a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/servlet/handler/async/RemoveDisksRequest.java b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/servlet/handler/async/RemoveDisksRequest.java
new file mode 100644
index 000000000..bc25ba769
--- /dev/null
+++ b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/servlet/handler/async/RemoveDisksRequest.java
@@ -0,0 +1,46 @@
+/*
+ * Copyright 2022 LinkedIn Corp. Licensed under the BSD 2-Clause License (the "License"). See License in the project root for license information.
+ */
+
+package com.linkedin.kafka.cruisecontrol.servlet.handler.async;
+
+import com.linkedin.kafka.cruisecontrol.servlet.handler.async.runnable.OperationFuture;
+import com.linkedin.kafka.cruisecontrol.servlet.handler.async.runnable.RemoveDisksRunnable;
+import com.linkedin.kafka.cruisecontrol.servlet.parameters.RemoveDisksParameters;
+import java.util.Map;
+
+import static com.linkedin.cruisecontrol.common.utils.Utils.validateNotNull;
+import static com.linkedin.kafka.cruisecontrol.servlet.parameters.ParameterUtils.REMOVE_DISKS_PARAMETER_OBJECT_CONFIG;
+
+public class RemoveDisksRequest extends AbstractAsyncRequest {
+  protected RemoveDisksParameters _parameters;
+
+  public RemoveDisksRequest() {
+    super();
+  }
+
+  @Override
+  protected OperationFuture handle(String uuid) {
+    OperationFuture future = new OperationFuture("Remove disks");
+    pending(future.operationProgress());
+    _asyncKafkaCruiseControl.sessionExecutor().submit(new RemoveDisksRunnable(_asyncKafkaCruiseControl, future, _parameters, uuid));
+    return future;
+  }
+
+  @Override
+  public RemoveDisksParameters parameters() {
+    return _parameters;
+  }
+
+  @Override
+  public String name() {
+    return RemoveDisksRequest.class.getSimpleName();
+  }
+
+  @Override
+  public void configure(Map<String, ?> configs) {
+    super.configure(configs);
+    _parameters = (RemoveDisksParameters) validateNotNull(configs.get(REMOVE_DISKS_PARAMETER_OBJECT_CONFIG),
+            "Parameter configuration is missing from the request.");
+  }
+}
diff --git a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/servlet/handler/async/runnable/RemoveDisksRunnable.java b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/servlet/handler/async/runnable/RemoveDisksRunnable.java
new file mode 100644
index 000000000..7517f0e5a
--- /dev/null
+++ b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/servlet/handler/async/runnable/RemoveDisksRunnable.java
@@ -0,0 +1,159 @@
+/*
+ * Copyright 2022 LinkedIn Corp. Licensed under the BSD 2-Clause License (the "License"). See License in the project root for license information.
+ */ + +package com.linkedin.kafka.cruisecontrol.servlet.handler.async.runnable; + +import com.linkedin.cruisecontrol.exception.NotEnoughValidWindowsException; +import com.linkedin.kafka.cruisecontrol.KafkaCruiseControl; +import com.linkedin.kafka.cruisecontrol.analyzer.OptimizationOptions; +import com.linkedin.kafka.cruisecontrol.analyzer.OptimizerResult; +import com.linkedin.kafka.cruisecontrol.analyzer.goals.DiskRemovalGoal; +import com.linkedin.kafka.cruisecontrol.config.constants.ExecutorConfig; +import com.linkedin.kafka.cruisecontrol.exception.KafkaCruiseControlException; +import com.linkedin.kafka.cruisecontrol.model.Broker; +import com.linkedin.kafka.cruisecontrol.model.ClusterModel; +import com.linkedin.kafka.cruisecontrol.model.Disk; +import com.linkedin.kafka.cruisecontrol.servlet.parameters.RemoveDisksParameters; +import com.linkedin.kafka.cruisecontrol.servlet.response.OptimizationResult; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import java.util.Set; +import java.util.Map; +import java.util.Collections; +import java.util.HashSet; +import java.util.ArrayList; +import java.util.concurrent.TimeoutException; +import java.util.stream.Collectors; + +import static com.linkedin.kafka.cruisecontrol.config.constants.AnalyzerConfig.REMOVE_DISKS_REMAINING_SIZE_ERROR_MARGIN; +import static com.linkedin.kafka.cruisecontrol.servlet.handler.async.runnable.RunnableUtils.*; +import static com.linkedin.kafka.cruisecontrol.servlet.parameters.ParameterUtils.DEFAULT_START_TIME_FOR_CLUSTER_MODEL; + +public class RemoveDisksRunnable extends GoalBasedOperationRunnable { + private static final Logger LOG = LoggerFactory.getLogger(RemoveDisksRunnable.class); + protected final Map> _brokerIdAndLogdirs; + + protected final double _errorMargin; + + public RemoveDisksRunnable(KafkaCruiseControl kafkaCruiseControl, + OperationFuture future, + RemoveDisksParameters parameters, + String uuid) { + super(kafkaCruiseControl, future, parameters, parameters.dryRun(), parameters.stopOngoingExecution(), parameters.skipHardGoalCheck(), + uuid, parameters::reason); + _brokerIdAndLogdirs = parameters.brokerIdAndLogdirs(); + _errorMargin = (double) _kafkaCruiseControl.config().mergedConfigValues().get(REMOVE_DISKS_REMAINING_SIZE_ERROR_MARGIN); + } + + @Override + protected OptimizationResult getResult() throws Exception { + return new OptimizationResult(computeResult(), _kafkaCruiseControl.config()); + } + + @Override + protected void init() { + _kafkaCruiseControl.sanityCheckDryRun(_dryRun, _stopOngoingExecution); + _goalsByPriority = new ArrayList<>(1); + _goalsByPriority.add(new DiskRemovalGoal(_brokerIdAndLogdirs, _errorMargin)); + + _operationProgress = _future.operationProgress(); + if (_stopOngoingExecution) { + maybeStopOngoingExecutionToModifyAndWait(_kafkaCruiseControl, _operationProgress); + } + _combinedCompletenessRequirements = _goalsByPriority.get(0).clusterModelCompletenessRequirements(); + } + + @Override + protected OptimizerResult workWithClusterModel() throws KafkaCruiseControlException, NotEnoughValidWindowsException, TimeoutException { + Set brokersToCheckPresence = new HashSet<>(_brokerIdAndLogdirs.keySet()); + _kafkaCruiseControl.sanityCheckBrokerPresence(brokersToCheckPresence); + ClusterModel clusterModel = _kafkaCruiseControl.clusterModel( + DEFAULT_START_TIME_FOR_CLUSTER_MODEL, + _kafkaCruiseControl.timeMs(), + _combinedCompletenessRequirements, + true, + _allowCapacityEstimation, + _operationProgress + ); + + checkCanRemoveDisks(_brokerIdAndLogdirs, clusterModel); + + 
+    OptimizationOptions optimizationOptions = computeOptimizationOptions(clusterModel,
+                                                                         false,
+                                                                         _kafkaCruiseControl,
+                                                                         Collections.emptySet(),
+                                                                         _dryRun,
+                                                                         _excludeRecentlyDemotedBrokers,
+                                                                         _excludeRecentlyRemovedBrokers,
+                                                                         _excludedTopics,
+                                                                         Collections.emptySet(),
+                                                                         false,
+                                                                         _fastMode
+    );
+
+    OptimizerResult result = _kafkaCruiseControl.optimizations(clusterModel, _goalsByPriority, _operationProgress, null, optimizationOptions);
+    if (!_dryRun) {
+      _kafkaCruiseControl.executeProposals(
+          result.goalProposals(),
+          Collections.emptySet(),
+          isKafkaAssignerMode(_goals),
+          0,
+          0,
+          1,
+          1,
+          SELF_HEALING_EXECUTION_PROGRESS_CHECK_INTERVAL_MS,
+          SELF_HEALING_REPLICA_MOVEMENT_STRATEGY,
+          _kafkaCruiseControl.config().getLong(ExecutorConfig.DEFAULT_REPLICATION_THROTTLE_CONFIG),
+          _isTriggeredByUserRequest,
+          _uuid,
+          false
+      );
+    }
+
+    return result;
+  }
+
+  private void checkCanRemoveDisks(Map<Integer, Set<String>> brokerIdAndLogdirs, ClusterModel clusterModel) {
+    for (Map.Entry<Integer, Set<String>> entry : brokerIdAndLogdirs.entrySet()) {
+      Integer brokerId = entry.getKey();
+      Set<String> logDirs = entry.getValue();
+      Broker broker = clusterModel.broker(brokerId);
+      Set<String> brokerLogDirs = broker.disks().stream().map(Disk::logDir).collect(Collectors.toSet());
+      if (!brokerLogDirs.containsAll(logDirs)) {
+        throw new IllegalArgumentException(String.format("Invalid log dirs provided for broker %d.", brokerId));
+      }
+      if (broker.disks().size() < logDirs.size()) {
+        throw new IllegalArgumentException(String.format("Too many log dirs provided for broker %d.", brokerId));
+      } else if (broker.disks().size() == logDirs.size()) {
+        throw new IllegalArgumentException(String.format("No log dir remaining to move replicas to for broker %d.", brokerId));
+      }
+
+      double removedUsage = 0.0;
+      double remainingCapacity = 0.0;
+      double currentUsage = 0.0;
+      for (Disk disk : broker.disks()) {
+        if (logDirs.contains(disk.logDir())) {
+          removedUsage += disk.utilization();
+        } else {
+          remainingCapacity += disk.capacity();
+          currentUsage += disk.utilization();
+        }
+      }
+      double futureUsage = removedUsage + currentUsage;
+      if ((1 - (futureUsage / remainingCapacity)) < _errorMargin) {
+        throw new IllegalArgumentException("Not enough remaining capacity to move replicas to.");
+      }
+    }
+  }
+
+  @Override
+  protected boolean shouldWorkWithClusterModel() {
+    return true;
+  }
+
+  @Override
+  protected OptimizerResult workWithoutClusterModel() {
+    return null;
+  }
+}
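
checkCanRemoveDisks() applies the same free-space margin at the broker level before any replica moves. A small self-contained sketch replaying the rejection condition with concrete numbers (illustrative, not part of the patch):

  public final class RemoveDisksPrecheckExample {
    public static void main(String[] args) {
      double removedUsage = 40.0;       // GB used on the disks being removed
      double currentUsage = 45.0;       // GB already used on the surviving disks
      double remainingCapacity = 100.0; // GB of capacity on the surviving disks
      double errorMargin = 0.1;         // default remove.disks.remaining.size.error.margin

      // Mirrors the rejection condition in checkCanRemoveDisks().
      double futureUsage = removedUsage + currentUsage; // 85.0
      boolean rejected = (1 - (futureUsage / remainingCapacity)) < errorMargin;
      System.out.println(rejected); // false: 15% would stay free, above the 10% margin
    }
  }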
diff --git a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/servlet/parameters/ParameterUtils.java b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/servlet/parameters/ParameterUtils.java
index 3ed654b49..da99e59e1 100644
--- a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/servlet/parameters/ParameterUtils.java
+++ b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/servlet/parameters/ParameterUtils.java
@@ -161,6 +161,7 @@ public final class ParameterUtils {
   public static final String REVIEW_PARAMETER_OBJECT_CONFIG = "review.parameter.object";
   public static final String TOPIC_CONFIGURATION_PARAMETER_OBJECT_CONFIG = "topic.configuration.parameter.object";
   public static final String RIGHTSIZE_PARAMETER_OBJECT_CONFIG = "rightsize.parameter.object";
+  public static final String REMOVE_DISKS_PARAMETER_OBJECT_CONFIG = "remove.disks.parameter.object";
 
   private ParameterUtils() {
   }
diff --git a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/servlet/parameters/RemoveDisksParameters.java b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/servlet/parameters/RemoveDisksParameters.java
new file mode 100644
index 000000000..576f0cf20
--- /dev/null
+++ b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/servlet/parameters/RemoveDisksParameters.java
@@ -0,0 +1,78 @@
+/*
+ * Copyright 2022 LinkedIn Corp. Licensed under the BSD 2-Clause License (the "License"). See License in the project root for license information.
+ */
+
+package com.linkedin.kafka.cruisecontrol.servlet.parameters;
+
+import com.linkedin.kafka.cruisecontrol.config.constants.ExecutorConfig;
+import com.linkedin.kafka.cruisecontrol.servlet.UserRequestException;
+import java.io.UnsupportedEncodingException;
+import java.util.SortedSet;
+import java.util.TreeSet;
+import java.util.Collections;
+import java.util.Set;
+import java.util.Map;
+
+import static com.linkedin.kafka.cruisecontrol.servlet.parameters.ParameterUtils.*;
+
+public class RemoveDisksParameters extends GoalBasedOptimizationParameters {
+  protected static final SortedSet<String> CASE_INSENSITIVE_PARAMETER_NAMES;
+  static {
+    SortedSet<String> validParameterNames = new TreeSet<>(String.CASE_INSENSITIVE_ORDER);
+    validParameterNames.add(BROKER_ID_AND_LOGDIRS_PARAM);
+    validParameterNames.add(DRY_RUN_PARAM);
+    validParameterNames.add(REASON_PARAM);
+    validParameterNames.add(SKIP_HARD_GOAL_CHECK_PARAM);
+    validParameterNames.add(STOP_ONGOING_EXECUTION_PARAM);
+    CASE_INSENSITIVE_PARAMETER_NAMES = Collections.unmodifiableSortedSet(validParameterNames);
+  }
+  protected boolean _dryRun;
+  protected boolean _skipHardGoalCheck;
+  protected String _reason;
+  protected boolean _stopOngoingExecution;
+  protected Map<Integer, Set<String>> _logdirByBrokerId;
+
+  public RemoveDisksParameters() {
+    super();
+  }
+
+  @Override
+  protected void initParameters() throws UnsupportedEncodingException {
+    super.initParameters();
+    _logdirByBrokerId = ParameterUtils.brokerIdAndLogdirs(_request);
+    _dryRun = ParameterUtils.getDryRun(_request);
+    _skipHardGoalCheck = ParameterUtils.skipHardGoalCheck(_request);
+    boolean requestReasonRequired = _config.getBoolean(ExecutorConfig.REQUEST_REASON_REQUIRED_CONFIG);
+    _reason = ParameterUtils.reason(_request, requestReasonRequired && !_dryRun);
+    _stopOngoingExecution = ParameterUtils.stopOngoingExecution(_request);
+    if (_stopOngoingExecution && _dryRun) {
+      throw new UserRequestException(String.format("%s and %s cannot both be set to true.", STOP_ONGOING_EXECUTION_PARAM, DRY_RUN_PARAM));
+    }
+  }
+
+  public Map<Integer, Set<String>> brokerIdAndLogdirs() {
+    return _logdirByBrokerId;
+  }
+
+  @Override
+  public void configure(Map<String, ?> configs) {
+    super.configure(configs);
+  }
+
+  @Override
+  public SortedSet<String> caseInsensitiveParameterNames() {
+    return CASE_INSENSITIVE_PARAMETER_NAMES;
+  }
+
+  public String reason() {
+    return _reason;
+  }
+
+  public boolean dryRun() {
+    return _dryRun;
+  }
+
+  public boolean skipHardGoalCheck() {
+    return _skipHardGoalCheck;
+  }
+
+  public boolean stopOngoingExecution() {
+    return _stopOngoingExecution;
+  }
+}
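
RemoveDisksParameters delegates the heavy lifting to ParameterUtils.brokerIdAndLogdirs(), which predates this patch and is not shown in the diff. A rough, hypothetical sketch of how the documented brokerid_and_logdirs format (e.g. 101-/tmp/kafka-logs-1,101-/tmp/kafka-logs-2) could map to the Map<Integer, Set<String>> used throughout; the real parsing may differ in its details:

  import java.util.HashMap;
  import java.util.HashSet;
  import java.util.Map;
  import java.util.Set;

  public final class BrokerLogdirParamExample {
    // Hypothetical re-implementation for illustration only.
    static Map<Integer, Set<String>> parse(String param) {
      Map<Integer, Set<String>> result = new HashMap<>();
      for (String pair : param.split(",")) {
        int sep = pair.indexOf('-'); // split "101-/tmp/kafka-logs-1" at the first dash
        int brokerId = Integer.parseInt(pair.substring(0, sep));
        String logDir = pair.substring(sep + 1);
        result.computeIfAbsent(brokerId, k -> new HashSet<>()).add(logDir);
      }
      return result;
    }

    public static void main(String[] args) {
      // Prints {101=[/tmp/kafka-logs-1, /tmp/kafka-logs-2]} (set order may vary).
      System.out.println(parse("101-/tmp/kafka-logs-1,101-/tmp/kafka-logs-2"));
    }
  }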
diff --git a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/servlet/response/OptimizationResult.java b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/servlet/response/OptimizationResult.java
index 6704ade84..06a413f4b 100644
--- a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/servlet/response/OptimizationResult.java
+++ b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/servlet/response/OptimizationResult.java
@@ -6,15 +6,16 @@
 
 import com.google.gson.Gson;
 import com.google.gson.GsonBuilder;
+import com.linkedin.cruisecontrol.servlet.parameters.CruiseControlParameters;
 import com.linkedin.kafka.cruisecontrol.analyzer.OptimizerResult;
 import com.linkedin.kafka.cruisecontrol.config.KafkaCruiseControlConfig;
 import com.linkedin.kafka.cruisecontrol.executor.ExecutionProposal;
 import com.linkedin.kafka.cruisecontrol.model.ClusterModelStats;
 import com.linkedin.kafka.cruisecontrol.servlet.CruiseControlEndPoint;
 import com.linkedin.kafka.cruisecontrol.servlet.parameters.AddedOrRemovedBrokerParameters;
-import com.linkedin.cruisecontrol.servlet.parameters.CruiseControlParameters;
 import com.linkedin.kafka.cruisecontrol.servlet.parameters.DemoteBrokerParameters;
 import com.linkedin.kafka.cruisecontrol.servlet.parameters.KafkaOptimizationParameters;
+import com.linkedin.kafka.cruisecontrol.servlet.parameters.RemoveDisksParameters;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
@@ -85,6 +86,8 @@ protected String getPlaintextPretext(CruiseControlParameters parameters) {
       case TOPIC_CONFIGURATION:
         return String.format("%n%nCluster load after updating replication factor of topics %s%n",
                              _optimizerResult.topicsWithReplicationFactorChange());
+      case REMOVE_DISKS:
+        return String.format("%n%nCluster load after removing disks %s:%n", ((RemoveDisksParameters) parameters).brokerIdAndLogdirs());
       default:
         LOG.error("Unrecognized endpoint.");
         return "Unrecognized endpoint.";
diff --git a/cruise-control/src/test/java/com/linkedin/kafka/cruisecontrol/analyzer/DiskRemovalGoalTest.java b/cruise-control/src/test/java/com/linkedin/kafka/cruisecontrol/analyzer/DiskRemovalGoalTest.java
new file mode 100644
index 000000000..0648436b2
--- /dev/null
+++ b/cruise-control/src/test/java/com/linkedin/kafka/cruisecontrol/analyzer/DiskRemovalGoalTest.java
@@ -0,0 +1,152 @@
+/*
+ * Copyright 2022 LinkedIn Corp. Licensed under the BSD 2-Clause License (the "License"). See License in the project root for license information.
+ */ + +package com.linkedin.kafka.cruisecontrol.analyzer; + +import com.linkedin.cruisecontrol.monitor.sampling.aggregator.AggregatedMetricValues; +import com.linkedin.cruisecontrol.monitor.sampling.aggregator.MetricValues; +import com.linkedin.kafka.cruisecontrol.analyzer.goals.DiskRemovalGoal; +import com.linkedin.kafka.cruisecontrol.common.Resource; +import com.linkedin.kafka.cruisecontrol.common.TestConstants; +import com.linkedin.kafka.cruisecontrol.config.BrokerCapacityInfo; +import com.linkedin.kafka.cruisecontrol.model.ClusterModel; +import com.linkedin.kafka.cruisecontrol.monitor.ModelGeneration; +import com.linkedin.kafka.cruisecontrol.monitor.metricdefinition.KafkaMetricDef; +import org.apache.kafka.common.TopicPartition; +import org.junit.Test; +import java.util.Set; +import java.util.HashSet; +import java.util.Map; +import java.util.HashMap; +import java.util.Collections; + +import static com.linkedin.kafka.cruisecontrol.common.TestConstants.*; +import static com.linkedin.kafka.cruisecontrol.config.constants.AnalyzerConfig.DEFAULT_REMOVE_DISKS_REMAINING_SIZE_ERROR_MARGIN; +import static org.junit.Assert.assertEquals; + +public class DiskRemovalGoalTest { + private static final TopicPartition T0P0 = new TopicPartition(TOPIC0, 0); + private static final TopicPartition T0P1 = new TopicPartition(TOPIC0, 1); + private static final TopicPartition T0P2 = new TopicPartition(TOPIC0, 2); + private static final TopicPartition T0P3 = new TopicPartition(TOPIC0, 3); + private static final boolean POPULATE_DISK_INFO = true; + private static final String RACK = "r0"; + private static final String HOST = "h0"; + private static final int BROKER_ID = 0; + private static final int INDEX = 0; + private static final double HIGH_DISK_USAGE = 0.8; + private static final double LOW_DISK_USAGE = 0.3; + public static final String LOGDIR2 = "/mnt/i02"; + + private static final Map EXTENDED_DISK_CAPACITY = new HashMap<>(DISK_CAPACITY); + + static { + EXTENDED_DISK_CAPACITY.put(LOGDIR2, TestConstants.LARGE_BROKER_CAPACITY / 2); + } + + @Test + public void testMoveReplicasToAnotherLogDirWithEnoughDiskSpace() { + ClusterModel clusterModel = createClusterModel(DISK_CAPACITY); + createReplicaAndSetLoad(clusterModel, LOGDIR0, T0P0, true, LOW_DISK_USAGE); + createReplicaAndSetLoad(clusterModel, LOGDIR1, T0P1, false, LOW_DISK_USAGE); + Map> brokerIdAndLogDirs = new HashMap<>(); + brokerIdAndLogDirs.put(0, new HashSet<>(Collections.singletonList(LOGDIR0))); + + runOptimization(clusterModel, brokerIdAndLogDirs); + + assertEquals(clusterModel.broker(0).disk(LOGDIR0).replicas().size(), 0); + assertEquals(clusterModel.broker(0).disk(LOGDIR1).replicas().size(), 2); + } + + @Test + public void testReplicasStayIsDestinationHasInsufficientCapacity() { + ClusterModel clusterModel = createClusterModel(DISK_CAPACITY); + createReplicaAndSetLoad(clusterModel, LOGDIR0, T0P0, true, LOW_DISK_USAGE); + createReplicaAndSetLoad(clusterModel, LOGDIR1, T0P1, false, HIGH_DISK_USAGE); + Map> brokerIdAndLogDirs = new HashMap<>(); + brokerIdAndLogDirs.put(0, new HashSet<>(Collections.singletonList(LOGDIR0))); + + runOptimization(clusterModel, brokerIdAndLogDirs); + + assertEquals(clusterModel.broker(0).disk(LOGDIR0).replicas().size(), 1); + assertEquals(clusterModel.broker(0).disk(LOGDIR1).replicas().size(), 1); + } + + @Test + public void testMoveReplicasInARoundRobinMannerWithEnoughDiskSpace() { + ClusterModel clusterModel = createClusterModel(EXTENDED_DISK_CAPACITY); + createReplicaAndSetLoad(clusterModel, LOGDIR0, T0P0, true, 
+    createReplicaAndSetLoad(clusterModel, LOGDIR0, T0P3, true, LOW_DISK_USAGE);
+    createReplicaAndSetLoad(clusterModel, LOGDIR1, T0P1, false, LOW_DISK_USAGE);
+    createReplicaAndSetLoad(clusterModel, LOGDIR2, T0P2, false, LOW_DISK_USAGE);
+    Map<Integer, Set<String>> brokerIdAndLogDirs = new HashMap<>();
+    brokerIdAndLogDirs.put(0, new HashSet<>(Collections.singletonList(LOGDIR0)));
+
+    runOptimization(clusterModel, brokerIdAndLogDirs);
+
+    assertEquals(0, clusterModel.broker(0).disk(LOGDIR0).replicas().size());
+    assertEquals(2, clusterModel.broker(0).disk(LOGDIR1).replicas().size());
+    assertEquals(2, clusterModel.broker(0).disk(LOGDIR2).replicas().size());
+  }
+
+  @Test
+  public void testMoveReplicasInARoundRobinMannerWithNotEnoughDiskSpace() {
+    ClusterModel clusterModel = createClusterModel(EXTENDED_DISK_CAPACITY);
+    createReplicaAndSetLoad(clusterModel, LOGDIR0, T0P0, true, LOW_DISK_USAGE);
+    createReplicaAndSetLoad(clusterModel, LOGDIR0, T0P3, true, LOW_DISK_USAGE);
+    createReplicaAndSetLoad(clusterModel, LOGDIR1, T0P1, false, LOW_DISK_USAGE);
+    createReplicaAndSetLoad(clusterModel, LOGDIR2, T0P2, false, HIGH_DISK_USAGE);
+    Map<Integer, Set<String>> brokerIdAndLogDirs = new HashMap<>();
+    brokerIdAndLogDirs.put(0, new HashSet<>(Collections.singletonList(LOGDIR0)));
+
+    runOptimization(clusterModel, brokerIdAndLogDirs);
+
+    assertEquals(0, clusterModel.broker(0).disk(LOGDIR0).replicas().size());
+    assertEquals(3, clusterModel.broker(0).disk(LOGDIR1).replicas().size());
+    assertEquals(1, clusterModel.broker(0).disk(LOGDIR2).replicas().size());
+  }
+
+  private void runOptimization(ClusterModel clusterModel, Map<Integer, Set<String>> brokerIdAndLogDirs) {
+    DiskRemovalGoal goal = new DiskRemovalGoal(brokerIdAndLogDirs, DEFAULT_REMOVE_DISKS_REMAINING_SIZE_ERROR_MARGIN);
+    // Before the optimization, the goal is expected to be undecided wrt its provision status.
+    assertEquals(ProvisionStatus.UNDECIDED, goal.provisionResponse().status());
+    goal.optimize(clusterModel, Collections.emptySet(), new OptimizationOptions(Collections.emptySet(),
+                                                                                Collections.emptySet(),
+                                                                                Collections.emptySet()));
+    // After the optimization, DiskRemovalGoal is expected to be undecided wrt its provision status.
+    assertEquals(ProvisionStatus.UNDECIDED, goal.provisionResponse().status());
+  }
+
+  private ClusterModel createClusterModel(Map<String, Double> diskCapacity) {
+    ClusterModel clusterModel = new ClusterModel(new ModelGeneration(0, 0), 1.0);
+    clusterModel.createRack(RACK);
+    BrokerCapacityInfo capacityInfo = new BrokerCapacityInfo(TestConstants.BROKER_CAPACITY, null, diskCapacity);
+    clusterModel.createBroker(RACK, HOST, BROKER_ID, capacityInfo, POPULATE_DISK_INFO);
+    return clusterModel;
+  }
+
+  private void createReplicaAndSetLoad(ClusterModel clusterModel,
+                                       String logdir,
+                                       TopicPartition tp,
+                                       boolean isLeader,
+                                       double diskUsage) {
+    clusterModel.createReplica(RACK, BROKER_ID, tp, INDEX, isLeader, false, logdir, false);
+    MetricValues defaultMetricValues = new MetricValues(1);
+    MetricValues diskMetricValues = new MetricValues(1);
+    double[] diskMetric = {EXTENDED_DISK_CAPACITY.get(logdir) * diskUsage};
+    diskMetricValues.add(diskMetric);
+    Map<Short, MetricValues> metricValuesByResource = new HashMap<>();
+    Resource.cachedValues().forEach(r -> {
+      for (short id : KafkaMetricDef.resourceToMetricIds(r)) {
+        if (r.equals(Resource.DISK)) {
+          metricValuesByResource.put(id, diskMetricValues);
+        } else {
+          metricValuesByResource.put(id, defaultMetricValues);
+        }
+      }
+    });
+    clusterModel.setReplicaLoad(RACK, BROKER_ID, tp, new AggregatedMetricValues(metricValuesByResource),
+                                Collections.singletonList(1L));
+  }
+}
diff --git a/cruise-control/src/yaml/base.yaml b/cruise-control/src/yaml/base.yaml
index 43fd0cf8b..b00406470 100644
--- a/cruise-control/src/yaml/base.yaml
+++ b/cruise-control/src/yaml/base.yaml
@@ -46,3 +46,5 @@ paths:
     $ref: 'endpoints/train.yaml#/TrainEndpoint'
   /kafkacruisecontrol/user_tasks:
     $ref: 'endpoints/userTasks.yaml#/UserTasksEndpoint'
+  /kafkacruisecontrol/remove_disks:
+    $ref: 'endpoints/removeDisks.yaml#/RemoveDisksEndpoint'
diff --git a/cruise-control/src/yaml/endpoints/removeDisks.yaml b/cruise-control/src/yaml/endpoints/removeDisks.yaml
new file mode 100644
index 000000000..f7f8f7bbe
--- /dev/null
+++ b/cruise-control/src/yaml/endpoints/removeDisks.yaml
@@ -0,0 +1,69 @@
+RemoveDisksEndpoint:
+  post:
+    operationId: removeDisks
+    summary: Move all replicas from the specified disks to other disks of the same broker.
+    parameters:
+      - name: dryrun
+        in: query
+        description: Whether to dry-run the request or not.
+        schema:
+          type: boolean
+          default: true
+      - name: stop_ongoing_execution
+        in: query
+        description: Whether to stop the ongoing execution (if any) and start executing the given request.
+        schema:
+          type: boolean
+          default: false
+      - name: reason
+        in: query
+        description: Reason for request.
+        schema:
+          type: string
+          example: "Balance disk utilization across all brokers in the cluster."
+      - name: brokerid_and_logdirs
+        in: query
+        description: List of broker id and logdir pairs specifying the disks to be removed.
+        schema:
+          type: object
+          additionalProperties:
+            type: array
+            items:
+              type: string
+        required: true
+        example: 101-/tmp/kafka-logs-1,101-/tmp/kafka-logs-2
+      - name: skip_hard_goal_check
+        in: query
+        description: Whether to allow hard goals to be skipped in proposal generation.
+        schema:
+          type: boolean
+          default: false
+    responses:
+      '200':
+        description: Successful remove disks response.
+        content:
+          application/json:
+            schema:
+              $ref: '../responses/optimizationResult.yaml#/OptimizationResult'
+          text/plain:
+            schema:
+              type: string
+      '202':
+        description: Remove disks in progress.
+        content:
+          application/json:
+            schema:
+              $ref: '../responses/progressResult.yaml#/ProgressResult'
+          text/plain:
+            schema:
+              type: string
+      # Response for all errors
+      default:
+        description: Error response.
+        content:
+          application/json:
+            schema:
+              $ref: '../responses/errorResponse.yaml#/ErrorResponse'
+          text/plain:
+            schema:
+              type: string
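
End to end, a dry-run invocation of the new endpoint can be sketched as follows (assuming a Cruise Control instance on localhost:9090; host, port, and security settings depend on the deployment):

  import java.net.URI;
  import java.net.http.HttpClient;
  import java.net.http.HttpRequest;
  import java.net.http.HttpResponse;

  public final class RemoveDisksCallExample {
    public static void main(String[] args) throws Exception {
      // dryrun=true (the endpoint default) returns the proposed movements without executing them.
      URI uri = URI.create("http://localhost:9090/kafkacruisecontrol/remove_disks"
          + "?brokerid_and_logdirs=101-/tmp/kafka-logs-1&dryrun=true");
      HttpRequest request = HttpRequest.newBuilder(uri)
          .POST(HttpRequest.BodyPublishers.noBody())
          .build();
      HttpResponse<String> response = HttpClient.newHttpClient()
          .send(request, HttpResponse.BodyHandlers.ofString());
      System.out.println(response.statusCode()); // 200 on completion, 202 while still in progress
      System.out.println(response.body());
    }
  }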