diff --git a/.checkstyle/suppressions.xml b/.checkstyle/suppressions.xml
index 0f667867c12..9ca6217de78 100644
--- a/.checkstyle/suppressions.xml
+++ b/.checkstyle/suppressions.xml
@@ -21,6 +21,9 @@
+
+
diff --git a/Makefile b/Makefile
index 808c58c65e9..28517d92165 100644
--- a/Makefile
+++ b/Makefile
@@ -3,7 +3,7 @@ RELEASE_VERSION ?= latest
CHART_PATH ?= ./helm-charts/strimzi-kafka-operator/
CHART_SEMANTIC_RELEASE_VERSION ?= $(shell cat ./release.version | tr A-Z a-z)
-SUBDIRS=docker-images crd-generator api certificate-manager operator-common cluster-operator topic-operator user-operator kafka-init helm-charts examples
+SUBDIRS=docker-images test crd-generator api certificate-manager operator-common cluster-operator topic-operator user-operator kafka-init helm-charts examples
DOCKER_TARGETS=docker_build docker_push docker_tag
all: $(SUBDIRS)
diff --git a/api/pom.xml b/api/pom.xml
index c3afbe22f1e..0a756447b52 100644
--- a/api/pom.xml
+++ b/api/pom.xml
@@ -70,6 +70,10 @@
             <groupId>junit</groupId>
             <artifactId>junit</artifactId>
         </dependency>
+        <dependency>
+            <groupId>io.strimzi</groupId>
+            <artifactId>test</artifactId>
+        </dependency>
         <dependency>
             <groupId>io.strimzi</groupId>
             <artifactId>crd-generator</artifactId>
diff --git a/api/src/main/java/io/strimzi/api/kafka/model/KafkaListenerAuthentication.java b/api/src/main/java/io/strimzi/api/kafka/model/KafkaListenerAuthentication.java
index 620d59f4083..ded83bc2918 100644
--- a/api/src/main/java/io/strimzi/api/kafka/model/KafkaListenerAuthentication.java
+++ b/api/src/main/java/io/strimzi/api/kafka/model/KafkaListenerAuthentication.java
@@ -4,25 +4,25 @@
*/
package io.strimzi.api.kafka.model;
-import io.strimzi.crdgenerator.annotations.Description;
-
-import java.io.Serializable;
-import java.util.HashMap;
-import java.util.Map;
-
import com.fasterxml.jackson.annotation.JsonAnyGetter;
import com.fasterxml.jackson.annotation.JsonAnySetter;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.JsonSubTypes;
import com.fasterxml.jackson.annotation.JsonTypeInfo;
+import io.strimzi.crdgenerator.annotations.Description;
+
+import java.io.Serializable;
+import java.util.HashMap;
+import java.util.Map;
/**
- * Configures the broker authorization
+ * Configures listener authentication.
*/
@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type")
@JsonSubTypes({
@JsonSubTypes.Type(name = KafkaListenerAuthenticationTls.TYPE_TLS, value = KafkaListenerAuthenticationTls.class),
+ @JsonSubTypes.Type(name = KafkaListenerAuthenticationScramSha512.SCRAM_SHA_512, value = KafkaListenerAuthenticationScramSha512.class),
})
@JsonInclude(JsonInclude.Include.NON_NULL)
public abstract class KafkaListenerAuthentication implements Serializable {
@@ -31,7 +31,7 @@ public abstract class KafkaListenerAuthentication implements Serializable {
    private Map<String, Object> additionalProperties;
@Description("Authentication type. " +
- "Currently the only supported type is `tls`. " +
+ "`scram-sha-512` type uses SASL SCRAM-SHA-512 Authentication. " +
"`tls` type uses TLS Client Authentication. " +
"`tls` type is supported only on TLS listeners.")
@JsonIgnore
diff --git a/api/src/main/java/io/strimzi/api/kafka/model/KafkaListenerAuthenticationScramSha512.java b/api/src/main/java/io/strimzi/api/kafka/model/KafkaListenerAuthenticationScramSha512.java
new file mode 100644
index 00000000000..8dcf1acc57f
--- /dev/null
+++ b/api/src/main/java/io/strimzi/api/kafka/model/KafkaListenerAuthenticationScramSha512.java
@@ -0,0 +1,31 @@
+/*
+ * Copyright 2018, Strimzi authors.
+ * License: Apache License 2.0 (see the file LICENSE or http://apache.org/licenses/LICENSE-2.0.html).
+ */
+package io.strimzi.api.kafka.model;
+
+import com.fasterxml.jackson.annotation.JsonInclude;
+import io.strimzi.crdgenerator.annotations.Description;
+import io.sundr.builder.annotations.Buildable;
+
+/**
+ * Configures a listener to use SASL SCRAM-SHA-512 for authentication.
+ */
+@Buildable(
+ editableEnabled = false,
+ generateBuilderPackage = false,
+ builderPackage = "io.fabric8.kubernetes.api.builder"
+)
+@JsonInclude(JsonInclude.Include.NON_NULL)
+public class KafkaListenerAuthenticationScramSha512 extends KafkaListenerAuthentication {
+
+ private static final long serialVersionUID = 1L;
+
+ public static final String SCRAM_SHA_512 = "scram-sha-512";
+
+ @Description("Must be `" + SCRAM_SHA_512 + "`")
+ @Override
+ public String getType() {
+ return SCRAM_SHA_512;
+ }
+}
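
For reference, a minimal sketch (not part of the patch) of wiring the new subtype onto a listener, using the setter this patch adds to KafkaListenerPlain further below:

    // Hypothetical usage; assumes the KafkaListenerPlain.setAuthentication()
    // added later in this diff.
    KafkaListenerPlain plain = new KafkaListenerPlain();
    plain.setAuthentication(new KafkaListenerAuthenticationScramSha512());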
diff --git a/api/src/main/java/io/strimzi/api/kafka/model/KafkaListenerAuthenticationTls.java b/api/src/main/java/io/strimzi/api/kafka/model/KafkaListenerAuthenticationTls.java
index 9292fd6046c..1d4b4821047 100644
--- a/api/src/main/java/io/strimzi/api/kafka/model/KafkaListenerAuthenticationTls.java
+++ b/api/src/main/java/io/strimzi/api/kafka/model/KafkaListenerAuthenticationTls.java
@@ -10,7 +10,7 @@
import io.sundr.builder.annotations.Buildable;
/**
- * Configures the broker authorization
+ * Configures a listener to use mutual TLS authentication.
*/
@Buildable(
editableEnabled = false,
diff --git a/api/src/main/java/io/strimzi/api/kafka/model/KafkaListenerPlain.java b/api/src/main/java/io/strimzi/api/kafka/model/KafkaListenerPlain.java
index d0e7ab6c436..04c11458534 100644
--- a/api/src/main/java/io/strimzi/api/kafka/model/KafkaListenerPlain.java
+++ b/api/src/main/java/io/strimzi/api/kafka/model/KafkaListenerPlain.java
@@ -4,15 +4,16 @@
*/
package io.strimzi.api.kafka.model;
-import java.io.Serializable;
-import java.util.HashMap;
-import java.util.Map;
-
import com.fasterxml.jackson.annotation.JsonAnyGetter;
import com.fasterxml.jackson.annotation.JsonAnySetter;
import com.fasterxml.jackson.annotation.JsonInclude;
+import io.strimzi.crdgenerator.annotations.Description;
import io.sundr.builder.annotations.Buildable;
+import java.io.Serializable;
+import java.util.HashMap;
+import java.util.Map;
+
import static java.util.Collections.emptyMap;
/**
@@ -29,6 +30,18 @@ public class KafkaListenerPlain implements Serializable {
    private Map<String, Object> additionalProperties;
+ private KafkaListenerAuthentication authentication;
+
+ @Description("Authentication configuration for this listener. " +
+ "Since this listener does not use TLS transport you cannot configure an authentication with `type: tls`.")
+ public KafkaListenerAuthentication getAuthentication() {
+ return authentication;
+ }
+
+ public void setAuthentication(KafkaListenerAuthentication authentication) {
+ this.authentication = authentication;
+ }
+
@JsonAnyGetter
    public Map<String, Object> getAdditionalProperties() {
return this.additionalProperties != null ? this.additionalProperties : emptyMap();
diff --git a/api/src/main/java/io/strimzi/api/kafka/model/KafkaListenerTls.java b/api/src/main/java/io/strimzi/api/kafka/model/KafkaListenerTls.java
index 2ddd3c4736c..6d12f8c605a 100644
--- a/api/src/main/java/io/strimzi/api/kafka/model/KafkaListenerTls.java
+++ b/api/src/main/java/io/strimzi/api/kafka/model/KafkaListenerTls.java
@@ -4,17 +4,17 @@
*/
package io.strimzi.api.kafka.model;
+import com.fasterxml.jackson.annotation.JsonAnyGetter;
+import com.fasterxml.jackson.annotation.JsonAnySetter;
+import com.fasterxml.jackson.annotation.JsonInclude;
+import com.fasterxml.jackson.annotation.JsonProperty;
import io.strimzi.crdgenerator.annotations.Description;
+import io.sundr.builder.annotations.Buildable;
import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;
-import com.fasterxml.jackson.annotation.JsonAnyGetter;
-import com.fasterxml.jackson.annotation.JsonAnySetter;
-import com.fasterxml.jackson.annotation.JsonInclude;
-import io.sundr.builder.annotations.Buildable;
-
import static java.util.Collections.emptyMap;
/**
@@ -29,17 +29,18 @@
public class KafkaListenerTls implements Serializable {
private static final long serialVersionUID = 1L;
- private KafkaListenerAuthentication serverAuthentication;
+ private KafkaListenerAuthentication auth;
    private Map<String, Object> additionalProperties;
- @Description("Authorization configuration for Kafka brokers")
+ @Description("Authentication configuration for this listener.")
@JsonInclude(JsonInclude.Include.NON_NULL)
- public KafkaListenerAuthentication getAuthentication() {
- return serverAuthentication;
+ @JsonProperty("authentication")
+ public KafkaListenerAuthentication getAuth() {
+ return auth;
}
- public void setAuthentication(KafkaListenerAuthentication serverAuthentication) {
- this.serverAuthentication = serverAuthentication;
+ public void setAuth(KafkaListenerAuthentication auth) {
+ this.auth = auth;
}
@JsonAnyGetter
diff --git a/api/src/main/java/io/strimzi/api/kafka/model/KafkaUserAuthentication.java b/api/src/main/java/io/strimzi/api/kafka/model/KafkaUserAuthentication.java
index 0af3c63a1eb..fa5e7d48f0c 100644
--- a/api/src/main/java/io/strimzi/api/kafka/model/KafkaUserAuthentication.java
+++ b/api/src/main/java/io/strimzi/api/kafka/model/KafkaUserAuthentication.java
@@ -4,22 +4,22 @@
*/
package io.strimzi.api.kafka.model;
-import io.strimzi.crdgenerator.annotations.Description;
-
-import java.io.Serializable;
-import java.util.HashMap;
-import java.util.Map;
-
import com.fasterxml.jackson.annotation.JsonAnyGetter;
import com.fasterxml.jackson.annotation.JsonAnySetter;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.JsonSubTypes;
import com.fasterxml.jackson.annotation.JsonTypeInfo;
+import io.strimzi.crdgenerator.annotations.Description;
+
+import java.io.Serializable;
+import java.util.HashMap;
+import java.util.Map;
@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type")
@JsonSubTypes({
@JsonSubTypes.Type(name = KafkaUserTlsClientAuthentication.TYPE_TLS, value = KafkaUserTlsClientAuthentication.class),
+ @JsonSubTypes.Type(name = KafkaUserScramSha512ClientAuthentication.TYPE_SCRAM_SHA_512, value = KafkaUserScramSha512ClientAuthentication.class),
})
@JsonInclude(JsonInclude.Include.NON_NULL)
public abstract class KafkaUserAuthentication implements Serializable {
@@ -27,8 +27,7 @@ public abstract class KafkaUserAuthentication implements Serializable {
    private Map<String, Object> additionalProperties;
- @Description("Authentication type. " +
- "Currently the only supported type is `tls` for TLS Client Authentication.")
+ @Description("Authentication type.")
@JsonIgnore
public abstract String getType();
diff --git a/api/src/main/java/io/strimzi/api/kafka/model/KafkaUserScramSha512ClientAuthentication.java b/api/src/main/java/io/strimzi/api/kafka/model/KafkaUserScramSha512ClientAuthentication.java
new file mode 100644
index 00000000000..26644fd968f
--- /dev/null
+++ b/api/src/main/java/io/strimzi/api/kafka/model/KafkaUserScramSha512ClientAuthentication.java
@@ -0,0 +1,29 @@
+/*
+ * Copyright 2018, Strimzi authors.
+ * License: Apache License 2.0 (see the file LICENSE or http://apache.org/licenses/LICENSE-2.0.html).
+ */
+package io.strimzi.api.kafka.model;
+
+import com.fasterxml.jackson.annotation.JsonInclude;
+import io.strimzi.crdgenerator.annotations.Description;
+import io.sundr.builder.annotations.Buildable;
+
+@Buildable(
+ editableEnabled = false,
+ generateBuilderPackage = false,
+ builderPackage = "io.fabric8.kubernetes.api.builder"
+)
+@JsonInclude(JsonInclude.Include.NON_NULL)
+public class KafkaUserScramSha512ClientAuthentication extends KafkaUserAuthentication {
+ private static final long serialVersionUID = 1L;
+
+ public static final String TYPE_SCRAM_SHA_512 = "scram-sha-512";
+
+ @Description("Must be `" + TYPE_SCRAM_SHA_512 + "`")
+ @Override
+ public String getType() {
+ return TYPE_SCRAM_SHA_512;
+ }
+}
diff --git a/api/src/test/resources/io/strimzi/api/kafka/model/Kafka.out.yaml b/api/src/test/resources/io/strimzi/api/kafka/model/Kafka.out.yaml
index 620c960fc49..70a83a682fe 100644
--- a/api/src/test/resources/io/strimzi/api/kafka/model/Kafka.out.yaml
+++ b/api/src/test/resources/io/strimzi/api/kafka/model/Kafka.out.yaml
@@ -14,7 +14,9 @@ spec:
size: "500Gi"
deleteClaim: false
listeners:
- plain: {}
+ plain:
+ authentication:
+ type: "scram-sha-512"
tls:
authentication:
type: "tls"
diff --git a/api/src/test/resources/io/strimzi/api/kafka/model/Kafka.yaml b/api/src/test/resources/io/strimzi/api/kafka/model/Kafka.yaml
index cc2fd92feb8..8792f73ff83 100644
--- a/api/src/test/resources/io/strimzi/api/kafka/model/Kafka.yaml
+++ b/api/src/test/resources/io/strimzi/api/kafka/model/Kafka.yaml
@@ -37,7 +37,9 @@ spec:
config:
min.insync.replicas: 3
listeners:
- plain: {}
+ plain:
+ authentication:
+ type: scram-sha-512
tls:
authentication:
type: tls
diff --git a/certificate-manager/pom.xml b/certificate-manager/pom.xml
index 5b291003bad..e3579753235 100644
--- a/certificate-manager/pom.xml
+++ b/certificate-manager/pom.xml
@@ -40,10 +40,7 @@
             <groupId>io.strimzi</groupId>
-            <artifactId>api</artifactId>
-            <classifier>tests</classifier>
-            <type>test-jar</type>
-            <scope>test</scope>
+            <artifactId>test</artifactId>
diff --git a/cluster-operator/pom.xml b/cluster-operator/pom.xml
index 132018807b1..51d86aed865 100644
--- a/cluster-operator/pom.xml
+++ b/cluster-operator/pom.xml
@@ -94,10 +94,7 @@
             <groupId>io.strimzi</groupId>
-            <artifactId>api</artifactId>
-            <classifier>tests</classifier>
-            <type>test-jar</type>
-            <scope>test</scope>
+            <artifactId>test</artifactId>
         </dependency>
         <dependency>
             <groupId>io.strimzi</groupId>
diff --git a/cluster-operator/src/main/java/io/strimzi/operator/cluster/InvalidConfigParameterException.java b/cluster-operator/src/main/java/io/strimzi/operator/cluster/InvalidConfigParameterException.java
index eb163ab9670..7aa262014f6 100644
--- a/cluster-operator/src/main/java/io/strimzi/operator/cluster/InvalidConfigParameterException.java
+++ b/cluster-operator/src/main/java/io/strimzi/operator/cluster/InvalidConfigParameterException.java
@@ -5,7 +5,9 @@
package io.strimzi.operator.cluster;
-public class InvalidConfigParameterException extends RuntimeException {
+import io.strimzi.operator.cluster.model.InvalidResourceException;
+
+public class InvalidConfigParameterException extends InvalidResourceException {
private String key;
public InvalidConfigParameterException(String key, String message) {
diff --git a/cluster-operator/src/main/java/io/strimzi/operator/cluster/model/InvalidResourceException.java b/cluster-operator/src/main/java/io/strimzi/operator/cluster/model/InvalidResourceException.java
new file mode 100644
index 00000000000..6cfd4ec56b1
--- /dev/null
+++ b/cluster-operator/src/main/java/io/strimzi/operator/cluster/model/InvalidResourceException.java
@@ -0,0 +1,14 @@
+/*
+ * Copyright 2018, Strimzi authors.
+ * License: Apache License 2.0 (see the file LICENSE or http://apache.org/licenses/LICENSE-2.0.html).
+ */
+package io.strimzi.operator.cluster.model;
+
+public class InvalidResourceException extends RuntimeException {
+ public InvalidResourceException() {
+ super();
+ }
+
+    protected InvalidResourceException(String s) {
+        super(s);
+    }
+}
diff --git a/cluster-operator/src/main/java/io/strimzi/operator/cluster/model/KafkaCluster.java b/cluster-operator/src/main/java/io/strimzi/operator/cluster/model/KafkaCluster.java
index 9404102da82..2daf6f56d16 100644
--- a/cluster-operator/src/main/java/io/strimzi/operator/cluster/model/KafkaCluster.java
+++ b/cluster-operator/src/main/java/io/strimzi/operator/cluster/model/KafkaCluster.java
@@ -71,8 +71,13 @@ public class KafkaCluster extends AbstractModel {
protected static final String RACK_VOLUME_MOUNT = "/opt/kafka/rack";
private static final String ENV_VAR_KAFKA_INIT_RACK_TOPOLOGY_KEY = "RACK_TOPOLOGY_KEY";
private static final String ENV_VAR_KAFKA_INIT_NODE_NAME = "NODE_NAME";
+    /** {@code TRUE} when the CLIENT listener (PLAIN transport) should be enabled. */
private static final String ENV_VAR_KAFKA_CLIENT_ENABLED = "KAFKA_CLIENT_ENABLED";
+ /** The authentication to configure for the CLIENT listener (PLAIN transport). */
+ private static final String ENV_VAR_KAFKA_CLIENT_AUTHENTICATION = "KAFKA_CLIENT_AUTHENTICATION";
+    /** {@code TRUE} when the CLIENTTLS listener (TLS transport) should be enabled. */
private static final String ENV_VAR_KAFKA_CLIENTTLS_ENABLED = "KAFKA_CLIENTTLS_ENABLED";
+    /** The authentication to configure for the CLIENTTLS listener (TLS transport). */
private static final String ENV_VAR_KAFKA_CLIENTTLS_AUTHENTICATION = "KAFKA_CLIENTTLS_AUTHENTICATION";
private static final String ENV_VAR_KAFKA_AUTHORIZATION_TYPE = "KAFKA_AUTHORIZATION_TYPE";
private static final String ENV_VAR_KAFKA_AUTHORIZATION_SUPER_USERS = "KAFKA_AUTHORIZATION_SUPER_USERS";
@@ -243,7 +248,14 @@ public static KafkaCluster fromCrd(CertManager certManager, Kafka kafkaAssembly,
result.generateCertificates(certManager, secrets);
result.setTlsSidecar(kafkaClusterSpec.getTlsSidecar());
- result.setListeners(kafkaClusterSpec.getListeners());
+ KafkaListeners listeners = kafkaClusterSpec.getListeners();
+ if (listeners != null) {
+ if (listeners.getPlain() != null
+ && listeners.getPlain().getAuthentication() instanceof KafkaListenerAuthenticationTls) {
+ throw new InvalidResourceException("You cannot configure TLS authentication on a plain listener.");
+ }
+ }
+ result.setListeners(listeners);
result.setAuthorization(kafkaClusterSpec.getAuthorization());
return result;
@@ -619,13 +631,17 @@ protected List<EnvVar> getEnvVars() {
if (listeners != null) {
if (listeners.getPlain() != null) {
varList.add(buildEnvVar(ENV_VAR_KAFKA_CLIENT_ENABLED, "TRUE"));
+
+ if (listeners.getPlain().getAuthentication() != null) {
+ varList.add(buildEnvVar(ENV_VAR_KAFKA_CLIENT_AUTHENTICATION, listeners.getPlain().getAuthentication().getType()));
+ }
}
if (listeners.getTls() != null) {
varList.add(buildEnvVar(ENV_VAR_KAFKA_CLIENTTLS_ENABLED, "TRUE"));
- if (listeners.getTls().getAuthentication() != null && KafkaListenerAuthenticationTls.TYPE_TLS.equals(listeners.getTls().getAuthentication().getType())) {
- varList.add(buildEnvVar(ENV_VAR_KAFKA_CLIENTTLS_AUTHENTICATION, KafkaListenerAuthenticationTls.TYPE_TLS));
+ if (listeners.getTls().getAuth() != null) {
+ varList.add(buildEnvVar(ENV_VAR_KAFKA_CLIENTTLS_AUTHENTICATION, listeners.getTls().getAuth().getType()));
}
}
}
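
A short sketch of the environment the new branches produce (expected behaviour inferred from the code above, not asserted by this patch):

    // A plain listener with authentication.type == "scram-sha-512" yields, via getEnvVars():
    //   KAFKA_CLIENT_ENABLED=TRUE
    //   KAFKA_CLIENT_AUTHENTICATION=scram-sha-512
    // A TLS listener with the same auth type yields:
    //   KAFKA_CLIENTTLS_ENABLED=TRUE
    //   KAFKA_CLIENTTLS_AUTHENTICATION=scram-sha-512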
diff --git a/docker-images/kafka/scripts/kafka_config_generator.sh b/docker-images/kafka/scripts/kafka_config_generator.sh
index 0f33462f49b..5981dcbb1b1 100755
--- a/docker-images/kafka/scripts/kafka_config_generator.sh
+++ b/docker-images/kafka/scripts/kafka_config_generator.sh
@@ -6,17 +6,29 @@
LISTENERS="REPLICATION://0.0.0.0:9091"
ADVERTISED_LISTENERS="REPLICATION://$(hostname -f):9091"
LISTENER_SECURITY_PROTOCOL_MAP="REPLICATION:SSL"
+SASL_ENABLED_MECHANISMS=""
if [ "$KAFKA_CLIENT_ENABLED" = "TRUE" ]; then
LISTENERS="${LISTENERS},CLIENT://0.0.0.0:9092"
ADVERTISED_LISTENERS="${ADVERTISED_LISTENERS},CLIENT://$(hostname -f):9092"
- LISTENER_SECURITY_PROTOCOL_MAP="${LISTENER_SECURITY_PROTOCOL_MAP},CLIENT:PLAINTEXT"
+
+ if [ "$KAFKA_CLIENT_AUTHENTICATION" = "scram-sha-512" ]; then
+ SASL_ENABLED_MECHANISMS="SCRAM-SHA-512\n$SASL_ENABLED_MECHANISMS"
+ LISTENER_SECURITY_PROTOCOL_MAP="${LISTENER_SECURITY_PROTOCOL_MAP},CLIENT:SASL_PLAINTEXT"
+    CLIENT_LISTENER=$(cat <<EOF
diff --git a/operator-common/src/main/java/io/strimzi/operator/common/process/ProcessHelper.java b/operator-common/src/main/java/io/strimzi/operator/common/process/ProcessHelper.java
new file mode 100644
--- /dev/null
+++ b/operator-common/src/main/java/io/strimzi/operator/common/process/ProcessHelper.java
+public class ProcessHelper {
+
+    public static ProcessResult executeSubprocess(List<String> verifyArgs) throws IOException, InterruptedException {
+ if (verifyArgs.isEmpty() || !new File(verifyArgs.get(0)).canExecute()) {
+ throw new RuntimeException("Command " + verifyArgs + " lacks an executable arg[0]");
+ }
+
+ ProcessBuilder pb = new ProcessBuilder(verifyArgs);
+ // If we redirect stderr to stdout we could break clients which parse the output because the
+ // characters will be jumbled.
+ // Reading two pipes without deadlocking on the blocking is difficult, so let's just write stderr to a file.
+ File stdout = createTmpFile(".out");
+ File stderr = createTmpFile(".err");
+ pb.redirectError(stderr);
+ pb.redirectOutput(stdout);
+ Process p = pb.start();
+ LOGGER.info("Started process {} with command line {}", p, verifyArgs);
+ p.getOutputStream().close();
+ int exitCode = p.waitFor();
+ // TODO timeout on wait
+ LOGGER.info("Process {}: exited with status {}", p, exitCode);
+ return new ProcessResult(p, stdout, stderr);
+ }
+
+ public static File createTmpFile(String suffix) throws IOException {
+ File tmpFile = File.createTempFile(ProcessHelper.class.getName(), suffix);
+ if (LOGGER.isTraceEnabled()) {
+ LOGGER.trace("Created temporary file {}", tmpFile);
+ }
+ tmpFile.deleteOnExit();
+ return tmpFile;
+ }
+
+ public static void delete(File file) {
+ if (!file.delete()) {
+ LOGGER.warn("Unable to delete temporary file {}", file);
+ }
+ }
+
+ public static class ProcessResult implements AutoCloseable {
+ private final File stdout;
+ private final File stderr;
+ private final Process process;
+
+ ProcessResult(Process process, File stdout, File stderr) {
+ this.process = process;
+ this.stdout = stdout;
+ this.stderr = stderr;
+ }
+
+ public int exitCode() {
+ return this.process.exitValue();
+ }
+
+ public File standardOutput() {
+ return stdout;
+ }
+
+ public File standardError() {
+ return stderr;
+ }
+
+ @Override
+ public void close() {
+ delete(stdout);
+ delete(stderr);
+ }
+ }
+}
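
A usage sketch for the extracted helper (call pattern assumed from the signatures above; ProcessResult is AutoCloseable, so the temporary stdout/stderr files are deleted when the try block exits):

    // Hypothetical caller; arg[0] must be an executable file or executeSubprocess throws.
    List<String> args = new ArrayList<>();
    args.add("/opt/kafka/bin/kafka-reassign-partitions.sh");
    try (ProcessHelper.ProcessResult pr = ProcessHelper.executeSubprocess(args)) {
        int exitCode = pr.exitCode();       // exit status of the finished process
        File stdout = pr.standardOutput();  // stdout captured in a temp file
    }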
diff --git a/pom.xml b/pom.xml
index 017ba7e1a44..f9b668d31e5 100644
--- a/pom.xml
+++ b/pom.xml
@@ -46,6 +46,7 @@
+        <module>test</module>
         <module>crd-generator</module>
         <module>api</module>
         <module>operator-common</module>
@@ -59,6 +60,12 @@
+            <dependency>
+                <groupId>io.strimzi</groupId>
+                <artifactId>test</artifactId>
+                <version>${project.version}</version>
+                <scope>test</scope>
+            </dependency>
             <dependency>
                 <groupId>io.strimzi</groupId>
                 <artifactId>api</artifactId>
diff --git a/systemtest/pom.xml b/systemtest/pom.xml
index 8fed914af38..0b296584906 100644
--- a/systemtest/pom.xml
+++ b/systemtest/pom.xml
@@ -54,6 +54,10 @@
             <groupId>junit</groupId>
             <artifactId>junit</artifactId>
         </dependency>
+        <dependency>
+            <groupId>io.strimzi</groupId>
+            <artifactId>test</artifactId>
+        </dependency>
         <dependency>
             <groupId>io.strimzi</groupId>
             <artifactId>api</artifactId>
diff --git a/systemtest/src/test/java/io/strimzi/systemtest/KafkaST.java b/systemtest/src/test/java/io/strimzi/systemtest/KafkaST.java
index dcad6afed48..65ead8f8403 100644
--- a/systemtest/src/test/java/io/strimzi/systemtest/KafkaST.java
+++ b/systemtest/src/test/java/io/strimzi/systemtest/KafkaST.java
@@ -11,12 +11,18 @@
import io.fabric8.kubernetes.api.model.JobStatus;
import io.fabric8.kubernetes.api.model.Pod;
import io.fabric8.kubernetes.api.model.PodSpecBuilder;
+import io.fabric8.kubernetes.api.model.Secret;
import io.fabric8.kubernetes.client.DefaultKubernetesClient;
import io.fabric8.kubernetes.client.KubernetesClient;
import io.strimzi.api.kafka.model.KafkaClusterSpec;
+import io.strimzi.api.kafka.model.KafkaListenerAuthenticationScramSha512;
import io.strimzi.api.kafka.model.KafkaListenerAuthenticationTls;
+import io.strimzi.api.kafka.model.KafkaListenerPlain;
import io.strimzi.api.kafka.model.KafkaListenerTls;
import io.strimzi.api.kafka.model.KafkaTopic;
+import io.strimzi.api.kafka.model.KafkaUser;
+import io.strimzi.api.kafka.model.KafkaUserScramSha512ClientAuthentication;
+import io.strimzi.api.kafka.model.KafkaUserTlsClientAuthentication;
import io.strimzi.api.kafka.model.ZookeeperClusterSpec;
import io.strimzi.test.ClusterOperator;
import io.strimzi.test.JUnitGroup;
@@ -35,6 +41,7 @@
import java.io.File;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.Date;
import java.util.List;
import java.util.Map;
@@ -58,6 +65,7 @@
import static io.strimzi.test.TestUtils.fromYamlString;
import static io.strimzi.test.TestUtils.indent;
import static io.strimzi.test.TestUtils.map;
+import static io.strimzi.test.TestUtils.toYamlString;
import static io.strimzi.test.TestUtils.waitFor;
import static java.util.concurrent.TimeUnit.SECONDS;
import static junit.framework.TestCase.assertTrue;
@@ -82,6 +90,8 @@ public class KafkaST extends AbstractST {
static KubernetesClient client = new DefaultKubernetesClient();
+ private Random rng = new Random();
+
@Test
@JUnitGroup(name = "regression")
@OpenShiftOnly
@@ -300,16 +310,17 @@ public void testCustomAndUpdatedValues() {
* Test sending messages over plain transport, without auth
*/
@Test
- @JUnitGroup(name = "regression")
+ @JUnitGroup(name = "acceptance")
public void testSendMessagesPlainAnonymous() throws InterruptedException {
String name = "send-messages-plain-anon";
int messagesCount = 20;
+ String topicName = TOPIC_NAME + "-" + rng.nextInt(Integer.MAX_VALUE);
resources().kafkaEphemeral(CLUSTER_NAME, 3).done();
- resources().topic(CLUSTER_NAME, TOPIC_NAME).done();
+ resources().topic(CLUSTER_NAME, topicName).done();
// Create ping job
- Job job = waitForJobSuccess(pingJob(name, TOPIC_NAME, messagesCount, null, false));
+ Job job = waitForJobSuccess(pingJob(name, topicName, messagesCount, null, false));
// Now get the pod logs (which will be both producer and consumer logs)
checkPings(messagesCount, job);
@@ -319,15 +330,16 @@ public void testSendMessagesPlainAnonymous() throws InterruptedException {
* Test sending messages over tls transport using mutual tls auth
*/
@Test
- @JUnitGroup(name = "acceptance")
+ @JUnitGroup(name = "regression")
public void testSendMessagesTlsAuthenticated() {
String kafkaUser = "my-user";
String name = "send-messages-tls-auth";
int messagesCount = 20;
+ String topicName = TOPIC_NAME + "-" + rng.nextInt(Integer.MAX_VALUE);
KafkaListenerAuthenticationTls auth = new KafkaListenerAuthenticationTls();
KafkaListenerTls listenerTls = new KafkaListenerTls();
- listenerTls.setAuthentication(auth);
+ listenerTls.setAuth(auth);
// Use a Kafka with plain listener disabled
resources().kafka(resources().defaultKafka(CLUSTER_NAME, 3)
@@ -340,11 +352,103 @@ public void testSendMessagesTlsAuthenticated() {
.endListeners()
.endKafka()
.endSpec().build()).done();
- resources().topic(CLUSTER_NAME, TOPIC_NAME).done();
- resources().tlsUser(kafkaUser).done();
+ resources().topic(CLUSTER_NAME, topicName).done();
+ KafkaUser user = resources().tlsUser(kafkaUser).done();
+ waitTillSecretExists(kafkaUser);
+
+ // Create ping job
+ Job job = waitForJobSuccess(pingJob(name, topicName, messagesCount, user, true));
+
+ // Now check the pod logs the messages were produced and consumed
+ checkPings(messagesCount, job);
+ }
+
+ /**
+ * Test sending messages over plain transport using scram sha auth
+ */
+ @Test
+ @JUnitGroup(name = "regression")
+ public void testSendMessagesPlainScramSha() {
+ String kafkaUser = "my-user";
+ String name = "send-messages-plain-scram-sha";
+ int messagesCount = 20;
+ String topicName = TOPIC_NAME + "-" + rng.nextInt(Integer.MAX_VALUE);
+
+ KafkaListenerAuthenticationScramSha512 auth = new KafkaListenerAuthenticationScramSha512();
+        KafkaListenerPlain listenerPlain = new KafkaListenerPlain();
+        listenerPlain.setAuthentication(auth);
+
+        // Use a Kafka with only a plain listener, secured with SCRAM-SHA-512
+ resources().kafka(resources().defaultKafka(CLUSTER_NAME, 1)
+ .editSpec()
+ .editKafka()
+ .withNewListeners()
+                        .withPlain(listenerPlain)
+ .endListeners()
+ .endKafka()
+ .endSpec().build()).done();
+ resources().topic(CLUSTER_NAME, topicName).done();
+ KafkaUser user = resources().scramShaUser(kafkaUser).done();
+ waitTillSecretExists(kafkaUser);
+ String brokerPodLog = podLog(CLUSTER_NAME + "-kafka-0", "kafka");
+ Pattern p = Pattern.compile("^.*" + Pattern.quote(kafkaUser) + ".*$", Pattern.MULTILINE);
+ Matcher m = p.matcher(brokerPodLog);
+ boolean found = false;
+ while (m.find()) {
+ found = true;
+ LOGGER.info("Broker pod log line about user {}: {}", kafkaUser, m.group());
+ }
+ if (!found) {
+ LOGGER.warn("No broker pod log lines about user {}", kafkaUser);
+ LOGGER.info("Broker pod log:\n----\n{}\n----\n", brokerPodLog);
+ }
+
+ // Create ping job
+ Job job = waitForJobSuccess(pingJob(name, topicName, messagesCount, user, false));
+
+ // Now check the pod logs the messages were produced and consumed
+ checkPings(messagesCount, job);
+ }
+
+ private void waitTillSecretExists(String secretName) {
+ waitFor("secret " + secretName + " exists", 5000, 300000,
+ () -> namespacedClient().secrets().withName(secretName).get() != null);
+ try {
+ Thread.sleep(60000L);
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+ }
+
+ /**
+ * Test sending messages over tls transport using scram sha auth
+ */
+ @Test
+ @JUnitGroup(name = "regression")
+ public void testSendMessagesTlsScramSha() {
+ String kafkaUser = "my-user";
+ String name = "send-messages-tls-scram-sha";
+ int messagesCount = 20;
+ String topicName = TOPIC_NAME + "-" + rng.nextInt(Integer.MAX_VALUE);
+
+ // Use a Kafka with plain listener disabled
+ resources().kafka(resources().defaultKafka(CLUSTER_NAME, 3)
+ .editSpec()
+ .editKafka()
+ .withNewListeners()
+ .withNewTls().withAuth(new KafkaListenerAuthenticationScramSha512()).endTls()
+ .endListeners()
+ .endKafka()
+ .endSpec().build()).done();
+ resources().topic(CLUSTER_NAME, topicName).done();
+ KafkaUser user = resources().scramShaUser(kafkaUser).done();
+ waitTillSecretExists(kafkaUser);
// Create ping job
- Job job = waitForJobSuccess(pingJob(name, TOPIC_NAME, messagesCount, kafkaUser, true));
+ Job job = waitForJobSuccess(pingJob(name, topicName, messagesCount, user, true));
// Now check the pod logs the messages were produced and consumed
checkPings(messagesCount, job);
@@ -355,9 +459,20 @@ private String podLog(String podName) {
return namespacedClient().pods().withName(podName).getLog();
}
+ private String podLog(String podName, String containerId) {
+ return namespacedClient().pods().withName(podName).inContainer(containerId).getLog();
+ }
+
/** Get the name of the pod for a job */
private String jobPodName(Job job) {
-        Map<String, String> labels = job.getSpec().getTemplate().getMetadata().getLabels();
+ return podNameWithLabels(job.getSpec().getTemplate().getMetadata().getLabels());
+ }
+
+ private String userOperatorPodName() {
+ return podNameWithLabels(Collections.singletonMap("strimzi.io/name", CLUSTER_NAME + "-entity-operator"));
+ }
+
+    private String podNameWithLabels(Map<String, String> labels) {
        List<Pod> pods = namespacedClient().pods().withLabels(labels).list().getItems();
if (pods.size() != 1) {
fail("There are " + pods.size() + " pods with labels " + labels);
@@ -417,28 +532,34 @@ private Job waitForJobSuccess(Job job) {
} else if (status.getSucceeded() != null && status.getSucceeded() == 1) {
LOGGER.debug("Poll job succeeded");
return true;
- } else if (status.getActive() > 0) {
+ } else if (status.getActive() != null && status.getActive() > 0) {
LOGGER.debug("Poll job has active");
return false;
}
}
- throw new RuntimeException("Unexpected state");
+ LOGGER.debug("Poll job in indeterminate state");
+ return false;
});
return job;
} catch (TimeoutException e) {
LOGGER.info("Original Job: {}", job);
try {
- LOGGER.info("Job: {}", namespacedClient().extensions().jobs().withName(job.getMetadata().getName()).get());
+ LOGGER.info("Job: {}", indent(toYamlString(namespacedClient().extensions().jobs().withName(job.getMetadata().getName()).get())));
} catch (Exception | AssertionError t) {
LOGGER.info("Job not available: {}", t.getMessage());
}
try {
- LOGGER.info("Pod: {}", TestUtils.toYamlString(namespacedClient().pods().withName(jobPodName(job)).get()));
+ LOGGER.info("Pod: {}", indent(TestUtils.toYamlString(namespacedClient().pods().withName(jobPodName(job)).get())));
} catch (Exception | AssertionError t) {
LOGGER.info("Pod not available: {}", t.getMessage());
}
try {
- LOGGER.info("Job timeout: Pod logs\n----\n{}\n----", podLog(jobPodName(job)));
+ LOGGER.info("Job timeout: Job Pod logs\n----\n{}\n----", indent(podLog(jobPodName(job))));
+ } catch (Exception | AssertionError t) {
+ LOGGER.info("Pod logs not available: {}", t.getMessage());
+ }
+ try {
+ LOGGER.info("Job timeout: User Operator Pod logs\n----\n{}\n----", indent(podLog(userOperatorPodName(), "user-operator")));
} catch (Exception | AssertionError t) {
LOGGER.info("Pod logs not available: {}", t.getMessage());
}
@@ -451,12 +572,17 @@ private Job waitForJobSuccess(Job job) {
* The job will be deleted from the kubernetes cluster at the end of the test.
* @param name The name of the {@code Job} and also the consumer group id.
* The Job's pod will also use this in a {@code job=} selector.
+ * @param topic The topic to send messages over
+ * @param messagesCount The number of messages to send and receive.
+ * @param kafkaUser The user to send and receive the messages as.
+ * @param tlsListener true if the clients should connect over the TLS listener,
+ * otherwise the plaintext listener will be used.
-     * @param messagesCount The number of messages to produce & consume
* @return The job
*/
- private Job pingJob(String name, String topic, int messagesCount, String kafkaUser, boolean tls) {
+ private Job pingJob(String name, String topic, int messagesCount, KafkaUser kafkaUser, boolean tlsListener) {
- String connect = tls ? CLUSTER_NAME + "-kafka-bootstrap:9093" : CLUSTER_NAME + "-kafka-bootstrap:9092";
+ String connect = tlsListener ? CLUSTER_NAME + "-kafka-bootstrap:9093" : CLUSTER_NAME + "-kafka-bootstrap:9092";
ContainerBuilder cb = new ContainerBuilder()
.withName("ping")
.withImage(TestUtils.changeOrgAndTag("strimzi/test-client:latest"))
@@ -466,7 +592,7 @@ private Job pingJob(String name, String topic, int messagesCount, String kafkaUser, boolean tls) {
"--max-messages " + messagesCount).endEnv()
.addNewEnv().withName("CONSUMER_OPTS").withValue(
"--broker-list " + connect + " " +
- "--group-id " + name + "-" + new Random().nextInt() + " " +
+ "--group-id " + name + "-" + rng.nextInt(Integer.MAX_VALUE) + " " +
"--verbose " +
"--topic " + topic + " " +
"--max-messages " + messagesCount).endEnv()
@@ -475,47 +601,88 @@ private Job pingJob(String name, String topic, int messagesCount, String kafkaUser, boolean tls) {
PodSpecBuilder podSpecBuilder = new PodSpecBuilder()
.withRestartPolicy("OnFailure");
- if (tls) {
- String clusterCaSecretName = CLUSTER_NAME + "-cluster-ca-cert";
- String clusterCaSecretVolumeName = "ca-cert";
+ String kafkaUserName = kafkaUser != null ? kafkaUser.getMetadata().getName() : null;
+ boolean scramShaUser = kafkaUser != null && kafkaUser.getSpec() != null && kafkaUser.getSpec().getAuthentication() instanceof KafkaUserScramSha512ClientAuthentication;
+ boolean tlsUser = kafkaUser != null && kafkaUser.getSpec() != null && kafkaUser.getSpec().getAuthentication() instanceof KafkaUserTlsClientAuthentication;
+ String producerConfiguration = "acks=all\n";
+ String consumerConfiguration = "auto.offset.reset=earliest\n";
+ if (tlsListener) {
+ if (scramShaUser) {
+ consumerConfiguration += "security.protocol=SASL_SSL\n";
+ producerConfiguration += "security.protocol=SASL_SSL\n";
+ consumerConfiguration += saslConfigs(kafkaUser);
+ producerConfiguration += saslConfigs(kafkaUser);
+ } else {
+ consumerConfiguration += "security.protocol=SSL\n";
+ producerConfiguration += "security.protocol=SSL\n";
+ }
+ producerConfiguration +=
+ "ssl.truststore.location=/tmp/truststore.p12\n" +
+ "ssl.truststore.type=pkcs12\n";
+ consumerConfiguration += "auto.offset.reset=earliest\n" +
+ "ssl.truststore.location=/tmp/truststore.p12\n" +
+ "ssl.truststore.type=pkcs12\n";
+ } else {
+ if (scramShaUser) {
+ consumerConfiguration += "security.protocol=SASL_PLAINTEXT\n";
+ producerConfiguration += "security.protocol=SASL_PLAINTEXT\n";
+ consumerConfiguration += saslConfigs(kafkaUser);
+ producerConfiguration += saslConfigs(kafkaUser);
+ } else {
+ consumerConfiguration += "security.protocol=PLAINTEXT\n";
+ producerConfiguration += "security.protocol=PLAINTEXT\n";
+ }
+ }
+
+ if (tlsUser) {
+ producerConfiguration +=
+ "ssl.keystore.location=/tmp/keystore.p12\n" +
+ "ssl.keystore.type=pkcs12\n";
+ consumerConfiguration += "auto.offset.reset=earliest\n" +
+ "ssl.keystore.location=/tmp/keystore.p12\n" +
+ "ssl.keystore.type=pkcs12\n";
+ cb.addNewEnv().withName("PRODUCER_TLS").withValue("TRUE").endEnv()
+ .addNewEnv().withName("CONSUMER_TLS").withValue("TRUE").endEnv();
String userSecretVolumeName = "tls-cert";
String userSecretMountPoint = "/opt/kafka/user-secret";
- String caSecretMountPoint = "/opt/kafka/cluster-ca";
cb.addNewVolumeMount()
.withName(userSecretVolumeName)
.withMountPath(userSecretMountPoint)
- .endVolumeMount()
- .addNewVolumeMount()
+ .endVolumeMount()
+ .addNewEnv().withName("USER_LOCATION").withValue(userSecretMountPoint).endEnv();
+ podSpecBuilder
+ .addNewVolume()
+ .withName(userSecretVolumeName)
+ .withNewSecret()
+ .withSecretName(kafkaUserName)
+ .endSecret()
+ .endVolume();
+ }
+
+ cb.addNewEnv().withName("PRODUCER_CONFIGURATION").withValue(producerConfiguration).endEnv()
+ .addNewEnv().withName("CONSUMER_CONFIGURATION").withValue(consumerConfiguration).endEnv();
+
+ if (kafkaUserName != null) {
+ cb.addNewEnv().withName("KAFKA_USER").withValue(kafkaUserName).endEnv();
+ }
+
+ if (tlsListener) {
+ String clusterCaSecretName = CLUSTER_NAME + "-cluster-ca-cert";
+ String clusterCaSecretVolumeName = "ca-cert";
+ String caSecretMountPoint = "/opt/kafka/cluster-ca";
+ cb.addNewVolumeMount()
.withName(clusterCaSecretVolumeName)
.withMountPath(caSecretMountPoint)
.endVolumeMount()
- .addNewEnv().withName("PRODUCER_CONFIGURATION").withValue("\n" +
- "security.protocol=SSL\n" +
- "ssl.keystore.location=/tmp/keystore.p12\n" +
- "ssl.truststore.location=/tmp/truststore.p12\n" +
- "ssl.keystore.type=pkcs12").endEnv()
- .addNewEnv().withName("CONSUMER_CONFIGURATION").withValue("\n" +
- "auto.offset.reset=earliest\n" +
- "security.protocol=SSL\n" +
- "ssl.keystore.location=/tmp/keystore.p12\n" +
- "ssl.truststore.location=/tmp/truststore.p12\n" +
- "ssl.keystore.type=pkcs12").endEnv()
.addNewEnv().withName("PRODUCER_TLS").withValue("TRUE").endEnv()
.addNewEnv().withName("CONSUMER_TLS").withValue("TRUE").endEnv()
- .addNewEnv().withName("KAFKA_USER").withValue(kafkaUser).endEnv()
- .addNewEnv().withName("USER_LOCATION").withValue(userSecretMountPoint).endEnv()
.addNewEnv().withName("CA_LOCATION").withValue(caSecretMountPoint).endEnv()
- .addNewEnv().withName("TRUSTSTORE_LOCATION").withValue("/tmp/truststore.p12").endEnv()
- .addNewEnv().withName("KEYSTORE_LOCATION").withValue("/tmp/keystore.p12").endEnv();
-
+ .addNewEnv().withName("TRUSTSTORE_LOCATION").withValue("/tmp/truststore.p12").endEnv();
+ if (tlsUser) {
+ cb.addNewEnv().withName("KEYSTORE_LOCATION").withValue("/tmp/keystore.p12").endEnv();
+ }
podSpecBuilder
- .addNewVolume()
- .withName(userSecretVolumeName)
- .withNewSecret()
- .withSecretName(kafkaUser)
- .endSecret()
- .endVolume()
.addNewVolume()
.withName(clusterCaSecretVolumeName)
.withNewSecret()
@@ -542,6 +709,21 @@ private Job pingJob(String name, String topic, int messagesCount, String kafkaUser, boolean tls) {
return job;
}
+
+ String saslConfigs(KafkaUser kafkaUser) {
+ Secret secret = namespacedClient().secrets().withName(kafkaUser.getMetadata().getName()).get();
+
+ String password = secret.getData().get("password");
+ if (password == null) {
+ LOGGER.info("Secret {}:\n{}", kafkaUser.getMetadata().getName(), TestUtils.toYamlString(secret));
+ throw new RuntimeException("The Secret " + kafkaUser.getMetadata().getName() + " lacks the 'password' key");
+ }
+ return "sasl.mechanism=SCRAM-SHA-512\n" +
+ "sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required \\\n" +
+ "username=\"" + kafkaUser.getMetadata().getName() + "\" \\\n" +
+ "password=\"" + password + "\";\n";
+ }
+
/**
*
*/
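
For orientation, the client configuration assembled by pingJob() for a SCRAM user on the plain listener comes out roughly as follows (username and password values illustrative):

    // security.protocol=SASL_PLAINTEXT
    // sasl.mechanism=SCRAM-SHA-512
    // sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required \
    //   username="my-user" \
    //   password="changeit";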
diff --git a/systemtest/src/test/java/io/strimzi/systemtest/Resources.java b/systemtest/src/test/java/io/strimzi/systemtest/Resources.java
index 66907f5943b..dd72b233f54 100644
--- a/systemtest/src/test/java/io/strimzi/systemtest/Resources.java
+++ b/systemtest/src/test/java/io/strimzi/systemtest/Resources.java
@@ -7,6 +7,7 @@
import io.fabric8.kubernetes.api.model.HasMetadata;
import io.fabric8.kubernetes.api.model.Job;
import io.fabric8.kubernetes.api.model.ObjectMetaBuilder;
+import io.fabric8.kubernetes.client.KubernetesClientException;
import io.fabric8.kubernetes.client.NamespacedKubernetesClient;
import io.fabric8.kubernetes.client.dsl.MixedOperation;
import io.fabric8.kubernetes.client.dsl.Resource;
@@ -94,7 +95,7 @@ void deleteResources() {
}
}
- io.strimzi.api.kafka.model.DoneableKafka kafkaEphemeral(String name, int kafkaReplicas) {
+ DoneableKafka kafkaEphemeral(String name, int kafkaReplicas) {
return kafka(defaultKafka(name, kafkaReplicas).build());
}
@@ -140,8 +141,25 @@ public KafkaBuilder defaultKafka(String name, int kafkaReplicas) {
.endSpec();
}
- io.strimzi.api.kafka.model.DoneableKafka kafka(Kafka kafka) {
- return new io.strimzi.api.kafka.model.DoneableKafka(kafka, k -> waitFor(deleteLater(kafka().create(k))));
+ DoneableKafka kafka(Kafka kafka) {
+ return new DoneableKafka(kafka, k -> {
+ TestUtils.waitFor("Kafka creation", 60000, 5000,
+ () -> {
+ try {
+ kafka().create(k);
+ return true;
+ } catch (KubernetesClientException e) {
+ if (e.getMessage().contains("object is being deleted")) {
+ return false;
+ } else {
+ throw e;
+ }
+ }
+ }
+ );
+            return waitFor(deleteLater(k));
+ });
}
Kafka waitFor(Kafka kafka) {
@@ -213,6 +231,19 @@ DoneableKafkaUser tlsUser(String name) {
.build());
}
+ DoneableKafkaUser scramShaUser(String name) {
+ return user(new KafkaUserBuilder().withMetadata(
+ new ObjectMetaBuilder()
+ .withName(name)
+ .withNamespace(client().getNamespace())
+ .build())
+ .withNewSpec()
+ .withNewKafkaUserScramSha512ClientAuthenticationAuthentication()
+ .endKafkaUserScramSha512ClientAuthenticationAuthentication()
+ .endSpec()
+ .build());
+ }
+
private DoneableKafkaUser user(KafkaUser user) {
return new DoneableKafkaUser(user, ku -> {
KafkaUser resource = kafkaUser().create(ku);
diff --git a/test/Makefile b/test/Makefile
new file mode 100644
index 00000000000..7064f5de080
--- /dev/null
+++ b/test/Makefile
@@ -0,0 +1,11 @@
+PROJECT_NAME=test
+
+docker_build: java_install
+docker_push:
+docker_tag:
+all: docker_build docker_push
+clean: java_clean
+
+include ../Makefile.maven
+
+.PHONY: build clean release
diff --git a/test/pom.xml b/test/pom.xml
new file mode 100644
index 00000000000..0909af40ede
--- /dev/null
+++ b/test/pom.xml
@@ -0,0 +1,59 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <artifactId>strimzi</artifactId>
+        <groupId>io.strimzi</groupId>
+        <version>0.7.0-SNAPSHOT</version>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+
+    <artifactId>test</artifactId>
+
+    <dependencies>
+        <dependency>
+            <groupId>com.fasterxml.jackson.core</groupId>
+            <artifactId>jackson-core</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>com.fasterxml.jackson.core</groupId>
+            <artifactId>jackson-annotations</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>com.fasterxml.jackson.core</groupId>
+            <artifactId>jackson-databind</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>com.fasterxml.jackson.dataformat</groupId>
+            <artifactId>jackson-dataformat-yaml</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.logging.log4j</groupId>
+            <artifactId>log4j-api</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>junit</groupId>
+            <artifactId>junit</artifactId>
+            <scope>compile</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.zookeeper</groupId>
+            <artifactId>zookeeper</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>io.debezium</groupId>
+            <artifactId>debezium-core</artifactId>
+            <version>0.8.1.Final</version>
+            <scope>compile</scope>
+        </dependency>
+        <dependency>
+            <groupId>io.debezium</groupId>
+            <artifactId>debezium-core</artifactId>
+            <version>0.8.1.Final</version>
+            <type>test-jar</type>
+            <scope>compile</scope>
+        </dependency>
+    </dependencies>
+</project>
\ No newline at end of file
diff --git a/topic-operator/src/test/java/io/strimzi/operator/topic/EmbeddedZooKeeper.java b/test/src/main/java/io/strimzi/test/EmbeddedZooKeeper.java
similarity index 85%
rename from topic-operator/src/test/java/io/strimzi/operator/topic/EmbeddedZooKeeper.java
rename to test/src/main/java/io/strimzi/test/EmbeddedZooKeeper.java
index 4b24bc8f00e..8cc2c4923b0 100644
--- a/topic-operator/src/test/java/io/strimzi/operator/topic/EmbeddedZooKeeper.java
+++ b/test/src/main/java/io/strimzi/test/EmbeddedZooKeeper.java
@@ -2,7 +2,7 @@
* Copyright 2017-2018, Strimzi authors.
* License: Apache License 2.0 (see the file LICENSE or http://apache.org/licenses/LICENSE-2.0.html).
*/
-package io.strimzi.operator.topic;
+package io.strimzi.test;
import org.apache.zookeeper.server.NIOServerCnxnFactory;
import org.apache.zookeeper.server.ZooKeeperServer;
@@ -20,7 +20,6 @@ public class EmbeddedZooKeeper {
public EmbeddedZooKeeper() throws IOException, InterruptedException {
dir = Files.createTempDirectory("strimzi").toFile();
- dir.mkdirs();
zk = new ZooKeeperServer(dir, dir, 1000);
start(new InetSocketAddress(0));
}
@@ -49,18 +48,23 @@ public void close() {
if (factory != null) {
factory.shutdown();
}
- //delete(dir);
+ delete(dir);
}
private static void delete(File file) {
- if (file.isFile()) {
- file.delete();
- } else {
- for (File child : file.listFiles()) {
+ File[] children = file.listFiles();
+ if (children != null) {
+ for (File child : children) {
delete(child);
}
- file.delete();
}
+ if (!file.delete()) {
+ noop();
+ }
+ }
+
+ private static void noop() {
+ // Here merely to please findbugs
}
public int getZkPort() {
diff --git a/api/src/test/java/io/strimzi/test/TestUtils.java b/test/src/main/java/io/strimzi/test/TestUtils.java
similarity index 97%
rename from api/src/test/java/io/strimzi/test/TestUtils.java
rename to test/src/main/java/io/strimzi/test/TestUtils.java
index 34f29f61f95..febe132d24e 100644
--- a/api/src/test/java/io/strimzi/test/TestUtils.java
+++ b/test/src/main/java/io/strimzi/test/TestUtils.java
@@ -157,16 +157,14 @@ public static String readResource(Class<?> cls, String resourceName) {
public static String readFile(File file) {
try {
- URL url = file.toURI().toURL();
- if (url == null) {
+ if (file == null) {
return null;
} else {
return new String(
- Files.readAllBytes(Paths.get(
- url.toURI())),
+ Files.readAllBytes(file.toPath()),
StandardCharsets.UTF_8);
}
- } catch (IOException | URISyntaxException e) {
+ } catch (IOException e) {
throw new RuntimeException(e);
}
}
diff --git a/api/src/test/java/io/strimzi/test/TimeoutException.java b/test/src/main/java/io/strimzi/test/TimeoutException.java
similarity index 100%
rename from api/src/test/java/io/strimzi/test/TimeoutException.java
rename to test/src/main/java/io/strimzi/test/TimeoutException.java
diff --git a/topic-operator/pom.xml b/topic-operator/pom.xml
index bace184a61c..63ef6427c56 100644
--- a/topic-operator/pom.xml
+++ b/topic-operator/pom.xml
@@ -74,16 +74,8 @@
             <scope>test</scope>
         </dependency>
         <dependency>
-            <groupId>junit</groupId>
-            <artifactId>junit</artifactId>
-        </dependency>
-        <dependency>
-            <groupId>org.mockito</groupId>
-            <artifactId>mockito-core</artifactId>
-        </dependency>
-        <dependency>
-            <groupId>io.vertx</groupId>
-            <artifactId>vertx-unit</artifactId>
+            <groupId>io.strimzi</groupId>
+            <artifactId>test</artifactId>
         </dependency>
         <dependency>
             <groupId>io.debezium</groupId>
@@ -98,6 +90,18 @@
             <type>test-jar</type>
             <scope>test</scope>
         </dependency>
+        <dependency>
+            <groupId>junit</groupId>
+            <artifactId>junit</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.mockito</groupId>
+            <artifactId>mockito-core</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>io.vertx</groupId>
+            <artifactId>vertx-unit</artifactId>
+        </dependency>
         <dependency>
             <groupId>org.apache.kafka</groupId>
             <artifactId>kafka_2.12</artifactId>
@@ -108,6 +112,10 @@
             <artifactId>scala-library</artifactId>
             <scope>test</scope>
         </dependency>
+        <dependency>
+            <groupId>io.strimzi</groupId>
+            <artifactId>operator-common</artifactId>
+        </dependency>
diff --git a/topic-operator/src/main/java/io/strimzi/operator/topic/OperatorAssignedKafkaImpl.java b/topic-operator/src/main/java/io/strimzi/operator/topic/OperatorAssignedKafkaImpl.java
index 68ae8cdf04d..ee6b64727ca 100644
--- a/topic-operator/src/main/java/io/strimzi/operator/topic/OperatorAssignedKafkaImpl.java
+++ b/topic-operator/src/main/java/io/strimzi/operator/topic/OperatorAssignedKafkaImpl.java
@@ -7,6 +7,7 @@
import com.fasterxml.jackson.core.JsonEncoding;
import com.fasterxml.jackson.core.JsonFactory;
import com.fasterxml.jackson.core.JsonGenerator;
+import io.strimzi.operator.common.process.ProcessHelper;
import io.vertx.core.AsyncResult;
import io.vertx.core.CompositeFuture;
import io.vertx.core.Future;
@@ -20,7 +21,6 @@
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
-
import java.io.BufferedReader;
import java.io.File;
import java.io.FileInputStream;
@@ -91,7 +91,7 @@ public void changeReplicationFactor(Topic topic, Handler<AsyncResult<Void>> handler) {
LOGGER.debug("Generating reassignment json for topic {}", topic.getTopicName());
String reassignment = generateReassignment(topic, zookeeper);
LOGGER.debug("Reassignment json for topic {}: {}", topic.getTopicName(), reassignment);
- File reassignmentJsonFile = createTmpFile("-reassignment.json");
+ File reassignmentJsonFile = ProcessHelper.createTmpFile("-reassignment.json");
try (Writer w = new OutputStreamWriter(new FileOutputStream(reassignmentJsonFile), StandardCharsets.UTF_8)) {
w.write(reassignment);
}
@@ -146,13 +146,13 @@ public void changeReplicationFactor(Topic topic, Handler<AsyncResult<Void>> handler) {
if (ar.succeeded()) {
if (ar.result()) {
LOGGER.info("Reassignment complete");
- delete(reassignmentJsonFile);
+ ProcessHelper.delete(reassignmentJsonFile);
LOGGER.debug("Cancelling timer " + timerId);
vertx.cancelTimer(timerId);
reassignmentFinishedFuture.complete();
} else if (System.currentTimeMillis() - first > timeout) {
LOGGER.error("Reassignment timed out");
- delete(reassignmentJsonFile);
+ ProcessHelper.delete(reassignmentJsonFile);
LOGGER.debug("Cancelling timer " + timerId);
vertx.cancelTimer(timerId);
reassignmentFinishedFuture.fail("Timeout");
@@ -183,20 +183,7 @@ public void changeReplicationFactor(Topic topic, Handler<AsyncResult<Void>> handler) {
// though we aren't relieved of the statefullness wrt removing throttles :-(
}
- private static void delete(File file) {
- /*if (!file.delete()) {
- logger.warn("Unable to delete temporary file {}", file);
- }*/
- }
- private static File createTmpFile(String suffix) throws IOException {
- File tmpFile = File.createTempFile(OperatorAssignedKafkaImpl.class.getName(), suffix);
- if (LOGGER.isTraceEnabled()) {
- LOGGER.trace("Created temporary file {}", tmpFile);
- }
- /*tmpFile.deleteOnExit();*/
- return tmpFile;
- }
    private static class VerifyLineParser implements Function<String, Void> {
int complete = 0;
@@ -216,6 +203,24 @@ public Void apply(String line) {
}
}
+    private <T> T forEachLineStdout(ProcessHelper.ProcessResult pr, Function<String, T> fn) throws IOException {
+ try (BufferedReader reader = new BufferedReader(new InputStreamReader(
+                // Use platform default charset, on the assumption that
+                // the ReassignPartitionsCommand will output in that charset
+ new FileInputStream(pr.standardOutput()), Charset.defaultCharset()))) {
+ String line = reader.readLine();
+ while (line != null) {
+ LOGGER.debug("Process {}: stdout: {}", pr, line);
+ T result = fn.apply(line);
+ if (result != null) {
+ return result;
+ }
+ line = reader.readLine();
+ }
+ return null;
+ }
+ }
+
private boolean verifyReassignment(File reassignmentJsonFile, String zookeeper, Long throttle) throws IOException, InterruptedException {
        List<String> verifyArgs = new ArrayList<>();
addJavaArgs(verifyArgs);
@@ -230,7 +235,8 @@ private boolean verifyReassignment(File reassignmentJsonFile, String zookeeper, Long throttle) throws IOException, InterruptedException {
verifyArgs.add(reassignmentJsonFile.toString());
verifyArgs.add("--verify");
VerifyLineParser verifyLineParser = new VerifyLineParser();
- executeSubprocess(verifyArgs).forEachLineStdout(verifyLineParser);
+ forEachLineStdout(ProcessHelper.executeSubprocess(verifyArgs),
+ verifyLineParser);
return verifyLineParser.inProgress == 0;
}
@@ -247,7 +253,7 @@ private void executeReassignment(File reassignmentJsonFile, String zookeeper, Long throttle) throws IOException, InterruptedException {
executeArgs.add(reassignmentJsonFile.toString());
executeArgs.add("--execute");
- if (!executeSubprocess(executeArgs).forEachLineStdout(line -> {
+ if (!forEachLineStdout(ProcessHelper.executeSubprocess(executeArgs), line -> {
if (line.contains("Partitions reassignment failed due to")
|| line.contains("There is an existing assignment running")
|| line.contains("Failed to reassign partitions")) {
@@ -265,7 +271,7 @@ private void executeReassignment(File reassignmentJsonFile, String zookeeper, Long throttle) throws IOException, InterruptedException {
private String generateReassignment(Topic topic, String zookeeper) throws IOException, InterruptedException, ExecutionException {
JsonFactory factory = new JsonFactory();
- File topicsToMove = createTmpFile("-topics-to-move.json");
+ File topicsToMove = ProcessHelper.createTmpFile("-topics-to-move.json");
try (JsonGenerator gen = factory.createGenerator(topicsToMove, JsonEncoding.UTF8)) {
gen.writeStartObject();
@@ -288,9 +294,9 @@ private String generateReassignment(Topic topic, String zookeeper) throws IOException, InterruptedException, ExecutionException {
executeArgs.add(brokerList());
executeArgs.add("--generate");
- final ProcessResult processResult = executeSubprocess(executeArgs);
- delete(topicsToMove);
- String json = processResult.forEachLineStdout(new ReassignmentLineParser());
+ final ProcessHelper.ProcessResult processResult = ProcessHelper.executeSubprocess(executeArgs);
+ ProcessHelper.delete(topicsToMove);
+ String json = forEachLineStdout(processResult, new ReassignmentLineParser());
return json;
}
@@ -319,76 +325,8 @@ protected void addJavaArgs(List<String> verifyArgs) {
verifyArgs.add("kafka.admin.ReassignPartitionsCommand");
}
-    private ProcessResult executeSubprocess(List<String> verifyArgs) throws IOException, InterruptedException {
- // We choose to run the reassignment as an external process because the Scala class:
- // a) doesn't throw on errors, but
- // b) writes them to stdout
- // so we need to parse its output, but we can't do that in an isolated way if we run it in our process
- // (System.setOut being global to the VM).
-
- if (verifyArgs.isEmpty() || !new File(verifyArgs.get(0)).canExecute()) {
- throw new OperatorException("Command " + verifyArgs + " lacks an executable arg[0]");
- }
-
- ProcessBuilder pb = new ProcessBuilder(verifyArgs);
- // If we redirect stderr to stdout we could break the predicates because the
- // characters will be jumbled.
- // Reading two pipes without deadlocking on the blocking is difficult, so let's just write stderr to a file.
- File stdout = createTmpFile(".out");
- File stderr = createTmpFile(".err");
- pb.redirectError(stderr);
- pb.redirectOutput(stdout);
- Process p = pb.start();
- LOGGER.info("Started process {} with command line {}", p, verifyArgs);
- p.getOutputStream().close();
- int exitCode = p.waitFor();
- // TODO timeout on wait
- LOGGER.info("Process {}: exited with status {}", p, exitCode);
- return new ProcessResult(p, stdout, stderr);
- }
-
- private static class ProcessResult implements AutoCloseable {
- private final File stdout;
- private final File stderr;
- private final Object pid;
-
- ProcessResult(Object pid, File stdout, File stderr) {
- this.pid = pid;
- this.stdout = stdout;
- this.stderr = stderr;
- }
-
-        public <T> T forEachLineStdout(Function<String, T> fn) throws IOException {
- return forEachLine(this.stdout, fn);
- }
-
-        private <T> T forEachLine(File file, Function<String, T> fn) throws IOException {
- try (BufferedReader reader = new BufferedReader(new InputStreamReader(
- // Use platform default charset, on assumption that
- // the ReassignPartitionsCommand will output in that
- new FileInputStream(file), Charset.defaultCharset()))) {
- String line = reader.readLine();
- while (line != null) {
- LOGGER.debug("Process {}: stdout: {}", pid, line);
- T result = fn.apply(line);
- if (result != null) {
- return result;
- }
- line = reader.readLine();
- }
- return null;
- }
- }
-
- @Override
- public void close() throws Exception {
- delete(stdout);
- delete(stderr);
- }
- }
-
    private static class ReassignmentLineParser implements Function<String, String> {
boolean returnLine = false;
diff --git a/topic-operator/src/test/java/io/strimzi/operator/topic/ZkTopicStoreTest.java b/topic-operator/src/test/java/io/strimzi/operator/topic/ZkTopicStoreTest.java
index 16a732d2601..d2f817d7b1a 100644
--- a/topic-operator/src/test/java/io/strimzi/operator/topic/ZkTopicStoreTest.java
+++ b/topic-operator/src/test/java/io/strimzi/operator/topic/ZkTopicStoreTest.java
@@ -5,6 +5,7 @@
package io.strimzi.operator.topic;
import io.strimzi.operator.topic.zk.ZkImpl;
+import io.strimzi.test.EmbeddedZooKeeper;
import io.vertx.core.Future;
import io.vertx.core.Vertx;
import io.vertx.ext.unit.Async;
diff --git a/topic-operator/src/test/java/io/strimzi/operator/topic/zk/ZkImplTest.java b/topic-operator/src/test/java/io/strimzi/operator/topic/zk/ZkImplTest.java
index fb46f09b05f..9a754d32af4 100644
--- a/topic-operator/src/test/java/io/strimzi/operator/topic/zk/ZkImplTest.java
+++ b/topic-operator/src/test/java/io/strimzi/operator/topic/zk/ZkImplTest.java
@@ -4,7 +4,7 @@
*/
package io.strimzi.operator.topic.zk;
-import io.strimzi.operator.topic.EmbeddedZooKeeper;
+import io.strimzi.test.EmbeddedZooKeeper;
import io.vertx.core.Vertx;
import io.vertx.ext.unit.Async;
import io.vertx.ext.unit.TestContext;
diff --git a/user-operator/pom.xml b/user-operator/pom.xml
index faa1ff7c5df..3f2369fa173 100644
--- a/user-operator/pom.xml
+++ b/user-operator/pom.xml
@@ -83,6 +83,23 @@
             <artifactId>mockito-core</artifactId>
             <scope>test</scope>
         </dependency>
+        <dependency>
+            <groupId>io.debezium</groupId>
+            <artifactId>debezium-core</artifactId>
+            <version>0.8.1.Final</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>io.debezium</groupId>
+            <artifactId>debezium-core</artifactId>
+            <version>0.8.1.Final</version>
+            <type>test-jar</type>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>io.strimzi</groupId>
+            <artifactId>test</artifactId>
+        </dependency>
         <dependency>
             <groupId>io.strimzi</groupId>
             <artifactId>operator-common</artifactId>
diff --git a/user-operator/src/main/java/io/strimzi/operator/user/Main.java b/user-operator/src/main/java/io/strimzi/operator/user/Main.java
index 7c65c3b1547..42790079cc3 100644
--- a/user-operator/src/main/java/io/strimzi/operator/user/Main.java
+++ b/user-operator/src/main/java/io/strimzi/operator/user/Main.java
@@ -14,18 +14,18 @@
import io.strimzi.operator.common.operator.resource.CrdOperator;
import io.strimzi.operator.common.operator.resource.SecretOperator;
import io.strimzi.operator.user.operator.KafkaUserOperator;
+import io.strimzi.operator.user.operator.ScramShaCredentials;
+import io.strimzi.operator.user.operator.ScramShaCredentialsOperator;
import io.strimzi.operator.user.operator.SimpleAclOperator;
-
import io.vertx.core.Future;
import io.vertx.core.Vertx;
-
-import java.util.HashMap;
-import java.util.Map;
-
import kafka.security.auth.SimpleAclAuthorizer;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
+import java.util.HashMap;
+import java.util.Map;
+
public class Main {
private static final Logger log = LogManager.getLogger(Main.class.getName());
@@ -59,9 +59,11 @@ static Future run(Vertx vertx, KubernetesClient client, SimpleAclAuthorizer authorizer) {
SecretOperator secretOperations = new SecretOperator(vertx, client);
        CrdOperator<KubernetesClient, KafkaUser, KafkaUserList, DoneableKafkaUser> crdOperations = new CrdOperator<>(vertx, client, KafkaUser.class, KafkaUserList.class, DoneableKafkaUser.class);
SimpleAclOperator aclOperations = new SimpleAclOperator(vertx, authorizer);
+ ScramShaCredentials scramShaCredentials = new ScramShaCredentials(config.getZookeperConnect());
+ ScramShaCredentialsOperator scramShaCredentialsOperator = new ScramShaCredentialsOperator(vertx, scramShaCredentials);
KafkaUserOperator kafkaUserOperations = new KafkaUserOperator(vertx,
- certManager, crdOperations, secretOperations, aclOperations, config.getCaName(), config.getCaNamespace());
+ certManager, crdOperations, secretOperations, scramShaCredentialsOperator, aclOperations, config.getCaName(), config.getCaNamespace());
Future fut = Future.future();
UserOperator operator = new UserOperator(config.getNamespace(),
diff --git a/user-operator/src/main/java/io/strimzi/operator/user/model/KafkaUserModel.java b/user-operator/src/main/java/io/strimzi/operator/user/model/KafkaUserModel.java
index 68425075bb4..b0730406df8 100644
--- a/user-operator/src/main/java/io/strimzi/operator/user/model/KafkaUserModel.java
+++ b/user-operator/src/main/java/io/strimzi/operator/user/model/KafkaUserModel.java
@@ -4,16 +4,22 @@
*/
package io.strimzi.operator.user.model;
+import io.fabric8.kubernetes.api.model.Secret;
+import io.fabric8.kubernetes.api.model.SecretBuilder;
import io.strimzi.api.kafka.model.AclRule;
import io.strimzi.api.kafka.model.KafkaUser;
import io.strimzi.api.kafka.model.KafkaUserAuthentication;
import io.strimzi.api.kafka.model.KafkaUserAuthorizationSimple;
+import io.strimzi.api.kafka.model.KafkaUserScramSha512ClientAuthentication;
import io.strimzi.api.kafka.model.KafkaUserTlsClientAuthentication;
import io.strimzi.certs.CertAndKey;
import io.strimzi.certs.CertManager;
import io.strimzi.certs.Subject;
import io.strimzi.operator.common.model.Labels;
import io.strimzi.operator.user.model.acl.SimpleAclRule;
+import io.strimzi.operator.user.operator.PasswordGenerator;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
import javax.naming.InvalidNameException;
import javax.naming.ldap.LdapName;
@@ -28,15 +34,11 @@
import java.util.Set;
import java.util.stream.Collectors;
-import io.fabric8.kubernetes.api.model.Secret;
-import io.fabric8.kubernetes.api.model.SecretBuilder;
-import org.apache.logging.log4j.LogManager;
-import org.apache.logging.log4j.Logger;
-
public class KafkaUserModel {
private static final Logger log = LogManager.getLogger(KafkaUserModel.class.getName());
private final static int CERTS_EXPIRATION_DAYS = 356;
+ public static final String KEY_PASSWORD = "password";
protected final String namespace;
protected final String name;
@@ -45,6 +47,7 @@ public class KafkaUserModel {
protected KafkaUserAuthentication authentication;
protected CertAndKey caCertAndKey;
protected CertAndKey userCertAndKey;
+ protected String scramSha512Password;
protected Set simpleAclRules = null;
@@ -65,19 +68,25 @@ protected KafkaUserModel(String namespace, String name, Labels labels) {
* Creates instance of KafkaUserModel from CRD definition
*
* @param certManager CertManager instance for work with certificates
+ * @param passwordGenerator A password generator
* @param kafkaUser The Custom Resource based on which the model should be created
* @param clientsCa Kubernetes secret with the clients certification authority
- * @param userSecret ubernetes secret with existing user certificate
+ * @param userSecret Kubernetes secret with existing user certificate
     * @return The KafkaUserModel instance created from the KafkaUser Custom Resource
*/
- public static KafkaUserModel fromCrd(CertManager certManager, KafkaUser kafkaUser, Secret clientsCa, Secret userSecret) {
+ public static KafkaUserModel fromCrd(CertManager certManager,
+ PasswordGenerator passwordGenerator,
+ KafkaUser kafkaUser,
+ Secret clientsCa, Secret userSecret) {
KafkaUserModel result = new KafkaUserModel(kafkaUser.getMetadata().getNamespace(),
kafkaUser.getMetadata().getName(),
Labels.fromResource(kafkaUser).withKind(kafkaUser.getKind()));
result.setAuthentication(kafkaUser.getSpec().getAuthentication());
- if (kafkaUser.getSpec().getAuthentication() != null && kafkaUser.getSpec().getAuthentication().getType().equals(KafkaUserTlsClientAuthentication.TYPE_TLS)) {
+ if (kafkaUser.getSpec().getAuthentication() instanceof KafkaUserTlsClientAuthentication) {
result.maybeGenerateCertificates(certManager, clientsCa, userSecret);
+ } else if (kafkaUser.getSpec().getAuthentication() instanceof KafkaUserScramSha512ClientAuthentication) {
+ result.maybeGeneratePassword(passwordGenerator, userSecret);
}
if (kafkaUser.getSpec().getAuthorization() != null && kafkaUser.getSpec().getAuthorization().getType().equals(KafkaUserAuthorizationSimple.TYPE_SIMPLE)) {
@@ -95,12 +104,16 @@ public static KafkaUserModel fromCrd(CertManager certManager, KafkaUser kafkaUse
     * @return The generated Secret, or null when the authentication type requires no Secret
*/
public Secret generateSecret() {
- if (authentication != null && authentication.getType().equals(KafkaUserTlsClientAuthentication.TYPE_TLS)) {
+ if (authentication instanceof KafkaUserTlsClientAuthentication) {
            Map<String, String> data = new HashMap<>();
data.put("ca.crt", Base64.getEncoder().encodeToString(caCertAndKey.cert()));
data.put("user.key", Base64.getEncoder().encodeToString(userCertAndKey.key()));
data.put("user.crt", Base64.getEncoder().encodeToString(userCertAndKey.cert()));
return createSecret(data);
+        } else if (authentication instanceof KafkaUserScramSha512ClientAuthentication)  {
+            Map<String, String> data = new HashMap<>();
+            data.put(KEY_PASSWORD, scramSha512Password);
+            return createSecret(data);
} else {
return null;
}
@@ -177,6 +190,20 @@ public void maybeGenerateCertificates(CertManager certManager, Secret clientsCa,
}
}
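+    /**
+     * Generates a new password, or keeps the password from the existing user Secret if one is present.
+     *
+     * @param generator The password generator to use
+     * @param userSecret The existing user Secret, or null if none exists
+     */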
+    public void maybeGeneratePassword(PasswordGenerator generator, Secret userSecret) {
+        if (userSecret != null) {
+            // Secret already exists -> let's verify whether it has a password
+            String password = userSecret.getData().get(KEY_PASSWORD);
+            if (password != null && !password.isEmpty()) {
+                this.scramSha512Password = password;
+                return;
+            }
+        }
+        log.debug("Generating user password");
+        this.scramSha512Password = generator.generate();
+    }
+
/**
* Decode from Base64 a keyed value from a Secret
*
@@ -243,6 +270,10 @@ public String getUserName() {
return getUserName(name);
}
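+    /**
+     * @return The name of the {@code KafkaUser} resource
+     */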
+ public String getName() {
+ return name;
+ }
+
/**
     * Generates the name of the User secret based on the username
*
@@ -270,6 +301,10 @@ public void setAuthentication(KafkaUserAuthentication authentication) {
this.authentication = authentication;
}
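+    /**
+     * @return The authentication method configured for this user
+     */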
+ public KafkaUserAuthentication getAuthentication() {
+ return this.authentication;
+ }
+
/**
* Get list of ACL rules for Simple Authorization which should apply to this user
*
@@ -282,7 +317,7 @@ public Set getSimpleAclRules() {
/**
* Sets list of ACL rules for Simple authorization
*
- * @param simpleAclRules List of ACL rules which should be applied to this user
+ * @param rules List of ACL rules which should be applied to this user
*/
public void setSimpleAclRules(List rules) {
Set simpleAclRules = new HashSet();
diff --git a/user-operator/src/main/java/io/strimzi/operator/user/operator/KafkaUserOperator.java b/user-operator/src/main/java/io/strimzi/operator/user/operator/KafkaUserOperator.java
index 09a82ce4b29..3b7b1338f53 100644
--- a/user-operator/src/main/java/io/strimzi/operator/user/operator/KafkaUserOperator.java
+++ b/user-operator/src/main/java/io/strimzi/operator/user/operator/KafkaUserOperator.java
@@ -10,7 +10,6 @@
import io.fabric8.kubernetes.client.KubernetesClientException;
import io.fabric8.kubernetes.client.Watch;
import io.fabric8.kubernetes.client.Watcher;
-
import io.strimzi.api.kafka.model.DoneableKafkaUser;
import io.strimzi.api.kafka.KafkaUserList;
import io.strimzi.api.kafka.model.KafkaUser;
@@ -26,6 +25,9 @@
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.Vertx;
+import io.vertx.core.shareddata.Lock;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
import java.util.ArrayList;
import java.util.Collection;
@@ -36,10 +38,6 @@
import java.util.function.Consumer;
import java.util.stream.Collectors;
-import io.vertx.core.shareddata.Lock;
-import org.apache.logging.log4j.LogManager;
-import org.apache.logging.log4j.Logger;
-
/**
 * Operator for Kafka Users.
*/
@@ -54,20 +52,32 @@ public class KafkaUserOperator {
private final CertManager certManager;
private final String caName;
private final String caNamespace;
+ private final ScramShaCredentialsOperator scramShaCredentialOperator;
+ private PasswordGenerator passwordGenerator = new PasswordGenerator(12,
+ "abcdefghijklmnopqrstuvwxyz" +
+ "ABCDEFGHIJKLMNOPQRSTUVWXYZ" +
+ "0123456789");
/**
* @param vertx The Vertx instance
* @param certManager For managing certificates
* @param crdOperator For operating on Custom Resources
* @param secretOperations For operating on Secrets
+ * @param scramShaCredentialOperator For operating on SCRAM SHA credentials
+ * @param aclOperations For operating on ACLs
+ * @param caName The name of the Secret containing the clients CA certificate and private key
+ * @param caNamespace The namespace of the Secret containing the clients CA certificate and private key
*/
public KafkaUserOperator(Vertx vertx,
CertManager certManager,
CrdOperator crdOperator,
- SecretOperator secretOperations, SimpleAclOperator aclOperations, String caName, String caNamespace) {
+ SecretOperator secretOperations,
+ ScramShaCredentialsOperator scramShaCredentialOperator,
+ SimpleAclOperator aclOperations, String caName, String caNamespace) {
this.vertx = vertx;
this.certManager = certManager;
this.secretOperations = secretOperations;
+ this.scramShaCredentialOperator = scramShaCredentialOperator;
this.crdOperator = crdOperator;
this.aclOperations = aclOperations;
this.caName = caName;
@@ -100,14 +110,17 @@ protected void createOrUpdate(Reconciliation reconciliation, KafkaUser kafkaUser
String userName = reconciliation.name();
KafkaUserModel user;
try {
- user = KafkaUserModel.fromCrd(certManager, kafkaUser, clientsCa, userSecret);
+ user = KafkaUserModel.fromCrd(certManager, passwordGenerator, kafkaUser, clientsCa, userSecret);
} catch (Exception e) {
handler.handle(Future.failedFuture(e));
return;
}
log.debug("{}: Updating User", reconciliation, userName, namespace);
- CompositeFuture.join(secretOperations.reconcile(namespace, user.getSecretName(), user.generateSecret()),
+ Secret desired = user.generateSecret();
+ CompositeFuture.join(
+                scramShaCredentialOperator.reconcile(user.getName(), desired != null ? desired.getData().get(KafkaUserModel.KEY_PASSWORD) : null),
+ secretOperations.reconcile(namespace, user.getSecretName(), desired),
aclOperations.reconcile(user.getUserName(), user.getSimpleAclRules()))
.map((Void) null).setHandler(handler);
}
@@ -120,10 +133,10 @@ protected void createOrUpdate(Reconciliation reconciliation, KafkaUser kafkaUser
protected void delete(Reconciliation reconciliation, Handler<AsyncResult<Void>> handler) {
String namespace = reconciliation.namespace();
String user = reconciliation.name();
-
log.debug("{}: Deleting User", reconciliation, user, namespace);
CompositeFuture.join(secretOperations.reconcile(namespace, KafkaUserModel.getSecretName(user), null),
- aclOperations.reconcile(KafkaUserModel.getUserName(user), null))
+ aclOperations.reconcile(KafkaUserModel.getUserName(user), null),
+ scramShaCredentialOperator.reconcile(KafkaUserModel.getUserName(user), null))
.map((Void) null).setHandler(handler);
}
@@ -209,7 +222,6 @@ public final CountDownLatch reconcileAll(String trigger, String namespace, Label
CountDownLatch outerLatch = new CountDownLatch(1);
-
vertx.createSharedWorkerExecutor("kubernetes-ops-pool").executeBlocking(
future -> {
try {
@@ -223,6 +235,7 @@ public final CountDownLatch reconcileAll(String trigger, String namespace, Label
log.debug("reconcileAll({}, {}): User with ACLs: {}", RESOURCE_KIND, trigger, res.result());
                    desiredNames.addAll((Collection<? extends String>) res.result());
desiredNames.addAll(resourceNames);
+ desiredNames.addAll(scramShaCredentialOperator.list());
// We use a latch so that callers (specifically, test callers) know when the reconciliation is complete
// Using futures would be more complex for no benefit
diff --git a/user-operator/src/main/java/io/strimzi/operator/user/operator/PasswordGenerator.java b/user-operator/src/main/java/io/strimzi/operator/user/operator/PasswordGenerator.java
new file mode 100644
index 00000000000..29b9e821443
--- /dev/null
+++ b/user-operator/src/main/java/io/strimzi/operator/user/operator/PasswordGenerator.java
@@ -0,0 +1,27 @@
+/*
+ * Copyright 2018, Strimzi authors.
+ * License: Apache License 2.0 (see the file LICENSE or http://apache.org/licenses/LICENSE-2.0.html).
+ */
+package io.strimzi.operator.user.operator;
+
+import java.security.SecureRandom;
+
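+/**
+ * Generates random passwords of a fixed length, drawn from a fixed alphabet,
+ * using {@link SecureRandom} as the source of randomness. For example:
+ * <pre>
+ *     PasswordGenerator generator = new PasswordGenerator(12, "abcdef0123456789");
+ *     String password = generator.generate();
+ * </pre>
+ */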
+public class PasswordGenerator {
+
+ private final SecureRandom rng = new SecureRandom();
+ private final int length;
+ private final String alphabet;
+
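+    /**
+     * @param length The length of the generated passwords
+     * @param alphabet The characters from which the passwords are generated
+     */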
+ public PasswordGenerator(int length, String alphabet) {
+ this.length = length;
+ this.alphabet = alphabet;
+ }
+
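+    /**
+     * @return A newly generated random password
+     */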
+ public String generate() {
+ StringBuilder sb = new StringBuilder();
+ for (int i = 0; i < length; i++) {
+ sb.append(alphabet.charAt(rng.nextInt(alphabet.length())));
+ }
+ return sb.toString();
+ }
+}
diff --git a/user-operator/src/main/java/io/strimzi/operator/user/operator/ScramShaCredentials.java b/user-operator/src/main/java/io/strimzi/operator/user/operator/ScramShaCredentials.java
new file mode 100644
index 00000000000..b73bf684b11
--- /dev/null
+++ b/user-operator/src/main/java/io/strimzi/operator/user/operator/ScramShaCredentials.java
@@ -0,0 +1,196 @@
+/*
+ * Copyright 2018, Strimzi authors.
+ * License: Apache License 2.0 (see the file LICENSE or http://apache.org/licenses/LICENSE-2.0.html).
+ */
+package io.strimzi.operator.user.operator;
+
+import io.strimzi.operator.common.process.ProcessHelper;
+import kafka.admin.ConfigCommand;
+import org.apache.kafka.common.security.scram.internals.ScramMechanism;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.nio.CharBuffer;
+import java.nio.MappedByteBuffer;
+import java.nio.channels.FileChannel;
+import java.nio.charset.Charset;
+import java.nio.file.Files;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import static java.util.Arrays.asList;
+
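+/**
+ * Manages the SCRAM-SHA credentials stored in ZooKeeper by running Kafka's
+ * {@code kafka.admin.ConfigCommand} in a subprocess and matching its output.
+ * Example usage, assuming a ZooKeeper instance on {@code localhost:2181}:
+ * <pre>
+ *     ScramShaCredentials credentials = new ScramShaCredentials("localhost:2181");
+ *     credentials.createOrUpdate("alice", "changeit", -1);
+ *     credentials.delete("alice");
+ * </pre>
+ */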
+public class ScramShaCredentials {
+
+    private static final Logger log = LogManager.getLogger(ScramShaCredentials.class.getName());
+
+ private final ScramMechanism mechanism = ScramMechanism.SCRAM_SHA_512;
+
+ private final String zookeeper;
+
+ public ScramShaCredentials(String zookeeper) {
+ this.zookeeper = zookeeper;
+ }
+
+    /**
+     * Create or update the SCRAM-SHA credentials for the given user.
+     *
+     * @param username The name of the user
+     * @param password The password
+     * @param iterations The number of iterations; if {@code <= 0} the mechanism's default will be used
+     */
+ public void createOrUpdate(String username, String password, int iterations) {
+ if (0 < iterations && iterations < mechanism.minIterations()) {
+ throw new RuntimeException("Given number of iterations (" + iterations + ") " +
+ "is less than minimum iterations for mechanism (" + mechanism.minIterations() + ")");
+ }
+ StringBuilder value = new StringBuilder(mechanism.mechanismName()).append("=[");
+ if (iterations > 0) {
+ value.append("iterations=").append(iterations).append(',');
+ }
+ value.append("password=").append(password).append(']');
+ try (ProcessHelper.ProcessResult pr = exec(asList(
+ "--zookeeper", zookeeper,
+ "--alter",
+ "--entity-name", username,
+ "--entity-type", "users",
+ "--add-config", value.toString()))) {
+            Pattern pattern = Pattern.compile("Completed Updating config for entity: user-principal '.*'\\.");
+            if (!matchResult(pr, pr.standardOutput(), 0, pattern)) {
+ throw unexpectedOutput(pr);
+ }
+ }
+ }
+
+ /**
+ * Delete the SCRAM-SHA credentials for the given user.
+ * It is not an error if the user doesn't exist, or doesn't currently have any SCRAM-SHA credentials.
+ */
+ public void delete(String username) {
+ try (ProcessHelper.ProcessResult pr = exec(asList(
+ "--zookeeper", zookeeper,
+ "--alter",
+ "--entity-name", username,
+ "--entity-type", "users",
+ "--delete-config", mechanism.mechanismName()))) {
+ if (!matchResult(pr, pr.standardOutput(), 0,
+ Pattern.compile("Completed Updating config for entity: user-principal '.*'\\."))
+ && !matchResult(pr, pr.standardError(), 1,
+ Pattern.compile(Pattern.quote("Invalid config(s): " + mechanism.mechanismName())))) {
+ throw unexpectedOutput(pr);
+ }
+ }
+ }
+
+ /**
+ * Determine whether the given user has SCRAM-SHA credentials.
+ */
+ public boolean exists(String username) {
+        try (ProcessHelper.ProcessResult pr = exec(asList(
+ "--zookeeper", zookeeper,
+ "--describe",
+ "--entity-name", username,
+ "--entity-type", "users"))) {
+ if (matchResult(pr, pr.standardOutput(), 0,
+ Pattern.compile("Configs for user-principal '.*?' are .*" + mechanism.mechanismName() + "=salt=[a-zA-Z0-9=]+,stored_key=([a-zA-Z0-9/+=]+),server_key=([a-zA-Z0-9/+=]+),iterations=[0-9]+"))) {
+ return true;
+ } else if (matchResult(pr, pr.standardOutput(), 0,
+ Pattern.compile("Configs for user-principal '.*?' are .*(?!" + mechanism.mechanismName() + "=salt=[a-zA-Z0-9=]+,stored_key=([a-zA-Z0-9/+=]+),server_key=([a-zA-Z0-9/+=]+),iterations=[0-9]+)"))) {
+ return false;
+ } else {
+ throw unexpectedOutput(pr);
+ }
+ }
+ }
+
+ /**
+ * List users with SCRAM-SHA credentials
+ */
+    public List<String> list() {
+        List<String> result = new ArrayList<>();
+        try (ProcessHelper.ProcessResult pr = exec(asList(
+ "--zookeeper", zookeeper,
+ "--describe",
+ "--entity-type", "users"))) {
+ if (pr.exitCode() == 0) {
+ Pattern pattern = Pattern.compile("Configs for user-principal '(.*?)' are .*" + mechanism.mechanismName() + "=salt=[a-zA-Z0-9=]+,stored_key=([a-zA-Z0-9/+=]+),server_key=([a-zA-Z0-9/+=]+),iterations=[0-9]+");
+ try {
+ try (FileChannel channel = new FileInputStream(pr.standardOutput()).getChannel()) {
+ MappedByteBuffer byteBuffer = channel.map(FileChannel.MapMode.READ_ONLY, 0, (int) channel.size());
+ CharBuffer cs = Charset.defaultCharset().newDecoder().decode(byteBuffer);
+ Matcher m = pattern.matcher(cs);
+ while (m.find()) {
+ result.add(m.group(1));
+ }
+ }
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ }
+ return result;
+ }
+
+ private RuntimeException unexpectedOutput(ProcessHelper.ProcessResult pr) {
+ log.debug("{} standard output:\n~~~\n{}\n~~~", pr, getFile(pr.standardOutput()));
+ log.debug("{} standard error:\n~~~\n{}\n~~~", pr, getFile(pr.standardError()));
+ return new RuntimeException(pr + " exited with code " + pr.exitCode() + " and is missing expected output");
+ }
+
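+    /**
+     * Returns true if the process exited with the expected code and the given
+     * output file contains a match for the given pattern.
+     */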
+ boolean matchResult(ProcessHelper.ProcessResult pr, File file, int expectedExitCode, Pattern pattern) {
+ try {
+ if (pr.exitCode() == expectedExitCode) {
+ try (FileChannel channel = new FileInputStream(file).getChannel()) {
+ MappedByteBuffer byteBuffer = channel.map(FileChannel.MapMode.READ_ONLY, 0, (int) channel.size());
+ CharBuffer cs = Charset.defaultCharset().newDecoder().decode(byteBuffer);
+ Matcher m = pattern.matcher(cs);
+ if (m.find()) {
+ log.debug("Found output indicating success: {}", m.group());
+ return true;
+ }
+ }
+ }
+ return false;
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
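+    /**
+     * Runs {@code ConfigCommand} in a new JVM, reusing the classpath and JRE of the
+     * current process and passing the given options on the command line.
+     */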
+    private static ProcessHelper.ProcessResult exec(List<String> kafkaConfigsOptions) {
+        String cp = System.getProperty("java.class.path");
+        File home = new File(System.getProperty("java.home"));
+        List<String> arguments = new ArrayList<>(asList(
+ new File(home, "bin/java").getAbsolutePath(),
+ "-cp", cp,
+ ConfigCommand.class.getName()));
+ arguments.addAll(kafkaConfigsOptions);
+
+ try {
+ return ProcessHelper.executeSubprocess(arguments);
+ } catch (IOException e) {
+ throw new RuntimeException("Error starting subprocess " + arguments, e);
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new RuntimeException("Interrupted while starting subprocess " + arguments, e);
+        }
+ }
+
+ private static String getFile(File out) {
+ try {
+ return new String(Files.readAllBytes(out.toPath()), Charset.defaultCharset());
+        } catch (IOException e) {
+            log.error("Failed to read {}", out, e);
+            return "";
+ }
+ }
+
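+    // Manual smoke test; expects a ZooKeeper instance listening on localhost:2181.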
+ public static void main(String[] args) {
+ ScramShaCredentials scramSha = new ScramShaCredentials("localhost:2181");
+ scramSha.createOrUpdate("tom", "password", -1);
+ scramSha.createOrUpdate("tom", "password", 4096);
+ scramSha.delete("tom");
+ }
+}
+
diff --git a/user-operator/src/main/java/io/strimzi/operator/user/operator/ScramShaCredentialsOperator.java b/user-operator/src/main/java/io/strimzi/operator/user/operator/ScramShaCredentialsOperator.java
new file mode 100644
index 00000000000..6948c56dd1f
--- /dev/null
+++ b/user-operator/src/main/java/io/strimzi/operator/user/operator/ScramShaCredentialsOperator.java
@@ -0,0 +1,49 @@
+/*
+ * Copyright 2018, Strimzi authors.
+ * License: Apache License 2.0 (see the file LICENSE or http://apache.org/licenses/LICENSE-2.0.html).
+ */
+package io.strimzi.operator.user.operator;
+
+import io.strimzi.operator.common.operator.resource.ReconcileResult;
+import io.vertx.core.Future;
+import io.vertx.core.Vertx;
+
+import java.util.List;
+
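+/**
+ * Reconciles SCRAM-SHA credentials, delegating the blocking ZooKeeper operations
+ * in {@link ScramShaCredentials} to a Vert.x worker executor.
+ */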
+public class ScramShaCredentialsOperator {
+
+    private final ScramShaCredentials credsManager;
+
+    private final Vertx vertx;
+
+ public ScramShaCredentialsOperator(Vertx vertx, ScramShaCredentials credsManager) {
+ this.credsManager = credsManager;
+ this.vertx = vertx;
+ }
+
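+    /**
+     * Reconciles the SCRAM-SHA credentials of the given user: a non-null password
+     * is created or updated, a null password deletes any existing credentials.
+     */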
+    Future<ReconcileResult<Void>> reconcile(String username, String password) {
+        Future<ReconcileResult<Void>> fut = Future.future();
+ vertx.createSharedWorkerExecutor("kubernetes-ops-pool").executeBlocking(
+ future -> {
+ boolean exists = credsManager.exists(username);
+ if (password != null) {
+ credsManager.createOrUpdate(username, password, -1);
+                    future.complete(exists ? ReconcileResult.patched(null) : ReconcileResult.created(null));
+ } else {
+ if (exists) {
+ credsManager.delete(username);
+ future.complete(ReconcileResult.deleted());
+ } else {
+ future.complete(ReconcileResult.noop());
+ }
+ }
+ },
+ false,
+ fut.completer());
+ return fut;
+ }
+
+    public List<String> list() {
+ return credsManager.list();
+ }
+}
diff --git a/user-operator/src/test/java/io/strimzi/operator/user/ResourceUtils.java b/user-operator/src/test/java/io/strimzi/operator/user/ResourceUtils.java
index eb5e3728705..21d443fc3dc 100644
--- a/user-operator/src/test/java/io/strimzi/operator/user/ResourceUtils.java
+++ b/user-operator/src/test/java/io/strimzi/operator/user/ResourceUtils.java
@@ -4,11 +4,16 @@
*/
package io.strimzi.operator.user;
+import io.fabric8.kubernetes.api.model.ObjectMetaBuilder;
+import io.fabric8.kubernetes.api.model.Secret;
+import io.fabric8.kubernetes.api.model.SecretBuilder;
import io.strimzi.api.kafka.model.AclOperation;
import io.strimzi.api.kafka.model.AclRule;
import io.strimzi.api.kafka.model.KafkaUser;
+import io.strimzi.api.kafka.model.KafkaUserAuthentication;
import io.strimzi.api.kafka.model.KafkaUserAuthorizationSimple;
import io.strimzi.api.kafka.model.KafkaUserBuilder;
+import io.strimzi.api.kafka.model.KafkaUserScramSha512ClientAuthentication;
import io.strimzi.api.kafka.model.KafkaUserTlsClientAuthentication;
import io.strimzi.operator.common.model.Labels;
import io.strimzi.operator.user.model.acl.SimpleAclRule;
@@ -19,17 +24,13 @@
import java.util.Map;
import java.util.Set;
-import io.fabric8.kubernetes.api.model.ObjectMetaBuilder;
-import io.fabric8.kubernetes.api.model.Secret;
-import io.fabric8.kubernetes.api.model.SecretBuilder;
-
public class ResourceUtils {
public static final Map LABELS = Collections.singletonMap("foo", "bar");
public static final String NAMESPACE = "namespace";
public static final String NAME = "user";
public static final String CA_NAME = "somename";
- public static KafkaUser createKafkaUser() {
+ public static KafkaUser createKafkaUser(KafkaUserAuthentication authentication) {
return new KafkaUserBuilder()
.withMetadata(
new ObjectMetaBuilder()
@@ -39,7 +40,7 @@ public static KafkaUser createKafkaUser() {
.build()
)
.withNewSpec()
- .withAuthentication(new KafkaUserTlsClientAuthentication())
+ .withAuthentication(authentication)
.withNewKafkaUserAuthorizationSimpleAuthorization()
.addNewAcl()
.withNewAclRuleTopicResourceResource()
@@ -64,6 +65,14 @@ public static KafkaUser createKafkaUser() {
.build();
}
+ public static KafkaUser createKafkaUserTls() {
+ return createKafkaUser(new KafkaUserTlsClientAuthentication());
+ }
+
+ public static KafkaUser createKafkaUserScramSha() {
+ return createKafkaUser(new KafkaUserScramSha512ClientAuthentication());
+ }
+
public static Secret createClientsCa() {
return new SecretBuilder()
.withNewMetadata()
@@ -75,7 +84,7 @@ public static Secret createClientsCa() {
.build();
}
- public static Secret createUserCert() {
+ public static Secret createUserSecretTls() {
return new SecretBuilder()
.withNewMetadata()
.withName(NAME)
@@ -88,6 +97,17 @@ public static Secret createUserCert() {
.build();
}
+ public static Secret createUserSecretScramSha() {
+ return new SecretBuilder()
+ .withNewMetadata()
+ .withName(NAME)
+ .withNamespace(NAMESPACE)
+ .withLabels(Labels.userLabels(LABELS).withKind(KafkaUser.RESOURCE_KIND).toMap())
+ .endMetadata()
+ .addToData("password", "my-password")
+ .build();
+ }
+
public static Set createExpectedSimpleAclRules(KafkaUser user) {
Set simpleAclRules = new HashSet();
diff --git a/user-operator/src/test/java/io/strimzi/operator/user/model/KafkaUserModelTest.java b/user-operator/src/test/java/io/strimzi/operator/user/model/KafkaUserModelTest.java
index cb90943c72e..13a040e68e3 100644
--- a/user-operator/src/test/java/io/strimzi/operator/user/model/KafkaUserModelTest.java
+++ b/user-operator/src/test/java/io/strimzi/operator/user/model/KafkaUserModelTest.java
@@ -4,6 +4,7 @@
*/
package io.strimzi.operator.user.model;
+import io.fabric8.kubernetes.api.model.Secret;
import io.strimzi.api.kafka.model.KafkaUser;
import io.strimzi.api.kafka.model.KafkaUserAuthorizationSimple;
import io.strimzi.api.kafka.model.KafkaUserSpec;
@@ -12,41 +13,43 @@
import io.strimzi.operator.common.model.Labels;
import io.strimzi.operator.common.operator.MockCertManager;
import io.strimzi.operator.user.ResourceUtils;
+import io.strimzi.operator.user.operator.PasswordGenerator;
+import org.junit.Test;
import java.util.Base64;
-import io.fabric8.kubernetes.api.model.Secret;
-import org.junit.Test;
-
+import static io.strimzi.test.TestUtils.set;
+import static java.util.Collections.singleton;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
public class KafkaUserModelTest {
- private final KafkaUser user = ResourceUtils.createKafkaUser();
+ private final KafkaUser tlsUser = ResourceUtils.createKafkaUserTls();
+ private final KafkaUser scramShaUser = ResourceUtils.createKafkaUserScramSha();
private final Secret clientsCa = ResourceUtils.createClientsCa();
- private final Secret userCert = ResourceUtils.createUserCert();
private final CertManager mockCertManager = new MockCertManager();
+ private final PasswordGenerator passwordGenerator = new PasswordGenerator(10, "a");
@Test
public void testFromCrd() {
- KafkaUserModel model = KafkaUserModel.fromCrd(mockCertManager, user, clientsCa, null);
+ KafkaUserModel model = KafkaUserModel.fromCrd(mockCertManager, passwordGenerator, tlsUser, clientsCa, null);
assertEquals(ResourceUtils.NAMESPACE, model.namespace);
assertEquals(ResourceUtils.NAME, model.name);
assertEquals(Labels.userLabels(ResourceUtils.LABELS).withKind(KafkaUser.RESOURCE_KIND), model.labels);
assertEquals(KafkaUserTlsClientAuthentication.TYPE_TLS, model.authentication.getType());
- KafkaUserAuthorizationSimple simple = (KafkaUserAuthorizationSimple) user.getSpec().getAuthorization();
- assertEquals(ResourceUtils.createExpectedSimpleAclRules(user).size(), model.getSimpleAclRules().size());
- assertEquals(ResourceUtils.createExpectedSimpleAclRules(user), model.getSimpleAclRules());
+ KafkaUserAuthorizationSimple simple = (KafkaUserAuthorizationSimple) tlsUser.getSpec().getAuthorization();
+ assertEquals(ResourceUtils.createExpectedSimpleAclRules(tlsUser).size(), model.getSimpleAclRules().size());
+ assertEquals(ResourceUtils.createExpectedSimpleAclRules(tlsUser), model.getSimpleAclRules());
}
@Test
public void testGenerateSecret() {
- KafkaUserModel model = KafkaUserModel.fromCrd(mockCertManager, user, clientsCa, null);
+ KafkaUserModel model = KafkaUserModel.fromCrd(mockCertManager, passwordGenerator, tlsUser, clientsCa, null);
Secret generated = model.generateSecret();
- System.out.println(generated.getData().keySet());
+ assertEquals(set("ca.crt", "user.crt", "user.key"), generated.getData().keySet());
assertEquals(ResourceUtils.NAME, generated.getMetadata().getName());
assertEquals(ResourceUtils.NAMESPACE, generated.getMetadata().getNamespace());
@@ -55,7 +58,7 @@ public void testGenerateSecret() {
@Test
public void testGenerateCertificateWhenNoExists() {
- KafkaUserModel model = KafkaUserModel.fromCrd(mockCertManager, user, clientsCa, null);
+ KafkaUserModel model = KafkaUserModel.fromCrd(mockCertManager, passwordGenerator, tlsUser, clientsCa, null);
Secret generated = model.generateSecret();
assertEquals("clients-ca-crt", new String(model.decodeFromSecret(generated, "ca.crt")));
@@ -65,11 +68,12 @@ public void testGenerateCertificateWhenNoExists() {
@Test
public void testGenerateCertificateAtCaChange() {
+ Secret userCert = ResourceUtils.createUserSecretTls();
Secret clientsCa = ResourceUtils.createClientsCa();
clientsCa.getData().put("clients-ca.key", Base64.getEncoder().encodeToString("different-clients-ca-key".getBytes()));
clientsCa.getData().put("clients-ca.crt", Base64.getEncoder().encodeToString("different-clients-ca-crt".getBytes()));
- KafkaUserModel model = KafkaUserModel.fromCrd(mockCertManager, user, clientsCa, userCert);
+ KafkaUserModel model = KafkaUserModel.fromCrd(mockCertManager, passwordGenerator, tlsUser, clientsCa, userCert);
Secret generated = model.generateSecret();
assertEquals("different-clients-ca-crt", new String(model.decodeFromSecret(generated, "ca.crt")));
@@ -79,7 +83,8 @@ public void testGenerateCertificateAtCaChange() {
@Test
public void testGenerateCertificateKeepExisting() {
- KafkaUserModel model = KafkaUserModel.fromCrd(mockCertManager, user, clientsCa, userCert);
+ Secret userCert = ResourceUtils.createUserSecretTls();
+ KafkaUserModel model = KafkaUserModel.fromCrd(mockCertManager, passwordGenerator, tlsUser, clientsCa, userCert);
Secret generated = model.generateSecret();
assertEquals("clients-ca-crt", new String(model.decodeFromSecret(generated, "ca.crt")));
@@ -87,20 +92,75 @@ public void testGenerateCertificateKeepExisting() {
assertEquals("expected-key", new String(model.decodeFromSecret(generated, "user.key")));
}
+ @Test
+ public void testGenerateCertificateExistingScramSha() {
+ Secret userCert = ResourceUtils.createUserSecretScramSha();
+ KafkaUserModel model = KafkaUserModel.fromCrd(mockCertManager, passwordGenerator, tlsUser, clientsCa, userCert);
+ Secret generated = model.generateSecret();
+
+ assertEquals("clients-ca-crt", new String(model.decodeFromSecret(generated, "ca.crt")));
+ assertEquals("crt file", new String(model.decodeFromSecret(generated, "user.crt")));
+ assertEquals("key file", new String(model.decodeFromSecret(generated, "user.key")));
+ }
+
+ @Test
+ public void testGeneratePasswordWhenNoSecretExists() {
+ KafkaUserModel model = KafkaUserModel.fromCrd(mockCertManager, passwordGenerator, scramShaUser, clientsCa, null);
+ Secret generated = model.generateSecret();
+
+ assertEquals(ResourceUtils.NAME, generated.getMetadata().getName());
+ assertEquals(ResourceUtils.NAMESPACE, generated.getMetadata().getNamespace());
+ assertEquals(Labels.userLabels(ResourceUtils.LABELS).withKind(KafkaUser.RESOURCE_KIND).toMap(), generated.getMetadata().getLabels());
+
+ assertEquals(singleton(KafkaUserModel.KEY_PASSWORD), generated.getData().keySet());
+ assertEquals("aaaaaaaaaa", generated.getData().get(KafkaUserModel.KEY_PASSWORD));
+ }
+
+ @Test
+ public void testGeneratePasswordKeepExistingScramSha() {
+ Secret userCert = ResourceUtils.createUserSecretScramSha();
+ String existing = userCert.getData().get(KafkaUserModel.KEY_PASSWORD);
+ KafkaUserModel model = KafkaUserModel.fromCrd(mockCertManager, passwordGenerator, scramShaUser, clientsCa, userCert);
+ Secret generated = model.generateSecret();
+
+ assertEquals(ResourceUtils.NAME, generated.getMetadata().getName());
+ assertEquals(ResourceUtils.NAMESPACE, generated.getMetadata().getNamespace());
+ assertEquals(Labels.userLabels(ResourceUtils.LABELS).withKind(KafkaUser.RESOURCE_KIND).toMap(), generated.getMetadata().getLabels());
+
+ assertEquals(singleton(KafkaUserModel.KEY_PASSWORD), generated.getData().keySet());
+ assertEquals(existing, generated.getData().get(KafkaUserModel.KEY_PASSWORD));
+ }
+
+ @Test
+ public void testGeneratePasswordExistingTlsSecret() {
+ Secret userCert = ResourceUtils.createUserSecretTls();
+ KafkaUserModel model = KafkaUserModel.fromCrd(mockCertManager, passwordGenerator, scramShaUser, clientsCa, userCert);
+ Secret generated = model.generateSecret();
+
+ assertEquals(ResourceUtils.NAME, generated.getMetadata().getName());
+ assertEquals(ResourceUtils.NAMESPACE, generated.getMetadata().getNamespace());
+ assertEquals(Labels.userLabels(ResourceUtils.LABELS).withKind(KafkaUser.RESOURCE_KIND).toMap(), generated.getMetadata().getLabels());
+
+        assertEquals(singleton(KafkaUserModel.KEY_PASSWORD), generated.getData().keySet());
+ assertEquals("aaaaaaaaaa", generated.getData().get(KafkaUserModel.KEY_PASSWORD));
+ }
+
@Test
public void testNoTlsAuthn() {
- KafkaUser user = ResourceUtils.createKafkaUser();
+ Secret userCert = ResourceUtils.createUserSecretTls();
+ KafkaUser user = ResourceUtils.createKafkaUserTls();
user.setSpec(new KafkaUserSpec());
- KafkaUserModel model = KafkaUserModel.fromCrd(mockCertManager, user, clientsCa, userCert);
+ KafkaUserModel model = KafkaUserModel.fromCrd(mockCertManager, passwordGenerator, user, clientsCa, userCert);
assertNull(model.generateSecret());
}
@Test
public void testNoSimpleAuthz() {
- KafkaUser user = ResourceUtils.createKafkaUser();
+ Secret userCert = ResourceUtils.createUserSecretTls();
+ KafkaUser user = ResourceUtils.createKafkaUserTls();
user.setSpec(new KafkaUserSpec());
- KafkaUserModel model = KafkaUserModel.fromCrd(mockCertManager, user, clientsCa, userCert);
+ KafkaUserModel model = KafkaUserModel.fromCrd(mockCertManager, passwordGenerator, user, clientsCa, userCert);
assertNull(model.getSimpleAclRules());
}
diff --git a/user-operator/src/test/java/io/strimzi/operator/user/operator/KafkaUserOperatorTest.java b/user-operator/src/test/java/io/strimzi/operator/user/operator/KafkaUserOperatorTest.java
index b0f7f84a58b..a58bdd3893f 100644
--- a/user-operator/src/test/java/io/strimzi/operator/user/operator/KafkaUserOperatorTest.java
+++ b/user-operator/src/test/java/io/strimzi/operator/user/operator/KafkaUserOperatorTest.java
@@ -4,6 +4,7 @@
*/
package io.strimzi.operator.user.operator;
+import io.fabric8.kubernetes.api.model.Secret;
import io.strimzi.api.kafka.model.KafkaUser;
import io.strimzi.certs.CertManager;
import io.strimzi.operator.common.Reconciliation;
@@ -15,15 +16,6 @@
import io.strimzi.operator.user.ResourceUtils;
import io.strimzi.operator.user.model.KafkaUserModel;
import io.strimzi.operator.user.model.acl.SimpleAclRule;
-
-import java.util.Arrays;
-import java.util.Base64;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-import java.util.concurrent.CopyOnWriteArraySet;
-
-import io.fabric8.kubernetes.api.model.Secret;
import io.vertx.core.AsyncResult;
import io.vertx.core.Future;
import io.vertx.core.Handler;
@@ -31,14 +23,22 @@
import io.vertx.ext.unit.Async;
import io.vertx.ext.unit.TestContext;
import io.vertx.ext.unit.junit.VertxUnitRunner;
-
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.ArgumentCaptor;
+import java.util.Arrays;
+import java.util.Base64;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.CopyOnWriteArraySet;
+
import static java.util.Arrays.asList;
+import static java.util.Collections.singletonList;
+import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.ArgumentMatchers.isNull;
import static org.mockito.Mockito.mock;
@@ -60,10 +60,11 @@ public static void after() {
}
@Test
- public void testCreateUser(TestContext context) {
+ public void testCreateTlsUser(TestContext context) {
CrdOperator mockCrdOps = mock(CrdOperator.class);
SecretOperator mockSecretOps = mock(SecretOperator.class);
SimpleAclOperator aclOps = mock(SimpleAclOperator.class);
+ ScramShaCredentialsOperator scramOps = mock(ScramShaCredentialsOperator.class);
ArgumentCaptor secretNamespaceCaptor = ArgumentCaptor.forClass(String.class);
ArgumentCaptor secretNameCaptor = ArgumentCaptor.forClass(String.class);
@@ -73,8 +74,10 @@ public void testCreateUser(TestContext context) {
        ArgumentCaptor<Set<SimpleAclRule>> aclRulesCaptor = ArgumentCaptor.forClass(Set.class);
when(aclOps.reconcile(aclNameCaptor.capture(), aclRulesCaptor.capture())).thenReturn(Future.succeededFuture());
- KafkaUserOperator op = new KafkaUserOperator(vertx, mockCertManager, mockCrdOps, mockSecretOps, aclOps, ResourceUtils.CA_NAME, ResourceUtils.NAMESPACE);
- KafkaUser user = ResourceUtils.createKafkaUser();
+ when(scramOps.reconcile(any(), any())).thenReturn(Future.succeededFuture());
+
+ KafkaUserOperator op = new KafkaUserOperator(vertx, mockCertManager, mockCrdOps, mockSecretOps, scramOps, aclOps, ResourceUtils.CA_NAME, ResourceUtils.NAMESPACE);
+ KafkaUser user = ResourceUtils.createKafkaUserTls();
Secret clientsCa = ResourceUtils.createClientsCa();
Async async = context.async();
@@ -122,20 +125,23 @@ public void testUpdateUserNoChange(TestContext context) {
CrdOperator mockCrdOps = mock(CrdOperator.class);
SecretOperator mockSecretOps = mock(SecretOperator.class);
SimpleAclOperator aclOps = mock(SimpleAclOperator.class);
+ ScramShaCredentialsOperator scramOps = mock(ScramShaCredentialsOperator.class);
ArgumentCaptor secretNamespaceCaptor = ArgumentCaptor.forClass(String.class);
ArgumentCaptor secretNameCaptor = ArgumentCaptor.forClass(String.class);
ArgumentCaptor secretCaptor = ArgumentCaptor.forClass(Secret.class);
when(mockSecretOps.reconcile(secretNamespaceCaptor.capture(), secretNameCaptor.capture(), secretCaptor.capture())).thenReturn(Future.succeededFuture());
+ when(scramOps.reconcile(any(), any())).thenReturn(Future.succeededFuture());
+
ArgumentCaptor aclNameCaptor = ArgumentCaptor.forClass(String.class);
        ArgumentCaptor<Set<SimpleAclRule>> aclRulesCaptor = ArgumentCaptor.forClass(Set.class);
when(aclOps.reconcile(aclNameCaptor.capture(), aclRulesCaptor.capture())).thenReturn(Future.succeededFuture());
- KafkaUserOperator op = new KafkaUserOperator(vertx, mockCertManager, mockCrdOps, mockSecretOps, aclOps, ResourceUtils.CA_NAME, ResourceUtils.NAMESPACE);
- KafkaUser user = ResourceUtils.createKafkaUser();
+ KafkaUserOperator op = new KafkaUserOperator(vertx, mockCertManager, mockCrdOps, mockSecretOps, scramOps, aclOps, ResourceUtils.CA_NAME, ResourceUtils.NAMESPACE);
+ KafkaUser user = ResourceUtils.createKafkaUserTls();
Secret clientsCa = ResourceUtils.createClientsCa();
- Secret userCert = ResourceUtils.createUserCert();
+ Secret userCert = ResourceUtils.createUserSecretTls();
Async async = context.async();
op.createOrUpdate(new Reconciliation("test-trigger", ResourceType.USER, ResourceUtils.NAMESPACE, ResourceUtils.NAME), user, clientsCa, userCert, res -> {
@@ -187,22 +193,25 @@ public void testUpdateUserNoAuthnAuthz(TestContext context) {
CrdOperator mockCrdOps = mock(CrdOperator.class);
SecretOperator mockSecretOps = mock(SecretOperator.class);
SimpleAclOperator aclOps = mock(SimpleAclOperator.class);
+ ScramShaCredentialsOperator scramOps = mock(ScramShaCredentialsOperator.class);
ArgumentCaptor secretNamespaceCaptor = ArgumentCaptor.forClass(String.class);
ArgumentCaptor secretNameCaptor = ArgumentCaptor.forClass(String.class);
ArgumentCaptor secretCaptor = ArgumentCaptor.forClass(Secret.class);
when(mockSecretOps.reconcile(secretNamespaceCaptor.capture(), secretNameCaptor.capture(), secretCaptor.capture())).thenReturn(Future.succeededFuture());
+ when(scramOps.reconcile(any(), any())).thenReturn(Future.succeededFuture());
+
ArgumentCaptor aclNameCaptor = ArgumentCaptor.forClass(String.class);
        ArgumentCaptor<Set<SimpleAclRule>> aclRulesCaptor = ArgumentCaptor.forClass(Set.class);
when(aclOps.reconcile(aclNameCaptor.capture(), aclRulesCaptor.capture())).thenReturn(Future.succeededFuture());
- KafkaUserOperator op = new KafkaUserOperator(vertx, mockCertManager, mockCrdOps, mockSecretOps, aclOps, ResourceUtils.CA_NAME, ResourceUtils.NAMESPACE);
- KafkaUser user = ResourceUtils.createKafkaUser();
+ KafkaUserOperator op = new KafkaUserOperator(vertx, mockCertManager, mockCrdOps, mockSecretOps, scramOps, aclOps, ResourceUtils.CA_NAME, ResourceUtils.NAMESPACE);
+ KafkaUser user = ResourceUtils.createKafkaUserTls();
user.getSpec().setAuthorization(null);
user.getSpec().setAuthentication(null);
Secret clientsCa = ResourceUtils.createClientsCa();
- Secret userCert = ResourceUtils.createUserCert();
+ Secret userCert = ResourceUtils.createUserSecretTls();
Async async = context.async();
op.createOrUpdate(new Reconciliation("test-trigger", ResourceType.USER, ResourceUtils.NAMESPACE, ResourceUtils.NAME), user, clientsCa, userCert, res -> {
@@ -242,6 +251,7 @@ public void testUpdateUserNewCert(TestContext context) {
CrdOperator mockCrdOps = mock(CrdOperator.class);
SecretOperator mockSecretOps = mock(SecretOperator.class);
SimpleAclOperator aclOps = mock(SimpleAclOperator.class);
+ ScramShaCredentialsOperator scramOps = mock(ScramShaCredentialsOperator.class);
ArgumentCaptor secretNamespaceCaptor = ArgumentCaptor.forClass(String.class);
ArgumentCaptor secretNameCaptor = ArgumentCaptor.forClass(String.class);
@@ -252,12 +262,14 @@ public void testUpdateUserNewCert(TestContext context) {
        ArgumentCaptor<Set<SimpleAclRule>> aclRulesCaptor = ArgumentCaptor.forClass(Set.class);
when(aclOps.reconcile(aclNameCaptor.capture(), aclRulesCaptor.capture())).thenReturn(Future.succeededFuture());
- KafkaUserOperator op = new KafkaUserOperator(vertx, mockCertManager, mockCrdOps, mockSecretOps, aclOps, ResourceUtils.CA_NAME, ResourceUtils.NAMESPACE);
- KafkaUser user = ResourceUtils.createKafkaUser();
+ when(scramOps.reconcile(any(), any())).thenReturn(Future.succeededFuture());
+
+ KafkaUserOperator op = new KafkaUserOperator(vertx, mockCertManager, mockCrdOps, mockSecretOps, scramOps, aclOps, ResourceUtils.CA_NAME, ResourceUtils.NAMESPACE);
+ KafkaUser user = ResourceUtils.createKafkaUserTls();
Secret clientsCa = ResourceUtils.createClientsCa();
clientsCa.getData().put("clients-ca.key", Base64.getEncoder().encodeToString("different-clients-ca-key".getBytes()));
clientsCa.getData().put("clients-ca.crt", Base64.getEncoder().encodeToString("different-clients-ca-crt".getBytes()));
- Secret userCert = ResourceUtils.createUserCert();
+ Secret userCert = ResourceUtils.createUserSecretTls();
Async async = context.async();
op.createOrUpdate(new Reconciliation("test-trigger", ResourceType.USER, ResourceUtils.NAMESPACE, ResourceUtils.NAME), user, clientsCa, userCert, res -> {
@@ -288,19 +300,22 @@ public void testUpdateUserNewCert(TestContext context) {
}
@Test
- public void testDeleteUser(TestContext context) {
+ public void testDeleteTlsUser(TestContext context) {
CrdOperator mockCrdOps = mock(CrdOperator.class);
SecretOperator mockSecretOps = mock(SecretOperator.class);
SimpleAclOperator aclOps = mock(SimpleAclOperator.class);
+ ScramShaCredentialsOperator scramOps = mock(ScramShaCredentialsOperator.class);
ArgumentCaptor secretNamespaceCaptor = ArgumentCaptor.forClass(String.class);
ArgumentCaptor secretNameCaptor = ArgumentCaptor.forClass(String.class);
when(mockSecretOps.reconcile(secretNamespaceCaptor.capture(), secretNameCaptor.capture(), isNull())).thenReturn(Future.succeededFuture());
+ when(scramOps.reconcile(any(), any())).thenReturn(Future.succeededFuture());
+
ArgumentCaptor aclNameCaptor = ArgumentCaptor.forClass(String.class);
when(aclOps.reconcile(aclNameCaptor.capture(), isNull())).thenReturn(Future.succeededFuture());
- KafkaUserOperator op = new KafkaUserOperator(vertx, mockCertManager, mockCrdOps, mockSecretOps, aclOps, ResourceUtils.CA_NAME, ResourceUtils.NAMESPACE);
+ KafkaUserOperator op = new KafkaUserOperator(vertx, mockCertManager, mockCrdOps, mockSecretOps, scramOps, aclOps, ResourceUtils.CA_NAME, ResourceUtils.NAMESPACE);
Async async = context.async();
op.delete(new Reconciliation("test-trigger", ResourceType.USER, ResourceUtils.NAMESPACE, ResourceUtils.NAME), res -> {
@@ -323,13 +338,14 @@ public void testDeleteUser(TestContext context) {
}
@Test
- public void testReconcileNewUser(TestContext context) {
+ public void testReconcileNewTlsUser(TestContext context) {
CrdOperator mockCrdOps = mock(CrdOperator.class);
SecretOperator mockSecretOps = mock(SecretOperator.class);
SimpleAclOperator aclOps = mock(SimpleAclOperator.class);
+ ScramShaCredentialsOperator scramOps = mock(ScramShaCredentialsOperator.class);
- KafkaUserOperator op = new KafkaUserOperator(vertx, mockCertManager, mockCrdOps, mockSecretOps, aclOps, ResourceUtils.CA_NAME, ResourceUtils.NAMESPACE);
- KafkaUser user = ResourceUtils.createKafkaUser();
+ KafkaUserOperator op = new KafkaUserOperator(vertx, mockCertManager, mockCrdOps, mockSecretOps, scramOps, aclOps, ResourceUtils.CA_NAME, ResourceUtils.NAMESPACE);
+ KafkaUser user = ResourceUtils.createKafkaUserTls();
Secret clientsCa = ResourceUtils.createClientsCa();
ArgumentCaptor secretNamespaceCaptor = ArgumentCaptor.forClass(String.class);
@@ -341,6 +357,8 @@ public void testReconcileNewUser(TestContext context) {
        ArgumentCaptor<Set<SimpleAclRule>> aclRulesCaptor = ArgumentCaptor.forClass(Set.class);
when(aclOps.reconcile(aclNameCaptor.capture(), aclRulesCaptor.capture())).thenReturn(Future.succeededFuture());
+ when(scramOps.reconcile(any(), any())).thenReturn(Future.succeededFuture());
+
when(mockSecretOps.get(eq(clientsCa.getMetadata().getNamespace()), eq(clientsCa.getMetadata().getName()))).thenReturn(clientsCa);
when(mockSecretOps.get(eq(user.getMetadata().getNamespace()), eq(user.getMetadata().getName()))).thenReturn(null);
@@ -387,21 +405,24 @@ public void testReconcileNewUser(TestContext context) {
}
@Test
- public void testReconcileExistingUser(TestContext context) {
+ public void testReconcileExistingTlsUser(TestContext context) {
CrdOperator mockCrdOps = mock(CrdOperator.class);
SecretOperator mockSecretOps = mock(SecretOperator.class);
SimpleAclOperator aclOps = mock(SimpleAclOperator.class);
+ ScramShaCredentialsOperator scramOps = mock(ScramShaCredentialsOperator.class);
- KafkaUserOperator op = new KafkaUserOperator(vertx, mockCertManager, mockCrdOps, mockSecretOps, aclOps, ResourceUtils.CA_NAME, ResourceUtils.NAMESPACE);
- KafkaUser user = ResourceUtils.createKafkaUser();
+ KafkaUserOperator op = new KafkaUserOperator(vertx, mockCertManager, mockCrdOps, mockSecretOps, scramOps, aclOps, ResourceUtils.CA_NAME, ResourceUtils.NAMESPACE);
+ KafkaUser user = ResourceUtils.createKafkaUserTls();
Secret clientsCa = ResourceUtils.createClientsCa();
- Secret userCert = ResourceUtils.createUserCert();
+ Secret userCert = ResourceUtils.createUserSecretTls();
ArgumentCaptor secretNamespaceCaptor = ArgumentCaptor.forClass(String.class);
ArgumentCaptor secretNameCaptor = ArgumentCaptor.forClass(String.class);
ArgumentCaptor secretCaptor = ArgumentCaptor.forClass(Secret.class);
when(mockSecretOps.reconcile(secretNamespaceCaptor.capture(), secretNameCaptor.capture(), secretCaptor.capture())).thenReturn(Future.succeededFuture());
+ when(scramOps.reconcile(any(), any())).thenReturn(Future.succeededFuture());
+
ArgumentCaptor aclNameCaptor = ArgumentCaptor.forClass(String.class);
        ArgumentCaptor<Set<SimpleAclRule>> aclRulesCaptor = ArgumentCaptor.forClass(Set.class);
when(aclOps.reconcile(aclNameCaptor.capture(), aclRulesCaptor.capture())).thenReturn(Future.succeededFuture());
@@ -452,20 +473,23 @@ public void testReconcileExistingUser(TestContext context) {
}
@Test
- public void testReconcileDeleteUser(TestContext context) {
+ public void testReconcileDeleteTlsUser(TestContext context) {
CrdOperator mockCrdOps = mock(CrdOperator.class);
SecretOperator mockSecretOps = mock(SecretOperator.class);
SimpleAclOperator aclOps = mock(SimpleAclOperator.class);
+ ScramShaCredentialsOperator scramOps = mock(ScramShaCredentialsOperator.class);
- KafkaUserOperator op = new KafkaUserOperator(vertx, mockCertManager, mockCrdOps, mockSecretOps, aclOps, ResourceUtils.CA_NAME, ResourceUtils.NAMESPACE);
- KafkaUser user = ResourceUtils.createKafkaUser();
+ KafkaUserOperator op = new KafkaUserOperator(vertx, mockCertManager, mockCrdOps, mockSecretOps, scramOps, aclOps, ResourceUtils.CA_NAME, ResourceUtils.NAMESPACE);
+ KafkaUser user = ResourceUtils.createKafkaUserTls();
Secret clientsCa = ResourceUtils.createClientsCa();
- Secret userCert = ResourceUtils.createUserCert();
+ Secret userCert = ResourceUtils.createUserSecretTls();
ArgumentCaptor secretNamespaceCaptor = ArgumentCaptor.forClass(String.class);
ArgumentCaptor secretNameCaptor = ArgumentCaptor.forClass(String.class);
when(mockSecretOps.reconcile(secretNamespaceCaptor.capture(), secretNameCaptor.capture(), isNull())).thenReturn(Future.succeededFuture());
+ when(scramOps.reconcile(any(), any())).thenReturn(Future.succeededFuture());
+
ArgumentCaptor aclNameCaptor = ArgumentCaptor.forClass(String.class);
when(aclOps.reconcile(aclNameCaptor.capture(), isNull())).thenReturn(Future.succeededFuture());
@@ -499,37 +523,50 @@ public void testReconcileAll(TestContext context) {
CrdOperator mockCrdOps = mock(CrdOperator.class);
SecretOperator mockSecretOps = mock(SecretOperator.class);
SimpleAclOperator aclOps = mock(SimpleAclOperator.class);
-
- KafkaUser newUser = ResourceUtils.createKafkaUser();
- newUser.getMetadata().setName("new-user");
- KafkaUser existingUser = ResourceUtils.createKafkaUser();
- existingUser.getMetadata().setName("existing-user");
+ ScramShaCredentialsOperator scramOps = mock(ScramShaCredentialsOperator.class);
+
+ KafkaUser newTlsUser = ResourceUtils.createKafkaUserTls();
+ newTlsUser.getMetadata().setName("new-tls-user");
+ KafkaUser newScramShaUser = ResourceUtils.createKafkaUserScramSha();
+ newScramShaUser.getMetadata().setName("new-scram-sha-user");
+ KafkaUser existingTlsUser = ResourceUtils.createKafkaUserTls();
+ existingTlsUser.getMetadata().setName("existing-tls-user");
Secret clientsCa = ResourceUtils.createClientsCa();
- Secret existingUserCert = ResourceUtils.createUserCert();
- existingUserCert.getMetadata().setName("existing-user");
- Secret deletedUserCert = ResourceUtils.createUserCert();
+ Secret existingTlsUserSecret = ResourceUtils.createUserSecretTls();
+ existingTlsUserSecret.getMetadata().setName("existing-tls-user");
+ Secret existingScramShaUserSecret = ResourceUtils.createUserSecretScramSha();
+ existingScramShaUserSecret.getMetadata().setName("existing-scram-sha-user");
+        KafkaUser existingScramShaUser = ResourceUtils.createKafkaUserScramSha();
+ existingScramShaUser.getMetadata().setName("existing-scram-sha-user");
+ Secret deletedUserCert = ResourceUtils.createUserSecretTls();
deletedUserCert.getMetadata().setName("deleted-user");
- when(mockCrdOps.list(eq(ResourceUtils.NAMESPACE), eq(Labels.userLabels(ResourceUtils.LABELS)))).thenReturn(Arrays.asList(newUser, existingUser));
- when(mockSecretOps.list(eq(ResourceUtils.NAMESPACE), eq(Labels.userLabels(ResourceUtils.LABELS).withKind(KafkaUser.RESOURCE_KIND)))).thenReturn(Arrays.asList(existingUserCert, deletedUserCert));
- when(aclOps.getUsersWithAcls()).thenReturn(new HashSet(Arrays.asList("existing-user", "second-deleted-user")));
+ when(mockCrdOps.list(eq(ResourceUtils.NAMESPACE), eq(Labels.userLabels(ResourceUtils.LABELS)))).thenReturn(Arrays.asList(newTlsUser, newScramShaUser, existingTlsUser, existingScramShaUser));
+ when(mockSecretOps.list(eq(ResourceUtils.NAMESPACE), eq(Labels.userLabels(ResourceUtils.LABELS).withKind(KafkaUser.RESOURCE_KIND)))).thenReturn(Arrays.asList(existingTlsUserSecret, existingScramShaUserSecret, deletedUserCert));
+ when(aclOps.getUsersWithAcls()).thenReturn(new HashSet(Arrays.asList("existing-tls-user", "second-deleted-user")));
+ when(scramOps.list()).thenReturn(asList("existing-tls-user", "deleted-scram-sha-user"));
- when(mockCrdOps.get(eq(newUser.getMetadata().getNamespace()), eq(newUser.getMetadata().getName()))).thenReturn(newUser);
- when(mockCrdOps.get(eq(existingUser.getMetadata().getNamespace()), eq(existingUser.getMetadata().getName()))).thenReturn(existingUser);
+ when(mockCrdOps.get(eq(newTlsUser.getMetadata().getNamespace()), eq(newTlsUser.getMetadata().getName()))).thenReturn(newTlsUser);
+ when(mockCrdOps.get(eq(newScramShaUser.getMetadata().getNamespace()), eq(newScramShaUser.getMetadata().getName()))).thenReturn(newScramShaUser);
+ when(mockCrdOps.get(eq(existingTlsUser.getMetadata().getNamespace()), eq(existingTlsUser.getMetadata().getName()))).thenReturn(existingTlsUser);
+        when(mockCrdOps.get(eq(existingScramShaUser.getMetadata().getNamespace()), eq(existingScramShaUser.getMetadata().getName()))).thenReturn(existingScramShaUser);
when(mockCrdOps.get(eq(deletedUserCert.getMetadata().getNamespace()), eq(deletedUserCert.getMetadata().getName()))).thenReturn(null);
when(mockSecretOps.get(eq(clientsCa.getMetadata().getNamespace()), eq(clientsCa.getMetadata().getName()))).thenReturn(clientsCa);
- when(mockSecretOps.get(eq(newUser.getMetadata().getNamespace()), eq(newUser.getMetadata().getName()))).thenReturn(null);
- when(mockSecretOps.get(eq(existingUser.getMetadata().getNamespace()), eq(existingUser.getMetadata().getName()))).thenReturn(existingUserCert);
+ when(mockSecretOps.get(eq(newTlsUser.getMetadata().getNamespace()), eq(newTlsUser.getMetadata().getName()))).thenReturn(null);
+ when(mockSecretOps.get(eq(newScramShaUser.getMetadata().getNamespace()), eq(newScramShaUser.getMetadata().getName()))).thenReturn(null);
+ when(mockSecretOps.get(eq(existingTlsUser.getMetadata().getNamespace()), eq(existingTlsUser.getMetadata().getName()))).thenReturn(existingTlsUserSecret);
+ when(mockSecretOps.get(eq(existingScramShaUser.getMetadata().getNamespace()), eq(existingScramShaUser.getMetadata().getName()))).thenReturn(existingScramShaUserSecret);
when(mockSecretOps.get(eq(deletedUserCert.getMetadata().getNamespace()), eq(deletedUserCert.getMetadata().getName()))).thenReturn(deletedUserCert);
Set createdOrUpdated = new CopyOnWriteArraySet<>();
Set deleted = new CopyOnWriteArraySet<>();
- Async async = context.async(4);
+ Async async = context.async(7);
KafkaUserOperator op = new KafkaUserOperator(vertx,
mockCertManager,
mockCrdOps,
- mockSecretOps, aclOps, ResourceUtils.CA_NAME, ResourceUtils.NAMESPACE) {
+ mockSecretOps, scramOps,
+ aclOps, ResourceUtils.CA_NAME, ResourceUtils.NAMESPACE) {
@Override
        public void createOrUpdate(Reconciliation reconciliation, KafkaUser user, Secret clientCa, Secret userSecret, Handler<AsyncResult<Void>> h) {
@@ -550,7 +587,192 @@ public void delete(Reconciliation reconciliation, Handler h) {
async.await();
- context.assertEquals(new HashSet(asList("new-user", "existing-user")), createdOrUpdated);
- context.assertEquals(new HashSet(asList("deleted-user", "second-deleted-user")), deleted);
+ context.assertEquals(new HashSet(asList("new-tls-user", "existing-tls-user",
+ "new-scram-sha-user", "existing-scram-sha-user")), createdOrUpdated);
+ context.assertEquals(new HashSet(asList("deleted-user", "second-deleted-user", "deleted-scram-sha-user")), deleted);
+ }
+
+ @Test
+ public void testReconcileNewScramShaUser(TestContext context) {
+ CrdOperator mockCrdOps = mock(CrdOperator.class);
+ SecretOperator mockSecretOps = mock(SecretOperator.class);
+ SimpleAclOperator aclOps = mock(SimpleAclOperator.class);
+ ScramShaCredentialsOperator scramOps = mock(ScramShaCredentialsOperator.class);
+
+ KafkaUserOperator op = new KafkaUserOperator(vertx, mockCertManager, mockCrdOps, mockSecretOps, scramOps, aclOps, ResourceUtils.CA_NAME, ResourceUtils.NAMESPACE);
+ KafkaUser user = ResourceUtils.createKafkaUserScramSha();
+
+        ArgumentCaptor<String> secretNamespaceCaptor = ArgumentCaptor.forClass(String.class);
+        ArgumentCaptor<String> secretNameCaptor = ArgumentCaptor.forClass(String.class);
+        ArgumentCaptor<Secret> secretCaptor = ArgumentCaptor.forClass(Secret.class);
+        when(mockSecretOps.reconcile(secretNamespaceCaptor.capture(), secretNameCaptor.capture(), secretCaptor.capture())).thenReturn(Future.succeededFuture());
+
+        ArgumentCaptor<String> aclNameCaptor = ArgumentCaptor.forClass(String.class);
+        ArgumentCaptor<Set<SimpleAclRule>> aclRulesCaptor = ArgumentCaptor.forClass(Set.class);
+        when(aclOps.reconcile(aclNameCaptor.capture(), aclRulesCaptor.capture())).thenReturn(Future.succeededFuture());
+
+        ArgumentCaptor<String> scramUserCaptor = ArgumentCaptor.forClass(String.class);
+        ArgumentCaptor<String> scramPasswordCaptor = ArgumentCaptor.forClass(String.class);
+        when(scramOps.reconcile(scramUserCaptor.capture(), scramPasswordCaptor.capture())).thenReturn(Future.succeededFuture());
+
+ when(mockSecretOps.get(eq(user.getMetadata().getNamespace()), eq(user.getMetadata().getName()))).thenReturn(null);
+
+ when(mockCrdOps.get(eq(user.getMetadata().getNamespace()), eq(user.getMetadata().getName()))).thenReturn(user);
+
+ Async async = context.async();
+ op.reconcile(new Reconciliation("test-trigger", ResourceType.USER, ResourceUtils.NAMESPACE, ResourceUtils.NAME), res -> {
+ context.assertTrue(res.succeeded());
+
+ List<String> capturedNames = secretNameCaptor.getAllValues();
+ context.assertEquals(1, capturedNames.size());
+ context.assertEquals(ResourceUtils.NAME, capturedNames.get(0));
+
+ List<String> capturedNamespaces = secretNamespaceCaptor.getAllValues();
+ context.assertEquals(1, capturedNamespaces.size());
+ context.assertEquals(ResourceUtils.NAMESPACE, capturedNamespaces.get(0));
+
+ List<Secret> capturedSecrets = secretCaptor.getAllValues();
+
+ context.assertEquals(1, capturedSecrets.size());
+
+ Secret captured = capturedSecrets.get(0);
+ context.assertEquals(user.getMetadata().getName(), captured.getMetadata().getName());
+ context.assertEquals(user.getMetadata().getNamespace(), captured.getMetadata().getNamespace());
+ context.assertEquals(Labels.userLabels(user.getMetadata().getLabels()).withKind(KafkaUser.RESOURCE_KIND).toMap(), captured.getMetadata().getLabels());
+
+ context.assertEquals(scramPasswordCaptor.getValue(), captured.getData().get(KafkaUserModel.KEY_PASSWORD));
+ context.assertTrue(captured.getData().get(KafkaUserModel.KEY_PASSWORD).matches("[a-zA-Z0-9]{12}"));
+
+ List<String> capturedAclNames = aclNameCaptor.getAllValues();
+ context.assertEquals(1, capturedAclNames.size());
+ context.assertEquals(KafkaUserModel.getUserName(ResourceUtils.NAME), capturedAclNames.get(0));
+
+ List<Set<SimpleAclRule>> capturedAcls = aclRulesCaptor.getAllValues();
+
+ context.assertEquals(1, capturedAcls.size());
+ Set<SimpleAclRule> aclRules = capturedAcls.get(0);
+
+ context.assertEquals(ResourceUtils.createExpectedSimpleAclRules(user).size(), aclRules.size());
+ context.assertEquals(ResourceUtils.createExpectedSimpleAclRules(user), aclRules);
+
+ async.complete();
+ });
+ }
+
+ @Test
+ public void testReconcileExistingScramShaUser(TestContext context) {
+ CrdOperator mockCrdOps = mock(CrdOperator.class);
+ SecretOperator mockSecretOps = mock(SecretOperator.class);
+ SimpleAclOperator aclOps = mock(SimpleAclOperator.class);
+ ScramShaCredentialsOperator scramOps = mock(ScramShaCredentialsOperator.class);
+
+ KafkaUserOperator op = new KafkaUserOperator(vertx, mockCertManager, mockCrdOps, mockSecretOps, scramOps, aclOps, ResourceUtils.CA_NAME, ResourceUtils.NAMESPACE);
+ KafkaUser user = ResourceUtils.createKafkaUserScramSha();
+ Secret userCert = ResourceUtils.createUserSecretScramSha();
+ String password = userCert.getData().get(KafkaUserModel.KEY_PASSWORD);
+
+ ArgumentCaptor<String> secretNamespaceCaptor = ArgumentCaptor.forClass(String.class);
+ ArgumentCaptor<String> secretNameCaptor = ArgumentCaptor.forClass(String.class);
+ ArgumentCaptor<Secret> secretCaptor = ArgumentCaptor.forClass(Secret.class);
+ when(mockSecretOps.reconcile(secretNamespaceCaptor.capture(), secretNameCaptor.capture(), secretCaptor.capture())).thenReturn(Future.succeededFuture());
+
+ ArgumentCaptor<String> scramUserCaptor = ArgumentCaptor.forClass(String.class);
+ ArgumentCaptor<String> scramPasswordCaptor = ArgumentCaptor.forClass(String.class);
+ when(scramOps.reconcile(scramUserCaptor.capture(), scramPasswordCaptor.capture())).thenReturn(Future.succeededFuture());
+
+ ArgumentCaptor<String> aclNameCaptor = ArgumentCaptor.forClass(String.class);
+ ArgumentCaptor<Set<SimpleAclRule>> aclRulesCaptor = ArgumentCaptor.forClass(Set.class);
+ when(aclOps.reconcile(aclNameCaptor.capture(), aclRulesCaptor.capture())).thenReturn(Future.succeededFuture());
+
+ when(mockSecretOps.get(eq(user.getMetadata().getNamespace()), eq(user.getMetadata().getName()))).thenReturn(userCert);
+
+ when(mockCrdOps.get(eq(user.getMetadata().getNamespace()), eq(user.getMetadata().getName()))).thenReturn(user);
+
+ Async async = context.async();
+ op.reconcile(new Reconciliation("test-trigger", ResourceType.USER, ResourceUtils.NAMESPACE, ResourceUtils.NAME), res -> {
+ context.assertTrue(res.succeeded());
+
+ List<String> capturedNames = secretNameCaptor.getAllValues();
+ context.assertEquals(1, capturedNames.size());
+ context.assertEquals(ResourceUtils.NAME, capturedNames.get(0));
+
+ List<String> capturedNamespaces = secretNamespaceCaptor.getAllValues();
+ context.assertEquals(1, capturedNamespaces.size());
+ context.assertEquals(ResourceUtils.NAMESPACE, capturedNamespaces.get(0));
+
+ List<Secret> capturedSecrets = secretCaptor.getAllValues();
+
+ context.assertEquals(1, capturedSecrets.size());
+
+ Secret captured = capturedSecrets.get(0);
+ context.assertEquals(user.getMetadata().getName(), captured.getMetadata().getName());
+ context.assertEquals(user.getMetadata().getNamespace(), captured.getMetadata().getNamespace());
+ context.assertEquals(Labels.userLabels(user.getMetadata().getLabels()).withKind(KafkaUser.RESOURCE_KIND).toMap(), captured.getMetadata().getLabels());
+ context.assertEquals(password, captured.getData().get(KafkaUserModel.KEY_PASSWORD));
+ context.assertEquals(password, scramPasswordCaptor.getValue());
+
+ List<String> capturedAclNames = aclNameCaptor.getAllValues();
+ context.assertEquals(1, capturedAclNames.size());
+ context.assertEquals(KafkaUserModel.getUserName(ResourceUtils.NAME), capturedAclNames.get(0));
+
+ List<Set<SimpleAclRule>> capturedAcls = aclRulesCaptor.getAllValues();
+
+ context.assertEquals(1, capturedAcls.size());
+ Set<SimpleAclRule> aclRules = capturedAcls.get(0);
+
+ context.assertEquals(ResourceUtils.createExpectedSimpleAclRules(user).size(), aclRules.size());
+ context.assertEquals(ResourceUtils.createExpectedSimpleAclRules(user), aclRules);
+
+ async.complete();
+ });
+ }
+
+ @Test
+ public void testReconcileDeleteScramShaUser(TestContext context) {
+ CrdOperator mockCrdOps = mock(CrdOperator.class);
+ SecretOperator mockSecretOps = mock(SecretOperator.class);
+ SimpleAclOperator aclOps = mock(SimpleAclOperator.class);
+ ScramShaCredentialsOperator scramOps = mock(ScramShaCredentialsOperator.class);
+
+ KafkaUserOperator op = new KafkaUserOperator(vertx, mockCertManager, mockCrdOps, mockSecretOps, scramOps, aclOps, ResourceUtils.CA_NAME, ResourceUtils.NAMESPACE);
+ KafkaUser user = ResourceUtils.createKafkaUserScramSha();
+ Secret userCert = ResourceUtils.createUserSecretTls();
+
+ ArgumentCaptor<String> secretNamespaceCaptor = ArgumentCaptor.forClass(String.class);
+ ArgumentCaptor<String> secretNameCaptor = ArgumentCaptor.forClass(String.class);
+ when(mockSecretOps.reconcile(secretNamespaceCaptor.capture(), secretNameCaptor.capture(), isNull())).thenReturn(Future.succeededFuture());
+
+ ArgumentCaptor<String> scramUserCaptor = ArgumentCaptor.forClass(String.class);
+ ArgumentCaptor<String> scramPasswordCaptor = ArgumentCaptor.forClass(String.class);
+ when(scramOps.reconcile(scramUserCaptor.capture(), scramPasswordCaptor.capture())).thenReturn(Future.succeededFuture());
+
+ ArgumentCaptor<String> aclNameCaptor = ArgumentCaptor.forClass(String.class);
+ when(aclOps.reconcile(aclNameCaptor.capture(), isNull())).thenReturn(Future.succeededFuture());
+
+ when(mockSecretOps.get(eq(user.getMetadata().getNamespace()), eq(user.getMetadata().getName()))).thenReturn(userCert);
+
+ when(mockCrdOps.get(eq(user.getMetadata().getNamespace()), eq(user.getMetadata().getName()))).thenReturn(null);
+
+ Async async = context.async();
+ op.reconcile(new Reconciliation("test-trigger", ResourceType.USER, ResourceUtils.NAMESPACE, ResourceUtils.NAME), res -> {
+ context.assertTrue(res.succeeded());
+
+ List<String> capturedNames = secretNameCaptor.getAllValues();
+ context.assertEquals(1, capturedNames.size());
+ context.assertEquals(ResourceUtils.NAME, capturedNames.get(0));
+
+ List<String> capturedNamespaces = secretNamespaceCaptor.getAllValues();
+ context.assertEquals(1, capturedNamespaces.size());
+ context.assertEquals(ResourceUtils.NAMESPACE, capturedNamespaces.get(0));
+
+ List<String> capturedAclNames = aclNameCaptor.getAllValues();
+ context.assertEquals(1, capturedAclNames.size());
+ context.assertEquals(KafkaUserModel.getUserName(ResourceUtils.NAME), capturedAclNames.get(0));
+
+ context.assertEquals(singletonList("CN=" + ResourceUtils.NAME), scramUserCaptor.getAllValues());
+ context.assertEquals(singletonList(null), scramPasswordCaptor.getAllValues());
+
+ async.complete();
+ });
}
}
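
The mocked scramOps.reconcile(user, password) calls above pin down the contract these tests assume for ScramShaCredentialsOperator: a non-null password upserts the SCRAM-SHA-512 credential, while a null password deletes it (see the singletonList(null) assertion in the delete test). A minimal sketch of that contract, with the operator's Vert.x plumbing replaced by a plain CompletableFuture and 4096 chosen purely as an illustrative iteration count:

import java.util.concurrent.CompletableFuture;

// Sketch only: reconstructs the reconcile semantics asserted in the tests above.
// ScramShaCredentials is the ZooKeeper-backed store tested further below.
class ScramShaCredentialsOperatorSketch {
    private final ScramShaCredentials credentials;

    ScramShaCredentialsOperatorSketch(ScramShaCredentials credentials) {
        this.credentials = credentials;
    }

    CompletableFuture<Void> reconcile(String username, String password) {
        return CompletableFuture.runAsync(() -> {
            if (password != null) {
                // Desired state present: create the credential or update its password
                credentials.createOrUpdate(username, password, 4096);
            } else if (credentials.exists(username)) {
                // Desired state absent: remove the credential
                credentials.delete(username);
            }
        });
    }
}
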
diff --git a/user-operator/src/test/java/io/strimzi/operator/user/operator/PasswordGeneratorTest.java b/user-operator/src/test/java/io/strimzi/operator/user/operator/PasswordGeneratorTest.java
new file mode 100644
index 00000000000..402ad9b5651
--- /dev/null
+++ b/user-operator/src/test/java/io/strimzi/operator/user/operator/PasswordGeneratorTest.java
@@ -0,0 +1,25 @@
+/*
+ * Copyright 2018, Strimzi authors.
+ * License: Apache License 2.0 (see the file LICENSE or http://apache.org/licenses/LICENSE-2.0.html).
+ */
+package io.strimzi.operator.user.operator;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+public class PasswordGeneratorTest {
+
+ @Test
+ public void length() {
+ PasswordGenerator generator = new PasswordGenerator(10, "a");
+ assertEquals("aaaaaaaaaa", generator.generate());
+ }
+
+ @Test
+ public void alphabet() {
+ PasswordGenerator generator = new PasswordGenerator(10, "ab");
+ assertTrue(generator.generate().matches("[ab]{10}"));
+ }
+}
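
These two tests fully determine the generator's observable behaviour for a fixed alphabet. For orientation, a minimal implementation consistent with them, assuming the (length, alphabet) constructor they use and SecureRandom-based character selection; the actual PasswordGenerator class added by this PR may differ in its defaults:

import java.security.SecureRandom;

// Sketch of a generator satisfying PasswordGeneratorTest: length() expects
// "aaaaaaaaaa" for alphabet "a"; alphabet() expects 10 characters drawn from "ab".
public class PasswordGenerator {
    private final SecureRandom rng = new SecureRandom();
    private final int length;
    private final String alphabet;

    public PasswordGenerator(int length, String alphabet) {
        this.length = length;
        this.alphabet = alphabet;
    }

    public String generate() {
        StringBuilder sb = new StringBuilder(length);
        for (int i = 0; i < length; i++) {
            sb.append(alphabet.charAt(rng.nextInt(alphabet.length())));
        }
        return sb.toString();
    }
}
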
diff --git a/user-operator/src/test/java/io/strimzi/operator/user/operator/ScramShaCredentialsTest.java b/user-operator/src/test/java/io/strimzi/operator/user/operator/ScramShaCredentialsTest.java
new file mode 100644
index 00000000000..49ecb5e70bb
--- /dev/null
+++ b/user-operator/src/test/java/io/strimzi/operator/user/operator/ScramShaCredentialsTest.java
@@ -0,0 +1,99 @@
+/*
+ * Copyright 2018, Strimzi authors.
+ * License: Apache License 2.0 (see the file LICENSE or http://apache.org/licenses/LICENSE-2.0.html).
+ */
+package io.strimzi.operator.user.operator;
+
+import io.strimzi.test.EmbeddedZooKeeper;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.IOException;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+public class ScramShaCredentialsTest {
+
+ private static EmbeddedZooKeeper zkServer;
+
+ private ScramShaCredentials ss;
+
+ @BeforeClass
+ public static void startZk() throws IOException, InterruptedException {
+ zkServer = new EmbeddedZooKeeper();
+ }
+
+ @AfterClass
+ public static void stopZk() {
+ zkServer.close();
+ }
+
+ @Before
+ public void createSS() {
+ ss = new ScramShaCredentials(zkServer.getZkConnectString());
+ }
+
+ @Test
+ public void normalCreate() {
+ ss.createOrUpdate("normalCreate", "foo-password", 10000);
+ }
+
+ @Test
+ public void doubleCreate() {
+ ss.createOrUpdate("doubleCreate", "foo-password", 10000);
+ ss.createOrUpdate("doubleCreate", "foo-password", 10000);
+ }
+
+ @Test(expected = RuntimeException.class)
+ public void tooFewIterations() {
+ ss.createOrUpdate("tooFewIterations", "foo-password", 1);
+ }
+
+ @Test
+ public void normalDelete() {
+ ss.createOrUpdate("normalDelete", "foo-password", 10000);
+ ss.delete("normalDelete");
+ }
+
+ @Test
+ public void doubleDelete() {
+ ss.createOrUpdate("doubleDelete", "foo-password", 10000);
+ ss.delete("doubleDelete");
+ ss.delete("doubleDelete");
+ }
+
+ @Test
+ public void changePassword() {
+ ss.createOrUpdate("changePassword", "changePassword-password", 10000);
+ ss.createOrUpdate("changePassword", "changePassword-password2", 10000);
+ }
+
+ @Test
+ public void userExists() {
+ ss.createOrUpdate("userExists", "foo-password", 10000);
+ assertTrue(ss.exists("userExists"));
+
+ }
+
+ @Test
+ public void userNotExists() {
+ assertFalse(ss.exists("userNotExists"));
+ }
+
+ @Test
+ public void listSome() {
+ ss.createOrUpdate("listSome", "foo-password", 10000);
+ assertTrue(ss.list().contains("listSome"));
+ }
+
+ @Test
+ public void listNone() {
+ for (String user : ss.list()) {
+ ss.delete(user);
+ }
+ assertTrue(ss.list().isEmpty());
+ }
+}
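
Taken together, the suite documents the ScramShaCredentials surface exercised here: createOrUpdate(user, password, iterations), delete(user), exists(user) and list(), with delete being idempotent (doubleDelete) and low iteration counts rejected (tooFewIterations). A short usage sketch, assuming only a reachable ZooKeeper at localhost:2181 (substitute your own connect string):

public class ScramShaCredentialsExample {
    public static void main(String[] args) {
        // Assumes ZooKeeper is reachable at this address; any connect string works.
        ScramShaCredentials creds = new ScramShaCredentials("localhost:2181");

        creds.createOrUpdate("alice", "alice-secret", 10000); // upsert the credential
        System.out.println(creds.exists("alice"));            // true
        System.out.println(creds.list().contains("alice"));   // true

        creds.delete("alice");
        creds.delete("alice");                                // second delete is a no-op
        System.out.println(creds.exists("alice"));            // false
    }
}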