From ccda1e040f02ccdbfc4368584569c8d93cb0d056 Mon Sep 17 00:00:00 2001 From: Max Neverov Date: Sat, 23 Sep 2023 20:50:34 +0200 Subject: [PATCH 1/2] Set endpoint ready condition. --- server/jobs/kube2store/slice-event-handler.go | 5 ++--- server/pkg/endpoints/node.go | 2 +- 2 files changed, 3 insertions(+), 4 deletions(-) diff --git a/server/jobs/kube2store/slice-event-handler.go b/server/jobs/kube2store/slice-event-handler.go index 7e4c80646..ca262b027 100644 --- a/server/jobs/kube2store/slice-event-handler.go +++ b/server/jobs/kube2store/slice-event-handler.go @@ -55,7 +55,6 @@ func (h sliceEventHandler) OnAdd(obj interface{}) { ServiceName: serviceName, SourceName: eps.Name, Endpoint: &localv1.Endpoint{}, - Conditions: &globalv1.EndpointConditions{}, Topology: &globalv1.TopologyInfo{}, } @@ -85,8 +84,8 @@ func (h sliceEventHandler) OnAdd(obj interface{}) { sort.Strings(info.Hints.Zones) // stable zone order } - if r := sliceEndpoint.Conditions.Ready; r != nil && *r { - info.Conditions.Ready = true + if r := sliceEndpoint.Conditions.Ready; r != nil { + info.Conditions = &globalv1.EndpointConditions{Ready: *r} } for _, addr := range sliceEndpoint.Addresses { diff --git a/server/pkg/endpoints/node.go b/server/pkg/endpoints/node.go index 5e3ef9c99..e53f1ce3f 100644 --- a/server/pkg/endpoints/node.go +++ b/server/pkg/endpoints/node.go @@ -47,7 +47,7 @@ func ForNode(tx *proxystore.Tx, si *globalv1.ServiceInfo, nodeName string) (endp info.Endpoint.Local = info.Topology.Node == nodeName - if !info.Conditions.Ready { + if info.Conditions != nil && !info.Conditions.Ready { return } From 6ec42e7b6ce414092d406ea1c9bc75bd849cea75 Mon Sep 17 00:00:00 2001 From: Max Neverov Date: Sat, 23 Sep 2023 20:56:08 +0200 Subject: [PATCH 2/2] Map port based on port name, targetPortName, and targetPort. --- api/go.mod | 2 ++ api/go.sum | 2 ++ api/localv1/endpoint.go | 42 +++++++++++++++++++++------ api/localv1/endpoint_test.go | 29 ++++++++++++++---- backends/iptables/iptables.go | 17 +++++------ backends/nft/endpoint.go | 6 ++-- backends/nft/svc-chain.go | 4 ++- client/plugins/conntrack/conntrack.go | 11 +++---- client/plugins/conntrack/sink.go | 8 ++++- 9 files changed, 89 insertions(+), 32 deletions(-) diff --git a/api/go.mod b/api/go.mod index ac2687d96..9cd323a9a 100644 --- a/api/go.mod +++ b/api/go.mod @@ -6,9 +6,11 @@ require ( github.com/golang/protobuf v1.5.3 google.golang.org/grpc v1.50.0 google.golang.org/protobuf v1.28.1 + k8s.io/klog/v2 v2.80.1 ) require ( + github.com/go-logr/logr v1.2.4 // indirect github.com/google/go-cmp v0.5.9 // indirect golang.org/x/net v0.14.0 // indirect golang.org/x/sys v0.11.0 // indirect diff --git a/api/go.sum b/api/go.sum index 60b941f84..793e843d3 100644 --- a/api/go.sum +++ b/api/go.sum @@ -1,3 +1,4 @@ +github.com/go-logr/logr v1.2.4 h1:g01GSCwiDw2xSZfjJ2/T9M+S6pFdcNtFYsp+Y43HYDQ= github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk= github.com/golang/protobuf v1.5.3 h1:KhyjKVUg7Usr/dYsdSqoFveMYd5ko72D+zANwlG1mmg= github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= @@ -15,3 +16,4 @@ google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp0 google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc= google.golang.org/protobuf v1.28.1 h1:d0NfwRgPtno5B1Wa6L2DAG+KivqkdutMf1UhdNx175w= google.golang.org/protobuf v1.28.1/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I= +k8s.io/klog/v2 v2.80.1 h1:atnLQ121W371wYYFawwYx1aEY2eUfs4l3J72wtgAwV4= diff --git a/api/localv1/endpoint.go b/api/localv1/endpoint.go index 821b1a767..aa29285ba 100644 --- a/api/localv1/endpoint.go +++ b/api/localv1/endpoint.go @@ -17,6 +17,8 @@ limitations under the License. package localv1 import ( + "fmt" + "k8s.io/klog/v2" "net" ) @@ -29,30 +31,52 @@ func (ep *Endpoint) AddAddress(s string) (ip net.IP) { return ep.IPs.Add(s) } -func (ep *Endpoint) PortMapping(port *PortMapping) (target int32) { - target = port.TargetPort - if port.TargetPortName != "" { +func (ep *Endpoint) PortMapping(port *PortMapping) (int32, error) { + nameToFind := "" + if port.Name != "" { + nameToFind = port.Name + } else if port.TargetPortName != "" { + nameToFind = port.TargetPortName + } + + if nameToFind != "" { for _, override := range ep.PortOverrides { - if override.Name == port.Name { - target = override.Port - break + if override.Name == nameToFind { + return override.Port, nil } } + return 0, fmt.Errorf("not found %s in port overrides", nameToFind) } - return + + if port.TargetPort > 0 { + return port.TargetPort, nil + } + + return 0, fmt.Errorf("port mapping is undefined") } func (ep *Endpoint) PortMappings(ports []*PortMapping) (mapping map[int32]int32) { mapping = make(map[int32]int32, len(ports)) for _, port := range ports { - mapping[port.Port] = ep.PortMapping(port) + p, err := ep.PortMapping(port) + if err != nil { + klog.V(1).InfoS("failed to map port", "err", err) + continue + } + mapping[port.Port] = p } return } + func (ep *Endpoint) PortNameMappings(ports []*PortMapping) (mapping map[string]int32) { mapping = make(map[string]int32, len(ports)) for _, port := range ports { - mapping[port.Name] = ep.PortMapping(port) + p, err := ep.PortMapping(port) + if err != nil { + klog.V(1).InfoS("failed to map port", "err", err) + continue + } + mapping[port.Name] = p } return } diff --git a/api/localv1/endpoint_test.go b/api/localv1/endpoint_test.go index b626dada6..2142cdc4c 100644 --- a/api/localv1/endpoint_test.go +++ b/api/localv1/endpoint_test.go @@ -20,9 +20,22 @@ import "fmt" func ExampleEndpointPortMapping() { ports := []*PortMapping{ + // name doesn't match -> ignore the rest {Name: "http", TargetPortName: "t-http", TargetPort: 8080}, + // name matches -> ignore the rest {Name: "http2", TargetPortName: "t-http2", TargetPort: 800}, - {Name: "metrics", TargetPortName: "t-metrics"}, + // name matches -> ignore the rest + {Name: "metrics", TargetPortName: "http2"}, + // name matches -> ignore the rest + {Name: "metrics", TargetPort: 80}, + // name matches + {Name: "metrics"}, + // targetPortName matches, no name -> ignore TargetPort + {TargetPortName: "metrics", TargetPort: 8080}, + // targetPortName doesn't match, no name -> ignore targetPort + {TargetPortName: "t-metrics", TargetPort: 8080}, + // nothing to match -> err expected + {}, } ep := &Endpoint{ @@ -33,11 +46,17 @@ func ExampleEndpointPortMapping() { } for _, port := range ports { - fmt.Println(port.Name, ep.PortMapping(port)) + p, err := ep.PortMapping(port) + fmt.Println(port.Name, p, err) } // Output: - // http 8080 - // http2 888 - // metrics 1011 + // http 0 not found http in port overrides + // http2 888 + // metrics 1011 + // metrics 1011 + // metrics 1011 + // 1011 + // 0 not found t-metrics in port overrides + // 0 port mapping is undefined } diff --git a/backends/iptables/iptables.go b/backends/iptables/iptables.go index a7ea10539..6d7a18e51 100644 --- a/backends/iptables/iptables.go +++ b/backends/iptables/iptables.go @@ -647,10 +647,15 @@ func (t *iptables) createEndpointsChain(svcInfo *serviceInfo, allEndpoints *endp ep = epInfo.IPs.V4[0] } - targetPort := epInfo.PortMapping(&localv1.PortMapping{ + targetPort, err := epInfo.PortMapping(&localv1.PortMapping{ + Name: svcInfo.portName, TargetPortName: svcInfo.targetPortName, TargetPort: int32(svcInfo.targetPort), }) + if err != nil { + klog.V(1).InfoS("failed to map port", "err", err) + continue + } endpointPortMap[ep] = targetPort endpoints = append(endpoints, &ep) @@ -748,21 +753,15 @@ func (t *iptables) writeDNATRules(svcInfo *serviceInfo, svcName types.Namespaced targetPort := t.getTargetPort(svcInfo, endpointPortMap, *epIP) - // this seems very sly to me. Doing this because there were 2 entries being added - // one with the right target port and one with zero - // write better logic or verify how baseServiceInfo & endpointPortMap are populated - if targetPort == 0 { - continue - } // DNAT to final destination. args = append(args, "-m", protocol, "-p", protocol, "-j", "DNAT", "--to-destination", net.JoinHostPort(*epIP, strconv.Itoa(targetPort))) t.natRules.Write(args) } } -// if the targetPort is string, fetch the value from endpointPortMap +// if the targetPort is string or portName exists, fetch the value from endpointPortMap func (t *iptables) getTargetPort(svcInfo *serviceInfo, endpointPortMap map[string]int32, endpoint string) int { - if svcInfo.TargetPortName() != "" { + if svcInfo.PortName() != "" || svcInfo.TargetPortName() != "" { return int(endpointPortMap[endpoint]) } return svcInfo.TargetPort() diff --git a/backends/nft/endpoint.go b/backends/nft/endpoint.go index 3fda095dd..9141924c2 100644 --- a/backends/nft/endpoint.go +++ b/backends/nft/endpoint.go @@ -19,6 +19,7 @@ package nft import ( "encoding/hex" "fmt" + "k8s.io/klog/v2" "net/netip" "strconv" @@ -63,8 +64,9 @@ func (ctx *renderContext) addEndpointChain(svc *localv1.Service, epIP EpIP, svcC continue } - targetPort := epIP.Endpoint.PortMapping(port) - if targetPort == 0 { + targetPort, err := epIP.Endpoint.PortMapping(port) + if err != nil { + klog.V(1).InfoS("failed to map port", "err", err) continue } diff --git a/backends/nft/svc-chain.go b/backends/nft/svc-chain.go index 9e21d77e4..9d32280a0 100644 --- a/backends/nft/svc-chain.go +++ b/backends/nft/svc-chain.go @@ -17,6 +17,7 @@ limitations under the License. package nft import ( + "k8s.io/klog/v2" "strconv" localv1 "sigs.k8s.io/kpng/api/localv1" @@ -55,7 +56,8 @@ func (ctx *renderContext) addSvcChain(svc *localv1.Service, epIPs []EpIP) { // filter endpoint based on port availability subset := make([]EpIP, 0, len(epIPs)) for _, epIP := range epIPs { - if epIP.Endpoint.PortMapping(port) == 0 { + if _, err := epIP.Endpoint.PortMapping(port); err != nil { + klog.V(1).InfoS("failed to map port", "err", err) continue } subset = append(subset, epIP) diff --git a/client/plugins/conntrack/conntrack.go b/client/plugins/conntrack/conntrack.go index d8ad55e09..f5006451a 100644 --- a/client/plugins/conntrack/conntrack.go +++ b/client/plugins/conntrack/conntrack.go @@ -90,14 +90,15 @@ func (ct Conntrack) Callback(ch <-chan *client.ServiceEndpoints) { for _, ep := range seps.Endpoints { for _, epIP := range ep.IPs.All() { + p, err := ep.PortMapping(svcPort) + if err != nil { + klog.V(1).InfoS("failed to map port", "err", err) + continue + } flow := Flow{ IPPort: ipp, EndpointIP: epIP, - TargetPort: ep.PortMapping(svcPort), - } - - if flow.TargetPort == 0 { - continue // no target port found + TargetPort: p, } ct.flows.Get(flow.Key()).Set(flow) diff --git a/client/plugins/conntrack/sink.go b/client/plugins/conntrack/sink.go index aa5be6397..94db3639d 100644 --- a/client/plugins/conntrack/sink.go +++ b/client/plugins/conntrack/sink.go @@ -17,6 +17,7 @@ limitations under the License. package conntrack import ( + "k8s.io/klog/v2" "sigs.k8s.io/kpng/api/localv1" "sigs.k8s.io/kpng/client/localsink" ) @@ -89,7 +90,12 @@ func (ps *Sink) DeleteEndpoint(namespace, serviceName, key string) { for _, epIP := range ep.IPs.All() { targetPort = port.Port if port.Port == 0 { - targetPort = int32(ep.PortMapping(port)) + p, err := ep.PortMapping(port) + if err != nil { + klog.V(1).InfoS("failed to map port", "err", err) + continue + } + targetPort = p } flow := Flow{ IPPort: IPPort{Protocol: port.Protocol, DnatIP: svcIP, Port: targetPort},