From 4baf73ae34fa8e1726f7580f327bb29ff7a130be Mon Sep 17 00:00:00 2001 From: Gantigmaa Selenge Date: Tue, 20 Feb 2024 16:24:07 +0000 Subject: [PATCH] Address review comments Made some improvements on the structure Signed-off-by: Gantigmaa Selenge --- 06x-new-kafka-roller.md | 131 ++++++++++++++++++++++------------------ 1 file changed, 71 insertions(+), 60 deletions(-) diff --git a/06x-new-kafka-roller.md b/06x-new-kafka-roller.md index bdca9ca8..968e8234 100644 --- a/06x-new-kafka-roller.md +++ b/06x-new-kafka-roller.md @@ -2,7 +2,7 @@ ## Current situation -The Kafka Roller is a Cluster Operator component that's responsible for rolling Kafka pods when: +The Kafka Roller is a Cluster Operator component that's responsible for coordinating the rolling restart or reconfiguration of Kafka pods when: - non-dynamic reconfigurations needs to be applied - update in Kafka CRD is detected - a certificate is renewed @@ -20,9 +20,9 @@ A pod is considered stuck if it is in one of following states: ### Known Issues -The existing KafkaRoller has been suffering from the following shortcomings: +The existing KafkaRoller suffers from the following shortcomings: - While it is safe and simple to restart one broker at a time, it is slow in large clusters ([related issue](https://github.com/strimzi/strimzi-kafka-operator/issues/8547)). -- It doesn’t worry about partition preferred leadership +- It doesn’t worry about partition preferred leadership. This means there can be more leadership changes than necessary during a rolling restart, with consequent impact on tail latency. - Hard to reason about when things go wrong. The code is complex to understand and it's not easy to determine why a pod was restarted from logs that tend to be noisy. - Potential race condition between Cruise Control rebalance and KafkaRoller that could cause partitions under minimum in sync replica. This issue is described in more detail in the `Future Improvements` section. - The current code for KafkaRoller does not easily allow growth and adding new functionality due to its complexity. @@ -40,49 +40,28 @@ The following non-trivial fixes and changes are missing from the current KafkaRo Strimzi users have been reporting some of the issues mentioned above and would benefit from a new KafkaRoller that is designed to address the shortcomings of the current KafkaRoller. -The current KafkaRoller has complex and nested conditions therefore makes it challenging for users to debug and understand actions taken on their brokers when things go wrong and configure it correctly for their use cases. A new KafkaRoller that is redesigned to be simpler would help users to easily understand the code and configure it to their needs. +The current KafkaRoller has complex and nested conditions therefore makes it challenging for users to debug and understand actions taken on their brokers when things go wrong and configure it correctly for their use cases. It is also not particularly easy to unit test which results in insufficient test coverage for many edge cases, making it challenging to refactor safely. Therefore, refactoring becomes essential to enhance test coverage effectively. A new KafkaRoller that is redesigned to be simpler would help users to easily understand the code and configure it to their needs. -As you can see above, the current KafkaRoller still needs various changes and potentially more as we get more experience with KRaft and discover more issues. Adding these non trivial changes to a component that is very complex and hard to reason, is expensive and poses potential risks of introducing bugs because of tightly coupled logics andlack of testability. +As you can see above, the current KafkaRoller still needs various changes and potentially more as we get more experience with KRaft and discover more issues. Adding these non trivial changes to a component that is very complex and hard to reason, is expensive and poses potential risks of introducing bugs because of tightly coupled logics and lack of testability. ## Proposal -The objective of this proposal is to introduce a new KafkaRoller with simplified logic therefore more testable, and has structured design resembling a finite state machine. KafkaRoller desisions can become more accurate and better informed by observations coming from different sources (e.g. Kubernetes API, KafkaAgent, Kafka Admin API). These sources will be abstracted so that KafkaRoller is not dependent on their specifics as long as it's getting the information it needs. This will enable the KafkaRoller to run even if the underlying platform is different, for example, not Kubernetes. +The objective of this proposal is to introduce a new KafkaRoller with simplified logic having a structured design resembling a finite state machine. KafkaRoller desisions are informed by observations coming from different sources (e.g. Kubernetes API, KafkaAgent, Kafka Admin API). These sources will be abstracted so that KafkaRoller is not dependent on their specifics as long as it's getting the information it needs. The abstractions also enable much better unit testing. -Depending on the observed states, the roller will perform specific actions, causing each node's state to transition to another state based on the corresponding action. This iterative process continues until each node's state aligns with the desired state. +Depending on the observed states, the roller will perform specific actions. Those actions should cause a subsequent observation to cause a state transition. This iterative process continues until each node's state aligns with the desired state. It will also introduce an algorithm that can restart brokers in parallel while applying safety conditions that can guarantee Kafka producer availability and causing minimal impact on controllers and overall stability of clusters. -0. The following can be the configured for the new KafkaRoller: +### Node State +When a new reconciliation starts up, a context object is created for each node to store the state and other useful information used by the roller. It will have the following fields: -| Configuration | Default value | Exposed to user | Description | -| :-------------| :-------------| :---------------| :---------- | -| maxRestarts | 3 | No | The maximum number of times a node can be restarted before failing the reconciliation. This is checked against `numRestarts` in the `Context`.| -| maxReconfigs | 3 | No | The maximum number of times a node can be reconfigured before restarting it. This is checked against `numReconfigs` in the `Context`.| -| maxAttempts| 10 | No | The maximum number of times a node can be retried before failing the reconciliation. This is checked against `numAttempts` in the `Context`.| -| postOperationTimeoutMs | 60 seconds | Yes | The maximum amount of time we will wait for nodes to transition to `SERVING` state after an operation in each retry. This will be based on the operational timeout that is already exposed to the user. | -| maxBatchSize | 1 | Yes | The maximum number of broker nodes that can be restarted in parallel. | - -1. When a new reconciliation starts up, `Context` is created for each node. - ``` - Context { - nodeId: int - nodeRole: String - state: ServerState - reason: String - numRestarts: int - numReconfigs: int - numAttempts: int - lastTransitionTime: long - } - ``` - - - nodeId: Node ID. - - nodeRoles: Process roles of this node (e.g. controller, broker). This will be set using the pod labels `strimzi.io/controller-role` and `strimzi.io/broker-role` because these are the currently assigned roles of the node. + - nodeRef: NodeRef object that contains Node ID. + - currentNodeRole: Currently assigned process roles for this node (e.g. controller, broker). - state: It contains the current state of the node based on information collected from the abstracted sources. The table below describes the possible states. - reason: It is updated based on the current predicate logic from the Reconciler. For example, an update in the Kafka CR is detected. - numRestarts: The value is incremented each time the node has been attempted to restart. - numReconfigs: The value is incremented each time the node has been attempted to reconfigure. - - numReconfigs: The value is incremented each time the node has been retried by the roller. + - numAttempts: The value is incremented each time the node cannot be restarted/reconfigured due to not meeting safety conditions (more on this later). - lastTransitionTime: System.currentTimeMillis of last observed state transition. States @@ -97,9 +76,36 @@ It will also introduce an algorithm that can restart brokers in parallel while a | SERVING | Node is in running state and ready to serve requests (broker state >= 3 AND != 127). | | LEADING_ALL_PREFERRED | Node is in running state and leading all preferred replicas. | -2. The existing predicate function will be called for each of the nodes and those for which the function returns a non-empty list of reasons will be restarted. +### Configurability +The following can be the configured for the new KafkaRoller: + +| Configuration | Default value | Exposed to user | Description | +| :-------------| :-------------| :---------------| :---------- | +| maxRestarts | 3 | No | The maximum number of times a node can be restarted before failing the reconciliation. This is checked against `numRestarts` in the `Context`.| +| maxReconfigs | 3 | No | The maximum number of times a node can be reconfigured before restarting it. This is checked against `numReconfigs` in the `Context`.| +| maxAttempts| 10 | No | The maximum number of times a node can retried after not meeting the safety conditions. This is checked against `numAttempts` in the `Context`.| +| postOperationTimeoutMs | 60 seconds | Yes | The maximum amount of time we will wait for nodes to transition to `SERVING` state after an operation in each retry. This will be based on the operation timeout that is already exposed to the user via environment variable `STRIMZI_OPERATION_TIMEOUT_MS`. | +| maxBatchSize | 1 | Yes | The maximum number of broker nodes that can be restarted in parallel. | + -3. Group the nodes into the following categories based on information collected from the abstracted sources: +### Algorithm + +1. Initialise a context object for each node with the following data: +``` +Context: { + nodeRef: + nodeRoles: + state: UNKNOWN + lastTransition: + reason: + numRestarts: 0 + numReconfigs: 0 + numAttempts: 0 +} +``` +2. Transition each node's state to the corresponding state based on the information collected from the abstracted sources such as Kubernetes API and KafkaAgent. + +3. Group the nodes into the following categories based their state and connectivity: - `RESTART_FIRST` - Nodes that have `NOT_READY` or `NOT_RUNNING` state in their contexts. The group will also include nodes that we cannot connect to via Admin API. - `WAIT_FOR_LOG_RECOVERY` - Nodes that have `RECOVERING` state. - `RESTART` - Nodes that have non-empty list of reasons from the predicate function and have not been restarted yet (Context.numRestarts == 0). @@ -111,15 +117,23 @@ It will also introduce an algorithm that can restart brokers in parallel while a - Wait for each node to have `SERVING` within the `postOperationalTimeoutMs`. - If the timeout is reached for a node and its `numAttempts` is greater than or equal to `maxAttempts`, throw `UnrestartableNodesException` with the log recovery progress (number of remaining logs and segments). Otherwise increment node's `numAttempts` and restart from step 3. -5. Restart nodes in `RESTART_FIRST` category either one by one in the following order unless all nodes are combined -and are in `NOT_RUNNING` state: - - Pure controller nodes - - Combined nodes - - Broker only nodes +5. Restart nodes in `RESTART_FIRST` category: + - if one or more nodes have `NOT_RUNNING` state, we first need to check 2 special conditions: + - If all of the nodes are combined and are in `NOT_RUNNING` state, restart them in parallel to give the best chance of forming the quorum. + > This is to address the issue described in https://github.com/strimzi/strimzi-kafka-operator/issues/9426. + + - If a node is in `NOT_RUNNING` state, the restart it only if it has `POD_HAS_OLD_REVISION` reason. This is because, if the node is not running at all, then restarting it likely won't make any difference unless node is out of date. + > For example, if a pod is in pending state due to misconfigured affinity rule, there is no point restarting this pod again or restarting other pods, because that would leave them in pending state as well. If the user then fixed the misconfigured affinity rule, then we should detect that the pod has an old revision, therefore should restart it so that pod is scheduled correctly and runs. - If all controllers are combined and are in `NOT_RUNNING` state, restart all nodes in parallel and wait for them to have `SERVING`. Explained more in detail below. + - At this point either we started all nodes or a node or decided not to because of node's reason not being `POD_HAS_OLD_REVISION`. Regardless, wait for nodes to have `SERVING` within `postOperationalTimeoutMs`. If the timeout is reached and the node's `numAttempts` is greater than or equal to `maxAttempts`, throw `TimeoutException`. Otherwise increment node's `numAttempts` and restart from step 3. - Wait until the restarted node to have `SERVING` within `postOperationalTimeoutMs`. If the timeout is reached and the node's `numAttempts` is greater than or equal to `maxAttempts`, throw `TimeoutException`. Otherwise increment node's `numAttempts` and restart from step 3. + + - Otherwise the controllers will be attempted to restart one by one in the following order: + - Pure controller nodes + - Combined nodes + - Broker only nodes + + - Wait for the restarted node to have `SERVING` within `postOperationalTimeoutMs`. If the timeout is reached and the node's `numAttempts` is greater than or equal to `maxAttempts`, throw `TimeoutException`. Otherwise increment node's `numAttempts` and restart from step 3. 6. Further refine the broker nodes in `MAYBE_RECONFIGURE` group: - Describe Kafka configurations for each node via Admin API and compare them against the desired configurations. This is essentially the same mechanism we use today for the current KafkaRoller. @@ -131,23 +145,26 @@ and are in `NOT_RUNNING` state: - If `numReconfigs` of a node is greater than the configured `maxReconfigs`, add a restart reason to its context. Otherwise continue. - Send `incrementalAlterConfig` request with its config updates. - Transitions the node's state to `RECONFIGURED` and increment its `numReconfigs`. - - Wait for each node that got configurations updated until they have `SERVING` within the `postOperationalTimeoutMs`. + - Wait for each node that got configurations updated until they have `LEADING_ALL_PREFERRED` within the `postOperationalTimeoutMs`. - If the `postOperationalTimeoutMs` is reached, restart from step 3. 8. If at this point, the `RESTART` group is empty, the reconciliation will be completed successfully. 9. Otherwise, batch nodes in `RESTART` group and get the next batch to restart: - Further categorize nodes based on their roles so that the following restart order can be enforced: - 1. `NON_ACTIVE_PURE_CONTROLLER` - Pure controller that is not the active controller - 2. `ACTIVE_PURE_CONTROLLER` - Pure controller that is the active controller (the quorum leader) - 3. `BROKER_AND_NOT_ACTIVE_CONTROLLER` - Node that is at least a broker but also might be a controller (combined) and is not the active controller - 4. `BROKER_AND_ACTIVE_CONTROLLER` - Combined node that is the active controller (the quorum leader) - - The returned batch will contain only one node if it is not `BROKER_AND_NOT_ACTIVE_CONTROLLER` group, so that controllers are restarted one at a time. + 1. `NON_ACTIVE_CONTROLLER` - Pure controller that is not the active controller + 2. `ACTIVE_CONTROLLER` - Pure controller that is the active controller (the quorum leader) + 3. `COMBINED_AND_NOT_ACTIVE_CONTROLLER` - Combined node (both controller and broker) and is not the active controller + 4. `COMBINED_AND_ACTIVE_CONTROLLER` - Combined node (both controller and broker) and is the active controller (the quorum leader) + 5. `BROKER` - Pure broker + + > The batch returned will comprise only one node for all groups except 'BROKER', ensuring that controllers are restarted sequentially. This approach is taken to mitigate the risk of losing quorum when restarting multiple controller nodes simultaneously. A failure to establish quorum due to unhealthy controller nodes directly impacts the brokers and consequently the availability of the cluster. However, restarting broker nodes can be executed without affecting availability. If concurrently restarting brokers do not share any topic partitions, the in-sync replicas (ISRs) of topic partitions will lose no more than one replica, thus preserving availability. - If `NON_ACTIVE_PURE_CONTROLLER` group is non empty, return the first node that can be restarted without impacting the quorum health (more on this later). - If `ACTIVE_PURE_CONTROLLER` group is non empty, return the node if it can be restarted without impacting the quorum health. Otherwise return an empty set. - - If `BROKER_AND_NOT_ACTIVE_CONTROLLER` group is non empty, batch the broker nodes: + - If `COMBINED_AND_NOT_ACTIVE_CONTROLLER` group is non empty, return the first node that can be restarted without impacting the quorum health and the availability. + - If `COMBINED_AND_ACTIVE_CONTROLLER` group is non empty, return the node if it can be restarted without impacting the quorum health and the availability. Otherwise return an empty set. + - If `BROKER` group is non empty, batch the broker nodes: - remove the node from the list, if it is a combined node and cannot be restarted without impacting the quorum health so that it does get included in a batch - build a map of nodes and their replicating partitions by sending describeTopics request to Admin API - batch the nodes that do not have any partitions in common therefore can be restarted together @@ -159,17 +176,11 @@ and are in `NOT_RUNNING` state: - If `numRestarts` of a node is larger than `maxRestarts`, throw `MaxRestartsExceededException`. - Otherwise, restart each node and transition its state to `RESTARTED` and increment its `numRestarts`. - After restarting all the nodes in the batch, wait for their states to become `SERVING` until the configured `postOperationalTimeoutMs` is reached. - - If the timeout is reached, throw `TimeoutException` if a node's `numAttempts` is greater than or equal to `maxAttempts`. Otherwise increment their `numAttempts` and restart from step 3. + - If the timeout is reached, throw `TimeoutException` if a node's `numAttempts` is greater than or equal to `maxAttempts`. Otherwise increment their `numAttempts` and restart from step 3. 9. If there are no exceptions thrown at this point, the reconciliation completes successfully. If there were `UnrestartableNodesException`, `TimeoutException`, `MaxRestartsExceededException` or any other unexpected exceptions throws, the reconciliation fails. -TODO: we need to figure out when to elect preferred leaders and not fail the reconciliation if did not become leaders within the timeout. This does not apply to pure controllers. - -#### Restarting not running combined nodes - -When restarting not running combined nodes, we will apply a special logic to address the issue described in https://github.com/strimzi/strimzi-kafka-operator/issues/9426. - -In step 3, we restart each node in the `RESTART_FIRST` group one by one. In this specific case, we will compare the total number of not running combined nodes against the total number of controller nodes in the cluster. This is to identify whether all of controllers nodes in this cluster are running in combined mode and none of them are running. In this case, we will restart all the nodes in parallel to give the best chance of forming the quorum. We will then wait for the nodes to have `SERVING` state. +TODO: we need to figure out when to elect preferred leaders and not fail the reconciliation if did not become leaders within the timeout. This does not apply to pure controllers. Also at the point of restarting the unready nodes, admin is not initialised yet. #### Quorum health check @@ -177,7 +188,7 @@ The quorum health logic is similar to the current KafkaRoller except for a coupl Also the current KafkaRoller does not connect to the controller via Admin API to get the quorum health information. By the time, we implement this proposal, Strimzi should support Kafka 3.7 which includes [KIP 919](https://cwiki.apache.org/confluence/display/KAFKA/KIP-919%3A+Allow+AdminClient+to+Talk+Directly+with+the+KRaft+Controller+Quorum+and+add+Controller+Registration). Therefore new KafkaRoller will be able to connect to the controller directly for quorum information and active controller's configuration. -TODO: we need to figure out how to work out the total number of controller without relying on the describe API. +> TODO: we need to figure out how to work out the total number of controller without relying on the describe API. This is because we need to know the number of controllers, when all pods are down. Also nodes could have inconsistent information about the quorum during scale down. #### Availability check @@ -196,7 +207,7 @@ Kafka CR's `KafkaMetadataState` represents where the metadata is stored for the - We are not looking to solve the potential race condition between KafkaRoller and Cruise Control rebalance activity right away but this is something we can solve in the future. An example scenario that cause this race: Let's say we have 5 brokers cluster, minimum in sync replica for topic partition foo-0 is set to 2. The possible sequence of events that could happen: - Broker 0 is down due to an issue and the ISR of foo-0 partition changes from [0, 1, 2] to [1 , 2]. In this case producers with acks-all still can produce to this partition. - - Cruise Control sends `addingReplicas` request to reassign partition foo-0 to 4 instead of 2 in order to achieve its configured goal. + - Cruise Control sends `addingReplicas` request to reassign partition foo-0 to broker 4 instead of broker 2 in order to achieve its configured goal. - The reassignment request is processed and foo-0 partition now has ISR [1, 2, 4]. - Cruise Control sends `removingReplicas` request to un-assign the partition from broker 2. - KafkaRoller is performing a rolling update to the cluster. It checks the availability impact for foo-0 partition before rolling broker 1. Since partition foo-0 has ISR [1, 2, 4], KafkaRoller decides that it is safe to restart broker 1. It is unaware of the `removingReplicas` request that is about to be processed.