Skip to content

Commit

Permalink
Add support for node unregistration in KRaft mode (#10442)
Browse files Browse the repository at this point in the history
Signed-off-by: Jakub Scholz <[email protected]>
  • Loading branch information
scholzj authored Aug 15, 2024
1 parent 44ab699 commit f5938a8
Show file tree
Hide file tree
Showing 13 changed files with 750 additions and 20 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
On the client `accessTokenLocation`, `clientAssertion`, `clientAssertionLocation`, `clientAssertionType`, and `saslExtensions` have been added.
* Add support for custom Cruise Control API users
* Update HTTP bridge to latest 0.30.0 release
* Unregistration of KRaft nodes after scale-down

### Changes, deprecations and removals

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,13 @@
builderPackage = Constants.FABRIC8_KUBERNETES_API
)
@JsonInclude(JsonInclude.Include.NON_NULL)
@JsonPropertyOrder({ "conditions", "observedGeneration", "listeners", "kafkaNodePools", "clusterId", "operatorLastSuccessfulVersion", "kafkaVersion", "kafkaMetadataVersion", "kafkaMetadataState" })
@JsonPropertyOrder({ "conditions", "observedGeneration", "listeners", "kafkaNodePools", "registeredNodeIds", "clusterId", "operatorLastSuccessfulVersion", "kafkaVersion", "kafkaMetadataVersion", "kafkaMetadataState" })
@EqualsAndHashCode(callSuper = true)
@ToString(callSuper = true)
public class KafkaStatus extends Status {
private List<ListenerStatus> listeners;
private List<UsedNodePoolStatus> kafkaNodePools;
private List<Integer> registeredNodeIds;

private String clusterId;
private String operatorLastSuccessfulVersion;
Expand All @@ -54,6 +55,16 @@ public void setKafkaNodePools(List<UsedNodePoolStatus> kafkaNodePools) {
this.kafkaNodePools = kafkaNodePools;
}

/**
 * Returns the node IDs currently registered in the Kafka cluster. The operator compares this
 * list against the desired nodes to detect scaled-down KRaft nodes that still need to be
 * unregistered.
 *
 * @return  List of registered node IDs (or null if not set)
 */
@Description("Registered node IDs used by this Kafka cluster. " +
        "This field is used for internal purposes only and will be removed in the future.")
public List<Integer> getRegisteredNodeIds() {
    return registeredNodeIds;
}

/**
 * Sets the list of node IDs registered in the Kafka cluster
 *
 * @param registeredNodeIds     List of registered node IDs
 */
public void setRegisteredNodeIds(List<Integer> registeredNodeIds) {
    this.registeredNodeIds = registeredNodeIds;
}

@Description("Kafka cluster Id")
public String getClusterId() {
return clusterId;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,12 @@ public Future<KafkaStatus> createOrUpdate(Reconciliation reconciliation, Kafka k
// Copy the metadata state if needed
status.setKafkaMetadataState(kafkaAssembly.getStatus().getKafkaMetadataState());
}

if (status.getRegisteredNodeIds() == null
&& kafkaAssembly.getStatus().getRegisteredNodeIds() != null) {
// Copy the list of registered node IDs if needed
status.setRegisteredNodeIds(kafkaAssembly.getStatus().getRegisteredNodeIds());
}
}

if (reconcileResult.succeeded()) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
/*
* Copyright Strimzi authors.
* License: Apache License 2.0 (see the file LICENSE or http://apache.org/licenses/LICENSE-2.0.html).
*/
package io.strimzi.operator.cluster.operator.assembly;

import io.strimzi.api.kafka.model.kafka.KafkaResources;
import io.strimzi.operator.cluster.model.KafkaCluster;
import io.strimzi.operator.cluster.operator.VertxUtil;
import io.strimzi.operator.common.AdminClientProvider;
import io.strimzi.operator.common.Reconciliation;
import io.strimzi.operator.common.ReconciliationLogger;
import io.strimzi.operator.common.auth.PemAuthIdentity;
import io.strimzi.operator.common.auth.PemTrustSet;
import io.vertx.core.Future;
import io.vertx.core.Vertx;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.errors.BrokerIdNotRegisteredException;

import java.util.ArrayList;
import java.util.List;

/**
 * Contains utility methods for unregistering KRaft nodes from a Kafka cluster after scale-down
 */
public class KafkaNodeUnregistration {
    private static final ReconciliationLogger LOGGER = ReconciliationLogger.create(KafkaNodeUnregistration.class.getName());

    private KafkaNodeUnregistration() {
        // Utility class with only static methods => prevent instantiation
    }

