Skip to content

Commit

Permalink
started work
Browse files Browse the repository at this point in the history
Signed-off-by: wind57 <[email protected]>
  • Loading branch information
wind57 committed Jan 16, 2025
1 parent 2fd986b commit 7886223
Show file tree
Hide file tree
Showing 4 changed files with 300 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -97,20 +97,6 @@ void beforeEach() {
util.busybox(NAMESPACE, Phase.CREATE);
}

/**
* <pre>
* - we deploy a busybox service with 2 replica pods
* - we receive an event from KubernetesCatalogWatcher, assert what is inside it
* - delete the busybox service
* - assert that we receive only spring-cloud-kubernetes-client-catalog-watcher pod
* </pre>
*/
@Test
@Order(1)
void testCatalogWatchWithEndpoints() {
waitForLogStatement("stateGenerator is of type: KubernetesEndpointsCatalogWatch", K3S, APP_NAME);
test();
}

@Test
@Order(2)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
/*
* Copyright 2012-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.springframework.cloud.kubernetes.k8s.client.catalog.watcher.it;

import java.io.IOException;
import java.io.StringReader;
import java.util.Map;
import java.util.Set;

import io.kubernetes.client.openapi.ApiClient;
import io.kubernetes.client.openapi.apis.CoreV1Api;
import io.kubernetes.client.util.Config;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.extension.ExtendWith;

import org.springframework.boot.test.system.OutputCaptureExtension;
import org.springframework.cloud.kubernetes.commons.discovery.KubernetesDiscoveryProperties;
import org.springframework.cloud.kubernetes.integration.tests.commons.Commons;
import org.springframework.cloud.kubernetes.integration.tests.commons.fabric8_client.Util;
import org.springframework.test.context.TestPropertySource;
import org.testcontainers.k3s.K3sContainer;

/**
* @author wind57
*/

