Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add stop_ongoing_execution flag to rebalance requests for full run #10703

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -867,7 +867,7 @@ private Future<MapAndStatus<ConfigMap, KafkaRebalanceStatus>> onProposalReady(Re
AbstractRebalanceOptions.AbstractRebalanceOptionsBuilder<?, ?> rebalanceOptionsBuilder) {
if (Annotations.booleanAnnotation(kafkaRebalance, ANNO_STRIMZI_IO_REBALANCE_AUTOAPPROVAL, false)) {
LOGGER.infoCr(reconciliation, "Auto-approval set on the KafkaRebalance resource");
return requestRebalance(reconciliation, host, apiClient, kafkaRebalance, false, rebalanceOptionsBuilder);
return requestRebalance(reconciliation, host, apiClient, kafkaRebalance, false, rebalanceOptionsBuilder, null, true);
} else {
KafkaRebalanceAnnotation rebalanceAnnotation = rebalanceAnnotation(kafkaRebalance);
switch (rebalanceAnnotation) {
Expand All @@ -876,7 +876,7 @@ private Future<MapAndStatus<ConfigMap, KafkaRebalanceStatus>> onProposalReady(Re
return configMapOperator.getAsync(kafkaRebalance.getMetadata().getNamespace(), kafkaRebalance.getMetadata().getName()).compose(loadmap -> Future.succeededFuture(new MapAndStatus<>(loadmap, buildRebalanceStatusFromPreviousStatus(kafkaRebalance.getStatus(), StatusUtils.validate(reconciliation, kafkaRebalance)))));
case approve:
LOGGER.debugCr(reconciliation, "Annotation {}={}", ANNO_STRIMZI_IO_REBALANCE, KafkaRebalanceAnnotation.approve);
return requestRebalance(reconciliation, host, apiClient, kafkaRebalance, false, rebalanceOptionsBuilder);
return requestRebalance(reconciliation, host, apiClient, kafkaRebalance, false, rebalanceOptionsBuilder, null, true);
case refresh:
LOGGER.debugCr(reconciliation, "Annotation {}={}", ANNO_STRIMZI_IO_REBALANCE, KafkaRebalanceAnnotation.refresh);
return requestRebalance(reconciliation, host, apiClient, kafkaRebalance, true, rebalanceOptionsBuilder);
Expand Down Expand Up @@ -1130,21 +1130,24 @@ private boolean isKafkaClusterReady(Kafka kafka) {
}

/**
 * Convenience overload of requestRebalance that forwards every argument to the longer overload.
 *
 * NOTE(review): this span is a rendered diff, so the previous and the updated versions of the signature
 * continuation and of the delegating call both appear below. The previous call passes only a null user task ID;
 * the updated call additionally passes false for the stop-ongoing-execution flag.
 */
private Future<MapAndStatus<ConfigMap, KafkaRebalanceStatus>> requestRebalance(Reconciliation reconciliation,
String host, CruiseControlApi apiClient, KafkaRebalance kafkaRebalance,
boolean dryrun, AbstractRebalanceOptions.AbstractRebalanceOptionsBuilder<?, ?> rebalanceOptionsBuilder) {
return requestRebalance(reconciliation, host, apiClient, kafkaRebalance, dryrun, rebalanceOptionsBuilder, null);
String host, CruiseControlApi apiClient, KafkaRebalance kafkaRebalance,
boolean dryrun, AbstractRebalanceOptions.AbstractRebalanceOptionsBuilder<?, ?> rebalanceOptionsBuilder) {
return requestRebalance(reconciliation, host, apiClient, kafkaRebalance, dryrun, rebalanceOptionsBuilder, null, false);
}


private Future<MapAndStatus<ConfigMap, KafkaRebalanceStatus>> requestRebalance(Reconciliation reconciliation, String host, CruiseControlApi apiClient, KafkaRebalance kafkaRebalance,
ppatierno marked this conversation as resolved.
Show resolved Hide resolved
boolean dryrun,
AbstractRebalanceOptions.AbstractRebalanceOptionsBuilder<?, ?> rebalanceOptionsBuilder, String userTaskID) {
AbstractRebalanceOptions.AbstractRebalanceOptionsBuilder<?, ?> rebalanceOptionsBuilder,
String userTaskID, boolean stopOngoingExecution) {

LOGGER.infoCr(reconciliation, "Requesting Cruise Control rebalance [dryrun={}]", dryrun);
LOGGER.infoCr(reconciliation, "Requesting Cruise Control rebalance [dryrun={}] [stop_ongoing_execution={}]", dryrun, stopOngoingExecution);
rebalanceOptionsBuilder.withVerboseResponse();
if (!dryrun) {
rebalanceOptionsBuilder.withFullRun();
}
if (stopOngoingExecution) {
rebalanceOptionsBuilder.withStopOngoingExecution();
}
// backward compatibility, no mode specified means "full"
KafkaRebalanceMode mode = Optional.ofNullable(kafkaRebalance.getSpec())
.map(spec -> spec.getMode())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ public abstract class AbstractRebalanceOptions {
private final boolean skipHardGoalCheck;
/** Sets whether the response should be JSON formatted or formatted for readability on the command line */
private final boolean json;
/** Sets whether to stop the ongoing execution (if any) and start executing the given request. */
private final boolean stopOngoingExecution;
/** A regular expression to specify topics that should not be considered for replica movement */
private final String excludedTopics;
/** The upper bound of ongoing replica movements going into/out of each broker */
Expand Down Expand Up @@ -67,6 +69,13 @@ public boolean isJson() {
return json;
}

/**
 * Indicates whether the ongoing execution (if any) should be stopped so that the given request can start executing.
 *
 * @return {@code true} if the ongoing execution (if any) is to be stopped before executing the given request,
 *         {@code false} otherwise.
 */
public boolean isStopOngoingExecution() {
    return this.stopOngoingExecution;
}

/**
* @return Excludes topics
*/
Expand Down Expand Up @@ -108,6 +117,7 @@ public List<String> getReplicaMovementStrategies() {
this.verbose = builder.verbose;
this.skipHardGoalCheck = builder.skipHardGoalCheck;
this.json = builder.json;
this.stopOngoingExecution = builder.stopOngoingExecution;
this.excludedTopics = builder.excludedTopics;
this.concurrentPartitionMovementsPerBroker = builder.concurrentPartitionMovementsPerBroker;
this.concurrentLeaderMovements = builder.concurrentLeaderMovements;
Expand All @@ -127,6 +137,7 @@ public abstract static class AbstractRebalanceOptionsBuilder<B extends AbstractR
private boolean verbose;
private boolean skipHardGoalCheck;
private boolean json;
private boolean stopOngoingExecution;
private String excludedTopics;
private int concurrentPartitionMovementsPerBroker;
private int concurrentLeaderMovements;
Expand All @@ -138,6 +149,7 @@ public abstract static class AbstractRebalanceOptionsBuilder<B extends AbstractR
goals = null;
verbose = false;
skipHardGoalCheck = false;
stopOngoingExecution = false;
json = true;
excludedTopics = null;
concurrentPartitionMovementsPerBroker = 0;
Expand Down Expand Up @@ -183,6 +195,16 @@ public B withSkipHardGoalCheck() {
return self();
}

/**
 * Enables stopping the ongoing execution (if any) so that the given request starts executing.
 *
 * @return Instance of this builder
 */
public B withStopOngoingExecution() {
    stopOngoingExecution = true;
    return self();
}

/**
* Set rebalance goals
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,8 @@ private PathBuilder withAbstractRebalanceParameters(AbstractRebalanceOptions opt
if (options != null) {
PathBuilder builder = withParameter(CruiseControlParameters.DRY_RUN, String.valueOf(options.isDryRun()))
.withParameter(CruiseControlParameters.VERBOSE, String.valueOf(options.isVerbose()))
.withParameter(CruiseControlParameters.SKIP_HARD_GOAL_CHECK, String.valueOf(options.isSkipHardGoalCheck()));
.withParameter(CruiseControlParameters.SKIP_HARD_GOAL_CHECK, String.valueOf(options.isSkipHardGoalCheck()))
.withParameter(CruiseControlParameters.STOP_ONGOING_EXECUTION, String.valueOf(options.isStopOngoingExecution()));

if (options.getExcludedTopics() != null) {
builder.withParameter(CruiseControlParameters.EXCLUDED_TOPICS, options.getExcludedTopics());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -853,6 +853,74 @@ private void krNewToProposalReadyToRebalancingToReadyThenRefresh(VertxTestContex
}));
}

/**
 * Tests the transition from 'New' to 'Ready' when a refresh is requested while a rebalance is in progress.
 * The rebalance proposal is auto-approved and the resource moves to 'Rebalancing'.
 * Then the Rebalancing KafkaRebalance is refreshed and moved to 'ProposalReady' again.
 * Then the ProposalReady KafkaRebalance moves to 'Rebalancing' again and finally to 'Ready'.
 *
 * 1. A new KafkaRebalance resource is created with auto-approval annotation set; it is in the 'New' state
 * 2. The operator requests a rebalance proposal through the Cruise Control REST API
 * 3. The rebalance proposal is ready on the first call
 * 4. The KafkaRebalance resource transitions to the 'ProposalReady' state
 * 5. The operator requests the rebalance operation through the Cruise Control REST API
 * 6. The rebalance operation is not done immediately; the operator starts polling the Cruise Control REST API
 * 7. The KafkaRebalance resource moves to the 'Rebalancing' state
 * 8. The KafkaRebalance resource is annotated with 'strimzi.io/rebalance=refresh' while the rebalancing is still in progress
 * 9. The operator stops polling the Cruise Control REST API and requests a stop execution
 * 10. The operator requests a new rebalance proposal through the Cruise Control REST API
 * 11. The KafkaRebalance resource transitions to the 'ProposalReady' state again
 * 12. The operator requests the rebalance operation through the Cruise Control REST API
 * 13. The rebalance operation is not done immediately; the operator starts polling the Cruise Control REST API
 * 14. The KafkaRebalance resource moves to the 'Rebalancing' state again
 * 15. The rebalance operation is done
 * 16. The KafkaRebalance resource moves to the 'Ready' state
 */
@Test
public void krNewToProposalReadyToRebalancingToRefresh(VertxTestContext context) throws IOException, URISyntaxException {
    KafkaRebalance rebalance = createKafkaRebalance(namespace, CLUSTER_NAME, RESOURCE_NAME, REMOVE_BROKER_KAFKA_REBALANCE_SPEC, true);

    // Prepare the mocked rebalance, user tasks and stop endpoints; the numeric arguments are the
    // number of pending calls before a response is received.
    cruiseControlServer.setupCCRebalanceResponse(0, CruiseControlEndpoints.REMOVE_BROKER);
    cruiseControlServer.setupCCUserTasksResponseNoGoals(0, 0);
    cruiseControlServer.setupCCStopResponse();

    Crds.kafkaRebalanceOperation(client).inNamespace(namespace).resource(rebalance).create();
    crdCreateKafka();
    crdCreateCruiseControlSecrets();

    Checkpoint done = context.checkpoint();
    // Every reconciliation and state assertion below targets this same resource name
    String rebalanceName = rebalance.getMetadata().getName();

    krao.reconcile(new Reconciliation("test-trigger", KafkaRebalance.RESOURCE_KIND, namespace, rebalanceName))
        .onComplete(context.succeeding(ignored -> {
            // 'New' -> 'ProposalReady' directly, since the mock server has no pending calls
            assertState(context, client, namespace, rebalanceName, KafkaRebalanceState.ProposalReady);
        }))
        .compose(ignored -> {
            return krao.reconcile(new Reconciliation("test-trigger", KafkaRebalance.RESOURCE_KIND, namespace, rebalanceName));
        })
        .onComplete(context.succeeding(ignored -> {
            // 'ProposalReady' -> 'Rebalancing' thanks to auto-approval
            assertState(context, client, namespace, rebalanceName, KafkaRebalanceState.Rebalancing);
        }))
        .compose(ignored -> {
            // request a refresh while the resource is still in the 'Rebalancing' state
            annotate(client, namespace, rebalanceName, KafkaRebalanceAnnotation.refresh);
            return krao.reconcile(new Reconciliation("test-trigger", KafkaRebalance.RESOURCE_KIND, namespace, rebalanceName));
        })
        .onComplete(context.succeeding(ignored -> {
            // 'Rebalancing' -> 'ProposalReady' because of the refresh annotation
            assertState(context, client, namespace, rebalanceName, KafkaRebalanceState.ProposalReady);
        }))
        .compose(ignored -> {
            return krao.reconcile(new Reconciliation("test-trigger", KafkaRebalance.RESOURCE_KIND, namespace, rebalanceName));
        })
        .onComplete(context.succeeding(ignored -> {
            // 'ProposalReady' -> 'Rebalancing' once more
            assertState(context, client, namespace, rebalanceName, KafkaRebalanceState.Rebalancing);
        }))
        .compose(ignored -> {
            return krao.reconcile(new Reconciliation("test-trigger", KafkaRebalance.RESOURCE_KIND, namespace, rebalanceName));
        })
        .onComplete(context.succeeding(ignored -> {
            // 'Rebalancing' -> 'Ready': the rebalance is complete
            assertState(context, client, namespace, rebalanceName, KafkaRebalanceState.Ready);
            done.flag();
        }));
}

/**
* Tests the transition from 'New' to 'NotReady' due to "missing hard goals" error
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1166,6 +1166,27 @@ private void krStoppedRefreshToPendingProposal(Vertx vertx, VertxTestContext con
.onComplete(result -> checkOptimizationResults(result, context, true));
}

/**
 * Tests the transition from 'Rebalancing' to 'ProposalReady' on refresh
 *
 * 1. A new KafkaRebalance resource is created and annotated with strimzi.io/rebalance=refresh; it is in the 'Rebalancing' state
 * 2. The operator calls the /stop_proposal_execution endpoint to stop the ongoing rebalance execution
 * 3. The operator sends a request for a new proposal
 * 4. The operator computes the next state on the proposal via the Cruise Control REST API
 * 5. The rebalance proposal is ready on the first call
 * 6. The KafkaRebalance resource moves to the 'ProposalReady' state
 */
@Test
public void testRebalancingToRefreshProposalReady(Vertx vertx, VertxTestContext context) throws IOException, URISyntaxException {
    // Resource that is already 'Rebalancing' and carries the "refresh" annotation value
    KafkaRebalance refreshedRebalance = createKafkaRebalance(
            KafkaRebalanceState.Rebalancing, "", "refresh", REMOVE_BROKER_KAFKA_REBALANCE_SPEC, null, false);

    // Mock the stop-execution endpoint and the remove-broker rebalance endpoint
    cruiseControlServer.setupCCStopResponse();
    cruiseControlServer.setupCCRebalanceResponse(0, CruiseControlEndpoints.REMOVE_BROKER);

    checkTransition(vertx, context, KafkaRebalanceState.Rebalancing, KafkaRebalanceState.ProposalReady, refreshedRebalance)
        .onComplete(outcome -> {
            checkOptimizationResults(outcome, context, false);
        });
}

/**
* Tests the transition from 'Stopped' to 'ProposalReady' when refresh
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ private String getExpectedRebalanceString() {
CruiseControlParameters.DRY_RUN + "=false&" +
CruiseControlParameters.VERBOSE + "=true&" +
CruiseControlParameters.SKIP_HARD_GOAL_CHECK + "=false&" +
CruiseControlParameters.STOP_ONGOING_EXECUTION + "=false&" +
CruiseControlParameters.EXCLUDED_TOPICS + "=test-.*&" +
CruiseControlParameters.GOALS + "=");

Expand Down Expand Up @@ -72,6 +73,7 @@ public void testQueryStringList() {
.withParameter(CruiseControlParameters.DRY_RUN, "false")
.withParameter(CruiseControlParameters.VERBOSE, "true")
.withParameter(CruiseControlParameters.SKIP_HARD_GOAL_CHECK, "false")
.withParameter(CruiseControlParameters.STOP_ONGOING_EXECUTION, "false")
.withParameter(CruiseControlParameters.EXCLUDED_TOPICS, "test-.*")
.withParameter(CruiseControlParameters.GOALS, GOALS)
.withParameter(CruiseControlParameters.REBALANCE_DISK, "false")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,12 @@ public enum CruiseControlParameters {
/**
* Skip rack awareness check
*/
SKIP_RACK_AWARENESS_CHECK("skip_rack_awareness_check");
SKIP_RACK_AWARENESS_CHECK("skip_rack_awareness_check"),

/**
* Stop the ongoing execution (if any) and start executing the given request
*/
STOP_ONGOING_EXECUTION("stop_ongoing_execution");

private final String key;

Expand Down
Loading