Skip to content
This repository was archived by the owner on Jul 16, 2024. It is now read-only.

Fix epslices #532

Merged
merged 2 commits into from
Sep 30, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions api/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,11 @@ require (
github.com/golang/protobuf v1.5.3
google.golang.org/grpc v1.50.0
google.golang.org/protobuf v1.28.1
k8s.io/klog/v2 v2.80.1
)

require (
github.com/go-logr/logr v1.2.4 // indirect
github.com/google/go-cmp v0.5.9 // indirect
golang.org/x/net v0.14.0 // indirect
golang.org/x/sys v0.11.0 // indirect
Expand Down
2 changes: 2 additions & 0 deletions api/go.sum
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
github.com/go-logr/logr v1.2.4 h1:g01GSCwiDw2xSZfjJ2/T9M+S6pFdcNtFYsp+Y43HYDQ=
github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk=
github.com/golang/protobuf v1.5.3 h1:KhyjKVUg7Usr/dYsdSqoFveMYd5ko72D+zANwlG1mmg=
github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
Expand All @@ -15,3 +16,4 @@ google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp0
google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc=
google.golang.org/protobuf v1.28.1 h1:d0NfwRgPtno5B1Wa6L2DAG+KivqkdutMf1UhdNx175w=
google.golang.org/protobuf v1.28.1/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I=
k8s.io/klog/v2 v2.80.1 h1:atnLQ121W371wYYFawwYx1aEY2eUfs4l3J72wtgAwV4=
42 changes: 33 additions & 9 deletions api/localv1/endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ limitations under the License.
package localv1

import (
"fmt"
"k8s.io/klog/v2"
"net"
)

Expand All @@ -29,30 +31,52 @@ func (ep *Endpoint) AddAddress(s string) (ip net.IP) {
return ep.IPs.Add(s)
}

func (ep *Endpoint) PortMapping(port *PortMapping) (target int32) {
target = port.TargetPort
if port.TargetPortName != "" {
func (ep *Endpoint) PortMapping(port *PortMapping) (int32, error) {
nameToFind := ""
if port.Name != "" {
nameToFind = port.Name
} else if port.TargetPortName != "" {
nameToFind = port.TargetPortName
}

if nameToFind != "" {
for _, override := range ep.PortOverrides {
if override.Name == port.Name {
target = override.Port
break
if override.Name == nameToFind {
return override.Port, nil
}
}
return 0, fmt.Errorf("not found %s in port overrides", nameToFind)
}
return

if port.TargetPort > 0 {
return port.TargetPort, nil
}

return 0, fmt.Errorf("port mapping is undefined")
}

func (ep *Endpoint) PortMappings(ports []*PortMapping) (mapping map[int32]int32) {
mapping = make(map[int32]int32, len(ports))
for _, port := range ports {
mapping[port.Port] = ep.PortMapping(port)
p, err := ep.PortMapping(port)
if err != nil {
klog.V(1).InfoS("failed to map port", "err", err)
continue
}
mapping[port.Port] = p
}
return
}

func (ep *Endpoint) PortNameMappings(ports []*PortMapping) (mapping map[string]int32) {
mapping = make(map[string]int32, len(ports))
for _, port := range ports {
mapping[port.Name] = ep.PortMapping(port)
p, err := ep.PortMapping(port)
if err != nil {
klog.V(1).InfoS("failed to map port", "err", err)
continue
}
mapping[port.Name] = p
}
return
}
29 changes: 24 additions & 5 deletions api/localv1/endpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,22 @@ import "fmt"

func ExampleEndpointPortMapping() {
ports := []*PortMapping{
// name doesn't match -> ignore the rest
{Name: "http", TargetPortName: "t-http", TargetPort: 8080},
// name matches -> ignore the rest
{Name: "http2", TargetPortName: "t-http2", TargetPort: 800},
{Name: "metrics", TargetPortName: "t-metrics"},
// name matches -> ignore the rest
{Name: "metrics", TargetPortName: "http2"},
// name matches -> ignore the rest
{Name: "metrics", TargetPort: 80},
// name matches
{Name: "metrics"},
// targetPortName matches, no name -> ignore TargetPort
{TargetPortName: "metrics", TargetPort: 8080},
// targetPortName doesn't match, no name -> ignore targetPort
{TargetPortName: "t-metrics", TargetPort: 8080},
// nothing to match -> err expected
{},
}

ep := &Endpoint{
Expand All @@ -33,11 +46,17 @@ func ExampleEndpointPortMapping() {
}

for _, port := range ports {
fmt.Println(port.Name, ep.PortMapping(port))
p, err := ep.PortMapping(port)
fmt.Println(port.Name, p, err)
}

// Output:
// http 8080
// http2 888
// metrics 1011
// http 0 not found http in port overrides
// http2 888 <nil>
// metrics 1011 <nil>
// metrics 1011 <nil>
// metrics 1011 <nil>
// 1011 <nil>
// 0 not found t-metrics in port overrides
// 0 port mapping is undefined
}
17 changes: 8 additions & 9 deletions backends/iptables/iptables.go
Original file line number Diff line number Diff line change
Expand Up @@ -647,10 +647,15 @@ func (t *iptables) createEndpointsChain(svcInfo *serviceInfo, allEndpoints *endp
ep = epInfo.IPs.V4[0]
}

targetPort := epInfo.PortMapping(&localv1.PortMapping{
targetPort, err := epInfo.PortMapping(&localv1.PortMapping{
Name: svcInfo.portName,
TargetPortName: svcInfo.targetPortName,
TargetPort: int32(svcInfo.targetPort),
})
if err != nil {
klog.V(1).InfoS("failed to map port", "err", err)
continue
}
endpointPortMap[ep] = targetPort
endpoints = append(endpoints, &ep)

Expand Down Expand Up @@ -748,21 +753,15 @@ func (t *iptables) writeDNATRules(svcInfo *serviceInfo, svcName types.Namespaced

targetPort := t.getTargetPort(svcInfo, endpointPortMap, *epIP)

// this seems very sly to me. Doing this because there were 2 entries being added
// one with the right target port and one with zero
// write better logic or verify how baseServiceInfo & endpointPortMap are populated
if targetPort == 0 {
continue
}
// DNAT to final destination.
args = append(args, "-m", protocol, "-p", protocol, "-j", "DNAT", "--to-destination", net.JoinHostPort(*epIP, strconv.Itoa(targetPort)))
t.natRules.Write(args)
}
}

// if the targetPort is string, fetch the value from endpointPortMap
// if the targetPort is string or portName exists, fetch the value from endpointPortMap
func (t *iptables) getTargetPort(svcInfo *serviceInfo, endpointPortMap map[string]int32, endpoint string) int {
if svcInfo.TargetPortName() != "" {
if svcInfo.PortName() != "" || svcInfo.TargetPortName() != "" {
Copy link
Contributor

@jayunit100 jayunit100 Oct 3, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder if, in general, KPNG NEEDS to be dependent on port names for routing rules. To me it shouldn't.

i.e. if

  • service 10.1.2.3:8080 points to my hamburger (tpn: hport) SAAS
  • service 10.1.2.3:9090 points to my pizza (tpn:pPort) SAAS

Why can't we just blindly write routing rules to 8080 and 9090? Who cares what the names are? After all, 10.1.2.3:8080 and 10.1.2.3:9090 represent unique primary keys, and are complete in terms of the layer 4 information content required to distinguish these backends, right?

Copy link
Contributor

@jayunit100 jayunit100 Oct 3, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah got it.
Kubernetes supports this odd thing where targetPort=80 and portName=port8080 ... In that case,

  • port name overrides targetPort,
    but KPNG discarded that name info and then forwarded to the incorrect targetPort 80, which was supposed to be replaced with 8080

Copy link
Contributor

@mcluseau mcluseau Oct 3, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

umm isn't it what we provide with port overrides in endpoints? I mean, the PortMapping(...) call in api/localv1/endpoint.go

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@mcluseau previously, the PortMapping only worked if TargetPortName was specified. This PR added port name check too: if port.Name != "" ....

Copy link
Contributor

@jayunit100 jayunit100 Oct 3, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@mcluseau maybe suggesting there a possible brain fix here ???

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

well, the idea is still to keep k8s business logic outside of the backends. Otherwise, we're just doing that https://xkcd.com/927/ ;-)
As I'm not very available on the project nowadays, I couldn't dig into this a lot, but I can see that @mneverov actually did some work in the file I mentioned, so he can't have missed it :-) Now, I just wonder why there's a second round of interpretation, as, from my "old" memories, the idea of the PortMappings() calls was to directly provide the "final form" of the endpointPortMap argument here in iptables.go.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

well, the idea is still to keep k8s business logic outside of the backends

I was not aware of it, got it.

the idea of the PortMappings() calls was to directly provide the "final form" of the endpointPortMap argument here in iptables.go

From my perspective this PR did not change it.

My interpretation of how port mapping works:

K8s

The endpoint slice reconciler creates endpoint slices with port names taken from the corresponding service port name. A Service port binds targetName to the Pod port if the targetName is a string (as described in the documentation). The same logic is used in the endpoints controller. If a Service port targetName is specified as a string and it does not match any Pod port name, then the controllers create an EndpointSlice (or legacy Endpoints) without this port. If it is the only port specified in a Service, then the EndpointSlice will not have ports at all. If the targetName is an integer (e.g. port 8080), then it is copied to the EndpointSlice port definition.

KPNG

KPNG's frontend kube2store watches the api-server and maps service ports to the internal PortMapping representation. The only difference from k8s is that PortMapping has a separate field TargetPortName for the case when the service port is a string.

So, actually this check doesn't make sense and should be removed.

For services without selectors the mapping rules are almost the same:

  1. If a Service specifies a port name, then it has to match the EndpointSlice port name. If it doesn't, there is no routing. One corner case is when the Service doesn't specify a port name but the EndpointSlice does. In this scenario, again, there is no routing.
  2. If a Service specifies targetPort (it doesn't matter whether it is a string or an actual port number), it is not part of the decision, i.e. it can be any bogus port and, as long as there is a properly defined EndpointSlice, the routing will be created.
apiVersion: v1
kind: Service
metadata:
  name: nginx-service
spec:
  ports:
    - port: 80
      protocol: TCP
      targetPort: anything-here

---

apiVersion: discovery.k8s.io/v1
kind: EndpointSlice
metadata:
  name: nginx-service-1
  labels:
    kubernetes.io/service-name: nginx-service
addressType: IPv4
ports:
  - name: ''
    protocol: TCP
    port: 80
endpoints:
  - addresses:
      - "10.244.0.5" # ip of the nginx port

return int(endpointPortMap[endpoint])
}
return svcInfo.TargetPort()
Expand Down
6 changes: 4 additions & 2 deletions backends/nft/endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package nft
import (
"encoding/hex"
"fmt"
"k8s.io/klog/v2"
"net/netip"
"strconv"

Expand Down Expand Up @@ -63,8 +64,9 @@ func (ctx *renderContext) addEndpointChain(svc *localv1.Service, epIP EpIP, svcC
continue
}

targetPort := epIP.Endpoint.PortMapping(port)
if targetPort == 0 {
targetPort, err := epIP.Endpoint.PortMapping(port)
if err != nil {
klog.V(1).InfoS("failed to map port", "err", err)
continue
}

Expand Down
4 changes: 3 additions & 1 deletion backends/nft/svc-chain.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ limitations under the License.
package nft

import (
"k8s.io/klog/v2"
"strconv"

localv1 "sigs.k8s.io/kpng/api/localv1"
Expand Down Expand Up @@ -55,7 +56,8 @@ func (ctx *renderContext) addSvcChain(svc *localv1.Service, epIPs []EpIP) {
// filter endpoint based on port availability
subset := make([]EpIP, 0, len(epIPs))
for _, epIP := range epIPs {
if epIP.Endpoint.PortMapping(port) == 0 {
if _, err := epIP.Endpoint.PortMapping(port); err != nil {
klog.V(1).InfoS("failed to map port", "err", err)
continue
}
subset = append(subset, epIP)
Expand Down
11 changes: 6 additions & 5 deletions client/plugins/conntrack/conntrack.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,14 +90,15 @@ func (ct Conntrack) Callback(ch <-chan *client.ServiceEndpoints) {

for _, ep := range seps.Endpoints {
for _, epIP := range ep.IPs.All() {
p, err := ep.PortMapping(svcPort)
if err != nil {
klog.V(1).InfoS("failed to map port", "err", err)
continue
}
flow := Flow{
IPPort: ipp,
EndpointIP: epIP,
TargetPort: ep.PortMapping(svcPort),
}

if flow.TargetPort == 0 {
continue // no target port found
TargetPort: p,
}

ct.flows.Get(flow.Key()).Set(flow)
Expand Down
8 changes: 7 additions & 1 deletion client/plugins/conntrack/sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ limitations under the License.
package conntrack

import (
"k8s.io/klog/v2"
"sigs.k8s.io/kpng/api/localv1"
"sigs.k8s.io/kpng/client/localsink"
)
Expand Down Expand Up @@ -89,7 +90,12 @@ func (ps *Sink) DeleteEndpoint(namespace, serviceName, key string) {
for _, epIP := range ep.IPs.All() {
targetPort = port.Port
if port.Port == 0 {
targetPort = int32(ep.PortMapping(port))
p, err := ep.PortMapping(port)
if err != nil {
klog.V(1).InfoS("failed to map port", "err", err)
continue
}
targetPort = p
}
flow := Flow{
IPPort: IPPort{Protocol: port.Protocol, DnatIP: svcIP, Port: targetPort},
Expand Down
5 changes: 2 additions & 3 deletions server/jobs/kube2store/slice-event-handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,6 @@ func (h sliceEventHandler) OnAdd(obj interface{}) {
ServiceName: serviceName,
SourceName: eps.Name,
Endpoint: &localv1.Endpoint{},
Conditions: &globalv1.EndpointConditions{},
Topology: &globalv1.TopologyInfo{},
}

Expand Down Expand Up @@ -85,8 +84,8 @@ func (h sliceEventHandler) OnAdd(obj interface{}) {
sort.Strings(info.Hints.Zones) // stable zone order
}

if r := sliceEndpoint.Conditions.Ready; r != nil && *r {
info.Conditions.Ready = true
if r := sliceEndpoint.Conditions.Ready; r != nil {
info.Conditions = &globalv1.EndpointConditions{Ready: *r}
}

for _, addr := range sliceEndpoint.Addresses {
Expand Down
2 changes: 1 addition & 1 deletion server/pkg/endpoints/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ func ForNode(tx *proxystore.Tx, si *globalv1.ServiceInfo, nodeName string) (endp

info.Endpoint.Local = info.Topology.Node == nodeName

if !info.Conditions.Ready {
if info.Conditions != nil && !info.Conditions.Ready {
return
}

Expand Down