Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[pull] main from kosmos-io:main #21

Merged
merged 2 commits into from
Jan 8, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions cmd/kubenest/operator/app/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,13 @@ func createKubeConfig(opts *options.Options) (*restclient.Config, error) {
}

func startEndPointsControllers(mgr manager.Manager) error {
restConfig := mgr.GetConfig()

kubeClient, err := kubernetes.NewForConfig(restConfig)
if err != nil {
return err
}

coreEndPointsController := endpointscontroller.CoreDNSController{
Client: mgr.GetClient(),
EventRecorder: mgr.GetEventRecorderFor(constants.GlobalNodeControllerName),
Expand All @@ -199,9 +206,12 @@ func startEndPointsControllers(mgr manager.Manager) error {
return fmt.Errorf("error starting %s: %v", endpointscontroller.KonnectivitySyncControllerName, err)
}

nodeGetter := &endpointscontroller.RealNodeGetter{}
APIServerExternalSyncController := endpointscontroller.APIServerExternalSyncController{
Client: mgr.GetClient(),
EventRecorder: mgr.GetEventRecorderFor(constants.GlobalNodeControllerName),
KubeClient: kubeClient,
NodeGetter: nodeGetter,
}

if err := APIServerExternalSyncController.SetupWithManager(mgr); err != nil {
Expand Down
14 changes: 14 additions & 0 deletions pkg/kubenest/common/resource.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
package common

import (
"k8s.io/client-go/kubernetes"

"github.com/kosmos.io/kosmos/pkg/apis/kosmos/v1alpha1"
)

type APIServerExternalResource struct {
Namespace string
Name string
Vc *v1alpha1.VirtualCluster
RootClientSet kubernetes.Interface
}
1 change: 1 addition & 0 deletions pkg/kubenest/constants/constant.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ const (
ControllerFinalizerName = "operator.virtualcluster.io/finalizer"
DefaultKubeconfigPath = "/etc/cluster-tree/cert"
Label = "virtualCluster-app"
LabelValue = "apiserver"
ComponentBeReadyTimeout = 300 * time.Second
ComponentBeDeletedTimeout = 300 * time.Second

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,24 +3,20 @@ package endpointcontroller
import (
"context"
"fmt"
"strings"
"reflect"

v1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/record"
"k8s.io/client-go/util/retry"
"k8s.io/klog/v2"
controllerruntime "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/builder"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller"
"sigs.k8s.io/controller-runtime/pkg/event"
"sigs.k8s.io/controller-runtime/pkg/handler"
"sigs.k8s.io/controller-runtime/pkg/manager"
"sigs.k8s.io/controller-runtime/pkg/predicate"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
"sigs.k8s.io/controller-runtime/pkg/source"

Expand All @@ -30,153 +26,176 @@ import (
"github.com/kosmos.io/kosmos/pkg/utils"
)

type NodeGetter interface {
GetAPIServerNodes(client kubernetes.Interface, namespace string) (*v1.NodeList, error)
}

type RealNodeGetter struct{}

func (r *RealNodeGetter) GetAPIServerNodes(client kubernetes.Interface, namespace string) (*v1.NodeList, error) {
return util.GetAPIServerNodes(client, namespace)
}

type APIServerExternalSyncController struct {
client.Client
EventRecorder record.EventRecorder
KubeClient kubernetes.Interface
NodeGetter NodeGetter
}

const APIServerExternalSyncControllerName string = "api-server-external-service-sync-controller"

func (e *APIServerExternalSyncController) SetupWithManager(mgr manager.Manager) error {
skipEvent := func(obj client.Object) bool {
return strings.Contains(obj.GetName(), "apiserver") && obj.GetNamespace() != ""
}

return controllerruntime.NewControllerManagedBy(mgr).
Named(APIServerExternalSyncControllerName).
WithOptions(controller.Options{MaxConcurrentReconciles: 5}).
For(&v1.Endpoints{},
builder.WithPredicates(predicate.Funcs{
CreateFunc: func(createEvent event.CreateEvent) bool {
return skipEvent(createEvent.Object)
},
UpdateFunc: func(updateEvent event.UpdateEvent) bool { return skipEvent(updateEvent.ObjectNew) },
DeleteFunc: func(deleteEvent event.DeleteEvent) bool { return false },
})).
Watches(&source.Kind{Type: &v1alpha1.VirtualCluster{}}, handler.EnqueueRequestsFromMapFunc(e.newVirtualClusterMapFunc())).
Watches(&source.Kind{Type: &v1.Pod{}}, handler.EnqueueRequestsFromMapFunc(e.newPodMapFunc())).
Complete(e)
}

func (e *APIServerExternalSyncController) newVirtualClusterMapFunc() handler.MapFunc {
return func(a client.Object) []reconcile.Request {
var requests []reconcile.Request
vcluster := a.(*v1alpha1.VirtualCluster)

// Join the Reconcile queue only if the status of the vcluster is Completed
if vcluster.Status.Phase == v1alpha1.Completed {
klog.V(4).Infof("api-server-external-sync-controller: virtualcluster change to completed: %s", vcluster.Name)
// Add the vcluster to the Reconcile queue
requests = append(requests, reconcile.Request{
NamespacedName: types.NamespacedName{
Name: vcluster.Name,
Namespace: vcluster.Namespace,
func (e *APIServerExternalSyncController) newPodMapFunc() handler.MapFunc {
return func(obj client.Object) []reconcile.Request {
pod, ok := obj.(*v1.Pod)

if !ok {
klog.Warningf("Object is not a Pod, skipping: %v", obj)
return nil
}

// If the pod contains the specified label virtualCluster-app=apiserver,it indicates that it belongs to vc.
if val, exists := pod.Labels[constants.Label]; exists && val == constants.LabelValue {
return []reconcile.Request{
{
NamespacedName: client.ObjectKey{
Name: pod.Name,
Namespace: pod.Namespace,
},
},
})
}
}
return requests

return nil
}
}

func (e *APIServerExternalSyncController) SyncAPIServerExternalEPS(ctx context.Context, k8sClient kubernetes.Interface) error {
kubeEndpoints, err := k8sClient.CoreV1().Endpoints(constants.DefaultNs).Get(ctx, "kubernetes", metav1.GetOptions{})
if err != nil {
klog.Errorf("Error getting endpoints: %v", err)
return err
}
klog.V(4).Infof("Endpoints for service 'kubernetes': %v", kubeEndpoints)
for _, subset := range kubeEndpoints.Subsets {
for _, address := range subset.Addresses {
klog.V(4).Infof("IP: %s", address.IP)
}
func (e *APIServerExternalSyncController) SyncAPIServerExternalEndpoints(ctx context.Context, k8sClient kubernetes.Interface, vc *v1alpha1.VirtualCluster) error {
if e.NodeGetter == nil {
return fmt.Errorf("NodeGetter is nil")
}

if len(kubeEndpoints.Subsets) != 1 {
return fmt.Errorf("eps %s Subsets length is not 1", "kubernetes")
nodes, err := e.NodeGetter.GetAPIServerNodes(e.KubeClient, vc.Namespace)
if err != nil {
return fmt.Errorf("failed to get API server nodes: %w", err)
}

if kubeEndpoints.Subsets[0].Addresses == nil || len(kubeEndpoints.Subsets[0].Addresses) == 0 {
klog.Errorf("eps %s Addresses length is nil", "kubernetes")
return err
if len(nodes.Items) == 0 {
return fmt.Errorf("no API server nodes found in the cluster")
}

apiServerExternalEndpoints, err := k8sClient.CoreV1().Endpoints(constants.DefaultNs).Get(ctx, constants.APIServerExternalService, metav1.GetOptions{})
if err != nil && !apierrors.IsNotFound(err) {
klog.Errorf("failed to get endpoints for %s : %v", constants.APIServerExternalService, err)
return err
var addresses []v1.EndpointAddress
for _, node := range nodes.Items {
for _, address := range node.Status.Addresses {
if address.Type == v1.NodeInternalIP {
addresses = append(addresses, v1.EndpointAddress{
IP: address.Address,
})
}
}
}

updateEPS := apiServerExternalEndpoints.DeepCopy()

if apiServerExternalEndpoints != nil {
klog.V(4).Infof("apiServerExternalEndpoints: %v", apiServerExternalEndpoints)
} else {
klog.V(4).Info("apiServerExternalEndpoints is nil")
if len(addresses) == 0 {
return fmt.Errorf("no internal IP addresses found for the API server nodes")
}

if updateEPS != nil {
klog.V(4).Infof("updateEPS: %v", updateEPS)
} else {
klog.V(4).Info("updateEPS is nil")
apiServerPort, ok := vc.Status.PortMap[constants.APIServerPortKey]
if !ok {
return fmt.Errorf("failed to get API server port from VirtualCluster status")
}
klog.V(4).Infof("API server port: %d", apiServerPort)

if len(updateEPS.Subsets) == 1 && len(updateEPS.Subsets[0].Addresses) == 1 {
ip := kubeEndpoints.Subsets[0].Addresses[0].IP
klog.V(4).Infof("IP address: %s", ip)
updateEPS.Subsets[0].Addresses[0].IP = ip
newEndpoint := &v1.Endpoints{
ObjectMeta: metav1.ObjectMeta{
Name: constants.APIServerExternalService,
Namespace: constants.KosmosNs,
},
Subsets: []v1.EndpointSubset{
{
Addresses: addresses,
Ports: []v1.EndpointPort{
{
Name: "https",
Port: apiServerPort,
Protocol: v1.ProtocolTCP,
},
},
},
},
}

if _, err := k8sClient.CoreV1().Endpoints(constants.DefaultNs).Update(ctx, updateEPS, metav1.UpdateOptions{}); err != nil {
klog.Errorf("failed to update endpoints for api-server-external-service: %v", err)
return err
//avoid unnecessary updates
return retry.RetryOnConflict(retry.DefaultRetry, func() error {
currentEndpoint, err := k8sClient.CoreV1().Endpoints(constants.KosmosNs).Get(ctx, constants.APIServerExternalService, metav1.GetOptions{})
if apierrors.IsNotFound(err) {
_, err := k8sClient.CoreV1().Endpoints(constants.KosmosNs).Create(ctx, newEndpoint, metav1.CreateOptions{})
if err != nil {
return fmt.Errorf("failed to create api-server-external-service endpoint: %w", err)
}
klog.V(4).Info("Created api-server-external-service Endpoint")
return nil
} else if err != nil {
return fmt.Errorf("failed to get existing api-server-external-service endpoint: %w", err)
}
} else {
klog.ErrorS(err, "Unexpected format of endpoints for api-server-external-service", "endpoint_data", updateEPS)
return err
}

return nil
// determine if an update is needed
if !reflect.DeepEqual(currentEndpoint.Subsets, newEndpoint.Subsets) {
_, err := k8sClient.CoreV1().Endpoints(constants.KosmosNs).Update(ctx, newEndpoint, metav1.UpdateOptions{})
if err != nil {
return fmt.Errorf("failed to update api-server-external-service endpoint: %w", err)
}
klog.V(4).Info("Updated api-server-external-service Endpoint")
} else {
klog.V(4).Info("No changes detected in Endpoint, skipping update")
}
return nil
})
}

func (e *APIServerExternalSyncController) Reconcile(ctx context.Context, request reconcile.Request) (reconcile.Result, error) {
klog.V(4).Infof("============ %s start to reconcile %s ============", APIServerExternalSyncControllerName, request.NamespacedName)
defer klog.V(4).Infof("============ %s finish to reconcile %s ============", APIServerExternalSyncControllerName, request.NamespacedName)

var virtualClusterList v1alpha1.VirtualClusterList
if err := e.List(ctx, &virtualClusterList); err != nil {
if apierrors.IsNotFound(err) {
return reconcile.Result{}, nil
}
klog.V(4).Infof("query virtualcluster failed: %v", err)
var vcList v1alpha1.VirtualClusterList
if err := e.List(ctx, &vcList, client.InNamespace(request.NamespacedName.Namespace)); err != nil {
klog.Errorf("Failed to list VirtualClusters in namespace %s: %v", request.NamespacedName.Namespace, err)
return reconcile.Result{RequeueAfter: utils.DefaultRequeueTime}, nil
}
var targetVirtualCluster v1alpha1.VirtualCluster
hasVirtualCluster := false
for _, vc := range virtualClusterList.Items {
if vc.Namespace == request.Namespace {
targetVirtualCluster = vc
klog.V(4).Infof("virtualcluster %s found", targetVirtualCluster.Name)
hasVirtualCluster = true
break
}
}
if !hasVirtualCluster {
klog.V(4).Infof("virtualcluster %s not found", request.Namespace)

if len(vcList.Items) == 0 {
klog.V(4).Infof("No VirtualCluster found in namespace %s", request.NamespacedName.Namespace)
return reconcile.Result{}, nil
}

if targetVirtualCluster.Status.Phase != v1alpha1.Completed {
// A namespace should correspond to only one virtual cluster (vc). If it corresponds to multiple vcs, it indicates an error.
if len(vcList.Items) > 1 {
klog.Errorf("Multiple VirtualClusters found in namespace %s, expected only one", request.NamespacedName.Namespace)
return reconcile.Result{RequeueAfter: utils.DefaultRequeueTime}, nil
}

vc := vcList.Items[0]

if vc.Status.Phase != v1alpha1.Completed {
klog.V(4).Infof("VirtualCluster %s is not in Completed phase", vc.Name)
return reconcile.Result{}, nil
}

k8sClient, err := util.GenerateKubeclient(&targetVirtualCluster)
k8sClient, err := util.GenerateKubeclient(&vc)
if err != nil {
klog.Errorf("virtualcluster %s crd kubernetes client failed: %v", targetVirtualCluster.Name, err)
klog.Errorf("Failed to generate Kubernetes client for VirtualCluster %s: %v", vc.Name, err)
return reconcile.Result{}, nil
}

if err := retry.RetryOnConflict(retry.DefaultRetry, func() error {
return e.SyncAPIServerExternalEPS(ctx, k8sClient)
}); err != nil {
klog.Errorf("virtualcluster %s sync apiserver external endpoints failed: %v", targetVirtualCluster.Name, err)
if err := e.SyncAPIServerExternalEndpoints(ctx, k8sClient, &vc); err != nil {
klog.Errorf("Failed to sync apiserver external Endpoints: %v", err)
return reconcile.Result{RequeueAfter: utils.DefaultRequeueTime}, nil
}

Expand Down
Loading
Loading