Skip to content

Commit

Permalink
Add support for the Strimzi Metrics Reporter to Kafka component
Browse files Browse the repository at this point in the history
This patch adds support for the Strimzi Metrics Reporter to Kafka component as described by the following proposal:

https://github.com/strimzi/proposals/blob/main/064-prometheus-metrics-reporter.md

Signed-off-by: Owen <[email protected]>
  • Loading branch information
ocorriga committed Jan 16, 2025
1 parent 58f31ba commit e569be2
Show file tree
Hide file tree
Showing 39 changed files with 1,082 additions and 183 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
* Support for MirrorMaker 1 has been removed
* Added support to configure `dnsPolicy` and `dnsConfig` using the `template` sections.
* Store Kafka node certificates in separate Secrets, one Secret per pod.
* Added support for Strimzi Metrics Reporter to the Kafka component.

### Major changes, deprecations and removals

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
package io.strimzi.api.kafka.model.common.metrics;

import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonPropertyOrder;
import io.strimzi.api.kafka.model.common.Constants;
import io.strimzi.api.kafka.model.common.ExternalConfigurationReference;
Expand All @@ -31,7 +30,6 @@ public class JmxPrometheusExporterMetrics extends MetricsConfig {
private ExternalConfigurationReference valueFrom;

@Description("ConfigMap entry where the Prometheus JMX Exporter configuration is stored. ")
@JsonProperty(required = true)
public ExternalConfigurationReference getValueFrom() {
return valueFrom;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,14 +23,18 @@
property = "type")
@JsonSubTypes({
@JsonSubTypes.Type(name = JmxPrometheusExporterMetrics.TYPE_JMX_EXPORTER, value = JmxPrometheusExporterMetrics.class),
@JsonSubTypes.Type(name = StrimziMetricsReporter.TYPE_STRIMZI_METRICS_REPORTER, value = StrimziMetricsReporter.class)
})
@JsonInclude(JsonInclude.Include.NON_NULL)
@EqualsAndHashCode
@ToString
public abstract class MetricsConfig implements UnknownPropertyPreserving {
private Map<String, Object> additionalProperties;

@Description("Metrics type. Only 'jmxPrometheusExporter' supported currently.")
@Description("Metrics type. " +
"The supported types are `jmxPrometheusExporter` and `strimziMetricsReporter`. " +
"Type `jmxPrometheusExporter` uses the Prometheus JMX Exporter to expose Kafka JMX metrics in Prometheus format through an HTTP endpoint. " +
"Type `strimziMetricsReporter` uses the Strimzi Metrics Reporter to directly expose Kafka metrics in Prometheus format through an HTTP endpoint.")
public abstract String getType();

@Override
Expand All @@ -45,5 +49,4 @@ public void setAdditionalProperty(String name, Object value) {
}
this.additionalProperties.put(name, value);
}
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
/*
* Copyright Strimzi authors.
* License: Apache License 2.0 (see the file LICENSE or http://apache.org/licenses/LICENSE-2.0.html).
*/
package io.strimzi.api.kafka.model.common.metrics;

import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.JsonPropertyOrder;
import io.strimzi.api.kafka.model.common.Constants;
import io.strimzi.crdgenerator.annotations.Description;
import io.sundr.builder.annotations.Buildable;
import lombok.EqualsAndHashCode;
import lombok.ToString;

/**
* Strimzi Metrics Reporter.
*/
@Buildable(
editableEnabled = false,
builderPackage = Constants.FABRIC8_KUBERNETES_API
)
@JsonPropertyOrder({"type", "values"})
@JsonInclude(JsonInclude.Include.NON_NULL)
@EqualsAndHashCode(callSuper = true)
@ToString(callSuper = true)
public class StrimziMetricsReporter extends MetricsConfig {
public static final String TYPE_STRIMZI_METRICS_REPORTER = "strimziMetricsReporter";

private StrimziMetricsReporterValues values;

@Description("Must be `" + TYPE_STRIMZI_METRICS_REPORTER + "`")
@JsonInclude(JsonInclude.Include.NON_NULL)
@Override
public String getType() {
return TYPE_STRIMZI_METRICS_REPORTER;
}

@Description("Configuration values for the Strimzi Metrics Reporter.")
public StrimziMetricsReporterValues getValues() {
return values;
}

public void setValues(StrimziMetricsReporterValues values) {
this.values = values;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
/*
* Copyright Strimzi authors.
* License: Apache License 2.0 (see the file LICENSE or http://apache.org/licenses/LICENSE-2.0.html).
*/
package io.strimzi.api.kafka.model.common.metrics;

import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.JsonPropertyOrder;
import io.strimzi.api.kafka.model.common.Constants;
import io.strimzi.api.kafka.model.common.UnknownPropertyPreserving;
import io.strimzi.crdgenerator.annotations.Description;
import io.sundr.builder.annotations.Buildable;
import lombok.EqualsAndHashCode;
import lombok.ToString;

import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
* Strimzi Metrics Reporter configuration.
*/
@Buildable(
editableEnabled = false,
builderPackage = Constants.FABRIC8_KUBERNETES_API
)
@JsonInclude(JsonInclude.Include.NON_NULL)
@JsonPropertyOrder({"allowList"})
@EqualsAndHashCode()
@ToString
public class StrimziMetricsReporterValues implements UnknownPropertyPreserving {
private static final String DEFAULT_REGEX = ".*";

private List<String> allowList = List.of(DEFAULT_REGEX);
private Map<String, Object> additionalProperties;

@Description("A comma separated list of regex patterns to specify the metrics to collect. Default: `" + DEFAULT_REGEX + "`.")
@JsonInclude(value = JsonInclude.Include.NON_NULL)
public List<String> getAllowList() {
return allowList;
}

public void setAllowList(List<String> allowList) {
this.allowList = allowList;
}

@Override
public Map<String, Object> getAdditionalProperties() {
return this.additionalProperties != null ? this.additionalProperties : Map.of();
}

@Override
public void setAdditionalProperty(String name, Object value) {
if (this.additionalProperties == null) {
this.additionalProperties = new HashMap<>(2);
}
this.additionalProperties.put(name, value);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,8 @@ public class KafkaClusterSpec implements HasConfigurableMetrics, HasConfigurable
+ "cruise.control.metrics.topic, cruise.control.metrics.reporter.bootstrap.servers, "
+ "node.id, process.roles, controller., metadata.log.dir, zookeeper.metadata.migration.enable, " // KRaft options
+ "client.quota.callback.static.kafka.admin., client.quota.callback.static.produce, client.quota.callback.static.fetch, "
+ "client.quota.callback.static.storage.per.volume.limit.min.available., client.quota.callback.static.excluded.principal.name.list";
+ "client.quota.callback.static.storage.per.volume.limit.min.available., client.quota.callback.static.excluded.principal.name.list, "
+ "kafka.metric.reporters, prometheus.metrics.reporter.";

public static final String FORBIDDEN_PREFIX_EXCEPTIONS = "zookeeper.connection.timeout.ms, sasl.server.max.receive.size, "
+ "ssl.cipher.suites, ssl.protocol, ssl.enabled.protocols, ssl.secure.random.implementation, "
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ public static Map<String, String> generateMetricsAndLogConfigMapData(Reconciliat
data.put(supportsLogging.logging().configMapKey(), supportsLogging.logging().loggingConfiguration(reconciliation, metricsAndLogging.loggingCm()));
}

if (model instanceof SupportsMetrics supportMetrics) {
if (model instanceof SupportsMetrics supportMetrics && supportMetrics.metrics() != null) {
String parseResult = supportMetrics.metrics().metricsJson(reconciliation, metricsAndLogging.metricsCm());
if (parseResult != null) {
data.put(MetricsModel.CONFIG_MAP_KEY, parseResult);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
import io.fabric8.kubernetes.api.model.networking.v1.NetworkPolicyIngressRule;
import io.fabric8.kubernetes.api.model.networking.v1.NetworkPolicyPeer;
import io.strimzi.api.kafka.model.common.JvmOptions;
import io.strimzi.api.kafka.model.common.metrics.JmxPrometheusExporterMetrics;
import io.strimzi.api.kafka.model.common.metrics.StrimziMetricsReporter;
import io.strimzi.api.kafka.model.common.template.DeploymentTemplate;
import io.strimzi.api.kafka.model.common.template.InternalServiceTemplate;
import io.strimzi.api.kafka.model.common.template.PodTemplate;
Expand All @@ -45,6 +47,7 @@
import io.strimzi.operator.common.Annotations;
import io.strimzi.operator.common.Reconciliation;
import io.strimzi.operator.common.Util;
import io.strimzi.operator.common.model.InvalidResourceException;
import io.strimzi.operator.common.model.Labels;
import io.strimzi.operator.common.model.cruisecontrol.CruiseControlApiProperties;
import io.strimzi.operator.common.model.cruisecontrol.CruiseControlConfigurationParameters;
Expand Down Expand Up @@ -95,7 +98,7 @@ public class CruiseControl extends AbstractModel implements SupportsMetrics, Sup
*/
public static final String API_AUTH_CREDENTIALS_FILE = API_AUTH_CONFIG_VOLUME_MOUNT + CruiseControlApiProperties.AUTH_FILE_KEY;

protected static final String ENV_VAR_CRUISE_CONTROL_METRICS_ENABLED = "CRUISE_CONTROL_METRICS_ENABLED";
protected static final String ENV_VAR_CRUISE_CONTROL_JMX_EXPORTER_ENABLED = "CRUISE_CONTROL_JMX_EXPORTER_ENABLED";

/**
* Annotation for rolling a cluster whenever the server configuration has changed.
Expand All @@ -118,7 +121,7 @@ public class CruiseControl extends AbstractModel implements SupportsMetrics, Sup
@SuppressFBWarnings({"UWF_FIELD_NOT_INITIALIZED_IN_CONSTRUCTOR"}) // This field is initialized in the fromCrd method
protected Capacity capacity;
@SuppressFBWarnings({"UWF_FIELD_NOT_INITIALIZED_IN_CONSTRUCTOR"}) // This field is initialized in the fromCrd method
private MetricsModel metrics;
private MetricsModel jmxExporterMetrics;
private LoggingModel logging;
/* test */ CruiseControlConfiguration configuration;

Expand Down Expand Up @@ -217,7 +220,15 @@ public static CruiseControl fromCrd(
result.livenessProbeOptions = ProbeUtils.extractLivenessProbeOptionsOrDefault(ccSpec, ProbeUtils.DEFAULT_HEALTHCHECK_OPTIONS);
result.gcLoggingEnabled = ccSpec.getJvmOptions() == null ? JvmOptions.DEFAULT_GC_LOGGING_ENABLED : ccSpec.getJvmOptions().isGcLoggingEnabled();
result.jvmOptions = ccSpec.getJvmOptions();
result.metrics = new MetricsModel(ccSpec);

if (ccSpec.getMetricsConfig() instanceof JmxPrometheusExporterMetrics) {
result.jmxExporterMetrics = new MetricsModel(ccSpec);
} else if (ccSpec.getMetricsConfig() instanceof StrimziMetricsReporter) {
// Cruise Control own metrics are only exported through JMX
LOGGER.errorCr(reconciliation, "The Strimzi Metrics Reporter is not supported for Cruise Control");
throw new InvalidResourceException("The Strimzi Metrics Reporter is not supported for Cruise Control");
}

result.logging = new LoggingModel(ccSpec, result.getClass().getSimpleName(), true, false);
result.resources = ccSpec.getResources();

Expand Down Expand Up @@ -306,7 +317,7 @@ protected List<ContainerPort> getContainerPortList() {

portList.add(ContainerUtils.createContainerPort(REST_API_PORT_NAME, REST_API_PORT));

if (metrics.isEnabled()) {
if (jmxExporterMetrics != null && jmxExporterMetrics.isEnabled()) {
portList.add(ContainerUtils.createContainerPort(MetricsModel.METRICS_PORT_NAME, MetricsModel.METRICS_PORT));
}

Expand Down Expand Up @@ -394,7 +405,8 @@ public Deployment generateDeployment(Map<String, String> annotations, boolean is
protected List<EnvVar> getEnvVars() {
List<EnvVar> varList = new ArrayList<>();

varList.add(ContainerUtils.createEnvVar(ENV_VAR_CRUISE_CONTROL_METRICS_ENABLED, String.valueOf(metrics.isEnabled())));
String jmxMetricsEnabled = jmxExporterMetrics != null && jmxExporterMetrics.isEnabled() ? Boolean.TRUE.toString() : Boolean.FALSE.toString();
varList.add(ContainerUtils.createEnvVar(ENV_VAR_CRUISE_CONTROL_JMX_EXPORTER_ENABLED, jmxMetricsEnabled));
varList.add(ContainerUtils.createEnvVar(ENV_VAR_STRIMZI_KAFKA_BOOTSTRAP_SERVERS, KafkaResources.bootstrapServiceName(cluster) + ":" + KafkaCluster.REPLICATION_PORT));
varList.add(ContainerUtils.createEnvVar(ENV_VAR_STRIMZI_KAFKA_GC_LOG_ENABLED, String.valueOf(gcLoggingEnabled)));

Expand Down Expand Up @@ -475,7 +487,7 @@ public NetworkPolicy generateNetworkPolicy(String operatorNamespace, Labels oper
rules.add(NetworkPolicyUtils.createIngressRule(REST_API_PORT, peers));

// Everyone can access metrics
if (metrics.isEnabled()) {
if (jmxExporterMetrics != null && jmxExporterMetrics.isEnabled()) {
rules.add(NetworkPolicyUtils.createIngressRule(MetricsModel.METRICS_PORT, List.of()));
}

Expand All @@ -500,7 +512,7 @@ public HashLoginServiceApiCredentials apiCredentials() {
* @return Metrics Model instance for configuring Prometheus metrics
*/
public MetricsModel metrics() {
return metrics;
return jmxExporterMetrics;
}

/**
Expand Down
Loading

0 comments on commit e569be2

Please sign in to comment.