Skip to content

Commit

Permalink
Set fake Node IP for the virtual pod status.HostIP field when ProxyKubelets.ByIP is enabled (#2420)
Browse files Browse the repository at this point in the history

* set fake Node IP for the virtual pod status.HostIP field when ProxyKubeletsByIP is enabled

* add tests

(cherry picked from commit 2cd6f1f)
  • Loading branch information
neogopher committed Jan 24, 2025
1 parent fe75250 commit 7feb3c6
Show file tree
Hide file tree
Showing 4 changed files with 127 additions and 9 deletions.
57 changes: 54 additions & 3 deletions pkg/controllers/resources/pods/syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"fmt"
"reflect"
"strings"
"time"

"github.com/loft-sh/vcluster/pkg/controllers/resources/pods/token"
Expand Down Expand Up @@ -93,6 +94,7 @@ func New(ctx *synccontext.RegisterContext) (syncertypes.Object, error) {

serviceName: ctx.Config.WorkloadService,
enableScheduler: ctx.Config.ControlPlane.Advanced.VirtualScheduler.Enabled,
fakeKubeletIPs: ctx.Config.Networking.Advanced.ProxyKubelets.ByIP,

virtualClusterClient: virtualClusterClient,
physicalClusterClient: physicalClusterClient,
Expand All @@ -111,6 +113,7 @@ type podSyncer struct {

serviceName string
enableScheduler bool
fakeKubeletIPs bool

podTranslator translatepods.Translator
virtualClusterClient kubernetes.Interface
Expand Down Expand Up @@ -223,9 +226,21 @@ func (s *podSyncer) SyncToHost(ctx *synccontext.SyncContext, event *synccontext.
}
}

// if scheduler is enabled we only sync if the pod has a node name
if s.enableScheduler && pPod.Spec.NodeName == "" {
return ctrl.Result{}, nil
if s.enableScheduler {
// if scheduler is enabled we only sync if the pod has a node name
if pPod.Spec.NodeName == "" {
return ctrl.Result{}, nil
}

if s.fakeKubeletIPs {
nodeIP, err := s.getNodeIP(ctx, pPod.Spec.NodeName)
if err != nil {
return ctrl.Result{}, err
}

pPod.Annotations[translatepods.HostIPAnnotation] = nodeIP
pPod.Annotations[translatepods.HostIPsAnnotation] = nodeIP
}
}

err = pro.ApplyPatchesHostObject(ctx, nil, pPod, event.Virtual, ctx.Config.Sync.ToHost.Pods.Patches, false)
Expand All @@ -237,6 +252,9 @@ func (s *podSyncer) SyncToHost(ctx *synccontext.SyncContext, event *synccontext.
}

func (s *podSyncer) Sync(ctx *synccontext.SyncContext, event *synccontext.SyncEvent[*corev1.Pod]) (_ ctrl.Result, retErr error) {
var (
err error
)
// should pod get deleted?
if event.Host.DeletionTimestamp != nil {
if event.Virtual.DeletionTimestamp == nil {
Expand Down Expand Up @@ -280,6 +298,13 @@ func (s *podSyncer) Sync(ctx *synccontext.SyncContext, event *synccontext.SyncEv
return patcher.DeleteVirtualObjectWithOptions(ctx, event.Virtual, event.Host, "node name is different between the two", &client.DeleteOptions{GracePeriodSeconds: &minimumGracePeriodInSeconds})
}

if s.fakeKubeletIPs && event.Host.Status.HostIP != "" {
err = s.rewriteFakeHostIPAddresses(ctx, event.Host)
if err != nil {
return ctrl.Result{}, err
}
}

// validate virtual pod before syncing it to the host cluster
if s.podSecurityStandard != "" {
valid, err := s.isPodSecurityStandardsValid(ctx, event.Virtual, ctx.Log)
Expand Down Expand Up @@ -444,3 +469,29 @@ func (s *podSyncer) assignNodeToPod(ctx *synccontext.SyncContext, pObj *corev1.P

return nil
}

// rewriteFakeHostIPAddresses overwrites the host pod's status.HostIP and
// status.HostIPs with the cluster IP of the per-node service resolved via
// getNodeIP, so the virtual pod reports the fake (proxied) kubelet address
// instead of the real host IP.
func (s *podSyncer) rewriteFakeHostIPAddresses(ctx *synccontext.SyncContext, pPod *corev1.Pod) error {
	fakeIP, err := s.getNodeIP(ctx, pPod.Spec.NodeName)
	if err != nil {
		return err
	}

	pPod.Status.HostIPs = []corev1.HostIP{{IP: fakeIP}}
	pPod.Status.HostIP = fakeIP

	return nil
}

// getNodeIP returns the cluster IP of the vcluster-managed node service for
// the node with the given name.
//
// The service name is derived from the vcluster name plus the node name with
// dots replaced by dashes (presumably to keep the concatenation a valid
// service name — the replacement mirrors the node-service naming convention;
// confirm against the node service creator).
//
// A missing node service is tolerated deliberately: on NotFound the zero-value
// service's empty ClusterIP is returned with a nil error, so callers end up
// writing an empty IP rather than failing the sync.
func (s *podSyncer) getNodeIP(ctx *synccontext.SyncContext, name string) (string, error) {
	serviceName := translate.SafeConcatName(translate.VClusterName, "node", strings.ReplaceAll(name, ".", "-"))

	nodeService := &corev1.Service{}
	err := ctx.CurrentNamespaceClient.Get(ctx.Context, types.NamespacedName{Name: serviceName, Namespace: ctx.CurrentNamespace}, nodeService)
	if err != nil && !kerrors.IsNotFound(err) {
		// was "list services" — misleading: this is a single Get of the node service
		return "", fmt.Errorf("get node service %s: %w", serviceName, err)
	}

	return nodeService.Spec.ClusterIP, nil
}
44 changes: 44 additions & 0 deletions pkg/controllers/resources/pods/syncer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -587,6 +587,35 @@ func TestSync(t *testing.T) {
},
}

testNodeName := "test123"
pVclusterNodeService := pVclusterService.DeepCopy()
pVclusterNodeService.Name = translate.SafeConcatName(testingutil.DefaultTestVClusterName, "node", testNodeName)

pPodFakeKubelet := pPodBase.DeepCopy()
pPodFakeKubelet.Spec.NodeName = testNodeName
pPodFakeKubelet.Status.HostIP = "3.3.3.3"
pPodFakeKubelet.Status.HostIPs = []corev1.HostIP{
{IP: "3.3.3.3"},
}

vPodWithNodeName := &corev1.Pod{
ObjectMeta: vObjectMeta,
Spec: corev1.PodSpec{
NodeName: testNodeName,
},
}
vPodWithHostIP := vPodWithNodeName.DeepCopy()
vPodWithHostIP.Status.HostIP = pVclusterService.Spec.ClusterIP
vPodWithHostIP.Status.HostIPs = []corev1.HostIP{
{IP: pVclusterService.Spec.ClusterIP},
}

testNode := &corev1.Node{
ObjectMeta: metav1.ObjectMeta{
Name: testNodeName,
},
}

syncertesting.RunTests(t, []*syncertesting.SyncTest{
{
Name: "Map hostpaths",
Expand All @@ -605,5 +634,20 @@ func TestSync(t *testing.T) {
assert.NilError(t, err)
},
},
{
Name: "Fake Kubelet enabled with Node sync",
InitialVirtualState: []runtime.Object{testNode.DeepCopy(), vPodWithNodeName, vNamespace.DeepCopy()},
InitialPhysicalState: []runtime.Object{testNode.DeepCopy(), pVclusterNodeService.DeepCopy(), pPodFakeKubelet.DeepCopy()},
ExpectedVirtualState: map[schema.GroupVersionKind][]runtime.Object{
corev1.SchemeGroupVersion.WithKind("Pod"): {vPodWithHostIP},
},
Sync: func(ctx *synccontext.RegisterContext) {
ctx.Config.Sync.FromHost.Nodes.Selector.All = true
ctx.Config.Networking.Advanced.ProxyKubelets.ByIP = true
syncContext, syncer := syncertesting.FakeStartSyncer(t, ctx, New)
_, err := syncer.(*podSyncer).Sync(syncContext, synccontext.NewSyncEventWithOld(pPodFakeKubelet, pPodFakeKubelet, vPodWithNodeName, vPodWithNodeName))
assert.NilError(t, err)
},
},
})
}
25 changes: 19 additions & 6 deletions pkg/controllers/resources/pods/translate/translator.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ const (
ClusterAutoScalerDaemonSetAnnotation = "cluster-autoscaler.kubernetes.io/daemonset-pod"
ServiceAccountNameAnnotation = "vcluster.loft.sh/service-account-name"
ServiceAccountTokenAnnotation = "vcluster.loft.sh/token-"
HostIPAnnotation = "vcluster.loft.sh/host-ip"
HostIPsAnnotation = "vcluster.loft.sh/host-ips"
)

var (
Expand Down Expand Up @@ -103,6 +105,7 @@ func NewTranslator(ctx *synccontext.RegisterContext, eventRecorder record.EventR
serviceAccountsEnabled: ctx.Config.Sync.ToHost.ServiceAccounts.Enabled,
priorityClassesEnabled: ctx.Config.Sync.ToHost.PriorityClasses.Enabled,
enableScheduler: ctx.Config.ControlPlane.Advanced.VirtualScheduler.Enabled,
fakeKubeletIPs: ctx.Config.Networking.Advanced.ProxyKubelets.ByIP,

mountPhysicalHostPaths: ctx.Config.ControlPlane.HostPathMapper.Enabled && !ctx.Config.ControlPlane.HostPathMapper.Central,

Expand Down Expand Up @@ -134,6 +137,7 @@ type translator struct {
overrideHostsResources corev1.ResourceRequirements
priorityClassesEnabled bool
enableScheduler bool
fakeKubeletIPs bool

virtualLogsPath string
virtualPodLogsPath string
Expand Down Expand Up @@ -405,7 +409,7 @@ func (t *translator) translateVolumes(ctx *synccontext.SyncContext, pPod *corev1
}
if pPod.Spec.Volumes[i].DownwardAPI != nil {
for j := range pPod.Spec.Volumes[i].DownwardAPI.Items {
translateFieldRef(pPod.Spec.Volumes[i].DownwardAPI.Items[j].FieldRef)
translateFieldRef(pPod.Spec.Volumes[i].DownwardAPI.Items[j].FieldRef, t.fakeKubeletIPs, t.enableScheduler)
}
}
if pPod.Spec.Volumes[i].ISCSI != nil && pPod.Spec.Volumes[i].ISCSI.SecretRef != nil {
Expand Down Expand Up @@ -471,7 +475,7 @@ func (t *translator) translateProjectedVolume(
}
if projectedVolume.Sources[i].DownwardAPI != nil {
for j := range projectedVolume.Sources[i].DownwardAPI.Items {
translateFieldRef(projectedVolume.Sources[i].DownwardAPI.Items[j].FieldRef)
translateFieldRef(projectedVolume.Sources[i].DownwardAPI.Items[j].FieldRef, t.fakeKubeletIPs, t.enableScheduler)
}
}
if projectedVolume.Sources[i].ServiceAccountToken != nil {
Expand Down Expand Up @@ -570,7 +574,7 @@ func (t *translator) translateProjectedVolume(
return nil
}

func translateFieldRef(fieldSelector *corev1.ObjectFieldSelector) {
func translateFieldRef(fieldSelector *corev1.ObjectFieldSelector, fakeKubeletIPs, enableScheduler bool) {
if fieldSelector == nil {
return
}
Expand All @@ -593,13 +597,22 @@ func translateFieldRef(fieldSelector *corev1.ObjectFieldSelector) {
fieldSelector.FieldPath = "metadata.annotations['" + UIDAnnotation + "']"
case "spec.serviceAccountName":
fieldSelector.FieldPath = "metadata.annotations['" + ServiceAccountNameAnnotation + "']"
// translate downward API references for status.hostIP(s) only when both virtual scheduler & fakeKubeletIPs are enabled
case "status.hostIP":
if fakeKubeletIPs && enableScheduler {
fieldSelector.FieldPath = "metadata.annotations['" + HostIPAnnotation + "']"
}
case "status.hostIPs":
if fakeKubeletIPs && enableScheduler {
fieldSelector.FieldPath = "metadata.annotations['" + HostIPsAnnotation + "']"
}
}
}

func (t *translator) TranslateContainerEnv(ctx *synccontext.SyncContext, envVar []corev1.EnvVar, envFrom []corev1.EnvFromSource, vPod *corev1.Pod, serviceEnvMap map[string]string) ([]corev1.EnvVar, []corev1.EnvFromSource, error) {
envNameMap := make(map[string]struct{})
for j, env := range envVar {
translateDownwardAPI(&envVar[j])
translateDownwardAPI(&envVar[j], t.fakeKubeletIPs, t.enableScheduler)
if env.ValueFrom != nil && env.ValueFrom.ConfigMapKeyRef != nil && env.ValueFrom.ConfigMapKeyRef.Name != "" {
envVar[j].ValueFrom.ConfigMapKeyRef.Name = mappings.VirtualToHostName(ctx, envVar[j].ValueFrom.ConfigMapKeyRef.Name, vPod.Namespace, mappings.ConfigMaps())
}
Expand Down Expand Up @@ -640,14 +653,14 @@ func (t *translator) TranslateContainerEnv(ctx *synccontext.SyncContext, envVar
return envVar, envFrom, nil
}

func translateDownwardAPI(env *corev1.EnvVar) {
func translateDownwardAPI(env *corev1.EnvVar, fakeKubeletIPs, enableScheduler bool) {
if env.ValueFrom == nil {
return
}
if env.ValueFrom.FieldRef == nil {
return
}
translateFieldRef(env.ValueFrom.FieldRef)
translateFieldRef(env.ValueFrom.FieldRef, fakeKubeletIPs, enableScheduler)
}

func (t *translator) translateDNSConfig(pPod *corev1.Pod, vPod *corev1.Pod, nameServer string) {
Expand Down
10 changes: 10 additions & 0 deletions test/e2e/syncer/pods/pods.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,8 @@ var _ = ginkgo.Describe("Pods are running in the host cluster", func() {
pod, err := f.HostClient.CoreV1().Pods(pPodName.Namespace).Get(f.Context, pPodName.Name, metav1.GetOptions{})
framework.ExpectNoError(err)

// ignore HostIP differences
resetHostIP(vpod, pod)
framework.ExpectEqual(vpod.Status, pod.Status)

// check for ephemeralContainers subResource
Expand Down Expand Up @@ -139,6 +141,9 @@ var _ = ginkgo.Describe("Pods are running in the host cluster", func() {
pPodName := translate.Default.HostName(nil, podName, ns)
pod, err := f.HostClient.CoreV1().Pods(pPodName.Namespace).Get(f.Context, pPodName.Name, metav1.GetOptions{})
framework.ExpectNoError(err)

// ignore HostIP differences
resetHostIP(vpod, pod)
framework.ExpectEqual(vpod.Status, pod.Status)

// check for conditions
Expand Down Expand Up @@ -684,3 +689,8 @@ var _ = ginkgo.Describe("Pods are running in the host cluster", func() {
framework.ExpectEqual(vPod.Labels[additionalLabelKey], pPod.Labels[additionalLabelKey])
})
})

// resetHostIP clears status.HostIP and status.HostIPs on both the virtual and
// the host pod so that a subsequent status comparison ignores host IP
// differences (the virtual pod intentionally carries a fake node IP).
func resetHostIP(vpod, pod *corev1.Pod) {
	for _, p := range []*corev1.Pod{vpod, pod} {
		p.Status.HostIP = ""
		p.Status.HostIPs = nil
	}
}

0 comments on commit 7feb3c6

Please sign in to comment.