diff --git a/CHANGELOG.md b/CHANGELOG.md index 9e1242fbca..3c0f7a106a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -9,6 +9,7 @@ * Store Kafka node certificates in separate Secrets, one Secret per pod. * Allow configuring `ssl.principal.mapping.rules` and custom trusted CAs in Kafka brokers with `type: custom` authentication * Moved HTTP bridge configuration to the ConfigMap setup by the operator. +* Moved Kafka Connect configuration to the ConfigMap created by the operator. ### Major changes, deprecations and removals diff --git a/cluster-operator/src/main/java/io/strimzi/operator/cluster/model/KafkaConnectCluster.java b/cluster-operator/src/main/java/io/strimzi/operator/cluster/model/KafkaConnectCluster.java index cfb8c5eb8f..20ac5f76cb 100644 --- a/cluster-operator/src/main/java/io/strimzi/operator/cluster/model/KafkaConnectCluster.java +++ b/cluster-operator/src/main/java/io/strimzi/operator/cluster/model/KafkaConnectCluster.java @@ -97,18 +97,20 @@ public class KafkaConnectCluster extends AbstractModel implements SupportsMetric protected static final String EXTERNAL_CONFIGURATION_VOLUME_MOUNT_BASE_PATH = "/opt/kafka/external-configuration/"; protected static final String EXTERNAL_CONFIGURATION_VOLUME_NAME_PREFIX = "ext-conf-"; protected static final String OAUTH_TLS_CERTS_BASE_VOLUME_MOUNT = "/opt/kafka/oauth-certs/"; - protected static final String LOG_AND_METRICS_CONFIG_VOLUME_NAME = "kafka-metrics-and-logging"; - protected static final String LOG_AND_METRICS_CONFIG_VOLUME_MOUNT = "/opt/kafka/custom-config/"; + protected static final String CONNECT_CONFIG_VOLUME_NAME = "kafka-connect-configurations"; + protected static final String CONNECT_CONFIG_VOLUME_MOUNT = "/opt/kafka/custom-config/"; // Configuration defaults private static final Probe DEFAULT_HEALTHCHECK_OPTIONS = new ProbeBuilder().withTimeoutSeconds(5).withInitialDelaySeconds(60).build(); + /** + * Key under which the Connect configuration is stored in ConfigMap + */ + static final String KAFKA_CONNECT_CONFIGURATION_FILENAME = "kafka-connect.properties"; + // Kafka Connect configuration keys (EnvVariables) protected static final String ENV_VAR_PREFIX = "KAFKA_CONNECT_"; - protected static final String ENV_VAR_KAFKA_CONNECT_CONFIGURATION = "KAFKA_CONNECT_CONFIGURATION"; protected static final String ENV_VAR_KAFKA_CONNECT_METRICS_ENABLED = "KAFKA_CONNECT_METRICS_ENABLED"; - protected static final String ENV_VAR_KAFKA_CONNECT_BOOTSTRAP_SERVERS = "KAFKA_CONNECT_BOOTSTRAP_SERVERS"; - protected static final String ENV_VAR_KAFKA_CONNECT_TLS = "KAFKA_CONNECT_TLS"; protected static final String ENV_VAR_KAFKA_CONNECT_TRUSTED_CERTS = "KAFKA_CONNECT_TRUSTED_CERTS"; protected static final String ENV_VAR_KAFKA_CONNECT_TLS_AUTH_CERT = "KAFKA_CONNECT_TLS_AUTH_CERT"; protected static final String ENV_VAR_KAFKA_CONNECT_TLS_AUTH_KEY = "KAFKA_CONNECT_TLS_AUTH_KEY"; @@ -129,7 +131,7 @@ public class KafkaConnectCluster extends AbstractModel implements SupportsMetric private Rack rack; private String initImage; protected String serviceName; - protected String loggingAndMetricsConfigMapName; + protected String connectConfigMapName; protected String bootstrapServers; @SuppressWarnings("deprecation") // External Configuration environment variables are deprecated @@ -188,7 +190,7 @@ protected KafkaConnectCluster(Reconciliation reconciliation, HasMetadata resourc super(reconciliation, resource, name, componentType, sharedEnvironmentProvider); this.serviceName = KafkaConnectResources.serviceName(cluster); - this.loggingAndMetricsConfigMapName = 
KafkaConnectResources.metricsAndLogConfigMapName(cluster); + this.connectConfigMapName = KafkaConnectResources.metricsAndLogConfigMapName(cluster); } /** @@ -370,7 +372,7 @@ protected List getContainerPortList() { protected List getVolumes(boolean isOpenShift) { List volumeList = new ArrayList<>(2); volumeList.add(VolumeUtils.createTempDirVolume(templatePod)); - volumeList.add(VolumeUtils.createConfigMapVolume(LOG_AND_METRICS_CONFIG_VOLUME_NAME, loggingAndMetricsConfigMapName)); + volumeList.add(VolumeUtils.createConfigMapVolume(CONNECT_CONFIG_VOLUME_NAME, connectConfigMapName)); if (rack != null) { volumeList.add(VolumeUtils.createEmptyDirVolume(INIT_VOLUME_NAME, "1Mi", "Memory")); @@ -434,7 +436,7 @@ private List getExternalConfigurationVolumes(boolean isOpenShift) { protected List getVolumeMounts() { List volumeMountList = new ArrayList<>(2); volumeMountList.add(VolumeUtils.createTempDirVolumeMount()); - volumeMountList.add(VolumeUtils.createVolumeMount(LOG_AND_METRICS_CONFIG_VOLUME_NAME, LOG_AND_METRICS_CONFIG_VOLUME_MOUNT)); + volumeMountList.add(VolumeUtils.createVolumeMount(CONNECT_CONFIG_VOLUME_NAME, CONNECT_CONFIG_VOLUME_MOUNT)); if (rack != null) { volumeMountList.add(VolumeUtils.createVolumeMount(INIT_VOLUME_NAME, INIT_VOLUME_MOUNT)); @@ -610,9 +612,7 @@ protected String getCommand() { protected List getEnvVars() { List varList = new ArrayList<>(); - varList.add(ContainerUtils.createEnvVar(ENV_VAR_KAFKA_CONNECT_CONFIGURATION, configuration.getConfiguration())); varList.add(ContainerUtils.createEnvVar(ENV_VAR_KAFKA_CONNECT_METRICS_ENABLED, String.valueOf(metrics.isEnabled()))); - varList.add(ContainerUtils.createEnvVar(ENV_VAR_KAFKA_CONNECT_BOOTSTRAP_SERVERS, bootstrapServers)); varList.add(ContainerUtils.createEnvVar(ENV_VAR_STRIMZI_KAFKA_GC_LOG_ENABLED, String.valueOf(gcLoggingEnabled))); JvmOptionUtils.heapOptions(varList, 75, 0L, jvmOptions, resources); @@ -642,8 +642,6 @@ protected List getEnvVars() { } private void populateTLSEnvVars(final List varList) { - varList.add(ContainerUtils.createEnvVar(ENV_VAR_KAFKA_CONNECT_TLS, "true")); - if (tls.getTrustedCertificates() != null && !tls.getTrustedCertificates().isEmpty()) { varList.add(ContainerUtils.createEnvVar(ENV_VAR_KAFKA_CONNECT_TRUSTED_CERTS, CertUtils.trustedCertsEnvVar(tls.getTrustedCertificates()))); } @@ -835,21 +833,38 @@ protected Map defaultPodLabels() { } /** - * Generates a metrics and logging ConfigMap according to the configuration. If this operand doesn't support logging - * or metrics, they will nto be set. + * Generates a ConfigMap containing Connect configurations. + * It also generates the metrics and logging configuration. If this operand doesn't support logging + * or metrics, they will not be set. 
* * @param metricsAndLogging The external CMs with logging and metrics configuration * * @return The generated ConfigMap */ - public ConfigMap generateMetricsAndLogConfigMap(MetricsAndLogging metricsAndLogging) { + public ConfigMap generateConnectConfigMap(MetricsAndLogging metricsAndLogging) { + // generate the ConfigMap data entries for the metrics and logging configuration + Map<String, String> data = ConfigMapUtils.generateMetricsAndLogConfigMapData(reconciliation, this, metricsAndLogging); + // add the ConfigMap data entry for Connect configurations + data.put( + KAFKA_CONNECT_CONFIGURATION_FILENAME, + new KafkaConnectConfigurationBuilder(bootstrapServers) + .withConfigProviders() + .withRestListeners(REST_API_PORT) + .withPluginPath() + .withConfigurations(configuration.getConfiguration()) + .withTls(tls) + .withAuthentication(authentication) + .withRackId() + .build() + ); + return ConfigMapUtils .createConfigMap( - loggingAndMetricsConfigMapName, + connectConfigMapName, namespace, labels, ownerReference, - ConfigMapUtils.generateMetricsAndLogConfigMapData(reconciliation, this, metricsAndLogging) + data ); } diff --git a/cluster-operator/src/main/java/io/strimzi/operator/cluster/model/KafkaConnectConfigurationBuilder.java b/cluster-operator/src/main/java/io/strimzi/operator/cluster/model/KafkaConnectConfigurationBuilder.java new file mode 100644 index 0000000000..d2b01c4013 --- /dev/null +++ b/cluster-operator/src/main/java/io/strimzi/operator/cluster/model/KafkaConnectConfigurationBuilder.java @@ -0,0 +1,323 @@ +/* + * Copyright Strimzi authors. + * License: Apache License 2.0 (see the file LICENSE or http://apache.org/licenses/LICENSE-2.0.html). + */ +package io.strimzi.operator.cluster.model; + + +import io.strimzi.api.kafka.model.common.ClientTls; +import io.strimzi.api.kafka.model.common.authentication.KafkaClientAuthentication; +import io.strimzi.api.kafka.model.common.authentication.KafkaClientAuthenticationOAuth; +import io.strimzi.api.kafka.model.common.authentication.KafkaClientAuthenticationPlain; +import io.strimzi.api.kafka.model.common.authentication.KafkaClientAuthenticationScram; +import io.strimzi.api.kafka.model.common.authentication.KafkaClientAuthenticationScramSha256; +import io.strimzi.api.kafka.model.common.authentication.KafkaClientAuthenticationScramSha512; +import io.strimzi.api.kafka.model.common.authentication.KafkaClientAuthenticationTls; + +import java.io.PrintWriter; +import java.io.StringWriter; + +/** + * This class is used to generate the Connect configuration template. The template is later passed in a ConfigMap to + * the Connect pods. The scripts in the container images fill in the variables in the template and use the resulting + * configuration file. This class uses the builder pattern so that the individual parts are easy to test. The + * configuration file is generated using a PrintWriter.
+ */ +public class KafkaConnectConfigurationBuilder { + // Names of environment variables expanded through config providers inside the Connect node + private final static String PLACEHOLDER_CERT_STORE_PASSWORD_CONFIG_PROVIDER_ENV_VAR = "${strimzienv:CERTS_STORE_PASSWORD}"; + private final static String PLACEHOLDER_SASL_USERNAME_CONFIG_PROVIDER_ENV_VAR = "${strimzienv:KAFKA_CONNECT_SASL_USERNAME}"; + private static final String PLACEHOLDER_OAUTH_CONFIG_CONFIG_PROVIDER_ENV_VAR = "${strimzienv:KAFKA_CONNECT_OAUTH_CONFIG}"; + private final static String PLACEHOLDER_OAUTH_CLIENT_SECRET_CONFIG_PROVIDER_ENV_VAR = "${strimzienv:KAFKA_CONNECT_OAUTH_CLIENT_SECRET}"; + private static final String PLACEHOLDER_OAUTH_REFRESH_TOKEN_CONFIG_PROVIDER_ENV_VAR = "${strimzienv:KAFKA_CONNECT_OAUTH_REFRESH_TOKEN}"; + private static final String PLACEHOLDER_OAUTH_ACCESS_TOKEN_CONFIG_PROVIDER_ENV_VAR = "${strimzienv:KAFKA_CONNECT_OAUTH_ACCESS_TOKEN}"; + private static final String PLACEHOLDER_OAUTH_PASSWORD_GRANT_PASSWORD_CONFIG_PROVIDER_ENV_VAR = "${strimzienv:KAFKA_CONNECT_OAUTH_PASSWORD_GRANT_PASSWORD}"; + private static final String PLACEHOLDER_OAUTH_CLIENT_ASSERTION_CONFIG_PROVIDER_ENV_VAR = "${strimzienv:KAFKA_CONNECT_OAUTH_CLIENT_ASSERTION}"; + private static final String PLACEHOLDER_ADVERTISED_HOSTNAME_CONFIG_PROVIDER_ENV_VAR = "${strimzienv:ADVERTISED_HOSTNAME}"; + private static final String PLACEHOLDER_RACK_ID_CONFIG_PROVIDER_ENV_VAR = "${strimzienv:STRIMZI_RACK_ID}"; + + private final StringWriter stringWriter = new StringWriter(); + private final PrintWriter writer = new PrintWriter(stringWriter); + private String securityProtocol = "PLAINTEXT"; + + /** + * Connect configuration template constructor + * + * @param bootstrapServers Kafka cluster bootstrap servers to connect to + */ + public KafkaConnectConfigurationBuilder(String bootstrapServers) { + printHeader(); + configureBootstrapServers(bootstrapServers); + } + + /** + * Renders the Kafka cluster bootstrap servers configuration + * + * @param bootstrapServers Kafka cluster bootstrap servers to connect to + */ + private void configureBootstrapServers(String bootstrapServers) { + printSectionHeader("Bootstrap servers"); + writer.println("bootstrap.servers=" + bootstrapServers); + writer.println(); + } + + /** + * Configures the Kafka security protocol to be used. + * This internal method is used when the configuration is built, because the security protocol depends on + * whether the TLS and SASL authentication configurations are set + */ + private void configureSecurityProtocol() { + printSectionHeader("Kafka Security protocol"); + writer.println("security.protocol=" + securityProtocol); + writer.println("producer.security.protocol=" + securityProtocol); + writer.println("consumer.security.protocol=" + securityProtocol); + writer.println("admin.security.protocol=" + securityProtocol); + writer.println(); + } + + /** + * Configures the Kafka config providers used for loading some parameters from env vars + * (e.g. the user and password for authentication) + * + * @return the builder instance + */ + public KafkaConnectConfigurationBuilder withConfigProviders() { + printSectionHeader("Config providers"); + writer.println("config.providers=strimzienv"); + writer.println("config.providers.strimzienv.class=org.apache.kafka.common.config.provider.EnvVarConfigProvider"); + writer.println("config.providers.strimzienv.param.allowlist.pattern=.*"); + + writer.println(); + return this; + } + + /** + * Adds the TLS/SSL configuration for connecting to the Kafka cluster.
+ * The configuration includes the trusted certificates store for TLS connection (server authentication) + * + * @param tls client TLS configuration + * @return the builder instance + */ + public KafkaConnectConfigurationBuilder withTls(ClientTls tls) { + if (tls != null) { + securityProtocol = "SSL"; + + if (tls.getTrustedCertificates() != null && !tls.getTrustedCertificates().isEmpty()) { + printSectionHeader("TLS / SSL"); + writer.println("ssl.truststore.location=/tmp/kafka/cluster.truststore.p12"); + writer.println("ssl.truststore.password=" + PLACEHOLDER_CERT_STORE_PASSWORD_CONFIG_PROVIDER_ENV_VAR); + writer.println("ssl.truststore.type=PKCS12"); + + writer.println("producer.ssl.truststore.location=/tmp/kafka/cluster.truststore.p12"); + writer.println("producer.ssl.truststore.password=" + PLACEHOLDER_CERT_STORE_PASSWORD_CONFIG_PROVIDER_ENV_VAR); + + writer.println("consumer.ssl.truststore.location=/tmp/kafka/cluster.truststore.p12"); + writer.println("consumer.ssl.truststore.password=" + PLACEHOLDER_CERT_STORE_PASSWORD_CONFIG_PROVIDER_ENV_VAR); + + writer.println("admin.ssl.truststore.location=/tmp/kafka/cluster.truststore.p12"); + writer.println("admin.ssl.truststore.password=" + PLACEHOLDER_CERT_STORE_PASSWORD_CONFIG_PROVIDER_ENV_VAR); + + writer.println(); + } + + } + return this; + } + + /** + * Adds keystore configuration for mTLS (client authentication) + * or the SASL configuration for client authentication to the Kafka cluster + * + * @param authentication authentication configuration + * @return the builder instance + */ + public KafkaConnectConfigurationBuilder withAuthentication(KafkaClientAuthentication authentication) { + if (authentication != null) { + printSectionHeader("Authentication configuration"); + // configuring mTLS (client TLS authentication) if TLS client authentication is set + if (authentication instanceof KafkaClientAuthenticationTls tlsAuth && tlsAuth.getCertificateAndKey() != null) { + writer.println("ssl.keystore.location=/tmp/kafka/cluster.keystore.p12"); + writer.println("ssl.keystore.password=" + PLACEHOLDER_CERT_STORE_PASSWORD_CONFIG_PROVIDER_ENV_VAR); + writer.println("ssl.keystore.type=PKCS12"); + + writer.println("producer.ssl.keystore.location=/tmp/kafka/cluster.keystore.p12"); + writer.println("producer.ssl.keystore.password=" + PLACEHOLDER_CERT_STORE_PASSWORD_CONFIG_PROVIDER_ENV_VAR); + writer.println("producer.ssl.keystore.type=PKCS12"); + + writer.println("consumer.ssl.keystore.location=/tmp/kafka/cluster.keystore.p12"); + writer.println("consumer.ssl.keystore.password=" + PLACEHOLDER_CERT_STORE_PASSWORD_CONFIG_PROVIDER_ENV_VAR); + writer.println("consumer.ssl.keystore.type=PKCS12"); + + writer.println("admin.ssl.keystore.location=/tmp/kafka/cluster.keystore.p12"); + writer.println("admin.ssl.keystore.password=" + PLACEHOLDER_CERT_STORE_PASSWORD_CONFIG_PROVIDER_ENV_VAR); + writer.println("admin.ssl.keystore.type=PKCS12"); + // otherwise SASL or OAuth is going to be used for authentication + } else { + securityProtocol = securityProtocol.equals("SSL") ? 
"SASL_SSL" : "SASL_PLAINTEXT"; + String saslMechanism = null; + StringBuilder jaasConfig = new StringBuilder(); + String oauthCallbackClass = ""; + String producerOauthCallbackClass = ""; + String consumerOauthCallbackClass = ""; + String adminOauthCallbackClass = ""; + + if (authentication instanceof KafkaClientAuthenticationPlain passwordAuth) { + saslMechanism = "PLAIN"; + String passwordFilePath = "/opt/kafka/connect-password/" + passwordAuth.getPasswordSecret().getSecretName() + ":" + passwordAuth.getPasswordSecret().getPassword(); + jaasConfig.append("org.apache.kafka.common.security.plain.PlainLoginModule required username=" + PLACEHOLDER_SASL_USERNAME_CONFIG_PROVIDER_ENV_VAR + " password=" + passwordFilePath + ";"); + } else if (authentication instanceof KafkaClientAuthenticationScram scramAuth) { + if (scramAuth.getType().equals(KafkaClientAuthenticationScramSha256.TYPE_SCRAM_SHA_256)) { + saslMechanism = "SCRAM-SHA-256"; + } else if (scramAuth.getType().equals(KafkaClientAuthenticationScramSha512.TYPE_SCRAM_SHA_512)) { + saslMechanism = "SCRAM-SHA-512"; + } + String passwordFilePath = "/opt/kafka/connect-password/" + scramAuth.getPasswordSecret().getSecretName() + ":" + scramAuth.getPasswordSecret().getPassword(); + jaasConfig.append("org.apache.kafka.common.security.scram.ScramLoginModule required username=" + PLACEHOLDER_SASL_USERNAME_CONFIG_PROVIDER_ENV_VAR + " password=" + passwordFilePath + ";"); + } else if (authentication instanceof KafkaClientAuthenticationOAuth oauth) { + saslMechanism = "OAUTHBEARER"; + jaasConfig.append("org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required " + PLACEHOLDER_OAUTH_CONFIG_CONFIG_PROVIDER_ENV_VAR); + + if (oauth.getClientSecret() != null) { + jaasConfig.append(" oauth.client.secret=" + PLACEHOLDER_OAUTH_CLIENT_SECRET_CONFIG_PROVIDER_ENV_VAR); + } + + if (oauth.getRefreshToken() != null) { + jaasConfig.append(" oauth.refresh.token=" + PLACEHOLDER_OAUTH_REFRESH_TOKEN_CONFIG_PROVIDER_ENV_VAR); + } + + if (oauth.getAccessToken() != null) { + jaasConfig.append(" oauth.access.token=" + PLACEHOLDER_OAUTH_ACCESS_TOKEN_CONFIG_PROVIDER_ENV_VAR); + } + + if (oauth.getPasswordSecret() != null) { + jaasConfig.append(" oauth.password.grant.password=" + PLACEHOLDER_OAUTH_PASSWORD_GRANT_PASSWORD_CONFIG_PROVIDER_ENV_VAR); + } + + if (oauth.getClientAssertion() != null) { + jaasConfig.append(" oauth.client.assertion=" + PLACEHOLDER_OAUTH_CLIENT_ASSERTION_CONFIG_PROVIDER_ENV_VAR); + } + + if (oauth.getTlsTrustedCertificates() != null && !oauth.getTlsTrustedCertificates().isEmpty()) { + jaasConfig.append(" oauth.ssl.truststore.location=\"/tmp/strimzi/oauth.truststore.p12\" oauth.ssl.truststore.password=" + PLACEHOLDER_CERT_STORE_PASSWORD_CONFIG_PROVIDER_ENV_VAR + " oauth.ssl.truststore.type=\"PKCS12\""); + } + + jaasConfig.append(";"); + oauthCallbackClass = "sasl.login.callback.handler.class=io.strimzi.kafka.oauth.client.JaasClientOauthLoginCallbackHandler"; + producerOauthCallbackClass = "producer." + oauthCallbackClass; + consumerOauthCallbackClass = "consumer." + oauthCallbackClass; + adminOauthCallbackClass = "admin." 
+ oauthCallbackClass; + + } + writer.println("sasl.mechanism=" + saslMechanism); + writer.println("sasl.jaas.config=" + jaasConfig); + writer.println(oauthCallbackClass); + + writer.println("producer.sasl.mechanism=" + saslMechanism); + writer.println("producer.sasl.jaas.config=" + jaasConfig); + writer.println(producerOauthCallbackClass); + + writer.println("consumer.sasl.mechanism=" + saslMechanism); + writer.println("consumer.sasl.jaas.config=" + jaasConfig); + writer.println(consumerOauthCallbackClass); + + writer.println("admin.sasl.mechanism=" + saslMechanism); + writer.println("admin.sasl.jaas.config=" + jaasConfig); + writer.println(adminOauthCallbackClass); + + writer.println(); + } + } + return this; + } + + /** + * Adds the consumer client rack configuration {@code consumer.client.rack}. The rack ID will be set in the container based on the value of the + * {@code STRIMZI_RACK_ID} env var. + * + * @return Returns the builder instance + */ + public KafkaConnectConfigurationBuilder withRackId() { + printSectionHeader("Additional information"); + writer.println("consumer.client.rack=" + PLACEHOLDER_RACK_ID_CONFIG_PROVIDER_ENV_VAR); + writer.println(); + return this; + } + + /** + * Adds user provided Kafka Connect configurations. + * + * @param configurations User provided Kafka Connect configurations + * + * @return Returns the builder instance + */ + public KafkaConnectConfigurationBuilder withConfigurations(String configurations) { + printSectionHeader("Provided configuration"); + writer.println(configurations); + + return this; + } + + /** + * Configures the REST API listener. + * + * @param port REST API port + * @return Returns the builder instance + */ + public KafkaConnectConfigurationBuilder withRestListeners(int port) { + printSectionHeader("REST Listeners"); + writer.println("rest.advertised.host.name=" + PLACEHOLDER_ADVERTISED_HOSTNAME_CONFIG_PROVIDER_ENV_VAR); + writer.println("rest.advertised.port=" + port); + writer.println(); + + return this; + } + + /** + * Configures plugins. + * + * @return Returns the builder instance + */ + public KafkaConnectConfigurationBuilder withPluginPath() { + printSectionHeader("Plugins"); + writer.println("plugin.path=/opt/kafka/plugins"); + writer.println(); + + return this; + } + + + /** + * Internal method which prints the section header into the configuration file. This makes it more human-readable + * when looking for issues in running pods etc. + * + * @param sectionName Name of the section for which this header is printed + */ + private void printSectionHeader(String sectionName) { + writer.println("##########"); + writer.println("# " + sectionName); + writer.println("##########"); + } + + /** + * Prints the file header which is at the beginning of the configuration file.
+ */ + private void printHeader() { + writer.println("##############################"); + writer.println("##############################"); + writer.println("# This file is automatically generated by the Strimzi Cluster Operator"); + writer.println("# Any changes to this file will be ignored and overwritten!"); + writer.println("##############################"); + writer.println("##############################"); + writer.println(); + } + + /** + * Generates the configuration template as a String + * + * @return String with the Kafka Connect configuration template + */ + public String build() { + configureSecurityProtocol(); + return stringWriter.toString(); + } +} diff --git a/cluster-operator/src/main/java/io/strimzi/operator/cluster/model/KafkaMirrorMaker2Cluster.java b/cluster-operator/src/main/java/io/strimzi/operator/cluster/model/KafkaMirrorMaker2Cluster.java index c867838208..4c21e0dcc6 100644 --- a/cluster-operator/src/main/java/io/strimzi/operator/cluster/model/KafkaMirrorMaker2Cluster.java +++ b/cluster-operator/src/main/java/io/strimzi/operator/cluster/model/KafkaMirrorMaker2Cluster.java @@ -85,7 +85,7 @@ private KafkaMirrorMaker2Cluster(Reconciliation reconciliation, KafkaMirrorMaker this.connectors = KafkaMirrorMaker2Connectors.fromCrd(reconciliation, resource); this.serviceName = KafkaMirrorMaker2Resources.serviceName(cluster); - this.loggingAndMetricsConfigMapName = KafkaMirrorMaker2Resources.metricsAndLogConfigMapName(cluster); + this.connectConfigMapName = KafkaMirrorMaker2Resources.metricsAndLogConfigMapName(cluster); } /** diff --git a/cluster-operator/src/main/java/io/strimzi/operator/cluster/operator/assembly/AbstractConnectOperator.java b/cluster-operator/src/main/java/io/strimzi/operator/cluster/operator/assembly/AbstractConnectOperator.java index cb29f269a5..e7ffec5e95 100644 --- a/cluster-operator/src/main/java/io/strimzi/operator/cluster/operator/assembly/AbstractConnectOperator.java +++ b/cluster-operator/src/main/java/io/strimzi/operator/cluster/operator/assembly/AbstractConnectOperator.java @@ -269,7 +269,7 @@ protected Future manualRollingUpdate(Reconciliation reconciliation, KafkaC */ protected Future<ConfigMap> generateMetricsAndLoggingConfigMap(Reconciliation reconciliation, KafkaConnectCluster kafkaConnectCluster) { return MetricsAndLoggingUtils.metricsAndLogging(reconciliation, configMapOperations, kafkaConnectCluster.logging(), kafkaConnectCluster.metrics()) - .compose(metricsAndLoggingCm -> Future.succeededFuture(kafkaConnectCluster.generateMetricsAndLogConfigMap(metricsAndLoggingCm))); + .compose(metricsAndLoggingCm -> Future.succeededFuture(kafkaConnectCluster.generateConnectConfigMap(metricsAndLoggingCm))); } /** diff --git a/cluster-operator/src/test/java/io/strimzi/operator/cluster/model/KafkaConnectClusterTest.java b/cluster-operator/src/test/java/io/strimzi/operator/cluster/model/KafkaConnectClusterTest.java index 26e0b32238..2a82f2e15c 100644 --- a/cluster-operator/src/test/java/io/strimzi/operator/cluster/model/KafkaConnectClusterTest.java +++ b/cluster-operator/src/test/java/io/strimzi/operator/cluster/model/KafkaConnectClusterTest.java @@ -95,9 +95,11 @@ import java.util.TreeMap; import static java.util.Collections.singletonMap; +import static org.hamcrest.CoreMatchers.containsString; import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.CoreMatchers.hasItem; import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.CoreMatchers.not; import static org.hamcrest.CoreMatchers.notNullValue; import static
org.hamcrest.CoreMatchers.nullValue; import static org.hamcrest.CoreMatchers.startsWith; @@ -165,12 +167,15 @@ public class KafkaConnectClusterTest { @ParallelTest public void testMetricsConfigMap() { - ConfigMap metricsCm = kc.generateMetricsAndLogConfigMap(new MetricsAndLogging(metricsCM, null)); - checkMetricsConfigMap(metricsCm); + ConfigMap configMap = kc.generateConnectConfigMap(new MetricsAndLogging(metricsCM, null)); + checkMetricsConfigMap(configMap); } - private void checkMetricsConfigMap(ConfigMap metricsCm) { - assertThat(metricsCm.getData().get(MetricsModel.CONFIG_MAP_KEY), is(metricsCmJson)); + private void checkMetricsConfigMap(ConfigMap configMap) { + assertThat(configMap.getData().get(MetricsModel.CONFIG_MAP_KEY), is(metricsCmJson)); + String connectConfigurations = configMap.getData().get(KafkaConnectCluster.KAFKA_CONNECT_CONFIGURATION_FILENAME); + assertThat(connectConfigurations, containsString("bootstrap.servers=" + bootstrapServers)); + assertThat(connectConfigurations, containsString(expectedConfiguration.asPairs())); } private Map expectedLabels(String name) { @@ -195,9 +200,7 @@ private Map expectedLabels() { protected List getExpectedEnvVars() { List expected = new ArrayList<>(); - expected.add(new EnvVarBuilder().withName(KafkaConnectCluster.ENV_VAR_KAFKA_CONNECT_CONFIGURATION).withValue(expectedConfiguration.asPairs()).build()); expected.add(new EnvVarBuilder().withName(KafkaConnectCluster.ENV_VAR_KAFKA_CONNECT_METRICS_ENABLED).withValue(String.valueOf(true)).build()); - expected.add(new EnvVarBuilder().withName(KafkaConnectCluster.ENV_VAR_KAFKA_CONNECT_BOOTSTRAP_SERVERS).withValue(bootstrapServers).build()); expected.add(new EnvVarBuilder().withName(KafkaConnectCluster.ENV_VAR_STRIMZI_KAFKA_GC_LOG_ENABLED).withValue(Boolean.toString(JvmOptions.DEFAULT_GC_LOGGING_ENABLED)).build()); expected.add(new EnvVarBuilder().withName(AbstractModel.ENV_VAR_KAFKA_HEAP_OPTS).withValue(kafkaHeapOpts).build()); expected.add(new EnvVarBuilder().withName("NO_PROXY").withValue("127.0.0.1").build()); @@ -343,6 +346,15 @@ public void testPodSetWithTls() { KafkaConnectCluster kc = KafkaConnectCluster.fromCrd(Reconciliation.DUMMY_RECONCILIATION, resource, VERSIONS, SHARED_ENV_PROVIDER); + // Check config map + ConfigMap configMap = kc.generateConnectConfigMap(new MetricsAndLogging(metricsCM, null)); + String connectConfigurations = configMap.getData().get(KafkaConnectCluster.KAFKA_CONNECT_CONFIGURATION_FILENAME); + assertThat(connectConfigurations, containsString("ssl.truststore.location=/tmp/kafka/cluster.truststore.p12")); + assertThat(connectConfigurations, containsString("ssl.truststore.password=${strimzienv:CERTS_STORE_PASSWORD}")); + assertThat(connectConfigurations, containsString("ssl.truststore.type=PKCS12")); + assertThat(connectConfigurations, containsString("security.protocol=SSL")); + assertThat(connectConfigurations, not(containsString("ssl.keystore."))); + // Check PodSet StrimziPodSet podSet = kc.generatePodSet(3, Map.of(), Map.of(), false, null, null, null); PodSetUtils.podSetToPods(podSet).forEach(pod -> { @@ -354,7 +366,6 @@ public void testPodSetWithTls() { assertThat(containers.get(0).getVolumeMounts().get(3).getMountPath(), is(KafkaConnectCluster.TLS_CERTS_BASE_VOLUME_MOUNT + "my-another-secret")); assertThat(io.strimzi.operator.cluster.TestUtils.containerEnvVars(containers.get(0)).get(KafkaConnectCluster.ENV_VAR_KAFKA_CONNECT_TRUSTED_CERTS), is("my-secret/cert.crt;my-secret/new-cert.crt;my-another-secret/another-cert.crt")); - 
assertThat(io.strimzi.operator.cluster.TestUtils.containerEnvVars(containers.get(0)).get(KafkaConnectCluster.ENV_VAR_KAFKA_CONNECT_TLS), is("true")); }); } @@ -378,6 +389,15 @@ public void testPodSetWithTlsAuth() { KafkaConnectCluster kc = KafkaConnectCluster.fromCrd(Reconciliation.DUMMY_RECONCILIATION, resource, VERSIONS, SHARED_ENV_PROVIDER); + // Check config map + ConfigMap configMap = kc.generateConnectConfigMap(new MetricsAndLogging(metricsCM, null)); + String connectConfigurations = configMap.getData().get(KafkaConnectCluster.KAFKA_CONNECT_CONFIGURATION_FILENAME); + assertThat(connectConfigurations, containsString("ssl.keystore.location=/tmp/kafka/cluster.keystore.p12")); + assertThat(connectConfigurations, containsString("ssl.keystore.password=${strimzienv:CERTS_STORE_PASSWORD}")); + assertThat(connectConfigurations, containsString("ssl.keystore.type=PKCS12")); + assertThat(connectConfigurations, containsString("security.protocol=SSL")); + assertThat(connectConfigurations, containsString("ssl.truststore.")); + // Check PodSet StrimziPodSet podSet = kc.generatePodSet(3, Map.of(), Map.of(), false, null, null, null); PodSetUtils.podSetToPods(podSet).forEach(pod -> { @@ -385,10 +405,10 @@ public void testPodSetWithTlsAuth() { List containers = pod.getSpec().getContainers(); + assertThat(containers.get(0).getVolumeMounts().get(2).getMountPath(), is(KafkaConnectCluster.TLS_CERTS_BASE_VOLUME_MOUNT + "my-secret")); assertThat(containers.get(0).getVolumeMounts().get(3).getMountPath(), is(KafkaConnectCluster.TLS_CERTS_BASE_VOLUME_MOUNT + "user-secret")); assertThat(io.strimzi.operator.cluster.TestUtils.containerEnvVars(containers.get(0)).get(KafkaConnectCluster.ENV_VAR_KAFKA_CONNECT_TLS_AUTH_CERT), is("user-secret/user.crt")); assertThat(io.strimzi.operator.cluster.TestUtils.containerEnvVars(containers.get(0)).get(KafkaConnectCluster.ENV_VAR_KAFKA_CONNECT_TLS_AUTH_KEY), is("user-secret/user.key")); - assertThat(io.strimzi.operator.cluster.TestUtils.containerEnvVars(containers.get(0)).get(KafkaConnectCluster.ENV_VAR_KAFKA_CONNECT_TLS), is("true")); }); } @@ -437,6 +457,13 @@ public void testPodSetWithScramSha512Auth() { KafkaConnectCluster kc = KafkaConnectCluster.fromCrd(Reconciliation.DUMMY_RECONCILIATION, resource, VERSIONS, SHARED_ENV_PROVIDER); + // Check config map + ConfigMap configMap = kc.generateConnectConfigMap(new MetricsAndLogging(metricsCM, null)); + String connectConfigurations = configMap.getData().get(KafkaConnectCluster.KAFKA_CONNECT_CONFIGURATION_FILENAME); + assertThat(connectConfigurations, containsString("sasl.mechanism=SCRAM-SHA-512")); + assertThat(connectConfigurations, containsString("security.protocol=SASL_PLAINTEXT")); + assertThat(connectConfigurations, containsString("sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username=${strimzienv:KAFKA_CONNECT_SASL_USERNAME} password=/opt/kafka/connect-password/user1-secret:password;")); + // Check PodSet StrimziPodSet podSet = kc.generatePodSet(3, Map.of(), Map.of(), false, null, null, null); PodSetUtils.podSetToPods(podSet).forEach(pod -> { @@ -444,9 +471,7 @@ public void testPodSetWithScramSha512Auth() { List containers = pod.getSpec().getContainers(); assertThat(containers.get(0).getVolumeMounts().get(2).getMountPath(), is(KafkaConnectCluster.PASSWORD_VOLUME_MOUNT + "user1-secret")); - assertThat(io.strimzi.operator.cluster.TestUtils.containerEnvVars(containers.get(0)).get(KafkaConnectCluster.ENV_VAR_KAFKA_CONNECT_SASL_PASSWORD_FILE), is("user1-secret/password")); 
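For orientation, here is a minimal usage sketch of the new builder that would render the SCRAM-SHA-512 section asserted above. The bootstrap address, user name, and Secret coordinates are illustrative only, and it assumes the sketch lives in io.strimzi.operator.cluster.model alongside the builder, with PasswordSecretSource taken from the same io.strimzi.api.kafka.model.common package as the other api model classes the builder imports:

// Illustrative sketch, not part of this PR
PasswordSecretSource password = new PasswordSecretSource();
password.setSecretName("user1-secret");   // hypothetical Secret name
password.setPassword("password");         // key inside that Secret holding the password

KafkaClientAuthenticationScramSha512 auth = new KafkaClientAuthenticationScramSha512();
auth.setUsername("user1");                // hypothetical user
auth.setPasswordSecret(password);

String rendered = new KafkaConnectConfigurationBuilder("my-cluster-kafka-bootstrap:9092")
        .withAuthentication(auth)
        .build();
// rendered now contains, among other lines:
//   sasl.mechanism=SCRAM-SHA-512
//   security.protocol=SASL_PLAINTEXT
//   sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username=${strimzienv:KAFKA_CONNECT_SASL_USERNAME} password=/opt/kafka/connect-password/user1-secret:password;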
assertThat(io.strimzi.operator.cluster.TestUtils.containerEnvVars(containers.get(0)).get(KafkaConnectCluster.ENV_VAR_KAFKA_CONNECT_SASL_USERNAME), is("user1")); - assertThat(io.strimzi.operator.cluster.TestUtils.containerEnvVars(containers.get(0)).get(KafkaConnectCluster.ENV_VAR_KAFKA_CONNECT_SASL_MECHANISM), is("scram-sha-512")); }); } @@ -474,12 +499,20 @@ public void testPodSetWithScramSha512AuthAndTLSSameSecret() { KafkaConnectCluster kc = KafkaConnectCluster.fromCrd(Reconciliation.DUMMY_RECONCILIATION, resource, VERSIONS, SHARED_ENV_PROVIDER); + // Check config map + ConfigMap configMap = kc.generateConnectConfigMap(new MetricsAndLogging(metricsCM, null)); + String connectConfigurations = configMap.getData().get(KafkaConnectCluster.KAFKA_CONNECT_CONFIGURATION_FILENAME); + assertThat(connectConfigurations, containsString("sasl.mechanism=SCRAM-SHA-512")); + assertThat(connectConfigurations, containsString("security.protocol=SASL_SSL")); + assertThat(connectConfigurations, containsString("sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username=${strimzienv:KAFKA_CONNECT_SASL_USERNAME} password=/opt/kafka/connect-password/my-secret:user1.password;")); + assertThat(connectConfigurations, containsString("ssl.truststore.")); + // Check PodSet StrimziPodSet podSet = kc.generatePodSet(3, Map.of(), Map.of(), false, null, null, null); PodSetUtils.podSetToPods(podSet).forEach(pod -> { assertThat(pod.getSpec().getVolumes().size(), is(3)); assertThat(pod.getSpec().getVolumes().get(0).getName(), is(VolumeUtils.STRIMZI_TMP_DIRECTORY_DEFAULT_VOLUME_NAME)); - assertThat(pod.getSpec().getVolumes().get(1).getName(), is("kafka-metrics-and-logging")); + assertThat(pod.getSpec().getVolumes().get(1).getName(), is("kafka-connect-configurations")); assertThat(pod.getSpec().getVolumes().get(2).getName(), is("my-secret")); List containers = pod.getSpec().getContainers(); @@ -487,17 +520,14 @@ public void testPodSetWithScramSha512AuthAndTLSSameSecret() { assertThat(containers.get(0).getVolumeMounts().size(), is(4)); assertThat(containers.get(0).getVolumeMounts().get(0).getName(), is(VolumeUtils.STRIMZI_TMP_DIRECTORY_DEFAULT_VOLUME_NAME)); assertThat(containers.get(0).getVolumeMounts().get(0).getMountPath(), is(VolumeUtils.STRIMZI_TMP_DIRECTORY_DEFAULT_MOUNT_PATH)); - assertThat(containers.get(0).getVolumeMounts().get(1).getName(), is("kafka-metrics-and-logging")); + assertThat(containers.get(0).getVolumeMounts().get(1).getName(), is("kafka-connect-configurations")); assertThat(containers.get(0).getVolumeMounts().get(1).getMountPath(), is("/opt/kafka/custom-config/")); assertThat(containers.get(0).getVolumeMounts().get(2).getName(), is("my-secret")); assertThat(containers.get(0).getVolumeMounts().get(2).getMountPath(), is(KafkaConnectCluster.TLS_CERTS_BASE_VOLUME_MOUNT + "my-secret")); assertThat(containers.get(0).getVolumeMounts().get(3).getName(), is("my-secret")); assertThat(containers.get(0).getVolumeMounts().get(3).getMountPath(), is(KafkaConnectCluster.PASSWORD_VOLUME_MOUNT + "my-secret")); - assertThat(io.strimzi.operator.cluster.TestUtils.containerEnvVars(containers.get(0)), hasEntry(KafkaConnectCluster.ENV_VAR_KAFKA_CONNECT_SASL_PASSWORD_FILE, "my-secret/user1.password")); assertThat(io.strimzi.operator.cluster.TestUtils.containerEnvVars(containers.get(0)), hasEntry(KafkaConnectCluster.ENV_VAR_KAFKA_CONNECT_SASL_USERNAME, "user1")); - assertThat(io.strimzi.operator.cluster.TestUtils.containerEnvVars(containers.get(0)), 
hasEntry(KafkaConnectCluster.ENV_VAR_KAFKA_CONNECT_SASL_MECHANISM, "scram-sha-512")); - assertThat(io.strimzi.operator.cluster.TestUtils.containerEnvVars(containers.get(0)), hasEntry(KafkaConnectCluster.ENV_VAR_KAFKA_CONNECT_TLS, "true")); }); } @@ -517,6 +547,14 @@ public void testPodSetWithScramSha256Auth() { KafkaConnectCluster kc = KafkaConnectCluster.fromCrd(Reconciliation.DUMMY_RECONCILIATION, resource, VERSIONS, SHARED_ENV_PROVIDER); + // Check config map + ConfigMap configMap = kc.generateConnectConfigMap(new MetricsAndLogging(metricsCM, null)); + String connectConfigurations = configMap.getData().get(KafkaConnectCluster.KAFKA_CONNECT_CONFIGURATION_FILENAME); + assertThat(connectConfigurations, containsString("sasl.mechanism=SCRAM-SHA-256")); + assertThat(connectConfigurations, containsString("security.protocol=SASL_PLAIN")); + assertThat(connectConfigurations, containsString("sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username=${strimzienv:KAFKA_CONNECT_SASL_USERNAME} password=/opt/kafka/connect-password/user1-secret:password;")); + assertThat(connectConfigurations, not(containsString("ssl.truststore."))); + // Check PodSet StrimziPodSet podSet = kc.generatePodSet(3, Map.of(), Map.of(), false, null, null, null); PodSetUtils.podSetToPods(podSet).forEach(pod -> { @@ -524,9 +562,7 @@ public void testPodSetWithScramSha256Auth() { List containers = pod.getSpec().getContainers(); assertThat(containers.get(0).getVolumeMounts().get(2).getMountPath(), is(KafkaConnectCluster.PASSWORD_VOLUME_MOUNT + "user1-secret")); - assertThat(io.strimzi.operator.cluster.TestUtils.containerEnvVars(containers.get(0)).get(KafkaConnectCluster.ENV_VAR_KAFKA_CONNECT_SASL_PASSWORD_FILE), is("user1-secret/password")); assertThat(io.strimzi.operator.cluster.TestUtils.containerEnvVars(containers.get(0)).get(KafkaConnectCluster.ENV_VAR_KAFKA_CONNECT_SASL_USERNAME), is("user1")); - assertThat(io.strimzi.operator.cluster.TestUtils.containerEnvVars(containers.get(0)).get(KafkaConnectCluster.ENV_VAR_KAFKA_CONNECT_SASL_MECHANISM), is("scram-sha-256")); }); } @@ -554,12 +590,20 @@ public void testPodSetWithScramSha256AuthAndTLSSameSecret() { KafkaConnectCluster kc = KafkaConnectCluster.fromCrd(Reconciliation.DUMMY_RECONCILIATION, resource, VERSIONS, SHARED_ENV_PROVIDER); + // Check config map + ConfigMap configMap = kc.generateConnectConfigMap(new MetricsAndLogging(metricsCM, null)); + String connectConfigurations = configMap.getData().get(KafkaConnectCluster.KAFKA_CONNECT_CONFIGURATION_FILENAME); + assertThat(connectConfigurations, containsString("sasl.mechanism=SCRAM-SHA-256")); + assertThat(connectConfigurations, containsString("security.protocol=SASL_SSL")); + assertThat(connectConfigurations, containsString("sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username=${strimzienv:KAFKA_CONNECT_SASL_USERNAME} password=/opt/kafka/connect-password/my-secret:user1.password;")); + assertThat(connectConfigurations, containsString("ssl.truststore.")); + // Check PodSet StrimziPodSet podSet = kc.generatePodSet(3, Map.of(), Map.of(), false, null, null, null); PodSetUtils.podSetToPods(podSet).forEach(pod -> { assertThat(pod.getSpec().getVolumes().size(), is(3)); assertThat(pod.getSpec().getVolumes().get(0).getName(), is(VolumeUtils.STRIMZI_TMP_DIRECTORY_DEFAULT_VOLUME_NAME)); - assertThat(pod.getSpec().getVolumes().get(1).getName(), is("kafka-metrics-and-logging")); + assertThat(pod.getSpec().getVolumes().get(1).getName(), 
is("kafka-connect-configurations")); assertThat(pod.getSpec().getVolumes().get(2).getName(), is("my-secret")); List containers = pod.getSpec().getContainers(); @@ -567,17 +611,14 @@ public void testPodSetWithScramSha256AuthAndTLSSameSecret() { assertThat(containers.get(0).getVolumeMounts().size(), is(4)); assertThat(containers.get(0).getVolumeMounts().get(0).getName(), is(VolumeUtils.STRIMZI_TMP_DIRECTORY_DEFAULT_VOLUME_NAME)); assertThat(containers.get(0).getVolumeMounts().get(0).getMountPath(), is(VolumeUtils.STRIMZI_TMP_DIRECTORY_DEFAULT_MOUNT_PATH)); - assertThat(containers.get(0).getVolumeMounts().get(1).getName(), is("kafka-metrics-and-logging")); + assertThat(containers.get(0).getVolumeMounts().get(1).getName(), is("kafka-connect-configurations")); assertThat(containers.get(0).getVolumeMounts().get(1).getMountPath(), is("/opt/kafka/custom-config/")); assertThat(containers.get(0).getVolumeMounts().get(2).getName(), is("my-secret")); assertThat(containers.get(0).getVolumeMounts().get(2).getMountPath(), is(KafkaConnectCluster.TLS_CERTS_BASE_VOLUME_MOUNT + "my-secret")); assertThat(containers.get(0).getVolumeMounts().get(3).getName(), is("my-secret")); assertThat(containers.get(0).getVolumeMounts().get(3).getMountPath(), is(KafkaConnectCluster.PASSWORD_VOLUME_MOUNT + "my-secret")); - assertThat(io.strimzi.operator.cluster.TestUtils.containerEnvVars(containers.get(0)), hasEntry(KafkaConnectCluster.ENV_VAR_KAFKA_CONNECT_SASL_PASSWORD_FILE, "my-secret/user1.password")); assertThat(io.strimzi.operator.cluster.TestUtils.containerEnvVars(containers.get(0)), hasEntry(KafkaConnectCluster.ENV_VAR_KAFKA_CONNECT_SASL_USERNAME, "user1")); - assertThat(io.strimzi.operator.cluster.TestUtils.containerEnvVars(containers.get(0)), hasEntry(KafkaConnectCluster.ENV_VAR_KAFKA_CONNECT_SASL_MECHANISM, "scram-sha-256")); - assertThat(io.strimzi.operator.cluster.TestUtils.containerEnvVars(containers.get(0)), hasEntry(KafkaConnectCluster.ENV_VAR_KAFKA_CONNECT_TLS, "true")); }); } @@ -597,6 +638,14 @@ public void testPodSetWithPlainAuth() { KafkaConnectCluster kc = KafkaConnectCluster.fromCrd(Reconciliation.DUMMY_RECONCILIATION, resource, VERSIONS, SHARED_ENV_PROVIDER); + // Check config map + ConfigMap configMap = kc.generateConnectConfigMap(new MetricsAndLogging(metricsCM, null)); + String connectConfigurations = configMap.getData().get(KafkaConnectCluster.KAFKA_CONNECT_CONFIGURATION_FILENAME); + assertThat(connectConfigurations, containsString("security.protocol=SASL_PLAINTEXT")); + assertThat(connectConfigurations, containsString("sasl.mechanism=PLAIN")); + assertThat(connectConfigurations, containsString("sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username=${strimzienv:KAFKA_CONNECT_SASL_USERNAME} password=/opt/kafka/connect-password/user1-secret:password;")); + assertThat(connectConfigurations, not(containsString("ssl.truststore."))); + // Check PodSet StrimziPodSet podSet = kc.generatePodSet(3, Map.of(), Map.of(), false, null, null, null); PodSetUtils.podSetToPods(podSet).forEach(pod -> { @@ -604,9 +653,7 @@ public void testPodSetWithPlainAuth() { List containers = pod.getSpec().getContainers(); assertThat(containers.get(0).getVolumeMounts().get(2).getMountPath(), is(KafkaConnectCluster.PASSWORD_VOLUME_MOUNT + "user1-secret")); - assertThat(io.strimzi.operator.cluster.TestUtils.containerEnvVars(containers.get(0)).get(KafkaConnectCluster.ENV_VAR_KAFKA_CONNECT_SASL_PASSWORD_FILE), is("user1-secret/password")); 
assertThat(io.strimzi.operator.cluster.TestUtils.containerEnvVars(containers.get(0)).get(KafkaConnectCluster.ENV_VAR_KAFKA_CONNECT_SASL_USERNAME), is("user1")); - assertThat(io.strimzi.operator.cluster.TestUtils.containerEnvVars(containers.get(0)).get(KafkaConnectCluster.ENV_VAR_KAFKA_CONNECT_SASL_MECHANISM), is("plain")); }); } @@ -634,12 +681,20 @@ public void testPodSetWithPlainAuthAndTLSSameSecret() { KafkaConnectCluster kc = KafkaConnectCluster.fromCrd(Reconciliation.DUMMY_RECONCILIATION, resource, VERSIONS, SHARED_ENV_PROVIDER); + // Check config map + ConfigMap configMap = kc.generateConnectConfigMap(new MetricsAndLogging(metricsCM, null)); + String connectConfigurations = configMap.getData().get(KafkaConnectCluster.KAFKA_CONNECT_CONFIGURATION_FILENAME); + assertThat(connectConfigurations, containsString("security.protocol=SASL_SSL")); + assertThat(connectConfigurations, containsString("sasl.mechanism=PLAIN")); + assertThat(connectConfigurations, containsString("sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username=${strimzienv:KAFKA_CONNECT_SASL_USERNAME} password=/opt/kafka/connect-password/my-secret:user1.password;")); + assertThat(connectConfigurations, containsString("ssl.truststore.")); + // Check PodSet StrimziPodSet podSet = kc.generatePodSet(3, Map.of(), Map.of(), false, null, null, null); PodSetUtils.podSetToPods(podSet).forEach(pod -> { assertThat(pod.getSpec().getVolumes().size(), is(3)); assertThat(pod.getSpec().getVolumes().get(0).getName(), is(VolumeUtils.STRIMZI_TMP_DIRECTORY_DEFAULT_VOLUME_NAME)); - assertThat(pod.getSpec().getVolumes().get(1).getName(), is("kafka-metrics-and-logging")); + assertThat(pod.getSpec().getVolumes().get(1).getName(), is("kafka-connect-configurations")); assertThat(pod.getSpec().getVolumes().get(2).getName(), is("my-secret")); List containers = pod.getSpec().getContainers(); @@ -647,17 +702,14 @@ public void testPodSetWithPlainAuthAndTLSSameSecret() { assertThat(containers.get(0).getVolumeMounts().size(), is(4)); assertThat(containers.get(0).getVolumeMounts().get(0).getName(), is(VolumeUtils.STRIMZI_TMP_DIRECTORY_DEFAULT_VOLUME_NAME)); assertThat(containers.get(0).getVolumeMounts().get(0).getMountPath(), is(VolumeUtils.STRIMZI_TMP_DIRECTORY_DEFAULT_MOUNT_PATH)); - assertThat(containers.get(0).getVolumeMounts().get(1).getName(), is("kafka-metrics-and-logging")); + assertThat(containers.get(0).getVolumeMounts().get(1).getName(), is("kafka-connect-configurations")); assertThat(containers.get(0).getVolumeMounts().get(1).getMountPath(), is("/opt/kafka/custom-config/")); assertThat(containers.get(0).getVolumeMounts().get(2).getName(), is("my-secret")); assertThat(containers.get(0).getVolumeMounts().get(2).getMountPath(), is(KafkaConnectCluster.TLS_CERTS_BASE_VOLUME_MOUNT + "my-secret")); assertThat(containers.get(0).getVolumeMounts().get(3).getName(), is("my-secret")); assertThat(containers.get(0).getVolumeMounts().get(3).getMountPath(), is(KafkaConnectCluster.PASSWORD_VOLUME_MOUNT + "my-secret")); - assertThat(io.strimzi.operator.cluster.TestUtils.containerEnvVars(containers.get(0)), hasEntry(KafkaConnectCluster.ENV_VAR_KAFKA_CONNECT_SASL_PASSWORD_FILE, "my-secret/user1.password")); assertThat(io.strimzi.operator.cluster.TestUtils.containerEnvVars(containers.get(0)), hasEntry(KafkaConnectCluster.ENV_VAR_KAFKA_CONNECT_SASL_USERNAME, "user1")); - assertThat(io.strimzi.operator.cluster.TestUtils.containerEnvVars(containers.get(0)), hasEntry(KafkaConnectCluster.ENV_VAR_KAFKA_CONNECT_SASL_MECHANISM, "plain")); - 
assertThat(io.strimzi.operator.cluster.TestUtils.containerEnvVars(containers.get(0)), hasEntry(KafkaConnectCluster.ENV_VAR_KAFKA_CONNECT_TLS, "true")); }); } @@ -909,7 +961,7 @@ public void testTemplate() { assertThat(pod.getSpec().getVolumes().get(0).getName(), is(VolumeUtils.STRIMZI_TMP_DIRECTORY_DEFAULT_VOLUME_NAME)); assertThat(pod.getSpec().getVolumes().get(0).getEmptyDir(), is(notNullValue())); assertThat(pod.getSpec().getVolumes().get(0).getEmptyDir().getSizeLimit(), is(new Quantity("10Mi"))); - assertThat(pod.getSpec().getVolumes().get(1).getName(), is("kafka-metrics-and-logging")); + assertThat(pod.getSpec().getVolumes().get(1).getName(), is("kafka-connect-configurations")); assertThat(pod.getSpec().getVolumes().get(1).getConfigMap().getName(), is("foo-connect-config")); assertThat(pod.getSpec().getVolumes().get(2).getName(), is("rack-volume")); assertThat(pod.getSpec().getVolumes().get(2).getEmptyDir(), is(notNullValue())); @@ -931,7 +983,7 @@ public void testTemplate() { assertThat(pod.getSpec().getContainers().get(0).getVolumeMounts().size(), is(5)); assertThat(pod.getSpec().getContainers().get(0).getVolumeMounts().get(0).getName(), is(VolumeUtils.STRIMZI_TMP_DIRECTORY_DEFAULT_VOLUME_NAME)); assertThat(pod.getSpec().getContainers().get(0).getVolumeMounts().get(0).getMountPath(), is(VolumeUtils.STRIMZI_TMP_DIRECTORY_DEFAULT_MOUNT_PATH)); - assertThat(pod.getSpec().getContainers().get(0).getVolumeMounts().get(1).getName(), is("kafka-metrics-and-logging")); + assertThat(pod.getSpec().getContainers().get(0).getVolumeMounts().get(1).getName(), is("kafka-connect-configurations")); assertThat(pod.getSpec().getContainers().get(0).getVolumeMounts().get(1).getMountPath(), is("/opt/kafka/custom-config/")); assertThat(pod.getSpec().getContainers().get(0).getVolumeMounts().get(2).getName(), is("rack-volume")); assertThat(pod.getSpec().getContainers().get(0).getVolumeMounts().get(2).getMountPath(), is("/opt/kafka/init")); @@ -1575,14 +1627,14 @@ public void testKafkaConnectContainerEnvVars() { @ParallelTest public void testKafkaContainerEnvVarsConflict() { ContainerEnvVar envVar1 = new ContainerEnvVar(); - String testEnvOneKey = KafkaConnectCluster.ENV_VAR_KAFKA_CONNECT_CONFIGURATION; - String testEnvOneValue = "test.env.one"; + String testEnvOneKey = KafkaConnectCluster.ENV_VAR_KAFKA_CONNECT_TLS_AUTH_KEY; + String testEnvOneValue = "test user cert"; envVar1.setName(testEnvOneKey); envVar1.setValue(testEnvOneValue); ContainerEnvVar envVar2 = new ContainerEnvVar(); - String testEnvTwoKey = KafkaConnectCluster.ENV_VAR_KAFKA_CONNECT_BOOTSTRAP_SERVERS; - String testEnvTwoValue = "test.env.two"; + String testEnvTwoKey = KafkaConnectCluster.ENV_VAR_KAFKA_CONNECT_TRUSTED_CERTS; + String testEnvTwoValue = "test trusted cert"; envVar2.setName(testEnvTwoKey); envVar2.setValue(testEnvTwoValue); @@ -1594,6 +1646,17 @@ public void testKafkaContainerEnvVarsConflict() { KafkaConnect resource = new KafkaConnectBuilder(this.resource) .editSpec() + .withNewTls() + .addToTrustedCertificates(new CertSecretSourceBuilder().withSecretName("my-secret").withCertificate("cert.crt").build()) + .endTls() + .withAuthentication( + new KafkaClientAuthenticationTlsBuilder() + .withNewCertificateAndKey() + .withSecretName("user-secret") + .withCertificate("user.crt") + .withKey("user.key") + .endCertificateAndKey() + .build()) .withNewTemplate() .withConnectContainer(kafkaConnectContainer) .endTemplate() @@ -1601,7 +1664,6 @@ public void testKafkaContainerEnvVarsConflict() { .build(); List kafkaEnvVars = 
KafkaConnectCluster.fromCrd(Reconciliation.DUMMY_RECONCILIATION, resource, VERSIONS, SHARED_ENV_PROVIDER).getEnvVars(); - assertThat("Failed to prevent over writing existing container environment variable: " + testEnvOneKey, kafkaEnvVars.stream().filter(env -> testEnvOneKey.equals(env.getName())) .map(EnvVar::getValue).findFirst().orElse("").equals(testEnvOneValue), is(false)); @@ -1657,13 +1719,17 @@ public void testOpenTelemetryTracing() { KafkaConnectCluster kc = KafkaConnectCluster.fromCrd(Reconciliation.DUMMY_RECONCILIATION, resource, VERSIONS, SHARED_ENV_PROVIDER); + // Check config map + ConfigMap configMap = kc.generateConnectConfigMap(new MetricsAndLogging(metricsCM, null)); + String connectConfigurations = configMap.getData().get(KafkaConnectCluster.KAFKA_CONNECT_CONFIGURATION_FILENAME); + assertThat(connectConfigurations, containsString("consumer.interceptor.classes=" + OpenTelemetryTracing.CONSUMER_INTERCEPTOR_CLASS_NAME)); + assertThat(connectConfigurations, containsString("producer.interceptor.classes=" + OpenTelemetryTracing.PRODUCER_INTERCEPTOR_CLASS_NAME)); + // Check PodSet StrimziPodSet podSet = kc.generatePodSet(3, Map.of(), Map.of(), false, null, null, null); PodSetUtils.podSetToPods(podSet).forEach(pod -> { Container cont = pod.getSpec().getContainers().get(0); assertThat(cont.getEnv().stream().filter(env -> KafkaConnectCluster.ENV_VAR_STRIMZI_TRACING.equals(env.getName())).map(EnvVar::getValue).findFirst().orElse("").equals(OpenTelemetryTracing.TYPE_OPENTELEMETRY), is(true)); - assertThat(cont.getEnv().stream().filter(env -> KafkaConnectCluster.ENV_VAR_KAFKA_CONNECT_CONFIGURATION.equals(env.getName())).map(EnvVar::getValue).findFirst().orElse("").contains("consumer.interceptor.classes=" + OpenTelemetryTracing.CONSUMER_INTERCEPTOR_CLASS_NAME), is(true)); - assertThat(cont.getEnv().stream().filter(env -> KafkaConnectCluster.ENV_VAR_KAFKA_CONNECT_CONFIGURATION.equals(env.getName())).map(EnvVar::getValue).findFirst().orElse("").contains("producer.interceptor.classes=" + OpenTelemetryTracing.PRODUCER_INTERCEPTOR_CLASS_NAME), is(true)); }); } @@ -1683,11 +1749,18 @@ public void testPodSetWithOAuthWithAccessToken() { KafkaConnectCluster kc = KafkaConnectCluster.fromCrd(Reconciliation.DUMMY_RECONCILIATION, resource, VERSIONS, SHARED_ENV_PROVIDER); + // Check config map + ConfigMap configMap = kc.generateConnectConfigMap(new MetricsAndLogging(metricsCM, null)); + String connectConfigurations = configMap.getData().get(KafkaConnectCluster.KAFKA_CONNECT_CONFIGURATION_FILENAME); + assertThat(connectConfigurations, containsString("security.protocol=SASL_PLAINTEXT")); + assertThat(connectConfigurations, containsString("sasl.mechanism=OAUTHBEARER")); + assertThat(connectConfigurations, containsString("sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required ${strimzienv:KAFKA_CONNECT_OAUTH_CONFIG} oauth.access.token=${strimzienv:KAFKA_CONNECT_OAUTH_ACCESS_TOKEN};")); + assertThat(connectConfigurations, containsString("sasl.login.callback.handler.class=io.strimzi.kafka.oauth.client.JaasClientOauthLoginCallbackHandler")); + // Check PodSet StrimziPodSet podSet = kc.generatePodSet(3, Map.of(), Map.of(), false, null, null, null); PodSetUtils.podSetToPods(podSet).forEach(pod -> { Container cont = pod.getSpec().getContainers().get(0); - assertThat(cont.getEnv().stream().filter(var -> KafkaConnectCluster.ENV_VAR_KAFKA_CONNECT_SASL_MECHANISM.equals(var.getName())).findFirst().orElseThrow().getValue(), is("oauth")); 
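The ${strimzienv:...} placeholders asserted in these OAuth tests are deliberately left unresolved in the ConfigMap: Kafka's EnvVarConfigProvider, registered by withConfigProviders(), substitutes them from the container environment when the worker parses the file. A standalone sketch of that runtime half, not taken from this PR:

import org.apache.kafka.common.config.ConfigData;
import org.apache.kafka.common.config.provider.EnvVarConfigProvider;

import java.io.IOException;
import java.util.Map;
import java.util.Set;

public class StrimzienvResolutionSketch {
    public static void main(String[] args) throws IOException {
        try (EnvVarConfigProvider provider = new EnvVarConfigProvider()) {
            // Mirrors config.providers.strimzienv.param.allowlist.pattern=.* from the generated file
            provider.configure(Map.of("allowlist.pattern", ".*"));
            // The worker substitutes this value wherever ${strimzienv:KAFKA_CONNECT_OAUTH_ACCESS_TOKEN} appears
            ConfigData resolved = provider.get("", Set.of("KAFKA_CONNECT_OAUTH_ACCESS_TOKEN"));
            System.out.println(resolved.data().get("KAFKA_CONNECT_OAUTH_ACCESS_TOKEN")); // null if the env var is unset
        }
    }
}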
assertThat(cont.getEnv().stream().filter(var -> KafkaConnectCluster.ENV_VAR_KAFKA_CONNECT_OAUTH_ACCESS_TOKEN.equals(var.getName())).findFirst().orElseThrow().getValueFrom().getSecretKeyRef().getName(), is("my-token-secret")); assertThat(cont.getEnv().stream().filter(var -> KafkaConnectCluster.ENV_VAR_KAFKA_CONNECT_OAUTH_ACCESS_TOKEN.equals(var.getName())).findFirst().orElseThrow().getValueFrom().getSecretKeyRef().getKey(), is("my-token-key")); assertThat(cont.getEnv().stream().filter(var -> KafkaConnectCluster.ENV_VAR_KAFKA_CONNECT_OAUTH_CONFIG.equals(var.getName())).findFirst().orElseThrow().getValue().isEmpty(), is(true)); @@ -1718,11 +1791,18 @@ public void testPodSetWithOAuthWithRefreshToken() { KafkaConnectCluster kc = KafkaConnectCluster.fromCrd(Reconciliation.DUMMY_RECONCILIATION, resource, VERSIONS, SHARED_ENV_PROVIDER); + // Check config map + ConfigMap configMap = kc.generateConnectConfigMap(new MetricsAndLogging(metricsCM, null)); + String connectConfigurations = configMap.getData().get(KafkaConnectCluster.KAFKA_CONNECT_CONFIGURATION_FILENAME); + assertThat(connectConfigurations, containsString("security.protocol=SASL_PLAINTEXT")); + assertThat(connectConfigurations, containsString("sasl.mechanism=OAUTHBEARER")); + assertThat(connectConfigurations, containsString("sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required ${strimzienv:KAFKA_CONNECT_OAUTH_CONFIG} oauth.refresh.token=${strimzienv:KAFKA_CONNECT_OAUTH_REFRESH_TOKEN};")); + assertThat(connectConfigurations, containsString("sasl.login.callback.handler.class=io.strimzi.kafka.oauth.client.JaasClientOauthLoginCallbackHandler")); + // Check PodSet StrimziPodSet podSet = kc.generatePodSet(3, Map.of(), Map.of(), false, null, null, null); PodSetUtils.podSetToPods(podSet).forEach(pod -> { Container cont = pod.getSpec().getContainers().get(0); - assertThat(cont.getEnv().stream().filter(var -> KafkaConnectCluster.ENV_VAR_KAFKA_CONNECT_SASL_MECHANISM.equals(var.getName())).findFirst().orElseThrow().getValue(), is("oauth")); assertThat(cont.getEnv().stream().filter(var -> KafkaConnectCluster.ENV_VAR_KAFKA_CONNECT_OAUTH_REFRESH_TOKEN.equals(var.getName())).findFirst().orElseThrow().getValueFrom().getSecretKeyRef().getName(), is("my-token-secret")); assertThat(cont.getEnv().stream().filter(var -> KafkaConnectCluster.ENV_VAR_KAFKA_CONNECT_OAUTH_REFRESH_TOKEN.equals(var.getName())).findFirst().orElseThrow().getValueFrom().getSecretKeyRef().getKey(), is("my-token-key")); assertThat(cont.getEnv().stream().filter(var -> KafkaConnectCluster.ENV_VAR_KAFKA_CONNECT_OAUTH_CONFIG.equals(var.getName())).findFirst().orElseThrow().getValue().trim(), @@ -1751,11 +1831,18 @@ public void testPodSetWithOAuthWithClientSecret() { KafkaConnectCluster kc = KafkaConnectCluster.fromCrd(Reconciliation.DUMMY_RECONCILIATION, resource, VERSIONS, SHARED_ENV_PROVIDER); + // Check config map + ConfigMap configMap = kc.generateConnectConfigMap(new MetricsAndLogging(metricsCM, null)); + String connectConfigurations = configMap.getData().get(KafkaConnectCluster.KAFKA_CONNECT_CONFIGURATION_FILENAME); + assertThat(connectConfigurations, containsString("security.protocol=SASL_PLAINTEXT")); + assertThat(connectConfigurations, containsString("sasl.mechanism=OAUTHBEARER")); + assertThat(connectConfigurations, containsString("sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required ${strimzienv:KAFKA_CONNECT_OAUTH_CONFIG} oauth.client.secret=${strimzienv:KAFKA_CONNECT_OAUTH_CLIENT_SECRET};")); + 
assertThat(connectConfigurations, containsString("sasl.login.callback.handler.class=io.strimzi.kafka.oauth.client.JaasClientOauthLoginCallbackHandler")); + // Check PodSet StrimziPodSet podSet = kc.generatePodSet(3, Map.of(), Map.of(), false, null, null, null); PodSetUtils.podSetToPods(podSet).forEach(pod -> { Container cont = pod.getSpec().getContainers().get(0); - assertThat(cont.getEnv().stream().filter(var -> KafkaConnectCluster.ENV_VAR_KAFKA_CONNECT_SASL_MECHANISM.equals(var.getName())).findFirst().orElseThrow().getValue(), is("oauth")); assertThat(cont.getEnv().stream().filter(var -> KafkaConnectCluster.ENV_VAR_KAFKA_CONNECT_OAUTH_CLIENT_SECRET.equals(var.getName())).findFirst().orElseThrow().getValueFrom().getSecretKeyRef().getName(), is("my-secret-secret")); assertThat(cont.getEnv().stream().filter(var -> KafkaConnectCluster.ENV_VAR_KAFKA_CONNECT_OAUTH_CLIENT_SECRET.equals(var.getName())).findFirst().orElseThrow().getValueFrom().getSecretKeyRef().getKey(), is("my-secret-key")); assertThat(cont.getEnv().stream().filter(var -> KafkaConnectCluster.ENV_VAR_KAFKA_CONNECT_OAUTH_CONFIG.equals(var.getName())).findFirst().orElseThrow().getValue().trim(), @@ -1784,11 +1871,18 @@ public void testPodSetWithOAuthWithClientSecretAndSaslExtensions() { KafkaConnectCluster kc = KafkaConnectCluster.fromCrd(Reconciliation.DUMMY_RECONCILIATION, resource, VERSIONS, SHARED_ENV_PROVIDER); + // Check config map + ConfigMap configMap = kc.generateConnectConfigMap(new MetricsAndLogging(metricsCM, null)); + String connectConfigurations = configMap.getData().get(KafkaConnectCluster.KAFKA_CONNECT_CONFIGURATION_FILENAME); + assertThat(connectConfigurations, containsString("security.protocol=SASL_PLAINTEXT")); + assertThat(connectConfigurations, containsString("sasl.mechanism=OAUTHBEARER")); + assertThat(connectConfigurations, containsString("sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required ${strimzienv:KAFKA_CONNECT_OAUTH_CONFIG} oauth.client.secret=${strimzienv:KAFKA_CONNECT_OAUTH_CLIENT_SECRET};")); + assertThat(connectConfigurations, containsString("sasl.login.callback.handler.class=io.strimzi.kafka.oauth.client.JaasClientOauthLoginCallbackHandler")); + // Check PodSet StrimziPodSet podSet = kc.generatePodSet(3, Map.of(), Map.of(), false, null, null, null); PodSetUtils.podSetToPods(podSet).forEach(pod -> { Container cont = pod.getSpec().getContainers().get(0); - assertThat(cont.getEnv().stream().filter(var -> KafkaConnectCluster.ENV_VAR_KAFKA_CONNECT_SASL_MECHANISM.equals(var.getName())).findFirst().orElseThrow().getValue(), is("oauth")); assertThat(cont.getEnv().stream().filter(var -> KafkaConnectCluster.ENV_VAR_KAFKA_CONNECT_OAUTH_CLIENT_SECRET.equals(var.getName())).findFirst().orElseThrow().getValueFrom().getSecretKeyRef().getName(), is("my-secret-secret")); assertThat(cont.getEnv().stream().filter(var -> KafkaConnectCluster.ENV_VAR_KAFKA_CONNECT_OAUTH_CLIENT_SECRET.equals(var.getName())).findFirst().orElseThrow().getValueFrom().getSecretKeyRef().getKey(), is("my-secret-key")); assertThat(cont.getEnv().stream().filter(var -> KafkaConnectCluster.ENV_VAR_KAFKA_CONNECT_OAUTH_CONFIG.equals(var.getName())).findFirst().orElseThrow().getValue().trim(), @@ -1817,11 +1911,18 @@ public void testPodSetWithOAuthWithClientAssertion() { KafkaConnectCluster kc = KafkaConnectCluster.fromCrd(Reconciliation.DUMMY_RECONCILIATION, resource, VERSIONS, SHARED_ENV_PROVIDER); + // Check config map + ConfigMap configMap = kc.generateConnectConfigMap(new MetricsAndLogging(metricsCM, 
null)); + String connectConfigurations = configMap.getData().get(KafkaConnectCluster.KAFKA_CONNECT_CONFIGURATION_FILENAME); + assertThat(connectConfigurations, containsString("security.protocol=SASL_PLAINTEXT")); + assertThat(connectConfigurations, containsString("sasl.mechanism=OAUTHBEARER")); + assertThat(connectConfigurations, containsString("sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required ${strimzienv:KAFKA_CONNECT_OAUTH_CONFIG} oauth.client.assertion=${strimzienv:KAFKA_CONNECT_OAUTH_CLIENT_ASSERTION};")); + assertThat(connectConfigurations, containsString("sasl.login.callback.handler.class=io.strimzi.kafka.oauth.client.JaasClientOauthLoginCallbackHandler")); + // Check PodSet StrimziPodSet podSet = kc.generatePodSet(3, Map.of(), Map.of(), false, null, null, null); PodSetUtils.podSetToPods(podSet).forEach(pod -> { Container cont = pod.getSpec().getContainers().get(0); - assertThat(cont.getEnv().stream().filter(var -> KafkaConnectCluster.ENV_VAR_KAFKA_CONNECT_SASL_MECHANISM.equals(var.getName())).findFirst().orElseThrow().getValue(), is("oauth")); assertThat(cont.getEnv().stream().filter(var -> KafkaConnectCluster.ENV_VAR_KAFKA_CONNECT_OAUTH_CLIENT_ASSERTION.equals(var.getName())).findFirst().orElseThrow().getValueFrom().getSecretKeyRef().getName(), is("my-secret-secret")); assertThat(cont.getEnv().stream().filter(var -> KafkaConnectCluster.ENV_VAR_KAFKA_CONNECT_OAUTH_CLIENT_ASSERTION.equals(var.getName())).findFirst().orElseThrow().getValueFrom().getSecretKeyRef().getKey(), is("my-secret-key")); assertThat(cont.getEnv().stream().filter(var -> KafkaConnectCluster.ENV_VAR_KAFKA_CONNECT_OAUTH_CONFIG.equals(var.getName())).findFirst().orElseThrow().getValue().trim(), @@ -1852,11 +1953,18 @@ public void testPodSetWithOAuthWithUsernameAndPassword() { KafkaConnectCluster kc = KafkaConnectCluster.fromCrd(Reconciliation.DUMMY_RECONCILIATION, resource, VERSIONS, SHARED_ENV_PROVIDER); + // Check config map + ConfigMap configMap = kc.generateConnectConfigMap(new MetricsAndLogging(metricsCM, null)); + String connectConfigurations = configMap.getData().get(KafkaConnectCluster.KAFKA_CONNECT_CONFIGURATION_FILENAME); + assertThat(connectConfigurations, containsString("security.protocol=SASL_PLAINTEXT")); + assertThat(connectConfigurations, containsString("sasl.mechanism=OAUTHBEARER")); + assertThat(connectConfigurations, containsString("sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required ${strimzienv:KAFKA_CONNECT_OAUTH_CONFIG} oauth.client.secret=${strimzienv:KAFKA_CONNECT_OAUTH_CLIENT_SECRET} oauth.password.grant.password=${strimzienv:KAFKA_CONNECT_OAUTH_PASSWORD_GRANT_PASSWORD};")); + assertThat(connectConfigurations, containsString("sasl.login.callback.handler.class=io.strimzi.kafka.oauth.client.JaasClientOauthLoginCallbackHandler")); + // Check PodSet StrimziPodSet podSet = kc.generatePodSet(3, Map.of(), Map.of(), false, null, null, null); PodSetUtils.podSetToPods(podSet).forEach(pod -> { Container cont = pod.getSpec().getContainers().get(0); - assertThat(cont.getEnv().stream().filter(var -> KafkaConnectCluster.ENV_VAR_KAFKA_CONNECT_SASL_MECHANISM.equals(var.getName())).findFirst().orElseThrow().getValue(), is("oauth")); assertThat(cont.getEnv().stream().filter(var -> KafkaConnectCluster.ENV_VAR_KAFKA_CONNECT_OAUTH_PASSWORD_GRANT_PASSWORD.equals(var.getName())).findFirst().orElseThrow().getValueFrom().getSecretKeyRef().getName(), is("my-password-secret")); assertThat(cont.getEnv().stream().filter(var -> 
KafkaConnectCluster.ENV_VAR_KAFKA_CONNECT_OAUTH_PASSWORD_GRANT_PASSWORD.equals(var.getName())).findFirst().orElseThrow().getValueFrom().getSecretKeyRef().getKey(), is("user1.password")); assertThat(cont.getEnv().stream().filter(var -> KafkaConnectCluster.ENV_VAR_KAFKA_CONNECT_OAUTH_CLIENT_SECRET.equals(var.getName())).findFirst().orElseThrow().getValueFrom().getSecretKeyRef().getName(), is("my-secret-secret")); @@ -1939,12 +2047,18 @@ public void testPodSetWithOAuthWithTls() { KafkaConnectCluster kc = KafkaConnectCluster.fromCrd(Reconciliation.DUMMY_RECONCILIATION, resource, VERSIONS, SHARED_ENV_PROVIDER); + // Check config map + ConfigMap configMap = kc.generateConnectConfigMap(new MetricsAndLogging(metricsCM, null)); + String connectConfigurations = configMap.getData().get(KafkaConnectCluster.KAFKA_CONNECT_CONFIGURATION_FILENAME); + assertThat(connectConfigurations, containsString("sasl.mechanism=OAUTHBEARER")); + assertThat(connectConfigurations, containsString("sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required ${strimzienv:KAFKA_CONNECT_OAUTH_CONFIG} oauth.client.secret=${strimzienv:KAFKA_CONNECT_OAUTH_CLIENT_SECRET} oauth.ssl.truststore.location=\"/tmp/strimzi/oauth.truststore.p12\" oauth.ssl.truststore.password=${strimzienv:CERTS_STORE_PASSWORD} oauth.ssl.truststore.type=\"PKCS12\";")); + assertThat(connectConfigurations, containsString("sasl.login.callback.handler.class=io.strimzi.kafka.oauth.client.JaasClientOauthLoginCallbackHandler")); + // Check PodSet StrimziPodSet podSet = kc.generatePodSet(3, Map.of(), Map.of(), false, null, null, null); PodSetUtils.podSetToPods(podSet).forEach(pod -> { Container cont = pod.getSpec().getContainers().get(0); - assertThat(cont.getEnv().stream().filter(var -> KafkaConnectCluster.ENV_VAR_KAFKA_CONNECT_SASL_MECHANISM.equals(var.getName())).findFirst().orElseThrow().getValue(), is("oauth")); assertThat(cont.getEnv().stream().filter(var -> KafkaConnectCluster.ENV_VAR_KAFKA_CONNECT_OAUTH_CLIENT_SECRET.equals(var.getName())).findFirst().orElseThrow().getValueFrom().getSecretKeyRef().getName(), is("my-secret-secret")); assertThat(cont.getEnv().stream().filter(var -> KafkaConnectCluster.ENV_VAR_KAFKA_CONNECT_OAUTH_CLIENT_SECRET.equals(var.getName())).findFirst().orElseThrow().getValueFrom().getSecretKeyRef().getKey(), is("my-secret-key")); assertThat(cont.getEnv().stream().filter(var -> KafkaConnectCluster.ENV_VAR_KAFKA_CONNECT_OAUTH_CONFIG.equals(var.getName())).findFirst().orElseThrow().getValue().trim(), diff --git a/cluster-operator/src/test/java/io/strimzi/operator/cluster/model/KafkaConnectConfigurationBuilderTest.java b/cluster-operator/src/test/java/io/strimzi/operator/cluster/model/KafkaConnectConfigurationBuilderTest.java new file mode 100644 index 0000000000..eaa80f1475 --- /dev/null +++ b/cluster-operator/src/test/java/io/strimzi/operator/cluster/model/KafkaConnectConfigurationBuilderTest.java @@ -0,0 +1,451 @@ +/* + * Copyright Strimzi authors. + * License: Apache License 2.0 (see the file LICENSE or http://apache.org/licenses/LICENSE-2.0.html). 
+ */ +package io.strimzi.operator.cluster.model; + +import io.strimzi.api.kafka.model.common.ClientTls; +import io.strimzi.api.kafka.model.common.ClientTlsBuilder; +import io.strimzi.api.kafka.model.common.authentication.KafkaClientAuthenticationOAuth; +import io.strimzi.api.kafka.model.common.authentication.KafkaClientAuthenticationOAuthBuilder; +import io.strimzi.api.kafka.model.common.authentication.KafkaClientAuthenticationPlain; +import io.strimzi.api.kafka.model.common.authentication.KafkaClientAuthenticationPlainBuilder; +import io.strimzi.api.kafka.model.common.authentication.KafkaClientAuthenticationScramSha256; +import io.strimzi.api.kafka.model.common.authentication.KafkaClientAuthenticationScramSha256Builder; +import io.strimzi.api.kafka.model.common.authentication.KafkaClientAuthenticationScramSha512; +import io.strimzi.api.kafka.model.common.authentication.KafkaClientAuthenticationScramSha512Builder; +import io.strimzi.api.kafka.model.common.authentication.KafkaClientAuthenticationTls; +import io.strimzi.api.kafka.model.common.authentication.KafkaClientAuthenticationTlsBuilder; +import io.strimzi.test.annotations.ParallelSuite; +import io.strimzi.test.annotations.ParallelTest; + +import static io.strimzi.operator.cluster.TestUtils.IsEquivalent.isEquivalent; +import static org.hamcrest.MatcherAssert.assertThat; + +@ParallelSuite +class KafkaConnectConfigurationBuilderTest { + + private static final String BOOTSTRAP_SERVERS = "my-cluster-kafka-bootstrap:9092"; + + @ParallelTest + public void testBuild() { + String configuration = new KafkaConnectConfigurationBuilder(BOOTSTRAP_SERVERS).build(); + assertThat(configuration, isEquivalent( + "bootstrap.servers=my-cluster-kafka-bootstrap:9092", + "security.protocol=PLAINTEXT", + "producer.security.protocol=PLAINTEXT", + "consumer.security.protocol=PLAINTEXT", + "admin.security.protocol=PLAINTEXT" + )); + } + + @ParallelTest + public void testWithConfigProviders() { + String configuration = new KafkaConnectConfigurationBuilder(BOOTSTRAP_SERVERS) + .withConfigProviders() + .build(); + + assertThat(configuration, isEquivalent( + "bootstrap.servers=my-cluster-kafka-bootstrap:9092", + "security.protocol=PLAINTEXT", + "producer.security.protocol=PLAINTEXT", + "consumer.security.protocol=PLAINTEXT", + "admin.security.protocol=PLAINTEXT", + "config.providers=strimzienv", + "config.providers.strimzienv.class=org.apache.kafka.common.config.provider.EnvVarConfigProvider", + "config.providers.strimzienv.param.allowlist.pattern=.*" + )); + } + + @ParallelTest + public void testWithTls() { + ClientTls clientTls = new ClientTlsBuilder() + .addNewTrustedCertificate() + .withSecretName("tls-trusted-certificate") + .withCertificate("pem-content") + .endTrustedCertificate() + .build(); + + String configuration = new KafkaConnectConfigurationBuilder(BOOTSTRAP_SERVERS) + .withTls(clientTls) + .build(); + + assertThat(configuration, isEquivalent( + "bootstrap.servers=my-cluster-kafka-bootstrap:9092", + "security.protocol=SSL", + "producer.security.protocol=SSL", + "consumer.security.protocol=SSL", + "admin.security.protocol=SSL", + "ssl.truststore.location=/tmp/kafka/cluster.truststore.p12", + "ssl.truststore.password=${strimzienv:CERTS_STORE_PASSWORD}", + "ssl.truststore.type=PKCS12", + "producer.ssl.truststore.location=/tmp/kafka/cluster.truststore.p12", + "producer.ssl.truststore.password=${strimzienv:CERTS_STORE_PASSWORD}", + "consumer.ssl.truststore.location=/tmp/kafka/cluster.truststore.p12", + 
"consumer.ssl.truststore.password=${strimzienv:CERTS_STORE_PASSWORD}", + "admin.ssl.truststore.location=/tmp/kafka/cluster.truststore.p12", + "admin.ssl.truststore.password=${strimzienv:CERTS_STORE_PASSWORD}" + )); + } + + @ParallelTest + public void testWithTlsAndClientAuthentication() { + ClientTls clientTls = new ClientTlsBuilder() + .addNewTrustedCertificate() + .withSecretName("tls-trusted-certificate") + .withCertificate("pem-content") + .endTrustedCertificate() + .build(); + + KafkaClientAuthenticationTls tlsAuth = new KafkaClientAuthenticationTlsBuilder() + .withNewCertificateAndKey() + .withSecretName("tls-keystore") + .withCertificate("pem-content") + .endCertificateAndKey() + .build(); + + String configuration = new KafkaConnectConfigurationBuilder(BOOTSTRAP_SERVERS) + .withTls(clientTls) + .withAuthentication(tlsAuth) + .build(); + + assertThat(configuration, isEquivalent( + "bootstrap.servers=my-cluster-kafka-bootstrap:9092", + "security.protocol=SSL", + "producer.security.protocol=SSL", + "consumer.security.protocol=SSL", + "admin.security.protocol=SSL", + "ssl.truststore.location=/tmp/kafka/cluster.truststore.p12", + "ssl.truststore.password=${strimzienv:CERTS_STORE_PASSWORD}", + "ssl.truststore.type=PKCS12", + "producer.ssl.truststore.location=/tmp/kafka/cluster.truststore.p12", + "producer.ssl.truststore.password=${strimzienv:CERTS_STORE_PASSWORD}", + "consumer.ssl.truststore.location=/tmp/kafka/cluster.truststore.p12", + "consumer.ssl.truststore.password=${strimzienv:CERTS_STORE_PASSWORD}", + "admin.ssl.truststore.location=/tmp/kafka/cluster.truststore.p12", + "admin.ssl.truststore.password=${strimzienv:CERTS_STORE_PASSWORD}", + "ssl.keystore.location=/tmp/kafka/cluster.keystore.p12", + "ssl.keystore.password=${strimzienv:CERTS_STORE_PASSWORD}", + "ssl.keystore.type=PKCS12", + "producer.ssl.keystore.location=/tmp/kafka/cluster.keystore.p12", + "producer.ssl.keystore.password=${strimzienv:CERTS_STORE_PASSWORD}", + "producer.ssl.keystore.type=PKCS12", + "consumer.ssl.keystore.location=/tmp/kafka/cluster.keystore.p12", + "consumer.ssl.keystore.password=${strimzienv:CERTS_STORE_PASSWORD}", + "consumer.ssl.keystore.type=PKCS12", + "admin.ssl.keystore.location=/tmp/kafka/cluster.keystore.p12", + "admin.ssl.keystore.password=${strimzienv:CERTS_STORE_PASSWORD}", + "admin.ssl.keystore.type=PKCS12" + )); + } + + @ParallelTest + public void testWithPlainAndSaslMechanism() { + KafkaClientAuthenticationPlain authPlain = new KafkaClientAuthenticationPlainBuilder() + .withNewPasswordSecret() + .withSecretName("my-auth-secret") + .withPassword("my-password-key") + .endPasswordSecret() + .build(); + + String configuration = new KafkaConnectConfigurationBuilder(BOOTSTRAP_SERVERS) + .withAuthentication(authPlain) + .build(); + + assertThat(configuration, isEquivalent( + "bootstrap.servers=my-cluster-kafka-bootstrap:9092", + "security.protocol=SASL_PLAINTEXT", + "producer.security.protocol=SASL_PLAINTEXT", + "consumer.security.protocol=SASL_PLAINTEXT", + "admin.security.protocol=SASL_PLAINTEXT", + "sasl.mechanism=PLAIN", + "sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username=${strimzienv:KAFKA_CONNECT_SASL_USERNAME} password=/opt/kafka/connect-password/my-auth-secret:my-password-key;", + "producer.sasl.mechanism=PLAIN", + "producer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username=${strimzienv:KAFKA_CONNECT_SASL_USERNAME} password=/opt/kafka/connect-password/my-auth-secret:my-password-key;", + 
"consumer.sasl.mechanism=PLAIN", + "consumer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username=${strimzienv:KAFKA_CONNECT_SASL_USERNAME} password=/opt/kafka/connect-password/my-auth-secret:my-password-key;", + "admin.sasl.mechanism=PLAIN", + "admin.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username=${strimzienv:KAFKA_CONNECT_SASL_USERNAME} password=/opt/kafka/connect-password/my-auth-secret:my-password-key;" + )); + } + + @ParallelTest + public void testWithTlsAndSaslMechanism() { + ClientTls clientTls = new ClientTlsBuilder() + .addNewTrustedCertificate() + .withSecretName("tls-trusted-certificate") + .withCertificate("pem-content") + .endTrustedCertificate() + .build(); + + KafkaClientAuthenticationPlain authPlain = new KafkaClientAuthenticationPlainBuilder() + .withNewPasswordSecret() + .withSecretName("my-auth-secret") + .withPassword("my-password-key") + .endPasswordSecret() + .build(); + + String configuration = new KafkaConnectConfigurationBuilder(BOOTSTRAP_SERVERS) + .withTls(clientTls) + .withAuthentication(authPlain) + .build(); + + assertThat(configuration, isEquivalent( + "bootstrap.servers=my-cluster-kafka-bootstrap:9092", + "security.protocol=SASL_SSL", + "producer.security.protocol=SASL_SSL", + "consumer.security.protocol=SASL_SSL", + "admin.security.protocol=SASL_SSL", + "ssl.truststore.location=/tmp/kafka/cluster.truststore.p12", + "ssl.truststore.password=${strimzienv:CERTS_STORE_PASSWORD}", + "ssl.truststore.type=PKCS12", + "producer.ssl.truststore.location=/tmp/kafka/cluster.truststore.p12", + "producer.ssl.truststore.password=${strimzienv:CERTS_STORE_PASSWORD}", + "consumer.ssl.truststore.location=/tmp/kafka/cluster.truststore.p12", + "consumer.ssl.truststore.password=${strimzienv:CERTS_STORE_PASSWORD}", + "admin.ssl.truststore.location=/tmp/kafka/cluster.truststore.p12", + "admin.ssl.truststore.password=${strimzienv:CERTS_STORE_PASSWORD}", + "sasl.mechanism=PLAIN", + "sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username=${strimzienv:KAFKA_CONNECT_SASL_USERNAME} password=/opt/kafka/connect-password/my-auth-secret:my-password-key;", + "producer.sasl.mechanism=PLAIN", + "producer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username=${strimzienv:KAFKA_CONNECT_SASL_USERNAME} password=/opt/kafka/connect-password/my-auth-secret:my-password-key;", + "consumer.sasl.mechanism=PLAIN", + "consumer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username=${strimzienv:KAFKA_CONNECT_SASL_USERNAME} password=/opt/kafka/connect-password/my-auth-secret:my-password-key;", + "admin.sasl.mechanism=PLAIN", + "admin.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username=${strimzienv:KAFKA_CONNECT_SASL_USERNAME} password=/opt/kafka/connect-password/my-auth-secret:my-password-key;" + )); + } + + @ParallelTest + public void testWithPlainAndScramSha256() { + KafkaClientAuthenticationScramSha256 authScramSha256 = new KafkaClientAuthenticationScramSha256Builder() + .withNewPasswordSecret() + .withSecretName("my-auth-secret") + .withPassword("my-password-key") + .endPasswordSecret() + .build(); + + String configuration = new KafkaConnectConfigurationBuilder(BOOTSTRAP_SERVERS) + .withAuthentication(authScramSha256) + .build(); + + assertThat(configuration, isEquivalent( + "bootstrap.servers=my-cluster-kafka-bootstrap:9092", + "security.protocol=SASL_PLAINTEXT", + 
"producer.security.protocol=SASL_PLAINTEXT", + "consumer.security.protocol=SASL_PLAINTEXT", + "admin.security.protocol=SASL_PLAINTEXT", + "sasl.mechanism=SCRAM-SHA-256", + "sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username=${strimzienv:KAFKA_CONNECT_SASL_USERNAME} password=/opt/kafka/connect-password/my-auth-secret:my-password-key;", + "producer.sasl.mechanism=SCRAM-SHA-256", + "producer.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username=${strimzienv:KAFKA_CONNECT_SASL_USERNAME} password=/opt/kafka/connect-password/my-auth-secret:my-password-key;", + "consumer.sasl.mechanism=SCRAM-SHA-256", + "consumer.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username=${strimzienv:KAFKA_CONNECT_SASL_USERNAME} password=/opt/kafka/connect-password/my-auth-secret:my-password-key;", + "admin.sasl.mechanism=SCRAM-SHA-256", + "admin.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username=${strimzienv:KAFKA_CONNECT_SASL_USERNAME} password=/opt/kafka/connect-password/my-auth-secret:my-password-key;" + )); + } + + @ParallelTest + public void testWithTlsAndScramSha256() { + ClientTls clientTls = new ClientTlsBuilder() + .addNewTrustedCertificate() + .withSecretName("tls-trusted-certificate") + .withCertificate("pem-content") + .endTrustedCertificate() + .build(); + + KafkaClientAuthenticationScramSha256 authScramSha256 = new KafkaClientAuthenticationScramSha256Builder() + .withNewPasswordSecret() + .withSecretName("my-auth-secret") + .withPassword("my-password-key") + .endPasswordSecret() + .build(); + + String configuration = new KafkaConnectConfigurationBuilder(BOOTSTRAP_SERVERS) + .withTls(clientTls) + .withAuthentication(authScramSha256) + .build(); + + assertThat(configuration, isEquivalent( + "bootstrap.servers=my-cluster-kafka-bootstrap:9092", + "security.protocol=SASL_SSL", + "producer.security.protocol=SASL_SSL", + "consumer.security.protocol=SASL_SSL", + "admin.security.protocol=SASL_SSL", + "ssl.truststore.location=/tmp/kafka/cluster.truststore.p12", + "ssl.truststore.password=${strimzienv:CERTS_STORE_PASSWORD}", + "ssl.truststore.type=PKCS12", + "producer.ssl.truststore.location=/tmp/kafka/cluster.truststore.p12", + "producer.ssl.truststore.password=${strimzienv:CERTS_STORE_PASSWORD}", + "consumer.ssl.truststore.location=/tmp/kafka/cluster.truststore.p12", + "consumer.ssl.truststore.password=${strimzienv:CERTS_STORE_PASSWORD}", + "admin.ssl.truststore.location=/tmp/kafka/cluster.truststore.p12", + "admin.ssl.truststore.password=${strimzienv:CERTS_STORE_PASSWORD}", + "sasl.mechanism=SCRAM-SHA-256", + "sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username=${strimzienv:KAFKA_CONNECT_SASL_USERNAME} password=/opt/kafka/connect-password/my-auth-secret:my-password-key;", + "producer.sasl.mechanism=SCRAM-SHA-256", + "producer.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username=${strimzienv:KAFKA_CONNECT_SASL_USERNAME} password=/opt/kafka/connect-password/my-auth-secret:my-password-key;", + "consumer.sasl.mechanism=SCRAM-SHA-256", + "consumer.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username=${strimzienv:KAFKA_CONNECT_SASL_USERNAME} password=/opt/kafka/connect-password/my-auth-secret:my-password-key;", + "admin.sasl.mechanism=SCRAM-SHA-256", + "admin.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required 
username=${strimzienv:KAFKA_CONNECT_SASL_USERNAME} password=/opt/kafka/connect-password/my-auth-secret:my-password-key;" + )); + } + + @ParallelTest + public void testWithPlainAndScramSha512() { + KafkaClientAuthenticationScramSha512 authScramSha512 = new KafkaClientAuthenticationScramSha512Builder() + .withNewPasswordSecret() + .withSecretName("my-auth-secret") + .withPassword("my-password-key") + .endPasswordSecret() + .build(); + + String configuration = new KafkaConnectConfigurationBuilder(BOOTSTRAP_SERVERS) + .withAuthentication(authScramSha512) + .build(); + + assertThat(configuration, isEquivalent( + "bootstrap.servers=my-cluster-kafka-bootstrap:9092", + "security.protocol=SASL_PLAINTEXT", + "producer.security.protocol=SASL_PLAINTEXT", + "consumer.security.protocol=SASL_PLAINTEXT", + "admin.security.protocol=SASL_PLAINTEXT", + "sasl.mechanism=SCRAM-SHA-512", + "sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username=${strimzienv:KAFKA_CONNECT_SASL_USERNAME} password=/opt/kafka/connect-password/my-auth-secret:my-password-key;", + "producer.sasl.mechanism=SCRAM-SHA-512", + "producer.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username=${strimzienv:KAFKA_CONNECT_SASL_USERNAME} password=/opt/kafka/connect-password/my-auth-secret:my-password-key;", + "consumer.sasl.mechanism=SCRAM-SHA-512", + "consumer.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username=${strimzienv:KAFKA_CONNECT_SASL_USERNAME} password=/opt/kafka/connect-password/my-auth-secret:my-password-key;", + "admin.sasl.mechanism=SCRAM-SHA-512", + "admin.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username=${strimzienv:KAFKA_CONNECT_SASL_USERNAME} password=/opt/kafka/connect-password/my-auth-secret:my-password-key;" + )); + } + + @ParallelTest + public void testWithAuthOauth() { + KafkaClientAuthenticationOAuth authOAuth = new KafkaClientAuthenticationOAuthBuilder() + .withClientId("oauth-client-id") + .withTokenEndpointUri("http://token-endpoint-uri") + .withUsername("oauth-username") + .withNewClientSecret() + .withSecretName("my-client-secret-secret") + .withKey("my-client-secret-key") + .endClientSecret() + .withNewRefreshToken() + .withSecretName("my-refresh-token-secret") + .withKey("my-refresh-token-key") + .endRefreshToken() + .withNewAccessToken() + .withSecretName("my-access-token-secret") + .withKey("my-access-token-key") + .endAccessToken() + .withNewPasswordSecret() + .withSecretName("my-password-secret-secret") + .withPassword("my-password-key") + .endPasswordSecret() + .addNewTlsTrustedCertificate() + .withSecretName("my-tls-trusted-certificate") + .withCertificate("pem-content") + .endTlsTrustedCertificate() + .build(); + + String configuration = new KafkaConnectConfigurationBuilder(BOOTSTRAP_SERVERS) + .withAuthentication(authOAuth) + .build(); + + String saslJaasConfig = "sasl.jaas.config=" + + "org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required ${strimzienv:KAFKA_CONNECT_OAUTH_CONFIG}" + + " oauth.client.secret=${strimzienv:KAFKA_CONNECT_OAUTH_CLIENT_SECRET}" + + " oauth.refresh.token=${strimzienv:KAFKA_CONNECT_OAUTH_REFRESH_TOKEN}" + + " oauth.access.token=${strimzienv:KAFKA_CONNECT_OAUTH_ACCESS_TOKEN}" + + " oauth.password.grant.password=${strimzienv:KAFKA_CONNECT_OAUTH_PASSWORD_GRANT_PASSWORD}" + + " oauth.ssl.truststore.location=\"/tmp/strimzi/oauth.truststore.p12\" oauth.ssl.truststore.password=${strimzienv:CERTS_STORE_PASSWORD} 
oauth.ssl.truststore.type=\"PKCS12\";"; + + assertThat(configuration, isEquivalent( + "bootstrap.servers=my-cluster-kafka-bootstrap:9092", + "security.protocol=SASL_PLAINTEXT", + "producer.security.protocol=SASL_PLAINTEXT", + "consumer.security.protocol=SASL_PLAINTEXT", + "admin.security.protocol=SASL_PLAINTEXT", + "sasl.mechanism=OAUTHBEARER", + saslJaasConfig, + "sasl.login.callback.handler.class=io.strimzi.kafka.oauth.client.JaasClientOauthLoginCallbackHandler", + "producer.sasl.mechanism=OAUTHBEARER", + "producer." + saslJaasConfig, + "producer.sasl.login.callback.handler.class=io.strimzi.kafka.oauth.client.JaasClientOauthLoginCallbackHandler", + "consumer.sasl.mechanism=OAUTHBEARER", + "consumer." + saslJaasConfig, + "consumer.sasl.login.callback.handler.class=io.strimzi.kafka.oauth.client.JaasClientOauthLoginCallbackHandler", + "admin.sasl.mechanism=OAUTHBEARER", + "admin." + saslJaasConfig, + "admin.sasl.login.callback.handler.class=io.strimzi.kafka.oauth.client.JaasClientOauthLoginCallbackHandler" + )); + } + + @ParallelTest + public void testWithRackId() { + String configuration = new KafkaConnectConfigurationBuilder(BOOTSTRAP_SERVERS) + .withRackId() + .build(); + + assertThat(configuration, isEquivalent( + "bootstrap.servers=my-cluster-kafka-bootstrap:9092", + "security.protocol=PLAINTEXT", + "producer.security.protocol=PLAINTEXT", + "consumer.security.protocol=PLAINTEXT", + "admin.security.protocol=PLAINTEXT", + "consumer.client.rack=${strimzienv:STRIMZI_RACK_ID}" + )); + + } + + @ParallelTest + public void testWithConfigurations() { + String configuration = new KafkaConnectConfigurationBuilder(BOOTSTRAP_SERVERS) + .withConfigurations("myconfig=abc") + .build(); + + assertThat(configuration, isEquivalent( + "bootstrap.servers=my-cluster-kafka-bootstrap:9092", + "security.protocol=PLAINTEXT", + "producer.security.protocol=PLAINTEXT", + "consumer.security.protocol=PLAINTEXT", + "admin.security.protocol=PLAINTEXT", + "myconfig=abc" + )); + } + + @ParallelTest + public void testWithRestListeners() { + String configuration = new KafkaConnectConfigurationBuilder(BOOTSTRAP_SERVERS) + .withRestListeners(8083) + .build(); + + assertThat(configuration, isEquivalent( + "bootstrap.servers=my-cluster-kafka-bootstrap:9092", + "security.protocol=PLAINTEXT", + "producer.security.protocol=PLAINTEXT", + "consumer.security.protocol=PLAINTEXT", + "admin.security.protocol=PLAINTEXT", + "rest.advertised.host.name=${strimzienv:ADVERTISED_HOSTNAME}", + "rest.advertised.port=8083" + )); + } + + @ParallelTest + public void withPluginPath() { + String configuration = new KafkaConnectConfigurationBuilder(BOOTSTRAP_SERVERS) + .withPluginPath().build(); + + assertThat(configuration, isEquivalent( + "bootstrap.servers=my-cluster-kafka-bootstrap:9092", + "security.protocol=PLAINTEXT", + "producer.security.protocol=PLAINTEXT", + "consumer.security.protocol=PLAINTEXT", + "admin.security.protocol=PLAINTEXT", + "plugin.path=/opt/kafka/plugins" + )); + } +} \ No newline at end of file diff --git a/cluster-operator/src/test/java/io/strimzi/operator/cluster/model/KafkaMirrorMaker2ClusterTest.java b/cluster-operator/src/test/java/io/strimzi/operator/cluster/model/KafkaMirrorMaker2ClusterTest.java index 0261e922ce..adc955743f 100644 --- a/cluster-operator/src/test/java/io/strimzi/operator/cluster/model/KafkaMirrorMaker2ClusterTest.java +++ b/cluster-operator/src/test/java/io/strimzi/operator/cluster/model/KafkaMirrorMaker2ClusterTest.java @@ -90,6 +90,7 @@ import java.util.TreeMap; import static 
java.util.Collections.singletonMap; +import static org.hamcrest.CoreMatchers.containsString; import static org.hamcrest.CoreMatchers.is; import static org.hamcrest.CoreMatchers.notNullValue; import static org.hamcrest.CoreMatchers.nullValue; @@ -162,17 +163,20 @@ public class KafkaMirrorMaker2ClusterTest { { // we were setting metricsEnabled in fromCrd, which was just checking it for non-null. With metrics in CM, we have to check // its content, what is done in generateMetricsAndLogConfigMap - kmm2.generateMetricsAndLogConfigMap(new MetricsAndLogging(metricsCM, null)); + kmm2.generateConnectConfigMap(new MetricsAndLogging(metricsCM, null)); } @ParallelTest public void testMetricsConfigMap() { - ConfigMap metricsCm = kmm2.generateMetricsAndLogConfigMap(new MetricsAndLogging(metricsCM, null)); + ConfigMap metricsCm = kmm2.generateConnectConfigMap(new MetricsAndLogging(metricsCM, null)); checkMetricsConfigMap(metricsCm); } - private void checkMetricsConfigMap(ConfigMap metricsCm) { - assertThat(metricsCm.getData().get(MetricsModel.CONFIG_MAP_KEY), is(metricsCmJson)); + private void checkMetricsConfigMap(ConfigMap configMap) { + assertThat(configMap.getData().get(MetricsModel.CONFIG_MAP_KEY), is(metricsCmJson)); + String connectConfigurations = configMap.getData().get(KafkaConnectCluster.KAFKA_CONNECT_CONFIGURATION_FILENAME); + assertThat(connectConfigurations, containsString("bootstrap.servers=" + bootstrapServers)); + assertThat(connectConfigurations, containsString(expectedConfiguration.asPairs())); } private Map expectedLabels(String name) { @@ -197,9 +201,7 @@ private Map expectedLabels() { protected List getExpectedEnvVars() { List expected = new ArrayList<>(); - expected.add(new EnvVarBuilder().withName(KafkaMirrorMaker2Cluster.ENV_VAR_KAFKA_CONNECT_CONFIGURATION).withValue(expectedConfiguration.asPairs()).build()); expected.add(new EnvVarBuilder().withName(KafkaMirrorMaker2Cluster.ENV_VAR_KAFKA_CONNECT_METRICS_ENABLED).withValue(String.valueOf(true)).build()); - expected.add(new EnvVarBuilder().withName(KafkaMirrorMaker2Cluster.ENV_VAR_KAFKA_CONNECT_BOOTSTRAP_SERVERS).withValue(bootstrapServers).build()); expected.add(new EnvVarBuilder().withName(KafkaMirrorMaker2Cluster.ENV_VAR_STRIMZI_KAFKA_GC_LOG_ENABLED).withValue(Boolean.toString(JvmOptions.DEFAULT_GC_LOGGING_ENABLED)).build()); expected.add(new EnvVarBuilder().withName(AbstractModel.ENV_VAR_KAFKA_HEAP_OPTS).withValue(kafkaHeapOpts).build()); expected.add(new EnvVarBuilder().withName(KafkaMirrorMaker2Cluster.ENV_VAR_KAFKA_MIRRORMAKER_2_CLUSTERS).withValue(targetClusterAlias).build()); @@ -385,7 +387,6 @@ public void testPodSetWithTls() { assertThat(cont.getVolumeMounts().get(3).getMountPath(), is(KafkaMirrorMaker2Cluster.TLS_CERTS_BASE_VOLUME_MOUNT + "my-another-secret")); assertThat(io.strimzi.operator.cluster.TestUtils.containerEnvVars(cont).get(KafkaMirrorMaker2Cluster.ENV_VAR_KAFKA_CONNECT_TRUSTED_CERTS), is("my-secret/cert.crt;my-secret/new-cert.crt;my-another-secret/another-cert.crt")); - assertThat(io.strimzi.operator.cluster.TestUtils.containerEnvVars(cont).get(KafkaMirrorMaker2Cluster.ENV_VAR_KAFKA_CONNECT_TLS), is("true")); assertThat(io.strimzi.operator.cluster.TestUtils.containerEnvVars(cont).get(KafkaMirrorMaker2Cluster.ENV_VAR_KAFKA_MIRRORMAKER_2_TLS_CLUSTERS), is("true")); assertThat(io.strimzi.operator.cluster.TestUtils.containerEnvVars(cont).get(KafkaMirrorMaker2Cluster.ENV_VAR_KAFKA_MIRRORMAKER_2_TRUSTED_CERTS_CLUSTERS), is("target=my-secret/cert.crt;my-secret/new-cert.crt;my-another-secret/another-cert.crt")); 
@@ -413,7 +414,6 @@ public void testPodSetWithTlsWithoutCerts() { Container cont = pod.getSpec().getContainers().get(0); assertThat(io.strimzi.operator.cluster.TestUtils.containerEnvVars(cont).get(KafkaMirrorMaker2Cluster.ENV_VAR_KAFKA_CONNECT_TRUSTED_CERTS), is(nullValue())); - assertThat(io.strimzi.operator.cluster.TestUtils.containerEnvVars(cont).get(KafkaMirrorMaker2Cluster.ENV_VAR_KAFKA_CONNECT_TLS), is("true")); assertThat(io.strimzi.operator.cluster.TestUtils.containerEnvVars(cont).get(KafkaMirrorMaker2Cluster.ENV_VAR_KAFKA_MIRRORMAKER_2_TLS_CLUSTERS), is("true")); assertThat(io.strimzi.operator.cluster.TestUtils.containerEnvVars(cont).get(KafkaMirrorMaker2Cluster.ENV_VAR_KAFKA_MIRRORMAKER_2_TRUSTED_CERTS_CLUSTERS), is(nullValue())); @@ -452,7 +452,6 @@ public void testPodSetWithTlsAuth() { assertThat(cont.getVolumeMounts().get(3).getMountPath(), is(KafkaMirrorMaker2Cluster.TLS_CERTS_BASE_VOLUME_MOUNT + "user-secret")); assertThat(io.strimzi.operator.cluster.TestUtils.containerEnvVars(cont).get(KafkaMirrorMaker2Cluster.ENV_VAR_KAFKA_CONNECT_TLS_AUTH_CERT), is("user-secret/user.crt")); assertThat(io.strimzi.operator.cluster.TestUtils.containerEnvVars(cont).get(KafkaMirrorMaker2Cluster.ENV_VAR_KAFKA_CONNECT_TLS_AUTH_KEY), is("user-secret/user.key")); - assertThat(io.strimzi.operator.cluster.TestUtils.containerEnvVars(cont).get(KafkaMirrorMaker2Cluster.ENV_VAR_KAFKA_CONNECT_TLS), is("true")); }); } @@ -488,7 +487,7 @@ public void testPodSetWithTlsSameSecret() { PodSetUtils.podSetToPods(podSet).forEach(pod -> { assertThat(pod.getSpec().getVolumes().size(), is(5)); assertThat(pod.getSpec().getVolumes().get(0).getName(), is(VolumeUtils.STRIMZI_TMP_DIRECTORY_DEFAULT_VOLUME_NAME)); - assertThat(pod.getSpec().getVolumes().get(1).getName(), is("kafka-metrics-and-logging")); + assertThat(pod.getSpec().getVolumes().get(1).getName(), is("kafka-connect-configurations")); assertThat(pod.getSpec().getVolumes().get(2).getName(), is("my-secret")); assertThat(pod.getSpec().getVolumes().get(3).getName(), is("target-my-secret")); assertThat(pod.getSpec().getVolumes().get(4).getName(), is("source-my-secret")); @@ -498,7 +497,7 @@ public void testPodSetWithTlsSameSecret() { assertThat(cont.getVolumeMounts().size(), is(5)); assertThat(cont.getVolumeMounts().get(0).getName(), is(VolumeUtils.STRIMZI_TMP_DIRECTORY_DEFAULT_VOLUME_NAME)); assertThat(cont.getVolumeMounts().get(0).getMountPath(), is(VolumeUtils.STRIMZI_TMP_DIRECTORY_DEFAULT_MOUNT_PATH)); - assertThat(cont.getVolumeMounts().get(1).getName(), is("kafka-metrics-and-logging")); + assertThat(cont.getVolumeMounts().get(1).getName(), is("kafka-connect-configurations")); assertThat(cont.getVolumeMounts().get(1).getMountPath(), is("/opt/kafka/custom-config/")); assertThat(cont.getVolumeMounts().get(2).getName(), is("my-secret")); assertThat(cont.getVolumeMounts().get(2).getMountPath(), is(KafkaMirrorMaker2Cluster.TLS_CERTS_BASE_VOLUME_MOUNT + "my-secret")); @@ -571,7 +570,7 @@ public void testPodSetWithScramSha512AuthAndTLSSameSecret() { PodSetUtils.podSetToPods(podSet).forEach(pod -> { assertThat(pod.getSpec().getVolumes().size(), is(4)); assertThat(pod.getSpec().getVolumes().get(0).getName(), is(VolumeUtils.STRIMZI_TMP_DIRECTORY_DEFAULT_VOLUME_NAME)); - assertThat(pod.getSpec().getVolumes().get(1).getName(), is("kafka-metrics-and-logging")); + assertThat(pod.getSpec().getVolumes().get(1).getName(), is("kafka-connect-configurations")); assertThat(pod.getSpec().getVolumes().get(2).getName(), is("my-secret")); 
assertThat(pod.getSpec().getVolumes().get(3).getName(), is("target-my-secret")); @@ -580,7 +579,7 @@ public void testPodSetWithScramSha512AuthAndTLSSameSecret() { assertThat(cont.getVolumeMounts().size(), is(6)); assertThat(cont.getVolumeMounts().get(0).getName(), is(VolumeUtils.STRIMZI_TMP_DIRECTORY_DEFAULT_VOLUME_NAME)); assertThat(cont.getVolumeMounts().get(0).getMountPath(), is(VolumeUtils.STRIMZI_TMP_DIRECTORY_DEFAULT_MOUNT_PATH)); - assertThat(cont.getVolumeMounts().get(1).getName(), is("kafka-metrics-and-logging")); + assertThat(cont.getVolumeMounts().get(1).getName(), is("kafka-connect-configurations")); assertThat(cont.getVolumeMounts().get(1).getMountPath(), is("/opt/kafka/custom-config/")); assertThat(cont.getVolumeMounts().get(2).getName(), is("my-secret")); assertThat(cont.getVolumeMounts().get(2).getMountPath(), is(KafkaMirrorMaker2Cluster.TLS_CERTS_BASE_VOLUME_MOUNT + "my-secret")); @@ -594,7 +593,6 @@ public void testPodSetWithScramSha512AuthAndTLSSameSecret() { assertThat(io.strimzi.operator.cluster.TestUtils.containerEnvVars(cont), hasEntry(KafkaMirrorMaker2Cluster.ENV_VAR_KAFKA_CONNECT_SASL_PASSWORD_FILE, "my-secret/user1.password")); assertThat(io.strimzi.operator.cluster.TestUtils.containerEnvVars(cont), hasEntry(KafkaMirrorMaker2Cluster.ENV_VAR_KAFKA_CONNECT_SASL_USERNAME, "user1")); assertThat(io.strimzi.operator.cluster.TestUtils.containerEnvVars(cont), hasEntry(KafkaMirrorMaker2Cluster.ENV_VAR_KAFKA_CONNECT_SASL_MECHANISM, "scram-sha-512")); - assertThat(io.strimzi.operator.cluster.TestUtils.containerEnvVars(cont), hasEntry(KafkaMirrorMaker2Cluster.ENV_VAR_KAFKA_CONNECT_TLS, "true")); }); } @@ -634,7 +632,7 @@ public void testPodSetWithMultipleClustersScramSha512AuthAndTLSSameSecret() { PodSetUtils.podSetToPods(podSet).forEach(pod -> { assertThat(pod.getSpec().getVolumes().size(), is(5)); assertThat(pod.getSpec().getVolumes().get(0).getName(), is(VolumeUtils.STRIMZI_TMP_DIRECTORY_DEFAULT_VOLUME_NAME)); - assertThat(pod.getSpec().getVolumes().get(1).getName(), is("kafka-metrics-and-logging")); + assertThat(pod.getSpec().getVolumes().get(1).getName(), is("kafka-connect-configurations")); assertThat(pod.getSpec().getVolumes().get(2).getName(), is("my-secret")); assertThat(pod.getSpec().getVolumes().get(3).getName(), is("target-my-secret")); assertThat(pod.getSpec().getVolumes().get(4).getName(), is("source-my-secret")); @@ -644,7 +642,7 @@ public void testPodSetWithMultipleClustersScramSha512AuthAndTLSSameSecret() { assertThat(cont.getVolumeMounts().size(), is(8)); assertThat(cont.getVolumeMounts().get(0).getName(), is(VolumeUtils.STRIMZI_TMP_DIRECTORY_DEFAULT_VOLUME_NAME)); assertThat(cont.getVolumeMounts().get(0).getMountPath(), is(VolumeUtils.STRIMZI_TMP_DIRECTORY_DEFAULT_MOUNT_PATH)); - assertThat(cont.getVolumeMounts().get(1).getName(), is("kafka-metrics-and-logging")); + assertThat(cont.getVolumeMounts().get(1).getName(), is("kafka-connect-configurations")); assertThat(cont.getVolumeMounts().get(1).getMountPath(), is("/opt/kafka/custom-config/")); assertThat(cont.getVolumeMounts().get(2).getName(), is("my-secret")); assertThat(cont.getVolumeMounts().get(2).getMountPath(), is(KafkaConnectCluster.TLS_CERTS_BASE_VOLUME_MOUNT + "my-secret")); @@ -662,7 +660,6 @@ public void testPodSetWithMultipleClustersScramSha512AuthAndTLSSameSecret() { assertThat(io.strimzi.operator.cluster.TestUtils.containerEnvVars(cont), hasEntry(KafkaConnectCluster.ENV_VAR_KAFKA_CONNECT_SASL_PASSWORD_FILE, "my-secret/user1.password")); 
assertThat(io.strimzi.operator.cluster.TestUtils.containerEnvVars(cont), hasEntry(KafkaConnectCluster.ENV_VAR_KAFKA_CONNECT_SASL_USERNAME, "user1")); assertThat(io.strimzi.operator.cluster.TestUtils.containerEnvVars(cont), hasEntry(KafkaConnectCluster.ENV_VAR_KAFKA_CONNECT_SASL_MECHANISM, "scram-sha-512")); - assertThat(io.strimzi.operator.cluster.TestUtils.containerEnvVars(cont), hasEntry(KafkaConnectCluster.ENV_VAR_KAFKA_CONNECT_TLS, "true")); }); } @@ -728,7 +725,7 @@ public void testPodSetWithScramSha256AuthAndTLSSameSecret() { PodSetUtils.podSetToPods(podSet).forEach(pod -> { assertThat(pod.getSpec().getVolumes().size(), is(4)); assertThat(pod.getSpec().getVolumes().get(0).getName(), is(VolumeUtils.STRIMZI_TMP_DIRECTORY_DEFAULT_VOLUME_NAME)); - assertThat(pod.getSpec().getVolumes().get(1).getName(), is("kafka-metrics-and-logging")); + assertThat(pod.getSpec().getVolumes().get(1).getName(), is("kafka-connect-configurations")); assertThat(pod.getSpec().getVolumes().get(2).getName(), is("my-secret")); assertThat(pod.getSpec().getVolumes().get(3).getName(), is("target-my-secret")); @@ -737,7 +734,7 @@ public void testPodSetWithScramSha256AuthAndTLSSameSecret() { assertThat(cont.getVolumeMounts().size(), is(6)); assertThat(cont.getVolumeMounts().get(0).getName(), is(VolumeUtils.STRIMZI_TMP_DIRECTORY_DEFAULT_VOLUME_NAME)); assertThat(cont.getVolumeMounts().get(0).getMountPath(), is(VolumeUtils.STRIMZI_TMP_DIRECTORY_DEFAULT_MOUNT_PATH)); - assertThat(cont.getVolumeMounts().get(1).getName(), is("kafka-metrics-and-logging")); + assertThat(cont.getVolumeMounts().get(1).getName(), is("kafka-connect-configurations")); assertThat(cont.getVolumeMounts().get(1).getMountPath(), is("/opt/kafka/custom-config/")); assertThat(cont.getVolumeMounts().get(2).getName(), is("my-secret")); assertThat(cont.getVolumeMounts().get(2).getMountPath(), is(KafkaMirrorMaker2Cluster.TLS_CERTS_BASE_VOLUME_MOUNT + "my-secret")); @@ -751,7 +748,6 @@ public void testPodSetWithScramSha256AuthAndTLSSameSecret() { assertThat(io.strimzi.operator.cluster.TestUtils.containerEnvVars(cont), hasEntry(KafkaMirrorMaker2Cluster.ENV_VAR_KAFKA_CONNECT_SASL_PASSWORD_FILE, "my-secret/user1.password")); assertThat(io.strimzi.operator.cluster.TestUtils.containerEnvVars(cont), hasEntry(KafkaMirrorMaker2Cluster.ENV_VAR_KAFKA_CONNECT_SASL_USERNAME, "user1")); assertThat(io.strimzi.operator.cluster.TestUtils.containerEnvVars(cont), hasEntry(KafkaMirrorMaker2Cluster.ENV_VAR_KAFKA_CONNECT_SASL_MECHANISM, "scram-sha-256")); - assertThat(io.strimzi.operator.cluster.TestUtils.containerEnvVars(cont), hasEntry(KafkaMirrorMaker2Cluster.ENV_VAR_KAFKA_CONNECT_TLS, "true")); }); } @@ -791,7 +787,7 @@ public void testPodSetWithMultipleClustersScramSha256AuthAndTLSSameSecret() { PodSetUtils.podSetToPods(podSet).forEach(pod -> { assertThat(pod.getSpec().getVolumes().size(), is(5)); assertThat(pod.getSpec().getVolumes().get(0).getName(), is(VolumeUtils.STRIMZI_TMP_DIRECTORY_DEFAULT_VOLUME_NAME)); - assertThat(pod.getSpec().getVolumes().get(1).getName(), is("kafka-metrics-and-logging")); + assertThat(pod.getSpec().getVolumes().get(1).getName(), is("kafka-connect-configurations")); assertThat(pod.getSpec().getVolumes().get(2).getName(), is("my-secret")); assertThat(pod.getSpec().getVolumes().get(3).getName(), is("target-my-secret")); assertThat(pod.getSpec().getVolumes().get(4).getName(), is("source-my-secret")); @@ -801,7 +797,7 @@ public void testPodSetWithMultipleClustersScramSha256AuthAndTLSSameSecret() { assertThat(cont.getVolumeMounts().size(), is(8)); 
assertThat(cont.getVolumeMounts().get(0).getName(), is(VolumeUtils.STRIMZI_TMP_DIRECTORY_DEFAULT_VOLUME_NAME)); assertThat(cont.getVolumeMounts().get(0).getMountPath(), is(VolumeUtils.STRIMZI_TMP_DIRECTORY_DEFAULT_MOUNT_PATH)); - assertThat(cont.getVolumeMounts().get(1).getName(), is("kafka-metrics-and-logging")); + assertThat(cont.getVolumeMounts().get(1).getName(), is("kafka-connect-configurations")); assertThat(cont.getVolumeMounts().get(1).getMountPath(), is("/opt/kafka/custom-config/")); assertThat(cont.getVolumeMounts().get(2).getName(), is("my-secret")); assertThat(cont.getVolumeMounts().get(2).getMountPath(), is(KafkaConnectCluster.TLS_CERTS_BASE_VOLUME_MOUNT + "my-secret")); @@ -819,7 +815,6 @@ public void testPodSetWithMultipleClustersScramSha256AuthAndTLSSameSecret() { assertThat(io.strimzi.operator.cluster.TestUtils.containerEnvVars(cont), hasEntry(KafkaConnectCluster.ENV_VAR_KAFKA_CONNECT_SASL_PASSWORD_FILE, "my-secret/user1.password")); assertThat(io.strimzi.operator.cluster.TestUtils.containerEnvVars(cont), hasEntry(KafkaConnectCluster.ENV_VAR_KAFKA_CONNECT_SASL_USERNAME, "user1")); assertThat(io.strimzi.operator.cluster.TestUtils.containerEnvVars(cont), hasEntry(KafkaConnectCluster.ENV_VAR_KAFKA_CONNECT_SASL_MECHANISM, "scram-sha-256")); - assertThat(io.strimzi.operator.cluster.TestUtils.containerEnvVars(cont), hasEntry(KafkaConnectCluster.ENV_VAR_KAFKA_CONNECT_TLS, "true")); }); } @@ -888,7 +883,7 @@ public void testPodSetWithPlainAuthAndTLSSameSecret() { PodSetUtils.podSetToPods(podSet).forEach(pod -> { assertThat(pod.getSpec().getVolumes().size(), is(4)); assertThat(pod.getSpec().getVolumes().get(0).getName(), is(VolumeUtils.STRIMZI_TMP_DIRECTORY_DEFAULT_VOLUME_NAME)); - assertThat(pod.getSpec().getVolumes().get(1).getName(), is("kafka-metrics-and-logging")); + assertThat(pod.getSpec().getVolumes().get(1).getName(), is("kafka-connect-configurations")); assertThat(pod.getSpec().getVolumes().get(2).getName(), is("my-secret")); assertThat(pod.getSpec().getVolumes().get(3).getName(), is("target-my-secret")); @@ -897,7 +892,7 @@ public void testPodSetWithPlainAuthAndTLSSameSecret() { assertThat(cont.getVolumeMounts().size(), is(6)); assertThat(cont.getVolumeMounts().get(0).getName(), is(VolumeUtils.STRIMZI_TMP_DIRECTORY_DEFAULT_VOLUME_NAME)); assertThat(cont.getVolumeMounts().get(0).getMountPath(), is(VolumeUtils.STRIMZI_TMP_DIRECTORY_DEFAULT_MOUNT_PATH)); - assertThat(cont.getVolumeMounts().get(1).getName(), is("kafka-metrics-and-logging")); + assertThat(cont.getVolumeMounts().get(1).getName(), is("kafka-connect-configurations")); assertThat(cont.getVolumeMounts().get(1).getMountPath(), is("/opt/kafka/custom-config/")); assertThat(cont.getVolumeMounts().get(2).getName(), is("my-secret")); assertThat(cont.getVolumeMounts().get(2).getMountPath(), is(KafkaMirrorMaker2Cluster.TLS_CERTS_BASE_VOLUME_MOUNT + "my-secret")); @@ -911,7 +906,6 @@ public void testPodSetWithPlainAuthAndTLSSameSecret() { assertThat(io.strimzi.operator.cluster.TestUtils.containerEnvVars(cont), hasEntry(KafkaMirrorMaker2Cluster.ENV_VAR_KAFKA_CONNECT_SASL_PASSWORD_FILE, "my-secret/user1.password")); assertThat(io.strimzi.operator.cluster.TestUtils.containerEnvVars(cont), hasEntry(KafkaMirrorMaker2Cluster.ENV_VAR_KAFKA_CONNECT_SASL_USERNAME, "user1")); assertThat(io.strimzi.operator.cluster.TestUtils.containerEnvVars(cont), hasEntry(KafkaMirrorMaker2Cluster.ENV_VAR_KAFKA_CONNECT_SASL_MECHANISM, "plain")); - assertThat(io.strimzi.operator.cluster.TestUtils.containerEnvVars(cont), 
hasEntry(KafkaMirrorMaker2Cluster.ENV_VAR_KAFKA_CONNECT_TLS, "true")); }); } @@ -1074,7 +1068,7 @@ public void testTemplate() { assertThat(pod.getSpec().getVolumes().get(0).getName(), is(VolumeUtils.STRIMZI_TMP_DIRECTORY_DEFAULT_VOLUME_NAME)); assertThat(pod.getSpec().getVolumes().get(0).getEmptyDir(), is(notNullValue())); assertThat(pod.getSpec().getVolumes().get(0).getEmptyDir().getSizeLimit(), is(new Quantity("10Mi"))); - assertThat(pod.getSpec().getVolumes().get(1).getName(), is("kafka-metrics-and-logging")); + assertThat(pod.getSpec().getVolumes().get(1).getName(), is("kafka-connect-configurations")); assertThat(pod.getSpec().getVolumes().get(1).getConfigMap().getName(), is("foo-mirrormaker2-config")); assertThat(pod.getSpec().getVolumes().get(2).getName(), is("rack-volume")); assertThat(pod.getSpec().getVolumes().get(2).getEmptyDir(), is(notNullValue())); @@ -1093,7 +1087,7 @@ public void testTemplate() { assertThat(pod.getSpec().getContainers().get(0).getVolumeMounts().size(), is(4)); assertThat(pod.getSpec().getContainers().get(0).getVolumeMounts().get(0).getName(), is(VolumeUtils.STRIMZI_TMP_DIRECTORY_DEFAULT_VOLUME_NAME)); assertThat(pod.getSpec().getContainers().get(0).getVolumeMounts().get(0).getMountPath(), is(VolumeUtils.STRIMZI_TMP_DIRECTORY_DEFAULT_MOUNT_PATH)); - assertThat(pod.getSpec().getContainers().get(0).getVolumeMounts().get(1).getName(), is("kafka-metrics-and-logging")); + assertThat(pod.getSpec().getContainers().get(0).getVolumeMounts().get(1).getName(), is("kafka-connect-configurations")); assertThat(pod.getSpec().getContainers().get(0).getVolumeMounts().get(1).getMountPath(), is("/opt/kafka/custom-config/")); assertThat(pod.getSpec().getContainers().get(0).getVolumeMounts().get(2).getName(), is("rack-volume")); assertThat(pod.getSpec().getContainers().get(0).getVolumeMounts().get(2).getMountPath(), is("/opt/kafka/init")); @@ -1665,44 +1659,6 @@ public void testKafkaMirrorMaker2ContainerEnvVars() { .map(EnvVar::getValue).findFirst().orElse("").equals(testEnvTwoValue), is(true)); } - @ParallelTest - public void testKafkaContainerEnvVarsConflict() { - ContainerEnvVar envVar1 = new ContainerEnvVar(); - String testEnvOneKey = KafkaMirrorMaker2Cluster.ENV_VAR_KAFKA_CONNECT_CONFIGURATION; - String testEnvOneValue = "test.env.one"; - envVar1.setName(testEnvOneKey); - envVar1.setValue(testEnvOneValue); - - ContainerEnvVar envVar2 = new ContainerEnvVar(); - String testEnvTwoKey = KafkaMirrorMaker2Cluster.ENV_VAR_KAFKA_CONNECT_BOOTSTRAP_SERVERS; - String testEnvTwoValue = "test.env.two"; - envVar2.setName(testEnvTwoKey); - envVar2.setValue(testEnvTwoValue); - - List testEnvs = new ArrayList<>(); - testEnvs.add(envVar1); - testEnvs.add(envVar2); - ContainerTemplate kafkaMirrorMaker2Container = new ContainerTemplate(); - kafkaMirrorMaker2Container.setEnv(testEnvs); - - KafkaMirrorMaker2 resource = new KafkaMirrorMaker2Builder(this.resource) - .editSpec() - .withNewTemplate() - .withConnectContainer(kafkaMirrorMaker2Container) - .endTemplate() - .endSpec() - .build(); - - List kafkaEnvVars = KafkaMirrorMaker2Cluster.fromCrd(Reconciliation.DUMMY_RECONCILIATION, resource, VERSIONS, SHARED_ENV_PROVIDER).getEnvVars(); - - assertThat("Failed to prevent over writing existing container environment variable: " + testEnvOneKey, - kafkaEnvVars.stream().filter(env -> testEnvOneKey.equals(env.getName())) - .map(EnvVar::getValue).findFirst().orElse("").equals(testEnvOneValue), is(false)); - assertThat("Failed to prevent over writing existing container environment variable: " + 
testEnvTwoKey, - kafkaEnvVars.stream().filter(env -> testEnvTwoKey.equals(env.getName())) - .map(EnvVar::getValue).findFirst().orElse("").equals(testEnvTwoValue), is(false)); - } - @ParallelTest public void testOpenTelemetryTracing() { KafkaMirrorMaker2Builder builder = new KafkaMirrorMaker2Builder(this.resource) @@ -1714,13 +1670,17 @@ public void testOpenTelemetryTracing() { KafkaMirrorMaker2Cluster kmm2 = KafkaMirrorMaker2Cluster.fromCrd(Reconciliation.DUMMY_RECONCILIATION, resource, VERSIONS, SHARED_ENV_PROVIDER); + // Check config map + ConfigMap configMap = kmm2.generateConnectConfigMap(new MetricsAndLogging(metricsCM, null)); + String connectConfigurations = configMap.getData().get(KafkaConnectCluster.KAFKA_CONNECT_CONFIGURATION_FILENAME); + assertThat(connectConfigurations, containsString("consumer.interceptor.classes=" + OpenTelemetryTracing.CONSUMER_INTERCEPTOR_CLASS_NAME)); + assertThat(connectConfigurations, containsString("producer.interceptor.classes=" + OpenTelemetryTracing.PRODUCER_INTERCEPTOR_CLASS_NAME)); + // Check PodSet StrimziPodSet podSet = kmm2.generatePodSet(3, Map.of(), Map.of(), false, null, null, null); PodSetUtils.podSetToPods(podSet).forEach(pod -> { Container cont = pod.getSpec().getContainers().get(0); assertThat(cont.getEnv().stream().filter(env -> KafkaMirrorMaker2Cluster.ENV_VAR_STRIMZI_TRACING.equals(env.getName())).map(EnvVar::getValue).findFirst().orElse("").equals(OpenTelemetryTracing.TYPE_OPENTELEMETRY), is(true)); - assertThat(cont.getEnv().stream().filter(env -> KafkaMirrorMaker2Cluster.ENV_VAR_KAFKA_CONNECT_CONFIGURATION.equals(env.getName())).map(EnvVar::getValue).findFirst().orElse("").contains("consumer.interceptor.classes=" + OpenTelemetryTracing.CONSUMER_INTERCEPTOR_CLASS_NAME), is(true)); - assertThat(cont.getEnv().stream().filter(env -> KafkaMirrorMaker2Cluster.ENV_VAR_KAFKA_CONNECT_CONFIGURATION.equals(env.getName())).map(EnvVar::getValue).findFirst().orElse("").contains("producer.interceptor.classes=" + OpenTelemetryTracing.PRODUCER_INTERCEPTOR_CLASS_NAME), is(true)); }); } @@ -2107,7 +2067,7 @@ public void testNetworkPolicy() { KafkaMirrorMaker2 resource = new KafkaMirrorMaker2Builder(this.resourceWithMetrics) .build(); KafkaMirrorMaker2Cluster kc = KafkaMirrorMaker2Cluster.fromCrd(Reconciliation.DUMMY_RECONCILIATION, resource, VERSIONS, SHARED_ENV_PROVIDER); - kc.generateMetricsAndLogConfigMap(new MetricsAndLogging(metricsCM, null)); + kc.generateConnectConfigMap(new MetricsAndLogging(metricsCM, null)); NetworkPolicy np = kc.generateNetworkPolicy(true, "operator-namespace", null); @@ -2131,7 +2091,7 @@ public void testNetworkPolicyWithConnectorOperatorSameNamespace() { KafkaMirrorMaker2 resource = new KafkaMirrorMaker2Builder(this.resourceWithMetrics) .build(); KafkaMirrorMaker2Cluster kc = KafkaMirrorMaker2Cluster.fromCrd(Reconciliation.DUMMY_RECONCILIATION, resource, VERSIONS, SHARED_ENV_PROVIDER); - kc.generateMetricsAndLogConfigMap(new MetricsAndLogging(metricsCM, null)); + kc.generateConnectConfigMap(new MetricsAndLogging(metricsCM, null)); NetworkPolicy np = kc.generateNetworkPolicy(true, namespace, null); @@ -2154,7 +2114,7 @@ public void testNetworkPolicyWithConnectorOperatorWithNamespaceLabels() { KafkaMirrorMaker2 resource = new KafkaMirrorMaker2Builder(this.resourceWithMetrics) .build(); KafkaMirrorMaker2Cluster kc = KafkaMirrorMaker2Cluster.fromCrd(Reconciliation.DUMMY_RECONCILIATION, resource, VERSIONS, SHARED_ENV_PROVIDER); - kc.generateMetricsAndLogConfigMap(new MetricsAndLogging(metricsCM, null)); + 
kc.generateConnectConfigMap(new MetricsAndLogging(metricsCM, null)); NetworkPolicy np = kc.generateNetworkPolicy(true, "operator-namespace", Labels.fromMap(Collections.singletonMap("nsLabelKey", "nsLabelValue"))); diff --git a/docker-images/kafka-based/kafka/scripts/kafka_connect_config_generator.sh b/docker-images/kafka-based/kafka/scripts/kafka_connect_config_generator.sh deleted file mode 100755 index 86d5242a9b..0000000000 --- a/docker-images/kafka-based/kafka/scripts/kafka_connect_config_generator.sh +++ /dev/null @@ -1,142 +0,0 @@ -#!/usr/bin/env bash -set -e - -SECURITY_PROTOCOL=PLAINTEXT - -if [ "$KAFKA_CONNECT_TLS" = "true" ]; then - SECURITY_PROTOCOL="SSL" - - if [ -n "$KAFKA_CONNECT_TRUSTED_CERTS" ]; then - TLS_CONFIGURATION=$(cat < exceptedConfig = StUtils.loadProperties("group.id=" + KafkaConnectResources.componentName(testStorage.getClusterName()) + "\n" + + final Map exceptedConfig = StUtils.loadProperties("bootstrap.servers=" + KafkaResources.tlsBootstrapAddress(testStorage.getClusterName()) + "\n" + + "group.id=" + KafkaConnectResources.componentName(testStorage.getClusterName()) + "\n" + "key.converter=org.apache.kafka.connect.json.JsonConverter\n" + "value.converter=org.apache.kafka.connect.json.JsonConverter\n" + "config.storage.replication.factor=-1\n" + @@ -175,12 +172,12 @@ void testDeployRollUndeploy() { StrimziPodSetUtils.annotateStrimziPodSet(testStorage.getNamespaceName(), KafkaConnectResources.componentName(testStorage.getClusterName()), Collections.singletonMap(Annotations.ANNO_STRIMZI_IO_MANUAL_ROLLING_UPDATE, "true")); RollingUpdateUtils.waitTillComponentHasRolled(testStorage.getNamespaceName(), testStorage.getKafkaConnectSelector(), connectReplicasCount, connectPodsSnapshot); - final String podName = PodUtils.getPodNameByPrefix(testStorage.getNamespaceName(), KafkaConnectResources.componentName(testStorage.getClusterName())); - final String kafkaPodJson = ReadWriteUtils.writeObjectToJsonString(kubeClient(testStorage.getNamespaceName()).getPod(podName)); + LOGGER.info("Verifying configurations in config map"); + ConfigMap configMap = kubeClient().namespace(testStorage.getNamespaceName()).getConfigMap(KafkaConnectResources.metricsAndLogConfigMapName(testStorage.getClusterName())); + String connectConfigurations = configMap.getData().get("kafka-connect.properties"); + Map config = StUtils.loadProperties(connectConfigurations); + assertThat(config.entrySet().containsAll(exceptedConfig.entrySet()), is(true)); - assertThat(kafkaPodJson, hasJsonPath(StUtils.globalVariableJsonPathBuilder(0, "KAFKA_CONNECT_BOOTSTRAP_SERVERS"), - hasItem(KafkaResources.tlsBootstrapAddress(testStorage.getClusterName())))); - assertThat(StUtils.getPropertiesFromJson(0, kafkaPodJson, "KAFKA_CONNECT_CONFIGURATION"), is(exceptedConfig)); VerificationUtils.verifyClusterOperatorConnectDockerImage(clusterOperator.getDeploymentNamespace(), testStorage.getNamespaceName(), testStorage.getClusterName()); VerificationUtils.verifyPodsLabels(testStorage.getNamespaceName(), KafkaConnectResources.componentName(testStorage.getClusterName()), testStorage.getKafkaConnectSelector()); @@ -817,12 +814,10 @@ void testConnectorTaskAutoRestart() { ) void testCustomAndUpdatedValues() { final TestStorage testStorage = new TestStorage(ResourceManager.getTestContext()); - final String usedVariable = "KAFKA_CONNECT_CONFIGURATION"; LinkedHashMap envVarGeneral = new LinkedHashMap<>(); envVarGeneral.put("TEST_ENV_1", "test.env.one"); envVarGeneral.put("TEST_ENV_2", "test.env.two"); - envVarGeneral.put(usedVariable, 
"test.value"); LinkedHashMap envVarUpdated = new LinkedHashMap<>(); envVarUpdated.put("TEST_ENV_2", "updated.test.env.two"); @@ -875,18 +870,11 @@ void testCustomAndUpdatedValues() { Map connectSnapshot = PodUtils.podSnapshot(testStorage.getNamespaceName(), testStorage.getKafkaConnectSelector()); // Remove variable which is already in use - envVarGeneral.remove(usedVariable); LOGGER.info("Verifying values before update"); VerificationUtils.verifyReadinessAndLivenessProbes(testStorage.getNamespaceName(), KafkaConnectResources.componentName(testStorage.getClusterName()), KafkaConnectResources.componentName(testStorage.getClusterName()), initialDelaySeconds, timeoutSeconds, periodSeconds, successThreshold, failureThreshold); VerificationUtils.verifyContainerEnvVariables(testStorage.getNamespaceName(), KafkaConnectResources.componentName(testStorage.getClusterName()), KafkaConnectResources.componentName(testStorage.getClusterName()), envVarGeneral); - LOGGER.info("Check if actual env variable {} has different value than {}", usedVariable, "test.value"); - assertThat( - StUtils.checkEnvVarInPod(testStorage.getNamespaceName(), kubeClient().listPodsByPrefixInName(testStorage.getNamespaceName(), KafkaConnectResources.componentName(testStorage.getClusterName())).get(0).getMetadata().getName(), usedVariable), - is(not("test.value")) - ); - LOGGER.info("Updating values in Connect container"); KafkaConnectResource.replaceKafkaConnectResourceInSpecificNamespace(testStorage.getNamespaceName(), testStorage.getClusterName(), kc -> { @@ -908,7 +896,12 @@ void testCustomAndUpdatedValues() { VerificationUtils.verifyReadinessAndLivenessProbes(testStorage.getNamespaceName(), KafkaConnectResources.componentName(testStorage.getClusterName()), KafkaConnectResources.componentName(testStorage.getClusterName()), updatedInitialDelaySeconds, updatedTimeoutSeconds, updatedPeriodSeconds, successThreshold, updatedFailureThreshold); VerificationUtils.verifyContainerEnvVariables(testStorage.getNamespaceName(), KafkaConnectResources.componentName(testStorage.getClusterName()), KafkaConnectResources.componentName(testStorage.getClusterName()), envVarUpdated); - VerificationUtils.verifyComponentConfiguration(testStorage.getNamespaceName(), KafkaConnectResources.componentName(testStorage.getClusterName()), KafkaConnectResources.componentName(testStorage.getClusterName()), "KAFKA_CONNECT_CONFIGURATION", connectConfig); + + LOGGER.info("Verifying configurations in config map after update"); + ConfigMap configMap = kubeClient().namespace(testStorage.getNamespaceName()).getConfigMap(KafkaConnectResources.metricsAndLogConfigMapName(testStorage.getClusterName())); + String connectConfigurations = configMap.getData().get("kafka-connect.properties"); + Map config = StUtils.loadProperties(connectConfigurations); + assertThat(config.entrySet().containsAll(connectConfig.entrySet()), is(true)); } @ParallelNamespaceTest @@ -940,7 +933,7 @@ void testMultiNodeKafkaConnectWithConnectorCreation() { KafkaNodePoolTemplates.controllerPool(testStorage.getNamespaceName(), testStorage.getControllerPoolName(), testStorage.getClusterName(), 3).build() ); resourceManager.createResourceWithWait(KafkaTemplates.kafka(testStorage.getNamespaceName(), testStorage.getClusterName(), 3).build()); - // Crate connect cluster with default connect image + // Create connect cluster with default connect image resourceManager.createResourceWithWait(KafkaConnectTemplates.kafkaConnectWithFilePlugin(testStorage.getNamespaceName(), testStorage.getClusterName(), 3) 
@@ -940,7 +933,7 @@ void testMultiNodeKafkaConnectWithConnectorCreation() {
             KafkaNodePoolTemplates.controllerPool(testStorage.getNamespaceName(), testStorage.getControllerPoolName(), testStorage.getClusterName(), 3).build()
         );
         resourceManager.createResourceWithWait(KafkaTemplates.kafka(testStorage.getNamespaceName(), testStorage.getClusterName(), 3).build());
-        // Crate connect cluster with default connect image
+        // Create connect cluster with default connect image
         resourceManager.createResourceWithWait(KafkaConnectTemplates.kafkaConnectWithFilePlugin(testStorage.getNamespaceName(), testStorage.getClusterName(), 3)
             .editMetadata()
                 .addToAnnotations(Annotations.STRIMZI_IO_USE_CONNECTOR_RESOURCES, "true")