diff --git a/src/java/org/apache/cassandra/locator/DynamicEndpointSnitch.java b/src/java/org/apache/cassandra/locator/DynamicEndpointSnitch.java index 9ca9c465aa..d7382f0940 100644 --- a/src/java/org/apache/cassandra/locator/DynamicEndpointSnitch.java +++ b/src/java/org/apache/cassandra/locator/DynamicEndpointSnitch.java @@ -254,7 +254,7 @@ public void receiveTiming(InetAddress host, long latency) // this is cheap public Optional getSnapshot(InetAddress endpoint) { ExponentiallyDecayingReservoir endpointSamples = samples.get(endpoint); if (endpointSamples != null) { - return Optional.ofNullable(samples.get(endpoint).getSnapshot()); + return Optional.ofNullable(endpointSamples.getSnapshot()); } else { return Optional.empty(); } diff --git a/src/java/org/apache/cassandra/metrics/PredictedSpeculativeRetryPerformanceMetrics.java b/src/java/org/apache/cassandra/metrics/PredictedSpeculativeRetryPerformanceMetrics.java index bfb65dea24..b020bd7729 100644 --- a/src/java/org/apache/cassandra/metrics/PredictedSpeculativeRetryPerformanceMetrics.java +++ b/src/java/org/apache/cassandra/metrics/PredictedSpeculativeRetryPerformanceMetrics.java @@ -50,7 +50,14 @@ public static List latencies, InetAddress extraReplica) { + public boolean maybeWriteMetrics(ColumnFamilyStore cfs, Collection latencies, Optional extraReplicaSnapshot) { + long extraReplicaP99Latency; + if (extraReplicaSnapshot.isPresent()) { + extraReplicaP99Latency = (long) extraReplicaSnapshot.get().get99thPercentile(); + } else { + return false; + } + long thresholdTime; TimeUnit unit; switch (threshold) { @@ -85,13 +92,6 @@ public boolean maybeWriteMetrics(ColumnFamilyStore cfs, Collection latenci // Don't want uninitialized percentile latencies to skew the metrics return false; } - long extraReplicaP99Latency; - Optional extraReplicaSnapshot = DatabaseDescriptor.getEndpointSnitch().getSnapshot(extraReplica); - if (extraReplicaSnapshot.isPresent()) { - extraReplicaP99Latency = (long) extraReplicaSnapshot.get().get99thPercentile(); - } else { - return false; - } thresholdTime = TimeUnit.NANOSECONDS.convert(thresholdTime, unit); if (isRetryHelpful(latencies, thresholdTime)) { diff --git a/src/java/org/apache/cassandra/service/AbstractReadExecutor.java b/src/java/org/apache/cassandra/service/AbstractReadExecutor.java index 23dea3bd7e..dd69d12d77 100644 --- a/src/java/org/apache/cassandra/service/AbstractReadExecutor.java +++ b/src/java/org/apache/cassandra/service/AbstractReadExecutor.java @@ -24,6 +24,7 @@ import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.TimeUnit; +import com.codahale.metrics.Snapshot; import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.Iterables; import org.slf4j.Logger; @@ -33,6 +34,7 @@ import org.apache.cassandra.concurrent.Stage; import org.apache.cassandra.concurrent.StageManager; import org.apache.cassandra.config.CFMetaData.SpeculativeRetry.RetryType; +import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.config.Schema; import org.apache.cassandra.config.ReadRepairDecision; import org.apache.cassandra.db.ColumnFamilyStore; @@ -174,8 +176,18 @@ public Row get() throws ReadFailureException, ReadTimeoutException, DigestMismat */ public void writePredictedSpeculativeRetryPerformanceMetrics() { InetAddress extraReplica = Iterables.getLast(targetReplicas); - for (PredictedSpeculativeRetryPerformanceMetrics metrics : getPredSpecRetryMetrics()) { - metrics.maybeWriteMetrics(cfs, this.latencies, extraReplica); + List retryMetrics = getPredSpecRetryMetrics(); + int size = retryMetrics.size(); + if (size > 0) + { + Optional extraReplicaSnapshot = DatabaseDescriptor.getEndpointSnitch().getSnapshot(extraReplica); + if (extraReplicaSnapshot.isPresent()) + { + for (int i = 0; i < size; i++) + { + retryMetrics.get(i).maybeWriteMetrics(cfs, this.latencies, extraReplicaSnapshot); + } + } } } diff --git a/test/unit/org/apache/cassandra/metrics/PredictedSpeculativeRetryPerformanceMetricsTest.java b/test/unit/org/apache/cassandra/metrics/PredictedSpeculativeRetryPerformanceMetricsTest.java index 8a1625a899..183d04cb38 100644 --- a/test/unit/org/apache/cassandra/metrics/PredictedSpeculativeRetryPerformanceMetricsTest.java +++ b/test/unit/org/apache/cassandra/metrics/PredictedSpeculativeRetryPerformanceMetricsTest.java @@ -67,6 +67,7 @@ public class PredictedSpeculativeRetryPerformanceMetricsTest { private static final List replicas = ImmutableList.of(addr1, addr2); private static final List singleLatency = ImmutableList.of(0L); private Map thresholdToMetrics; + private Snapshot mockSnapshot; @BeforeClass public static void beforeClass() @@ -87,7 +88,7 @@ public void before() { MockSchema.cleanup(); SimpleSnitch ss = new SimpleSnitch(); IEndpointSnitch snitch = spy(new DynamicEndpointSnitch(ss, String.valueOf(ss.hashCode()))); - Snapshot mockSnapshot = mock(Snapshot.class); + mockSnapshot = mock(Snapshot.class); when(mockSnapshot.get99thPercentile()).thenReturn(100.0); doReturn(Optional.of(mockSnapshot)).when(snitch).getSnapshot(any()); DatabaseDescriptor.setEndpointSnitch(snitch); @@ -172,7 +173,7 @@ public void testMetricsRegistered() { public void testMaybeWriteMetricsIgnoresGreaterThresholds() { ColumnFamilyStore cfs = spy(MockSchema.newCFS()); for (PredictedSpeculativeRetryPerformanceMetrics specMetrics : thresholdToMetrics.values()) { - assertThat(specMetrics.maybeWriteMetrics(cfs, singleLatency, mock(InetAddress.class))).isFalse(); + assertThat(specMetrics.maybeWriteMetrics(cfs, singleLatency, Optional.of(mockSnapshot))).isFalse(); } } @@ -185,7 +186,7 @@ public void testMaybeWriteMetricsIncludesLowerThresholds() { List latencies = ImmutableList.of(timestamp); for (PredictedSpeculativeRetryPerformanceMetrics specMetrics : thresholdToMetrics.values()) { - assertThat(specMetrics.maybeWriteMetrics(cfs, latencies, mock(InetAddress.class))).isTrue(); + assertThat(specMetrics.maybeWriteMetrics(cfs, latencies, Optional.of(mockSnapshot))).isTrue(); } } @@ -200,18 +201,18 @@ public void testMaybeWriteMetricsHandlesMixedThresholds() { long timestamp = TimeUnit.NANOSECONDS.convert(2, TimeUnit.SECONDS); List latencies = ImmutableList.of(timestamp); - assertThat(thresholdToMetrics.get(MILLISECONDS_100).maybeWriteMetrics(cfs, latencies, mock(InetAddress.class))) + assertThat(thresholdToMetrics.get(MILLISECONDS_100).maybeWriteMetrics(cfs, latencies, Optional.of(mockSnapshot))) .isTrue(); - assertThat(thresholdToMetrics.get(SECONDS_1).maybeWriteMetrics(cfs, latencies, mock(InetAddress.class))) + assertThat(thresholdToMetrics.get(SECONDS_1).maybeWriteMetrics(cfs, latencies, Optional.of(mockSnapshot))) .isTrue(); - assertThat(thresholdToMetrics.get(SECONDS_5).maybeWriteMetrics(cfs, latencies, mock(InetAddress.class))) + assertThat(thresholdToMetrics.get(SECONDS_5).maybeWriteMetrics(cfs, latencies, Optional.of(mockSnapshot))) .isFalse(); - assertThat(thresholdToMetrics.get(P50).maybeWriteMetrics(cfs, latencies, mock(InetAddress.class))) + assertThat(thresholdToMetrics.get(P50).maybeWriteMetrics(cfs, latencies, Optional.of(mockSnapshot))) .isEqualTo(p50 < timestamp); - assertThat(thresholdToMetrics.get(P95).maybeWriteMetrics(cfs, latencies, mock(InetAddress.class))) + assertThat(thresholdToMetrics.get(P95).maybeWriteMetrics(cfs, latencies, Optional.of(mockSnapshot))) .isEqualTo(p95 < timestamp); - assertThat(thresholdToMetrics.get(P99).maybeWriteMetrics(cfs, latencies, mock(InetAddress.class))) + assertThat(thresholdToMetrics.get(P99).maybeWriteMetrics(cfs, latencies, Optional.of(mockSnapshot))) .isEqualTo(p99 < timestamp); } @@ -227,7 +228,7 @@ public void testMaybeWriteMetricsUsesCorrectLatencies() { List latencies = ImmutableList.of(timestamp); for (PredictedSpeculativeRetryPerformanceMetrics specMetrics : thresholdToMetrics.values()) { - specMetrics.maybeWriteMetrics(cfs, latencies, mock(InetAddress.class)); + specMetrics.maybeWriteMetrics(cfs, latencies, Optional.of(mockSnapshot)); } verify(thresholdToMetrics.get(MILLISECONDS_100), times(1)).addNano( @@ -251,7 +252,7 @@ public void testMaybeWriteMetricIgnoresIfRetryWontHelp() { List latencies = ImmutableList.of(timestamp, timestamp); for (PredictedSpeculativeRetryPerformanceMetrics specMetrics : thresholdToMetrics.values()) { - assertThat(specMetrics.maybeWriteMetrics(cfs, latencies, mock(InetAddress.class))).isFalse(); + assertThat(specMetrics.maybeWriteMetrics(cfs, latencies, Optional.of(mockSnapshot))).isFalse(); } } @@ -260,9 +261,9 @@ public void testMaybeWriteMetricsIgnoresUninitializedPercentileThresholds() { ColumnFamilyStore cfs = spy(MockSchema.newCFS()); long timestamp = TimeUnit.NANOSECONDS.convert(10, TimeUnit.SECONDS); List latencies = ImmutableList.of(timestamp); - assertThat(thresholdToMetrics.get(P99).maybeWriteMetrics(cfs, latencies, mock(InetAddress.class))).isFalse(); - assertThat(thresholdToMetrics.get(P95).maybeWriteMetrics(cfs, latencies, mock(InetAddress.class))).isFalse(); - assertThat(thresholdToMetrics.get(P50).maybeWriteMetrics(cfs, latencies, mock(InetAddress.class))).isFalse(); + assertThat(thresholdToMetrics.get(P99).maybeWriteMetrics(cfs, latencies, Optional.of(mockSnapshot))).isFalse(); + assertThat(thresholdToMetrics.get(P95).maybeWriteMetrics(cfs, latencies, Optional.of(mockSnapshot))).isFalse(); + assertThat(thresholdToMetrics.get(P50).maybeWriteMetrics(cfs, latencies, Optional.of(mockSnapshot))).isFalse(); } @Test @@ -276,7 +277,7 @@ public void testMaybeWriteMetricsIgnoresWhenNoSnapshot() { long timestamp = TimeUnit.NANOSECONDS.convert(5, TimeUnit.SECONDS) + 100; List latencies = ImmutableList.of(timestamp); for (PredictedSpeculativeRetryPerformanceMetrics specMetrics : thresholdToMetrics.values()) { - assertThat(specMetrics.maybeWriteMetrics(cfs, latencies, mock(InetAddress.class))).isFalse(); + assertThat(specMetrics.maybeWriteMetrics(cfs, latencies, Optional.empty())).isFalse(); } } }