Skip to content

Commit

Permalink
Move connect configuration set up to the operator
Browse files Browse the repository at this point in the history
Signed-off-by: Gantigmaa Selenge <[email protected]>
  • Loading branch information
tinaselenge committed Jan 24, 2025
1 parent 984f294 commit c7e3a26
Show file tree
Hide file tree
Showing 11 changed files with 1,015 additions and 300 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
* Store Kafka node certificates in separate Secrets, one Secret per pod.
* Allow configuring `ssl.principal.mapping.rules` and custom trusted CAs in Kafka brokers with `type: custom` authentication
* Moved HTTP bridge configuration to the ConfigMap setup by the operator.
* Moved Kafka Connect configuration to the ConfigMap created by the operator.

### Major changes, deprecations and removals

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,18 +97,20 @@ public class KafkaConnectCluster extends AbstractModel implements SupportsMetric
protected static final String EXTERNAL_CONFIGURATION_VOLUME_MOUNT_BASE_PATH = "/opt/kafka/external-configuration/";
protected static final String EXTERNAL_CONFIGURATION_VOLUME_NAME_PREFIX = "ext-conf-";
protected static final String OAUTH_TLS_CERTS_BASE_VOLUME_MOUNT = "/opt/kafka/oauth-certs/";
protected static final String LOG_AND_METRICS_CONFIG_VOLUME_NAME = "kafka-metrics-and-logging";
protected static final String LOG_AND_METRICS_CONFIG_VOLUME_MOUNT = "/opt/kafka/custom-config/";
protected static final String CONNECT_CONFIG_VOLUME_NAME = "kafka-connect-configurations";
protected static final String CONNECT_CONFIG_VOLUME_MOUNT = "/opt/kafka/custom-config/";

// Configuration defaults
private static final Probe DEFAULT_HEALTHCHECK_OPTIONS = new ProbeBuilder().withTimeoutSeconds(5).withInitialDelaySeconds(60).build();

/**
* Key under which the Connect configuration is stored in ConfigMap
*/
static final String KAFKA_CONNECT_CONFIGURATION_FILENAME = "kafka-connect.properties";

// Kafka Connect configuration keys (EnvVariables)
protected static final String ENV_VAR_PREFIX = "KAFKA_CONNECT_";
protected static final String ENV_VAR_KAFKA_CONNECT_CONFIGURATION = "KAFKA_CONNECT_CONFIGURATION";
protected static final String ENV_VAR_KAFKA_CONNECT_METRICS_ENABLED = "KAFKA_CONNECT_METRICS_ENABLED";
protected static final String ENV_VAR_KAFKA_CONNECT_BOOTSTRAP_SERVERS = "KAFKA_CONNECT_BOOTSTRAP_SERVERS";
protected static final String ENV_VAR_KAFKA_CONNECT_TLS = "KAFKA_CONNECT_TLS";
protected static final String ENV_VAR_KAFKA_CONNECT_TRUSTED_CERTS = "KAFKA_CONNECT_TRUSTED_CERTS";
protected static final String ENV_VAR_KAFKA_CONNECT_TLS_AUTH_CERT = "KAFKA_CONNECT_TLS_AUTH_CERT";
protected static final String ENV_VAR_KAFKA_CONNECT_TLS_AUTH_KEY = "KAFKA_CONNECT_TLS_AUTH_KEY";
Expand All @@ -129,7 +131,7 @@ public class KafkaConnectCluster extends AbstractModel implements SupportsMetric
private Rack rack;
private String initImage;
protected String serviceName;
protected String loggingAndMetricsConfigMapName;
protected String connectConfigMapName;

protected String bootstrapServers;
@SuppressWarnings("deprecation") // External Configuration environment variables are deprecated
Expand Down Expand Up @@ -188,7 +190,7 @@ protected KafkaConnectCluster(Reconciliation reconciliation, HasMetadata resourc
super(reconciliation, resource, name, componentType, sharedEnvironmentProvider);

this.serviceName = KafkaConnectResources.serviceName(cluster);
this.loggingAndMetricsConfigMapName = KafkaConnectResources.metricsAndLogConfigMapName(cluster);
this.connectConfigMapName = KafkaConnectResources.metricsAndLogConfigMapName(cluster);
}

