Skip to content

Commit

Permalink
Reduce number of retries to remove throttle
Browse files Browse the repository at this point in the history
Setting the retries to 30 amounts to retrying forever, or at least for
longer than the Kafka cluster will exist (~170 years). We would rather throw
and abort the reassignment process, so that it can be kicked off from
scratch again, than have Cruise Control become non-operational until
someone intervenes manually.
  • Loading branch information
mborst committed Dec 17, 2024
1 parent 1accc5c commit 1b2b758
Show file tree
Hide file tree
Showing 2 changed files with 12 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -48,33 +48,37 @@ class ReplicationThrottleHelper {
static final String LEADER_THROTTLED_REPLICAS = getLogConfig(LogConfig.LEADER_REPLICATION_THROTTLED_REPLICAS_CONFIG);
static final String FOLLOWER_THROTTLED_REPLICAS = getLogConfig(LogConfig.FOLLOWER_REPLICATION_THROTTLED_REPLICAS_CONFIG);
public static final long CLIENT_REQUEST_TIMEOUT_MS = TimeUnit.SECONDS.toMillis(30);
static final int RETRIES = 30;
// Default number of attempts handed to the retry call in waitForConfigs. Kept small on purpose: a value of 30
// amounted to retrying practically forever, whereas failing fast lets the reassignment be aborted and restarted.
static final int RETRIES = 3;
// Default maximum delay (10 seconds, expressed in milliseconds) handed to the same retry call.
// NOTE(review): presumably the upper bound on the wait between attempts -- confirm against the retry helper.
static final long MAX_DELAY_MS = TimeUnit.SECONDS.toMillis(10);

// Admin client supplied at construction time.
private final AdminClient _adminClient;
// Throttle rate supplied at construction time. NOTE(review): unit and null semantics are not visible here.
private final Long _throttleRate;
// Number of attempts passed to the retry call in waitForConfigs.
private final int _retries;
// Maximum delay in milliseconds passed alongside _retries. It is only ever assigned in the constructors,
// so it is final like every other field of this helper.
private final long _maxDelayMs;
// Broker ids supplied by the caller. NOTE(review): presumably brokers to be treated as dead -- confirm usage.
private final Set<Integer> _deadBrokers;

/**
 * Creates a helper that uses the default {@link #RETRIES} retry count and, in the updated call,
 * the default {@link #MAX_DELAY_MS} delay.
 *
 * @param adminClient admin client forwarded to the delegated constructor
 * @param throttleRate throttle rate forwarded to the delegated constructor
 */
ReplicationThrottleHelper(AdminClient adminClient, Long throttleRate) {
this(adminClient, throttleRate, RETRIES);
this(adminClient, throttleRate, RETRIES, MAX_DELAY_MS);
}

/**
 * Creates a helper for the given broker id set that uses the default {@link #RETRIES} retry count and,
 * in the updated call, the default {@link #MAX_DELAY_MS} delay.
 *
 * @param adminClient admin client forwarded to the delegated constructor
 * @param throttleRate throttle rate forwarded to the delegated constructor
 * @param deadBrokers broker id set forwarded to the delegated constructor
 */
ReplicationThrottleHelper(AdminClient adminClient, Long throttleRate, Set<Integer> deadBrokers) {
this(adminClient, throttleRate, RETRIES, deadBrokers);
this(adminClient, throttleRate, RETRIES, MAX_DELAY_MS, deadBrokers);
}

// for testing
ReplicationThrottleHelper(AdminClient adminClient, Long throttleRate, int retries) {
/**
 * Creates a helper with an explicit retry count and maximum delay, starting from a fresh, empty
 * mutable broker id set.
 *
 * @param adminClient admin client to store
 * @param throttleRate throttle rate to store
 * @param retries number of attempts later passed to the retry call in waitForConfigs
 * @param maxDelayMs maximum delay in milliseconds later passed to the same retry call
 */
ReplicationThrottleHelper(AdminClient adminClient, Long throttleRate, int retries, long maxDelayMs) {
  // Reuse the full constructor so the field assignments live in one place.
  this(adminClient, throttleRate, retries, maxDelayMs, new HashSet<>());
}

ReplicationThrottleHelper(AdminClient adminClient, Long throttleRate, int retries, Set<Integer> deadBrokers) {
/**
 * Full constructor: stores every argument as given.
 *
 * @param adminClient admin client to store
 * @param throttleRate throttle rate to store
 * @param retries number of attempts later passed to the retry call in waitForConfigs
 * @param maxDelayMs maximum delay in milliseconds later passed to the same retry call
 * @param deadBrokers broker id set; the reference is kept as-is, no defensive copy is made
 */
ReplicationThrottleHelper(AdminClient adminClient, Long throttleRate, int retries, long maxDelayMs, Set<Integer> deadBrokers) {
  // Field names carry a leading underscore, so no "this." qualifier is needed to disambiguate.
  _adminClient = adminClient;
  _throttleRate = throttleRate;
  _deadBrokers = deadBrokers;
  _retries = retries;
  _maxDelayMs = maxDelayMs;
}

Expand Down Expand Up @@ -374,7 +378,7 @@ void waitForConfigs(ConfigResource cf, Collection<AlterConfigOp> ops) {
} catch (ExecutionException | InterruptedException | TimeoutException e) {
return false;
}
}, _retries);
}, _retries, _maxDelayMs);
if (!retryResponse) {
throw new IllegalStateException("The following configs " + ops + " were not applied to " + cf + " within the time limit");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -479,12 +479,13 @@ public void testRemoveReplicasFromConfigTest() {
public void testWaitForConfigs() throws Exception {
AdminClient mockAdminClient = EasyMock.strictMock(AdminClient.class);
int retries = 3;
int maxDelayMs = 1000;
// Case 1: queue more responses than RETRIES and expect checkConfigs to throw
for (int i = 0; i < retries + 1; i++) {
expectDescribeTopicConfigs(mockAdminClient, TOPIC0, EMPTY_CONFIG, true);
}
EasyMock.replay(mockAdminClient);
ReplicationThrottleHelper throttleHelper = new ReplicationThrottleHelper(mockAdminClient, 100L, retries);
// The testing constructor takes (adminClient, throttleRate, retries, maxDelayMs). Both locals are ints, so
// swapping them still compiles (int widens to long) but would configure 1000 retries with a 3 ms delay cap.
ReplicationThrottleHelper throttleHelper = new ReplicationThrottleHelper(mockAdminClient, 100L, retries, maxDelayMs);
ConfigResource cf = new ConfigResource(ConfigResource.Type.TOPIC, TOPIC0);
assertThrows(IllegalStateException.class, () -> throttleHelper.waitForConfigs(cf, Collections.singletonList(
new AlterConfigOp(new ConfigEntry("k", "v"), AlterConfigOp.OpType.SET)
Expand Down

0 comments on commit 1b2b758

Please sign in to comment.