Skip to content

Commit

Permalink
fix
Browse files Browse the repository at this point in the history
  • Loading branch information
wind57 committed Apr 4, 2024
1 parent e291a92 commit 8291b41
Show file tree
Hide file tree
Showing 3 changed files with 158 additions and 41 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,15 @@

import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;

import io.kubernetes.client.openapi.models.V1ObjectMeta;
import io.kubernetes.client.openapi.models.V1Service;
import io.kubernetes.client.openapi.models.V1ServicePort;

import io.kubernetes.client.openapi.models.V1ServiceSpec;

import org.apache.commons.logging.LogFactory;
import org.springframework.cloud.kubernetes.commons.discovery.DefaultKubernetesServiceInstance;
import org.springframework.cloud.kubernetes.commons.discovery.DiscoveryClientUtils;
import org.springframework.cloud.kubernetes.commons.discovery.KubernetesDiscoveryProperties;
Expand All @@ -34,15 +36,22 @@
import org.springframework.cloud.kubernetes.commons.discovery.ServicePortSecureResolver;
import org.springframework.cloud.kubernetes.commons.loadbalancer.KubernetesLoadBalancerProperties;
import org.springframework.cloud.kubernetes.commons.loadbalancer.KubernetesServiceInstanceMapper;
import org.springframework.core.log.LogAccessor;
import org.springframework.util.StringUtils;

import static java.util.Optional.ofNullable;
import static org.springframework.cloud.kubernetes.commons.discovery.ServicePortSecureResolver.Input;

