Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ST] Remove usage of examples from our template classes and add namespaceName parameter #10418

Merged
merged 7 commits into from
Aug 16, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -87,15 +87,15 @@ public static List<KafkaUser> getListOfKafkaUsers(final TestStorage testStorage,
for (int i = startPointer; i < endPointer; i++) {
if (userAuthType.equals(UserAuthType.Tls)) {
usersList.add(
KafkaUserTemplates.tlsUser(testStorage.getNamespaceName(), testStorage.getClusterName(), userName + "-" + i)
KafkaUserTemplates.tlsUser(testStorage.getNamespaceName(), userName + "-" + i, testStorage.getClusterName())
.editOrNewSpec()
.withAuthorization(usersAcl)
.endSpec()
.build()
);
} else {
usersList.add(
KafkaUserTemplates.scramShaUser(testStorage.getNamespaceName(), testStorage.getClusterName(), userName + "-" + i)
KafkaUserTemplates.scramShaUser(testStorage.getNamespaceName(), userName + "-" + i, testStorage.getClusterName())
.editOrNewSpec()
.withAuthorization(usersAcl)
.endSpec()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,38 +4,32 @@
*/
package io.strimzi.systemtest.templates.crd;

import io.strimzi.api.kafka.model.bridge.KafkaBridge;
import io.strimzi.api.kafka.model.bridge.KafkaBridgeBuilder;
import io.strimzi.systemtest.TestConstants;
import io.strimzi.systemtest.resources.ResourceManager;
import io.strimzi.test.TestUtils;

public class KafkaBridgeTemplates {

private KafkaBridgeTemplates() {}

public static KafkaBridgeBuilder kafkaBridge(String name, String bootstrap, int kafkaBridgeReplicas) {
return kafkaBridge(name, name, bootstrap, kafkaBridgeReplicas);
}

public static KafkaBridgeBuilder kafkaBridge(String name, String clusterName, String bootstrap, int kafkaBridgeReplicas) {
KafkaBridge kafkaBridge = getKafkaBridgeFromYaml(TestConstants.PATH_TO_KAFKA_BRIDGE_CONFIG);
return defaultKafkaBridge(kafkaBridge, name, clusterName, bootstrap, kafkaBridgeReplicas);
}
private final static int DEFAULT_HTTP_PORT = 8080;

public static KafkaBridgeBuilder kafkaBridgeWithCors(String name, String bootstrap, int kafkaBridgeReplicas,
String allowedCorsOrigin, String allowedCorsMethods) {
return kafkaBridgeWithCors(name, name, bootstrap, kafkaBridgeReplicas, allowedCorsOrigin, allowedCorsMethods);
public static KafkaBridgeBuilder kafkaBridge(
String namespaceName,
String bridgeName,
String bootstrap,
int kafkaBridgeReplicas
) {
return defaultKafkaBridge(namespaceName, bridgeName, bootstrap, kafkaBridgeReplicas);
}

public static KafkaBridgeBuilder kafkaBridgeWithCors(String name, String clusterName, String bootstrap,
int kafkaBridgeReplicas, String allowedCorsOrigin,
String allowedCorsMethods) {
KafkaBridge kafkaBridge = getKafkaBridgeFromYaml(TestConstants.PATH_TO_KAFKA_BRIDGE_CONFIG);

KafkaBridgeBuilder kafkaBridgeBuilder = defaultKafkaBridge(kafkaBridge, name, clusterName, bootstrap, kafkaBridgeReplicas);

kafkaBridgeBuilder
public static KafkaBridgeBuilder kafkaBridgeWithCors(
String namespaceName,
String bridgeName,
String bootstrap,
int kafkaBridgeReplicas,
String allowedCorsOrigin,
String allowedCorsMethods
) {
return defaultKafkaBridge(namespaceName, bridgeName, bootstrap, kafkaBridgeReplicas)
.editSpec()
.editHttp()
.withNewCors()
Expand All @@ -44,39 +38,40 @@ public static KafkaBridgeBuilder kafkaBridgeWithCors(String name, String cluster
.endCors()
.endHttp()
.endSpec();

return kafkaBridgeBuilder;
}

public static KafkaBridgeBuilder kafkaBridgeWithMetrics(String name, String clusterName, String bootstrap) {
return kafkaBridgeWithMetrics(name, clusterName, bootstrap, 1);
}

public static KafkaBridgeBuilder kafkaBridgeWithMetrics(String name, String clusterName, String bootstrap, int kafkaBridgeReplicas) {
KafkaBridge kafkaBridge = getKafkaBridgeFromYaml(TestConstants.PATH_TO_KAFKA_BRIDGE_CONFIG);

return defaultKafkaBridge(kafkaBridge, name, clusterName, bootstrap, kafkaBridgeReplicas)
public static KafkaBridgeBuilder kafkaBridgeWithMetrics(
String namespaceName,
String bridgeName,
String bootstrap,
int kafkaBridgeReplicas
) {
return defaultKafkaBridge(namespaceName, bridgeName, bootstrap, kafkaBridgeReplicas)
.editSpec()
.withEnableMetrics(true)
.endSpec();
}

private static KafkaBridgeBuilder defaultKafkaBridge(KafkaBridge kafkaBridge, String name, String kafkaClusterName, String bootstrap, int kafkaBridgeReplicas) {
return new KafkaBridgeBuilder(kafkaBridge)
private static KafkaBridgeBuilder defaultKafkaBridge(
String namespaceName,
String bridgeName,
String bootstrap,
int kafkaBridgeReplicas
) {
return new KafkaBridgeBuilder()
.withNewMetadata()
.withName(name)
.withNamespace(ResourceManager.kubeClient().getNamespace())
.withName(bridgeName)
.withNamespace(namespaceName)
.endMetadata()
.editSpec()
.withNewSpec()
.withBootstrapServers(bootstrap)
.withReplicas(kafkaBridgeReplicas)
.withNewInlineLogging()
.addToLoggers("bridge.root.logger", "DEBUG")
.endInlineLogging()
.withNewHttp()
.withPort(DEFAULT_HTTP_PORT)
.endHttp()
.endSpec();
}

private static KafkaBridge getKafkaBridgeFromYaml(String yamlPath) {
return TestUtils.configFromYaml(yamlPath, KafkaBridge.class);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@

import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import io.fabric8.kubernetes.api.model.ConfigMap;
import io.fabric8.kubernetes.api.model.ConfigMapBuilder;
import io.strimzi.api.kafka.model.common.CertSecretSourceBuilder;
import io.strimzi.api.kafka.model.connect.KafkaConnect;
import io.strimzi.api.kafka.model.connect.KafkaConnectBuilder;
import io.strimzi.api.kafka.model.connect.KafkaConnectResources;
import io.strimzi.api.kafka.model.connect.build.DockerOutput;
Expand All @@ -25,99 +25,122 @@

import java.util.Random;

import static io.strimzi.test.k8s.KubeClusterResource.kubeClient;

public class KafkaConnectTemplates {

private static final Logger LOGGER = LogManager.getLogger(KafkaConnectTemplates.class);

private KafkaConnectTemplates() {}

public static KafkaConnectBuilder kafkaConnect(String name, final String namespaceName, String clusterName, int kafkaConnectReplicas, String pathToConnectConfig) {
KafkaConnect kafkaConnect = getKafkaConnectFromYaml(pathToConnectConfig);
return defaultKafkaConnect(kafkaConnect, namespaceName, name, clusterName, kafkaConnectReplicas);
}
private static final String METRICS_CONNECT_CONFIG_MAP_SUFFIX = "-connect-metrics";
private static final String CONFIG_MAP_KEY = "metrics-config.yml";

public static KafkaConnectBuilder kafkaConnect(String name, final String namespaceName, String clusterName, int kafkaConnectReplicas) {
return kafkaConnect(name, namespaceName, clusterName, kafkaConnectReplicas, TestConstants.PATH_TO_KAFKA_CONNECT_CONFIG);
}
private KafkaConnectTemplates() {}

public static KafkaConnectBuilder kafkaConnect(String name, final String namespaceName, int kafkaConnectReplicas) {
return kafkaConnect(name, namespaceName, name, kafkaConnectReplicas, TestConstants.PATH_TO_KAFKA_CONNECT_CONFIG);
public static KafkaConnectBuilder kafkaConnect(
final String namespaceName,
final String kafkaConnectClusterName,
final String kafkaClusterName,
final int kafkaConnectReplicas
) {
return defaultKafkaConnect(namespaceName, kafkaConnectClusterName, kafkaClusterName, kafkaConnectReplicas);
}

public static KafkaConnectBuilder kafkaConnectWithMetrics(String name, String namespaceName, int kafkaConnectReplicas) {
return kafkaConnectWithMetrics(name, namespaceName, name, kafkaConnectReplicas);
public static KafkaConnectBuilder kafkaConnect(
final String namespaceName,
final String kafkaClusterName,
final int kafkaConnectReplicas
) {
return kafkaConnect(namespaceName, kafkaClusterName, kafkaClusterName, kafkaConnectReplicas);
}

public static KafkaConnectBuilder kafkaConnectWithMetrics(String name, String namespaceName, String clusterName, int kafkaConnectReplicas) {
KafkaConnect kafkaConnect = getKafkaConnectFromYaml(TestConstants.PATH_TO_KAFKA_CONNECT_METRICS_CONFIG);
createOrReplaceConnectMetrics(namespaceName);
return defaultKafkaConnect(kafkaConnect, namespaceName, name, clusterName, kafkaConnectReplicas);
public static KafkaConnectBuilder kafkaConnectWithMetricsAndFileSinkPlugin(
final String namespaceName,
final String kafkaConnectClusterName,
final String kafkaClusterName,
final int replicas
) {
return kafkaConnectWithFilePlugin(namespaceName, kafkaConnectClusterName, kafkaClusterName, replicas)
.editOrNewSpec()
.withNewJmxPrometheusExporterMetricsConfig()
.withNewValueFrom()
.withNewConfigMapKeyRef(CONFIG_MAP_KEY, getConfigMapName(kafkaConnectClusterName), false)
.endValueFrom()
.endJmxPrometheusExporterMetricsConfig()
.endSpec();
}

public static KafkaConnectBuilder kafkaConnectWithMetricsAndFileSinkPlugin(String name, String namespaceName, String clusterName, int replicas) {
createOrReplaceConnectMetrics(namespaceName);
return kafkaConnectWithFilePlugin(name, namespaceName, clusterName, replicas, TestConstants.PATH_TO_KAFKA_CONNECT_METRICS_CONFIG);
public static ConfigMap connectMetricsConfigMap(String namespaceName, String kafkaConnectClusterName) {
return new ConfigMapBuilder(TestUtils.configMapFromYaml(TestConstants.PATH_TO_KAFKA_CONNECT_METRICS_CONFIG, "connect-metrics"))
.editOrNewMetadata()
.withNamespace(namespaceName)
.withName(getConfigMapName(kafkaConnectClusterName))
.endMetadata()
.build();
}

private static void createOrReplaceConnectMetrics(String namespaceName) {
ConfigMap metricsCm = TestUtils.configMapFromYaml(TestConstants.PATH_TO_KAFKA_CONNECT_METRICS_CONFIG, "connect-metrics");
kubeClient().createConfigMapInNamespace(namespaceName, metricsCm);
private static String getConfigMapName(String kafkaConnectClusterName) {
return kafkaConnectClusterName + METRICS_CONNECT_CONFIG_MAP_SUFFIX;
}

private static KafkaConnectBuilder defaultKafkaConnect(KafkaConnect kafkaConnect, final String namespaceName, String name, String kafkaClusterName, int kafkaConnectReplicas) {
return new KafkaConnectBuilder(kafkaConnect)
private static KafkaConnectBuilder defaultKafkaConnect(
final String namespaceName,
String kafkaConnectClusterName,
String kafkaClusterName,
int kafkaConnectReplicas
) {
return new KafkaConnectBuilder()
.withNewMetadata()
.withName(name)
.withName(kafkaConnectClusterName)
.withNamespace(namespaceName)
.endMetadata()
.editOrNewSpec()
.withVersion(Environment.ST_KAFKA_VERSION)
.withBootstrapServers(KafkaResources.tlsBootstrapAddress(kafkaClusterName))
.withReplicas(kafkaConnectReplicas)
.withNewTls()
.withTrustedCertificates(new CertSecretSourceBuilder().withSecretName(kafkaClusterName + "-cluster-ca-cert").withCertificate("ca.crt").build())
.withTrustedCertificates(
new CertSecretSourceBuilder()
.withSecretName(KafkaResources.clusterCaCertificateSecretName(kafkaClusterName))
.withCertificate("ca.crt")
.build()
)
.endTls()
.addToConfig("group.id", KafkaConnectResources.componentName(name))
.addToConfig("offset.storage.topic", KafkaConnectResources.configStorageTopicOffsets(name))
.addToConfig("config.storage.topic", KafkaConnectResources.metricsAndLogConfigMapName(name))
.addToConfig("status.storage.topic", KafkaConnectResources.configStorageTopicStatus(name))
.addToConfig("group.id", KafkaConnectResources.componentName(kafkaConnectClusterName))
.addToConfig("offset.storage.topic", KafkaConnectResources.configStorageTopicOffsets(kafkaConnectClusterName))
.addToConfig("config.storage.topic", KafkaConnectResources.metricsAndLogConfigMapName(kafkaConnectClusterName))
.addToConfig("status.storage.topic", KafkaConnectResources.configStorageTopicStatus(kafkaConnectClusterName))
.addToConfig("config.storage.replication.factor", "-1")
.addToConfig("offset.storage.replication.factor", "-1")
.addToConfig("status.storage.replication.factor", "-1")
.withNewInlineLogging()
.addToLoggers("connect.root.logger.level", "DEBUG")
.endInlineLogging()
.endSpec();
}

public static KafkaConnectBuilder kafkaConnectWithFilePlugin(String clusterName, String namespaceName, int replicas) {
return kafkaConnectWithFilePlugin(clusterName, namespaceName, clusterName, replicas);
}

public static KafkaConnectBuilder kafkaConnectWithFilePlugin(String name, String namespaceName, String clusterName, int replicas) {
return kafkaConnectWithFilePlugin(name, namespaceName, clusterName, replicas, TestConstants.PATH_TO_KAFKA_CONNECT_CONFIG);
public static KafkaConnectBuilder kafkaConnectWithFilePlugin(String namespaceName, String kafkaClusterName, int replicas) {
return kafkaConnectWithFilePlugin(namespaceName, kafkaClusterName, kafkaClusterName, replicas);
}

/**
* Method for creating the KafkaConnect builder with File plugin - using the KafkaConnect build feature.
* @param name Name for the KafkaConnect resource
* @param namespaceName namespace, where the KafkaConnect resource will be deployed
* @param clusterName name of the Kafka cluster
* @param kafkaConnectClusterName Name for the KafkaConnect resource
* @param kafkaClusterName name of the Kafka cluster
* @param replicas number of KafkaConnect replicas
* @return KafkaConnect builder with File plugin
*/
public static KafkaConnectBuilder kafkaConnectWithFilePlugin(String name, String namespaceName, String clusterName, int replicas, String pathToConnectConfig) {
return addFileSinkPluginOrImage(namespaceName, kafkaConnect(name, namespaceName, clusterName, replicas, pathToConnectConfig));
public static KafkaConnectBuilder kafkaConnectWithFilePlugin(String namespaceName, String kafkaConnectClusterName, String kafkaClusterName, int replicas) {
return addFileSinkPluginOrImage(namespaceName, kafkaConnect(namespaceName, kafkaConnectClusterName, kafkaClusterName, replicas));
}

/**
* Method for adding Connect Build with file-sink plugin to the Connect spec or set Connect's image in case that
* the image is set in `CONNECT_IMAGE_WITH_FILE_SINK_PLUGIN` env. variable
* @param namespaceName namespace for output registry
* @param connectBuilder builder of the Connect resource
* @param kafkaConnectBuilder builder of the Connect resource
* @return updated Connect resource in builder
*/
@SuppressFBWarnings("DMI_RANDOM_USED_ONLY_ONCE")
public static KafkaConnectBuilder addFileSinkPluginOrImage(String namespaceName, KafkaConnectBuilder connectBuilder) {
public static KafkaConnectBuilder addFileSinkPluginOrImage(String namespaceName, KafkaConnectBuilder kafkaConnectBuilder) {
if (!KubeClusterResource.getInstance().isMicroShift() && Environment.CONNECT_IMAGE_WITH_FILE_SINK_PLUGIN.isEmpty()) {
final Plugin fileSinkPlugin = new PluginBuilder()
.withName("file-plugin")
Expand All @@ -130,7 +153,7 @@ public static KafkaConnectBuilder addFileSinkPluginOrImage(String namespaceName,

final String imageFullPath = Environment.getImageOutputRegistry(namespaceName, TestConstants.ST_CONNECT_BUILD_IMAGE_NAME, String.valueOf(new Random().nextInt(Integer.MAX_VALUE)));

return connectBuilder
return kafkaConnectBuilder
.editOrNewSpec()
.editOrNewBuild()
.withPlugins(fileSinkPlugin)
Expand All @@ -144,7 +167,7 @@ public static KafkaConnectBuilder addFileSinkPluginOrImage(String namespaceName,

LOGGER.info("Using {} image from {} env variable", Environment.CONNECT_IMAGE_WITH_FILE_SINK_PLUGIN, Environment.CONNECT_IMAGE_WITH_FILE_SINK_PLUGIN_ENV);

return connectBuilder
return kafkaConnectBuilder
.editOrNewSpec()
.withImage(Environment.CONNECT_IMAGE_WITH_FILE_SINK_PLUGIN)
.endSpec();
Expand All @@ -166,8 +189,4 @@ public static DockerOutput dockerOutput(String imageName) {

return dockerOutputBuilder.build();
}

private static KafkaConnect getKafkaConnectFromYaml(String yamlPath) {
return TestUtils.configFromYaml(yamlPath, KafkaConnect.class);
}
}
Loading
Loading