/**
Expand Down Expand Up @@ -370,7 +372,7 @@ protected List<ContainerPort> getContainerPortList() {
protected List<Volume> getVolumes(boolean isOpenShift) {
List<Volume> volumeList = new ArrayList<>(2);
volumeList.add(VolumeUtils.createTempDirVolume(templatePod));
volumeList.add(VolumeUtils.createConfigMapVolume(LOG_AND_METRICS_CONFIG_VOLUME_NAME, loggingAndMetricsConfigMapName));
volumeList.add(VolumeUtils.createConfigMapVolume(CONNECT_CONFIG_VOLUME_NAME, connectConfigMapName));

if (rack != null) {
volumeList.add(VolumeUtils.createEmptyDirVolume(INIT_VOLUME_NAME, "1Mi", "Memory"));
Expand Down Expand Up @@ -434,7 +436,7 @@ private List<Volume> getExternalConfigurationVolumes(boolean isOpenShift) {
protected List<VolumeMount> getVolumeMounts() {
List<VolumeMount> volumeMountList = new ArrayList<>(2);
volumeMountList.add(VolumeUtils.createTempDirVolumeMount());
volumeMountList.add(VolumeUtils.createVolumeMount(LOG_AND_METRICS_CONFIG_VOLUME_NAME, LOG_AND_METRICS_CONFIG_VOLUME_MOUNT));
volumeMountList.add(VolumeUtils.createVolumeMount(CONNECT_CONFIG_VOLUME_NAME, CONNECT_CONFIG_VOLUME_MOUNT));

if (rack != null) {
volumeMountList.add(VolumeUtils.createVolumeMount(INIT_VOLUME_NAME, INIT_VOLUME_MOUNT));
Expand Down Expand Up @@ -610,9 +612,7 @@ protected String getCommand() {

protected List<EnvVar> getEnvVars() {
List<EnvVar> varList = new ArrayList<>();
varList.add(ContainerUtils.createEnvVar(ENV_VAR_KAFKA_CONNECT_CONFIGURATION, configuration.getConfiguration()));
varList.add(ContainerUtils.createEnvVar(ENV_VAR_KAFKA_CONNECT_METRICS_ENABLED, String.valueOf(metrics.isEnabled())));
varList.add(ContainerUtils.createEnvVar(ENV_VAR_KAFKA_CONNECT_BOOTSTRAP_SERVERS, bootstrapServers));
varList.add(ContainerUtils.createEnvVar(ENV_VAR_STRIMZI_KAFKA_GC_LOG_ENABLED, String.valueOf(gcLoggingEnabled)));

JvmOptionUtils.heapOptions(varList, 75, 0L, jvmOptions, resources);
Expand Down Expand Up @@ -642,8 +642,6 @@ protected List<EnvVar> getEnvVars() {
}

private void populateTLSEnvVars(final List<EnvVar> varList) {
varList.add(ContainerUtils.createEnvVar(ENV_VAR_KAFKA_CONNECT_TLS, "true"));

if (tls.getTrustedCertificates() != null && !tls.getTrustedCertificates().isEmpty()) {
varList.add(ContainerUtils.createEnvVar(ENV_VAR_KAFKA_CONNECT_TRUSTED_CERTS, CertUtils.trustedCertsEnvVar(tls.getTrustedCertificates())));
}
Expand Down Expand Up @@ -835,21 +833,38 @@ protected Map<String, String> defaultPodLabels() {
}

/**
* Generates a metrics and logging ConfigMap according to the configuration. If this operand doesn't support logging
* or metrics, they will nto be set.
* Generates a ConfigMap containing Connect configurations.
* It also generates the metrics and logging configuration. If this operand doesn't support logging
* or metrics, they will not be set.
*
* @param metricsAndLogging The external CMs with logging and metrics configuration
*
* @return The generated ConfigMap
*/
public ConfigMap generateMetricsAndLogConfigMap(MetricsAndLogging metricsAndLogging) {
public ConfigMap generateConnectConfigMap(MetricsAndLogging metricsAndLogging) {
// generate the ConfigMap data entries for the metrics and logging configuration
Map<String, String> data = ConfigMapUtils.generateMetricsAndLogConfigMapData(reconciliation, this, metricsAndLogging);
// add the ConfigMap data entry for Connect configurations
data.put(
KAFKA_CONNECT_CONFIGURATION_FILENAME,
new KafkaConnectConfigurationBuilder(bootstrapServers)
.withConfigProviders()
.withRestListeners(REST_API_PORT)
.withPluginPath()
.withConfigurations(configuration.getConfiguration())
.withTls(tls)
.withAuthentication(authentication)
.withRackId()
.build()
);

return ConfigMapUtils
.createConfigMap(
loggingAndMetricsConfigMapName,
connectConfigMapName,
namespace,
labels,
ownerReference,
ConfigMapUtils.generateMetricsAndLogConfigMapData(reconciliation, this, metricsAndLogging)
data
);
}

Expand Down
Loading

0 comments on commit c7e3a26

Please sign in to comment.