Skip to content

Commit

Permalink
writePredictedSpeculativeRetryPerformanceMetrics computes snapshot once
Browse files (browse the repository at this point in the history)
  • Loading branch information
schlosna committed Oct 14, 2024
1 parent ed39b8a commit b014b22
Show file tree
Hide file tree
Showing 4 changed files with 39 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.codahale.metrics.Snapshot;
import org.apache.cassandra.db.ColumnFamily;
import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.db.DeletionInfo;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,14 @@ public static <T extends AbstractReadExecutor> List<PredictedSpeculativeRetryPer
.collect(Collectors.toList());
}

public boolean maybeWriteMetrics(ColumnFamilyStore cfs, Collection<Long> latencies, InetAddress extraReplica) {
public boolean maybeWriteMetrics(ColumnFamilyStore cfs, Collection<Long> latencies, Optional<Snapshot> extraReplicaSnapshot) {
long extraReplicaP99Latency;
if (extraReplicaSnapshot.isPresent()) {
extraReplicaP99Latency = (long) extraReplicaSnapshot.get().get99thPercentile();
} else {
return false;
}

long thresholdTime;
TimeUnit unit;
switch (threshold) {
Expand Down Expand Up @@ -85,13 +92,6 @@ public boolean maybeWriteMetrics(ColumnFamilyStore cfs, Collection<Long> latenci
// Don't want uninitialized percentile latencies to skew the metrics
return false;
}
long extraReplicaP99Latency;
Optional<Snapshot> extraReplicaSnapshot = DatabaseDescriptor.getEndpointSnitch().getSnapshot(extraReplica);
if (extraReplicaSnapshot.isPresent()) {
extraReplicaP99Latency = (long) extraReplicaSnapshot.get().get99thPercentile();
} else {
return false;
}

thresholdTime = TimeUnit.NANOSECONDS.convert(thresholdTime, unit);
if (isRetryHelpful(latencies, thresholdTime)) {
Expand Down
16 changes: 14 additions & 2 deletions src/java/org/apache/cassandra/service/AbstractReadExecutor.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.TimeUnit;

import com.codahale.metrics.Snapshot;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Iterables;
import org.slf4j.Logger;
Expand All @@ -33,6 +34,7 @@
import org.apache.cassandra.concurrent.Stage;
import org.apache.cassandra.concurrent.StageManager;
import org.apache.cassandra.config.CFMetaData.SpeculativeRetry.RetryType;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.config.Schema;
import org.apache.cassandra.config.ReadRepairDecision;
import org.apache.cassandra.db.ColumnFamilyStore;
Expand Down Expand Up @@ -174,8 +176,18 @@ public Row get() throws ReadFailureException, ReadTimeoutException, DigestMismat
*/
public void writePredictedSpeculativeRetryPerformanceMetrics() {
// NOTE(review): this span comes from a diff rendered without +/- markers, so the
// pre-change for-each loop and its post-change replacement both appear below.
// Read them as alternatives, not as sequential statements.

// The last entry of targetReplicas is treated as the "extra" replica.
InetAddress extraReplica = Iterables.getLast(targetReplicas);
// Pre-change form: hand the replica address to every metrics object. The old
// maybeWriteMetrics resolved the snitch snapshot itself, i.e. once per metrics object.
for (PredictedSpeculativeRetryPerformanceMetrics metrics : getPredSpecRetryMetrics()) {
metrics.maybeWriteMetrics(cfs, this.latencies, extraReplica);
// Post-change form: fetch the metrics list once and remember its size.
List<PredictedSpeculativeRetryPerformanceMetrics> retryMetrics = getPredSpecRetryMetrics();
int size = retryMetrics.size();
// Skip the snitch lookup entirely when there is nothing to record.
if (size > 0)
{
// Single snapshot lookup shared by all metrics objects in the loop below.
Optional<Snapshot> extraReplicaSnapshot = DatabaseDescriptor.getEndpointSnitch().getSnapshot(extraReplica);
// With no snapshot, maybeWriteMetrics returns false immediately, so the loop is skipped.
if (extraReplicaSnapshot.isPresent())
{
// Indexed iteration over the list captured above.
for (int i = 0; i < size; i++)
{
retryMetrics.get(i).maybeWriteMetrics(cfs, this.latencies, extraReplicaSnapshot);
}
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ public class PredictedSpeculativeRetryPerformanceMetricsTest {
private static final List<InetAddress> replicas = ImmutableList.of(addr1, addr2);
private static final List<Long> singleLatency = ImmutableList.of(0L);
private Map<Threshold, PredictedSpeculativeRetryPerformanceMetrics> thresholdToMetrics;
private Snapshot mockSnapshot;

@BeforeClass
public static void beforeClass()
Expand All @@ -87,7 +88,7 @@ public void before() {
MockSchema.cleanup();
SimpleSnitch ss = new SimpleSnitch();
IEndpointSnitch snitch = spy(new DynamicEndpointSnitch(ss, String.valueOf(ss.hashCode())));
Snapshot mockSnapshot = mock(Snapshot.class);
mockSnapshot = mock(Snapshot.class);
when(mockSnapshot.get99thPercentile()).thenReturn(100.0);
doReturn(Optional.of(mockSnapshot)).when(snitch).getSnapshot(any());
DatabaseDescriptor.setEndpointSnitch(snitch);
Expand Down Expand Up @@ -172,7 +173,7 @@ public void testMetricsRegistered() {
public void testMaybeWriteMetricsIgnoresGreaterThresholds() {
// Checks that no configured threshold records a metric for the singleLatency
// fixture (a one-element list containing 0L).
// NOTE(review): the two assertThat lines below are the removed and added sides of the
// same diff line (InetAddress argument vs. Optional<Snapshot> argument); only one
// exists in any given revision.
ColumnFamilyStore cfs = spy(MockSchema.newCFS());
for (PredictedSpeculativeRetryPerformanceMetrics specMetrics : thresholdToMetrics.values()) {
// Pre-change call: passes a mocked replica address.
assertThat(specMetrics.maybeWriteMetrics(cfs, singleLatency, mock(InetAddress.class))).isFalse();
// Post-change call: passes the shared mock snapshot (99th percentile stubbed to 100.0 in before()).
assertThat(specMetrics.maybeWriteMetrics(cfs, singleLatency, Optional.of(mockSnapshot))).isFalse();
}
}
}

Expand All @@ -185,7 +186,7 @@ public void testMaybeWriteMetricsIncludesLowerThresholds() {
List<Long> latencies = ImmutableList.of(timestamp);

for (PredictedSpeculativeRetryPerformanceMetrics specMetrics : thresholdToMetrics.values()) {
assertThat(specMetrics.maybeWriteMetrics(cfs, latencies, mock(InetAddress.class))).isTrue();
assertThat(specMetrics.maybeWriteMetrics(cfs, latencies, Optional.of(mockSnapshot))).isTrue();
}
}

Expand All @@ -200,18 +201,18 @@ public void testMaybeWriteMetricsHandlesMixedThresholds() {
long timestamp = TimeUnit.NANOSECONDS.convert(2, TimeUnit.SECONDS);
List<Long> latencies = ImmutableList.of(timestamp);

assertThat(thresholdToMetrics.get(MILLISECONDS_100).maybeWriteMetrics(cfs, latencies, mock(InetAddress.class)))
assertThat(thresholdToMetrics.get(MILLISECONDS_100).maybeWriteMetrics(cfs, latencies, Optional.of(mockSnapshot)))
.isTrue();
assertThat(thresholdToMetrics.get(SECONDS_1).maybeWriteMetrics(cfs, latencies, mock(InetAddress.class)))
assertThat(thresholdToMetrics.get(SECONDS_1).maybeWriteMetrics(cfs, latencies, Optional.of(mockSnapshot)))
.isTrue();
assertThat(thresholdToMetrics.get(SECONDS_5).maybeWriteMetrics(cfs, latencies, mock(InetAddress.class)))
assertThat(thresholdToMetrics.get(SECONDS_5).maybeWriteMetrics(cfs, latencies, Optional.of(mockSnapshot)))
.isFalse();

assertThat(thresholdToMetrics.get(P50).maybeWriteMetrics(cfs, latencies, mock(InetAddress.class)))
assertThat(thresholdToMetrics.get(P50).maybeWriteMetrics(cfs, latencies, Optional.of(mockSnapshot)))
.isEqualTo(p50 < timestamp);
assertThat(thresholdToMetrics.get(P95).maybeWriteMetrics(cfs, latencies, mock(InetAddress.class)))
assertThat(thresholdToMetrics.get(P95).maybeWriteMetrics(cfs, latencies, Optional.of(mockSnapshot)))
.isEqualTo(p95 < timestamp);
assertThat(thresholdToMetrics.get(P99).maybeWriteMetrics(cfs, latencies, mock(InetAddress.class)))
assertThat(thresholdToMetrics.get(P99).maybeWriteMetrics(cfs, latencies, Optional.of(mockSnapshot)))
.isEqualTo(p99 < timestamp);
}

Expand All @@ -227,7 +228,7 @@ public void testMaybeWriteMetricsUsesCorrectLatencies() {
List<Long> latencies = ImmutableList.of(timestamp);

for (PredictedSpeculativeRetryPerformanceMetrics specMetrics : thresholdToMetrics.values()) {
specMetrics.maybeWriteMetrics(cfs, latencies, mock(InetAddress.class));
specMetrics.maybeWriteMetrics(cfs, latencies, Optional.of(mockSnapshot));
}

verify(thresholdToMetrics.get(MILLISECONDS_100), times(1)).addNano(
Expand All @@ -251,7 +252,7 @@ public void testMaybeWriteMetricIgnoresIfRetryWontHelp() {
List<Long> latencies = ImmutableList.of(timestamp, timestamp);

for (PredictedSpeculativeRetryPerformanceMetrics specMetrics : thresholdToMetrics.values()) {
assertThat(specMetrics.maybeWriteMetrics(cfs, latencies, mock(InetAddress.class))).isFalse();
assertThat(specMetrics.maybeWriteMetrics(cfs, latencies, Optional.of(mockSnapshot))).isFalse();
}
}

Expand All @@ -260,9 +261,9 @@ public void testMaybeWriteMetricsIgnoresUninitializedPercentileThresholds() {
ColumnFamilyStore cfs = spy(MockSchema.newCFS());
long timestamp = TimeUnit.NANOSECONDS.convert(10, TimeUnit.SECONDS);
List<Long> latencies = ImmutableList.of(timestamp);
assertThat(thresholdToMetrics.get(P99).maybeWriteMetrics(cfs, latencies, mock(InetAddress.class))).isFalse();
assertThat(thresholdToMetrics.get(P95).maybeWriteMetrics(cfs, latencies, mock(InetAddress.class))).isFalse();
assertThat(thresholdToMetrics.get(P50).maybeWriteMetrics(cfs, latencies, mock(InetAddress.class))).isFalse();
assertThat(thresholdToMetrics.get(P99).maybeWriteMetrics(cfs, latencies, Optional.of(mockSnapshot))).isFalse();
assertThat(thresholdToMetrics.get(P95).maybeWriteMetrics(cfs, latencies, Optional.of(mockSnapshot))).isFalse();
assertThat(thresholdToMetrics.get(P50).maybeWriteMetrics(cfs, latencies, Optional.of(mockSnapshot))).isFalse();
}

@Test
Expand All @@ -276,7 +277,7 @@ public void testMaybeWriteMetricsIgnoresWhenNoSnapshot() {
long timestamp = TimeUnit.NANOSECONDS.convert(5, TimeUnit.SECONDS) + 100;
List<Long> latencies = ImmutableList.of(timestamp);
for (PredictedSpeculativeRetryPerformanceMetrics specMetrics : thresholdToMetrics.values()) {
assertThat(specMetrics.maybeWriteMetrics(cfs, latencies, mock(InetAddress.class))).isFalse();
assertThat(specMetrics.maybeWriteMetrics(cfs, latencies, Optional.empty())).isFalse();
}
}
}

0 comments on commit b014b22

Please sign in to comment.