diff --git a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/executor/Executor.java b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/executor/Executor.java index f8100cde8..3b518dcef 100644 --- a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/executor/Executor.java +++ b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/executor/Executor.java @@ -1613,6 +1613,8 @@ private void interBrokerMoveReplicas() throws InterruptedException, ExecutionExc LOG.info("User task {}: Starting {} inter-broker partition movements.", _uuid, numTotalPartitionMovements); int partitionsToMove = numTotalPartitionMovements; + Set throttledBrokers = new HashSet<>(); + // Exhaust all the pending partition movements. while ((partitionsToMove > 0 || !inExecutionTasks().isEmpty()) && _stopSignal.get() == NO_STOP_EXECUTION) { // Get tasks to execute. @@ -1621,11 +1623,21 @@ private void interBrokerMoveReplicas() throws InterruptedException, ExecutionExc AlterPartitionReassignmentsResult result = null; if (!tasksToExecute.isEmpty()) { - throttleHelper.setThrottles(tasksToExecute.stream().map(ExecutionTask::proposal).collect(Collectors.toList())); + // Set throttles only if not already set for brokers involved + List proposals = tasksToExecute.stream().map(ExecutionTask::proposal).collect(Collectors.toList()); + for (ExecutionProposal proposal : proposals) { + int brokerId = proposal.oldLeader().brokerId(); + if (!throttledBrokers.contains(brokerId)) { + throttleHelper.setThrottles(Collections.singletonList(proposal)); + throttledBrokers.add(brokerId); + } + } + // Execute the tasks. _executionTaskManager.markTasksInProgress(tasksToExecute); result = ExecutionUtils.submitReplicaReassignmentTasks(_adminClient, tasksToExecute); } + // Wait indefinitely for partition movements to finish. List completedTasks = waitForInterBrokerReplicaTasksToFinish(result); partitionsToMove = _executionTaskManager.numRemainingInterBrokerPartitionMovements(); @@ -1644,7 +1656,11 @@ private void interBrokerMoveReplicas() throws InterruptedException, ExecutionExc .collect(Collectors.toList()); inProgressTasks.addAll(inExecutionTasks()); - throttleHelper.clearThrottles(completedTasks, inProgressTasks); + // Clear throttles only after all tasks are complete for the brokers involved + if (partitionsToMove == 0) { + throttleHelper.clearThrottles(completedTasks, inProgressTasks); + throttledBrokers.clear(); + } } // Currently, _executionProgressCheckIntervalMs is only runtime adjusted for inter broker move tasks, not @@ -1663,17 +1679,17 @@ private void interBrokerMoveReplicas() throws InterruptedException, ExecutionExc + "{} tasks aborting, {} tasks aborted, {} tasks dead, {} tasks completed, {} remaining data to move; for intra-broker " + "partition movement {} tasks cancelled; for leadership movements {} tasks cancelled.", _uuid, - partitionMovementTasksByState.get(ExecutionTaskState.PENDING), - partitionMovementTasksByState.get(ExecutionTaskState.IN_PROGRESS), - partitionMovementTasksByState.get(ExecutionTaskState.ABORTING), - partitionMovementTasksByState.get(ExecutionTaskState.ABORTED), - partitionMovementTasksByState.get(ExecutionTaskState.DEAD), - partitionMovementTasksByState.get(ExecutionTaskState.COMPLETED), + partitionMovementTasksByState.getOrDefault(ExecutionTaskState.PENDING, 0), + partitionMovementTasksByState.getOrDefault(ExecutionTaskState.IN_PROGRESS, 0), + partitionMovementTasksByState.getOrDefault(ExecutionTaskState.ABORTING, 0), + partitionMovementTasksByState.getOrDefault(ExecutionTaskState.ABORTED, 0), + partitionMovementTasksByState.getOrDefault(ExecutionTaskState.DEAD, 0), + partitionMovementTasksByState.getOrDefault(ExecutionTaskState.COMPLETED, 0), executionTasksSummary.remainingInterBrokerDataToMoveInMB(), - executionTasksSummary.taskStat().get(INTRA_BROKER_REPLICA_ACTION).get(ExecutionTaskState.PENDING), - executionTasksSummary.taskStat().get(LEADER_ACTION).get(ExecutionTaskState.PENDING)); + executionTasksSummary.taskStat().get(INTRA_BROKER_REPLICA_ACTION).getOrDefault(ExecutionTaskState.PENDING, 0), + executionTasksSummary.taskStat().get(LEADER_ACTION).getOrDefault(ExecutionTaskState.PENDING, 0)); } - } + } private void intraBrokerMoveReplicas() { int numTotalPartitionMovements = _executionTaskManager.numRemainingIntraBrokerPartitionMovements();