Skip to content

Commit

Permalink
draft: support serverless leaf cluster
Browse files Browse the repository at this point in the history
Signed-off-by: OrangeBao <[email protected]>
  • Loading branch information
OrangeBao committed Jan 4, 2024
1 parent b7a02b7 commit 4c52421
Show file tree
Hide file tree
Showing 448 changed files with 4,320 additions and 3,162 deletions.
15 changes: 7 additions & 8 deletions cmd/clustertree/cluster-manager/app/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -213,16 +213,15 @@ func run(ctx context.Context, opts *options.Options) error {
}
}

// init rootPodController
rootPodReconciler := podcontrollers.RootPodReconciler{
GlobalLeafManager: globalleafManager,
RootClient: mgr.GetClient(),
rootPodWorkerQueue := podcontrollers.NewRootPodWorkerQueue(&podcontrollers.RootPodWorkerQueueOption{
Config: config,
RootClient: rootClient,
DynamicRootClient: dynamicClient,
GlobalLeafManager: globalleafManager,
Options: opts,
}
if err := rootPodReconciler.SetupWithManager(mgr); err != nil {
return fmt.Errorf("error starting rootPodReconciler %s: %v", podcontrollers.RootPodControllerName, err)
}
})

go rootPodWorkerQueue.Run(ctx)

