diff --git a/pkg/controller/blockdevice/controller.go b/pkg/controller/blockdevice/controller.go index cd3c2e6f..f0b198aa 100644 --- a/pkg/controller/blockdevice/controller.go +++ b/pkg/controller/blockdevice/controller.go @@ -7,6 +7,7 @@ import ( "os" "path/filepath" "reflect" + "sync" "time" gocommon "github.com/harvester/go-common" @@ -66,6 +67,12 @@ func (s *semaphore) release() bool { } } +type DiskTags struct { + diskTags map[string][]string + lock *sync.RWMutex + initialized bool +} + type Controller struct { Namespace string NodeName string @@ -87,12 +94,37 @@ const ( NeedMountUpdateNoOp NeedMountUpdateOP = 1 << iota NeedMountUpdateMount NeedMountUpdateUnmount + + errorCacheDiskTagsNotInitialized = "CacheDiskTags is not initialized" ) func (f NeedMountUpdateOP) Has(flag NeedMountUpdateOP) bool { return f&flag != 0 } +var CacheDiskTags *DiskTags + +func (d *DiskTags) DeleteDiskTags(dev string) { + d.lock.Lock() + defer d.lock.Unlock() + + delete(d.diskTags, dev) +} + +func (d *DiskTags) UpdateDiskTags(dev string, tags []string) { + d.lock.Lock() + defer d.lock.Unlock() + + d.diskTags[dev] = tags +} + +func (d *DiskTags) UpdateInitialized() { + d.lock.Lock() + defer d.lock.Unlock() + + d.initialized = true +} + // Register register the block device CRD controller func Register( ctx context.Context, @@ -102,6 +134,11 @@ func Register( opt *option.Option, scanner *Scanner, ) error { + CacheDiskTags = &DiskTags{ + diskTags: make(map[string][]string), + lock: &sync.RWMutex{}, + initialized: false, + } controller := &Controller{ Namespace: opt.Namespace, NodeName: opt.NodeName, @@ -118,6 +155,12 @@ func Register( return err } + utils.CallerWithCondLock(scanner.Cond, func() any { + logrus.Infof("Wake up scanner first time to update CacheDiskTags ...") + scanner.Cond.Signal() + return nil + }) + bds.OnChange(ctx, blockDeviceHandlerName, controller.OnBlockDeviceChange) bds.OnRemove(ctx, blockDeviceHandlerName, controller.OnBlockDeviceDelete) return nil @@ -135,6 +178,10 @@ func (c *Controller) OnBlockDeviceChange(_ string, device *diskv1.BlockDevice) ( return nil, nil } + if !CacheDiskTags.initialized { + return nil, errors.New(errorCacheDiskTagsNotInitialized) + } + deviceCpy := device.DeepCopy() devPath, err := resolvePersistentDevPath(device) if err != nil { @@ -193,8 +240,8 @@ func (c *Controller) OnBlockDeviceChange(_ string, device *diskv1.BlockDevice) ( switch { case needProvision && device.Status.ProvisionPhase == diskv1.ProvisionPhaseProvisioned: logrus.Infof("Prepare to check the new device tags %v with device: %s", deviceCpy.Spec.Tags, device.Name) - if !gocommon.SliceContentCmp(deviceCpy.Spec.Tags, deviceCpy.Status.Tags) { - logrus.Debugf("Prepare to update device %s because the Tags changed, Spec: %v, Status: %v", deviceCpy.Name, deviceCpy.Spec.Tags, deviceCpy.Status.Tags) + if !gocommon.SliceContentCmp(deviceCpy.Spec.Tags, CacheDiskTags.diskTags[device.Name]) { + logrus.Debugf("Prepare to update device %s because the Tags changed, Spec: %v, CacheDiskTags: %v", deviceCpy.Name, deviceCpy.Spec.Tags, CacheDiskTags.diskTags[device.Name]) if err := c.provisionDeviceToNode(deviceCpy); err != nil { err := fmt.Errorf("failed to update tags %v with device %s to node %s: %w", deviceCpy.Spec.Tags, device.Name, c.NodeName, err) logrus.Error(err) @@ -385,23 +432,33 @@ func (c *Controller) provisionDeviceToNode(device *diskv1.BlockDevice) error { Tags: device.Spec.Tags, } - toRemovedTags := make([]string, 0) - for _, tag := range device.Status.Tags { - if !slices.Contains(device.Spec.Tags, tag) { - toRemovedTags = append(toRemovedTags, tag) - } - } + //toRemovedTags := make([]string, 0) + //for _, tag := range device.Status.Tags { + // if !slices.Contains(device.Spec.Tags, tag) { + // toRemovedTags = append(toRemovedTags, tag) + // } + //} updated := false if disk, found := node.Spec.Disks[device.Name]; found { + respectedTags := []string{} if disk.Tags != nil { /* we should respect the disk Tags from LH */ - logrus.Debugf("Previous disk tags on LH: %+v, we should respect it.", disk.Tags) - diskSpec.Tags = gocommon.SliceDedupe(append(disk.Tags, device.Spec.Tags...)) - if len(toRemovedTags) > 0 { - logrus.Debugf("Prepare to do final handling with toRemovedTags: %+v", toRemovedTags) - diskSpec.Tags = removeUnNeeded(diskSpec.Tags, toRemovedTags) + if _, exists := CacheDiskTags.diskTags[device.Name]; exists { + for _, tag := range disk.Tags { + if !slices.Contains(CacheDiskTags.diskTags[device.Name], tag) { + respectedTags = append(respectedTags, tag) + } + } + } else { + respectedTags = disk.Tags } + logrus.Debugf("Previous disk tags only on LH: %+v, we should respect it.", respectedTags) + diskSpec.Tags = gocommon.SliceDedupe(append(respectedTags, device.Spec.Tags...)) + //if len(toRemovedTags) > 0 { + // logrus.Debugf("Prepare to do final handling with toRemovedTags: %+v", toRemovedTags) + // diskSpec.Tags = removeUnNeeded(diskSpec.Tags, toRemovedTags) + //} updated = reflect.DeepEqual(disk, diskSpec) } } @@ -427,6 +484,9 @@ func (c *Controller) provisionDeviceToNode(device *diskv1.BlockDevice) error { } } + // update oldDiskTags + CacheDiskTags.UpdateDiskTags(device.Name, device.Spec.Tags) + return nil } @@ -548,6 +608,11 @@ func (c *Controller) updateDeviceStatus(device *diskv1.BlockDevice, devPath stri // OnBlockDeviceDelete will delete the block devices that belongs to the same parent device func (c *Controller) OnBlockDeviceDelete(_ string, device *diskv1.BlockDevice) (*diskv1.BlockDevice, error) { + + if !CacheDiskTags.initialized { + return nil, errors.New(errorCacheDiskTagsNotInitialized) + } + if device == nil { return nil, nil } @@ -598,6 +663,8 @@ func (c *Controller) OnBlockDeviceDelete(_ string, device *diskv1.BlockDevice) ( return device, err } + CacheDiskTags.DeleteDiskTags(device.Name) + return nil, nil } diff --git a/pkg/controller/blockdevice/scanner.go b/pkg/controller/blockdevice/scanner.go index 801ccd4f..6764c7ff 100644 --- a/pkg/controller/blockdevice/scanner.go +++ b/pkg/controller/blockdevice/scanner.go @@ -148,6 +148,10 @@ func (s *Scanner) scanBlockDevicesOnNode() error { } else { logrus.Debugf("Skip updating device %s", bd.Name) } + // only first time to update the cache + if !CacheDiskTags.initialized && oldBd.Spec.Tags != nil && len(oldBd.Spec.Tags) > 0 { + CacheDiskTags.UpdateDiskTags(oldBd.Name, oldBd.Spec.Tags) + } // remove blockdevice from old device so we can delete missing devices afterward delete(oldBds, bd.Name) } else { @@ -167,6 +171,10 @@ func (s *Scanner) scanBlockDevicesOnNode() error { } } } + if !CacheDiskTags.initialized { + CacheDiskTags.UpdateInitialized() + logrus.Debugf("CacheDiskTags initialized: %+v", CacheDiskTags) + } // We do not remove the block device that maybe just temporily not available. // Set it to inactive and give the chance to recover. diff --git a/pkg/controller/node/node.go b/pkg/controller/node/node.go index 791a65bd..a884802e 100644 --- a/pkg/controller/node/node.go +++ b/pkg/controller/node/node.go @@ -5,9 +5,9 @@ import ( "reflect" "strings" + gocommon "github.com/harvester/go-common" longhornv1 "github.com/longhorn/longhorn-manager/k8s/pkg/apis/longhorn/v1beta2" "github.com/sirupsen/logrus" - "golang.org/x/exp/slices" v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/labels" @@ -70,19 +70,19 @@ func (c *Controller) OnNodeChange(_ string, node *longhornv1.Node) (*longhornv1. return node, err } - if bd.Spec.Tags == nil || len(bd.Spec.Tags) == 0 { - continue - } + //if bd.Spec.Tags == nil || len(bd.Spec.Tags) == 0 { + // continue + //} bdCpy := bd.DeepCopy() - newTags := make([]string, 0) - for _, tag := range bd.Spec.Tags { - if slices.Contains(disk.Tags, tag) { - newTags = append(newTags, tag) - } - } - bdCpy.Spec.Tags = newTags + newTags := gocommon.SliceDedupe(append(bd.Spec.Tags, disk.Tags...)) + //for _, tag := range bd.Spec.Tags { + // if slices.Contains(disk.Tags, tag) { + // newTags = append(newTags, tag) + // } + //} + //bdCpy.Spec.Tags = newTags bdCpy.Status.Tags = newTags if !reflect.DeepEqual(bd, bdCpy) {