    /**
     * Unregisters Kafka nodes from a KRaft-based Kafka cluster. All node IDs are attempted even
     * if some of the unregistrations fail; the returned Future fails if any attempt failed.
     *
     * @param reconciliation        Reconciliation marker
     * @param vertx                 Vert.x instance
     * @param adminClientProvider   Kafka Admin API client provider
     * @param pemTrustSet           Trust set for the admin client to connect to the Kafka cluster
     * @param pemAuthIdentity       Key set for the admin client to connect to the Kafka cluster
     * @param nodeIdsToUnregister   List of node IDs that should be unregistered
     *
     * @return  Future that completes when all nodes are unregistered
     */
    public static Future<Void> unregisterNodes(
            Reconciliation reconciliation,
            Vertx vertx,
            AdminClientProvider adminClientProvider,
            PemTrustSet pemTrustSet,
            PemAuthIdentity pemAuthIdentity,
            List<Integer> nodeIdsToUnregister
    ) {
        try {
            String bootstrapHostname = KafkaResources.bootstrapServiceName(reconciliation.name()) + "." + reconciliation.namespace() + ".svc:" + KafkaCluster.REPLICATION_PORT;
            Admin adminClient = adminClientProvider.createAdminClient(bootstrapHostname, pemTrustSet, pemAuthIdentity);

            List<Future<Void>> futures = new ArrayList<>(nodeIdsToUnregister.size());
            for (Integer nodeId : nodeIdsToUnregister) {
                futures.add(unregisterNode(reconciliation, vertx, adminClient, nodeId));
            }

            // Future.join (rather than Future.all) waits for every unregistration attempt to
            // complete before the composite completes. Future.all completes on the first failure,
            // which would let .eventually() close the shared admin client while sibling
            // unregisterBroker calls are still in flight.
            return Future.join(futures)
                    .eventually(() -> {
                        adminClient.close();
                        return Future.succeededFuture();
                    })
                    .map((Void) null);
        } catch (KafkaException e) {
            // createAdminClient can throw synchronously (e.g. invalid configuration) => turn it
            // into a failed Future so callers have a single failure path
            LOGGER.warnCr(reconciliation, "Failed to unregister nodes", e);
            return Future.failedFuture(e);
        }
    }

