From 2ada27873e3db0e08f5021082fd926e6522880a1 Mon Sep 17 00:00:00 2001 From: Sumit Bansal Date: Thu, 29 Aug 2024 21:06:54 +0530 Subject: [PATCH 01/10] [ClusterManagerTaskThrottler Improvements] + Add shallow check in ClusterManagerTaskThrottler's onBeginSubmit method before computeIfPresent to avoid lock when queue is full + Remove stack trace filling in ClusterManagerThrottlingException Signed-off-by: Sumit Bansal --- CHANGELOG.md | 1 + .../service/ClusterManagerTaskThrottler.java | 48 ++++++++++++------- .../ClusterManagerThrottlingException.java | 6 +++ 3 files changed, 37 insertions(+), 18 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index f8b695205e789..a712daf08bd6a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -32,6 +32,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Add concurrent search support for Derived Fields ([#15326](https://github.com/opensearch-project/OpenSearch/pull/15326)) - [Workload Management] Add query group stats constructs ([#15343](https://github.com/opensearch-project/OpenSearch/pull/15343))) - Add runAs to Subject interface and introduce IdentityAwarePlugin extension point ([#14630](https://github.com/opensearch-project/OpenSearch/pull/14630)) +- ClusterManagerTaskThrottler Improvements ([#15508](https://github.com/opensearch-project/OpenSearch/pull/15508)) ### Dependencies - Bump `netty` from 4.1.111.Final to 4.1.112.Final ([#15081](https://github.com/opensearch-project/OpenSearch/pull/15081)) diff --git a/server/src/main/java/org/opensearch/cluster/service/ClusterManagerTaskThrottler.java b/server/src/main/java/org/opensearch/cluster/service/ClusterManagerTaskThrottler.java index 827f3a12fbce4..a507c62418994 100644 --- a/server/src/main/java/org/opensearch/cluster/service/ClusterManagerTaskThrottler.java +++ b/server/src/main/java/org/opensearch/cluster/service/ClusterManagerTaskThrottler.java @@ -209,28 +209,40 @@ Long getThrottlingLimit(final String taskKey) { return tasksThreshold.get(taskKey); } + private void checkForClusterManagerThrottling( + final ThrottlingKey clusterManagerThrottlingKey, + final String taskThrottlingKey, + final long taskCount, + final int tasksSize + ) { + if (clusterManagerThrottlingKey.isThrottlingEnabled()) { + Long threshold = tasksThreshold.get(taskThrottlingKey); + if (threshold != null && shouldThrottle(threshold, taskCount, tasksSize)) { + clusterManagerTaskThrottlerListener.onThrottle(taskThrottlingKey, tasksSize); + logger.warn( + "Throwing Throttling Exception for [{}]. Trying to add [{}] tasks to queue, limit is set to [{}]", + taskThrottlingKey, + tasksSize, + threshold + ); + throw new ClusterManagerThrottlingException("Throttling Exception : Limit exceeded for " + taskThrottlingKey); + } + } + } + @Override public void onBeginSubmit(List tasks) { - ThrottlingKey clusterManagerThrottlingKey = ((ClusterStateTaskExecutor) tasks.get(0).batchingKey) + final ThrottlingKey clusterManagerThrottlingKey = ((ClusterStateTaskExecutor) tasks.get(0).batchingKey) .getClusterManagerThrottlingKey(); - tasksCount.putIfAbsent(clusterManagerThrottlingKey.getTaskThrottlingKey(), 0L); - tasksCount.computeIfPresent(clusterManagerThrottlingKey.getTaskThrottlingKey(), (key, count) -> { + final String taskThrottlingKey = clusterManagerThrottlingKey.getTaskThrottlingKey(); + tasksCount.putIfAbsent(taskThrottlingKey, 0L); + + // Performing shallow check before taking lock, performing throttle check and computing new count + checkForClusterManagerThrottling(clusterManagerThrottlingKey, taskThrottlingKey, tasksCount.get(taskThrottlingKey), tasks.size()); + + tasksCount.computeIfPresent(taskThrottlingKey, (key, count) -> { int size = tasks.size(); - if (clusterManagerThrottlingKey.isThrottlingEnabled()) { - Long threshold = tasksThreshold.get(clusterManagerThrottlingKey.getTaskThrottlingKey()); - if (threshold != null && shouldThrottle(threshold, count, size)) { - clusterManagerTaskThrottlerListener.onThrottle(clusterManagerThrottlingKey.getTaskThrottlingKey(), size); - logger.warn( - "Throwing Throttling Exception for [{}]. Trying to add [{}] tasks to queue, limit is set to [{}]", - clusterManagerThrottlingKey.getTaskThrottlingKey(), - tasks.size(), - threshold - ); - throw new ClusterManagerThrottlingException( - "Throttling Exception : Limit exceeded for " + clusterManagerThrottlingKey.getTaskThrottlingKey() - ); - } - } + checkForClusterManagerThrottling(clusterManagerThrottlingKey, taskThrottlingKey, count, size); return count + size; }); } diff --git a/server/src/main/java/org/opensearch/cluster/service/ClusterManagerThrottlingException.java b/server/src/main/java/org/opensearch/cluster/service/ClusterManagerThrottlingException.java index 04fa9fa45d5ea..7a835910c400f 100644 --- a/server/src/main/java/org/opensearch/cluster/service/ClusterManagerThrottlingException.java +++ b/server/src/main/java/org/opensearch/cluster/service/ClusterManagerThrottlingException.java @@ -25,4 +25,10 @@ public ClusterManagerThrottlingException(String msg, Object... args) { public ClusterManagerThrottlingException(StreamInput in) throws IOException { super(in); } + + @Override + public Throwable fillInStackTrace() { + // This is on the hot path; stack traces are expensive to compute and not very useful for this exception, so don't fill it. + return this; + } } From d80be09aea4f846d633923a21e4c43f4e747074d Mon Sep 17 00:00:00 2001 From: Sumit Bansal Date: Tue, 3 Sep 2024 10:47:52 +0530 Subject: [PATCH 02/10] Perform code refactor Signed-off-by: Sumit Bansal --- .../service/ClusterManagerTaskThrottler.java | 60 ++++++++------ .../ClusterManagerTaskThrottlerTests.java | 81 ++++++++++++++++--- 2 files changed, 103 insertions(+), 38 deletions(-) diff --git a/server/src/main/java/org/opensearch/cluster/service/ClusterManagerTaskThrottler.java b/server/src/main/java/org/opensearch/cluster/service/ClusterManagerTaskThrottler.java index a507c62418994..be67a9c664002 100644 --- a/server/src/main/java/org/opensearch/cluster/service/ClusterManagerTaskThrottler.java +++ b/server/src/main/java/org/opensearch/cluster/service/ClusterManagerTaskThrottler.java @@ -33,7 +33,7 @@ *