@TestPropertySource(
properties = { "spring.main.cloud-platform=kubernetes", "spring.cloud.config.import-check.enabled=false",
"spring.cloud.kubernetes.discovery.catalogServicesWatchDelay=2000",
"spring.cloud.kubernetes.client.namespace=default",
"logging.level.org.springframework.cloud.kubernetes.client.discovery.catalog=DEBUG" })
@ExtendWith(OutputCaptureExtension.class)
abstract class KubernetesClientCatalogWatchBase {

protected static final String NAMESPACE = "default";

protected static final String NAMESPACE_A = "a";

protected static final String NAMESPACE_B = "b";

protected static final K3sContainer K3S = Commons.container();

protected static Util util;

@BeforeAll
protected static void beforeAll() {
K3S.start();
util = new Util(K3S);
}

protected static KubernetesDiscoveryProperties discoveryProperties(boolean useEndpointSlices) {
return new KubernetesDiscoveryProperties(true, false, Set.of(NAMESPACE, NAMESPACE_A), true, 60, false, null,
Set.of(443, 8443), Map.of(), null, KubernetesDiscoveryProperties.Metadata.DEFAULT, 0, useEndpointSlices,
false, null);
}

protected static ApiClient apiClient() {
String kubeConfigYaml = K3S.getKubeConfigYaml();

ApiClient client;
try {
client = Config.fromConfig(new StringReader(kubeConfigYaml));
}
catch (IOException e) {
throw new RuntimeException(e);
}
return new CoreV1Api(client).getApiClient();
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
/*
* Copyright 2013-2025 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.springframework.cloud.kubernetes.k8s.client.catalog.watcher.it;

import java.util.Set;

import io.kubernetes.client.openapi.ApiClient;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.boot.test.context.TestConfiguration;
import org.springframework.boot.test.system.CapturedOutput;
import org.springframework.boot.test.web.server.LocalServerPort;
import org.springframework.cloud.kubernetes.commons.discovery.KubernetesDiscoveryProperties;
import org.springframework.cloud.kubernetes.integration.tests.commons.Images;
import org.springframework.cloud.kubernetes.integration.tests.commons.Phase;
import org.springframework.cloud.kubernetes.k8s.client.catalog.watcher.Application;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Primary;

import static org.springframework.cloud.kubernetes.k8s.client.catalog.watcher.it.TestAssertions.assertLogStatement;
import static org.springframework.cloud.kubernetes.k8s.client.catalog.watcher.it.TestAssertions.invokeAndAssert;

@SpringBootTest(classes = { KubernetesClientCatalogWatchEndpointsIT.TestConfig.class, Application.class },
webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT)
class KubernetesClientCatalogWatchEndpointsIT extends KubernetesClientCatalogWatchBase {

@LocalServerPort
private int port;

@BeforeEach
void beforeEach() {

util.createNamespace(NAMESPACE_A);
util.createNamespace(NAMESPACE_B);

Images.loadBusybox(K3S);

util.busybox(NAMESPACE_A, Phase.CREATE);
util.busybox(NAMESPACE_B, Phase.CREATE);

}

@AfterEach
void afterEach() {
// busybox is deleted as part of the assertions, thus not seen here
util.deleteNamespace(NAMESPACE_A);
util.deleteNamespace(NAMESPACE_B);
}

/**
* <pre>
* - we deploy a busybox service with 2 replica pods in two namespaces : a, b
* - we use endpoints
* - we enable namespace filtering for 'default' and 'a'
* - we receive an event from KubernetesCatalogWatcher, assert what is inside it
* - delete the busybox service in 'a' and 'b'
* - assert that we receive an empty response
* </pre>
*/
@Test
void testCatalogWatchWithEndpoints(CapturedOutput output) {
assertLogStatement(output, "stateGenerator is of type: KubernetesEndpointsCatalogWatch");
invokeAndAssert(util, Set.of(NAMESPACE_A, NAMESPACE_B), port, NAMESPACE_A);
}

@TestConfiguration
static class TestConfig {

@Bean
@Primary
ApiClient client() {
return apiClient();
}

@Bean
@Primary
KubernetesDiscoveryProperties kubernetesDiscoveryProperties() {
return discoveryProperties(false);
}

}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
/*
* Copyright 2013-2025 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.springframework.cloud.kubernetes.k8s.client.catalog.watcher.it;

import org.assertj.core.api.Assertions;
import org.awaitility.Awaitility;
import org.springframework.boot.test.system.CapturedOutput;
import org.springframework.cloud.kubernetes.commons.discovery.EndpointNameAndNamespace;
import org.springframework.cloud.kubernetes.integration.tests.commons.Phase;
import org.springframework.cloud.kubernetes.integration.tests.commons.fabric8_client.Util;
import org.springframework.core.ParameterizedTypeReference;
import org.springframework.core.ResolvableType;
import org.springframework.http.HttpMethod;
import org.springframework.web.reactive.function.client.WebClient;

import java.time.Duration;
import java.util.List;
import java.util.Objects;
import java.util.Set;

import static org.assertj.core.api.Assertions.assertThat;
import static org.awaitility.Awaitility.await;
import static org.springframework.cloud.kubernetes.integration.tests.commons.Commons.builder;
import static org.springframework.cloud.kubernetes.integration.tests.commons.Commons.retrySpec;

/**
* @author wind57
*/
final class TestAssertions {

private TestAssertions() {

}

static void assertLogStatement(CapturedOutput output, String textToAssert) {
Awaitility.await()
.during(Duration.ofSeconds(5))
.pollInterval(Duration.ofMillis(200))
.untilAsserted(() -> Assertions.assertThat(output.getOut()).contains(textToAssert));
}

/**
* the checks are the same for both endpoints and endpoint slices, while the set-up
* for them is different.
*/
@SuppressWarnings("unchecked")
static void invokeAndAssert(Util util, Set<String> namespaces, int port, String assertionNamespace) {

WebClient client = builder().baseUrl("http://localhost:" + port + "/result").build();
EndpointNameAndNamespace[] holder = new EndpointNameAndNamespace[2];
ResolvableType resolvableType = ResolvableType.forClassWithGenerics(List.class, EndpointNameAndNamespace.class);

await().pollInterval(Duration.ofMillis(200)).atMost(Duration.ofSeconds(30)).until(() -> {
List<EndpointNameAndNamespace> result = (List<EndpointNameAndNamespace>) client.method(HttpMethod.GET)
.retrieve()
.bodyToMono(ParameterizedTypeReference.forType(resolvableType.getType()))
.retryWhen(retrySpec())
.block();

if (result != null) {
if (result.size() != 2) {
return false;
}
holder[0] = result.get(0);
holder[1] = result.get(1);
return true;
}

return false;
});

EndpointNameAndNamespace resultOne = holder[0];
EndpointNameAndNamespace resultTwo = holder[1];

assertThat(resultOne).isNotNull();
assertThat(resultTwo).isNotNull();

assertThat(resultOne.endpointName()).contains("busybox");
assertThat(resultTwo.endpointName()).contains("busybox");

assertThat(resultOne.namespace()).isEqualTo(assertionNamespace);
assertThat(resultTwo.namespace()).isEqualTo(assertionNamespace);

namespaces.forEach(namespace -> util.busybox(namespace, Phase.DELETE));

await().pollInterval(Duration.ofSeconds(1)).atMost(Duration.ofSeconds(240)).until(() -> {
List<EndpointNameAndNamespace> result = (List<EndpointNameAndNamespace>) client.method(HttpMethod.GET)
.retrieve()
.bodyToMono(ParameterizedTypeReference.forType(resolvableType.getType()))
.retryWhen(retrySpec())
.block();

// we need to get the event from KubernetesCatalogWatch, but that happens
// on periodic bases. So in order to be sure we got the event we care about
// we wait until there is no entry, which means busybox was deleted
// and KubernetesCatalogWatch received that update.
return Objects.requireNonNull(result).isEmpty();
});

}

}

0 comments on commit 7886223

Please sign in to comment.