    /**
     * Unregisters a single Kafka node using the Kafka Admin API. In case the failure is caused by the node not being
     * registered, the error will be ignored.
     *
     * @param reconciliation        Reconciliation marker
     * @param vertx                 Vert.x instance
     * @param adminClient           Kafka Admin API client instance
     * @param nodeIdToUnregister    ID of the node that should be unregistered
     *
     * @return  Future that completes when the node is unregistered
     */
    private static Future<Void> unregisterNode(Reconciliation reconciliation, Vertx vertx, Admin adminClient, Integer nodeIdToUnregister) {
        LOGGER.debugCr(reconciliation, "Unregistering node {} from the Kafka cluster", nodeIdToUnregister);

        return VertxUtil
                .kafkaFutureToVertxFuture(reconciliation, vertx, adminClient.unregisterBroker(nodeIdToUnregister).all())
                .recover(t -> {
                    if (t instanceof BrokerIdNotRegisteredException) {
                        // The broker is not registered anymore, so it does not need to be unregistered anymore and we
                        // report success. Situation like this might happen when the operator fails before updating the
                        // status, when the Kafka API call fails (e.g. due to network connection) but the unregistration
                        // was done on the Kafka cluster and similar.
                        LOGGER.warnCr(reconciliation, "Node {} is not registered and cannot be unregistered from the Kafka cluster", nodeIdToUnregister, t);
                        return Future.succeededFuture();
                    } else {
                        LOGGER.warnCr(reconciliation, "Failed to unregister node {} from the Kafka cluster", nodeIdToUnregister, t);
                        return Future.failedFuture(t);
                    }
                });
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@
import io.strimzi.operator.common.model.StatusDiff;
import io.strimzi.operator.common.operator.resource.ReconcileResult;
import io.vertx.core.Future;
import io.vertx.core.Promise;
import io.vertx.core.Vertx;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.common.KafkaException;
Expand Down Expand Up @@ -120,6 +121,7 @@ public class KafkaReconciler {
private final PlatformFeaturesAvailability pfa;
private final ImagePullPolicy imagePullPolicy;
private final List<LocalObjectReference> imagePullSecrets;
private final List<Integer> previousNodeIds;

// Objects used during the reconciliation
/* test */ final Reconciliation reconciliation;
Expand Down Expand Up @@ -210,6 +212,7 @@ public KafkaReconciler(
this.pfa = pfa;
this.imagePullPolicy = config.getImagePullPolicy();
this.imagePullSecrets = config.getImagePullSecrets();
this.previousNodeIds = kafkaCr.getStatus() != null ? kafkaCr.getStatus().getRegisteredNodeIds() : null;

this.stsOperator = supplier.stsOperations;
this.strimziPodSetOperator = supplier.strimziPodSetOperator;
Expand Down Expand Up @@ -268,6 +271,7 @@ public Future<Void> reconcile(KafkaStatus kafkaStatus, Clock clock) {
.compose(i -> headlessServiceEndpointsReady())
.compose(i -> clusterId(kafkaStatus))
.compose(i -> defaultKafkaQuotas())
.compose(i -> nodeUnregistration(kafkaStatus))
.compose(i -> metadataVersion(kafkaStatus))
.compose(i -> deletePersistentClaims())
.compose(i -> sharedKafkaConfigurationCleanup())
Expand Down Expand Up @@ -939,9 +943,61 @@ protected Future<Void> defaultKafkaQuotas() {
return DefaultKafkaQuotasManager.reconcileDefaultUserQuotas(reconciliation, vertx, adminClientProvider, this.coTlsPemIdentity.pemTrustSet(), this.coTlsPemIdentity.pemAuthIdentity(), kafka.quotas());
}

/**
 * Unregisters the KRaft nodes that were removed from the Kafka cluster
 *
 * @param kafkaStatus   Kafka status for updating the list of currently registered node IDs
 *
 * @return  Future which completes when the nodes removed from the Kafka cluster are unregistered
 */
protected Future<Void> nodeUnregistration(KafkaStatus kafkaStatus) {
    List<Integer> currentNodeIds = kafka.nodes().stream().map(NodeRef::nodeId).sorted().toList();

    // Guard clause: nothing to unregister when not in KRaft mode, when no previously registered
    // node IDs are known, or when every previously registered node still exists => only refresh
    // the status field with the current node IDs
    if (!kafkaMetadataStateManager.getMetadataConfigurationState().isKRaft()
            || previousNodeIds == null
            || new HashSet<>(currentNodeIds).containsAll(previousNodeIds)) {
        kafkaStatus.setRegisteredNodeIds(currentNodeIds);
        return Future.succeededFuture();
    }

    // Some previously registered nodes disappeared => they have to be unregistered
    List<Integer> removedNodeIds = new ArrayList<>(previousNodeIds);
    removedNodeIds.removeAll(currentNodeIds);

    LOGGER.infoCr(reconciliation, "Kafka nodes {} were removed from the Kafka cluster and will be unregistered", removedNodeIds);

    Promise<Void> promise = Promise.promise();
    KafkaNodeUnregistration.unregisterNodes(reconciliation, vertx, adminClientProvider, coTlsPemIdentity.pemTrustSet(), coTlsPemIdentity.pemAuthIdentity(), removedNodeIds)
            .onComplete(res -> {
                if (res.succeeded()) {
                    LOGGER.infoCr(reconciliation, "Kafka nodes {} were successfully unregistered from the Kafka cluster", removedNodeIds);
                    kafkaStatus.setRegisteredNodeIds(currentNodeIds);
                } else {
                    LOGGER.warnCr(reconciliation, "Failed to unregister Kafka nodes {} from the Kafka cluster", removedNodeIds);

                    // Keep the not-yet-unregistered node IDs in the status so the next
                    // reconciliation retries them, merged with the current node IDs so that
                    // no node is lost from tracking
                    Set<Integer> mergedNodeIds = new HashSet<>(currentNodeIds);
                    mergedNodeIds.addAll(previousNodeIds);
                    kafkaStatus.setRegisteredNodeIds(mergedNodeIds.stream().sorted().toList());
                }

                // A failed unregistration intentionally does not fail the reconciliation
                promise.complete();
            });

    return promise.future();
}

/**
* Manages the KRaft metadata version
*
* @param kafkaStatus Kafka status used for updating the currently used metadata version
*
* @return Future which completes when the KRaft metadata version is set to the current version or updated.
*/
protected Future<Void> metadataVersion(KafkaStatus kafkaStatus) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,7 @@
import org.apache.kafka.clients.admin.ListTopicsResult;
import org.apache.kafka.clients.admin.QuorumInfo;
import org.apache.kafka.clients.admin.TopicListing;
import org.apache.kafka.clients.admin.UnregisterBrokerResult;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.internals.KafkaFutureImpl;
Expand Down Expand Up @@ -591,6 +592,11 @@ public static Admin adminClient() {

when(mock.describeClientQuotas(any())).thenReturn(dcqr);

// Mock KRaft node unregistration
UnregisterBrokerResult ubr = mock(UnregisterBrokerResult.class);
when(ubr.all()).thenReturn(KafkaFuture.completedFuture(null));
when(mock.unregisterBroker(anyInt())).thenReturn(ubr);

return mock;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -268,6 +268,8 @@ private void basicCheck() {
assertThat(k.getStatus().getOperatorLastSuccessfulVersion(), is(KafkaAssemblyOperator.OPERATOR_VERSION));
assertThat(k.getStatus().getKafkaMetadataState().toValue(), is("KRaft"));
assertThat(k.getStatus().getKafkaNodePools().stream().map(UsedNodePoolStatus::getName).toList(), is(List.of("brokers", "controllers")));
assertThat(k.getStatus().getRegisteredNodeIds().size(), is(6));
assertThat(k.getStatus().getRegisteredNodeIds(), hasItems(0, 1, 2, 10, 11, 12));
}

/**
Expand Down Expand Up @@ -562,6 +564,11 @@ public void testReconcileKafkaScaleUpAndDown(VertxTestContext context) {
assertThat(brokers.getStatus().getRoles().size(), is(1));
assertThat(brokers.getStatus().getRoles(), hasItems(ProcessRoles.BROKER));

// Check the Kafka resource status
Kafka k = Crds.kafkaOperation(client).inNamespace(namespace).withName(CLUSTER_NAME).get();
assertThat(k.getStatus().getRegisteredNodeIds().size(), is(8));
assertThat(k.getStatus().getRegisteredNodeIds(), hasItems(0, 1, 2, 10, 11, 12, 13, 14));

// Scale down again
Crds.kafkaNodePoolOperation(client).inNamespace(namespace).withName("brokers").edit(p -> new KafkaNodePoolBuilder(p)
.editSpec()
Expand All @@ -588,6 +595,11 @@ public void testReconcileKafkaScaleUpAndDown(VertxTestContext context) {
assertThat(brokers.getStatus().getRoles().size(), is(1));
assertThat(brokers.getStatus().getRoles(), hasItems(ProcessRoles.BROKER));

// Check the Kafka resource status
Kafka k = Crds.kafkaOperation(client).inNamespace(namespace).withName(CLUSTER_NAME).get();
assertThat(k.getStatus().getRegisteredNodeIds().size(), is(6));
assertThat(k.getStatus().getRegisteredNodeIds(), hasItems(0, 1, 2, 10, 11, 12));

async.flag();
})));
}
Expand Down Expand Up @@ -642,6 +654,8 @@ public void testReconcileAddAndRemovePool(VertxTestContext context) {
Kafka kafka = Crds.kafkaOperation(client).inNamespace(namespace).withName(CLUSTER_NAME).get();
assertThat(kafka.getStatus().getKafkaNodePools().size(), is(3));
assertThat(kafka.getStatus().getKafkaNodePools().stream().map(UsedNodePoolStatus::getName).toList(), hasItems("brokers", "controllers", "new-pool"));
assertThat(kafka.getStatus().getRegisteredNodeIds().size(), is(9));
assertThat(kafka.getStatus().getRegisteredNodeIds(), hasItems(0, 1, 2, 10, 11, 12, 13, 14, 15));

KafkaNodePool controllers = Crds.kafkaNodePoolOperation(client).inNamespace(namespace).withName("controllers").get();
assertThat(controllers.getStatus().getReplicas(), is(3));
Expand Down Expand Up @@ -676,6 +690,8 @@ public void testReconcileAddAndRemovePool(VertxTestContext context) {
Kafka kafka = Crds.kafkaOperation(client).inNamespace(namespace).withName(CLUSTER_NAME).get();
assertThat(kafka.getStatus().getKafkaNodePools().size(), is(2));
assertThat(kafka.getStatus().getKafkaNodePools().stream().map(UsedNodePoolStatus::getName).toList(), hasItems("brokers", "controllers"));
assertThat(kafka.getStatus().getRegisteredNodeIds().size(), is(6));
assertThat(kafka.getStatus().getRegisteredNodeIds(), hasItems(0, 1, 2, 10, 11, 12));

KafkaNodePool controllers = Crds.kafkaNodePoolOperation(client).inNamespace(namespace).withName("controllers").get();
assertThat(controllers.getStatus().getReplicas(), is(3));
Expand Down
Loading

0 comments on commit f5938a8

Please sign in to comment.