From 1b2b758bd4bdbd17d24dd49bec25d5e239d4bab0 Mon Sep 17 00:00:00 2001 From: Michael Borst Date: Wed, 11 Dec 2024 12:26:29 +0100 Subject: [PATCH] Reduce number of retries to remove throttle Setting the retries to 30 amounts to retrying forever, or at least for longer than the Kafka cluster will exist (~170 years). We'd rather throw and abort the reassignment process, so that we can kick it off from scratch again than have Cruise Control turn non-operational without manual action. --- .../executor/ReplicationThrottleHelper.java | 16 ++++++++++------ .../executor/ReplicationThrottleHelperTest.java | 3 ++- 2 files changed, 12 insertions(+), 7 deletions(-) diff --git a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/executor/ReplicationThrottleHelper.java b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/executor/ReplicationThrottleHelper.java index 92a441701..37c9bb91c 100644 --- a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/executor/ReplicationThrottleHelper.java +++ b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/executor/ReplicationThrottleHelper.java @@ -48,33 +48,37 @@ class ReplicationThrottleHelper { static final String LEADER_THROTTLED_REPLICAS = getLogConfig(LogConfig.LEADER_REPLICATION_THROTTLED_REPLICAS_CONFIG); static final String FOLLOWER_THROTTLED_REPLICAS = getLogConfig(LogConfig.FOLLOWER_REPLICATION_THROTTLED_REPLICAS_CONFIG); public static final long CLIENT_REQUEST_TIMEOUT_MS = TimeUnit.SECONDS.toMillis(30); - static final int RETRIES = 30; + static final int RETRIES = 3; + static final long MAX_DELAY_MS = TimeUnit.SECONDS.toMillis(10); private final AdminClient _adminClient; private final Long _throttleRate; private final int _retries; + private final long _maxDelayMs; private final Set _deadBrokers; ReplicationThrottleHelper(AdminClient adminClient, Long throttleRate) { - this(adminClient, throttleRate, RETRIES); + this(adminClient, throttleRate, RETRIES, MAX_DELAY_MS); } 
ReplicationThrottleHelper(AdminClient adminClient, Long throttleRate, Set deadBrokers) { - this(adminClient, throttleRate, RETRIES, deadBrokers); + this(adminClient, throttleRate, RETRIES, MAX_DELAY_MS, deadBrokers); } // for testing - ReplicationThrottleHelper(AdminClient adminClient, Long throttleRate, int retries) { + ReplicationThrottleHelper(AdminClient adminClient, Long throttleRate, int retries, long maxDelayMs) { this._adminClient = adminClient; this._throttleRate = throttleRate; this._retries = retries; + this._maxDelayMs = maxDelayMs; this._deadBrokers = new HashSet(); } - ReplicationThrottleHelper(AdminClient adminClient, Long throttleRate, int retries, Set deadBrokers) { + ReplicationThrottleHelper(AdminClient adminClient, Long throttleRate, int retries, long maxDelayMs, Set deadBrokers) { this._adminClient = adminClient; this._throttleRate = throttleRate; this._retries = retries; + this._maxDelayMs = maxDelayMs; this._deadBrokers = deadBrokers; } @@ -374,7 +378,7 @@ void waitForConfigs(ConfigResource cf, Collection ops) { } catch (ExecutionException | InterruptedException | TimeoutException e) { return false; } - }, _retries); + }, _retries, _maxDelayMs); if (!retryResponse) { throw new IllegalStateException("The following configs " + ops + " were not applied to " + cf + " within the time limit"); } diff --git a/cruise-control/src/test/java/com/linkedin/kafka/cruisecontrol/executor/ReplicationThrottleHelperTest.java b/cruise-control/src/test/java/com/linkedin/kafka/cruisecontrol/executor/ReplicationThrottleHelperTest.java index b1f94014d..46caae9c3 100644 --- a/cruise-control/src/test/java/com/linkedin/kafka/cruisecontrol/executor/ReplicationThrottleHelperTest.java +++ b/cruise-control/src/test/java/com/linkedin/kafka/cruisecontrol/executor/ReplicationThrottleHelperTest.java @@ -479,12 +479,13 @@ public void testRemoveReplicasFromConfigTest() { public void testWaitForConfigs() throws Exception { AdminClient mockAdminClient = 
EasyMock.strictMock(AdminClient.class); int retries = 3; + int maxDelayMs = 1000; // Case 1: queue more responses than RETRIES and expect checkConfigs to throw for (int i = 0; i < retries + 1; i++) { expectDescribeTopicConfigs(mockAdminClient, TOPIC0, EMPTY_CONFIG, true); } EasyMock.replay(mockAdminClient); - ReplicationThrottleHelper throttleHelper = new ReplicationThrottleHelper(mockAdminClient, 100L, retries); + ReplicationThrottleHelper throttleHelper = new ReplicationThrottleHelper(mockAdminClient, 100L, retries, maxDelayMs); ConfigResource cf = new ConfigResource(ConfigResource.Type.TOPIC, TOPIC0); assertThrows(IllegalStateException.class, () -> throttleHelper.waitForConfigs(cf, Collections.singletonList( new AlterConfigOp(new ConfigEntry("k", "v"), AlterConfigOp.OpType.SET)