Skip to content

Commit

Permalink
Optimize replication throttling in Executor to reduce Kafka admin requests
Browse files Browse the repository at this point in the history

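Optimizes replication throttling during a rebalance by reducing
redundant Kafka admin requests, thereby improving overall task
execution time. Throttles are now set only for proposals whose old
leader broker has not already been throttled in this execution, and
are cleared once no inter-broker partition movements remain.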

Signed-off-by: Aswin A <[email protected]>
  • Loading branch information
aswinayyolath committed Oct 27, 2024
1 parent 2b81fb1 commit 99c2809
Showing 1 changed file with 27 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1613,6 +1613,8 @@ private void interBrokerMoveReplicas() throws InterruptedException, ExecutionExc
LOG.info("User task {}: Starting {} inter-broker partition movements.", _uuid, numTotalPartitionMovements);

int partitionsToMove = numTotalPartitionMovements;
Set<Integer> throttledBrokers = new HashSet<>();

// Exhaust all the pending partition movements.
while ((partitionsToMove > 0 || !inExecutionTasks().isEmpty()) && _stopSignal.get() == NO_STOP_EXECUTION) {
// Get tasks to execute.
Expand All @@ -1621,11 +1623,21 @@ private void interBrokerMoveReplicas() throws InterruptedException, ExecutionExc

AlterPartitionReassignmentsResult result = null;
if (!tasksToExecute.isEmpty()) {
throttleHelper.setThrottles(tasksToExecute.stream().map(ExecutionTask::proposal).collect(Collectors.toList()));
// Set throttles only if not already set for brokers involved
List<ExecutionProposal> proposals = tasksToExecute.stream().map(ExecutionTask::proposal).collect(Collectors.toList());
for (ExecutionProposal proposal : proposals) {
int brokerId = proposal.oldLeader().brokerId();
if (!throttledBrokers.contains(brokerId)) {
throttleHelper.setThrottles(Collections.singletonList(proposal));
throttledBrokers.add(brokerId);
}
}

// Execute the tasks.
_executionTaskManager.markTasksInProgress(tasksToExecute);
result = ExecutionUtils.submitReplicaReassignmentTasks(_adminClient, tasksToExecute);
}

// Wait indefinitely for partition movements to finish.
List<ExecutionTask> completedTasks = waitForInterBrokerReplicaTasksToFinish(result);
partitionsToMove = _executionTaskManager.numRemainingInterBrokerPartitionMovements();
Expand All @@ -1644,7 +1656,11 @@ private void interBrokerMoveReplicas() throws InterruptedException, ExecutionExc
.collect(Collectors.toList());
inProgressTasks.addAll(inExecutionTasks());

throttleHelper.clearThrottles(completedTasks, inProgressTasks);
// Clear throttles only after all tasks are complete for the brokers involved
if (partitionsToMove == 0) {
throttleHelper.clearThrottles(completedTasks, inProgressTasks);
throttledBrokers.clear();
}
}

// Currently, _executionProgressCheckIntervalMs is only runtime adjusted for inter broker move tasks, not
Expand All @@ -1663,17 +1679,17 @@ private void interBrokerMoveReplicas() throws InterruptedException, ExecutionExc
+ "{} tasks aborting, {} tasks aborted, {} tasks dead, {} tasks completed, {} remaining data to move; for intra-broker "
+ "partition movement {} tasks cancelled; for leadership movements {} tasks cancelled.",
_uuid,
partitionMovementTasksByState.get(ExecutionTaskState.PENDING),
partitionMovementTasksByState.get(ExecutionTaskState.IN_PROGRESS),
partitionMovementTasksByState.get(ExecutionTaskState.ABORTING),
partitionMovementTasksByState.get(ExecutionTaskState.ABORTED),
partitionMovementTasksByState.get(ExecutionTaskState.DEAD),
partitionMovementTasksByState.get(ExecutionTaskState.COMPLETED),
partitionMovementTasksByState.getOrDefault(ExecutionTaskState.PENDING, 0),
partitionMovementTasksByState.getOrDefault(ExecutionTaskState.IN_PROGRESS, 0),
partitionMovementTasksByState.getOrDefault(ExecutionTaskState.ABORTING, 0),
partitionMovementTasksByState.getOrDefault(ExecutionTaskState.ABORTED, 0),
partitionMovementTasksByState.getOrDefault(ExecutionTaskState.DEAD, 0),
partitionMovementTasksByState.getOrDefault(ExecutionTaskState.COMPLETED, 0),
executionTasksSummary.remainingInterBrokerDataToMoveInMB(),
executionTasksSummary.taskStat().get(INTRA_BROKER_REPLICA_ACTION).get(ExecutionTaskState.PENDING),
executionTasksSummary.taskStat().get(LEADER_ACTION).get(ExecutionTaskState.PENDING));
executionTasksSummary.taskStat().get(INTRA_BROKER_REPLICA_ACTION).getOrDefault(ExecutionTaskState.PENDING, 0),
executionTasksSummary.taskStat().get(LEADER_ACTION).getOrDefault(ExecutionTaskState.PENDING, 0));
}
}
}

private void intraBrokerMoveReplicas() {
int numTotalPartitionMovements = _executionTaskManager.numRemainingIntraBrokerPartitionMovements();
Expand Down

0 comments on commit 99c2809

Please sign in to comment.