/**
* @author Ryan Baxter
*/
public class KubernetesClientServiceInstanceMapper implements KubernetesServiceInstanceMapper<V1Service> {

private static final LogAccessor LOG = new LogAccessor(
LogFactory.getLog(KubernetesClientServiceInstanceMapper.class));

private static final String PORT_NAME_PROPERTY = "'spring.cloud.kubernetes.loadbalancer.portName'";

/**
* empty on purpose, load balancer implementation does not need them.
*/
Expand All @@ -63,37 +72,57 @@ public KubernetesClientServiceInstanceMapper(KubernetesLoadBalancerProperties pr

@Override
public KubernetesServiceInstance map(V1Service service) {
final V1ObjectMeta meta = service.getMetadata();
V1ObjectMeta metadata = service.getMetadata();

List<V1ServicePort> ports = ofNullable(service.getSpec()).map(V1ServiceSpec::getPorts).orElse(List.of());
V1ServicePort port;

if (ports.isEmpty()) {
LOG.warn(() -> "service : " + metadata.getName() + " does not have any ServicePort(s),"
+ " will not consider it for load balancing");
return null;
}

final List<V1ServicePort> ports = service.getSpec().getPorts();
V1ServicePort port = null;
if (ports.size() == 1) {
LOG.debug(() -> "single ServicePort found, will use it as-is " + "(without checking " + PORT_NAME_PROPERTY
+ ")");
port = ports.get(0);
}
else if (ports.size() > 1 && StringUtils.hasText(this.properties.getPortName())) {
Optional<V1ServicePort> optPort = ports.stream()
.filter(it -> properties.getPortName().endsWith(it.getName())).findAny();
if (optPort.isPresent()) {
port = optPort.get();
else {
String portNameFromProperties = properties.getPortName();
if (StringUtils.hasText(portNameFromProperties)) {
Optional<V1ServicePort> optionalPort = ports.stream()
.filter(x -> Objects.equals(x.getName(), portNameFromProperties)).findAny();
if (optionalPort.isPresent()) {
LOG.debug(() -> "found port name that matches : " + portNameFromProperties);
port = optionalPort.get();
}
else {
logWarning(portNameFromProperties);
port = ports.get(0);
}
}
else {
LOG.warn(() -> PORT_NAME_PROPERTY + " is not set, as such will not consider service with name : "
+ metadata.getName());
return null;
}
}
if (port == null) {
return null;
}

String host = KubernetesServiceInstanceMapper.createHost(service.getMetadata().getName(),
service.getMetadata().getNamespace(), properties.getClusterDomain());

boolean secure = secure(port, service);

return new DefaultKubernetesServiceInstance(meta.getUid(), meta.getName(), host, port.getPort(),
return new DefaultKubernetesServiceInstance(metadata.getUid(), metadata.getName(), host, port.getPort(),
serviceMetadata(service), secure);
}

private Map<String, String> serviceMetadata(V1Service service) {
V1ObjectMeta metadata = service.getMetadata();
V1ServiceSpec serviceSpec = service.getSpec();
ServiceMetadata serviceMetadata = new ServiceMetadata(metadata.getName(), metadata.getNamespace(),
serviceSpec.getType(), metadata.getLabels(), metadata.getAnnotations());
serviceSpec.getType(), metadata.getLabels(), metadata.getAnnotations());

return DiscoveryClientUtils.serviceInstanceMetadata(PORTS_DATA, serviceMetadata, discoveryProperties);
}
Expand All @@ -105,4 +134,9 @@ private boolean secure(V1ServicePort port, V1Service service) {
return resolver.resolve(input);
}

private void logWarning(String portNameFromProperties) {
LOG.warn(() -> "Did not find a port name that is equal to the value " + portNameFromProperties);
LOG.warn(() -> "Will return 'first' port found, which is non-deterministic");
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,12 @@
import io.kubernetes.client.openapi.models.V1ServicePort;
import io.kubernetes.client.openapi.models.V1ServicePortBuilder;
import io.kubernetes.client.openapi.models.V1ServiceSpecBuilder;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;

import org.springframework.boot.test.system.CapturedOutput;
import org.springframework.boot.test.system.OutputCaptureExtension;
import org.springframework.cloud.kubernetes.commons.discovery.DefaultKubernetesServiceInstance;
import org.springframework.cloud.kubernetes.commons.discovery.KubernetesDiscoveryProperties;
import org.springframework.cloud.kubernetes.commons.discovery.KubernetesServiceInstance;
Expand All @@ -37,6 +41,7 @@
/**
* @author Ryan Baxter
*/
@ExtendWith(OutputCaptureExtension.class)
class KubernetesClientServiceInstanceMapperTests {

@Test
Expand All @@ -47,14 +52,12 @@ void singlePortNonSecure() {

Map<String, String> annotations = Map.of("org.springframework.cloud", "true");
Map<String, String> labels = Map.of("beta", "true");
List<V1ServicePort> servicePorts = List.of(
new V1ServicePortBuilder().withName("http").withPort(80).build()
);
List<V1ServicePort> servicePorts = List.of(new V1ServicePortBuilder().withName("http").withPort(80).build());
V1Service service = createService("database", "default", annotations, labels, servicePorts);

KubernetesServiceInstance serviceInstance = mapper.map(service);
Map<String, String> metadata = Map.of("org.springframework.cloud", "true", "beta", "true",
"k8s_namespace", "default", "type", "V1Service");
Map<String, String> metadata = Map.of("org.springframework.cloud", "true", "beta", "true", "k8s_namespace",
"default", "type", "V1Service");
DefaultKubernetesServiceInstance result = new DefaultKubernetesServiceInstance("0", "database",
"database.default.svc.cluster.local", 80, metadata, false);
assertThat(serviceInstance).isEqualTo(result);
Expand All @@ -65,20 +68,18 @@ void singlePortNonSecure() {
void singlePortSecure() {
KubernetesLoadBalancerProperties loadBalancerProperties = new KubernetesLoadBalancerProperties();
KubernetesClientServiceInstanceMapper mapper = new KubernetesClientServiceInstanceMapper(loadBalancerProperties,
KubernetesDiscoveryProperties.DEFAULT);
KubernetesDiscoveryProperties.DEFAULT);

Map<String, String> annotations = Map.of("org.springframework.cloud", "true", "secured", "true");
Map<String, String> labels = Map.of("beta", "true");
List<V1ServicePort> servicePorts = List.of(
new V1ServicePortBuilder().withName("http").withPort(80).build()
);
List<V1ServicePort> servicePorts = List.of(new V1ServicePortBuilder().withName("http").withPort(80).build());
V1Service service = createService("database", "default", annotations, labels, servicePorts);

KubernetesServiceInstance serviceInstance = mapper.map(service);
Map<String, String> metadata = Map.of("org.springframework.cloud", "true", "beta", "true", "secured", "true",
"k8s_namespace", "default", "type", "V1Service");
"k8s_namespace", "default", "type", "V1Service");
DefaultKubernetesServiceInstance result = new DefaultKubernetesServiceInstance("0", "database",
"database.default.svc.cluster.local", 80, metadata, true);
"database.default.svc.cluster.local", 80, metadata, true);
assertThat(serviceInstance).isEqualTo(result);
}

Expand All @@ -89,29 +90,118 @@ void multiplePortsSecure() {
KubernetesClientServiceInstanceMapper mapper = new KubernetesClientServiceInstanceMapper(loadBalancerProperties,
KubernetesDiscoveryProperties.DEFAULT);

Map<String, String> annotations = Map.of("org.springframework.cloud", "true");
Map<String, String> labels = Map.of("beta", "true");
List<V1ServicePort> servicePorts = List.of(new V1ServicePortBuilder().withName("http").withPort(80).build(),
new V1ServicePortBuilder().withName("https").withPort(443).build());
V1Service service = createService("database", "default", annotations, labels, servicePorts);

Map<String, String> metadata = Map.of("org.springframework.cloud", "true", "beta", "true", "k8s_namespace",
"default", "type", "V1Service");
KubernetesServiceInstance serviceInstance = mapper.map(service);
DefaultKubernetesServiceInstance result = new DefaultKubernetesServiceInstance("0", "database",
"database.default.svc.cluster.local", 443, metadata, true);
assertThat(serviceInstance).isEqualTo(result);
}

@Test
void testEmptyPorts(CapturedOutput output) {
KubernetesLoadBalancerProperties loadBalancerProperties = new KubernetesLoadBalancerProperties();
loadBalancerProperties.setPortName("https");
KubernetesClientServiceInstanceMapper mapper = new KubernetesClientServiceInstanceMapper(loadBalancerProperties,
KubernetesDiscoveryProperties.DEFAULT);

Map<String, String> annotations = Map.of("org.springframework.cloud", "true");
Map<String, String> labels = Map.of("beta", "true");
List<V1ServicePort> servicePorts = List.of();
V1Service service = createService("database", "default", annotations, labels, servicePorts);
KubernetesServiceInstance serviceInstance = mapper.map(service);
Assertions.assertNull(serviceInstance);
Assertions.assertTrue(output.getOut().contains(
"service : database does not have any ServicePort(s), will not consider it for load balancing"));
}

@Test
void singlePortNameMatchesProperty(CapturedOutput output) {
KubernetesLoadBalancerProperties loadBalancerProperties = new KubernetesLoadBalancerProperties();
loadBalancerProperties.setPortName("http");
KubernetesClientServiceInstanceMapper mapper = new KubernetesClientServiceInstanceMapper(loadBalancerProperties,
KubernetesDiscoveryProperties.DEFAULT);

Map<String, String> annotations = Map.of("org.springframework.cloud", "true");
Map<String, String> labels = Map.of("beta", "true");
List<V1ServicePort> servicePorts = List.of(new V1ServicePortBuilder().withName("http").withPort(80).build());
V1Service service = createService("database", "default", annotations, labels, servicePorts);
KubernetesServiceInstance serviceInstance = mapper.map(service);
Assertions.assertNotNull(serviceInstance);
Assertions.assertTrue(output.getOut().contains("single ServicePort found, " +
"will use it as-is (without checking 'spring.cloud.kubernetes.loadbalancer.portName')"));
}

@Test
void singlePortNameDoesNotMatchProperty(CapturedOutput output) {
KubernetesLoadBalancerProperties loadBalancerProperties = new KubernetesLoadBalancerProperties();
loadBalancerProperties.setPortName("http-api");
KubernetesClientServiceInstanceMapper mapper = new KubernetesClientServiceInstanceMapper(loadBalancerProperties,
KubernetesDiscoveryProperties.DEFAULT);

Map<String, String> annotations = Map.of("org.springframework.cloud", "true");
Map<String, String> labels = Map.of("beta", "true");
List<V1ServicePort> servicePorts = List.of(new V1ServicePortBuilder().withName("http").withPort(80).build());
V1Service service = createService("database", "default", annotations, labels, servicePorts);
KubernetesServiceInstance serviceInstance = mapper.map(service);
Assertions.assertNotNull(serviceInstance);
Assertions.assertTrue(output.getOut().contains("single ServicePort found, " +
"will use it as-is (without checking 'spring.cloud.kubernetes.loadbalancer.portName')"));
}

@Test
void multiplePortsNameMatchesProperty(CapturedOutput output) {
KubernetesLoadBalancerProperties loadBalancerProperties = new KubernetesLoadBalancerProperties();
loadBalancerProperties.setPortName("http");
KubernetesClientServiceInstanceMapper mapper = new KubernetesClientServiceInstanceMapper(loadBalancerProperties,
KubernetesDiscoveryProperties.DEFAULT);

Map<String, String> annotations = Map.of("org.springframework.cloud", "true");
Map<String, String> labels = Map.of("beta", "true");
List<V1ServicePort> servicePorts = List.of(
new V1ServicePortBuilder().withName("http").withPort(80).build(),
new V1ServicePortBuilder().withName("https").withPort(443).build()
);
V1Service service = createService("database", "default", annotations, labels, servicePorts);
KubernetesServiceInstance serviceInstance = mapper.map(service);
Assertions.assertNotNull(serviceInstance);
Assertions.assertTrue(output.getOut().contains("found port name that matches : http"));
Assertions.assertEquals(serviceInstance.getPort(), 80);
}

Map<String, String> metadata = Map.of("org.springframework.cloud", "true", "beta", "true",
"k8s_namespace", "default", "type", "V1Service");
@Test
void multiplePortsNameDoesNotMatchProperty(CapturedOutput output) {
KubernetesLoadBalancerProperties loadBalancerProperties = new KubernetesLoadBalancerProperties();
loadBalancerProperties.setPortName("http");
KubernetesClientServiceInstanceMapper mapper = new KubernetesClientServiceInstanceMapper(loadBalancerProperties,
KubernetesDiscoveryProperties.DEFAULT);

Map<String, String> annotations = Map.of("org.springframework.cloud", "true");
Map<String, String> labels = Map.of("beta", "true");
List<V1ServicePort> servicePorts = List.of(
new V1ServicePortBuilder().withName("http-api").withPort(80).build(),
new V1ServicePortBuilder().withName("https").withPort(443).build()
);
V1Service service = createService("database", "default", annotations, labels, servicePorts);
KubernetesServiceInstance serviceInstance = mapper.map(service);
DefaultKubernetesServiceInstance result = new DefaultKubernetesServiceInstance("0", "database",
"database.default.svc.cluster.local", 443, metadata, true);
assertThat(serviceInstance).isEqualTo(result);
Assertions.assertNotNull(serviceInstance);
Assertions.assertTrue(output.getOut().contains("Did not find a port name that is equal to the value http"));
Assertions.assertTrue(output.getOut().contains("Will return 'first' port found, which is non-deterministic"));
Assertions.assertTrue(serviceInstance.getPort() == 80 || serviceInstance.getPort() == 443);
}

private V1Service createService(String name, String namespace, Map<String, String> annotations,
Map<String, String> labels, List<V1ServicePort> servicePorts) {
return new V1ServiceBuilder()
.withMetadata(new V1ObjectMetaBuilder().withName(name).withUid("0")
.withNamespace(namespace).addToAnnotations(annotations)
.addToLabels(labels).build())
.withSpec(new V1ServiceSpecBuilder().addAllToPorts(servicePorts).withType("V1Service").build()).build();
.withMetadata(new V1ObjectMetaBuilder().withName(name).withUid("0").withNamespace(namespace)
.addToAnnotations(annotations).addToLabels(labels).build())
.withSpec(new V1ServiceSpecBuilder().addAllToPorts(servicePorts).withType("V1Service").build()).build();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -18,22 +18,15 @@

import java.util.StringJoiner;

import org.apache.commons.logging.LogFactory;

import org.springframework.cloud.kubernetes.commons.discovery.KubernetesServiceInstance;
import org.springframework.core.log.LogAccessor;
import org.springframework.util.StringUtils;

/**
* @author Ryan Baxter
*/
public interface KubernetesServiceInstanceMapper<T> {

/**
* Logger instance.
*/
LogAccessor LOG = new LogAccessor(LogFactory.getLog(KubernetesServiceInstanceMapper.class));

KubernetesServiceInstance map(T service);

static String createHost(String serviceName, String namespace, String clusterDomain) {
Expand Down

0 comments on commit 8291b41

Please sign in to comment.