Skip to content

Commit

Permalink
Add execution task statistics and fix junit test
Browse files Browse the repository at this point in the history
  • Loading branch information
allenxwang committed Nov 25, 2024
1 parent 90829f3 commit eb899fa
Show file tree
Hide file tree
Showing 5 changed files with 75 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@ public class ExecutionProposal {
private final Set<ReplicaPlacementInfo> _replicasToRemove;
// Replicas to move between disks are the replicas which are to be hosted by a different disk of the same broker.
private final Map<Integer, ReplicaPlacementInfo> _replicasToMoveBetweenDisksByBroker;
private final Set<Integer> _oldReplicasSet;
private final Set<Integer> _newReplicasSet;

/**
* Construct an execution proposal.
Expand All @@ -69,10 +71,10 @@ public ExecutionProposal(TopicPartition tp,
validate();

// Populate replicas to add, to remove and to move across disk.
Set<Integer> newBrokerList = _newReplicas.stream().mapToInt(ReplicaPlacementInfo::brokerId).boxed().collect(Collectors.toSet());
Set<Integer> oldBrokerList = _oldReplicas.stream().mapToInt(ReplicaPlacementInfo::brokerId).boxed().collect(Collectors.toSet());
_replicasToAdd = _newReplicas.stream().filter(r -> !oldBrokerList.contains(r.brokerId())).collect(Collectors.toSet());
_replicasToRemove = _oldReplicas.stream().filter(r -> !newBrokerList.contains(r.brokerId())).collect(Collectors.toSet());
_newReplicasSet = _newReplicas.stream().mapToInt(ReplicaPlacementInfo::brokerId).boxed().collect(Collectors.toSet());
_oldReplicasSet = _oldReplicas.stream().mapToInt(ReplicaPlacementInfo::brokerId).boxed().collect(Collectors.toSet());
_replicasToAdd = _newReplicas.stream().filter(r -> !_oldReplicasSet.contains(r.brokerId())).collect(Collectors.toSet());
_replicasToRemove = _oldReplicas.stream().filter(r -> !_newReplicasSet.contains(r.brokerId())).collect(Collectors.toSet());
_replicasToMoveBetweenDisksByBroker = new HashMap<>();
newReplicas.stream().filter(r -> !_replicasToAdd.contains(r) && !_oldReplicas.contains(r))
.forEach(r -> _replicasToMoveBetweenDisksByBroker.put(r.brokerId(), r));
Expand Down Expand Up @@ -177,6 +179,20 @@ public List<ReplicaPlacementInfo> oldReplicas() {
return Collections.unmodifiableList(_oldReplicas);
}

/**
 * Exposes the IDs of the brokers that host the partition's replicas before the proposal is executed.
 * The returned set is a read-only view; attempts to modify it throw {@link UnsupportedOperationException}.
 *
 * @return The unmodifiable set of broker IDs derived from the old replica list.
 */
public Set<Integer> oldReplicasBrokerIdSet() {
  final Set<Integer> readOnlyOldBrokerIds = Collections.unmodifiableSet(_oldReplicasSet);
  return readOnlyOldBrokerIds;
}

/**
 * Exposes the IDs of the brokers that host the partition's replicas after the proposal is executed.
 * The returned set is a read-only view; attempts to modify it throw {@link UnsupportedOperationException}.
 *
 * @return The unmodifiable set of broker IDs derived from the new replica list.
 */
public Set<Integer> newReplicasBrokerIdSet() {
  final Set<Integer> readOnlyNewBrokerIds = Collections.unmodifiableSet(_newReplicasSet);
  return readOnlyNewBrokerIds;
}

/**
* @return The new replica list of the partition after executing the proposal.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,10 @@ public synchronized void addExecutionProposals(Collection<ExecutionProposal> pro
}
}

/**
 * Retrieves the per-broker task counts by delegating to the execution task planner.
 * NOTE: the method name is misspelled ("Soted"); it is kept as-is because existing callers use this name.
 *
 * @return The map produced by the task planner's {@code getSortedBrokerIdToInterBrokerMoveTaskCountMap()}.
 */
Map<Integer, Integer> getSotedBrokerIdToInterBrokerMoveTaskCountMap() {
  final Map<Integer, Integer> taskCountByBrokerId = _executionTaskPlanner.getSortedBrokerIdToInterBrokerMoveTaskCountMap();
  return taskCountByBrokerId;
}

/**
* Set the execution mode of the tasks to keep track of the ongoing execution mode via sensors.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.LinkedHashMap;
import java.util.Set;
import java.util.SortedSet;
import java.util.TreeSet;
Expand Down Expand Up @@ -544,4 +545,27 @@ private Comparator<Integer> brokerComparator(StrategyOptions strategyOptions, Re
: broker1 - broker2;
};
}

/**
 * Builds a map from broker ID to the number of tasks held for that broker in
 * {@code _interPartMoveTasksByBrokerId}, with entries ordered from the largest task count to the smallest.
 *
 * @return An empty map if no tasks are tracked; otherwise a map whose iteration order is descending task count.
 */
Map<Integer, Integer> getSortedBrokerIdToInterBrokerMoveTaskCountMap() {
  boolean hasTasks = _interPartMoveTasksByBrokerId != null && !_interPartMoveTasksByBrokerId.isEmpty();
  if (!hasTasks) {
    return Collections.emptyMap();
  }
  // First pass: reduce each broker's task collection to its size.
  Map<Integer, Integer> taskCountByBrokerId = _interPartMoveTasksByBrokerId.entrySet().stream()
      .collect(Collectors.toMap(Map.Entry::getKey, brokerTasks -> brokerTasks.getValue().size()));
  // Second pass: order by count, largest first; the LinkedHashMap retains that order for iteration.
  return taskCountByBrokerId.entrySet().stream()
      .sorted(Map.Entry.<Integer, Integer>comparingByValue(Collections.reverseOrder()))
      .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue, (first, second) -> first, LinkedHashMap::new));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -831,6 +831,22 @@ public synchronized void executeProposals(Collection<ExecutionProposal> proposal
requestedIntraBrokerPartitionMovementConcurrency, requestedClusterLeadershipMovementConcurrency,
requestedBrokerLeadershipMovementConcurrency, requestedExecutionProgressCheckIntervalMs, replicaMovementStrategy,
isTriggeredByUserRequest, loadMonitor);
// Report what fraction of the partition-movement proposals involve a broker that is being removed.
if (removedBrokers != null && !removedBrokers.isEmpty()) {
  int count = 0;
  int totalCount = 0;
  for (ExecutionProposal proposal : proposals) {
    Set<Integer> oldBrokers = proposal.oldReplicasBrokerIdSet();
    Set<Integer> newBrokers = proposal.newReplicasBrokerIdSet();
    if (oldBrokers.equals(newBrokers)) {
      // Same broker set before and after: no partition movement, so the proposal is not counted.
      continue;
    }
    totalCount++;
    if (oldBrokers.stream().anyMatch(removedBrokers::contains) || newBrokers.stream().anyMatch(removedBrokers::contains)) {
      count++;
    }
  }
  // Guard against 0/0, which would otherwise be logged as "NaN" when no proposal moves a partition.
  float removedBrokerProposalRatio = totalCount == 0 ? 0.0f : ((float) count) / totalCount;
  LOG.info("User task {}: {} of partition move proposals are related to removed brokers.", uuid, removedBrokerProposalRatio);
}
startExecution(loadMonitor, null, removedBrokers, replicationThrottle, isTriggeredByUserRequest);
} catch (Exception e) {
if (e instanceof OngoingExecutionException) {
Expand Down Expand Up @@ -1382,7 +1398,10 @@ public void run() {
if (_executionException != null) {
LOG.info("User task {}: Execution failed: {}. Exception: {}", _uuid, executionStatusString, _executionException.getMessage());
} else {
String status = userTaskInfo.state() == COMPLETED ? "succeeded" : userTaskInfo.state().toString();
String status = "succeeded";
if (userTaskInfo != null && userTaskInfo.state() != COMPLETED) {
status = userTaskInfo.state().toString();
}
LOG.info("User task {}: Execution {}: {}. ", _uuid, status, executionStatusString);
}
// Clear completed execution.
Expand Down Expand Up @@ -1613,6 +1632,12 @@ private void interBrokerMoveReplicas() throws InterruptedException, ExecutionExc
long totalDataToMoveInMB = _executionTaskManager.remainingInterBrokerDataToMoveInMB();
long startTime = System.currentTimeMillis();
LOG.info("User task {}: Starting {} inter-broker partition movements.", _uuid, numTotalPartitionMovements);
// Per-broker task counts as reported by the execution task manager.
Map<Integer, Integer> map = _executionTaskManager.getSotedBrokerIdToInterBrokerMoveTaskCountMap();
LOG.info("User task {}: Broker Id to Execution Task Count Map: {}", _uuid, map);
if (!map.isEmpty()) {
  // The first entry's count is divided by the total number of movements to express the skew.
  // The message needs a second placeholder; without it the logger silently drops the computed value.
  LOG.info("User task {}: Degree of task count skew towards the largest single broker: {}", _uuid,
           map.entrySet().iterator().next().getValue() / (float) numTotalPartitionMovements);
}

int partitionsToMove = numTotalPartitionMovements;
// Exhaust all the pending partition movements.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -746,6 +746,7 @@ private static UserTaskManager.UserTaskInfo getMockUserTaskInfo() {
UserTaskManager.UserTaskInfo mockUserTaskInfo = EasyMock.mock(UserTaskManager.UserTaskInfo.class);
// Run it any times to enable consecutive executions in tests.
EasyMock.expect(mockUserTaskInfo.requestUrl()).andReturn("mock-request").anyTimes();
expect(mockUserTaskInfo.state()).andReturn(UserTaskManager.TaskState.COMPLETED).anyTimes();
return mockUserTaskInfo;
}

Expand Down

0 comments on commit eb899fa

Please sign in to comment.