diff --git a/kube-controllers/pkg/controllers/node/controller.go b/kube-controllers/pkg/controllers/node/controller.go index 53b7e5d896d..4aa6addb07c 100644 --- a/kube-controllers/pkg/controllers/node/controller.go +++ b/kube-controllers/pkg/controllers/node/controller.go @@ -59,7 +59,7 @@ type NodeController struct { dataFeed *utils.DataFeed // Sub-controllers - ipamCtrl *ipamController + ipamCtrl *IPAMController } // NewNodeController Constructor for NodeController diff --git a/kube-controllers/pkg/controllers/node/ipam.go b/kube-controllers/pkg/controllers/node/ipam.go index 84dd0763c15..07f74dd564e 100644 --- a/kube-controllers/pkg/controllers/node/ipam.go +++ b/kube-controllers/pkg/controllers/node/ipam.go @@ -118,12 +118,12 @@ type rateLimiterItemKey struct { Name string } -func NewIPAMController(cfg config.NodeControllerConfig, c client.Interface, cs kubernetes.Interface, pi, ni cache.Indexer) *ipamController { +func NewIPAMController(cfg config.NodeControllerConfig, c client.Interface, cs kubernetes.Interface, pi, ni cache.Indexer) *IPAMController { var leakGracePeriod *time.Duration if cfg.LeakGracePeriod != nil { leakGracePeriod = &cfg.LeakGracePeriod.Duration } - return &ipamController{ + return &IPAMController{ client: c, clientset: cs, config: cfg, @@ -138,7 +138,7 @@ func NewIPAMController(cfg config.NodeControllerConfig, c client.Interface, cs k podLister: v1lister.NewPodLister(pi), nodeLister: v1lister.NewNodeLister(ni), - nodeDeletionChan: make(chan *v1.Node, batchUpdateSize), + nodeDeletionChan: make(chan struct{}, batchUpdateSize), podDeletionChan: make(chan *v1.Pod, batchUpdateSize), // Buffered channels for potentially bursty channels. @@ -155,6 +155,7 @@ func NewIPAMController(cfg config.NodeControllerConfig, c client.Interface, cs k emptyBlocks: make(map[string]string), poolManager: newPoolManager(), datastoreReady: true, + consolidationWindow: 1 * time.Second, // Track blocks which we might want to release. blockReleaseTracker: newBlockReleaseTracker(leakGracePeriod), @@ -164,7 +165,7 @@ func NewIPAMController(cfg config.NodeControllerConfig, c client.Interface, cs k } } -type ipamController struct { +type IPAMController struct { rl workqueue.TypedRateLimiter[any] client client.Interface clientset kubernetes.Interface @@ -214,29 +215,36 @@ type ipamController struct { // Cache datastoreReady to avoid too much API queries. datastoreReady bool - // channels for indicating that Kubernetes nodes or pods have been deleted. - nodeDeletionChan chan *v1.Node + // Channel for indicating that Kubernetes nodes have been deleted. + nodeDeletionChan chan struct{} podDeletionChan chan *v1.Pod + // consolidationWindow is the time to wait for additional updates after receiving one before processing the updates + // received. This is to allow for multiple node deletion events to be consolidated into a single event. + consolidationWindow time.Duration + // For unit testing purposes. pauseRequestChannel chan pauseRequest + + // fullSyncRequired marks whether or not a full scan of IPAM data is required on the next sync. 
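The deletion-channel change above is worth calling out: because node deletions now trigger a full IPAM scan rather than per-node dirty marking, the channel no longer needs to carry the node object. Below is a minimal, self-contained sketch of that signal-only pattern; the batchUpdateSize value is an assumption mirroring the controller's constant, not something stated in this diff.

```go
// Sketch (not the controller's actual code) of a buffered chan struct{} used
// purely as a "something was deleted" signal.
package main

import "fmt"

const batchUpdateSize = 1000 // assumption: stands in for the controller's batch size constant

type watcher struct {
	nodeDeletionChan chan struct{}
}

func newWatcher() *watcher {
	return &watcher{
		// Buffered so bursts of deletions don't block the informer callback.
		nodeDeletionChan: make(chan struct{}, batchUpdateSize),
	}
}

// OnNodeDeleted only signals that some node went away; since the sync loop
// responds with a full scan, the node's identity is irrelevant here.
func (w *watcher) OnNodeDeleted(name string) {
	fmt.Println("node deleted:", name)
	w.nodeDeletionChan <- struct{}{}
}

func main() {
	w := newWatcher()
	w.OnNodeDeleted("node-a")
	w.OnNodeDeleted("node-b")
	fmt.Println("pending signals:", len(w.nodeDeletionChan)) // 2
}
```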
+ fullSyncRequired bool } -func (c *ipamController) Start(stop chan struct{}) { +func (c *IPAMController) Start(stop chan struct{}) { go c.acceptScheduleRequests(stop) } -func (c *ipamController) RegisterWith(f *utils.DataFeed) { +func (c *IPAMController) RegisterWith(f *utils.DataFeed) { f.RegisterForNotification(model.BlockKey{}, c.onUpdate) f.RegisterForNotification(model.ResourceKey{}, c.onUpdate) f.RegisterForSyncStatus(c.onStatusUpdate) } -func (c *ipamController) onStatusUpdate(s bapi.SyncStatus) { +func (c *IPAMController) onStatusUpdate(s bapi.SyncStatus) { c.syncerUpdates <- s } -func (c *ipamController) onUpdate(update bapi.Update) { +func (c *IPAMController) onUpdate(update bapi.Update) { switch update.KVPair.Key.(type) { case model.ResourceKey: switch update.KVPair.Key.(model.ResourceKey).Kind { @@ -250,19 +258,29 @@ func (c *ipamController) onUpdate(update bapi.Update) { } } -func (c *ipamController) OnKubernetesNodeDeleted(node *v1.Node) { - log.Debug("Kubernetes node deletion event") - c.nodeDeletionChan <- node +func (c *IPAMController) OnKubernetesNodeDeleted(n *v1.Node) { + log.WithField("node", n.Name).Debug("Kubernetes node deletion event") + c.nodeDeletionChan <- struct{}{} } -func (c *ipamController) OnKubernetesPodDeleted(pod *v1.Pod) { - log.WithField("pod", pod.Name).Debug("Kubernetes pod deletion event") - c.podDeletionChan <- pod +func (c *IPAMController) OnKubernetesPodDeleted(p *v1.Pod) { + log.WithField("pod", p.Name).Debug("Kubernetes pod deletion event") + c.podDeletionChan <- p +} + +// fullScanNextSync marks the IPAMController for a full resync on the next syncIPAM call. +func (c *IPAMController) fullScanNextSync(reason string) { + if c.fullSyncRequired { + log.WithField("reason", reason).Debug("Full resync already pending") + return + } + c.fullSyncRequired = true + log.WithField("reason", reason).Info("Marking IPAM for full resync") } // acceptScheduleRequests is the main worker routine of the IPAM controller. It monitors // the updates channel and triggers syncs. -func (c *ipamController) acceptScheduleRequests(stopCh <-chan struct{}) { +func (c *IPAMController) acceptScheduleRequests(stopCh <-chan struct{}) { // Periodic sync ticker. period := 5 * time.Minute if c.config.LeakGracePeriod != nil { @@ -275,33 +293,58 @@ func (c *ipamController) acceptScheduleRequests(stopCh <-chan struct{}) { for { // Wait until something wakes us up, or we are stopped. select { - case node := <-c.nodeDeletionChan: - // When a node is deleted, mark it as dirty and schedule a sync. - log.WithField("node", node.Name).Debug("Received node deletion event") - c.allocationState.markDirty(node.Name) + case <-c.nodeDeletionChan: + // Allow bursts of node deletion events to be consolidated. + // We wait a short time to see if more events come in before processing. + wait := time.After(c.consolidationWindow) + var i int + nodeConsolidationLoop: + for i = 1; i < batchUpdateSize; i++ { + select { + case <-c.nodeDeletionChan: + i++ + case <-wait: + break nodeConsolidationLoop + } + } + + // When one or more nodes are deleted, trigger a full sync to ensure that we release + // their affinities. + c.fullScanNextSync(fmt.Sprintf("%d node deletion event(s)", i)) kick(c.syncChan) case pod := <-c.podDeletionChan: - // When a pod is deleted, mark its node as dirty and schedule a sync. - // TODO: We could be tracking dirtiness at the allocation level instead. 
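The nodeConsolidationLoop introduced above bounds how long the controller waits for further deletion events before acting. Here is a standalone sketch of the same consolidation-window idea, with illustrative names rather than the controller's:

```go
// Standalone sketch of the consolidation pattern: after the first event, keep
// draining for up to `window` (or until `max` events) before doing one sync.
package main

import (
	"fmt"
	"time"
)

func consolidate(events <-chan struct{}, window time.Duration, max int) int {
	<-events // block until the first event arrives
	count := 1
	deadline := time.After(window)
	for count < max {
		select {
		case <-events:
			count++
		case <-deadline:
			return count // window expired; act on what we have
		}
	}
	return count
}

func main() {
	ch := make(chan struct{}, 10)
	for i := 0; i < 5; i++ {
		ch <- struct{}{}
	}
	n := consolidate(ch, 50*time.Millisecond, 100)
	fmt.Printf("consolidated %d events into one sync\n", n)
}
```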
- log.WithFields(log.Fields{ - "pod": pod.Name, - "ns": pod.Namespace, - "node": pod.Spec.NodeName, - }).Debug("Received pod deletion event") - c.allocationState.markDirty(pod.Spec.NodeName) + // Mark the pod's node as dirty. + c.allocationState.markDirty(pod.Spec.NodeName, "pod deletion") + + // Allow bursts of pod deletion events to be consolidated. + // We wait a short time to see if more events come in before processing. + wait := time.After(c.consolidationWindow) + var i int + podConsolidationLoop: + for i = 1; i < batchUpdateSize; i++ { + select { + case pod = <-c.podDeletionChan: + i++ + c.allocationState.markDirty(pod.Spec.NodeName, "pod deletion") + case <-wait: + break podConsolidationLoop + } + } kick(c.syncChan) case upd := <-c.syncerUpdates: c.handleUpdate(upd) // It's possible we get a rapid series of updates in a row. Use // a consolidation loop to handle "batches" of updates before triggering a sync. + // We wait a short time to see if more events come in before processing. + wait := time.After(c.consolidationWindow) var i int consolidationLoop: for i = 1; i < batchUpdateSize; i++ { select { case upd = <-c.syncerUpdates: c.handleUpdate(upd) - default: + case <-wait: break consolidationLoop } } @@ -310,10 +353,8 @@ func (c *ipamController) acceptScheduleRequests(stopCh <-chan struct{}) { log.WithField("batchSize", i).Debug("Triggering sync after batch of updates") kick(c.syncChan) case <-t.C: - // Periodic IPAM sync. - - // Mark all nodes as dirty. - c.allocationState.markAllDirty() + // Periodic IPAM sync, queue a full scan of the IPAM data. + c.fullScanNextSync("periodic sync") log.Debug("Periodic IPAM sync") err := c.syncIPAM() @@ -348,7 +389,7 @@ func (c *ipamController) acceptScheduleRequests(stopCh <-chan struct{}) { // handleUpdate fans out proper handling of the update depending on the // information in the update. -func (c *ipamController) handleUpdate(upd interface{}) { +func (c *IPAMController) handleUpdate(upd interface{}) { switch upd := upd.(type) { case bapi.SyncStatus: c.syncStatus = upd @@ -381,7 +422,7 @@ func (c *ipamController) handleUpdate(upd interface{}) { } // handleBlockUpdate wraps up the logic to execute when receiving a block update. -func (c *ipamController) handleBlockUpdate(kvp model.KVPair) { +func (c *IPAMController) handleBlockUpdate(kvp model.KVPair) { if kvp.Value != nil { c.onBlockUpdated(kvp) } else { @@ -390,7 +431,7 @@ func (c *ipamController) handleBlockUpdate(kvp model.KVPair) { } // handleNodeUpdate wraps up the logic to execute when receiving a node update. -func (c *ipamController) handleNodeUpdate(kvp model.KVPair) { +func (c *IPAMController) handleNodeUpdate(kvp model.KVPair) { if kvp.Value != nil { n := kvp.Value.(*libapiv3.Node) kn, err := getK8sNodeName(*n) @@ -416,7 +457,7 @@ func (c *ipamController) handleNodeUpdate(kvp model.KVPair) { } } -func (c *ipamController) handlePoolUpdate(kvp model.KVPair) { +func (c *IPAMController) handlePoolUpdate(kvp model.KVPair) { if kvp.Value != nil { pool := kvp.Value.(*apiv3.IPPool) c.onPoolUpdated(pool) @@ -427,7 +468,7 @@ func (c *ipamController) handlePoolUpdate(kvp model.KVPair) { } // handleClusterInformationUpdate wraps the logic to execute when receiving a clusterinformation update. 
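Each consolidation branch above ends with kick(c.syncChan). That helper is defined elsewhere in this package; the version below is an assumed reconstruction showing the intended non-blocking behaviour, on the assumption that syncChan is a capacity-1 buffered channel:

```go
// Illustrative "kick" helper: request a sync without blocking, collapsing
// repeated requests into a single pending one. Signature is assumed, not
// taken from this diff.
package main

import "fmt"

func kick(ch chan<- interface{}) {
	select {
	case ch <- nil:
		// Sync request queued.
	default:
		// A sync is already pending; dropping the extra kick is safe because
		// the pending sync will observe the latest state anyway.
	}
}

func main() {
	syncChan := make(chan interface{}, 1)
	kick(syncChan)
	kick(syncChan) // collapsed into the already-pending request
	fmt.Println("pending syncs:", len(syncChan)) // 1
}
```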
-func (c *ipamController) handleClusterInformationUpdate(kvp model.KVPair) { +func (c *IPAMController) handleClusterInformationUpdate(kvp model.KVPair) { if kvp.Value != nil { ci := kvp.Value.(*apiv3.ClusterInformation) if ci.Spec.DatastoreReady != nil { @@ -438,7 +479,7 @@ func (c *ipamController) handleClusterInformationUpdate(kvp model.KVPair) { } } -func (c *ipamController) onBlockUpdated(kvp model.KVPair) { +func (c *IPAMController) onBlockUpdated(kvp model.KVPair) { blockCIDR := kvp.Key.(model.BlockKey).CIDR.String() log.WithField("block", blockCIDR).Debug("Received block update") b := kvp.Value.(*model.AllocationBlock) @@ -549,7 +590,7 @@ func (c *ipamController) onBlockUpdated(kvp model.KVPair) { c.allBlocks[blockCIDR] = kvp } -func (c *ipamController) onBlockDeleted(key model.BlockKey) { +func (c *IPAMController) onBlockDeleted(key model.BlockKey) { blockCIDR := key.CIDR.String() log.WithField("block", blockCIDR).Info("Received block delete") @@ -576,7 +617,7 @@ func (c *ipamController) onBlockDeleted(key model.BlockKey) { c.poolManager.onBlockDeleted(blockCIDR) } -func (c *ipamController) onPoolUpdated(pool *apiv3.IPPool) { +func (c *IPAMController) onPoolUpdated(pool *apiv3.IPPool) { if c.poolManager.allPools[pool.Name] == nil { registerMetricVectorsForPool(pool.Name) publishPoolSizeMetric(pool) @@ -585,14 +626,25 @@ func (c *ipamController) onPoolUpdated(pool *apiv3.IPPool) { c.poolManager.onPoolUpdated(pool) } -func (c *ipamController) onPoolDeleted(poolName string) { +func (c *IPAMController) onPoolDeleted(poolName string) { unregisterMetricVectorsForPool(poolName) clearPoolSizeMetric(poolName) c.poolManager.onPoolDeleted(poolName) } -func (c *ipamController) updateMetrics() { +func (c *IPAMController) updateMetrics() { + if !c.datastoreReady { + log.Warn("datastore is locked, skipping metrics sync") + return + } + + // Skip if not InSync yet. + if c.syncStatus != bapi.InSync { + log.WithField("status", c.syncStatus).Debug("Have not yet received InSync notification, skipping metrics sync.") + return + } + log.Debug("Gathering latest IPAM state for metrics") // Keep track of various counts so that we can report them as metrics. These counts track legacy metrics by node. @@ -652,9 +704,9 @@ func (c *ipamController) updateMetrics() { // Update legacy gauges legacyAllocationsGauge.Reset() - for node, allocations := range c.allocationState.allocationsByNode { + c.allocationState.iter(func(node string, allocations map[string]*allocation) { legacyAllocationsGauge.WithLabelValues(node).Set(float64(len(allocations))) - } + }) legacyBlocksGauge.Reset() for node, num := range legacyBlocksByNode { legacyBlocksGauge.WithLabelValues(node).Set(float64(num)) @@ -666,7 +718,7 @@ func (c *ipamController) updateMetrics() { log.Debug("IPAM metrics updated") } -// checkEmptyBlocks looks at known empty blocks, and releases their affinity +// releaseUnusedBlocks looks at known empty blocks, and releases their affinity // if appropriate. A block is a candidate for having its affinity released if: // // - The block is empty. @@ -675,7 +727,7 @@ func (c *ipamController) updateMetrics() { // // A block will only be released if it has been in this state for longer than the configured // grace period, which defaults to 15m. 
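updateMetrics repopulates the legacy per-node gauges by calling Reset() and then WithLabelValues().Set(). The sketch below shows that reset-then-repopulate pattern with the Prometheus client in isolation; the metric name is a placeholder, not Calico's:

```go
// Minimal sketch of the gauge-vector reset/repopulate pattern used for the
// legacy per-node metrics. Metric names here are illustrative only.
package main

import (
	"fmt"

	"github.com/prometheus/client_golang/prometheus"
)

var allocationsGauge = prometheus.NewGaugeVec(prometheus.GaugeOpts{
	Name: "example_allocations_per_node",
	Help: "Number of IP allocations per node (illustrative).",
}, []string{"node"})

func publish(allocationsByNode map[string]int) {
	// Reset first so nodes that disappeared since the last sync don't keep
	// reporting stale values.
	allocationsGauge.Reset()
	for node, count := range allocationsByNode {
		allocationsGauge.WithLabelValues(node).Set(float64(count))
	}
}

func main() {
	prometheus.MustRegister(allocationsGauge)
	publish(map[string]int{"node1": 5, "node2": 3})
	fmt.Println("metrics published")
}
```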
-func (c *ipamController) checkEmptyBlocks() error { +func (c *IPAMController) releaseUnusedBlocks() error { for blockCIDR, node := range c.emptyBlocks { logc := log.WithFields(log.Fields{"blockCIDR": blockCIDR, "node": node}) nodeBlocks := c.blocksByNode[node] @@ -749,19 +801,44 @@ func (c *ipamController) checkEmptyBlocks() error { // - The node no longer exists in the Kubernetes API, AND // - There are no longer any IP allocations on the node, OR // - The remaining IP allocations on the node are all determined to be leaked IP addresses. -func (c *ipamController) checkAllocations() ([]string, error) { +func (c *IPAMController) checkAllocations() ([]string, error) { // For each node present in IPAM, if it doesn't exist in the Kubernetes API then we // should consider it a candidate for cleanup. - // - // Build a map of any nodes that have had their allocation - // state change since the last sync. - nodesAndAllocations := map[string]map[string]*allocation{} - for node := range c.allocationState.dirtyNodes { - nodesAndAllocations[node] = c.allocationState.allocationsByNode[node] + nodesToCheck := map[string]map[string]*allocation{} + + if c.fullSyncRequired { + // If a full sync is required, we need to consider all nodes in the IPAM cache - not just the ones that + // have changed since the last sync. This is a more expensive operation, so we only do it periodically. + log.Info("Performing a full scan of IPAM allocations to check for leaks and redundant affinities") + + for _, node := range c.nodesByBlock { + // For each affine block, add an entry. This makes sure we consider them even + // if they have no allocations. + nodesToCheck[node] = nil + } + + // Add in allocations for any nodes that have them. + c.allocationState.iter(func(node string, allocations map[string]*allocation) { + nodesToCheck[node] = allocations + }) + + // Clear the full sync flag. + c.fullSyncRequired = false + } else { + log.Info("Checking dirty nodes for leaks and redundant affinities") + + // Collect allocation state for all nodes that have changed since the last sync. + c.allocationState.iterDirty(func(node string, allocations map[string]*allocation) { + log.WithField("node", node).Debug("Node is dirty, checking for leaks") + nodesToCheck[node] = allocations + }) } + + // nodesToRelease tracks nodes that exist in Calico IPAM, but do not exist in the Kubernetes API. + // These nodes should have all of their block affinities released. nodesToRelease := []string{} - for cnode, allocations := range nodesAndAllocations { + for cnode, allocations := range nodesToCheck { // Lookup the corresponding Kubernetes node for each Calico node we found in IPAM. // In KDD mode, these are identical. However, in etcd mode its possible that the Calico node has a // different name from the Kubernetes node. @@ -882,7 +959,7 @@ func (c *ipamController) checkAllocations() ([]string, error) { // allocationIsValid returns true if the allocation is still in use, and false if the allocation // appears to be leaked. 
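checkAllocations now has two modes: a full scan seeded from block affinities (so empty, pod-free blocks are still considered) and an incremental scan of dirty nodes only. The simplified stand-in below illustrates the mode selection, assuming the same rough map shapes as the controller's caches:

```go
// Illustrative sketch of the two scan modes: full (seeded from affinities so
// empty blocks are not missed) versus incremental (dirty nodes only).
package main

import "fmt"

type scanner struct {
	fullSyncRequired bool
	nodesByBlock     map[string]string         // block CIDR -> node holding the affinity
	allocsByNode     map[string]map[string]int // node -> allocation ID -> count
	dirtyNodes       map[string]struct{}
}

func (s *scanner) nodesToCheck() map[string]map[string]int {
	out := map[string]map[string]int{}
	if s.fullSyncRequired {
		// Seed from affinities so nodes whose blocks are empty are still considered.
		for _, node := range s.nodesByBlock {
			out[node] = nil
		}
		for node, allocs := range s.allocsByNode {
			out[node] = allocs
		}
		s.fullSyncRequired = false
		return out
	}
	// Incremental: only nodes whose allocations changed since the last sync.
	for node := range s.dirtyNodes {
		out[node] = s.allocsByNode[node]
	}
	return out
}

func main() {
	s := &scanner{
		fullSyncRequired: true,
		nodesByBlock:     map[string]string{"10.0.0.0/26": "dead-node"},
		allocsByNode:     map[string]map[string]int{"node1": {"pod-a/10.0.1.1": 1}},
		dirtyNodes:       map[string]struct{}{"node1": {}},
	}
	fmt.Println(len(s.nodesToCheck())) // 2: dead-node (empty block) plus node1
}
```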
-func (c *ipamController) allocationIsValid(a *allocation, preferCache bool) bool { +func (c *IPAMController) allocationIsValid(a *allocation, preferCache bool) bool { ns := a.attrs[ipam.AttributeNamespace] pod := a.attrs[ipam.AttributePod] logc := log.WithFields(a.fields()) @@ -985,7 +1062,7 @@ func (c *ipamController) allocationIsValid(a *allocation, preferCache bool) bool return false } -func (c *ipamController) syncIPAM() error { +func (c *IPAMController) syncIPAM() error { if !c.datastoreReady { log.Warn("datastore is locked, skipping ipam sync") return nil @@ -997,26 +1074,32 @@ func (c *ipamController) syncIPAM() error { return nil } - // Check if any nodes in IPAM need to have affinities released. log.Debug("Synchronizing IPAM data") + + // Scan known allocations, determining if there are any IP address leaks + // or nodes that should have their block affinities released. nodesToRelease, err := c.checkAllocations() if err != nil { return err } - // Release all confirmed leaks. - err = c.garbageCollectIPs() + // Release all confirmed leaks. Leaks are confirmed in checkAllocations() above. + err = c.garbageCollectKnownLeaks() if err != nil { return err } - // Check if any empty blocks should be removed. - err = c.checkEmptyBlocks() + // Release any block affinities for empty blocks that are no longer needed. + // This ensures Nodes don't hold on to blocks that are no longer in use, allowing them to + // to be claimed elsewhere. + err = c.releaseUnusedBlocks() if err != nil { return err } - // Delete any nodes that we determined can be removed above. + // Delete any nodes that we determined can be removed in checkAllocations. These + // nodes are no longer in the Kubernetes API, and have no valid allocations, so can be cleaned up entirely + // from Calico IPAM. var storedErr error if len(nodesToRelease) > 0 { log.WithField("num", len(nodesToRelease)).Info("Found a batch of nodes to release") @@ -1040,13 +1123,13 @@ func (c *ipamController) syncIPAM() error { return storedErr } - log.Debug("IPAM sync completed") c.allocationState.syncComplete() + log.Debug("IPAM sync completed") return nil } -// garbageCollectIPs checks all known allocations and garbage collects any confirmed leaks. -func (c *ipamController) garbageCollectIPs() error { +// garbageCollectKnownLeaks checks all known allocations and garbage collects any confirmed leaks. +func (c *IPAMController) garbageCollectKnownLeaks() error { for id, a := range c.confirmedLeaks { logc := log.WithFields(a.fields()) @@ -1086,7 +1169,7 @@ func (c *ipamController) garbageCollectIPs() error { return nil } -func (c *ipamController) cleanupNode(cnode string) error { +func (c *IPAMController) cleanupNode(cnode string) error { // At this point, we've verified that the node isn't in Kubernetes and that all the allocations // are tied to pods which don't exist anymore. Clean up any allocations which may still be laying around. logc := log.WithField("calicoNode", cnode) @@ -1109,7 +1192,7 @@ func (c *ipamController) cleanupNode(cnode string) error { } // nodeExists returns true if the given node still exists in the Kubernetes API. 
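garbageCollectKnownLeaks only releases allocations that have already been confirmed as leaks, and confirmation is grace-period based. The sketch below illustrates that timing logic with simplified, illustrative fields; leakedAt stands in for the controller's candidate tracking:

```go
// Sketch of grace-period leak confirmation: an allocation is only treated as a
// confirmed leak once it has looked leaked for longer than the grace period.
package main

import (
	"fmt"
	"time"
)

type allocation struct {
	id       string
	leakedAt *time.Time // set when the allocation first looks leaked
}

func (a *allocation) markLeaked(now time.Time) {
	if a.leakedAt == nil {
		a.leakedAt = &now
	}
}

func (a *allocation) isConfirmedLeak(now time.Time, grace time.Duration) bool {
	return a.leakedAt != nil && now.Sub(*a.leakedAt) > grace
}

func main() {
	a := &allocation{id: "pod1/10.0.1.1"}
	start := time.Now()
	a.markLeaked(start)
	fmt.Println(a.isConfirmedLeak(start.Add(5*time.Minute), 15*time.Minute))  // false
	fmt.Println(a.isConfirmedLeak(start.Add(20*time.Minute), 15*time.Minute)) // true
}
```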
-func (c *ipamController) nodeExists(knode string) bool { +func (c *IPAMController) nodeExists(knode string) bool { _, err := c.nodeLister.Get(knode) if err != nil { if errors.IsNotFound(err) { @@ -1122,7 +1205,7 @@ func (c *ipamController) nodeExists(knode string) bool { // nodeIsBeingMigrated looks up a Kubernetes node for a Calico node and checks, // if it is marked by the flannel-migration controller to undergo migration. -func (c *ipamController) nodeIsBeingMigrated(name string) (bool, error) { +func (c *IPAMController) nodeIsBeingMigrated(name string) (bool, error) { // Find the Kubernetes node referenced by the Calico node kname, err := c.kubernetesNodeForCalico(name) if err != nil { @@ -1153,7 +1236,7 @@ func (c *ipamController) nodeIsBeingMigrated(name string) (bool, error) { // kubernetesNodeForCalico returns the name of the Kubernetes node that corresponds to this Calico node. // This function returns an empty string if no corresponding node could be found. // Returns ErrorNotKubernetes if the given Calico node is not a Kubernetes node. -func (c *ipamController) kubernetesNodeForCalico(cnode string) (string, error) { +func (c *IPAMController) kubernetesNodeForCalico(cnode string) (string, error) { // Check if we have the node name cached. if kn, ok := c.kubernetesNodesByCalicoName[cnode]; ok && kn != "" { return kn, nil @@ -1178,7 +1261,7 @@ func (c *ipamController) kubernetesNodeForCalico(cnode string) (string, error) { return getK8sNodeName(*calicoNode) } -func (c *ipamController) incrementReclamationMetric(block string, node string) { +func (c *IPAMController) incrementReclamationMetric(block string, node string) { pool := c.poolManager.poolsByBlock[block] if node == "" { node = unknownNodeLabel @@ -1263,7 +1346,7 @@ func unregisterMetricVectorsForPool(poolName string) { // Creates map used to index gauge values by node, and seeds with zeroes to create explicit zero values rather than // absence of data for a node. This enables users to construct utilization expressions that return 0 when the numerator // is zero, rather than no data. If the pool is the 'unknown' pool, the map is not seeded. -func (c *ipamController) createZeroedMapForNodeValues(poolName string) map[string]int { +func (c *IPAMController) createZeroedMapForNodeValues(poolName string) map[string]int { valuesByNode := map[string]int{} if poolName != unknownPoolLabel { @@ -1327,7 +1410,7 @@ type pauseRequest struct { // pause pauses the controller's main loop until the returned function is called. // this function is for TESTING PURPOSES ONLY, allowing the tests to safely access // the controller's data caches without races. 
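kubernetesNodeForCalico answers from the kubernetesNodesByCalicoName cache before falling back to a datastore query. The sketch below shows that cache-then-query shape in isolation; note that the real controller populates the cache from node update events, whereas this illustrative version caches on lookup:

```go
// Simplified cache-then-query lookup. The lookup function is a stand-in for
// the datastore query performed on a cache miss.
package main

import (
	"errors"
	"fmt"
)

var errNotFound = errors.New("no corresponding Kubernetes node")

type nameMapper struct {
	cache  map[string]string                       // calico node -> kubernetes node
	lookup func(calicoNode string) (string, error) // slow path (datastore query)
}

func (m *nameMapper) kubernetesNodeFor(cnode string) (string, error) {
	if kn, ok := m.cache[cnode]; ok && kn != "" {
		return kn, nil
	}
	kn, err := m.lookup(cnode)
	if err != nil {
		return "", err
	}
	m.cache[cnode] = kn
	return kn, nil
}

func main() {
	m := &nameMapper{
		cache: map[string]string{},
		lookup: func(cnode string) (string, error) {
			if cnode == "calico-node-1" {
				return "k8s-node-1", nil
			}
			return "", errNotFound
		},
	}
	fmt.Println(m.kubernetesNodeFor("calico-node-1")) // k8s-node-1 <nil>
	fmt.Println(m.kubernetesNodeFor("unknown"))       // error: no corresponding Kubernetes node
}
```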
-func (c *ipamController) pause() func() { +func (c *IPAMController) pause() func() { doneChan := make(chan struct{}) pauseConfirmed := make(chan struct{}) c.pauseRequestChannel <- pauseRequest{doneChan: doneChan, pauseConfirmed: pauseConfirmed} diff --git a/kube-controllers/pkg/controllers/node/ipam_allocation.go b/kube-controllers/pkg/controllers/node/ipam_allocation.go index 29530b1e05f..c27fd42162f 100644 --- a/kube-controllers/pkg/controllers/node/ipam_allocation.go +++ b/kube-controllers/pkg/controllers/node/ipam_allocation.go @@ -237,7 +237,7 @@ func (a *allocation) isWindowsReserved() bool { func newAllocationState() *allocationState { return &allocationState{ allocationsByNode: map[string]map[string]*allocation{}, - dirtyNodes: map[string]bool{}, + dirtyNodes: map[string]struct{}{}, } } @@ -247,8 +247,8 @@ type allocationState struct { // allocationsByNode maps a node name to a map of allocations keyed by a unique identifier. allocationsByNode map[string]map[string]*allocation - // dirtyNodes is a set of nodes that have had allocations added or removed since the last sync. - dirtyNodes map[string]bool + // dirtyNodes tracks which nodes have had their IPAM data modified since the last sync. + dirtyNodes map[string]struct{} } // allocate marks an allocation as being allocated. @@ -259,16 +259,42 @@ func (t *allocationState) allocate(a *allocation) { t.allocationsByNode[a.node()][a.id()] = a // Mark the node as dirty. - t.markDirty(a.node()) + t.markDirty(a.node(), "new allocation") } -func (t *allocationState) markDirty(node string) { +func (t *allocationState) iter(f func(string, map[string]*allocation)) { + for node, allocations := range t.allocationsByNode { + f(node, allocations) + } +} + +func (t *allocationState) iterDirty(f func(string, map[string]*allocation)) { + for node := range t.dirtyNodes { + if allocations, ok := t.allocationsByNode[node]; ok { + f(node, allocations) + } else { + f(node, nil) + } + } +} + +func (t *allocationState) markDirty(node string, reason string) { if node == "" { return } if _, ok := t.dirtyNodes[node]; !ok { - log.WithField("node", node).Debug("Marking node as dirty") - t.dirtyNodes[node] = true + log.WithFields(log.Fields{ + "node": node, + "reason": reason, + }).Debug("Marking node as dirty") + t.dirtyNodes[node] = struct{}{} + } +} + +func (t *allocationState) syncComplete() { + for node := range t.dirtyNodes { + log.WithField("node", node).Debug("Node is no longer dirty") + delete(t.dirtyNodes, node) } } @@ -282,24 +308,10 @@ func (t *allocationState) release(a *allocation) { delete(t.allocationsByNode[a.node()], a.id()) // Mark the node as dirty. - t.markDirty(a.node()) + t.markDirty(a.node(), "allocation released") // Check if the node is empty and clean it up if so. if len(t.allocationsByNode[a.node()]) == 0 { delete(t.allocationsByNode, a.node()) } } - -func (t *allocationState) syncComplete() { - for node := range t.dirtyNodes { - log.WithField("node", node).Debug("Clearing dirty flag") - delete(t.dirtyNodes, node) - } -} - -// markAllDirty marks all nodes as dirty, so that they will be re-processed on the next sync. 
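The pause() helper above gives tests race-free access to the controller's caches through a confirm/release handshake with the worker loop. A self-contained sketch of that handshake, with a trivial worker standing in for the controller:

```go
// Sketch of the pause/confirm handshake: the worker acknowledges the pause,
// then blocks until the caller releases it.
package main

import "fmt"

type pauseRequest struct {
	doneChan       chan struct{}
	pauseConfirmed chan struct{}
}

type worker struct {
	pauseRequestChannel chan pauseRequest
}

func (w *worker) run(stop <-chan struct{}) {
	for {
		select {
		case req := <-w.pauseRequestChannel:
			close(req.pauseConfirmed) // tell the caller the loop is idle
			<-req.doneChan            // wait until the caller releases us
		case <-stop:
			return
		}
	}
}

// pause blocks until the worker is idle and returns a function that resumes it.
func (w *worker) pause() func() {
	req := pauseRequest{doneChan: make(chan struct{}), pauseConfirmed: make(chan struct{})}
	w.pauseRequestChannel <- req
	<-req.pauseConfirmed
	return func() { close(req.doneChan) }
}

func main() {
	w := &worker{pauseRequestChannel: make(chan pauseRequest)}
	stop := make(chan struct{})
	go w.run(stop)

	done := w.pause() // safe to inspect shared state here
	fmt.Println("worker paused")
	done()
	close(stop)
}
```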
-func (t *allocationState) markAllDirty() { - for node := range t.allocationsByNode { - t.dirtyNodes[node] = true - } -} diff --git a/kube-controllers/pkg/controllers/node/ipam_test.go b/kube-controllers/pkg/controllers/node/ipam_test.go index 3cf62a19510..17dd78f57a1 100644 --- a/kube-controllers/pkg/controllers/node/ipam_test.go +++ b/kube-controllers/pkg/controllers/node/ipam_test.go @@ -16,11 +16,13 @@ package node import ( "context" "fmt" + "math/big" "time" . "github.com/onsi/ginkgo" . "github.com/onsi/gomega" apiv3 "github.com/projectcalico/api/pkg/apis/projectcalico/v3" + "github.com/sirupsen/logrus" v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/client-go/informers" @@ -52,7 +54,7 @@ const ( // assertConsistentState performs checks on the provided IPAM controller's internal // caches to ensure that they are consistent with each other. Useful for ensuring that // at any arbitrary point in time, we're not in an unknown state. -func assertConsistentState(c *ipamController) { +func assertConsistentState(c *IPAMController) { // Stop the world so we can inspect it. done := c.pause() defer done() @@ -93,7 +95,7 @@ func assertConsistentState(c *ipamController) { } var _ = Describe("IPAM controller UTs", func() { - var c *ipamController + var c *IPAMController var cli client.Interface var cs kubernetes.Interface var stopChan chan struct{} @@ -144,6 +146,9 @@ var _ = Describe("IPAM controller UTs", func() { // Create a new controller. We don't register with a data feed, // as the tests themselves will drive the controller. c = NewIPAMController(cfg, cli, cs, podInformer.GetIndexer(), nodeInformer.GetIndexer()) + + // For testing, speed up update batching. + c.consolidationWindow = 1 * time.Millisecond }) AfterEach(func() { @@ -1533,7 +1538,30 @@ var _ = Describe("IPAM controller UTs", func() { Consistently(numBlocks, assertionTimeout, 100*time.Millisecond).Should(Equal(1)) }) - Context("with several allocations across nodes", func() { + It("should delete empty IPAM blocks when the node no longer exists", func() { + // This testcase handles an edge case in our code, to make sure we spot node affinities that + // must be released even when there are no allocations in the block. Since much of the GC controller logic + // is based on allocations, it's important to have an explicit test for this case. + + // Create an empty block with an affinity to a node that doesn't exist. Then, trigger a full GC cycle. The controller + // should spot the empty block and release it. + c.onUpdate(createBlock(nil, "dead-node", "10.0.0.0/26")) + + // Start the controller. + c.Start(stopChan) + + // Mark the syncer as InSync so that the GC will be enabled. + c.fullScanNextSync("forced by test") + c.onStatusUpdate(bapi.InSync) + + // Expect the block to be released. + fakeClient := cli.IPAM().(*fakeIPAMClient) + Eventually(func() bool { + return fakeClient.affinityReleased("dead-node") + }, assertionTimeout, 100*time.Millisecond).Should(BeTrue(), "Affinity for dead-node should be released") + }) + + Context("with a 1hr grace period", func() { ns := "test-namespace" podsNode1 := []v1.Pod{ { @@ -1591,54 +1619,17 @@ var _ = Describe("IPAM controller UTs", func() { } // Create some IPAM blocks, assigning IPs to the pods. - createBlock(podsNode1, c, "10.0.1.0/24") - createBlock(podsNode2, c, "10.0.2.0/24") + c.onUpdate(createBlock(podsNode1, "node1", "10.0.1.0/24")) + c.onUpdate(createBlock(podsNode2, "node2", "10.0.2.0/24")) // Mark the syncer as InSync so that the GC will be enabled. 
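With markAllDirty removed, periodic full scans are driven by fullSyncRequired instead, and the dirty set is now a plain map[string]struct{} cleared by syncComplete. A simplified stand-in for that bookkeeping:

```go
// Simplified dirty-node set: mark on change, visit only dirty entries, clear
// after a successful sync. Stand-in for allocationState's dirty tracking.
package main

import "fmt"

type dirtySet struct {
	nodes map[string]struct{}
}

func newDirtySet() *dirtySet { return &dirtySet{nodes: map[string]struct{}{}} }

func (d *dirtySet) markDirty(node, reason string) {
	if node == "" {
		return
	}
	if _, ok := d.nodes[node]; !ok {
		fmt.Printf("marking %s dirty (%s)\n", node, reason)
		d.nodes[node] = struct{}{}
	}
}

func (d *dirtySet) iterDirty(f func(node string)) {
	for node := range d.nodes {
		f(node)
	}
}

func (d *dirtySet) syncComplete() {
	for node := range d.nodes {
		delete(d.nodes, node)
	}
}

func main() {
	d := newDirtySet()
	d.markDirty("node1", "pod deletion")
	d.markDirty("node1", "pod deletion") // no-op: already dirty
	d.iterDirty(func(n string) { fmt.Println("checking", n) })
	d.syncComplete()
	fmt.Println("remaining dirty:", len(d.nodes)) // 0
}
```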
c.onStatusUpdate(bapi.InSync) // Start the controller. c.Start(stopChan) - - // Expect nothing to be dirty initially. This makes sure initial sync has completed - // before starting the tests. - Eventually(func() bool { - done := c.pause() - defer done() - return len(c.allocationState.dirtyNodes) == 0 - }, 1*time.Second, 100*time.Millisecond).Should(BeTrue(), "initial state was dirty") }) - It("should handle pod deletion", func() { - // Send a pod deletion event for one of the pods. - Expect(cs.CoreV1().Pods(ns).Delete(context.TODO(), podsNode1[0].Name, metav1.DeleteOptions{})).NotTo(HaveOccurred()) - c.OnKubernetesPodDeleted(&podsNode1[0]) - - // We should see the allocation marked as a candidate for GC. - Eventually(func() error { - done := c.pause() - defer done() - if _, ok := c.allocationState.allocationsByNode["node1"]; !ok { - return fmt.Errorf("node1 not found") - } - if _, ok := c.allocationState.allocationsByNode["node1"]["pod1-1/10.0.1.1"]; !ok { - return fmt.Errorf("allocation not found") - } - if c.allocationState.allocationsByNode["node1"]["pod1-1/10.0.1.1"].leakedAt == nil { - return fmt.Errorf("allocation was not marked as leaked") - } - return nil - }, 1*time.Second, 100*time.Millisecond).ShouldNot(HaveOccurred()) - - // Eventually node1 should be cleared of dirty status. - Eventually(func() bool { - done := c.pause() - defer done() - return c.allocationState.dirtyNodes["node1"] - }, 1*time.Second, 100*time.Millisecond).Should(BeFalse(), "node1 was not cleared of dirty status") - }) - - It("should handle node deletion", func() { + It("should handle node deletion events", func() { // Delete node1. Expect(cs.CoreV1().Nodes().Delete(context.TODO(), "node1", metav1.DeleteOptions{})).NotTo(HaveOccurred()) c.OnKubernetesNodeDeleted(&v1.Node{ObjectMeta: metav1.ObjectMeta{Name: "node1"}}) @@ -1663,8 +1654,9 @@ var _ = Describe("IPAM controller UTs", func() { // Delete the pods too. Expect(cs.CoreV1().Pods(ns).Delete(context.TODO(), podsNode1[0].Name, metav1.DeleteOptions{})).NotTo(HaveOccurred()) Expect(cs.CoreV1().Pods(ns).Delete(context.TODO(), podsNode1[1].Name, metav1.DeleteOptions{})).NotTo(HaveOccurred()) - c.OnKubernetesPodDeleted(&podsNode1[0]) - c.OnKubernetesPodDeleted(&podsNode1[1]) + + // Trigger another sync. + c.OnKubernetesNodeDeleted(&v1.Node{ObjectMeta: metav1.ObjectMeta{Name: "node1"}}) // All state should be cleaned up now. fakeClient := cli.IPAM().(*fakeIPAMClient) @@ -1684,11 +1676,168 @@ var _ = Describe("IPAM controller UTs", func() { }, 1*time.Second, 100*time.Millisecond).ShouldNot(HaveOccurred()) }) }) + + Context("with a large number of nodes and allocatoins", func() { + // This is a sort of stress test to see how well the controller handles a large number of nodes and allocations. + numNodes := 1000 + podsPerNode := 5 + + var allPods []v1.Pod + var allBlocks []bapi.Update + + BeforeEach(func() { + // Set a normal grace period to prevent frequent syncs, which can cause excessive resource usage + // at such a large scale. + c.config.LeakGracePeriod = &metav1.Duration{Duration: 1 * time.Hour} + c.consolidationWindow = 1 * time.Second + + // Start the controller - we need to do this before creating the nodes, so that the controller is ready to + // consume from its channels. + c.Start(stopChan) + + // Create 5k nodes. 
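The tests poll controller state by pausing the worker loop inside a Gomega Eventually callback. The sketch below shows that pattern outside Ginkgo, using NewWithT and a trivial stand-in controller, so the shape of the assertion is visible on its own:

```go
// Sketch of race-free polling in tests: pause the worker, read state, resume,
// and let Eventually retry until the condition holds. The controller here is a
// trivial stand-in guarded by a mutex.
package example

import (
	"sync"
	"testing"
	"time"

	. "github.com/onsi/gomega"
)

type fakeController struct {
	mu    sync.Mutex
	dirty map[string]struct{}
}

// pause emulates the controller's pause(): stop the world, return a resume func.
func (c *fakeController) pause() func() {
	c.mu.Lock()
	return c.mu.Unlock
}

func TestDirtyNodesEventuallyClear(t *testing.T) {
	g := NewWithT(t)
	c := &fakeController{dirty: map[string]struct{}{"node1": {}}}

	// Simulate the controller finishing a sync in the background.
	go func() {
		time.Sleep(50 * time.Millisecond)
		c.mu.Lock()
		delete(c.dirty, "node1")
		c.mu.Unlock()
	}()

	g.Eventually(func() int {
		done := c.pause()
		defer done()
		return len(c.dirty)
	}, 1*time.Second, 10*time.Millisecond).Should(Equal(0), "dirty set should drain after sync")
}
```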
+ for i := 0; i < numNodes; i++ { + n := libapiv3.Node{} + n.Name = fmt.Sprintf("node%d", i) + n.Spec.OrchRefs = []libapiv3.OrchRef{{NodeName: n.Name, Orchestrator: apiv3.OrchestratorKubernetes}} + _, err := cli.Nodes().Create(context.TODO(), &n, options.SetOptions{}) + Expect(err).NotTo(HaveOccurred()) + + kn := v1.Node{} + kn.Name = n.Name + _, err = cs.CoreV1().Nodes().Create(context.TODO(), &kn, metav1.CreateOptions{}) + Expect(err).NotTo(HaveOccurred()) + + var node *v1.Node + Eventually(nodes).WithTimeout(time.Second).Should(Receive(&node)) + } + + // For each node, create 30 pods and assign them IPs. Pre-create the blocks, and then + // send them all in at once to separate the test setup from the controller processing. + for nodeNum := 0; nodeNum < numNodes; nodeNum++ { + // Determine the block CIDR for this node. Each node is given a /26, + // which means for 5k nodes we need a /13 IP pool. + baseIPInt := big.NewInt(int64(0x0a000000 + nodeNum*64)) + baseIP := net.BigIntToIP(baseIPInt, false) + blockCIDR := fmt.Sprintf("%s/26", baseIP.String()) + + podIP := baseIP + nodeName := fmt.Sprintf("node%d", nodeNum) + nodePods := []v1.Pod{} + for podNum := 0; podNum < podsPerNode; podNum++ { + p := v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: fmt.Sprintf("pod%d-%d", nodeNum, podNum), + Namespace: "test-namespace", + }, + Spec: v1.PodSpec{NodeName: nodeName}, + Status: v1.PodStatus{ + PodIP: podIP.String(), + PodIPs: []v1.PodIP{{IP: podIP.String()}}, + }, + } + allPods = append(allPods, p) + nodePods = append(nodePods, p) + podIP = net.IncrementIP(podIP, big.NewInt(1)) + + } + allBlocks = append(allBlocks, createBlock(nodePods, nodeName, blockCIDR)) + logrus.WithField("nodeNum", nodeNum).WithField("blockCIDR", blockCIDR).Info("[TEST] Created node + block") + } + + // Create all the pods. This loop consumes the vast majority of the time this test takes to run. + // It would be great to parallelize this, but it's not possible to create pods in parallel with + // the current fake client. + for _, p := range allPods { + _, err := cs.CoreV1().Pods(p.Namespace).Create(context.TODO(), &p, metav1.CreateOptions{}) + Expect(err).NotTo(HaveOccurred()) + var gotPod *v1.Pod + Eventually(pods).WithTimeout(time.Second).Should(Receive(&gotPod)) + logrus.WithField("pod", p.Name).Info("[TEST] Created pod") + } + + By("Sending updates for all blocks") + + // Create all the blocks + for _, u := range allBlocks { + c.onUpdate(u) + } + + // Mark in sync + c.onStatusUpdate(bapi.InSync) + + By("Waiting for controller to be in sync") + + // Wait for the controller to process all the updates. + Eventually(func() bool { + done := c.pause() + defer done() + return len(c.allBlocks) == numNodes + }, 5*time.Second, 100*time.Millisecond).Should(BeTrue()) + Eventually(func() bool { + done := c.pause() + defer done() + return len(c.allocationState.dirtyNodes) == 0 + }, 5*time.Second, 100*time.Millisecond).Should(BeTrue(), "Controller did not process all blocks") + Eventually(func() bool { + done := c.pause() + defer done() + return c.fullSyncRequired + }, 5*time.Second, 100*time.Millisecond).Should(BeFalse()) + }) + + It("should detect a leaked IP reasonably quickly", func() { + By("Deleting a pod to trigger a leak") + + // Delete one of the pods to trigger a leak, and check that the controller detects it. 
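The stress test derives each node's /26 block from an integer base address using libcalico-go's big-int helpers. The standard-library-only sketch below reproduces the same arithmetic (64 addresses per /26) so the resulting CIDR layout is easy to verify:

```go
// Block-CIDR arithmetic: node N's /26 starts at base + N*64. Standard library
// only; the test itself uses libcalico-go's net helpers instead.
package main

import (
	"encoding/binary"
	"fmt"
	"net"
)

func blockForNode(baseIPv4 uint32, nodeNum int) string {
	start := baseIPv4 + uint32(nodeNum*64) // 64 addresses per /26
	ip := make(net.IP, 4)
	binary.BigEndian.PutUint32(ip, start)
	return fmt.Sprintf("%s/26", ip.String())
}

func main() {
	base := uint32(0x0a000000) // 10.0.0.0
	fmt.Println(blockForNode(base, 0))   // 10.0.0.0/26
	fmt.Println(blockForNode(base, 1))   // 10.0.0.64/26
	fmt.Println(blockForNode(base, 999)) // 10.0.249.192/26
}
```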
+ pod := allPods[numNodes-1] + Expect(cs.CoreV1().Pods(pod.Namespace).Delete(context.TODO(), pod.Name, metav1.DeleteOptions{})).NotTo(HaveOccurred()) + c.OnKubernetesPodDeleted(&pod) + + // Delete a pod on node 0 but don't inform the controller. This should not trigger a leak, + // since the controller is not aware of the pod deletion. + pod2 := allPods[0] + Expect(cs.CoreV1().Pods(pod2.Namespace).Delete(context.TODO(), pod2.Name, metav1.DeleteOptions{})).NotTo(HaveOccurred()) + + // Wait for the controller to detect the leak. This should happen quickly, since the controller + // will only need to process a single block. + Eventually(func() bool { + done := c.pause() + defer done() + a := c.allocationState.allocationsByNode[pod.Spec.NodeName][fmt.Sprintf("%s/%s", pod.Name, pod.Status.PodIP)] + return a.leakedAt != nil + }, 3*time.Second, 100*time.Millisecond).Should(BeTrue(), "IP was not marked as leaked") + Consistently(func() bool { + // While it would not be WRONG to mark the other pod as leaked, we know that the controller won't do so + // because we haven't told it the pod was deleted. This verifies that the controller is doing work incrementally based + // on the events it receives, rather than brute force syncing. + done := c.pause() + defer done() + a := c.allocationState.allocationsByNode[pod2.Spec.NodeName][fmt.Sprintf("%s/%s", pod2.Name, pod2.Status.PodIP)] + return a.leakedAt == nil + }, 3*time.Second, 100*time.Millisecond).Should(BeTrue(), "IP was unexpected marked as leaked") + + By("Triggering a full IPAM scan") + // Now do a brute force full scan to ensure that the controller eventually catches up. + c.fullScanNextSync("forced by test") + c.onStatusUpdate(bapi.InSync) + Eventually(func() bool { + done := c.pause() + defer done() + a := c.allocationState.allocationsByNode[pod2.Spec.NodeName][fmt.Sprintf("%s/%s", pod2.Name, pod2.Status.PodIP)] + return a.leakedAt != nil + }, 5*time.Second, 100*time.Millisecond).Should(BeTrue(), "IP was not marked as leaked") + }) + }) }) // createBlock creates a block based on the given pods and CIDR, and sends it as an update to the controller. -func createBlock(pods []v1.Pod, c *ipamController, cidrStr string) { - affinity := fmt.Sprintf("host:%s", pods[0].Spec.NodeName) +func createBlock(pods []v1.Pod, host, cidrStr string) bapi.Update { + var affinity *string + if host != "" { + aff := fmt.Sprintf("host:%s", host) + affinity = &aff + } cidr := net.MustParseCIDR(cidrStr) // Create a bootstrap block for access to IPToOrdinal. @@ -1700,7 +1849,7 @@ func createBlock(pods []v1.Pod, c *ipamController, cidrStr string) { attrs = append(attrs, model.AllocationAttribute{ AttrPrimary: &pod.Name, AttrSecondary: map[string]string{ - ipam.AttributeNode: pod.Spec.NodeName, + ipam.AttributeNode: host, ipam.AttributePod: pod.Name, ipam.AttributeNamespace: pod.Namespace, }, @@ -1713,14 +1862,13 @@ func createBlock(pods []v1.Pod, c *ipamController, cidrStr string) { alloc, unalloc := makeAllocationsArrays(int(cidr.Network().NumAddrs().Int64()), assignments) block = model.AllocationBlock{ CIDR: cidr, - Affinity: &affinity, + Affinity: affinity, Allocations: alloc, Unallocated: unalloc, Attributes: attrs, } kvp := model.KVPair{Key: model.BlockKey{CIDR: cidr}, Value: &block} - update := bapi.Update{KVPair: kvp, UpdateType: bapi.UpdateTypeKVNew} - c.onUpdate(update) + return bapi.Update{KVPair: kvp, UpdateType: bapi.UpdateTypeKVNew} } // makeAllocationsArray creates an array of pointers to integers, with the given assignments.
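Finally, createBlock now takes the host separately and only builds an affinity when one is supplied, which is what lets the new test create an affine block with no allocations at all. A tiny sketch of that optional-affinity construction:

```go
// Optional affinity handling: a block's Affinity is a *string ("host:<node>")
// that stays nil when no host is supplied.
package main

import "fmt"

func affinityFor(host string) *string {
	if host == "" {
		return nil // block has no affinity
	}
	aff := fmt.Sprintf("host:%s", host)
	return &aff
}

func main() {
	if a := affinityFor("dead-node"); a != nil {
		fmt.Println(*a) // host:dead-node
	}
	fmt.Println(affinityFor("") == nil) // true
}
```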