if !opts.OnewayStorageControllers {
rootPVCController := pvc.RootPVCController{
Expand Down
8 changes: 8 additions & 0 deletions deploy/crds/kosmos.io_clusters.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,9 @@ spec:
type: object
clusterTreeOptions:
properties:
accessKey:
description: secret?
type: string
enable:
default: true
type: boolean
Expand Down Expand Up @@ -223,6 +226,11 @@ spec:
type: array
type: object
type: array
leafType:
default: k8s
type: string
secretKey:
type: string
type: object
imageRepository:
type: string
Expand Down
11 changes: 6 additions & 5 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ module github.com/kosmos.io/kosmos
go 1.20

require (
github.com/andreburgaud/crypt2go v1.4.0
github.com/bep/debounce v1.2.1
github.com/containerd/console v1.0.3
github.com/containerd/containerd v1.6.14
Expand All @@ -12,6 +13,7 @@ require (
github.com/go-logr/logr v1.2.3
github.com/gogo/protobuf v1.3.2
github.com/google/go-cmp v0.5.9
github.com/google/uuid v1.3.0
github.com/gorilla/mux v1.8.1
github.com/olekukonko/tablewriter v0.0.4
github.com/onsi/ginkgo/v2 v2.9.2
Expand All @@ -24,7 +26,7 @@ require (
github.com/spf13/cobra v1.6.0
github.com/spf13/pflag v1.0.5
github.com/vishvananda/netlink v1.2.1-beta.2.0.20220630165224-c591ada0fb2b
golang.org/x/sys v0.12.0
golang.org/x/sys v0.15.0
golang.org/x/time v0.3.0
golang.org/x/tools v0.13.0
k8s.io/api v0.26.3
Expand Down Expand Up @@ -97,7 +99,6 @@ require (
github.com/google/gofuzz v1.2.0 // indirect
github.com/google/pprof v0.0.0-20211214055906-6f57359322fd // indirect
github.com/google/shlex v0.0.0-20191202100458-e7afc7fbc510 // indirect
github.com/google/uuid v1.3.0 // indirect
github.com/gregjones/httpcache v0.0.0-20180305231024-9cad4c3443a7 // indirect
github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0 // indirect
github.com/grpc-ecosystem/grpc-gateway/v2 v2.11.3 // indirect
Expand Down Expand Up @@ -159,13 +160,13 @@ require (
go.uber.org/atomic v1.9.0 // indirect
go.uber.org/multierr v1.8.0 // indirect
go.uber.org/zap v1.24.0 // indirect
golang.org/x/crypto v0.13.0 // indirect
golang.org/x/crypto v0.17.0 // indirect
golang.org/x/mod v0.12.0 // indirect
golang.org/x/net v0.15.0 // indirect
golang.org/x/oauth2 v0.4.0 // indirect
golang.org/x/sync v0.3.0 // indirect
golang.org/x/term v0.12.0 // indirect
golang.org/x/text v0.13.0 // indirect
golang.org/x/term v0.15.0 // indirect
golang.org/x/text v0.14.0 // indirect
golang.zx2c4.com/wireguard/wgctrl v0.0.0-20200324154536-ceff61240acf // indirect
gomodules.xyz/jsonpatch/v2 v2.2.0 // indirect
google.golang.org/appengine v1.6.7 // indirect
Expand Down
14 changes: 10 additions & 4 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -454,6 +454,8 @@ github.com/alecthomas/units v0.0.0-20190717042225-c3de453c63f4/go.mod h1:ybxpYRF
github.com/alecthomas/units v0.0.0-20190924025748-f65c72e2690d/go.mod h1:rBZYJk541a8SKzHPHnH3zbiI+7dagKZ0cgpgrD7Fyho=
github.com/alessio/shellescape v1.2.2/go.mod h1:PZAiSCk0LJaZkiCSkPv8qIobYglO3FPpyFjDCtHLS30=
github.com/alexflint/go-filemutex v0.0.0-20171022225611-72bdc8eae2ae/go.mod h1:CgnQgUtFrFz9mxFNtED3jI5tLDjKlOM+oUF/sTk6ps0=
github.com/andreburgaud/crypt2go v1.4.0 h1:i2zy+SmjKlQ22eGMR8pIOgrp8OJwbEYp/TVKIcJJBY4=
github.com/andreburgaud/crypt2go v1.4.0/go.mod h1:MVXF4rq1q0igLFpdb+XW1DfIIgvbSsswGRUwwFbzcnc=
github.com/antihax/optional v1.0.0/go.mod h1:uupD/76wgC+ih3iEmQUL+0Ugr19nfwCT1kdvxnR2qWY=
github.com/antlr/antlr4/runtime/Go/antlr v0.0.0-20220418222510-f25a4f6275ed/go.mod h1:F7bn7fEU90QkQ3tnmaTx3LTKLEDqnwWODIYppRQ5hnY=
github.com/antlr/antlr4/runtime/Go/antlr v1.4.10 h1:yL7+Jz0jTC6yykIK/Wh74gnTJnrGr5AyrNMXuA0gves=
Expand Down Expand Up @@ -1538,8 +1540,9 @@ golang.org/x/crypto v0.0.0-20211108221036-ceb1ce70b4fa/go.mod h1:GvvjBRRGRdwPK5y
golang.org/x/crypto v0.0.0-20211215153901-e495a2d5b3d3/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4=
golang.org/x/crypto v0.0.0-20220411220226-7b82a4e95df4/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4=
golang.org/x/crypto v0.1.0/go.mod h1:RecgLatLF4+eUMCP1PoPZQb+cVrJcOPbHkTkbkB9sbw=
golang.org/x/crypto v0.13.0 h1:mvySKfSWJ+UKUii46M40LOvyWfN0s2U+46/jDd0e6Ck=
golang.org/x/crypto v0.13.0/go.mod h1:y6Z2r+Rw4iayiXXAIxJIDAJ1zMW4yaTpebo8fPOliYc=
golang.org/x/crypto v0.17.0 h1:r8bRNjWL3GshPW3gkd+RpvzWrZAwPS49OmTGZ/uhM4k=
golang.org/x/crypto v0.17.0/go.mod h1:gCAAfMLgwOJRpTjQ2zCCt2OcSfYMTeZVSRtQlPC7Nq4=
golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
golang.org/x/exp v0.0.0-20190306152737-a1d7652674e8/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
golang.org/x/exp v0.0.0-20190510132918-efd6b22b2522/go.mod h1:ZjyILWgesfNpC6sMxTJOJm9Kp84zZh5NQWvqDGG3Qr8=
Expand Down Expand Up @@ -1818,8 +1821,9 @@ golang.org/x/sys v0.3.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.4.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.8.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.12.0 h1:CM0HF96J0hcLAwsHPJZjfdNzs0gftsLfgKt57wWHJ0o=
golang.org/x/sys v0.12.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.15.0 h1:h48lPFYpsTvQJZF4EKyI4aLHaev3CxivZmv7yZig9pc=
golang.org/x/sys v0.15.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8=
golang.org/x/term v0.1.0/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8=
Expand All @@ -1828,8 +1832,9 @@ golang.org/x/term v0.3.0/go.mod h1:q750SLmJuPmVoN1blW3UFBPREJfb1KmY3vwxfr+nFDA=
golang.org/x/term v0.4.0/go.mod h1:9P2UbLfCdcvo3p/nzKvsmas4TnlujnuoV9hGgYzW1lQ=
golang.org/x/term v0.5.0/go.mod h1:jMB1sMXY+tzblOD4FWmEbocvup2/aLOaQEp7JmGp78k=
golang.org/x/term v0.8.0/go.mod h1:xPskH00ivmX89bAKVGSKKtLOWNx2+17Eiy94tnKShWo=
golang.org/x/term v0.12.0 h1:/ZfYdc3zq+q02Rv9vGqTeSItdzZTSNDmfTi0mBAuidU=
golang.org/x/term v0.12.0/go.mod h1:owVbMEjm3cBLCHdkQu9b1opXd4ETQWc3BhuQGKgXgvU=
golang.org/x/term v0.15.0 h1:y/Oo/a/q3IXu26lQgl04j/gjuBDOBlx7X6Om1j2CPW4=
golang.org/x/term v0.15.0/go.mod h1:BDl952bC7+uMoWR75FIrCDx79TPU9oHkTZ9yRbYOrX0=
golang.org/x/text v0.0.0-20170915032832-14c0d48ead0c/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.1-0.20180807135948-17ff2d5776d2/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
Expand All @@ -1845,8 +1850,9 @@ golang.org/x/text v0.5.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8=
golang.org/x/text v0.6.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8=
golang.org/x/text v0.7.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8=
golang.org/x/text v0.9.0/go.mod h1:e1OnstbJyHTd6l/uOt8jFFHp6TRDWZR/bV3emEE/zU8=
golang.org/x/text v0.13.0 h1:ablQoSUd0tRdKxZewP80B+BaqeKJuVhuRxj/dkrun3k=
golang.org/x/text v0.13.0/go.mod h1:TvPlkZtksWOMsz7fbANvkp4WM8x/WCo/om8BMLbz+aE=
golang.org/x/text v0.14.0 h1:ScX5w1eTa3QqT8oi6+ziP7dTV1S2+ALU0bI+0zXKWiQ=
golang.org/x/text v0.14.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU=
golang.org/x/time v0.0.0-20181108054448-85acf8d2951c/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/time v0.0.0-20190308202827-9d24e82272b4/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/time v0.0.0-20191024005414-555d28b269f0/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
Expand Down
11 changes: 11 additions & 0 deletions pkg/apis/kosmos/v1alpha1/cluster_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,17 @@ type ClusterTreeOptions struct {
// LeafModels provide an api to arrange the member cluster with some rules to pretend one or more leaf node
// +optional
LeafModels []LeafModel `json:"leafModels,omitempty"`

// +kubebuilder:default="k8s"
// +optional
LeafType string `json:"leafType,omitempty"`

// secret?
// +optional
AccessKey string `json:"accessKey,omitempty"`

// +optional
SecretKey string `json:"secretKey,omitempty"`
}

type LeafModel struct {
Expand Down
33 changes: 24 additions & 9 deletions pkg/clustertree/cluster-manager/cluster_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
"github.com/kosmos.io/kosmos/pkg/clustertree/cluster-manager/controllers"
"github.com/kosmos.io/kosmos/pkg/clustertree/cluster-manager/controllers/mcs"
podcontrollers "github.com/kosmos.io/kosmos/pkg/clustertree/cluster-manager/controllers/pod"
leafpodsyncers "github.com/kosmos.io/kosmos/pkg/clustertree/cluster-manager/controllers/pod/leaf-pod"
"github.com/kosmos.io/kosmos/pkg/clustertree/cluster-manager/controllers/pv"
"github.com/kosmos.io/kosmos/pkg/clustertree/cluster-manager/controllers/pvc"
leafUtils "github.com/kosmos.io/kosmos/pkg/clustertree/cluster-manager/utils"
Expand Down Expand Up @@ -123,6 +124,19 @@ func (c *ClusterController) Reconcile(ctx context.Context, request reconcile.Req
return controllerruntime.Result{RequeueAfter: RequeueTime}, err
}

if cluster.Spec.ClusterTreeOptions.LeafType == string(leafUtils.LeafTypeServerless) {
if !cluster.DeletionTimestamp.IsZero() {
// TODO:
return reconcile.Result{}, nil
}
// TODO: ....
if err := CreateOpenApiNode(ctx, cluster, c.RootClientset, c.Options); err != nil {
return controllerruntime.Result{RequeueAfter: RequeueTime}, err
}
// TODO: clean
return reconcile.Result{}, nil
}

config, err := utils.NewConfigFromBytes(cluster.Spec.Kubeconfig, func(config *rest.Config) {
config.QPS = utils.DefaultLeafKubeQPS
config.Burst = utils.DefaultLeafKubeBurst
Expand Down Expand Up @@ -206,7 +220,7 @@ func (c *ClusterController) Reconcile(ctx context.Context, request reconcile.Req
c.ManagerCancelFuncs[cluster.Name] = &cancel
c.ControllerManagersLock.Unlock()

if err = c.setupControllers(mgr, cluster, nodes, leafDynamic, leafNodeSelectors, leafClient, kosmosClient, config); err != nil {
if err = c.setupControllers(mgr, cluster, nodes, leafDynamic, leafNodeSelectors, leafClient, kosmosClient, config, subContext); err != nil {
return reconcile.Result{}, fmt.Errorf("failed to setup cluster %s controllers: %v", cluster.Name, err)
}

Expand Down Expand Up @@ -243,7 +257,8 @@ func (c *ClusterController) setupControllers(
leafNodeSelector map[string]kosmosv1alpha1.NodeSelector,
leafClientset kubernetes.Interface,
kosmosClient kosmosversioned.Interface,
leafRestConfig *rest.Config) error {
leafRestConfig *rest.Config,
subContext context.Context) error {
c.GlobalLeafManager.AddLeafResource(&leafUtils.LeafResource{
Client: mgr.GetClient(),
DynamicClient: clientDynamic,
Expand All @@ -255,6 +270,8 @@ func (c *ClusterController) setupControllers(
IgnoreLabels: strings.Split("", ","),
EnableServiceAccount: true,
RestConfig: leafRestConfig,
// LeafType: leafUtils.LeafTypeK8s,
LeafType: leafUtils.LeafTypeK8s,
}, cluster, nodes)

nodeResourcesController := controllers.NodeResourcesController{
Expand Down Expand Up @@ -293,14 +310,12 @@ func (c *ClusterController) setupControllers(
}
}

leafPodController := podcontrollers.LeafPodReconciler{
RootClient: c.Root,
Namespace: "",
}
leafPodWorkerQueue := podcontrollers.NewLeafPodWorkerQueue(&leafpodsyncers.LeafPodWorkerQueueOption{
Config: leafRestConfig,
RootClient: c.RootClientset,
}, leafUtils.LeafTypeK8s) // TODO:

if err := leafPodController.SetupWithManager(mgr); err != nil {
return fmt.Errorf("error starting podUpstreamReconciler %s: %v", podcontrollers.LeafPodControllerName, err)
}
go leafPodWorkerQueue.Run(subContext)

if !c.Options.OnewayStorageControllers {
err := c.setupStorageControllers(mgr, utils.IsOne2OneMode(cluster), cluster.Name)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
package k8s

import (
"context"
"fmt"
"time"

"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
"k8s.io/klog"

"github.com/kosmos.io/kosmos/pkg/clustertree/cluster-manager/runtime"
"github.com/kosmos.io/kosmos/pkg/utils/podutils"
)

type leafPodK8sSyncer struct {
LeafClient *kubernetes.Clientset
RootClient kubernetes.Interface
}

const (
LeafPodControllerName = "leaf-pod-controller"
LeafPodRequeueTime = 10 * time.Second
)

func DeletePodInRootCluster(ctx context.Context, rootnamespacedname runtime.NamespacedName, rootClient kubernetes.Interface) error {
rPod, err := rootClient.CoreV1().Pods(rootnamespacedname.Namespace).Get(ctx, rootnamespacedname.Name, metav1.GetOptions{})
if err != nil {
if errors.IsNotFound(err) {
return nil
} else {
return err
}
}

rPodCopy := rPod.DeepCopy()

if err := rootClient.CoreV1().Pods(rPodCopy.Namespace).Delete(ctx, rPodCopy.Name, metav1.DeleteOptions{
GracePeriodSeconds: new(int64),
}); err != nil {
if !errors.IsNotFound(err) {
return err
}
}

return nil
}

func (s *leafPodK8sSyncer) Reconcile(ctx context.Context, key runtime.NamespacedName) (runtime.Result, error) {
pod, err := s.LeafClient.CoreV1().Pods(key.Namespace).Get(ctx, key.Name, metav1.GetOptions{})
if err != nil {
if errors.IsNotFound(err) {
// delete pod in root
if err := DeletePodInRootCluster(ctx, key, s.RootClient); err != nil {
return runtime.Result{RequeueAfter: LeafPodRequeueTime}, nil
}
return runtime.Result{}, nil
}

klog.Errorf("get %s error: %v", key, err)
return runtime.Result{RequeueAfter: LeafPodRequeueTime}, nil
}

podCopy := pod.DeepCopy()

// if ShouldSkipStatusUpdate(podCopy) {
// return reconcile.Result{}, nil
// }

if podutils.IsKosmosPod(podCopy) {
podutils.FitObjectMeta(&podCopy.ObjectMeta)
podCopy.ResourceVersion = "0"
if _, err := s.RootClient.CoreV1().Pods(podCopy.Namespace).UpdateStatus(ctx, podCopy, metav1.UpdateOptions{}); err != nil && !errors.IsNotFound(err) {
klog.V(4).Info(fmt.Sprintf("error while updating pod status in kubernetes: %s", err))
return runtime.Result{RequeueAfter: LeafPodRequeueTime}, nil
}
}
return runtime.Result{}, nil
}
Loading

0 comments on commit 4c52421

Please sign in to comment.