Skip to content

Commit

Permalink
refactor: vcluster manager start
Browse files Browse the repository at this point in the history
  • Loading branch information
FabianKramm committed Jul 17, 2024
1 parent a05c4d7 commit 0ca7e34
Show file tree
Hide file tree
Showing 90 changed files with 688 additions and 702 deletions.
8 changes: 7 additions & 1 deletion cmd/vcluster/cmd/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,12 @@ func ExecuteStart(ctx context.Context, options *StartOptions) error {
return fmt.Errorf("start integrations: %w", err)
}

// start managers
syncers, err := setup.StartManagers(controllerCtx)
if err != nil {
return fmt.Errorf("start managers: %w", err)
}

// start proxy
err = setup.StartProxy(controllerCtx)
if err != nil {
Expand Down Expand Up @@ -133,7 +139,7 @@ func ExecuteStart(ctx context.Context, options *StartOptions) error {

// start leader election + controllers
err = StartLeaderElection(controllerCtx, func() error {
return setup.StartControllers(controllerCtx)
return setup.StartControllers(controllerCtx, syncers)
})
if err != nil {
return fmt.Errorf("start controllers: %w", err)
Expand Down
4 changes: 4 additions & 0 deletions pkg/constants/annotation.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,10 @@ const (
PausedReplicasAnnotation = "loft.sh/paused-replicas"
PausedDateAnnotation = "loft.sh/paused-date"

HostClusterPersistentVolumeAnnotation = "vcluster.loft.sh/host-pv"

HostClusterVSCAnnotation = "vcluster.loft.sh/host-volumesnapshotcontent"

// NodeSuffix is the dns suffix for our nodes
NodeSuffix = "nodes.vcluster.com"

Expand Down
9 changes: 8 additions & 1 deletion pkg/controllers/generic/export_syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"time"

"github.com/loft-sh/vcluster/pkg/config"
"github.com/loft-sh/vcluster/pkg/mappings/generic"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/klog/v2"

Expand Down Expand Up @@ -98,12 +99,18 @@ func createExporterFromConfig(ctx *synccontext.RegisterContext, config *vcluster

gvk := schema.FromAPIVersionAndKind(config.APIVersion, config.Kind)
controllerID := fmt.Sprintf("%s/%s/GenericExport", strings.ToLower(gvk.Kind), strings.ToLower(gvk.Group))

mapper, err := generic.NewNamespacedMapper(ctx, obj, translate.Default.PhysicalName)
if err != nil {
return nil, err
}

return &exporter{
ObjectPatcher: &exportPatcher{
config: config,
gvk: gvk,
},
NamespacedTranslator: translator.NewNamespacedTranslator(ctx, controllerID, obj),
NamespacedTranslator: translator.NewNamespacedTranslator(ctx, controllerID, obj, mapper),

patcher: NewPatcher(ctx.VirtualManager.GetClient(), ctx.PhysicalManager.GetClient(), hasStatusSubresource, log.New(controllerID)),
gvk: gvk,
Expand Down
4 changes: 4 additions & 0 deletions pkg/controllers/generic/import_syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -241,6 +241,10 @@ func (s *importer) SyncToVirtual(ctx *synccontext.SyncContext, pObj client.Objec

var _ syncertypes.Syncer = &importer{}

func (s *importer) GroupVersionKind() schema.GroupVersionKind {
return s.gvk
}

func (s *importer) SyncToHost(ctx *synccontext.SyncContext, vObj client.Object) (ctrl.Result, error) {
// ignore all virtual resources that were not created by this controller
if !s.isVirtualManaged(vObj) {
Expand Down
29 changes: 20 additions & 9 deletions pkg/controllers/register.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ func isEnabled(enabled bool, fn BuildController) BuildController {
return nil
}

func Create(ctx *config.ControllerContext) ([]syncertypes.Object, error) {
func CreateSyncers(ctx *config.ControllerContext) ([]syncertypes.Object, error) {
registerContext := util.ToRegisterContext(ctx)

// register controllers for resource synchronization
Expand Down Expand Up @@ -159,6 +159,7 @@ func RegisterIndices(ctx *config.ControllerContext, syncers []syncertypes.Object
func RegisterControllers(ctx *config.ControllerContext, syncers []syncertypes.Object) error {
registerContext := util.ToRegisterContext(ctx)

// start default endpoint controller
err := k8sdefaultendpoint.Register(ctx)
if err != nil {
return err
Expand Down Expand Up @@ -190,6 +191,7 @@ func RegisterControllers(ctx *config.ControllerContext, syncers []syncertypes.Ob
return err
}

// register generic sync controllers
err = RegisterGenericSyncController(ctx)
if err != nil {
return err
Expand All @@ -204,14 +206,23 @@ func RegisterControllers(ctx *config.ControllerContext, syncers []syncertypes.Ob
if err != nil {
return errors.Wrapf(err, "start %s syncer", v.Name())
}
} else {
// real syncer?
realSyncer, ok := v.(syncertypes.Syncer)
if ok {
err = syncer.RegisterSyncer(registerContext, realSyncer)
if err != nil {
return errors.Wrapf(err, "start %s syncer", v.Name())
}
}

// real syncer?
realSyncer, ok := v.(syncertypes.Syncer)
if ok {
err = syncer.RegisterSyncer(registerContext, realSyncer)
if err != nil {
return errors.Wrapf(err, "start %s syncer", v.Name())
}
}

// custom syncer?
customSyncer, ok := v.(syncertypes.ControllerStarter)
if ok {
err = customSyncer.Register(registerContext)
if err != nil {
return errors.Wrapf(err, "start %s syncer", v.Name())
}
}
}
Expand Down
3 changes: 2 additions & 1 deletion pkg/controllers/resources/configmaps/syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"github.com/loft-sh/vcluster/pkg/constants"
synccontext "github.com/loft-sh/vcluster/pkg/controllers/syncer/context"
"github.com/loft-sh/vcluster/pkg/controllers/syncer/translator"
"github.com/loft-sh/vcluster/pkg/mappings"
syncer "github.com/loft-sh/vcluster/pkg/types"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/meta"
Expand All @@ -22,7 +23,7 @@ import (

func New(ctx *synccontext.RegisterContext) (syncer.Object, error) {
return &configMapSyncer{
NamespacedTranslator: translator.NewNamespacedTranslator(ctx, "configmap", &corev1.ConfigMap{}),
NamespacedTranslator: translator.NewNamespacedTranslator(ctx, "configmap", &corev1.ConfigMap{}, mappings.ConfigMaps()),

syncAllConfigMaps: ctx.Config.Sync.ToHost.ConfigMaps.All,
multiNamespaceMode: ctx.Config.Experimental.MultiNamespaceMode.Enabled,
Expand Down
6 changes: 1 addition & 5 deletions pkg/controllers/resources/configmaps/translate.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,17 +4,13 @@ import (
"context"

"github.com/loft-sh/vcluster/pkg/controllers/syncer/translator"
"github.com/loft-sh/vcluster/pkg/mappings"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/equality"
"k8s.io/apimachinery/pkg/types"
"sigs.k8s.io/controller-runtime/pkg/client"
)

func (s *configMapSyncer) translate(ctx context.Context, vObj client.Object) *corev1.ConfigMap {
pObj := s.TranslateMetadata(ctx, vObj).(*corev1.ConfigMap)
pObj.SetName(mappings.ConfigMaps().VirtualToHost(ctx, types.NamespacedName{Name: vObj.GetName(), Namespace: vObj.GetNamespace()}, vObj).Name)
return pObj
return s.TranslateMetadata(ctx, vObj).(*corev1.ConfigMap)
}

func (s *configMapSyncer) translateUpdate(ctx context.Context, pObj, vObj *corev1.ConfigMap) *corev1.ConfigMap {
Expand Down
3 changes: 2 additions & 1 deletion pkg/controllers/resources/csidrivers/syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (

synccontext "github.com/loft-sh/vcluster/pkg/controllers/syncer/context"
"github.com/loft-sh/vcluster/pkg/controllers/syncer/translator"
"github.com/loft-sh/vcluster/pkg/mappings"
"github.com/loft-sh/vcluster/pkg/patcher"
syncer "github.com/loft-sh/vcluster/pkg/types"
storagev1 "k8s.io/api/storage/v1"
Expand All @@ -15,7 +16,7 @@ import (

func New(_ *synccontext.RegisterContext) (syncer.Object, error) {
return &csidriverSyncer{
Translator: translator.NewMirrorPhysicalTranslator("csidriver", &storagev1.CSIDriver{}),
Translator: translator.NewMirrorPhysicalTranslator("csidriver", &storagev1.CSIDriver{}, mappings.CSIDrivers()),
}, nil
}

Expand Down
3 changes: 2 additions & 1 deletion pkg/controllers/resources/csinodes/syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (

synccontext "github.com/loft-sh/vcluster/pkg/controllers/syncer/context"
"github.com/loft-sh/vcluster/pkg/controllers/syncer/translator"
"github.com/loft-sh/vcluster/pkg/mappings"
"github.com/loft-sh/vcluster/pkg/patcher"
syncertypes "github.com/loft-sh/vcluster/pkg/types"
corev1 "k8s.io/api/core/v1"
Expand All @@ -18,7 +19,7 @@ import (

func New(ctx *synccontext.RegisterContext) (syncertypes.Object, error) {
return &csinodeSyncer{
Translator: translator.NewMirrorPhysicalTranslator("csinode", &storagev1.CSINode{}),
Translator: translator.NewMirrorPhysicalTranslator("csinode", &storagev1.CSINode{}, mappings.CSINodes()),
virtualClient: ctx.VirtualManager.GetClient(),
}, nil
}
Expand Down
7 changes: 5 additions & 2 deletions pkg/controllers/resources/csistoragecapacities/syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (
"fmt"

synccontext "github.com/loft-sh/vcluster/pkg/controllers/syncer/context"
"github.com/loft-sh/vcluster/pkg/controllers/syncer/translator"
"github.com/loft-sh/vcluster/pkg/mappings"
"github.com/loft-sh/vcluster/pkg/patcher"
syncertypes "github.com/loft-sh/vcluster/pkg/types"
Expand All @@ -25,13 +24,17 @@ import (

func New(ctx *synccontext.RegisterContext) (syncertypes.Object, error) {
return &csistoragecapacitySyncer{
Mapper: mappings.CSIStorageCapacities(),

storageClassSyncEnabled: ctx.Config.Sync.ToHost.StorageClasses.Enabled,
hostStorageClassSyncEnabled: ctx.Config.Sync.FromHost.StorageClasses.Enabled == "true",
physicalClient: ctx.PhysicalManager.GetClient(),
}, nil
}

type csistoragecapacitySyncer struct {
mappings.Mapper

storageClassSyncEnabled bool
hostStorageClassSyncEnabled bool
physicalClient client.Client
Expand Down Expand Up @@ -125,7 +128,7 @@ func (s *csistoragecapacitySyncer) enqueuePhysical(ctx context.Context, obj clie
return
}

name := mappings.Default.ByGVK(storagev1.SchemeGroupVersion.WithKind("CSIStorageCapacity")).HostToVirtual(ctx, types.NamespacedName{Name: obj.GetName(), Namespace: obj.GetNamespace()}, obj)
name := s.Mapper.HostToVirtual(ctx, types.NamespacedName{Name: obj.GetName(), Namespace: obj.GetNamespace()}, obj)
if name.Name != "" && name.Namespace != "" {
q.Add(reconcile.Request{NamespacedName: name})
}
Expand Down
2 changes: 0 additions & 2 deletions pkg/controllers/resources/csistoragecapacities/translate.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,7 @@ import (
"fmt"

synccontext "github.com/loft-sh/vcluster/pkg/controllers/syncer/context"
"github.com/loft-sh/vcluster/pkg/controllers/syncer/translator"
"github.com/loft-sh/vcluster/pkg/mappings"
"github.com/loft-sh/vcluster/pkg/util/clienthelper"
corev1 "k8s.io/api/core/v1"
storagev1 "k8s.io/api/storage/v1"
"k8s.io/apimachinery/pkg/api/equality"
Expand Down

This file was deleted.

6 changes: 3 additions & 3 deletions pkg/controllers/resources/csistoragecapacities/translator.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,15 +29,15 @@ func (s *csistoragecapacitySyncer) IsManaged(context.Context, client.Object) (bo

// TranslateMetadata translates the object's metadata
func (s *csistoragecapacitySyncer) TranslateMetadata(ctx context.Context, pObj client.Object) (client.Object, error) {
name := mappings.CSIStorageCapacities().HostToVirtual(ctx, types.NamespacedName{Name: pObj.GetName(), Namespace: pObj.GetNamespace()}, pObj)
pName := mappings.CSIStorageCapacities().HostToVirtual(ctx, types.NamespacedName{Name: pObj.GetName(), Namespace: pObj.GetNamespace()}, pObj)
pObjCopy := pObj.DeepCopyObject()
vObj, ok := pObjCopy.(client.Object)
if !ok {
return nil, fmt.Errorf("%q not a metadata object: %+v", pObj.GetName(), pObjCopy)
}
translate.ResetObjectMetadata(vObj)
vObj.SetName(name.Name)
vObj.SetNamespace(name.Namespace)
vObj.SetName(pName.Name)
vObj.SetNamespace(pName.Namespace)
vObj.SetAnnotations(translate.Default.ApplyAnnotations(pObj, nil, []string{}))
vObj.SetLabels(translate.Default.ApplyLabels(pObj, nil, []string{}))
return vObj, nil
Expand Down
19 changes: 13 additions & 6 deletions pkg/controllers/resources/endpoints/syncer.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package endpoints

import (
"errors"
"fmt"

synccontext "github.com/loft-sh/vcluster/pkg/controllers/syncer/context"
Expand All @@ -20,7 +21,7 @@ import (

func New(ctx *synccontext.RegisterContext) (syncer.Object, error) {
return &endpointsSyncer{
NamespacedTranslator: translator.NewNamespacedTranslator(ctx, "endpoints", &corev1.Endpoints{}),
NamespacedTranslator: translator.NewNamespacedTranslator(ctx, "endpoints", &corev1.Endpoints{}, mappings.Endpoints()),
}, nil
}

Expand All @@ -39,12 +40,18 @@ func (s *endpointsSyncer) Sync(ctx *synccontext.SyncContext, pObj client.Object,
}
defer func() {
if err := patch.Patch(ctx, pObj, vObj); err != nil {
s.NamespacedTranslator.EventRecorder().Eventf(pObj, "Warning", "SyncError", "Error syncing: %v", err)
retErr = err
retErr = errors.Join(retErr, err)
}

if retErr != nil {
s.NamespacedTranslator.EventRecorder().Eventf(pObj, "Warning", "SyncError", "Error syncing: %v", retErr)
}
}()

s.translateUpdate(ctx.Context, pObj.(*corev1.Endpoints), vObj.(*corev1.Endpoints))
err = s.translateUpdate(ctx.Context, pObj.(*corev1.Endpoints), vObj.(*corev1.Endpoints))
if err != nil {
return ctrl.Result{}, err
}

return ctrl.Result{}, nil
}
Expand Down Expand Up @@ -75,7 +82,7 @@ func (s *endpointsSyncer) ReconcileStart(ctx *synccontext.SyncContext, req ctrl.
} else if svc.Spec.Selector != nil {
// check if it was a managed endpoints object before and delete it
endpoints := &corev1.Endpoints{}
err := ctx.PhysicalClient.Get(ctx.Context, mappings.Endpoints().VirtualToHost(ctx.Context, req.NamespacedName, nil), endpoints)
err = ctx.PhysicalClient.Get(ctx.Context, s.VirtualToHost(ctx.Context, req.NamespacedName, nil), endpoints)
if err != nil {
if !kerrors.IsNotFound(err) {
klog.Infof("Error retrieving endpoints: %v", err)
Expand Down Expand Up @@ -104,7 +111,7 @@ func (s *endpointsSyncer) ReconcileStart(ctx *synccontext.SyncContext, req ctrl.

// check if it was a Kubernetes managed endpoints object before and delete it
endpoints := &corev1.Endpoints{}
err = ctx.PhysicalClient.Get(ctx.Context, mappings.Endpoints().VirtualToHost(ctx.Context, req.NamespacedName, nil), endpoints)
err = ctx.PhysicalClient.Get(ctx.Context, s.VirtualToHost(ctx.Context, req.NamespacedName, nil), endpoints)
if err == nil && (endpoints.Annotations == nil || endpoints.Annotations[translate.NameAnnotation] == "") {
klog.Infof("Refresh endpoints in physical cluster because they should be managed by vCluster now")
err = ctx.PhysicalClient.Delete(ctx.Context, endpoints)
Expand Down
15 changes: 10 additions & 5 deletions pkg/controllers/resources/endpoints/translate.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,7 @@ package endpoints
import (
"context"

"github.com/loft-sh/vcluster/pkg/controllers/syncer/translator"
"github.com/loft-sh/vcluster/pkg/mappings"
"github.com/loft-sh/vcluster/pkg/util/translate"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/equality"
"sigs.k8s.io/controller-runtime/pkg/client"
Expand All @@ -24,7 +22,7 @@ func (s *endpointsSyncer) translate(ctx context.Context, vObj client.Object) *co
return endpoints
}

func (s *endpointsSyncer) translateSpec(endpoints *corev1.Endpoints) {
func (s *endpointsSyncer) translateSpec(endpoints *corev1.Endpoints) error {
// translate the addresses
for i, subset := range endpoints.Subsets {
for j, addr := range subset.Addresses {
Expand All @@ -50,12 +48,17 @@ func (s *endpointsSyncer) translateSpec(endpoints *corev1.Endpoints) {
}
}
}

return nil
}

func (s *endpointsSyncer) translateUpdate(ctx context.Context, pObj, vObj *corev1.Endpoints) {
func (s *endpointsSyncer) translateUpdate(ctx context.Context, pObj, vObj *corev1.Endpoints) error {
// check subsets
translated := vObj.DeepCopy()
s.translateSpec(translated)
err := s.translateSpec(translated)
if err != nil {
return err
}
if !equality.Semantic.DeepEqual(translated.Subsets, pObj.Subsets) {
pObj.Subsets = translated.Subsets
}
Expand All @@ -67,4 +70,6 @@ func (s *endpointsSyncer) translateUpdate(ctx context.Context, pObj, vObj *corev
pObj.Annotations = annotations
pObj.Labels = labels
}

return nil
}
4 changes: 4 additions & 0 deletions pkg/controllers/resources/events/syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,16 @@ import (

func New(ctx *synccontext.RegisterContext) (syncer.Object, error) {
return &eventSyncer{
Mapper: mappings.Events(),

virtualClient: ctx.VirtualManager.GetClient(),
hostClient: ctx.PhysicalManager.GetClient(),
}, nil
}

type eventSyncer struct {
mappings.Mapper

virtualClient client.Client
hostClient client.Client
}
Expand Down
Loading

0 comments on commit 0ca7e34

Please sign in to comment.