Skip to content

Commit

Permalink
Use Endpoint HealthCheckIPs instead of HealthCheckIP
Browse files Browse the repository at this point in the history
Signed-off-by: Tom Pantelis <[email protected]>
  • Loading branch information
tpantelis committed Jan 22, 2025
1 parent ffdb10e commit 8c75edd
Show file tree
Hide file tree
Showing 10 changed files with 36 additions and 31 deletions.
11 changes: 6 additions & 5 deletions pkg/cableengine/healthchecker/healthchecker.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
submarinerv1 "github.com/submariner-io/submariner/pkg/apis/submariner.io/v1"
"github.com/submariner-io/submariner/pkg/pinger"
"k8s.io/apimachinery/pkg/runtime"
k8snet "k8s.io/utils/net"
logf "sigs.k8s.io/controller-runtime/pkg/log"
)

Expand Down Expand Up @@ -122,17 +123,17 @@ func (h *controller) endpointCreatedOrUpdated(obj runtime.Object, _ int) bool {
return false
}

if endpointCreated.Spec.HealthCheckIP == "" || endpointCreated.Spec.CableName == "" {
if endpointCreated.Spec.GetHealthCheckIP(k8snet.IPv4) == "" || endpointCreated.Spec.CableName == "" {
logger.Infof("HealthCheckIP (%q) and/or CableName (%q) for Endpoint %q empty - will not monitor endpoint health",
endpointCreated.Spec.HealthCheckIP, endpointCreated.Spec.CableName, endpointCreated.Name)
endpointCreated.Spec.GetHealthCheckIP(k8snet.IPv4), endpointCreated.Spec.CableName, endpointCreated.Name)
return false
}

h.Lock()
defer h.Unlock()

