writePredictedSpeculativeRetryPerformanceMetrics computes snapshot once
schlosna committed Oct 14, 2024
1 parent ed39b8a commit da116f3
Showing 4 changed files with 44 additions and 24 deletions.
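At a high level, the change hoists the endpoint-snitch snapshot lookup for the extra replica out of each per-threshold maybeWriteMetrics call (which previously took an InetAddress and resolved its own Snapshot) and into writePredictedSpeculativeRetryPerformanceMetrics, which now resolves the snapshot once and passes the same Optional<Snapshot> to every metrics object. A minimal sketch of that shape, using hypothetical stand-in types rather than the actual Cassandra classes:

// Sketch only: Snapshot, Snitch, and ThresholdMetrics are hypothetical stand-ins
// for com.codahale.metrics.Snapshot, the endpoint snitch, and
// PredictedSpeculativeRetryPerformanceMetrics.
import java.util.List;
import java.util.Optional;

final class SnapshotOnceSketch {
    interface Snapshot { double get99thPercentile(); }
    interface Snitch { Optional<Snapshot> getSnapshot(String replica); }
    interface ThresholdMetrics {
        boolean maybeWriteMetrics(List<Long> latencies, Optional<Snapshot> snapshot);
    }

    // Resolve the extra replica's latency snapshot once, then share it across
    // every per-threshold metrics object instead of re-resolving it per call.
    static void writeMetrics(Snitch snitch, List<ThresholdMetrics> allMetrics,
                             List<Long> latencies, String extraReplica) {
        if (allMetrics.isEmpty())
            return;                                                      // nothing to record
        Optional<Snapshot> snapshot = snitch.getSnapshot(extraReplica);  // single lookup
        if (!snapshot.isPresent())
            return;                                                      // no latency data for that replica yet
        for (ThresholdMetrics metrics : allMetrics)
            metrics.maybeWriteMetrics(latencies, snapshot);              // shared snapshot
    }
}

The same guards appear in the actual diff below: the snitch lookup is skipped when there are no metrics objects, and the per-threshold loop is skipped when no snapshot exists for the replica.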
@@ -23,6 +23,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

+import com.codahale.metrics.Snapshot;
import org.apache.cassandra.db.ColumnFamily;
import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.db.DeletionInfo;
@@ -50,7 +50,14 @@ public static <T extends AbstractReadExecutor> List<PredictedSpeculativeRetryPer
.collect(Collectors.toList());
}

-public boolean maybeWriteMetrics(ColumnFamilyStore cfs, Collection<Long> latencies, InetAddress extraReplica) {
+public boolean maybeWriteMetrics(ColumnFamilyStore cfs, Collection<Long> latencies, Optional<Snapshot> extraReplicaSnapshot) {
+long extraReplicaP99Latency;
+if (extraReplicaSnapshot.isPresent()) {
+extraReplicaP99Latency = (long) extraReplicaSnapshot.get().get99thPercentile();
+} else {
+return false;
+}

long thresholdTime;
TimeUnit unit;
switch (threshold) {
@@ -85,13 +92,6 @@ public boolean maybeWriteMetrics(ColumnFamilyStore cfs, Collection<Long> latenci
// Don't want uninitialized percentile latencies to skew the metrics
return false;
}
-long extraReplicaP99Latency;
-Optional<Snapshot> extraReplicaSnapshot = DatabaseDescriptor.getEndpointSnitch().getSnapshot(extraReplica);
-if (extraReplicaSnapshot.isPresent()) {
-extraReplicaP99Latency = (long) extraReplicaSnapshot.get().get99thPercentile();
-} else {
-return false;
-}

thresholdTime = TimeUnit.NANOSECONDS.convert(thresholdTime, unit);
if (isRetryHelpful(latencies, thresholdTime)) {
src/java/org/apache/cassandra/service/AbstractReadExecutor.java: 16 changes (14 additions, 2 deletions)
@@ -24,6 +24,7 @@
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.TimeUnit;

+import com.codahale.metrics.Snapshot;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Iterables;
import org.slf4j.Logger;
@@ -33,6 +34,7 @@
import org.apache.cassandra.concurrent.Stage;
import org.apache.cassandra.concurrent.StageManager;
import org.apache.cassandra.config.CFMetaData.SpeculativeRetry.RetryType;
+import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.config.Schema;
import org.apache.cassandra.config.ReadRepairDecision;
import org.apache.cassandra.db.ColumnFamilyStore;
@@ -174,8 +176,18 @@ public Row get() throws ReadFailureException, ReadTimeoutException, DigestMismat
*/
public void writePredictedSpeculativeRetryPerformanceMetrics() {
InetAddress extraReplica = Iterables.getLast(targetReplicas);
-for (PredictedSpeculativeRetryPerformanceMetrics metrics : getPredSpecRetryMetrics()) {
-metrics.maybeWriteMetrics(cfs, this.latencies, extraReplica);
+List<PredictedSpeculativeRetryPerformanceMetrics> retryMetrics = getPredSpecRetryMetrics();
+int size = retryMetrics.size();
+if (size > 0)
+{
+Optional<Snapshot> extraReplicaSnapshot = DatabaseDescriptor.getEndpointSnitch().getSnapshot(extraReplica);
+if (extraReplicaSnapshot.isPresent())
+{
+for (int i = 0; i < size; i++)
+{
+retryMetrics.get(i).maybeWriteMetrics(cfs, this.latencies, extraReplicaSnapshot);
+}
+}
}
}

@@ -171,8 +171,9 @@ public void testMetricsRegistered() {
@Test
public void testMaybeWriteMetricsIgnoresGreaterThresholds() {
ColumnFamilyStore cfs = spy(MockSchema.newCFS());
+Snapshot mockSnapshot = mock(Snapshot.class);
for (PredictedSpeculativeRetryPerformanceMetrics specMetrics : thresholdToMetrics.values()) {
-assertThat(specMetrics.maybeWriteMetrics(cfs, singleLatency, mock(InetAddress.class))).isFalse();
+assertThat(specMetrics.maybeWriteMetrics(cfs, singleLatency, Optional.of(mockSnapshot))).isFalse();
}
}

@@ -183,9 +184,10 @@ public void testMaybeWriteMetricsIncludesLowerThresholds() {
long p99 = (long) cfs.metric.coordinatorReadLatency.getSnapshot().get99thPercentile();
long timestamp = Math.max(TimeUnit.NANOSECONDS.convert(5, TimeUnit.SECONDS), p99) + 100;
List<Long> latencies = ImmutableList.of(timestamp);
+Snapshot mockSnapshot = mock(Snapshot.class);

for (PredictedSpeculativeRetryPerformanceMetrics specMetrics : thresholdToMetrics.values()) {
-assertThat(specMetrics.maybeWriteMetrics(cfs, latencies, mock(InetAddress.class))).isTrue();
+assertThat(specMetrics.maybeWriteMetrics(cfs, latencies, Optional.of(mockSnapshot))).isTrue();
}
}

@@ -199,19 +201,20 @@ public void testMaybeWriteMetricsHandlesMixedThresholds() {
long p50 = (long) cfs.metric.coordinatorReadLatency.getSnapshot().getMedian();
long timestamp = TimeUnit.NANOSECONDS.convert(2, TimeUnit.SECONDS);
List<Long> latencies = ImmutableList.of(timestamp);
+Snapshot mockSnapshot = mock(Snapshot.class);

-assertThat(thresholdToMetrics.get(MILLISECONDS_100).maybeWriteMetrics(cfs, latencies, mock(InetAddress.class)))
+assertThat(thresholdToMetrics.get(MILLISECONDS_100).maybeWriteMetrics(cfs, latencies, Optional.of(mockSnapshot)))
.isTrue();
-assertThat(thresholdToMetrics.get(SECONDS_1).maybeWriteMetrics(cfs, latencies, mock(InetAddress.class)))
+assertThat(thresholdToMetrics.get(SECONDS_1).maybeWriteMetrics(cfs, latencies, Optional.of(mockSnapshot)))
.isTrue();
-assertThat(thresholdToMetrics.get(SECONDS_5).maybeWriteMetrics(cfs, latencies, mock(InetAddress.class)))
+assertThat(thresholdToMetrics.get(SECONDS_5).maybeWriteMetrics(cfs, latencies, Optional.of(mockSnapshot)))
.isFalse();

-assertThat(thresholdToMetrics.get(P50).maybeWriteMetrics(cfs, latencies, mock(InetAddress.class)))
+assertThat(thresholdToMetrics.get(P50).maybeWriteMetrics(cfs, latencies, Optional.of(mockSnapshot)))
.isEqualTo(p50 < timestamp);
-assertThat(thresholdToMetrics.get(P95).maybeWriteMetrics(cfs, latencies, mock(InetAddress.class)))
+assertThat(thresholdToMetrics.get(P95).maybeWriteMetrics(cfs, latencies, Optional.of(mockSnapshot)))
.isEqualTo(p95 < timestamp);
-assertThat(thresholdToMetrics.get(P99).maybeWriteMetrics(cfs, latencies, mock(InetAddress.class)))
+assertThat(thresholdToMetrics.get(P99).maybeWriteMetrics(cfs, latencies, Optional.of(mockSnapshot)))
.isEqualTo(p99 < timestamp);
}

@@ -225,9 +228,10 @@ public void testMaybeWriteMetricsUsesCorrectLatencies() {
long p50 = (long) cfs.metric.coordinatorReadLatency.getSnapshot().getMedian();
long timestamp = TimeUnit.NANOSECONDS.convert(10 + p99, TimeUnit.SECONDS);
List<Long> latencies = ImmutableList.of(timestamp);
+Snapshot mockSnapshot = mock(Snapshot.class);

for (PredictedSpeculativeRetryPerformanceMetrics specMetrics : thresholdToMetrics.values()) {
-specMetrics.maybeWriteMetrics(cfs, latencies, mock(InetAddress.class));
+specMetrics.maybeWriteMetrics(cfs, latencies, Optional.of(mockSnapshot));
}

verify(thresholdToMetrics.get(MILLISECONDS_100), times(1)).addNano(
@@ -249,9 +253,10 @@ public void testMaybeWriteMetricIgnoresIfRetryWontHelp() {
long p99 = (long) cfs.metric.coordinatorReadLatency.getSnapshot().get99thPercentile();
long timestamp = Math.max(TimeUnit.NANOSECONDS.convert(5, TimeUnit.SECONDS), p99) + 100;
List<Long> latencies = ImmutableList.of(timestamp, timestamp);
+Snapshot mockSnapshot = mock(Snapshot.class);

for (PredictedSpeculativeRetryPerformanceMetrics specMetrics : thresholdToMetrics.values()) {
-assertThat(specMetrics.maybeWriteMetrics(cfs, latencies, mock(InetAddress.class))).isFalse();
+assertThat(specMetrics.maybeWriteMetrics(cfs, latencies, Optional.of(mockSnapshot))).isFalse();
}
}

@@ -260,9 +265,10 @@ public void testMaybeWriteMetricsIgnoresUninitializedPercentileThresholds() {
ColumnFamilyStore cfs = spy(MockSchema.newCFS());
long timestamp = TimeUnit.NANOSECONDS.convert(10, TimeUnit.SECONDS);
List<Long> latencies = ImmutableList.of(timestamp);
-assertThat(thresholdToMetrics.get(P99).maybeWriteMetrics(cfs, latencies, mock(InetAddress.class))).isFalse();
-assertThat(thresholdToMetrics.get(P95).maybeWriteMetrics(cfs, latencies, mock(InetAddress.class))).isFalse();
-assertThat(thresholdToMetrics.get(P50).maybeWriteMetrics(cfs, latencies, mock(InetAddress.class))).isFalse();
+Snapshot mockSnapshot = mock(Snapshot.class);
+assertThat(thresholdToMetrics.get(P99).maybeWriteMetrics(cfs, latencies, Optional.of(mockSnapshot))).isFalse();
+assertThat(thresholdToMetrics.get(P95).maybeWriteMetrics(cfs, latencies, Optional.of(mockSnapshot))).isFalse();
+assertThat(thresholdToMetrics.get(P50).maybeWriteMetrics(cfs, latencies, Optional.of(mockSnapshot))).isFalse();
}

@Test
@@ -272,11 +278,12 @@ public void testMaybeWriteMetricsIgnoresWhenNoSnapshot() {
IEndpointSnitch snitch = spy(new DynamicEndpointSnitch(ss, String.valueOf(ss.hashCode())));
doReturn(Optional.empty()).when(snitch).getSnapshot(any());
DatabaseDescriptor.setEndpointSnitch(snitch);
+Snapshot mockSnapshot = mock(Snapshot.class);

long timestamp = TimeUnit.NANOSECONDS.convert(5, TimeUnit.SECONDS) + 100;
List<Long> latencies = ImmutableList.of(timestamp);
for (PredictedSpeculativeRetryPerformanceMetrics specMetrics : thresholdToMetrics.values()) {
-assertThat(specMetrics.maybeWriteMetrics(cfs, latencies, mock(InetAddress.class))).isFalse();
+assertThat(specMetrics.maybeWriteMetrics(cfs, latencies, Optional.of(mockSnapshot))).isFalse();
}
}
}
