Skip to content

Commit

Permalink
tmp
Browse files Browse the repository at this point in the history
  • Loading branch information
Vicente-Cheng committed Jan 8, 2024
1 parent 3b08329 commit 71c2e5c
Show file tree
Hide file tree
Showing 3 changed files with 99 additions and 24 deletions.
93 changes: 80 additions & 13 deletions pkg/controller/blockdevice/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"os"
"path/filepath"
"reflect"
"sync"
"time"

gocommon "github.com/harvester/go-common"
Expand Down Expand Up @@ -66,6 +67,12 @@ func (s *semaphore) release() bool {
}
}

type DiskTags struct {
diskTags map[string][]string
lock *sync.RWMutex
initialized bool
}

type Controller struct {
Namespace string
NodeName string
Expand All @@ -87,12 +94,37 @@ const (
NeedMountUpdateNoOp NeedMountUpdateOP = 1 << iota
NeedMountUpdateMount
NeedMountUpdateUnmount

errorCacheDiskTagsNotInitialized = "CacheDiskTags is not initialized"
)

func (f NeedMountUpdateOP) Has(flag NeedMountUpdateOP) bool {
return f&flag != 0
}

var CacheDiskTags *DiskTags

func (d *DiskTags) DeleteDiskTags(dev string) {
d.lock.Lock()
defer d.lock.Unlock()

delete(d.diskTags, dev)
}

func (d *DiskTags) UpdateDiskTags(dev string, tags []string) {
d.lock.Lock()
defer d.lock.Unlock()

d.diskTags[dev] = tags
}

func (d *DiskTags) UpdateInitialized() {
d.lock.Lock()
defer d.lock.Unlock()

d.initialized = true
}

// Register register the block device CRD controller
func Register(
ctx context.Context,
Expand All @@ -102,6 +134,11 @@ func Register(
opt *option.Option,
scanner *Scanner,
) error {
CacheDiskTags = &DiskTags{
diskTags: make(map[string][]string),
lock: &sync.RWMutex{},
initialized: false,
}
controller := &Controller{
Namespace: opt.Namespace,
NodeName: opt.NodeName,
Expand All @@ -118,6 +155,12 @@ func Register(
return err
}

utils.CallerWithCondLock(scanner.Cond, func() any {
logrus.Infof("Wake up scanner first time to update CacheDiskTags ...")
scanner.Cond.Signal()
return nil
})

bds.OnChange(ctx, blockDeviceHandlerName, controller.OnBlockDeviceChange)
bds.OnRemove(ctx, blockDeviceHandlerName, controller.OnBlockDeviceDelete)
return nil
Expand All @@ -135,6 +178,10 @@ func (c *Controller) OnBlockDeviceChange(_ string, device *diskv1.BlockDevice) (
return nil, nil
}

if !CacheDiskTags.initialized {
return nil, errors.New(errorCacheDiskTagsNotInitialized)
}

deviceCpy := device.DeepCopy()
devPath, err := resolvePersistentDevPath(device)
if err != nil {
Expand Down Expand Up @@ -193,8 +240,8 @@ func (c *Controller) OnBlockDeviceChange(_ string, device *diskv1.BlockDevice) (
switch {
case needProvision && device.Status.ProvisionPhase == diskv1.ProvisionPhaseProvisioned:
logrus.Infof("Prepare to check the new device tags %v with device: %s", deviceCpy.Spec.Tags, device.Name)
if !gocommon.SliceContentCmp(deviceCpy.Spec.Tags, deviceCpy.Status.Tags) {
logrus.Debugf("Prepare to update device %s because the Tags changed, Spec: %v, Status: %v", deviceCpy.Name, deviceCpy.Spec.Tags, deviceCpy.Status.Tags)
if !gocommon.SliceContentCmp(deviceCpy.Spec.Tags, CacheDiskTags.diskTags[device.Name]) {
logrus.Debugf("Prepare to update device %s because the Tags changed, Spec: %v, CacheDiskTags: %v", deviceCpy.Name, deviceCpy.Spec.Tags, CacheDiskTags.diskTags[device.Name])
if err := c.provisionDeviceToNode(deviceCpy); err != nil {
err := fmt.Errorf("failed to update tags %v with device %s to node %s: %w", deviceCpy.Spec.Tags, device.Name, c.NodeName, err)
logrus.Error(err)
Expand Down Expand Up @@ -385,23 +432,33 @@ func (c *Controller) provisionDeviceToNode(device *diskv1.BlockDevice) error {
Tags: device.Spec.Tags,
}

toRemovedTags := make([]string, 0)
for _, tag := range device.Status.Tags {
if !slices.Contains(device.Spec.Tags, tag) {
toRemovedTags = append(toRemovedTags, tag)
}
}
//toRemovedTags := make([]string, 0)
//for _, tag := range device.Status.Tags {
// if !slices.Contains(device.Spec.Tags, tag) {
// toRemovedTags = append(toRemovedTags, tag)
// }
//}

updated := false
if disk, found := node.Spec.Disks[device.Name]; found {
respectedTags := []string{}
if disk.Tags != nil {
/* we should respect the disk Tags from LH */
logrus.Debugf("Previous disk tags on LH: %+v, we should respect it.", disk.Tags)
diskSpec.Tags = gocommon.SliceDedupe(append(disk.Tags, device.Spec.Tags...))
if len(toRemovedTags) > 0 {
logrus.Debugf("Prepare to do final handling with toRemovedTags: %+v", toRemovedTags)
diskSpec.Tags = removeUnNeeded(diskSpec.Tags, toRemovedTags)
if _, exists := CacheDiskTags.diskTags[device.Name]; exists {
for _, tag := range disk.Tags {
if !slices.Contains(CacheDiskTags.diskTags[device.Name], tag) {
respectedTags = append(respectedTags, tag)
}
}
} else {
respectedTags = disk.Tags
}
logrus.Debugf("Previous disk tags only on LH: %+v, we should respect it.", respectedTags)
diskSpec.Tags = gocommon.SliceDedupe(append(respectedTags, device.Spec.Tags...))
//if len(toRemovedTags) > 0 {
// logrus.Debugf("Prepare to do final handling with toRemovedTags: %+v", toRemovedTags)
// diskSpec.Tags = removeUnNeeded(diskSpec.Tags, toRemovedTags)
//}
updated = reflect.DeepEqual(disk, diskSpec)
}
}
Expand All @@ -427,6 +484,9 @@ func (c *Controller) provisionDeviceToNode(device *diskv1.BlockDevice) error {
}
}

// update oldDiskTags
CacheDiskTags.UpdateDiskTags(device.Name, device.Spec.Tags)

return nil
}

Expand Down Expand Up @@ -548,6 +608,11 @@ func (c *Controller) updateDeviceStatus(device *diskv1.BlockDevice, devPath stri

// OnBlockDeviceDelete will delete the block devices that belongs to the same parent device
func (c *Controller) OnBlockDeviceDelete(_ string, device *diskv1.BlockDevice) (*diskv1.BlockDevice, error) {

if !CacheDiskTags.initialized {
return nil, errors.New(errorCacheDiskTagsNotInitialized)
}

if device == nil {
return nil, nil
}
Expand Down Expand Up @@ -598,6 +663,8 @@ func (c *Controller) OnBlockDeviceDelete(_ string, device *diskv1.BlockDevice) (
return device, err
}

CacheDiskTags.DeleteDiskTags(device.Name)

return nil, nil
}

Expand Down
8 changes: 8 additions & 0 deletions pkg/controller/blockdevice/scanner.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,10 @@ func (s *Scanner) scanBlockDevicesOnNode() error {
} else {
logrus.Debugf("Skip updating device %s", bd.Name)
}
// only first time to update the cache
if !CacheDiskTags.initialized && oldBd.Spec.Tags != nil && len(oldBd.Spec.Tags) > 0 {
CacheDiskTags.UpdateDiskTags(oldBd.Name, oldBd.Spec.Tags)
}
// remove blockdevice from old device so we can delete missing devices afterward
delete(oldBds, bd.Name)
} else {
Expand All @@ -167,6 +171,10 @@ func (s *Scanner) scanBlockDevicesOnNode() error {
}
}
}
if !CacheDiskTags.initialized {
CacheDiskTags.UpdateInitialized()
logrus.Debugf("CacheDiskTags initialized: %+v", CacheDiskTags)
}

// We do not remove the block device that maybe just temporily not available.
// Set it to inactive and give the chance to recover.
Expand Down
22 changes: 11 additions & 11 deletions pkg/controller/node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,9 @@ import (
"reflect"
"strings"

gocommon "github.com/harvester/go-common"
longhornv1 "github.com/longhorn/longhorn-manager/k8s/pkg/apis/longhorn/v1beta2"
"github.com/sirupsen/logrus"
"golang.org/x/exp/slices"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
Expand Down Expand Up @@ -70,19 +70,19 @@ func (c *Controller) OnNodeChange(_ string, node *longhornv1.Node) (*longhornv1.
return node, err
}

if bd.Spec.Tags == nil || len(bd.Spec.Tags) == 0 {
continue
}
//if bd.Spec.Tags == nil || len(bd.Spec.Tags) == 0 {
// continue
//}

bdCpy := bd.DeepCopy()
newTags := make([]string, 0)

for _, tag := range bd.Spec.Tags {
if slices.Contains(disk.Tags, tag) {
newTags = append(newTags, tag)
}
}
bdCpy.Spec.Tags = newTags
newTags := gocommon.SliceDedupe(append(bd.Spec.Tags, disk.Tags...))
//for _, tag := range bd.Spec.Tags {
// if slices.Contains(disk.Tags, tag) {
// newTags = append(newTags, tag)
// }
//}
//bdCpy.Spec.Tags = newTags
bdCpy.Status.Tags = newTags

if !reflect.DeepEqual(bd, bdCpy) {
Expand Down

0 comments on commit 71c2e5c

Please sign in to comment.