Skip to content

Commit

Permalink
Incorporate review comments
Browse files Browse the repository at this point in the history
  • Loading branch information
allenxwang committed Dec 2, 2024
1 parent e020671 commit b2617c0
Show file tree
Hide file tree
Showing 2 changed files with 31 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -389,6 +389,8 @@ public List<ExecutionTask> getInterBrokerReplicaMovementTasks(Map<Integer, Integ
continue;
}
// Check the available balancing proposals of this broker to see if we can find one ready to execute.
// Make a TreeSet copy of the proposals for this broker to avoid ConcurrentModificationException and
// keep the same order of proposals.
SortedSet<ExecutionTask> proposalsForBroker = new TreeSet<>(_interPartMoveTasksByBrokerId.get(brokerId));
LOG.trace("Execution task for broker {} are {}", brokerId, proposalsForBroker);
for (ExecutionTask task : proposalsForBroker) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -832,20 +832,8 @@ public synchronized void executeProposals(Collection<ExecutionProposal> proposal
requestedBrokerLeadershipMovementConcurrency, requestedExecutionProgressCheckIntervalMs, replicaMovementStrategy,
isTriggeredByUserRequest, loadMonitor);
if (removedBrokers != null && !removedBrokers.isEmpty()) {
int count = 0;
int totalCount = 0;
for (ExecutionProposal proposal: proposals) {
Set<Integer> oldBrokers = proposal.oldReplicasBrokerIdSet();
Set<Integer> newBrokers = proposal.newReplicasBrokerIdSet();
if (!oldBrokers.equals(newBrokers)) {
// Only count the proposals that involve partition movement.
totalCount++;
if (oldBrokers.stream().anyMatch(removedBrokers::contains) || newBrokers.stream().anyMatch(removedBrokers::contains)) {
count++;
}
}
}
LOG.info("User task {}: {} of partition move proposals are related to removed brokers.", uuid, ((float) count) / totalCount);
int[] numTasks = getNumTasksUnrelatedToBrokerRemoval(removedBrokers, proposals);
LOG.info("User task {}: {} of {} partition move proposals are unrelated to removed brokers.", uuid, numTasks[0], numTasks[1]);
}
startExecution(loadMonitor, null, removedBrokers, replicationThrottle, isTriggeredByUserRequest);
} catch (Exception e) {
Expand All @@ -859,6 +847,32 @@ public synchronized void executeProposals(Collection<ExecutionProposal> proposal
}
}

/**
* Get the number of tasks unrelated to broker removal.
* @param removedBrokers removed brokers
* @param proposals proposals to execute
* @return an array of two integers, the first one is the number of tasks unrelated to broker removal,
* the second one is the total number of tasks.
*/
private int[] getNumTasksUnrelatedToBrokerRemoval(Set<Integer> removedBrokers, Collection<ExecutionProposal> proposals) {
int[] numTasks = new int[2];
int unrelatedCount = 0;
int totalCount = 0;
for (ExecutionProposal proposal: proposals) {
Set<Integer> oldBrokers = proposal.oldReplicasBrokerIdSet();
Set<Integer> newBrokers = proposal.newReplicasBrokerIdSet();
if (!oldBrokers.equals(newBrokers)) {
totalCount++;
if (oldBrokers.stream().noneMatch(removedBrokers::contains) && newBrokers.stream().noneMatch(removedBrokers::contains)) {
unrelatedCount++;
}
}
}
numTasks[0] = unrelatedCount;
numTasks[1] = totalCount;
return numTasks;
}

private void sanityCheckExecuteProposals(LoadMonitor loadMonitor, String uuid) throws OngoingExecutionException {
if (_hasOngoingExecution) {
throw new OngoingExecutionException("Cannot execute new proposals while there is an ongoing execution.");
Expand Down Expand Up @@ -1400,6 +1414,7 @@ public void run() {
} else {
String status = "succeeded";
if (userTaskInfo != null && userTaskInfo.state() != COMPLETED) {
// The task may be in state of COMPLETED_WITH_ERROR if the user requested to stop the execution.
status = userTaskInfo.state().toString();
}
LOG.info("User task {}: Execution {}: {}. ", _uuid, status, executionStatusString);
Expand Down

0 comments on commit b2617c0

Please sign in to comment.