diff --git a/micrometer-core/src/main/java/io/micrometer/core/instrument/binder/kafka/KafkaConsumerMetrics.java b/micrometer-core/src/main/java/io/micrometer/core/instrument/binder/kafka/KafkaConsumerMetrics.java index fabc47d9d4..e0668aa5f3 100644 --- a/micrometer-core/src/main/java/io/micrometer/core/instrument/binder/kafka/KafkaConsumerMetrics.java +++ b/micrometer-core/src/main/java/io/micrometer/core/instrument/binder/kafka/KafkaConsumerMetrics.java @@ -25,13 +25,14 @@ import javax.management.*; import java.lang.management.ManagementFactory; +import java.util.ArrayList; import java.util.List; import java.util.Set; import java.util.concurrent.Callable; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; -import java.util.function.BiConsumer; +import java.util.function.BiFunction; import java.util.function.ToDoubleFunction; import static java.util.Collections.emptyList; @@ -88,126 +89,141 @@ private static MBeanServer getMBeanServer() { @Override public void bindTo(MeterRegistry registry) { - registerMetricsEventually("consumer-fetch-manager-metrics", (o, tags) -> { + registerMetricsEventually(registry, "consumer-fetch-manager-metrics", (o, tags) -> { + List meters = new ArrayList<>(); + // metrics reported per consumer, topic and partition if (tags.stream().anyMatch(t -> t.getKey().equals("topic")) && tags.stream().anyMatch(t -> t.getKey().equals("partition"))) { - registerGaugeForObject(registry, o, "records-lag", tags, "The latest lag of the partition", "records"); - registerGaugeForObject(registry, o, "records-lag-avg", tags, "The average lag of the partition", "records"); - registerGaugeForObject(registry, o, "records-lag-max", tags, "The maximum lag in terms of number of records for any partition in this window. An increasing value over time is your best indication that the consumer group is not keeping up with the producers.", "records"); + meters.add(registerGaugeForObject(registry, o, "records-lag", tags, "The latest lag of the partition", "records")); + meters.add(registerGaugeForObject(registry, o, "records-lag-avg", tags, "The average lag of the partition", "records")); + meters.add(registerGaugeForObject(registry, o, "records-lag-max", tags, "The maximum lag in terms of number of records for any partition in this window. An increasing value over time is your best indication that the consumer group is not keeping up with the producers.", "records")); if (kafkaMajorVersion(tags) >= 2) { // KAFKA-6184 - registerGaugeForObject(registry, o, "records-lead", tags, "The latest lead of the partition.", "records"); - registerGaugeForObject(registry, o, "records-lead-min", tags, "The min lead of the partition. The lag between the consumer offset and the start offset of the log. If this gets close to zero, it's an indication that the consumer may lose data soon.", "records"); - registerGaugeForObject(registry, o, "records-lead-avg", tags, "The average lead of the partition.", "records"); + meters.add(registerGaugeForObject(registry, o, "records-lead", tags, "The latest lead of the partition.", "records")); + meters.add(registerGaugeForObject(registry, o, "records-lead-min", tags, "The min lead of the partition. The lag between the consumer offset and the start offset of the log. If this gets close to zero, it's an indication that the consumer may lose data soon.", "records")); + meters.add(registerGaugeForObject(registry, o, "records-lead-avg", tags, "The average lead of the partition.", "records")); } // metrics reported per consumer and topic } else if (tags.stream().anyMatch(t -> t.getKey().equals("topic"))) { - registerGaugeForObject(registry, o, "fetch-size-avg", tags, "The average number of bytes fetched per request.", BaseUnits.BYTES); - registerGaugeForObject(registry, o, "fetch-size-max", tags, "The maximum number of bytes fetched per request.", BaseUnits.BYTES); - registerGaugeForObject(registry, o, "records-per-request-avg", tags, "The average number of records in each request.", "records"); - registerFunctionCounterForObject(registry, o, "bytes-consumed-total", tags, "The total number of bytes consumed.", BaseUnits.BYTES); - registerFunctionCounterForObject(registry, o, "records-consumed-total", tags, "The total number of records consumed.", "records"); + meters.add(registerGaugeForObject(registry, o, "fetch-size-avg", tags, "The average number of bytes fetched per request.", BaseUnits.BYTES)); + meters.add(registerGaugeForObject(registry, o, "fetch-size-max", tags, "The maximum number of bytes fetched per request.", BaseUnits.BYTES)); + meters.add(registerGaugeForObject(registry, o, "records-per-request-avg", tags, "The average number of records in each request.", "records")); + meters.add(registerFunctionCounterForObject(registry, o, "bytes-consumed-total", tags, "The total number of bytes consumed.", BaseUnits.BYTES)); + meters.add(registerFunctionCounterForObject(registry, o, "records-consumed-total", tags, "The total number of records consumed.", "records")); // metrics reported just per consumer } else { - registerFunctionCounterForObject(registry, o, "fetch-total", tags, "The number of fetch requests.", "requests"); - registerTimeGaugeForObject(registry, o, "fetch-latency-avg", tags, "The average time taken for a fetch request."); - registerTimeGaugeForObject(registry, o, "fetch-latency-max", tags, "The max time taken for a fetch request."); - registerTimeGaugeForObject(registry, o, "fetch-throttle-time-avg", tags, "The average throttle time. When quotas are enabled, the broker may delay fetch requests in order to throttle a consumer which has exceeded its limit. This metric indicates how throttling time has been added to fetch requests on average."); - registerTimeGaugeForObject(registry, o, "fetch-throttle-time-max", tags, "The maximum throttle time."); + meters.add(registerFunctionCounterForObject(registry, o, "fetch-total", tags, "The number of fetch requests.", "requests")); + meters.add(registerTimeGaugeForObject(registry, o, "fetch-latency-avg", tags, "The average time taken for a fetch request.")); + meters.add(registerTimeGaugeForObject(registry, o, "fetch-latency-max", tags, "The max time taken for a fetch request.")); + meters.add(registerTimeGaugeForObject(registry, o, "fetch-throttle-time-avg", tags, "The average throttle time. When quotas are enabled, the broker may delay fetch requests in order to throttle a consumer which has exceeded its limit. This metric indicates how throttling time has been added to fetch requests on average.")); + meters.add(registerTimeGaugeForObject(registry, o, "fetch-throttle-time-max", tags, "The maximum throttle time.")); } + return meters; }); - registerMetricsEventually("consumer-coordinator-metrics", (o, tags) -> { - registerGaugeForObject(registry, o, "assigned-partitions", tags, "The number of partitions currently assigned to this consumer.", "partitions"); - registerGaugeForObject(registry, o, "commit-rate", tags, "The number of commit calls per second.", "commits"); - registerGaugeForObject(registry, o, "join-rate", tags, "The number of group joins per second. Group joining is the first phase of the rebalance protocol. A large value indicates that the consumer group is unstable and will likely be coupled with increased lag.", "joins"); - registerGaugeForObject(registry, o, "sync-rate", tags, "The number of group syncs per second. Group synchronization is the second and last phase of the rebalance protocol. A large value indicates group instability.", "syncs"); - registerGaugeForObject(registry, o, "heartbeat-rate", tags, "The average number of heartbeats per second. After a rebalance, the consumer sends heartbeats to the coordinator to keep itself active in the group. You may see a lower rate than configured if the processing loop is taking more time to handle message batches. Usually this is OK as long as you see no increase in the join rate.", "heartbeats"); - - registerTimeGaugeForObject(registry, o, "commit-latency-avg", tags, "The average time taken for a commit request."); - registerTimeGaugeForObject(registry, o, "commit-latency-max", tags, "The max time taken for a commit request."); - registerTimeGaugeForObject(registry, o, "join-time-avg", tags, "The average time taken for a group rejoin. This value can get as high as the configured session timeout for the consumer, but should usually be lower."); - registerTimeGaugeForObject(registry, o, "join-time-max", tags, "The max time taken for a group rejoin. This value should not get much higher than the configured session timeout for the consumer."); - registerTimeGaugeForObject(registry, o, "sync-time-avg", tags, "The average time taken for a group sync."); - registerTimeGaugeForObject(registry, o, "sync-time-max", tags, "The max time taken for a group sync."); - registerTimeGaugeForObject(registry, o, "heartbeat-response-time-max", tags, "The max time taken to receive a response to a heartbeat request."); - registerTimeGaugeForObject(registry, o, "last-heartbeat-seconds-ago", "last-heartbeat", tags, - "The time since the last controller heartbeat.", TimeUnit.SECONDS); + registerMetricsEventually(registry, "consumer-coordinator-metrics", (o, tags) -> { + List meters = new ArrayList<>(); + + meters.add(registerGaugeForObject(registry, o, "assigned-partitions", tags, "The number of partitions currently assigned to this consumer.", "partitions")); + meters.add(registerGaugeForObject(registry, o, "commit-rate", tags, "The number of commit calls per second.", "commits")); + meters.add(registerGaugeForObject(registry, o, "join-rate", tags, "The number of group joins per second. Group joining is the first phase of the rebalance protocol. A large value indicates that the consumer group is unstable and will likely be coupled with increased lag.", "joins")); + meters.add(registerGaugeForObject(registry, o, "sync-rate", tags, "The number of group syncs per second. Group synchronization is the second and last phase of the rebalance protocol. A large value indicates group instability.", "syncs")); + meters.add(registerGaugeForObject(registry, o, "heartbeat-rate", tags, "The average number of heartbeats per second. After a rebalance, the consumer sends heartbeats to the coordinator to keep itself active in the group. You may see a lower rate than configured if the processing loop is taking more time to handle message batches. Usually this is OK as long as you see no increase in the join rate.", "heartbeats")); + + meters.add(registerTimeGaugeForObject(registry, o, "commit-latency-avg", tags, "The average time taken for a commit request.")); + meters.add(registerTimeGaugeForObject(registry, o, "commit-latency-max", tags, "The max time taken for a commit request.")); + meters.add(registerTimeGaugeForObject(registry, o, "join-time-avg", tags, "The average time taken for a group rejoin. This value can get as high as the configured session timeout for the consumer, but should usually be lower.")); + meters.add(registerTimeGaugeForObject(registry, o, "join-time-max", tags, "The max time taken for a group rejoin. This value should not get much higher than the configured session timeout for the consumer.")); + meters.add(registerTimeGaugeForObject(registry, o, "sync-time-avg", tags, "The average time taken for a group sync.")); + meters.add(registerTimeGaugeForObject(registry, o, "sync-time-max", tags, "The max time taken for a group sync.")); + meters.add(registerTimeGaugeForObject(registry, o, "heartbeat-response-time-max", tags, "The max time taken to receive a response to a heartbeat request.")); + meters.add(registerTimeGaugeForObject(registry, o, "last-heartbeat-seconds-ago", "last-heartbeat", tags, + "The time since the last controller heartbeat.", TimeUnit.SECONDS)); + return meters; }); - registerMetricsEventually("consumer-metrics", (o, tags) -> { - registerGaugeForObject(registry, o, "connection-count", tags, "The current number of active connections.", "connections"); - registerGaugeForObject(registry, o, "connection-creation-total", tags, "New connections established.", "connections"); - registerGaugeForObject(registry, o, "connection-close-total", tags, "Connections closed.", "connections"); - registerGaugeForObject(registry, o, "io-ratio", tags, "The fraction of time the I/O thread spent doing I/O.", null); - registerGaugeForObject(registry, o, "io-wait-ratio", tags, "The fraction of time the I/O thread spent waiting.", null); - registerGaugeForObject(registry, o, "select-total", tags, "Number of times the I/O layer checked for new I/O to perform.", null); - - registerTimeGaugeForObject(registry, o, "io-time-ns-avg", "io-time-avg", tags, - "The average length of time for I/O per select call.", TimeUnit.NANOSECONDS); - registerTimeGaugeForObject(registry, o, "io-wait-time-ns-avg", "io-wait-time-avg", tags, + registerMetricsEventually(registry, "consumer-metrics", (o, tags) -> { + List meters = new ArrayList<>(); + + meters.add(registerGaugeForObject(registry, o, "connection-count", tags, "The current number of active connections.", "connections")); + meters.add(registerGaugeForObject(registry, o, "connection-creation-total", tags, "New connections established.", "connections")); + meters.add(registerGaugeForObject(registry, o, "connection-close-total", tags, "Connections closed.", "connections")); + meters.add(registerGaugeForObject(registry, o, "io-ratio", tags, "The fraction of time the I/O thread spent doing I/O.", null)); + meters.add(registerGaugeForObject(registry, o, "io-wait-ratio", tags, "The fraction of time the I/O thread spent waiting.", null)); + meters.add(registerGaugeForObject(registry, o, "select-total", tags, "Number of times the I/O layer checked for new I/O to perform.", null)); + + meters.add(registerTimeGaugeForObject(registry, o, "io-time-ns-avg", "io-time-avg", tags, + "The average length of time for I/O per select call.", TimeUnit.NANOSECONDS)); + meters.add(registerTimeGaugeForObject(registry, o, "io-wait-time-ns-avg", "io-wait-time-avg", tags, "The average length of time the I/O thread spent waiting for a socket to be ready for reads or writes.", - TimeUnit.NANOSECONDS); + TimeUnit.NANOSECONDS)); if (kafkaMajorVersion(tags) >= 2) { - registerGaugeForObject(registry, o, "successful-authentication-total", "authentication-attempts", - Tags.concat(tags, "result", "successful"), "The number of successful authentication attempts.", null); - registerGaugeForObject(registry, o, "failed-authentication-total", "authentication-attempts", - Tags.concat(tags, "result", "failed"), "The number of failed authentication attempts.", null); + meters.add(registerGaugeForObject(registry, o, "successful-authentication-total", "authentication-attempts", + Tags.concat(tags, "result", "successful"), "The number of successful authentication attempts.", null)); + meters.add(registerGaugeForObject(registry, o, "failed-authentication-total", "authentication-attempts", + Tags.concat(tags, "result", "failed"), "The number of failed authentication attempts.", null)); - registerGaugeForObject(registry, o, "network-io-total", tags, "", BaseUnits.BYTES); - registerGaugeForObject(registry, o, "outgoing-byte-total", tags, "", BaseUnits.BYTES); - registerGaugeForObject(registry, o, "request-total", tags, "", "requests"); - registerGaugeForObject(registry, o, "response-total", tags, "", "responses"); + meters.add(registerGaugeForObject(registry, o, "network-io-total", tags, "", BaseUnits.BYTES)); + meters.add(registerGaugeForObject(registry, o, "outgoing-byte-total", tags, "", BaseUnits.BYTES)); + meters.add(registerGaugeForObject(registry, o, "request-total", tags, "", "requests")); + meters.add(registerGaugeForObject(registry, o, "response-total", tags, "", "responses")); - registerTimeGaugeForObject(registry, o, "io-waittime-total", "io-wait-time-total", tags, + meters.add(registerTimeGaugeForObject(registry, o, "io-waittime-total", "io-wait-time-total", tags, "Time spent on the I/O thread waiting for a socket to be ready for reads or writes.", - TimeUnit.NANOSECONDS); - registerTimeGaugeForObject(registry, o, "iotime-total", "io-time-total", tags, - "Time spent in I/O during select calls.", TimeUnit.NANOSECONDS); + TimeUnit.NANOSECONDS)); + meters.add(registerTimeGaugeForObject(registry, o, "iotime-total", "io-time-total", tags, + "Time spent in I/O during select calls.", TimeUnit.NANOSECONDS)); } + return meters; }); } - private void registerGaugeForObject(MeterRegistry registry, ObjectName o, String jmxMetricName, String meterName, Tags allTags, String description, @Nullable String baseUnit) { - final AtomicReference gauge = new AtomicReference<>(); - gauge.set(Gauge + private Gauge registerGaugeForObject(MeterRegistry registry, ObjectName o, String jmxMetricName, String meterName, Tags allTags, String description, @Nullable String baseUnit) { + final AtomicReference gaugeReference = new AtomicReference<>(); + Gauge gauge = Gauge .builder(METRIC_NAME_PREFIX + meterName, mBeanServer, - getJmxAttribute(registry, gauge, o, jmxMetricName)) + getJmxAttribute(registry, gaugeReference, o, jmxMetricName)) .description(description) .baseUnit(baseUnit) .tags(allTags) - .register(registry)); + .register(registry); + gaugeReference.set(gauge); + return gauge; } - private void registerGaugeForObject(MeterRegistry registry, ObjectName o, String jmxMetricName, Tags allTags, String description, @Nullable String baseUnit) { - registerGaugeForObject(registry, o, jmxMetricName, sanitize(jmxMetricName), allTags, description, baseUnit); + private Gauge registerGaugeForObject(MeterRegistry registry, ObjectName o, String jmxMetricName, Tags allTags, String description, @Nullable String baseUnit) { + return registerGaugeForObject(registry, o, jmxMetricName, sanitize(jmxMetricName), allTags, description, baseUnit); } - private void registerFunctionCounterForObject(MeterRegistry registry, ObjectName o, String jmxMetricName, Tags allTags, String description, @Nullable String baseUnit) { - final AtomicReference counter = new AtomicReference<>(); - counter.set(FunctionCounter + private FunctionCounter registerFunctionCounterForObject(MeterRegistry registry, ObjectName o, String jmxMetricName, Tags allTags, String description, @Nullable String baseUnit) { + final AtomicReference counterReference = new AtomicReference<>(); + FunctionCounter counter = FunctionCounter .builder(METRIC_NAME_PREFIX + sanitize(jmxMetricName), mBeanServer, - getJmxAttribute(registry, counter, o, jmxMetricName)) + getJmxAttribute(registry, counterReference, o, jmxMetricName)) .description(description) .baseUnit(baseUnit) .tags(allTags) - .register(registry)); + .register(registry); + counterReference.set(counter); + return counter; } - private void registerTimeGaugeForObject(MeterRegistry registry, ObjectName o, String jmxMetricName, + private TimeGauge registerTimeGaugeForObject(MeterRegistry registry, ObjectName o, String jmxMetricName, String meterName, Tags allTags, String description, TimeUnit timeUnit) { - final AtomicReference timeGauge = new AtomicReference<>(); - timeGauge.set(TimeGauge.builder(METRIC_NAME_PREFIX + meterName, mBeanServer, timeUnit, - getJmxAttribute(registry, timeGauge, o, jmxMetricName)) + final AtomicReference timeGaugeReference = new AtomicReference<>(); + TimeGauge timeGauge = TimeGauge.builder(METRIC_NAME_PREFIX + meterName, mBeanServer, timeUnit, + getJmxAttribute(registry, timeGaugeReference, o, jmxMetricName)) .description(description) .tags(allTags) - .register(registry)); + .register(registry); + timeGaugeReference.set(timeGauge); + return timeGauge; } - private void registerTimeGaugeForObject(MeterRegistry registry, ObjectName o, String jmxMetricName, + private TimeGauge registerTimeGaugeForObject(MeterRegistry registry, ObjectName o, String jmxMetricName, String meterName, Tags allTags, String description) { - registerTimeGaugeForObject(registry, o, jmxMetricName, meterName, allTags, description, TimeUnit.MILLISECONDS); + return registerTimeGaugeForObject(registry, o, jmxMetricName, meterName, allTags, description, TimeUnit.MILLISECONDS); } private ToDoubleFunction getJmxAttribute(MeterRegistry registry, AtomicReference meter, @@ -220,8 +236,8 @@ private ToDoubleFunction getJmxAttribute(MeterRegistry registry, At }); } - private void registerTimeGaugeForObject(MeterRegistry registry, ObjectName o, String jmxMetricName, Tags allTags, String description) { - registerTimeGaugeForObject(registry, o, jmxMetricName, sanitize(jmxMetricName), allTags, description); + private TimeGauge registerTimeGaugeForObject(MeterRegistry registry, ObjectName o, String jmxMetricName, Tags allTags, String description) { + return registerTimeGaugeForObject(registry, o, jmxMetricName, sanitize(jmxMetricName), allTags, description); } int kafkaMajorVersion(Tags tags) { @@ -242,12 +258,13 @@ int kafkaMajorVersion(Tags tags) { return kafkaMajorVersion; } - private void registerMetricsEventually(String type, BiConsumer perObject) { + private void registerMetricsEventually(MeterRegistry registry, String type, BiFunction> perObject) { try { Set objs = mBeanServer.queryNames(new ObjectName(JMX_DOMAIN + ":type=" + type + ",*"), null); if (!objs.isEmpty()) { for (ObjectName o : objs) { - perObject.accept(o, Tags.concat(tags, nameTag(o))); + List meters = perObject.apply(o, Tags.concat(tags, nameTag(o))); + addUnregistrationListener(registry, type, o, meters); } return; } @@ -255,7 +272,7 @@ private void registerMetricsEventually(String type, BiConsumer throw new RuntimeException("Error registering Kafka JMX based metrics", e); } - registerNotificationListener(type, perObject); + registerNotificationListener(registry, type, perObject); } /** @@ -264,25 +281,47 @@ private void registerMetricsEventually(String type, BiConsumer * @param type The Kafka JMX type to listen for. * @param perObject Metric registration handler when a new MBean is created. */ - private void registerNotificationListener(String type, BiConsumer perObject) { - NotificationListener notificationListener = (notification, handback) -> { + private void registerNotificationListener(MeterRegistry registry, String type, BiFunction> perObject) { + NotificationListener registrationListener = (notification, handback) -> { MBeanServerNotification mbs = (MBeanServerNotification) notification; ObjectName o = mbs.getMBeanName(); - perObject.accept(o, Tags.concat(tags, nameTag(o))); + List meters = perObject.apply(o, Tags.concat(tags, nameTag(o))); + addUnregistrationListener(registry, type, o, meters); }; + NotificationFilter registrationFilter = createNotificationFilter(type, + MBeanServerNotification.REGISTRATION_NOTIFICATION); + addNotificationListener(registrationListener, registrationFilter); + } - NotificationFilter filter = (NotificationFilter) notification -> { - if (!MBeanServerNotification.REGISTRATION_NOTIFICATION.equals(notification.getType())) + private void addUnregistrationListener(MeterRegistry registry, String type, ObjectName o, List meters) { + NotificationListener unregistrationListener = (notification2, handback2) -> { + MBeanServerNotification mbs2 = (MBeanServerNotification) notification2; + ObjectName o2 = mbs2.getMBeanName(); + if (o2.equals(o)) { + meters.stream().forEach(registry::remove); + } + }; + NotificationFilter unregistrationFilter = createNotificationFilter(type, + MBeanServerNotification.UNREGISTRATION_NOTIFICATION); + addNotificationListener(unregistrationListener, unregistrationFilter); + } + + private NotificationFilter createNotificationFilter(String type, String notificationType) { + return (NotificationFilter) notification -> { + if (!notificationType.equals(notification.getType())) { return false; + } ObjectName obj = ((MBeanServerNotification) notification).getMBeanName(); return obj.getDomain().equals(JMX_DOMAIN) && obj.getKeyProperty("type").equals(type); }; + } + private void addNotificationListener(NotificationListener listener, NotificationFilter filter) { try { - mBeanServer.addNotificationListener(MBeanServerDelegate.DELEGATE_NAME, notificationListener, filter, null); + mBeanServer.addNotificationListener(MBeanServerDelegate.DELEGATE_NAME, listener, filter, null); notificationListenerCleanUpRunnables.add(() -> { try { - mBeanServer.removeNotificationListener(MBeanServerDelegate.DELEGATE_NAME, notificationListener); + mBeanServer.removeNotificationListener(MBeanServerDelegate.DELEGATE_NAME, listener); } catch (InstanceNotFoundException | ListenerNotFoundException ignored) { } }); @@ -328,4 +367,5 @@ private static String sanitize(String value) { public void close() { notificationListenerCleanUpRunnables.forEach(Runnable::run); } + } diff --git a/micrometer-core/src/test/java/io/micrometer/core/instrument/binder/kafka/KafkaConsumerMetricsTest.java b/micrometer-core/src/test/java/io/micrometer/core/instrument/binder/kafka/KafkaConsumerMetricsTest.java index d45e4994ae..62f1e523e4 100644 --- a/micrometer-core/src/test/java/io/micrometer/core/instrument/binder/kafka/KafkaConsumerMetricsTest.java +++ b/micrometer-core/src/test/java/io/micrometer/core/instrument/binder/kafka/KafkaConsumerMetricsTest.java @@ -15,6 +15,7 @@ */ package io.micrometer.core.instrument.binder.kafka; +import io.micrometer.core.instrument.Gauge; import io.micrometer.core.instrument.MeterRegistry; import io.micrometer.core.instrument.Tags; import io.micrometer.core.instrument.simple.SimpleMeterRegistry; @@ -32,6 +33,12 @@ import static org.assertj.core.api.Assertions.assertThat; +/** + * Tests for {@link KafkaConsumerMetrics}. + * + * @author Jon Schneider + * @author Johnny Lim + */ class KafkaConsumerMetricsTest { private final static String TOPIC = "my-example-topic"; private final static String BOOTSTRAP_SERVERS = "localhost:9092"; @@ -48,16 +55,19 @@ void verifyConsumerMetricsWithExpectedTags() { kafkaConsumerMetrics.bindTo(registry); // consumer coordinator metrics - registry.get("kafka.consumer.assigned.partitions").tags(tags).gauge(); + Gauge assignedPartitions = registry.get("kafka.consumer.assigned.partitions").tags(tags).gauge(); + assertThat(assignedPartitions.getId().getTag("client.id")).isEqualTo("consumer-" + consumerCount); // global connection metrics - registry.get("kafka.consumer.connection.count").tags(tags).gauge(); + Gauge connectionCount = registry.get("kafka.consumer.connection.count").tags(tags).gauge(); + assertThat(connectionCount.getId().getTag("client.id")).startsWith("consumer-" + consumerCount); } } @Test void metricsReportedPerMultipleConsumers() { - try (Consumer consumer = createConsumer(); Consumer consumer2 = createConsumer()) { + try (Consumer consumer = createConsumer(); + Consumer consumer2 = createConsumer()) { MeterRegistry registry = new SimpleMeterRegistry(); kafkaConsumerMetrics.bindTo(registry); @@ -107,6 +117,30 @@ void returnsNegativeKafkaMajorVersionForEmptyTags() { } } + @Test + void consumerBeforeBindingWhenClosedShouldRemoveMeters() { + MeterRegistry registry = new SimpleMeterRegistry(); + try (Consumer consumer = createConsumer()) { + kafkaConsumerMetrics.bindTo(registry); + + Gauge gauge = registry.get("kafka.consumer.assigned.partitions").gauge(); + assertThat(gauge.getId().getTag("client.id")).isEqualTo("consumer-" + consumerCount); + } + assertThat(registry.find("kafka.consumer.assigned.partitions").gauge()).isNull(); + } + + @Test + void consumerAfterBindingWhenClosedShouldRemoveMeters() { + MeterRegistry registry = new SimpleMeterRegistry(); + kafkaConsumerMetrics.bindTo(registry); + + try (Consumer consumer = createConsumer()) { + Gauge gauge = registry.get("kafka.consumer.assigned.partitions").gauge(); + assertThat(gauge.getId().getTag("client.id")).isEqualTo("consumer-" + consumerCount); + } + assertThat(registry.find("kafka.consumer.assigned.partitions").gauge()).isNull(); + } + private Consumer createConsumer() { Properties props = new Properties(); props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS); @@ -120,4 +154,4 @@ private Consumer createConsumer() { return consumer; } -} \ No newline at end of file +}