if pingerObject, found := h.pingers[endpointCreated.Spec.CableName]; found {
if pingerObject.GetIP() == endpointCreated.Spec.HealthCheckIP {
if pingerObject.GetIP() == endpointCreated.Spec.GetHealthCheckIP(k8snet.IPv4) {
return false
}

Expand All @@ -142,7 +143,7 @@ func (h *controller) endpointCreatedOrUpdated(obj runtime.Object, _ int) bool {
}

pingerConfig := pinger.Config{
IP: endpointCreated.Spec.HealthCheckIP,
IP: endpointCreated.Spec.GetHealthCheckIP(k8snet.IPv4),
MaxPacketLossCount: h.config.MaxPacketLossCount,
}

Expand All @@ -160,7 +161,7 @@ func (h *controller) endpointCreatedOrUpdated(obj runtime.Object, _ int) bool {
pingerObject.Start()

logger.Infof("CableEngine HealthChecker started pinger for CableName: %q with HealthCheckIP %q",
endpointCreated.Spec.CableName, endpointCreated.Spec.HealthCheckIP)
endpointCreated.Spec.CableName, endpointCreated.Spec.GetHealthCheckIP(k8snet.IPv4))

return false
}
Expand Down
8 changes: 4 additions & 4 deletions pkg/cableengine/healthchecker/healthchecker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,9 +107,9 @@ var _ = Describe("Controller", func() {

createEndpoint := func(clusterID, healthCheckIP string) *submarinerv1.Endpoint {
endpointSpec := &submarinerv1.EndpointSpec{
ClusterID: clusterID,
CableName: fmt.Sprintf("submariner-cable-%s-192-68-1-20", clusterID),
HealthCheckIP: healthCheckIP,
ClusterID: clusterID,
CableName: fmt.Sprintf("submariner-cable-%s-192-68-1-20", clusterID),
HealthCheckIPs: []string{healthCheckIP},
}

endpointName, err := endpointSpec.GenerateName()
Expand Down Expand Up @@ -223,7 +223,7 @@ var _ = Describe("Controller", func() {
})

It("should stop the Pinger and start a new one", func() {
endpoint.Spec.HealthCheckIP = healthCheckIP3
endpoint.Spec.HealthCheckIPs = []string{healthCheckIP3}

test.UpdateResource(endpoints, endpoint)
pingerMap[healthCheckIP1].AwaitStop()
Expand Down
2 changes: 1 addition & 1 deletion pkg/cableengine/syncer/syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -243,7 +243,7 @@ func (gs *GatewaySyncer) generateGatewayObject() *v1.Gateway {
latencyInfo := gs.healthCheck.GetLatencyInfo(&connection.Endpoint)
if latencyInfo != nil {
connection.LatencyRTT = latencyInfo.Spec
connection.Endpoint.HealthCheckIP = latencyInfo.IP
connection.Endpoint.SetHealthCheckIP(latencyInfo.IP)

if connection.Status == v1.Connected {
lastRTT, _ := time.ParseDuration(latencyInfo.Spec.Last)
Expand Down
11 changes: 6 additions & 5 deletions pkg/cableengine/syncer/syncer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -394,12 +394,13 @@ func testGatewayLatencyInfo() {
t.awaitGatewayUpdated(t.expectedGateway)

endpointSpec := &submarinerv1.EndpointSpec{
ClusterID: "north",
CableName: "submariner-cable-north-192-68-1-20",
PrivateIPs: []string{"192-68-1-20"},
HealthCheckIP: t.pinger.GetIP(),
ClusterID: "north",
CableName: "submariner-cable-north-192-68-1-20",
PrivateIPs: []string{"192-68-1-20"},
}

endpointSpec.SetHealthCheckIP(t.pinger.GetIP())

endpointName, err := endpointSpec.GenerateName()
Expect(err).To(Succeed())

Expand All @@ -423,7 +424,7 @@ func testGatewayLatencyInfo() {
}

t.engine.Connections = []submarinerv1.Connection{t.expectedGateway.Status.Connections[0]}
t.engine.Connections[0].Endpoint.HealthCheckIP = ""
t.engine.Connections[0].Endpoint.HealthCheckIPs = []string{}

t.expectedGateway.Status.Connections[0].LatencyRTT = &submarinerv1.LatencyRTTSpec{
Last: "93ms",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -137,13 +137,13 @@ func testEndpointSyncing() {
})

JustBeforeEach(func() {
t.localEndpoint.HealthCheckIP = gateway.Annotations[constants.SmGlobalIP]
t.localEndpoint.SetHealthCheckIP(gateway.Annotations[constants.SmGlobalIP])
awaitEndpoint(t.localEndpoints, t.localEndpoint)
})

It("should update the local Endpoint's HealthCheckIP", func() {
gateway.Annotations[constants.SmGlobalIP] = "200.0.0.100"
t.localEndpoint.HealthCheckIP = gateway.Annotations[constants.SmGlobalIP]
t.localEndpoint.SetHealthCheckIP(gateway.Annotations[constants.SmGlobalIP])

test.UpdateResource(t.localGateways, gateway)
awaitEndpoint(t.localEndpoints, t.localEndpoint)
Expand Down
5 changes: 3 additions & 2 deletions pkg/controllers/datastoresyncer/gateway_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/submariner-io/submariner/pkg/globalnet/constants"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime"
k8snet "k8s.io/utils/net"
)

func (d *DatastoreSyncer) handleCreateOrUpdateGateway(obj runtime.Object, _ int) bool {
Expand Down Expand Up @@ -62,11 +63,11 @@ func (d *DatastoreSyncer) areGatewaysEquivalent(obj1, obj2 *unstructured.Unstruc

func (d *DatastoreSyncer) updateLocalEndpointIfNecessary(globalIP string) bool {
spec := d.localEndpoint.Spec()
if spec.HealthCheckIP != globalIP {
if spec.GetHealthCheckIP(k8snet.IPv4) != globalIP {
logger.Infof("Updating the endpoint HealthCheckIP to globalIP %q", globalIP)

err := d.localEndpoint.Update(context.TODO(), func(existing *submarinerv1.EndpointSpec) {
existing.HealthCheckIP = globalIP
existing.SetHealthCheckIP(globalIP)
})
if err != nil {
logger.Warningf("Error updating the local submariner Endpoint with HealthcheckIP %s: %v", globalIP, err)
Expand Down
6 changes: 3 additions & 3 deletions pkg/endpoint/local_endpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ var _ = Describe("GetLocalSpec", func() {
Expect(spec.Subnets).To(Equal(subnets))
Expect(spec.NATEnabled).To(BeFalse())
Expect(spec.BackendConfig[testUDPPortLabel]).To(Equal(testUDPPort))
Expect(spec.HealthCheckIP).To(BeEmpty())
Expect(spec.HealthCheckIPs).To(BeEmpty())
})

When("the gateway node is not annotated with udp port", func() {
Expand Down Expand Up @@ -167,7 +167,7 @@ var _ = Describe("GetLocalSpec", func() {
It("should set the HealthCheckIP", func() {
spec, err := endpoint.GetLocalSpec(context.TODO(), submSpec, client, true)
Expect(err).ToNot(HaveOccurred())
Expect(spec.HealthCheckIP).To(Equal(cniInterfaceIP))
Expect(spec.HealthCheckIPs).To(Equal([]string{cniInterfaceIP}))
})

Context("and globalnet is enabled", func() {
Expand All @@ -178,7 +178,7 @@ var _ = Describe("GetLocalSpec", func() {
It("should not set the HealthCheckIP", func() {
spec, err := endpoint.GetLocalSpec(context.TODO(), submSpec, client, true)
Expect(err).ToNot(HaveOccurred())
Expect(spec.HealthCheckIP).To(BeEmpty())
Expect(spec.HealthCheckIPs).To(BeEmpty())
})
})
})
Expand Down
11 changes: 6 additions & 5 deletions pkg/routeagent_driver/handlers/healthchecker/healthchecker.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait"
k8snet "k8s.io/utils/net"
logf "sigs.k8s.io/controller-runtime/pkg/log"
)

Expand Down Expand Up @@ -117,14 +118,14 @@ func (h *controller) RemoteEndpointUpdated(endpoint *submarinerv1.Endpoint) erro
func (h *controller) processEndpointCreatedOrUpdated(endpoint *submarinerv1.Endpoint) {
logger.Infof("Processing Endpoint: %#v", endpoint)

if endpoint.Spec.HealthCheckIP == "" || endpoint.Spec.CableName == "" {
if endpoint.Spec.GetHealthCheckIP(k8snet.IPv4) == "" || endpoint.Spec.CableName == "" {
logger.Infof("HealthCheckIP (%q) and/or CableName (%q) for Endpoint %q empty - will not monitor endpoint health",
endpoint.Spec.HealthCheckIP, endpoint.Spec.CableName, endpoint.Name)
endpoint.Spec.GetHealthCheckIP(k8snet.IPv4), endpoint.Spec.CableName, endpoint.Name)
return
}

if pingerObject, found := h.pingers[endpoint.Spec.CableName]; found {
if pingerObject.GetIP() == endpoint.Spec.HealthCheckIP {
if pingerObject.GetIP() == endpoint.Spec.GetHealthCheckIP(k8snet.IPv4) {
return
}

Expand All @@ -134,7 +135,7 @@ func (h *controller) processEndpointCreatedOrUpdated(endpoint *submarinerv1.Endp
}

pingerConfig := pinger.Config{
IP: endpoint.Spec.HealthCheckIP,
IP: endpoint.Spec.GetHealthCheckIP(k8snet.IPv4),
}

if h.config.PingInterval != 0 {
Expand All @@ -155,7 +156,7 @@ func (h *controller) processEndpointCreatedOrUpdated(endpoint *submarinerv1.Endp
pingerObject.Start()

logger.Infof("HealthChecker started pinger for CableName: %q with HealthCheckIP %q",
endpoint.Spec.CableName, endpoint.Spec.HealthCheckIP)
endpoint.Spec.CableName, endpoint.Spec.GetHealthCheckIP(k8snet.IPv4))
}

func (h *controller) RemoteEndpointRemoved(endpoint *submarinerv1.Endpoint) error {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ import (
"k8s.io/client-go/dynamic"
dynamicfake "k8s.io/client-go/dynamic/fake"
kubeScheme "k8s.io/client-go/kubernetes/scheme"
k8snet "k8s.io/utils/net"
)

const (
Expand Down Expand Up @@ -160,7 +161,7 @@ var _ = Describe("RemoteEndpoint latency info", func() {
t.pingerMap[healthCheckIP1].AwaitStart()
t.pingerMap[healthCheckIP2] = fake.NewPinger(healthCheckIP2)

endpoint1.Spec.HealthCheckIP = healthCheckIP2
endpoint1.Spec.HealthCheckIPs = []string{healthCheckIP2}

t.UpdateEndpoint(endpoint1)
t.pingerMap[healthCheckIP1].AwaitStop()
Expand All @@ -176,7 +177,7 @@ var _ = Describe("RemoteEndpoint latency info", func() {
endpoint1.Spec.Hostname = "newHostName"
t.UpdateEndpoint(endpoint1)

pingerObject, found := t.pingerMap[endpoint1.Spec.HealthCheckIP]
pingerObject, found := t.pingerMap[endpoint1.Spec.GetHealthCheckIP(k8snet.IPv4)]
Expect(found).To(BeTrue())
Expect(pingerObject.GetIP()).To(Equal(healthCheckIP1))

Expand Down Expand Up @@ -306,7 +307,7 @@ func (t *testDriver) newSubmEndpoint(healthCheckIP string) *submarinerv1.Endpoin
ClusterID: remoteClusterID,
CableName: fmt.Sprintf("submariner-cable-%s-192-68-1-20", remoteClusterID),
}
endpointSpec.HealthCheckIP = healthCheckIP
endpointSpec.HealthCheckIPs = []string{healthCheckIP}

endpointName, err := endpointSpec.GenerateName()
Expect(err).To(Succeed())
Expand Down
2 changes: 1 addition & 1 deletion test/e2e/dataplane/gateway_status.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ func verifyGateway(gw *submarinerv1.Gateway, otherCluster string, healthCheckedE
}

if healthCheckedEnabled {
if gw.Status.Connections[i].Endpoint.HealthCheckIP == "" {
if len(gw.Status.Connections[i].Endpoint.HealthCheckIPs) == 0 {
return false, fmt.Sprintf("Connection for cluster %q has no health check IP. This could be because the Gateway or"+
" Globalnet pod could not determine the cluster's CNI IP address. If so, this would be reported in the pod log.",
otherCluster), nil
Expand Down

0 comments on commit 8c75edd

Please sign in to comment.