Skip to content

Commit

Permalink
Refactor Subnet lock for SubnetPort creation
Browse files Browse the repository at this point in the history
Signed-off-by: Yanjun Zhou <[email protected]>
  • Loading branch information
yanjunz97 committed Dec 23, 2024
1 parent 86c0d65 commit a37a016
Show file tree
Hide file tree
Showing 11 changed files with 198 additions and 135 deletions.
10 changes: 7 additions & 3 deletions pkg/controllers/common/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,15 +33,14 @@ func AllocateSubnetFromSubnetSet(subnetSet *v1alpha1.SubnetSet, vpcService servi
defer unlockSubnetSet(subnetSet.GetUID(), subnetSetLock)
subnetList := subnetService.GetSubnetsByIndex(servicecommon.TagScopeSubnetSetCRUID, string(subnetSet.GetUID()))
for _, nsxSubnet := range subnetList {
portNums := len(subnetPortService.GetPortsOfSubnet(*nsxSubnet.Id))
totalIP := int(*nsxSubnet.Ipv4SubnetSize)
if len(nsxSubnet.IpAddresses) > 0 {
// totalIP will be overrided if IpAddresses are specified.
totalIP, _ = util.CalculateIPFromCIDRs(nsxSubnet.IpAddresses)
}
// NSX reserves 4 ip addresses in each subnet for network address, gateway address,
// dhcp server address and broadcast address.
if portNums < totalIP-4 {
if subnetPortService.AddPortToSubnet(*nsxSubnet.Path, totalIP-4) {
return *nsxSubnet.Path, nil
}
}
Expand All @@ -56,7 +55,12 @@ func AllocateSubnetFromSubnetSet(subnetSet *v1alpha1.SubnetSet, vpcService servi
log.Error(err, "Failed to allocate Subnet")
return "", err
}
return subnetService.CreateOrUpdateSubnet(subnetSet, vpcInfoList[0], tags)
path, err := subnetService.CreateOrUpdateSubnet(subnetSet, vpcInfoList[0], tags)
if err != nil {
return path, err
}
subnetPortService.AddPortToSubnet(path, 0)
return path, nil
}

func getSharedNamespaceForNamespace(client k8sclient.Client, ctx context.Context, namespaceName string) (string, error) {
Expand Down
11 changes: 6 additions & 5 deletions pkg/controllers/pod/pod_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,20 +84,21 @@ func (r *PodReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.R
if err != nil {
// The error at the very beginning of the operator startup is expected because at that time the node may be not cached yet. We can expect the retry to become normal.
log.Error(err, "failed to get node ID for pod", "pod.Name", req.NamespacedName, "pod.UID", pod.UID, "node", pod.Spec.NodeName)
r.SubnetPortService.RemovePortFromSubnet(nsxSubnetPath)
return common.ResultRequeue, err
}
contextID := *node.UniqueId
// There is a race condition that the subnetset controller may delete the
// subnet during CollectGarbage. So check the subnet under lock.
lock := r.SubnetService.RLockSubnet(&nsxSubnetPath)
defer r.SubnetService.RUnlockSubnet(&nsxSubnetPath, lock)

nsxSubnet, err := r.SubnetService.GetSubnetByPath(nsxSubnetPath)
if err != nil {
r.SubnetPortService.RemovePortFromSubnet(nsxSubnetPath)
return common.ResultRequeue, err
}
_, err = r.SubnetPortService.CreateOrUpdateSubnetPort(pod, nsxSubnet, contextID, &pod.ObjectMeta.Labels)
if err != nil {
// Remove SubnetPort from Subnet if the SubnetPort is not saved to store
if subnetport.IsNotCreatedError(err) {
r.SubnetPortService.RemovePortFromSubnet(nsxSubnetPath)
}
r.StatusUpdater.UpdateFail(ctx, pod, err, "", nil)
return common.ResultRequeue, err
}
Expand Down
1 change: 1 addition & 0 deletions pkg/controllers/subnet/subnet_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,7 @@ func (r *SubnetReconciler) deleteSubnets(nsxSubnets []*model.VpcSubnet) error {
log.Error(err, "Failed to delete Subnet", "ID", *nsxSubnet.Id)
return err
}
r.SubnetPortService.DeletePortCount(*nsxSubnet.Path)
log.Info("Successfully deleted Subnet", "ID", *nsxSubnet.Id)
}
log.Info("Successfully cleaned Subnets", "subnetCount", len(nsxSubnets))
Expand Down
22 changes: 17 additions & 5 deletions pkg/controllers/subnetport/subnetport_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,21 +106,23 @@ func (r *SubnetPortReconciler) Reconcile(ctx context.Context, req ctrl.Request)
}
labels, err := r.getLabelsFromVirtualMachine(ctx, subnetPort)
if err != nil {
r.SubnetPortService.RemovePortFromSubnet(nsxSubnetPath)
r.StatusUpdater.UpdateFail(ctx, subnetPort, err, "Failed to get labels from VirtualMachine", setSubnetPortReadyStatusFalse, r.SubnetPortService)
return common.ResultRequeue, err
}
// There is a race condition that the subnetset controller may delete the
// subnet during CollectGarbage. So check the subnet under lock.
lock := r.SubnetService.RLockSubnet(&nsxSubnetPath)
defer r.SubnetService.RUnlockSubnet(&nsxSubnetPath, lock)

nsxSubnet, err := r.SubnetService.GetSubnetByPath(nsxSubnetPath)
if err != nil {
r.SubnetPortService.RemovePortFromSubnet(nsxSubnetPath)
r.StatusUpdater.UpdateFail(ctx, subnetPort, err, fmt.Sprintf("Failed to get Subnet by path: %s", nsxSubnetPath), setSubnetPortReadyStatusFalse, r.SubnetPortService)
return common.ResultRequeue, err
}
nsxSubnetPortState, err := r.SubnetPortService.CreateOrUpdateSubnetPort(subnetPort, nsxSubnet, "", labels)
if err != nil {
// Remove SubnetPort from Subnet if the SubnetPort is not saved to store
if subnetport.IsNotCreatedError(err) {
r.SubnetPortService.RemovePortFromSubnet(nsxSubnetPath)
}
r.StatusUpdater.UpdateFail(ctx, subnetPort, err, "", setSubnetPortReadyStatusFalse, r.SubnetPortService)
return common.ResultRequeue, err
}
Expand Down Expand Up @@ -469,7 +471,17 @@ func (r *SubnetPortReconciler) CheckAndGetSubnetPathForSubnetPort(ctx context.Co
log.Error(err, "failed to get NSX subnet by subnet CR UID", "subnetList", subnetList)
return
}
subnetPath = *subnetList[0].Path
nsxSubnet := subnetList[0]
totalIP := int(*nsxSubnet.Ipv4SubnetSize)
if len(nsxSubnet.IpAddresses) > 0 {
// totalIP will be overrided if IpAddresses are specified.
totalIP, _ = util.CalculateIPFromCIDRs(nsxSubnet.IpAddresses)
}
subnetPath = *nsxSubnet.Path
if !r.SubnetPortService.AddPortToSubnet(*nsxSubnet.Path, totalIP-4) {
err = fmt.Errorf("no valid IP in Subnet %s", subnetPath)
return
}
} else if len(subnetPort.Spec.SubnetSet) > 0 {
subnetSet := &v1alpha1.SubnetSet{}
namespacedName := types.NamespacedName{
Expand Down
40 changes: 19 additions & 21 deletions pkg/controllers/subnetset/subnetset_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -375,32 +375,30 @@ func (r *SubnetSetReconciler) deleteSubnets(nsxSubnets []*model.VpcSubnet, delet
}
var deleteErrs []error
for _, nsxSubnet := range nsxSubnets {
lock := r.SubnetService.LockSubnet(nsxSubnet.Path)
func() {
defer r.SubnetService.UnlockSubnet(nsxSubnet.Path, lock)

portNums := len(r.SubnetPortService.GetPortsOfSubnet(*nsxSubnet.Id))
if portNums > 0 {
hasStalePort = true
log.Info("Skipped deleting NSX Subnet due to stale ports", "nsxSubnet", *nsxSubnet.Id)
return
}

if deleteBindingMaps {
if err := r.BindingService.DeleteSubnetConnectionBindingMapsByParentSubnet(nsxSubnet); err != nil {
deleteErr := fmt.Errorf("failed to delete NSX SubnetConnectionBindingMaps connected to NSX Subnet/%s: %+v", *nsxSubnet.Id, err)
deleteErrs = append(deleteErrs, deleteErr)
log.Error(deleteErr, "Skipping to next Subnet")
return
}
}
if !r.SubnetPortService.IsEmptySubnet(*nsxSubnet.Path) {
hasStalePort = true
log.Info("Skipped deleting NSX Subnet due to stale ports", "nsxSubnet", *nsxSubnet.Id)
continue
}

if err := r.SubnetService.DeleteSubnet(*nsxSubnet); err != nil {
deleteErr := fmt.Errorf("failed to delete NSX Subnet/%s: %+v", *nsxSubnet.Id, err)
if deleteBindingMaps {
if err = r.BindingService.DeleteSubnetConnectionBindingMapsByParentSubnet(nsxSubnet); err != nil {
deleteErr := fmt.Errorf("failed to delete NSX SubnetConnectionBindingMaps connected to NSX Subnet/%s: %+v", *nsxSubnet.Id, err)
deleteErrs = append(deleteErrs, deleteErr)
log.Error(deleteErr, "Skipping to next Subnet")
continue
}
}()
}

if err := r.SubnetService.DeleteSubnet(*nsxSubnet); err != nil {
deleteErr := fmt.Errorf("failed to delete NSX Subnet/%s: %+v", *nsxSubnet.Id, err)
deleteErrs = append(deleteErrs, deleteErr)
log.Error(deleteErr, "Skipping to next Subnet")
} else {
r.SubnetPortService.DeletePortCount(*nsxSubnet.Path)
}

}
if len(deleteErrs) > 0 {
err = fmt.Errorf("multiple errors occurred while deleting Subnets: %v", deleteErrs)
Expand Down
20 changes: 9 additions & 11 deletions pkg/mock/services_mock.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
package mock

import (
"sync"

"github.com/stretchr/testify/mock"
"github.com/vmware/vsphere-automation-sdk-go/services/nsxt/model"
"sigs.k8s.io/controller-runtime/pkg/client"
Expand Down Expand Up @@ -81,26 +79,26 @@ func (m *MockSubnetServiceProvider) GenerateSubnetNSTags(obj client.Object) []mo
return []model.Tag{}
}

func (m *MockSubnetServiceProvider) LockSubnet(path *string) *sync.RWMutex {
return nil
type MockSubnetPortServiceProvider struct {
mock.Mock
}

func (m *MockSubnetServiceProvider) UnlockSubnet(path *string, lock *sync.RWMutex) {
func (m *MockSubnetPortServiceProvider) GetPortsOfSubnet(nsxSubnetID string) (ports []*model.VpcSubnetPort) {
return
}

func (m *MockSubnetServiceProvider) RLockSubnet(path *string) *sync.RWMutex {
return nil
func (m *MockSubnetPortServiceProvider) AddPortToSubnet(path string, size int) bool {
return true
}

func (m *MockSubnetServiceProvider) RUnlockSubnet(path *string, lock *sync.RWMutex) {
func (m *MockSubnetPortServiceProvider) RemovePortFromSubnet(path string) {
return
}

type MockSubnetPortServiceProvider struct {
mock.Mock
func (m *MockSubnetPortServiceProvider) IsEmptySubnet(path string) bool {
return true
}

func (m *MockSubnetPortServiceProvider) GetPortsOfSubnet(nsxSubnetID string) (ports []*model.VpcSubnetPort) {
func (m *MockSubnetPortServiceProvider) DeletePortCount(path string) {
return
}
9 changes: 4 additions & 5 deletions pkg/nsx/services/common/services.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package common

import (
"context"
"sync"

"github.com/vmware/vsphere-automation-sdk-go/services/nsxt/model"
"sigs.k8s.io/controller-runtime/pkg/client"
Expand Down Expand Up @@ -31,14 +30,14 @@ type SubnetServiceProvider interface {
GetSubnetsByIndex(key, value string) []*model.VpcSubnet
CreateOrUpdateSubnet(obj client.Object, vpcInfo VPCResourceInfo, tags []model.Tag) (string, error)
GenerateSubnetNSTags(obj client.Object) []model.Tag
LockSubnet(path *string) *sync.RWMutex
UnlockSubnet(path *string, lock *sync.RWMutex)
RLockSubnet(path *string) *sync.RWMutex
RUnlockSubnet(path *string, lock *sync.RWMutex)
}

type SubnetPortServiceProvider interface {
GetPortsOfSubnet(nsxSubnetID string) (ports []*model.VpcSubnetPort)
AddPortToSubnet(path string, size int) bool
RemovePortFromSubnet(path string)
IsEmptySubnet(path string) bool
DeletePortCount(path string)
}

type NodeServiceReader interface {
Expand Down
38 changes: 0 additions & 38 deletions pkg/nsx/services/subnet/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package subnet

import (
"errors"
"sync"

"github.com/vmware/vsphere-automation-sdk-go/services/nsxt/model"

Expand Down Expand Up @@ -70,43 +69,6 @@ func subnetSetIndexFunc(obj interface{}) ([]string, error) {
// SubnetStore is a store for subnet.
type SubnetStore struct {
common.ResourceStore
// save locks for subnet by path
pathLocks sync.Map
}

func (subnetStore *SubnetStore) Add(i interface{}) error {
subnet := i.(*model.VpcSubnet)
if subnet.Path == nil {
log.Info("Store a subnet without path", "subnet", subnet)
return subnetStore.ResourceStore.Add(i)
}
lock := sync.RWMutex{}
subnetStore.pathLocks.LoadOrStore(*subnet.Path, &lock)
return subnetStore.ResourceStore.Add(i)
}

func (subnetStore *SubnetStore) Delete(i interface{}) error {
subnet := i.(*model.VpcSubnet)
if subnet.Path == nil {
log.Info("Delete a subnet without path", "subnet", subnet)
return subnetStore.ResourceStore.Delete(i)
}
subnetStore.pathLocks.Delete(*subnet.Path)
return subnetStore.ResourceStore.Delete(i)
}

func (subnetStore *SubnetStore) Lock(path string) *sync.RWMutex {
lock := sync.RWMutex{}
subnetLock, _ := subnetStore.pathLocks.LoadOrStore(path, &lock)
subnetLock.(*sync.RWMutex).Lock()
return subnetLock.(*sync.RWMutex)
}

func (subnetStore *SubnetStore) RLock(path string) *sync.RWMutex {
lock := sync.RWMutex{}
subnetLock, _ := subnetStore.pathLocks.LoadOrStore(path, &lock)
subnetLock.(*sync.RWMutex).RLock()
return subnetLock.(*sync.RWMutex)
}

func (subnetStore *SubnetStore) Apply(i interface{}) error {
Expand Down
34 changes: 0 additions & 34 deletions pkg/nsx/services/subnet/subnet.go
Original file line number Diff line number Diff line change
Expand Up @@ -478,37 +478,3 @@ func (service *SubnetService) UpdateSubnetSet(ns string, vpcSubnets []*model.Vpc
}
return nil
}

func (service *SubnetService) LockSubnet(path *string) *sync.RWMutex {
if path != nil && *path != "" {
log.V(1).Info("Locked Subnet for writing", "path", *path)
return service.SubnetStore.Lock(*path)
}
return nil
}

func (service *SubnetService) UnlockSubnet(path *string, lock *sync.RWMutex) {
if lock != nil {
if path != nil && *path != "" {
log.V(1).Info("Unlocked Subnet for writing", "path", *path)
}
lock.Unlock()
}
}

func (service *SubnetService) RLockSubnet(path *string) *sync.RWMutex {
if path != nil && *path != "" {
log.V(1).Info("Locked Subnet for reading", "path", *path)
return service.SubnetStore.RLock(*path)
}
return nil
}

func (service *SubnetService) RUnlockSubnet(path *string, lock *sync.RWMutex) {
if lock != nil {
if path != nil && *path != "" {
log.V(1).Info("Unlocked Subnet for reading", "path", *path)
}
lock.RUnlock()
}
}
Loading

0 comments on commit a37a016

Please sign in to comment.