Skip to content

Commit

Permalink
KAFKA-9106 make metrics exposed via jmx configurable (apache#7674)
Browse files Browse the repository at this point in the history
Reviewers: Colin P. McCabe <[email protected]>, Rajini Sivaram <[email protected]>, Manikumar Reddy <[email protected]>
  • Loading branch information
xvrl authored Feb 13, 2020
1 parent ea72ede commit 7e1c39f
Show file tree
Hide file tree
Showing 40 changed files with 447 additions and 121 deletions.
5 changes: 5 additions & 0 deletions checkstyle/import-control-core.xml
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,11 @@
<allow pkg="kafka.serializer" />
<allow pkg="org.apache.kafka.common" />

<!-- see KIP-544 for why KafkaYammerMetrics should be used instead of the global default yammer metrics registry
https://cwiki.apache.org/confluence/display/KAFKA/KIP-544%3A+Make+metrics+exposed+via+JMX+configurable -->
<disallow class="com.yammer.metrics.Metrics" />
<allow pkg="com.yammer.metrics"/>

<subpackage name="tools">
<allow pkg="org.apache.kafka.clients.admin" />
<allow pkg="kafka.admin" />
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -448,7 +448,9 @@ static KafkaAdminClient createInternal(AdminClientConfig config, TimeoutProcesso
.timeWindow(config.getLong(AdminClientConfig.METRICS_SAMPLE_WINDOW_MS_CONFIG), TimeUnit.MILLISECONDS)
.recordLevel(Sensor.RecordingLevel.forName(config.getString(AdminClientConfig.METRICS_RECORDING_LEVEL_CONFIG)))
.tags(metricTags);
reporters.add(new JmxReporter(JMX_PREFIX));
JmxReporter jmxReporter = new JmxReporter(JMX_PREFIX);
jmxReporter.configure(config.originals());
reporters.add(jmxReporter);
metrics = new Metrics(metricConfig, reporters, time);
String metricGrpPrefix = "admin-client";
channelBuilder = ClientUtils.createChannelBuilder(config, time, logContext);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -883,7 +883,9 @@ private static Metrics buildMetrics(ConsumerConfig config, Time time, String cli
.tags(metricsTags);
List<MetricsReporter> reporters = config.getConfiguredInstances(ConsumerConfig.METRIC_REPORTER_CLASSES_CONFIG,
MetricsReporter.class, Collections.singletonMap(ConsumerConfig.CLIENT_ID_CONFIG, clientId));
reporters.add(new JmxReporter(JMX_PREFIX));
JmxReporter jmxReporter = new JmxReporter(JMX_PREFIX);
jmxReporter.configure(config.originals());
reporters.add(jmxReporter);
return new Metrics(metricConfig, reporters, time);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -351,7 +351,9 @@ public KafkaProducer(Properties properties, Serializer<K> keySerializer, Seriali
List<MetricsReporter> reporters = config.getConfiguredInstances(ProducerConfig.METRIC_REPORTER_CLASSES_CONFIG,
MetricsReporter.class,
Collections.singletonMap(ProducerConfig.CLIENT_ID_CONFIG, clientId));
reporters.add(new JmxReporter(JMX_PREFIX));
JmxReporter jmxReporter = new JmxReporter(JMX_PREFIX);
jmxReporter.configure(userProvidedConfigs);
reporters.add(jmxReporter);
this.metrics = new Metrics(metricConfig, reporters, time);
this.partitioner = config.getConfiguredInstance(ProducerConfig.PARTITIONER_CLASS_CONFIG, Partitioner.class);
long retryBackoffMs = config.getLong(ProducerConfig.RETRY_BACKOFF_MS_CONFIG);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,9 @@

import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.utils.Sanitizer;
import org.apache.kafka.common.utils.Utils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -36,16 +38,32 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Predicate;
import java.util.regex.Pattern;
import java.util.regex.PatternSyntaxException;

/**
* Register metrics in JMX as dynamic mbeans based on the metric names
*/
public class JmxReporter implements MetricsReporter {

public static final String METRICS_CONFIG_PREFIX = "metrics.jmx.";

public static final String BLACKLIST_CONFIG = METRICS_CONFIG_PREFIX + "blacklist";
public static final String WHITELIST_CONFIG = METRICS_CONFIG_PREFIX + "whitelist";

public static final Set<String> RECONFIGURABLE_CONFIGS = Utils.mkSet(WHITELIST_CONFIG,
BLACKLIST_CONFIG);

public static final String DEFAULT_WHITELIST = ".*";
public static final String DEFAULT_BLACKLIST = "";

private static final Logger log = LoggerFactory.getLogger(JmxReporter.class);
private static final Object LOCK = new Object();
private String prefix;
private final Map<String, KafkaMbean> mbeans = new HashMap<>();
private Predicate<String> mbeanPredicate = s -> true;

public JmxReporter() {
this("");
Expand All @@ -59,15 +77,46 @@ public JmxReporter(String prefix) {
}

@Override
public void configure(Map<String, ?> configs) {}
public void configure(Map<String, ?> configs) {
reconfigure(configs);
}

@Override
public Set<String> reconfigurableConfigs() {
return RECONFIGURABLE_CONFIGS;
}

@Override
public void validateReconfiguration(Map<String, ?> configs) throws ConfigException {
compilePredicate(configs);
}

@Override
public void reconfigure(Map<String, ?> configs) {
synchronized (LOCK) {
this.mbeanPredicate = JmxReporter.compilePredicate(configs);

mbeans.forEach((name, mbean) -> {
if (mbeanPredicate.test(name)) {
reregister(mbean);
} else {
unregister(mbean);
}
});
}
}

@Override
public void init(List<KafkaMetric> metrics) {
synchronized (LOCK) {
for (KafkaMetric metric : metrics)
addAttribute(metric);
for (KafkaMbean mbean : mbeans.values())
reregister(mbean);

mbeans.forEach((name, mbean) -> {
if (mbeanPredicate.test(name)) {
reregister(mbean);
}
});
}
}

Expand All @@ -78,8 +127,10 @@ public boolean containsMbean(String mbeanName) {
@Override
public void metricChange(KafkaMetric metric) {
synchronized (LOCK) {
KafkaMbean mbean = addAttribute(metric);
reregister(mbean);
String mbeanName = addAttribute(metric);
if (mbeanName != null && mbeanPredicate.test(mbeanName)) {
reregister(mbeans.get(mbeanName));
}
}
}

Expand All @@ -93,7 +144,7 @@ public void metricRemoval(KafkaMetric metric) {
if (mbean.metrics.isEmpty()) {
unregister(mbean);
mbeans.remove(mBeanName);
} else
} else if (mbeanPredicate.test(mBeanName))
reregister(mbean);
}
}
Expand All @@ -107,15 +158,15 @@ private KafkaMbean removeAttribute(KafkaMetric metric, String mBeanName) {
return mbean;
}

private KafkaMbean addAttribute(KafkaMetric metric) {
private String addAttribute(KafkaMetric metric) {
try {
MetricName metricName = metric.metricName();
String mBeanName = getMBeanName(prefix, metricName);
if (!this.mbeans.containsKey(mBeanName))
mbeans.put(mBeanName, new KafkaMbean(mBeanName));
KafkaMbean mbean = this.mbeans.get(mBeanName);
mbean.setAttribute(metricName.name(), metric);
return mbean;
return mBeanName;
} catch (JMException e) {
throw new KafkaException("Error creating mbean attribute for metricName :" + metric.metricName(), e);
}
Expand Down Expand Up @@ -244,4 +295,27 @@ public AttributeList setAttributes(AttributeList list) {

}

public static Predicate<String> compilePredicate(Map<String, ?> configs) {
String whitelist = (String) configs.get(WHITELIST_CONFIG);
String blacklist = (String) configs.get(BLACKLIST_CONFIG);

if (whitelist == null) {
whitelist = DEFAULT_WHITELIST;
}

if (blacklist == null) {
blacklist = DEFAULT_BLACKLIST;
}

try {
Pattern whitelistPattern = Pattern.compile(whitelist);
Pattern blacklistPattern = Pattern.compile(blacklist);

return s -> whitelistPattern.matcher(s).matches()
&& !blacklistPattern.matcher(s).matches();
} catch (PatternSyntaxException e) {
throw new ConfigException("JMX filter for configuration" + METRICS_CONFIG_PREFIX
+ ".(whitelist/blacklist) is not a valid regular expression");
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,16 +16,20 @@
*/
package org.apache.kafka.common.metrics;

import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;

import org.apache.kafka.common.Configurable;
import org.apache.kafka.common.Reconfigurable;
import org.apache.kafka.common.config.ConfigException;

/**
* A plugin interface to allow things to listen as new metrics are created so they can be reported.
* <p>
* Implement {@link org.apache.kafka.common.ClusterResourceListener} to receive cluster metadata once it's available. Please see the class documentation for ClusterResourceListener for more information.
*/
public interface MetricsReporter extends Configurable, AutoCloseable {
public interface MetricsReporter extends Reconfigurable, AutoCloseable {

/**
* This is called when the reporter is first registered to initially register all existing metrics
Expand All @@ -50,4 +54,15 @@ public interface MetricsReporter extends Configurable, AutoCloseable {
*/
void close();

// default methods for backwards compatibility with reporters that only implement Configurable
default Set<String> reconfigurableConfigs() {
return Collections.emptySet();
}

default void validateReconfiguration(Map<String, ?> configs) throws ConfigException {
}

default void reconfigure(Map<String, ?> configs) {
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
import javax.management.MBeanServer;
import javax.management.ObjectName;
import java.lang.management.ManagementFactory;
import java.util.HashMap;
import java.util.Map;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
Expand Down Expand Up @@ -111,4 +113,46 @@ public void testJmxRegistrationSanitization() throws Exception {
metrics.close();
}
}

@Test
public void testPredicateAndDynamicReload() throws Exception {
Metrics metrics = new Metrics();
MBeanServer server = ManagementFactory.getPlatformMBeanServer();

Map<String, String> configs = new HashMap<>();

configs.put(JmxReporter.BLACKLIST_CONFIG,
JmxReporter.getMBeanName("", metrics.metricName("pack.bean2.total", "grp2")));

try {
JmxReporter reporter = new JmxReporter();
reporter.configure(configs);
metrics.addReporter(reporter);

Sensor sensor = metrics.sensor("kafka.requests");
sensor.add(metrics.metricName("pack.bean2.avg", "grp1"), new Avg());
sensor.add(metrics.metricName("pack.bean2.total", "grp2"), new CumulativeSum());
sensor.record();

assertTrue(server.isRegistered(new ObjectName(":type=grp1")));
assertEquals(1.0, server.getAttribute(new ObjectName(":type=grp1"), "pack.bean2.avg"));
assertFalse(server.isRegistered(new ObjectName(":type=grp2")));

sensor.record();

configs.put(JmxReporter.BLACKLIST_CONFIG,
JmxReporter.getMBeanName("", metrics.metricName("pack.bean2.avg", "grp1")));

reporter.reconfigure(configs);

assertFalse(server.isRegistered(new ObjectName(":type=grp1")));
assertTrue(server.isRegistered(new ObjectName(":type=grp2")));
assertEquals(2.0, server.getAttribute(new ObjectName(":type=grp2"), "pack.bean2.total"));

metrics.removeMetric(metrics.metricName("pack.bean2.total", "grp2"));
assertFalse(server.isRegistered(new ObjectName(":type=grp2")));
} finally {
metrics.close();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -270,7 +270,9 @@ Map<String, Object> sourceAdminConfig() {
List<MetricsReporter> metricsReporters() {
List<MetricsReporter> reporters = getConfiguredInstances(
CommonClientConfigs.METRIC_REPORTER_CLASSES_CONFIG, MetricsReporter.class);
reporters.add(new JmxReporter("kafka.connect.mirror"));
JmxReporter jmxReporter = new JmxReporter("kafka.connect.mirror");
jmxReporter.configure(this.originals());
reporters.add(jmxReporter);
return reporters;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,21 +64,20 @@ public class ConnectMetrics {
* @param time the time; may not be null
*/
public ConnectMetrics(String workerId, WorkerConfig config, Time time) {
this(workerId, time, config.getInt(CommonClientConfigs.METRICS_NUM_SAMPLES_CONFIG),
config.getLong(CommonClientConfigs.METRICS_SAMPLE_WINDOW_MS_CONFIG),
config.getString(CommonClientConfigs.METRICS_RECORDING_LEVEL_CONFIG),
config.getConfiguredInstances(CommonClientConfigs.METRIC_REPORTER_CLASSES_CONFIG, MetricsReporter.class));
}

public ConnectMetrics(String workerId, Time time, int numSamples, long sampleWindowMs, String metricsRecordingLevel,
List<MetricsReporter> reporters) {
this.workerId = workerId;
this.time = time;

int numSamples = config.getInt(CommonClientConfigs.METRICS_NUM_SAMPLES_CONFIG);
long sampleWindowMs = config.getLong(CommonClientConfigs.METRICS_SAMPLE_WINDOW_MS_CONFIG);
String metricsRecordingLevel = config.getString(CommonClientConfigs.METRICS_RECORDING_LEVEL_CONFIG);
List<MetricsReporter> reporters = config.getConfiguredInstances(CommonClientConfigs.METRIC_REPORTER_CLASSES_CONFIG, MetricsReporter.class);

MetricConfig metricConfig = new MetricConfig().samples(numSamples)
.timeWindow(sampleWindowMs, TimeUnit.MILLISECONDS).recordLevel(
Sensor.RecordingLevel.forName(metricsRecordingLevel));
reporters.add(new JmxReporter(JMX_PREFIX));
JmxReporter jmxReporter = new JmxReporter(JMX_PREFIX);
jmxReporter.configure(config.originals());
reporters.add(jmxReporter);
this.metrics = new Metrics(metricConfig, reporters, time);
LOG.debug("Registering Connect metrics with JMX for worker '{}'", workerId);
AppInfoParser.registerAppInfo(JMX_PREFIX, workerId, metrics, time.milliseconds());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,9 @@ public WorkerGroupMember(DistributedConfig config,
List<MetricsReporter> reporters = config.getConfiguredInstances(CommonClientConfigs.METRIC_REPORTER_CLASSES_CONFIG,
MetricsReporter.class,
Collections.singletonMap(CommonClientConfigs.CLIENT_ID_CONFIG, clientId));
reporters.add(new JmxReporter(JMX_PREFIX));
JmxReporter jmxReporter = new JmxReporter(JMX_PREFIX);
jmxReporter.configure(config.originals());
reporters.add(jmxReporter);
this.metrics = new Metrics(metricConfig, reporters, time);
this.retryBackoffMs = config.getLong(CommonClientConfigs.RETRY_BACKOFF_MS_CONFIG);
this.metadata = new Metadata(retryBackoffMs, config.getLong(CommonClientConfigs.METADATA_MAX_AGE_CONFIG),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,6 @@
import org.apache.kafka.connect.runtime.ConnectMetricsRegistry;
import org.apache.kafka.connect.util.ConnectorTaskId;

import java.util.ArrayList;

/**
* Contains various sensors used for monitoring errors.
*/
Expand All @@ -45,13 +43,6 @@ public class ErrorHandlingMetrics {
private final Sensor dlqProduceFailures;
private long lastErrorTime = 0;

// for testing only
public ErrorHandlingMetrics() {
this(new ConnectorTaskId("noop-connector", -1),
new ConnectMetrics("noop-worker", new SystemTime(), 2, 3000, Sensor.RecordingLevel.INFO.toString(),
new ArrayList<>()));
}

public ErrorHandlingMetrics(ConnectorTaskId id, ConnectMetrics connectMetrics) {

ConnectMetricsRegistry registry = connectMetrics.registry();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,8 +85,7 @@
import static org.junit.Assert.assertTrue;

@PowerMockIgnore({"javax.management.*",
"org.apache.log4j.*",
"org.apache.kafka.connect.runtime.isolation.*"})
"org.apache.log4j.*"})
@RunWith(PowerMockRunner.class)
public class WorkerSourceTaskTest extends ThreadedTest {
private static final String TOPIC = "topic";
Expand Down
Loading

0 comments on commit 7e1c39f

Please sign in to comment.