Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor resource host to virtual mappings #1925

Merged
merged 5 commits into from
Jul 18, 2024
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 10 additions & 6 deletions cmd/vcluster/cmd/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"github.com/loft-sh/vcluster/pkg/scheme"
"github.com/loft-sh/vcluster/pkg/setup"
"github.com/loft-sh/vcluster/pkg/telemetry"
util "github.com/loft-sh/vcluster/pkg/util/context"
"github.com/pkg/errors"
"github.com/spf13/cobra"
"k8s.io/client-go/tools/clientcmd"
Expand Down Expand Up @@ -105,6 +106,12 @@ func ExecuteStart(ctx context.Context, options *StartOptions) error {
return fmt.Errorf("start integrations: %w", err)
}

// start managers
syncers, err := setup.StartManagers(util.ToRegisterContext(controllerCtx))
if err != nil {
return fmt.Errorf("start managers: %w", err)
}

// start proxy
err = setup.StartProxy(controllerCtx)
if err != nil {
Expand All @@ -126,17 +133,14 @@ func ExecuteStart(ctx context.Context, options *StartOptions) error {
}
}

if err := pro.ConnectToPlatform(
ctx,
vConfig,
controllerCtx.VirtualManager,
); err != nil {
// connect to vCluster platform if configured
if err := pro.ConnectToPlatform(ctx, vConfig, controllerCtx.VirtualManager); err != nil {
return fmt.Errorf("connect to platform: %w", err)
}

// start leader election + controllers
err = StartLeaderElection(controllerCtx, func() error {
return setup.StartControllers(controllerCtx)
return setup.StartControllers(controllerCtx, syncers)
})
if err != nil {
return fmt.Errorf("start controllers: %w", err)
Expand Down
14 changes: 1 addition & 13 deletions pkg/config/controller_context.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (
"net/http"

"k8s.io/apimachinery/pkg/version"
"k8s.io/client-go/rest"
clientcmdapi "k8s.io/client-go/tools/clientcmd/api"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
Expand Down Expand Up @@ -34,17 +33,6 @@ type ControllerContext struct {
AcquiredLeaderHooks []Hook
}

type Filter func(http.Handler, Clients) http.Handler
type Filter func(http.Handler, *ControllerContext) http.Handler

type Hook func(ctx *ControllerContext) error

type Clients struct {
UncachedVirtualClient client.Client
CachedVirtualClient client.Client

UncachedHostClient client.Client
CachedHostClient client.Client

HostConfig *rest.Config
VirtualConfig *rest.Config
}
4 changes: 4 additions & 0 deletions pkg/constants/annotation.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,10 @@ const (
PausedReplicasAnnotation = "loft.sh/paused-replicas"
PausedDateAnnotation = "loft.sh/paused-date"

HostClusterPersistentVolumeAnnotation = "vcluster.loft.sh/host-pv"

HostClusterVSCAnnotation = "vcluster.loft.sh/host-volumesnapshotcontent"

// NodeSuffix is the dns suffix for our nodes
NodeSuffix = "nodes.vcluster.com"

Expand Down
7 changes: 7 additions & 0 deletions pkg/constants/constants.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package constants

const (
K8sKineEndpoint = "unix:///data/kine.sock"
K3sKineEndpoint = "unix:///data/server/kine.sock"
K0sKineEndpoint = "unix:///run/k0s/kine/kine.sock:2379"
)
1 change: 0 additions & 1 deletion pkg/constants/indices.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ const (
IndexByPhysicalName = "IndexByPhysicalName"
IndexByVirtualName = "IndexByVirtualName"
IndexByAssigned = "IndexByAssigned"
IndexByStorageClass = "IndexByStorageClass"
IndexByIngressSecret = "IndexByIngressSecret"
IndexByPodSecret = "IndexByPodSecret"
IndexByConfigMap = "IndexByConfigMap"
Expand Down
8 changes: 7 additions & 1 deletion pkg/controllers/generic/export_syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"time"

"github.com/loft-sh/vcluster/pkg/config"
"github.com/loft-sh/vcluster/pkg/mappings/generic"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/klog/v2"

Expand Down Expand Up @@ -98,12 +99,17 @@ func createExporterFromConfig(ctx *synccontext.RegisterContext, config *vcluster

gvk := schema.FromAPIVersionAndKind(config.APIVersion, config.Kind)
controllerID := fmt.Sprintf("%s/%s/GenericExport", strings.ToLower(gvk.Kind), strings.ToLower(gvk.Group))
mapper, err := generic.NewMapper(ctx, obj, translate.Default.PhysicalName)
if err != nil {
return nil, err
}

return &exporter{
ObjectPatcher: &exportPatcher{
config: config,
gvk: gvk,
},
NamespacedTranslator: translator.NewNamespacedTranslator(ctx, controllerID, obj),
NamespacedTranslator: translator.NewNamespacedTranslator(ctx, controllerID, obj, mapper),

patcher: NewPatcher(ctx.VirtualManager.GetClient(), ctx.PhysicalManager.GetClient(), hasStatusSubresource, log.New(controllerID)),
gvk: gvk,
Expand Down
4 changes: 4 additions & 0 deletions pkg/controllers/generic/import_syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -241,6 +241,10 @@ func (s *importer) SyncToVirtual(ctx *synccontext.SyncContext, pObj client.Objec

var _ syncertypes.Syncer = &importer{}

func (s *importer) GroupVersionKind() schema.GroupVersionKind {
return s.gvk
}

func (s *importer) SyncToHost(ctx *synccontext.SyncContext, vObj client.Object) (ctrl.Result, error) {
// ignore all virtual resources that were not created by this controller
if !s.isVirtualManaged(vObj) {
Expand Down
172 changes: 27 additions & 145 deletions pkg/controllers/register.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,171 +9,43 @@ import (
"github.com/loft-sh/vcluster/pkg/config"
"github.com/loft-sh/vcluster/pkg/controllers/deploy"
"github.com/loft-sh/vcluster/pkg/controllers/generic"
"github.com/loft-sh/vcluster/pkg/controllers/resources/configmaps"
"github.com/loft-sh/vcluster/pkg/controllers/resources/csidrivers"
"github.com/loft-sh/vcluster/pkg/controllers/resources/csinodes"
"github.com/loft-sh/vcluster/pkg/controllers/resources/csistoragecapacities"
"github.com/loft-sh/vcluster/pkg/controllers/resources/endpoints"
"github.com/loft-sh/vcluster/pkg/controllers/resources/events"
"github.com/loft-sh/vcluster/pkg/controllers/resources/ingressclasses"
"github.com/loft-sh/vcluster/pkg/controllers/resources/ingresses"
"github.com/loft-sh/vcluster/pkg/controllers/resources/namespaces"
"github.com/loft-sh/vcluster/pkg/controllers/resources/networkpolicies"
"github.com/loft-sh/vcluster/pkg/controllers/resources/nodes"
"github.com/loft-sh/vcluster/pkg/controllers/resources/persistentvolumeclaims"
"github.com/loft-sh/vcluster/pkg/controllers/resources/persistentvolumes"
"github.com/loft-sh/vcluster/pkg/controllers/resources/poddisruptionbudgets"
"github.com/loft-sh/vcluster/pkg/controllers/resources/pods"
"github.com/loft-sh/vcluster/pkg/controllers/resources/priorityclasses"
"github.com/loft-sh/vcluster/pkg/controllers/resources/secrets"
"github.com/loft-sh/vcluster/pkg/controllers/resources/serviceaccounts"
"github.com/loft-sh/vcluster/pkg/controllers/resources/storageclasses"
"github.com/loft-sh/vcluster/pkg/controllers/resources/volumesnapshots/volumesnapshotclasses"
"github.com/loft-sh/vcluster/pkg/controllers/resources/volumesnapshots/volumesnapshotcontents"
"github.com/loft-sh/vcluster/pkg/controllers/resources/volumesnapshots/volumesnapshots"
"github.com/loft-sh/vcluster/pkg/controllers/servicesync"
"github.com/loft-sh/vcluster/pkg/controllers/syncer"
synccontext "github.com/loft-sh/vcluster/pkg/controllers/syncer/context"
"github.com/loft-sh/vcluster/pkg/util/blockingcacheclient"
util "github.com/loft-sh/vcluster/pkg/util/context"
"k8s.io/apimachinery/pkg/api/meta"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/rest"
"k8s.io/klog/v2"
ctrl "sigs.k8s.io/controller-runtime"
metricsserver "sigs.k8s.io/controller-runtime/pkg/metrics/server"

"github.com/loft-sh/vcluster/pkg/controllers/coredns"
"github.com/loft-sh/vcluster/pkg/controllers/k8sdefaultendpoint"
"github.com/loft-sh/vcluster/pkg/controllers/podsecurity"
"github.com/loft-sh/vcluster/pkg/controllers/resources/services"
syncertypes "github.com/loft-sh/vcluster/pkg/types"
"github.com/loft-sh/vcluster/pkg/util/loghelper"
"github.com/pkg/errors"
)

// ExtraControllers that will be started as well
var ExtraControllers []BuildController

// BuildController is a function to build a new syncer
type BuildController func(ctx *synccontext.RegisterContext) (syncertypes.Object, error)

func getSyncers(ctx *config.ControllerContext) []BuildController {
return append([]BuildController{
isEnabled(ctx.Config.Sync.ToHost.Services.Enabled, services.New),
isEnabled(ctx.Config.Sync.ToHost.ConfigMaps.Enabled, configmaps.New),
isEnabled(ctx.Config.Sync.ToHost.Secrets.Enabled, secrets.New),
isEnabled(ctx.Config.Sync.ToHost.Endpoints.Enabled, endpoints.New),
isEnabled(ctx.Config.Sync.ToHost.Pods.Enabled, pods.New),
isEnabled(ctx.Config.Sync.FromHost.Events.Enabled, events.New),
isEnabled(ctx.Config.Sync.ToHost.PersistentVolumeClaims.Enabled, persistentvolumeclaims.New),
isEnabled(ctx.Config.Sync.ToHost.Ingresses.Enabled, ingresses.New),
isEnabled(ctx.Config.Sync.FromHost.IngressClasses.Enabled, ingressclasses.New),
isEnabled(ctx.Config.Sync.ToHost.StorageClasses.Enabled, storageclasses.New),
isEnabled(ctx.Config.Sync.FromHost.StorageClasses.Enabled == "true", storageclasses.NewHostStorageClassSyncer),
isEnabled(ctx.Config.Sync.ToHost.PriorityClasses.Enabled, priorityclasses.New),
isEnabled(ctx.Config.Sync.ToHost.PodDisruptionBudgets.Enabled, poddisruptionbudgets.New),
isEnabled(ctx.Config.Sync.ToHost.NetworkPolicies.Enabled, networkpolicies.New),
isEnabled(ctx.Config.Sync.ToHost.VolumeSnapshots.Enabled, volumesnapshotclasses.New),
isEnabled(ctx.Config.Sync.ToHost.VolumeSnapshots.Enabled, volumesnapshots.New),
isEnabled(ctx.Config.Sync.ToHost.VolumeSnapshots.Enabled, volumesnapshotcontents.New),
isEnabled(ctx.Config.Sync.ToHost.ServiceAccounts.Enabled, serviceaccounts.New),
isEnabled(ctx.Config.Sync.FromHost.CSINodes.Enabled == "true", csinodes.New),
isEnabled(ctx.Config.Sync.FromHost.CSIDrivers.Enabled == "true", csidrivers.New),
isEnabled(ctx.Config.Sync.FromHost.CSIStorageCapacities.Enabled == "true", csistoragecapacities.New),
isEnabled(ctx.Config.Experimental.MultiNamespaceMode.Enabled, namespaces.New),
persistentvolumes.New,
nodes.New,
}, ExtraControllers...)
}

func isEnabled(enabled bool, fn BuildController) BuildController {
if enabled {
return fn
}
return nil
}

func Create(ctx *config.ControllerContext) ([]syncertypes.Object, error) {
registerContext := util.ToRegisterContext(ctx)

// register controllers for resource synchronization
syncers := []syncertypes.Object{}
for _, newSyncer := range getSyncers(ctx) {
if newSyncer == nil {
continue
}

createdController, err := newSyncer(registerContext)

name := ""
if createdController != nil {
name = createdController.Name()
}

if err != nil {
return nil, fmt.Errorf("register %s controller: %w", name, err)
}

loghelper.Infof("Start %s sync controller", name)
syncers = append(syncers, createdController)
}

return syncers, nil
}

func ExecuteInitializers(controllerCtx *config.ControllerContext, syncers []syncertypes.Object) error {
registerContext := util.ToRegisterContext(controllerCtx)

// execute the syncer init functions
for _, s := range syncers {
name := s.Name()
initializer, ok := s.(syncertypes.Initializer)
if ok {
klog.FromContext(controllerCtx.Context).V(1).Info("Execute syncer init", "syncer", name)
err := initializer.Init(registerContext)
if err != nil {
return errors.Wrapf(err, "ensure prerequisites for %s syncer", name)
}
}
}

return nil
}

func RegisterIndices(ctx *config.ControllerContext, syncers []syncertypes.Object) error {
registerContext := util.ToRegisterContext(ctx)
for _, s := range syncers {
indexRegisterer, ok := s.(syncertypes.IndicesRegisterer)
if ok {
err := indexRegisterer.RegisterIndices(registerContext)
if err != nil {
return errors.Wrapf(err, "register indices for %s syncer", s.Name())
}
}
}

return nil
}

func RegisterControllers(ctx *config.ControllerContext, syncers []syncertypes.Object) error {
registerContext := util.ToRegisterContext(ctx)

// start default endpoint controller
err := k8sdefaultendpoint.Register(ctx)
if err != nil {
return err
}

// register controller that maintains pod security standard check
if ctx.Config.Policies.PodSecurityStandard != "" {
err := RegisterPodSecurityController(ctx)
err := registerPodSecurityController(ctx)
if err != nil {
return err
}
}

// register controller that keeps CoreDNS NodeHosts config up to date
err = RegisterCoreDNSController(ctx)
err = registerCoreDNSController(ctx)
if err != nil {
return err
}
Expand All @@ -185,12 +57,13 @@ func RegisterControllers(ctx *config.ControllerContext, syncers []syncertypes.Ob
}

// register service syncer to map services between host and virtual cluster
err = RegisterServiceSyncControllers(ctx)
err = registerServiceSyncControllers(ctx)
if err != nil {
return err
}

err = RegisterGenericSyncController(ctx)
// register generic sync controllers
err = registerGenericSyncController(ctx)
if err != nil {
return err
}
Expand All @@ -204,22 +77,31 @@ func RegisterControllers(ctx *config.ControllerContext, syncers []syncertypes.Ob
if err != nil {
return errors.Wrapf(err, "start %s syncer", v.Name())
}
} else {
// real syncer?
realSyncer, ok := v.(syncertypes.Syncer)
if ok {
err = syncer.RegisterSyncer(registerContext, realSyncer)
if err != nil {
return errors.Wrapf(err, "start %s syncer", v.Name())
}
}

// real syncer?
realSyncer, ok := v.(syncertypes.Syncer)
if ok {
err = syncer.RegisterSyncer(registerContext, realSyncer)
if err != nil {
return errors.Wrapf(err, "start %s syncer", v.Name())
}
}

// custom syncer?
customSyncer, ok := v.(syncertypes.ControllerStarter)
if ok {
err = customSyncer.Register(registerContext)
if err != nil {
return errors.Wrapf(err, "start %s syncer", v.Name())
}
}
}

return nil
}

func RegisterGenericSyncController(ctx *config.ControllerContext) error {
func registerGenericSyncController(ctx *config.ControllerContext) error {
err := generic.CreateExporters(ctx)
if err != nil {
return err
Expand All @@ -233,7 +115,7 @@ func RegisterGenericSyncController(ctx *config.ControllerContext) error {
return nil
}

func RegisterServiceSyncControllers(ctx *config.ControllerContext) error {
func registerServiceSyncControllers(ctx *config.ControllerContext) error {
hostNamespace := ctx.Config.WorkloadTargetNamespace
if ctx.Config.Experimental.MultiNamespaceMode.Enabled {
hostNamespace = ctx.Config.WorkloadNamespace
Expand Down Expand Up @@ -360,7 +242,7 @@ func parseMapping(mappings []vclusterconfig.ServiceMapping, fromDefaultNamespace
return ret, nil
}

func RegisterCoreDNSController(ctx *config.ControllerContext) error {
func registerCoreDNSController(ctx *config.ControllerContext) error {
controller := &coredns.NodeHostsReconciler{
Client: ctx.VirtualManager.GetClient(),
Log: loghelper.New("corednsnodehosts-controller"),
Expand All @@ -372,7 +254,7 @@ func RegisterCoreDNSController(ctx *config.ControllerContext) error {
return nil
}

func RegisterPodSecurityController(ctx *config.ControllerContext) error {
func registerPodSecurityController(ctx *config.ControllerContext) error {
controller := &podsecurity.Reconciler{
Client: ctx.VirtualManager.GetClient(),
PodSecurityStandard: ctx.Config.Policies.PodSecurityStandard,
Expand Down
Loading
Loading