Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor resource host to virtual mappings #1925

Merged
merged 5 commits into from
Jul 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 10 additions & 6 deletions cmd/vcluster/cmd/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"github.com/loft-sh/vcluster/pkg/scheme"
"github.com/loft-sh/vcluster/pkg/setup"
"github.com/loft-sh/vcluster/pkg/telemetry"
util "github.com/loft-sh/vcluster/pkg/util/context"
"github.com/pkg/errors"
"github.com/spf13/cobra"
"k8s.io/client-go/tools/clientcmd"
Expand Down Expand Up @@ -105,6 +106,12 @@ func ExecuteStart(ctx context.Context, options *StartOptions) error {
return fmt.Errorf("start integrations: %w", err)
}

// start managers
syncers, err := setup.StartManagers(util.ToRegisterContext(controllerCtx))
if err != nil {
return fmt.Errorf("start managers: %w", err)
}

// start proxy
err = setup.StartProxy(controllerCtx)
if err != nil {
Expand All @@ -126,17 +133,14 @@ func ExecuteStart(ctx context.Context, options *StartOptions) error {
}
}

if err := pro.ConnectToPlatform(
ctx,
vConfig,
controllerCtx.VirtualManager,
); err != nil {
// connect to vCluster platform if configured
if err := pro.ConnectToPlatform(ctx, vConfig, controllerCtx.VirtualManager); err != nil {
return fmt.Errorf("connect to platform: %w", err)
}

// start leader election + controllers
err = StartLeaderElection(controllerCtx, func() error {
return setup.StartControllers(controllerCtx)
return setup.StartControllers(controllerCtx, syncers)
})
if err != nil {
return fmt.Errorf("start controllers: %w", err)
Expand Down
6 changes: 3 additions & 3 deletions docs/pages/advanced-topics/plugins-development.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -94,11 +94,11 @@ The `SyncDown` function mentioned above is called by the vCluster SDK when a giv

```
func (s *carSyncer) SyncToHost(ctx *synccontext.SyncContext, vObj client.Object) (ctrl.Result, error) {
return s.SyncToHostCreate(ctx, vObj, s.TranslateMetadata(ctx.Context, vObj).(*examplev1.Car))
return s.SyncToHostCreate(ctx, vObj, s.TranslateMetadata(ctx, vObj).(*examplev1.Car))
}

func (s *carSyncer) Sync(ctx *synccontext.SyncContext, pObj client.Object, vObj client.Object) (ctrl.Result, error) {
return s.SyncToHostUpdate(ctx, vObj, s.translateUpdate(ctx.Context, pObj.(*examplev1.Car), vObj.(*examplev1.Car)))
return s.SyncToHostUpdate(ctx, vObj, s.translateUpdate(ctx, pObj.(*examplev1.Car), vObj.(*examplev1.Car)))
}
```
The `TranslateMetadata` function used above produces a Car object that will be created in the host cluster. It is a deep copy of the Car from vCluster, but with certain metadata modifications - the name and labels are transformed, some vCluster labels and annotations are added, many metadata fields are stripped (uid, resourceVersion, etc.).
Expand All @@ -109,7 +109,7 @@ Next, we need to implement code that will handle the updates of the Car. When a
```

func (s *carSyncer) Sync(ctx *synccontext.SyncContext, pObj client.Object, vObj client.Object) (ctrl.Result, error) {
return s.SyncToHostUpdate(ctx, vObj, s.translateUpdate(ctx.Context, pObj.(*examplev1.Car), vObj.(*examplev1.Car)))
return s.SyncToHostUpdate(ctx, vObj, s.translateUpdate(ctx, pObj.(*examplev1.Car), vObj.(*examplev1.Car)))
}

func (s *carSyncer) translateUpdate(ctx context.Context, pObj, vObj *examplev1.Car) *examplev1.Car {
Expand Down
10 changes: 5 additions & 5 deletions pkg/apiservice/generic.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,7 @@ func StartAPIServiceProxy(ctx *config.ControllerContext, targetServiceName, targ
Handler: http.HandlerFunc(func(writer http.ResponseWriter, request *http.Request) {
// we only allow traffic to discovery paths
if !isAPIServiceProxyPathAllowed(request.Method, request.URL.Path) {
klog.FromContext(ctx.Context).Info("Denied access to api service proxy at path", "path", request.URL.Path, "method", request.Method)
klog.FromContext(ctx).Info("Denied access to api service proxy at path", "path", request.URL.Path, "method", request.Method)
responsewriters.ErrorNegotiated(
kerrors.NewForbidden(metav1.SchemeGroupVersion.WithResource("proxy").GroupResource(), "proxy", fmt.Errorf("paths other than discovery paths are not allowed")),
s,
Expand All @@ -187,7 +187,7 @@ func StartAPIServiceProxy(ctx *config.ControllerContext, targetServiceName, targ
klog.Infof("Listening apiservice proxy on localhost:%d...", hostPort)
err = server.ListenAndServeTLS(tlsCertFile, tlsKeyFile)
if err != nil {
klog.FromContext(ctx.Context).Error(err, "error listening for apiservice proxy and serve tls")
klog.FromContext(ctx).Error(err, "error listening for apiservice proxy and serve tls")
os.Exit(1)
}
}()
Expand Down Expand Up @@ -232,14 +232,14 @@ func isAPIServiceProxyPathAllowed(method, path string) bool {
}

func RegisterAPIService(ctx *config.ControllerContext, serviceName string, hostPort int, groupVersion schema.GroupVersion) error {
return applyOperation(ctx.Context, createOperation(ctx, serviceName, hostPort, groupVersion))
return applyOperation(ctx, createOperation(ctx, serviceName, hostPort, groupVersion))
}

func DeregisterAPIService(ctx *config.ControllerContext, groupVersion schema.GroupVersion) error {
// check if the api service should get created
exists := checkExistingAPIService(ctx.Context, ctx.VirtualManager.GetClient(), groupVersion)
exists := checkExistingAPIService(ctx, ctx.VirtualManager.GetClient(), groupVersion)
if exists {
return applyOperation(ctx.Context, deleteOperation(ctx, groupVersion))
return applyOperation(ctx, deleteOperation(ctx, groupVersion))
}

return nil
Expand Down
16 changes: 2 additions & 14 deletions pkg/config/controller_context.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,13 @@ import (
"net/http"

"k8s.io/apimachinery/pkg/version"
"k8s.io/client-go/rest"
clientcmdapi "k8s.io/client-go/tools/clientcmd/api"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
)

type ControllerContext struct {
Context context.Context
context.Context

LocalManager ctrl.Manager
VirtualManager ctrl.Manager
Expand All @@ -34,17 +33,6 @@ type ControllerContext struct {
AcquiredLeaderHooks []Hook
}

type Filter func(http.Handler, Clients) http.Handler
type Filter func(http.Handler, *ControllerContext) http.Handler

type Hook func(ctx *ControllerContext) error

type Clients struct {
UncachedVirtualClient client.Client
CachedVirtualClient client.Client

UncachedHostClient client.Client
CachedHostClient client.Client

HostConfig *rest.Config
VirtualConfig *rest.Config
}
4 changes: 4 additions & 0 deletions pkg/constants/annotation.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,10 @@ const (
PausedReplicasAnnotation = "loft.sh/paused-replicas"
PausedDateAnnotation = "loft.sh/paused-date"

HostClusterPersistentVolumeAnnotation = "vcluster.loft.sh/host-pv"

HostClusterVSCAnnotation = "vcluster.loft.sh/host-volumesnapshotcontent"

// NodeSuffix is the dns suffix for our nodes
NodeSuffix = "nodes.vcluster.com"

Expand Down
7 changes: 7 additions & 0 deletions pkg/constants/constants.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package constants

const (
K8sKineEndpoint = "unix:///data/kine.sock"
K3sKineEndpoint = "unix:///data/server/kine.sock"
K0sKineEndpoint = "unix:///run/k0s/kine/kine.sock:2379"
)
1 change: 0 additions & 1 deletion pkg/constants/indices.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ const (
IndexByPhysicalName = "IndexByPhysicalName"
IndexByVirtualName = "IndexByVirtualName"
IndexByAssigned = "IndexByAssigned"
IndexByStorageClass = "IndexByStorageClass"
IndexByIngressSecret = "IndexByIngressSecret"
IndexByPodSecret = "IndexByPodSecret"
IndexByConfigMap = "IndexByConfigMap"
Expand Down
4 changes: 2 additions & 2 deletions pkg/controllers/deploy/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ func RegisterInitManifestsController(controllerCtx *config.ControllerContext) er
return err
}

helmBinaryPath, err := helmdownloader.GetHelmBinaryPath(controllerCtx.Context, log.GetInstance())
helmBinaryPath, err := helmdownloader.GetHelmBinaryPath(controllerCtx, log.GetInstance())
if err != nil {
return err
}
Expand All @@ -37,7 +37,7 @@ func RegisterInitManifestsController(controllerCtx *config.ControllerContext) er

go func() {
for {
result, err := controller.Apply(controllerCtx.Context, controllerCtx.Config)
result, err := controller.Apply(controllerCtx, controllerCtx.Config)
if err != nil {
klog.Errorf("Error deploying manifests: %v", err)
time.Sleep(time.Second * 10)
Expand Down
30 changes: 18 additions & 12 deletions pkg/controllers/generic/export_syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"time"

"github.com/loft-sh/vcluster/pkg/config"
"github.com/loft-sh/vcluster/pkg/mappings/generic"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/klog/v2"

Expand Down Expand Up @@ -39,7 +40,7 @@ func CreateExporters(ctx *config.ControllerContext) error {

for _, exportConfig := range exporterConfig.Exports {
_, hasStatusSubresource, err := translate.EnsureCRDFromPhysicalCluster(
registerCtx.Context,
registerCtx,
registerCtx.PhysicalManager.GetConfig(),
registerCtx.VirtualManager.GetConfig(),
schema.FromAPIVersionAndKind(exportConfig.APIVersion, exportConfig.Kind))
Expand Down Expand Up @@ -98,12 +99,17 @@ func createExporterFromConfig(ctx *synccontext.RegisterContext, config *vcluster

gvk := schema.FromAPIVersionAndKind(config.APIVersion, config.Kind)
controllerID := fmt.Sprintf("%s/%s/GenericExport", strings.ToLower(gvk.Kind), strings.ToLower(gvk.Group))
mapper, err := generic.NewMapper(ctx, obj, translate.Default.PhysicalName)
if err != nil {
return nil, err
}

return &exporter{
ObjectPatcher: &exportPatcher{
config: config,
gvk: gvk,
},
NamespacedTranslator: translator.NewNamespacedTranslator(ctx, controllerID, obj),
NamespacedTranslator: translator.NewNamespacedTranslator(ctx, controllerID, obj, mapper),

patcher: NewPatcher(ctx.VirtualManager.GetClient(), ctx.PhysicalManager.GetClient(), hasStatusSubresource, log.New(controllerID)),
gvk: gvk,
Expand All @@ -123,7 +129,7 @@ func BuildCustomExporter(
replaceWhenInvalid bool,
) (syncertypes.Object, error) {
_, hasStatusSubresource, err := translate.EnsureCRDFromPhysicalCluster(
registerCtx.Context,
registerCtx,
registerCtx.PhysicalManager.GetConfig(),
registerCtx.VirtualManager.GetConfig(),
gvk)
Expand Down Expand Up @@ -165,7 +171,7 @@ func (f *exporter) SyncToHost(ctx *synccontext.SyncContext, vObj client.Object)

// apply object to physical cluster
ctx.Log.Infof("Create physical %s %s/%s, since it is missing, but virtual object exists", f.gvk.Kind, vObj.GetNamespace(), vObj.GetName())
pObj, err := f.patcher.ApplyPatches(ctx.Context, vObj, nil, f)
pObj, err := f.patcher.ApplyPatches(ctx, vObj, nil, f)
if kerrors.IsConflict(err) {
return ctrl.Result{Requeue: true}, nil
}
Expand All @@ -181,7 +187,7 @@ func (f *exporter) SyncToHost(ctx *synccontext.SyncContext, vObj client.Object)
}

// wait here for vObj to be created
err = wait.PollUntilContextTimeout(ctx.Context, time.Millisecond*10, time.Second, true, func(pollContext context.Context) (done bool, err error) {
err = wait.PollUntilContextTimeout(ctx, time.Millisecond*10, time.Second, true, func(pollContext context.Context) (done bool, err error) {
err = ctx.PhysicalClient.Get(pollContext, types.NamespacedName{
Namespace: pObj.GetNamespace(),
Name: pObj.GetName(),
Expand All @@ -207,7 +213,7 @@ func (f *exporter) Sync(ctx *synccontext.SyncContext, pObj client.Object, vObj c
// check if virtual object is not matching anymore
if !f.objectMatches(vObj) {
ctx.Log.Infof("delete physical %s %s/%s, because it is not used anymore", f.gvk.Kind, pObj.GetNamespace(), pObj.GetName())
err := ctx.PhysicalClient.Delete(ctx.Context, pObj, &client.DeleteOptions{
err := ctx.PhysicalClient.Delete(ctx, pObj, &client.DeleteOptions{
GracePeriodSeconds: &[]int64{0}[0],
})
if err != nil {
Expand All @@ -222,12 +228,12 @@ func (f *exporter) Sync(ctx *synccontext.SyncContext, pObj client.Object, vObj c
if vObj.GetDeletionTimestamp() != nil || pObj.GetDeletionTimestamp() != nil {
if pObj.GetDeletionTimestamp() == nil {
ctx.Log.Infof("delete physical object %s/%s, because the virtual object is being deleted", pObj.GetNamespace(), pObj.GetName())
if err := ctx.PhysicalClient.Delete(ctx.Context, pObj); err != nil {
if err := ctx.PhysicalClient.Delete(ctx, pObj); err != nil {
return ctrl.Result{}, err
}
} else if vObj.GetDeletionTimestamp() == nil {
ctx.Log.Infof("delete virtual object %s/%s, because physical object %s/%s is being deleted", vObj.GetNamespace(), vObj.GetName(), pObj.GetNamespace(), pObj.GetName())
if err := ctx.VirtualClient.Delete(ctx.Context, vObj); err != nil {
if err := ctx.VirtualClient.Delete(ctx, vObj); err != nil {
return ctrl.Result{}, nil
}
}
Expand All @@ -236,7 +242,7 @@ func (f *exporter) Sync(ctx *synccontext.SyncContext, pObj client.Object, vObj c
}

// apply reverse patches
result, err := f.patcher.ApplyReversePatches(ctx.Context, vObj, pObj, f)
result, err := f.patcher.ApplyReversePatches(ctx, vObj, pObj, f)
if err != nil {
if kerrors.IsConflict(err) {
return ctrl.Result{Requeue: true}, nil
Expand All @@ -257,14 +263,14 @@ func (f *exporter) Sync(ctx *synccontext.SyncContext, pObj client.Object, vObj c
}

// apply patches
pObj, err = f.patcher.ApplyPatches(ctx.Context, vObj, pObj, f)
pObj, err = f.patcher.ApplyPatches(ctx, vObj, pObj, f)
err = IgnoreAcceptableErrors(err)
if err != nil {
// when invalid, auto delete and recreate to recover
if kerrors.IsInvalid(err) && f.replaceWhenInvalid {
// Replace the object
ctx.Log.Infof("Replace physical object, because apply failed: %v", err)
err = ctx.PhysicalClient.Delete(ctx.Context, pObj, &client.DeleteOptions{
err = ctx.PhysicalClient.Delete(ctx, pObj, &client.DeleteOptions{
GracePeriodSeconds: &[]int64{0}[0],
})
if err != nil {
Expand All @@ -289,7 +295,7 @@ func (f *exporter) Sync(ctx *synccontext.SyncContext, pObj client.Object, vObj c
var _ syncertypes.ToVirtualSyncer = &exporter{}

func (f *exporter) SyncToVirtual(ctx *synccontext.SyncContext, pObj client.Object) (ctrl.Result, error) {
isManaged, err := f.NamespacedTranslator.IsManaged(ctx.Context, pObj)
isManaged, err := f.NamespacedTranslator.IsManaged(ctx, pObj)
if err != nil {
return ctrl.Result{}, err
} else if !isManaged {
Expand Down
Loading
Loading