From b2617c0db70ef648f33eb2751dd67f26842f178b Mon Sep 17 00:00:00 2001 From: Allen Wang Date: Mon, 2 Dec 2024 11:08:31 -0800 Subject: [PATCH] Incorporate review comments --- .../executor/ExecutionTaskPlanner.java | 2 + .../cruisecontrol/executor/Executor.java | 43 +++++++++++++------ 2 files changed, 31 insertions(+), 14 deletions(-) diff --git a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/executor/ExecutionTaskPlanner.java b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/executor/ExecutionTaskPlanner.java index b86773394..5a400faa3 100644 --- a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/executor/ExecutionTaskPlanner.java +++ b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/executor/ExecutionTaskPlanner.java @@ -389,6 +389,8 @@ public List getInterBrokerReplicaMovementTasks(Map proposalsForBroker = new TreeSet<>(_interPartMoveTasksByBrokerId.get(brokerId)); LOG.trace("Execution task for broker {} are {}", brokerId, proposalsForBroker); for (ExecutionTask task : proposalsForBroker) { diff --git a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/executor/Executor.java b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/executor/Executor.java index ef4d3aa0f..06c233ab5 100644 --- a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/executor/Executor.java +++ b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/executor/Executor.java @@ -832,20 +832,8 @@ public synchronized void executeProposals(Collection proposal requestedBrokerLeadershipMovementConcurrency, requestedExecutionProgressCheckIntervalMs, replicaMovementStrategy, isTriggeredByUserRequest, loadMonitor); if (removedBrokers != null && !removedBrokers.isEmpty()) { - int count = 0; - int totalCount = 0; - for (ExecutionProposal proposal: proposals) { - Set oldBrokers = proposal.oldReplicasBrokerIdSet(); - Set newBrokers = proposal.newReplicasBrokerIdSet(); - if (!oldBrokers.equals(newBrokers)) { - // Only count the proposals that involve partition movement. - totalCount++; - if (oldBrokers.stream().anyMatch(removedBrokers::contains) || newBrokers.stream().anyMatch(removedBrokers::contains)) { - count++; - } - } - } - LOG.info("User task {}: {} of partition move proposals are related to removed brokers.", uuid, ((float) count) / totalCount); + int[] numTasks = getNumTasksUnrelatedToBrokerRemoval(removedBrokers, proposals); + LOG.info("User task {}: {} of {} partition move proposals are unrelated to removed brokers.", uuid, numTasks[0], numTasks[1]); } startExecution(loadMonitor, null, removedBrokers, replicationThrottle, isTriggeredByUserRequest); } catch (Exception e) { @@ -859,6 +847,32 @@ public synchronized void executeProposals(Collection proposal } } + /** + * Get the number of tasks unrelated to broker removal. + * @param removedBrokers removed brokers + * @param proposals proposals to execute + * @return an array of two integers, the first one is the number of tasks unrelated to broker removal, + * the second one is the total number of tasks. + */ + private int[] getNumTasksUnrelatedToBrokerRemoval(Set removedBrokers, Collection proposals) { + int[] numTasks = new int[2]; + int unrelatedCount = 0; + int totalCount = 0; + for (ExecutionProposal proposal: proposals) { + Set oldBrokers = proposal.oldReplicasBrokerIdSet(); + Set newBrokers = proposal.newReplicasBrokerIdSet(); + if (!oldBrokers.equals(newBrokers)) { + totalCount++; + if (oldBrokers.stream().noneMatch(removedBrokers::contains) && newBrokers.stream().noneMatch(removedBrokers::contains)) { + unrelatedCount++; + } + } + } + numTasks[0] = unrelatedCount; + numTasks[1] = totalCount; + return numTasks; + } + private void sanityCheckExecuteProposals(LoadMonitor loadMonitor, String uuid) throws OngoingExecutionException { if (_hasOngoingExecution) { throw new OngoingExecutionException("Cannot execute new proposals while there is an ongoing execution."); @@ -1400,6 +1414,7 @@ public void run() { } else { String status = "succeeded"; if (userTaskInfo != null && userTaskInfo.state() != COMPLETED) { + // The task may be in state of COMPLETED_WITH_ERROR if the user requested to stop the execution. status = userTaskInfo.state().toString(); } LOG.info("User task {}: Execution {}: {}. ", _uuid, status, executionStatusString);