From 99c2809d4173b4d15455440e9221d733ccff82ba Mon Sep 17 00:00:00 2001 From: Aswin A Date: Sun, 27 Oct 2024 16:30:31 +0530 Subject: [PATCH] Optimize replication throttling in Executor to reduce Kafka admin requests Tracks the brokers that already have a replication throttle applied during inter-broker partition movement so that setThrottles is not re-issued for them on every batch, and defers clearThrottles until no partition movements remain. This reduces redundant Kafka admin requests during a rebalance, thereby improving overall task execution time. Also reads the per-state task counts in the progress log with getOrDefault(state, 0). Signed-off-by: Aswin A --- .../cruisecontrol/executor/Executor.java | 38 +++++++++++++------ 1 file changed, 27 insertions(+), 11 deletions(-) diff --git a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/executor/Executor.java b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/executor/Executor.java index f8100cde8..3b518dcef 100644 --- a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/executor/Executor.java +++ b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/executor/Executor.java @@ -1613,6 +1613,8 @@ private void interBrokerMoveReplicas() throws InterruptedException, ExecutionExc LOG.info("User task {}: Starting {} inter-broker partition movements.", _uuid, numTotalPartitionMovements); int partitionsToMove = numTotalPartitionMovements; + Set throttledBrokers = new HashSet<>(); + // Exhaust all the pending partition movements. while ((partitionsToMove > 0 || !inExecutionTasks().isEmpty()) && _stopSignal.get() == NO_STOP_EXECUTION) { // Get tasks to execute. 
@@ -1621,11 +1623,21 @@ private void interBrokerMoveReplicas() throws InterruptedException, ExecutionExc AlterPartitionReassignmentsResult result = null; if (!tasksToExecute.isEmpty()) { - throttleHelper.setThrottles(tasksToExecute.stream().map(ExecutionTask::proposal).collect(Collectors.toList())); + // Set throttles only if not already set for brokers involved + List proposals = tasksToExecute.stream().map(ExecutionTask::proposal).collect(Collectors.toList()); + for (ExecutionProposal proposal : proposals) { + int brokerId = proposal.oldLeader().brokerId(); + if (!throttledBrokers.contains(brokerId)) { + throttleHelper.setThrottles(Collections.singletonList(proposal)); + throttledBrokers.add(brokerId); + } + } + // Execute the tasks. _executionTaskManager.markTasksInProgress(tasksToExecute); result = ExecutionUtils.submitReplicaReassignmentTasks(_adminClient, tasksToExecute); } + // Wait indefinitely for partition movements to finish. List completedTasks = waitForInterBrokerReplicaTasksToFinish(result); partitionsToMove = _executionTaskManager.numRemainingInterBrokerPartitionMovements(); @@ -1644,7 +1656,11 @@ private void interBrokerMoveReplicas() throws InterruptedException, ExecutionExc .collect(Collectors.toList()); inProgressTasks.addAll(inExecutionTasks()); - throttleHelper.clearThrottles(completedTasks, inProgressTasks); + // Clear throttles only after all tasks are complete for the brokers involved + if (partitionsToMove == 0) { + throttleHelper.clearThrottles(completedTasks, inProgressTasks); + throttledBrokers.clear(); + } } // Currently, _executionProgressCheckIntervalMs is only runtime adjusted for inter broker move tasks, not @@ -1663,17 +1679,17 @@ private void interBrokerMoveReplicas() throws InterruptedException, ExecutionExc + "{} tasks aborting, {} tasks aborted, {} tasks dead, {} tasks completed, {} remaining data to move; for intra-broker " + "partition movement {} tasks cancelled; for leadership movements {} tasks cancelled.", _uuid, - 
partitionMovementTasksByState.get(ExecutionTaskState.PENDING), - partitionMovementTasksByState.get(ExecutionTaskState.IN_PROGRESS), - partitionMovementTasksByState.get(ExecutionTaskState.ABORTING), - partitionMovementTasksByState.get(ExecutionTaskState.ABORTED), - partitionMovementTasksByState.get(ExecutionTaskState.DEAD), - partitionMovementTasksByState.get(ExecutionTaskState.COMPLETED), + partitionMovementTasksByState.getOrDefault(ExecutionTaskState.PENDING, 0), + partitionMovementTasksByState.getOrDefault(ExecutionTaskState.IN_PROGRESS, 0), + partitionMovementTasksByState.getOrDefault(ExecutionTaskState.ABORTING, 0), + partitionMovementTasksByState.getOrDefault(ExecutionTaskState.ABORTED, 0), + partitionMovementTasksByState.getOrDefault(ExecutionTaskState.DEAD, 0), + partitionMovementTasksByState.getOrDefault(ExecutionTaskState.COMPLETED, 0), executionTasksSummary.remainingInterBrokerDataToMoveInMB(), - executionTasksSummary.taskStat().get(INTRA_BROKER_REPLICA_ACTION).get(ExecutionTaskState.PENDING), - executionTasksSummary.taskStat().get(LEADER_ACTION).get(ExecutionTaskState.PENDING)); + executionTasksSummary.taskStat().get(INTRA_BROKER_REPLICA_ACTION).getOrDefault(ExecutionTaskState.PENDING, 0), + executionTasksSummary.taskStat().get(LEADER_ACTION).getOrDefault(ExecutionTaskState.PENDING, 0)); } - } + } private void intraBrokerMoveReplicas() { int numTotalPartitionMovements = _executionTaskManager.numRemainingIntraBrokerPartitionMovements();