* Set specific setting to for setting the threshold of throttling of particular task type. * e.g : Set "cluster_manager.throttling.thresholds.put_mapping" to set throttling limit of "put mapping" tasks, - * Set it to default value(-1) to disable the throttling for this task type. + * Set it to default value(-1) to disable the throttling for this task type. */ public class ClusterManagerTaskThrottler implements TaskBatcherListener { private static final Logger logger = LogManager.getLogger(ClusterManagerTaskThrottler.class); @@ -69,7 +69,7 @@ public class ClusterManagerTaskThrottler implements TaskBatcherListener { private final int MIN_THRESHOLD_VALUE = -1; // Disabled throttling private final ClusterManagerTaskThrottlerListener clusterManagerTaskThrottlerListener; - private final ConcurrentMap tasksCount; + final ConcurrentMap tasksCount; private final ConcurrentMap tasksThreshold; private final Supplier minNodeVersionSupplier; @@ -210,23 +210,14 @@ Long getThrottlingLimit(final String taskKey) { } private void checkForClusterManagerThrottling( - final ThrottlingKey clusterManagerThrottlingKey, - final String taskThrottlingKey, + final boolean throttlingEnabledWithThreshold, + final Long threshold, final long taskCount, - final int tasksSize + final int tasksSize, + final String taskThrottlingKey ) { - if (clusterManagerThrottlingKey.isThrottlingEnabled()) { - Long threshold = tasksThreshold.get(taskThrottlingKey); - if (threshold != null && shouldThrottle(threshold, taskCount, tasksSize)) { - clusterManagerTaskThrottlerListener.onThrottle(taskThrottlingKey, tasksSize); - logger.warn( - "Throwing Throttling Exception for [{}]. Trying to add [{}] tasks to queue, limit is set to [{}]", - taskThrottlingKey, - tasksSize, - threshold - ); - throw new ClusterManagerThrottlingException("Throttling Exception : Limit exceeded for " + taskThrottlingKey); - } + if (throttlingEnabledWithThreshold && shouldThrottle(threshold, taskCount, tasksSize)) { + throw new ClusterManagerThrottlingException("Throttling Exception : Limit exceeded for " + taskThrottlingKey); } } @@ -235,16 +226,33 @@ public void onBeginSubmit(List tasks) { final ThrottlingKey clusterManagerThrottlingKey = ((ClusterStateTaskExecutor) tasks.get(0).batchingKey) .getClusterManagerThrottlingKey(); final String taskThrottlingKey = clusterManagerThrottlingKey.getTaskThrottlingKey(); + final Long threshold = getThrottlingLimit(taskThrottlingKey); + final boolean isThrottlingEnabledWithThreshold = clusterManagerThrottlingKey.isThrottlingEnabled() && threshold != null; tasksCount.putIfAbsent(taskThrottlingKey, 0L); - - // Performing shallow check before taking lock, performing throttle check and computing new count - checkForClusterManagerThrottling(clusterManagerThrottlingKey, taskThrottlingKey, tasksCount.get(taskThrottlingKey), tasks.size()); - - tasksCount.computeIfPresent(taskThrottlingKey, (key, count) -> { - int size = tasks.size(); - checkForClusterManagerThrottling(clusterManagerThrottlingKey, taskThrottlingKey, count, size); - return count + size; - }); + int tasksSize = tasks.size(); + + try { + checkForClusterManagerThrottling( + isThrottlingEnabledWithThreshold, + threshold, + tasksCount.get(taskThrottlingKey), + tasksSize, + taskThrottlingKey + ); + tasksCount.computeIfPresent(taskThrottlingKey, (key, count) -> { + checkForClusterManagerThrottling(isThrottlingEnabledWithThreshold, threshold, count, tasksSize, taskThrottlingKey); + return count + tasksSize; + }); + } catch (final ClusterManagerThrottlingException e) { + clusterManagerTaskThrottlerListener.onThrottle(taskThrottlingKey, tasksSize); + logger.trace( + "Throwing Throttling Exception for [{}]. Trying to add [{}] tasks to queue, limit is set to [{}]", + taskThrottlingKey, + tasksSize, + threshold + ); + throw e; + } } /** diff --git a/server/src/test/java/org/opensearch/cluster/service/ClusterManagerTaskThrottlerTests.java b/server/src/test/java/org/opensearch/cluster/service/ClusterManagerTaskThrottlerTests.java index e25a0e0b2c3bf..fb7bf11f07370 100644 --- a/server/src/test/java/org/opensearch/cluster/service/ClusterManagerTaskThrottlerTests.java +++ b/server/src/test/java/org/opensearch/cluster/service/ClusterManagerTaskThrottlerTests.java @@ -30,6 +30,8 @@ import java.util.ArrayList; import java.util.Collections; import java.util.List; +import java.util.Optional; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import static org.opensearch.test.ClusterServiceUtils.setState; @@ -69,7 +71,7 @@ public static void afterClass() { public void testDefaults() { ClusterSettings clusterSettings = new ClusterSettings(Settings.builder().build(), ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); ClusterManagerTaskThrottler throttler = new ClusterManagerTaskThrottler(Settings.EMPTY, clusterSettings, () -> { - return clusterService.getMasterService().getMinNodeVersion(); + return clusterService.getClusterManagerService().getMinNodeVersion(); }, new ClusterManagerThrottlingStats()); throttler.registerClusterManagerTask("put-mapping", true); throttler.registerClusterManagerTask("create-index", true); @@ -108,7 +110,7 @@ public void testValidateSettingsForDifferentVersion() { } } - public void testValidateSettingsForTaskWihtoutRetryOnDataNode() { + public void testValidateSettingsForTaskWithoutRetryOnDataNode() { DiscoveryNode clusterManagerNode = getClusterManagerNode(Version.V_2_5_0); DiscoveryNode dataNode = getDataNode(Version.V_2_5_0); setState( @@ -139,7 +141,7 @@ public void testUpdateSettingsForNullValue() { ClusterSettings clusterSettings = new ClusterSettings(Settings.builder().build(), ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); ClusterManagerTaskThrottler throttler = new ClusterManagerTaskThrottler(Settings.EMPTY, clusterSettings, () -> { - return clusterService.getMasterService().getMinNodeVersion(); + return clusterService.getClusterManagerService().getMinNodeVersion(); }, new ClusterManagerThrottlingStats()); throttler.registerClusterManagerTask("put-mapping", true); @@ -173,7 +175,7 @@ public void testSettingsOnBootstrap() { .put("cluster_manager.throttling.retry.max.delay", maxDelay + "s") .build(); ClusterManagerTaskThrottler throttler = new ClusterManagerTaskThrottler(initialSettings, clusterSettings, () -> { - return clusterService.getMasterService().getMinNodeVersion(); + return clusterService.getClusterManagerService().getMinNodeVersion(); }, new ClusterManagerThrottlingStats()); throttler.registerClusterManagerTask("put-mapping", true); @@ -187,7 +189,7 @@ public void testSettingsOnBootstrap() { public void testUpdateRetryDelaySetting() { ClusterSettings clusterSettings = new ClusterSettings(Settings.builder().build(), ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); ClusterManagerTaskThrottler throttler = new ClusterManagerTaskThrottler(Settings.EMPTY, clusterSettings, () -> { - return clusterService.getMasterService().getMinNodeVersion(); + return clusterService.getClusterManagerService().getMinNodeVersion(); }, new ClusterManagerThrottlingStats()); // verify defaults @@ -217,7 +219,7 @@ public void testValidateSettingsForUnknownTask() { ClusterSettings clusterSettings = new ClusterSettings(Settings.builder().build(), ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); ClusterManagerTaskThrottler throttler = new ClusterManagerTaskThrottler(Settings.EMPTY, clusterSettings, () -> { - return clusterService.getMasterService().getMinNodeVersion(); + return clusterService.getClusterManagerService().getMinNodeVersion(); }, new ClusterManagerThrottlingStats()); // set some limit for update snapshot tasks @@ -236,7 +238,7 @@ public void testUpdateThrottlingLimitForBasicSanity() { ClusterSettings clusterSettings = new ClusterSettings(Settings.builder().build(), ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); ClusterManagerTaskThrottler throttler = new ClusterManagerTaskThrottler(Settings.EMPTY, clusterSettings, () -> { - return clusterService.getMasterService().getMinNodeVersion(); + return clusterService.getClusterManagerService().getMinNodeVersion(); }, new ClusterManagerThrottlingStats()); throttler.registerClusterManagerTask("put-mapping", true); @@ -263,7 +265,7 @@ public void testValidateSettingForLimit() { ClusterSettings clusterSettings = new ClusterSettings(Settings.builder().build(), ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); ClusterManagerTaskThrottler throttler = new ClusterManagerTaskThrottler(Settings.EMPTY, clusterSettings, () -> { - return clusterService.getMasterService().getMinNodeVersion(); + return clusterService.getClusterManagerService().getMinNodeVersion(); }, new ClusterManagerThrottlingStats()); throttler.registerClusterManagerTask("put-mapping", true); @@ -274,7 +276,7 @@ public void testValidateSettingForLimit() { public void testUpdateLimit() { ClusterSettings clusterSettings = new ClusterSettings(Settings.builder().build(), ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); ClusterManagerTaskThrottler throttler = new ClusterManagerTaskThrottler(Settings.EMPTY, clusterSettings, () -> { - return clusterService.getMasterService().getMinNodeVersion(); + return clusterService.getClusterManagerService().getMinNodeVersion(); }, new ClusterManagerThrottlingStats()); throttler.registerClusterManagerTask("put-mapping", true); @@ -309,7 +311,7 @@ public void testThrottlingForDisabledThrottlingTask() { String taskKey = "test"; ClusterSettings clusterSettings = new ClusterSettings(Settings.builder().build(), ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); ClusterManagerTaskThrottler throttler = new ClusterManagerTaskThrottler(Settings.EMPTY, clusterSettings, () -> { - return clusterService.getMasterService().getMinNodeVersion(); + return clusterService.getClusterManagerService().getMinNodeVersion(); }, throttlingStats); ClusterManagerTaskThrottler.ThrottlingKey throttlingKey = throttler.registerClusterManagerTask(taskKey, false); @@ -321,6 +323,9 @@ public void testThrottlingForDisabledThrottlingTask() { // Asserting that there was not any throttling for it assertEquals(0L, throttlingStats.getThrottlingCount(taskKey)); + + // Asserting value in tasksCount map to make sure it gets updated even when throttling is disabled + assertEquals(Optional.of(10L).get(), throttler.tasksCount.get(taskKey)); } public void testThrottlingForInitialStaticSettingAndVersionCheck() { @@ -339,7 +344,7 @@ public void testThrottlingForInitialStaticSettingAndVersionCheck() { .put("cluster_manager.throttling.thresholds.put-mapping.value", put_mapping_threshold_value) .build(); ClusterManagerTaskThrottler throttler = new ClusterManagerTaskThrottler(initialSettings, clusterSettings, () -> { - return clusterService.getMasterService().getMinNodeVersion(); + return clusterService.getClusterManagerService().getMinNodeVersion(); }, throttlingStats); ClusterManagerTaskThrottler.ThrottlingKey throttlingKey = throttler.registerClusterManagerTask("put-mapping", true); @@ -367,7 +372,7 @@ public void testThrottling() { String taskKey = "test"; ClusterSettings clusterSettings = new ClusterSettings(Settings.builder().build(), ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); ClusterManagerTaskThrottler throttler = new ClusterManagerTaskThrottler(Settings.EMPTY, clusterSettings, () -> { - return clusterService.getMasterService().getMinNodeVersion(); + return clusterService.getClusterManagerService().getMinNodeVersion(); }, throttlingStats); ClusterManagerTaskThrottler.ThrottlingKey throttlingKey = throttler.registerClusterManagerTask(taskKey, true); @@ -406,6 +411,58 @@ public void testThrottling() { throttler.onBeginSubmit(getMockUpdateTaskList(taskKey, throttlingKey, 1)); } + public void testThrottlingWithLock() { + ClusterManagerThrottlingStats throttlingStats = new ClusterManagerThrottlingStats(); + String taskKey = "test"; + ClusterSettings clusterSettings = new ClusterSettings(Settings.builder().build(), ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); + ClusterManagerTaskThrottler throttler = new ClusterManagerTaskThrottler(Settings.EMPTY, clusterSettings, () -> { + return clusterService.getClusterManagerService().getMinNodeVersion(); + }, throttlingStats); + ClusterManagerTaskThrottler.ThrottlingKey throttlingKey = throttler.registerClusterManagerTask(taskKey, true); + + throttler.updateLimit(taskKey, 5); + + // adding 3 tasks + throttler.onBeginSubmit(getMockUpdateTaskList(taskKey, throttlingKey, 3)); + + // adding 3 more tasks, these tasks should be throttled + // taskCount in Queue: 3 Threshold: 5 + assertThrows( + ClusterManagerThrottlingException.class, + () -> throttler.onBeginSubmit(getMockUpdateTaskList(taskKey, throttlingKey, 3)) + ); + assertEquals(3L, throttlingStats.getThrottlingCount(taskKey)); + + // remove one task + throttler.onBeginProcessing(getMockUpdateTaskList(taskKey, throttlingKey, 1)); + + // add 3 tasks should pass now. + // taskCount in Queue: 2 Threshold: 5 + throttler.onBeginSubmit(getMockUpdateTaskList(taskKey, throttlingKey, 3)); + + final CountDownLatch latch = new CountDownLatch(1); + + // Taking lock on tasksCount will not impact throttling behaviour now. + new Thread(() -> { + throttler.tasksCount.computeIfPresent(taskKey, (key, count) -> { + try { + latch.await(); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + return 10L; + }); + }).start(); + + // adding one task will throttle + // taskCount in Queue: 5 Threshold: 5 + assertThrows( + ClusterManagerThrottlingException.class, + () -> throttler.onBeginSubmit(getMockUpdateTaskList(taskKey, throttlingKey, 1)) + ); + latch.countDown(); + } + private List getMockUpdateTaskList( String taskKey, ClusterManagerTaskThrottler.ThrottlingKey throttlingKey, From 758a3d987ef7200bf65aa9b1f89a113d1606e502 Mon Sep 17 00:00:00 2001 From: Sumit Bansal Date: Tue, 3 Sep 2024 12:26:06 +0530 Subject: [PATCH 03/10] Retry Build Signed-off-by: Sumit Bansal From b1275b97936d8ca44fcacfd2620d33e7f440beb9 Mon Sep 17 00:00:00 2001 From: Sumit Bansal Date: Tue, 3 Sep 2024 12:43:40 +0530 Subject: [PATCH 04/10] Retry Build Signed-off-by: Sumit Bansal From 9f7edc3d471faf18c36c96ae62f951dc11207f64 Mon Sep 17 00:00:00 2001 From: Sumit Bansal Date: Tue, 3 Sep 2024 14:19:17 +0530 Subject: [PATCH 05/10] Update assertions in testThrottlingWithLock unit test Signed-off-by: Sumit Bansal --- .../ClusterManagerTaskThrottlerTests.java | 19 +++++++++++++++---- 1 file changed, 15 insertions(+), 4 deletions(-) diff --git a/server/src/test/java/org/opensearch/cluster/service/ClusterManagerTaskThrottlerTests.java b/server/src/test/java/org/opensearch/cluster/service/ClusterManagerTaskThrottlerTests.java index fb7bf11f07370..78238eb7b0f84 100644 --- a/server/src/test/java/org/opensearch/cluster/service/ClusterManagerTaskThrottlerTests.java +++ b/server/src/test/java/org/opensearch/cluster/service/ClusterManagerTaskThrottlerTests.java @@ -441,9 +441,8 @@ public void testThrottlingWithLock() { throttler.onBeginSubmit(getMockUpdateTaskList(taskKey, throttlingKey, 3)); final CountDownLatch latch = new CountDownLatch(1); - // Taking lock on tasksCount will not impact throttling behaviour now. - new Thread(() -> { + var threadToLock = new Thread(() -> { throttler.tasksCount.computeIfPresent(taskKey, (key, count) -> { try { latch.await(); @@ -452,15 +451,27 @@ public void testThrottlingWithLock() { } return 10L; }); - }).start(); + }); + threadToLock.start(); // adding one task will throttle // taskCount in Queue: 5 Threshold: 5 - assertThrows( + final ClusterManagerThrottlingException exception = assertThrows( ClusterManagerThrottlingException.class, () -> throttler.onBeginSubmit(getMockUpdateTaskList(taskKey, throttlingKey, 1)) ); + assertEquals("Throttling Exception : Limit exceeded for test", exception.getMessage()); + assertEquals(Optional.of(5L).get(), throttler.tasksCount.get(taskKey)); + assertEquals(4L, throttlingStats.getThrottlingCount(taskKey)); + latch.countDown(); + try { + // Wait to complete and then assert on new tasksCount that got modified by threadToLock Thread + threadToLock.join(); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + assertEquals(Optional.of(10L).get(), throttler.tasksCount.get(taskKey)); } private List getMockUpdateTaskList( From 26259d93b7ad064404673047a0dc4dbb7dbefb25 Mon Sep 17 00:00:00 2001 From: Sumit Bansal Date: Tue, 3 Sep 2024 14:44:56 +0530 Subject: [PATCH 06/10] Add testThrottlingWithMultipleOnBeginSubmitsThreadsWithLock Signed-off-by: Sumit Bansal --- .../ClusterManagerTaskThrottlerTests.java | 75 +++++++++++++++++++ 1 file changed, 75 insertions(+) diff --git a/server/src/test/java/org/opensearch/cluster/service/ClusterManagerTaskThrottlerTests.java b/server/src/test/java/org/opensearch/cluster/service/ClusterManagerTaskThrottlerTests.java index 78238eb7b0f84..ede28db9d42ff 100644 --- a/server/src/test/java/org/opensearch/cluster/service/ClusterManagerTaskThrottlerTests.java +++ b/server/src/test/java/org/opensearch/cluster/service/ClusterManagerTaskThrottlerTests.java @@ -33,6 +33,7 @@ import java.util.Optional; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; +import java.util.stream.IntStream; import static org.opensearch.test.ClusterServiceUtils.setState; @@ -474,6 +475,80 @@ public void testThrottlingWithLock() { assertEquals(Optional.of(10L).get(), throttler.tasksCount.get(taskKey)); } + public void testThrottlingWithMultipleOnBeginSubmitsThreadsWithLock() { + ClusterManagerThrottlingStats throttlingStats = new ClusterManagerThrottlingStats(); + String taskKey = "test"; + ClusterSettings clusterSettings = new ClusterSettings(Settings.builder().build(), ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); + ClusterManagerTaskThrottler throttler = new ClusterManagerTaskThrottler(Settings.EMPTY, clusterSettings, () -> { + return clusterService.getClusterManagerService().getMinNodeVersion(); + }, throttlingStats); + ClusterManagerTaskThrottler.ThrottlingKey throttlingKey = throttler.registerClusterManagerTask(taskKey, true); + + throttler.updateLimit(taskKey, 5); + + // adding 3 tasks + throttler.onBeginSubmit(getMockUpdateTaskList(taskKey, throttlingKey, 3)); + + // adding 3 more tasks, these tasks should be throttled + // taskCount in Queue: 3 Threshold: 5 + assertThrows( + ClusterManagerThrottlingException.class, + () -> throttler.onBeginSubmit(getMockUpdateTaskList(taskKey, throttlingKey, 3)) + ); + assertEquals(3L, throttlingStats.getThrottlingCount(taskKey)); + + // remove one task + throttler.onBeginProcessing(getMockUpdateTaskList(taskKey, throttlingKey, 1)); + + // add 3 tasks should pass now. + // taskCount in Queue: 2 Threshold: 5 + throttler.onBeginSubmit(getMockUpdateTaskList(taskKey, throttlingKey, 3)); + + final CountDownLatch latch = new CountDownLatch(1); + // Taking lock on tasksCount will not impact throttling behaviour now. + var threadToLock = new Thread(() -> { + throttler.tasksCount.computeIfPresent(taskKey, (key, count) -> { + try { + latch.await(); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + return 10L; + }); + }); + threadToLock.start(); + + // submit 1000 threads to verify throttlingCount behaviour as well + final CountDownLatch latch2 = new CountDownLatch(1000); + IntStream.range(0, 1000).forEach(i -> new Thread(() -> { + // adding one task will throttle + // taskCount in Queue: 5 Threshold: 5 + final ClusterManagerThrottlingException exception = assertThrows( + ClusterManagerThrottlingException.class, + () -> throttler.onBeginSubmit(getMockUpdateTaskList(taskKey, throttlingKey, 1)) + ); + assertEquals("Throttling Exception : Limit exceeded for test", exception.getMessage()); + assertEquals(Optional.of(5L).get(), throttler.tasksCount.get(taskKey)); + latch2.countDown(); + }).start()); + + try { + latch2.await(); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + assertEquals(1003L, throttlingStats.getThrottlingCount(taskKey)); + + try { + latch.countDown(); + // Wait to complete and then assert on new tasksCount that got modified by threadToLock Thread + threadToLock.join(); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + assertEquals(Optional.of(10L).get(), throttler.tasksCount.get(taskKey)); + } + private List getMockUpdateTaskList( String taskKey, ClusterManagerTaskThrottler.ThrottlingKey throttlingKey, From c8990727845f5203aed38d016322329f2415301b Mon Sep 17 00:00:00 2001 From: Sumit Bansal Date: Tue, 3 Sep 2024 15:10:30 +0530 Subject: [PATCH 07/10] add comment and refactor variable names Signed-off-by: Sumit Bansal --- .../service/ClusterManagerTaskThrottler.java | 31 ++++++++++++------- 1 file changed, 20 insertions(+), 11 deletions(-) diff --git a/server/src/main/java/org/opensearch/cluster/service/ClusterManagerTaskThrottler.java b/server/src/main/java/org/opensearch/cluster/service/ClusterManagerTaskThrottler.java index be67a9c664002..4559a985a7280 100644 --- a/server/src/main/java/org/opensearch/cluster/service/ClusterManagerTaskThrottler.java +++ b/server/src/main/java/org/opensearch/cluster/service/ClusterManagerTaskThrottler.java @@ -212,11 +212,11 @@ Long getThrottlingLimit(final String taskKey) { private void checkForClusterManagerThrottling( final boolean throttlingEnabledWithThreshold, final Long threshold, - final long taskCount, - final int tasksSize, + final long existingTaskCount, + final int incomingTaskCount, final String taskThrottlingKey ) { - if (throttlingEnabledWithThreshold && shouldThrottle(threshold, taskCount, tasksSize)) { + if (throttlingEnabledWithThreshold && shouldThrottle(threshold, existingTaskCount, incomingTaskCount)) { throw new ClusterManagerThrottlingException("Throttling Exception : Limit exceeded for " + taskThrottlingKey); } } @@ -228,27 +228,36 @@ public void onBeginSubmit(List tasks) { final String taskThrottlingKey = clusterManagerThrottlingKey.getTaskThrottlingKey(); final Long threshold = getThrottlingLimit(taskThrottlingKey); final boolean isThrottlingEnabledWithThreshold = clusterManagerThrottlingKey.isThrottlingEnabled() && threshold != null; - tasksCount.putIfAbsent(taskThrottlingKey, 0L); - int tasksSize = tasks.size(); + int incomingTaskCount = tasks.size(); try { + tasksCount.putIfAbsent(taskThrottlingKey, 0L); + // Perform shallow check before acquiring lock to avoid blocking of network threads + // if throttling is ongoing for a specific task checkForClusterManagerThrottling( isThrottlingEnabledWithThreshold, threshold, tasksCount.get(taskThrottlingKey), - tasksSize, + incomingTaskCount, taskThrottlingKey ); - tasksCount.computeIfPresent(taskThrottlingKey, (key, count) -> { - checkForClusterManagerThrottling(isThrottlingEnabledWithThreshold, threshold, count, tasksSize, taskThrottlingKey); - return count + tasksSize; + + tasksCount.computeIfPresent(taskThrottlingKey, (key, existingTaskCount) -> { + checkForClusterManagerThrottling( + isThrottlingEnabledWithThreshold, + threshold, + existingTaskCount, + incomingTaskCount, + taskThrottlingKey + ); + return existingTaskCount + incomingTaskCount; }); } catch (final ClusterManagerThrottlingException e) { - clusterManagerTaskThrottlerListener.onThrottle(taskThrottlingKey, tasksSize); + clusterManagerTaskThrottlerListener.onThrottle(taskThrottlingKey, incomingTaskCount); logger.trace( "Throwing Throttling Exception for [{}]. Trying to add [{}] tasks to queue, limit is set to [{}]", taskThrottlingKey, - tasksSize, + incomingTaskCount, threshold ); throw e; From 34d6245d32c0c8e3ea167f374291fbad61b2b385 Mon Sep 17 00:00:00 2001 From: Sumit Bansal Date: Tue, 3 Sep 2024 15:28:43 +0530 Subject: [PATCH 08/10] reduce threads in ut to avoid slowing down runs Signed-off-by: Sumit Bansal --- .../cluster/service/ClusterManagerTaskThrottlerTests.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/server/src/test/java/org/opensearch/cluster/service/ClusterManagerTaskThrottlerTests.java b/server/src/test/java/org/opensearch/cluster/service/ClusterManagerTaskThrottlerTests.java index ede28db9d42ff..29343f53b2ac9 100644 --- a/server/src/test/java/org/opensearch/cluster/service/ClusterManagerTaskThrottlerTests.java +++ b/server/src/test/java/org/opensearch/cluster/service/ClusterManagerTaskThrottlerTests.java @@ -519,8 +519,8 @@ public void testThrottlingWithMultipleOnBeginSubmitsThreadsWithLock() { threadToLock.start(); // submit 1000 threads to verify throttlingCount behaviour as well - final CountDownLatch latch2 = new CountDownLatch(1000); - IntStream.range(0, 1000).forEach(i -> new Thread(() -> { + final CountDownLatch latch2 = new CountDownLatch(10); + IntStream.range(0, 10).forEach(i -> new Thread(() -> { // adding one task will throttle // taskCount in Queue: 5 Threshold: 5 final ClusterManagerThrottlingException exception = assertThrows( @@ -537,7 +537,7 @@ public void testThrottlingWithMultipleOnBeginSubmitsThreadsWithLock() { } catch (InterruptedException e) { throw new RuntimeException(e); } - assertEquals(1003L, throttlingStats.getThrottlingCount(taskKey)); + assertEquals(13L, throttlingStats.getThrottlingCount(taskKey)); try { latch.countDown(); From 8ad4da6fc6c8ae539a64298e402dfa44fc8ee1cb Mon Sep 17 00:00:00 2001 From: Sumit Bansal Date: Wed, 4 Sep 2024 00:01:24 +0530 Subject: [PATCH 09/10] handle comments to update unit tests Signed-off-by: Sumit Bansal --- .../service/ClusterManagerTaskThrottler.java | 6 +- .../ClusterManagerTaskThrottlerTests.java | 144 ++++++++++-------- 2 files changed, 85 insertions(+), 65 deletions(-) diff --git a/server/src/main/java/org/opensearch/cluster/service/ClusterManagerTaskThrottler.java b/server/src/main/java/org/opensearch/cluster/service/ClusterManagerTaskThrottler.java index 4559a985a7280..39ce218dd801a 100644 --- a/server/src/main/java/org/opensearch/cluster/service/ClusterManagerTaskThrottler.java +++ b/server/src/main/java/org/opensearch/cluster/service/ClusterManagerTaskThrottler.java @@ -209,7 +209,7 @@ Long getThrottlingLimit(final String taskKey) { return tasksThreshold.get(taskKey); } - private void checkForClusterManagerThrottling( + private void failFastWhenThrottlingThresholdsAreAlreadyBreached( final boolean throttlingEnabledWithThreshold, final Long threshold, final long existingTaskCount, @@ -234,7 +234,7 @@ public void onBeginSubmit(List tasks) { tasksCount.putIfAbsent(taskThrottlingKey, 0L); // Perform shallow check before acquiring lock to avoid blocking of network threads // if throttling is ongoing for a specific task - checkForClusterManagerThrottling( + failFastWhenThrottlingThresholdsAreAlreadyBreached( isThrottlingEnabledWithThreshold, threshold, tasksCount.get(taskThrottlingKey), @@ -243,7 +243,7 @@ public void onBeginSubmit(List tasks) { ); tasksCount.computeIfPresent(taskThrottlingKey, (key, existingTaskCount) -> { - checkForClusterManagerThrottling( + failFastWhenThrottlingThresholdsAreAlreadyBreached( isThrottlingEnabledWithThreshold, threshold, existingTaskCount, diff --git a/server/src/test/java/org/opensearch/cluster/service/ClusterManagerTaskThrottlerTests.java b/server/src/test/java/org/opensearch/cluster/service/ClusterManagerTaskThrottlerTests.java index 29343f53b2ac9..3bd9333dc4168 100644 --- a/server/src/test/java/org/opensearch/cluster/service/ClusterManagerTaskThrottlerTests.java +++ b/server/src/test/java/org/opensearch/cluster/service/ClusterManagerTaskThrottlerTests.java @@ -33,7 +33,6 @@ import java.util.Optional; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; -import java.util.stream.IntStream; import static org.opensearch.test.ClusterServiceUtils.setState; @@ -442,35 +441,40 @@ public void testThrottlingWithLock() { throttler.onBeginSubmit(getMockUpdateTaskList(taskKey, throttlingKey, 3)); final CountDownLatch latch = new CountDownLatch(1); - // Taking lock on tasksCount will not impact throttling behaviour now. - var threadToLock = new Thread(() -> { - throttler.tasksCount.computeIfPresent(taskKey, (key, count) -> { + Thread threadToLock = null; + try { + // Taking lock on tasksCount will not impact throttling behaviour now. + threadToLock = new Thread(() -> { + throttler.tasksCount.computeIfPresent(taskKey, (key, count) -> { + try { + latch.await(); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + return 10L; + }); + }); + threadToLock.start(); + + // adding one task will throttle + // taskCount in Queue: 5 Threshold: 5 + final ClusterManagerThrottlingException exception = assertThrows( + ClusterManagerThrottlingException.class, + () -> throttler.onBeginSubmit(getMockUpdateTaskList(taskKey, throttlingKey, 1)) + ); + assertEquals("Throttling Exception : Limit exceeded for test", exception.getMessage()); + assertEquals(Optional.of(5L).get(), throttler.tasksCount.get(taskKey)); + assertEquals(4L, throttlingStats.getThrottlingCount(taskKey)); + } finally { + if (threadToLock != null) { + latch.countDown(); + // Wait to complete and then assert on new tasksCount that got modified by threadToLock Thread try { - latch.await(); + threadToLock.join(); } catch (InterruptedException e) { throw new RuntimeException(e); } - return 10L; - }); - }); - threadToLock.start(); - - // adding one task will throttle - // taskCount in Queue: 5 Threshold: 5 - final ClusterManagerThrottlingException exception = assertThrows( - ClusterManagerThrottlingException.class, - () -> throttler.onBeginSubmit(getMockUpdateTaskList(taskKey, throttlingKey, 1)) - ); - assertEquals("Throttling Exception : Limit exceeded for test", exception.getMessage()); - assertEquals(Optional.of(5L).get(), throttler.tasksCount.get(taskKey)); - assertEquals(4L, throttlingStats.getThrottlingCount(taskKey)); - - latch.countDown(); - try { - // Wait to complete and then assert on new tasksCount that got modified by threadToLock Thread - threadToLock.join(); - } catch (InterruptedException e) { - throw new RuntimeException(e); + } } assertEquals(Optional.of(10L).get(), throttler.tasksCount.get(taskKey)); } @@ -505,46 +509,62 @@ public void testThrottlingWithMultipleOnBeginSubmitsThreadsWithLock() { throttler.onBeginSubmit(getMockUpdateTaskList(taskKey, throttlingKey, 3)); final CountDownLatch latch = new CountDownLatch(1); - // Taking lock on tasksCount will not impact throttling behaviour now. - var threadToLock = new Thread(() -> { - throttler.tasksCount.computeIfPresent(taskKey, (key, count) -> { + Thread threadToLock = null; + List submittingThreads = new ArrayList<>(); + + try { + // Taking lock on tasksCount will not impact throttling behaviour now. + threadToLock = new Thread(() -> { + throttler.tasksCount.computeIfPresent(taskKey, (key, count) -> { + try { + latch.await(); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + return 10L; + }); + }); + threadToLock.start(); + + final CountDownLatch latch2 = new CountDownLatch(10); + for (int i = 0; i < 10; i++) { + Thread submittingThread = new Thread(() -> { + // adding one task will throttle + // taskCount in Queue: 5 Threshold: 5 + final ClusterManagerThrottlingException exception = assertThrows( + ClusterManagerThrottlingException.class, + () -> throttler.onBeginSubmit(getMockUpdateTaskList(taskKey, throttlingKey, 1)) + ); + assertEquals("Throttling Exception : Limit exceeded for test", exception.getMessage()); + assertEquals(Optional.of(5L).get(), throttler.tasksCount.get(taskKey)); + latch2.countDown(); + }); + submittingThread.start(); + submittingThreads.add(submittingThread); + } + try { + latch2.await(); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + assertEquals(13L, throttlingStats.getThrottlingCount(taskKey)); + } finally { + if (threadToLock != null) { + latch.countDown(); try { - latch.await(); + // Wait to complete and then assert on new tasksCount that got modified by threadToLock Thread + threadToLock.join(); } catch (InterruptedException e) { throw new RuntimeException(e); } - return 10L; - }); - }); - threadToLock.start(); - - // submit 1000 threads to verify throttlingCount behaviour as well - final CountDownLatch latch2 = new CountDownLatch(10); - IntStream.range(0, 10).forEach(i -> new Thread(() -> { - // adding one task will throttle - // taskCount in Queue: 5 Threshold: 5 - final ClusterManagerThrottlingException exception = assertThrows( - ClusterManagerThrottlingException.class, - () -> throttler.onBeginSubmit(getMockUpdateTaskList(taskKey, throttlingKey, 1)) - ); - assertEquals("Throttling Exception : Limit exceeded for test", exception.getMessage()); - assertEquals(Optional.of(5L).get(), throttler.tasksCount.get(taskKey)); - latch2.countDown(); - }).start()); - - try { - latch2.await(); - } catch (InterruptedException e) { - throw new RuntimeException(e); - } - assertEquals(13L, throttlingStats.getThrottlingCount(taskKey)); - - try { - latch.countDown(); - // Wait to complete and then assert on new tasksCount that got modified by threadToLock Thread - threadToLock.join(); - } catch (InterruptedException e) { - throw new RuntimeException(e); + } + for (Thread submittingThread : submittingThreads) { + try { + submittingThread.join(); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } } assertEquals(Optional.of(10L).get(), throttler.tasksCount.get(taskKey)); } From 37ea33314664b78c15f533d6ac8327ffe19c8d16 Mon Sep 17 00:00:00 2001 From: Sumit Bansal Date: Wed, 4 Sep 2024 01:18:06 +0530 Subject: [PATCH 10/10] Retry Build Signed-off-by: Sumit Bansal