Skip to content

Commit

Permalink
handle comments to update unit tests
Browse files Browse the repository at this point in the history
Signed-off-by: Sumit Bansal <[email protected]>
  • Loading branch information
sumitasr committed Sep 3, 2024
1 parent 48d106d commit 8ad4da6
Show file tree
Hide file tree
Showing 2 changed files with 85 additions and 65 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,7 @@ Long getThrottlingLimit(final String taskKey) {
return tasksThreshold.get(taskKey);
}

private void checkForClusterManagerThrottling(
private void failFastWhenThrottlingThresholdsAreAlreadyBreached(
final boolean throttlingEnabledWithThreshold,
final Long threshold,
final long existingTaskCount,
Expand All @@ -234,7 +234,7 @@ public void onBeginSubmit(List<? extends TaskBatcher.BatchedTask> tasks) {
tasksCount.putIfAbsent(taskThrottlingKey, 0L);
// Perform shallow check before acquiring lock to avoid blocking of network threads
// if throttling is ongoing for a specific task
checkForClusterManagerThrottling(
failFastWhenThrottlingThresholdsAreAlreadyBreached(
isThrottlingEnabledWithThreshold,
threshold,
tasksCount.get(taskThrottlingKey),
Expand All @@ -243,7 +243,7 @@ public void onBeginSubmit(List<? extends TaskBatcher.BatchedTask> tasks) {
);

tasksCount.computeIfPresent(taskThrottlingKey, (key, existingTaskCount) -> {
checkForClusterManagerThrottling(
failFastWhenThrottlingThresholdsAreAlreadyBreached(
isThrottlingEnabledWithThreshold,
threshold,
existingTaskCount,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@
import java.util.Optional;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.stream.IntStream;

import static org.opensearch.test.ClusterServiceUtils.setState;

Expand Down Expand Up @@ -442,35 +441,40 @@ public void testThrottlingWithLock() {
throttler.onBeginSubmit(getMockUpdateTaskList(taskKey, throttlingKey, 3));

final CountDownLatch latch = new CountDownLatch(1);
// Taking lock on tasksCount will not impact throttling behaviour now.
var threadToLock = new Thread(() -> {
throttler.tasksCount.computeIfPresent(taskKey, (key, count) -> {
Thread threadToLock = null;
try {
// Taking lock on tasksCount will not impact throttling behaviour now.
threadToLock = new Thread(() -> {
throttler.tasksCount.computeIfPresent(taskKey, (key, count) -> {
try {
latch.await();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
return 10L;
});
});
threadToLock.start();

// adding one task will throttle
// taskCount in Queue: 5 Threshold: 5
final ClusterManagerThrottlingException exception = assertThrows(
ClusterManagerThrottlingException.class,
() -> throttler.onBeginSubmit(getMockUpdateTaskList(taskKey, throttlingKey, 1))
);
assertEquals("Throttling Exception : Limit exceeded for test", exception.getMessage());
assertEquals(Optional.of(5L).get(), throttler.tasksCount.get(taskKey));
assertEquals(4L, throttlingStats.getThrottlingCount(taskKey));
} finally {
if (threadToLock != null) {
latch.countDown();
// Wait to complete and then assert on new tasksCount that got modified by threadToLock Thread
try {
latch.await();
threadToLock.join();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
return 10L;
});
});
threadToLock.start();

// adding one task will throttle
// taskCount in Queue: 5 Threshold: 5
final ClusterManagerThrottlingException exception = assertThrows(
ClusterManagerThrottlingException.class,
() -> throttler.onBeginSubmit(getMockUpdateTaskList(taskKey, throttlingKey, 1))
);
assertEquals("Throttling Exception : Limit exceeded for test", exception.getMessage());
assertEquals(Optional.of(5L).get(), throttler.tasksCount.get(taskKey));
assertEquals(4L, throttlingStats.getThrottlingCount(taskKey));

latch.countDown();
try {
// Wait to complete and then assert on new tasksCount that got modified by threadToLock Thread
threadToLock.join();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
assertEquals(Optional.of(10L).get(), throttler.tasksCount.get(taskKey));
}
Expand Down Expand Up @@ -505,46 +509,62 @@ public void testThrottlingWithMultipleOnBeginSubmitsThreadsWithLock() {
throttler.onBeginSubmit(getMockUpdateTaskList(taskKey, throttlingKey, 3));

final CountDownLatch latch = new CountDownLatch(1);
// Taking lock on tasksCount will not impact throttling behaviour now.
var threadToLock = new Thread(() -> {
throttler.tasksCount.computeIfPresent(taskKey, (key, count) -> {
Thread threadToLock = null;
List<Thread> submittingThreads = new ArrayList<>();

try {
// Taking lock on tasksCount will not impact throttling behaviour now.
threadToLock = new Thread(() -> {
throttler.tasksCount.computeIfPresent(taskKey, (key, count) -> {
try {
latch.await();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
return 10L;
});
});
threadToLock.start();

final CountDownLatch latch2 = new CountDownLatch(10);
for (int i = 0; i < 10; i++) {
Thread submittingThread = new Thread(() -> {
// adding one task will throttle
// taskCount in Queue: 5 Threshold: 5
final ClusterManagerThrottlingException exception = assertThrows(
ClusterManagerThrottlingException.class,
() -> throttler.onBeginSubmit(getMockUpdateTaskList(taskKey, throttlingKey, 1))
);
assertEquals("Throttling Exception : Limit exceeded for test", exception.getMessage());
assertEquals(Optional.of(5L).get(), throttler.tasksCount.get(taskKey));
latch2.countDown();
});
submittingThread.start();
submittingThreads.add(submittingThread);
}
try {
latch2.await();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
assertEquals(13L, throttlingStats.getThrottlingCount(taskKey));
} finally {
if (threadToLock != null) {
latch.countDown();
try {
latch.await();
// Wait to complete and then assert on new tasksCount that got modified by threadToLock Thread
threadToLock.join();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
return 10L;
});
});
threadToLock.start();

// submit 1000 threads to verify throttlingCount behaviour as well
final CountDownLatch latch2 = new CountDownLatch(10);
IntStream.range(0, 10).forEach(i -> new Thread(() -> {
// adding one task will throttle
// taskCount in Queue: 5 Threshold: 5
final ClusterManagerThrottlingException exception = assertThrows(
ClusterManagerThrottlingException.class,
() -> throttler.onBeginSubmit(getMockUpdateTaskList(taskKey, throttlingKey, 1))
);
assertEquals("Throttling Exception : Limit exceeded for test", exception.getMessage());
assertEquals(Optional.of(5L).get(), throttler.tasksCount.get(taskKey));
latch2.countDown();
}).start());

try {
latch2.await();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
assertEquals(13L, throttlingStats.getThrottlingCount(taskKey));

try {
latch.countDown();
// Wait to complete and then assert on new tasksCount that got modified by threadToLock Thread
threadToLock.join();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
for (Thread submittingThread : submittingThreads) {
try {
submittingThread.join();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
}
assertEquals(Optional.of(10L).get(), throttler.tasksCount.get(taskKey));
}
Expand Down

0 comments on commit 8ad4da6

Please sign in to comment.