Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Paxos metrics #548

Open
wants to merge 7 commits into
base: palantir-cassandra-2.2.18
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 21 additions & 0 deletions src/java/org/apache/cassandra/metrics/ColumnFamilyMetrics.java
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,19 @@ public class ColumnFamilyMetrics
public final LatencyMetrics casPropose;
/** CAS Commit metrics */
public final LatencyMetrics casCommit;
/** CAS lock wait metrics: time spent waiting to acquire the per-key Paxos lock */
public final LatencyMetrics casLockWait;
/** CAS Paxos System Read metrics */
public final LatencyMetrics casSystemRead;
/** CAS Paxos System Write metrics */
public final LatencyMetrics casSystemWrite;

/** Marked each time the per-key Paxos (CAS) lock is acquired */
public final Meter casLock;

public final Meter casPrepareAttempts;
public final Meter casProposeAttempts;
public final Meter casCommitAttempts;

/** Estimated ratio of droppable tombstones and number of cells in this table */
public final Gauge<Double> droppableTombstoneRatio;
Expand Down Expand Up @@ -721,6 +734,14 @@ public Long getValue()
casPrepare = new LatencyMetrics(factory, "CasPrepare", cfs.keyspace.metric.casPrepare);
casPropose = new LatencyMetrics(factory, "CasPropose", cfs.keyspace.metric.casPropose);
casCommit = new LatencyMetrics(factory, "CasCommit", cfs.keyspace.metric.casCommit);
casLockWait = new LatencyMetrics(factory, "CasLockWait", cfs.keyspace.metric.casLockWait);
casSystemRead = new LatencyMetrics(factory, "CasSystemRead", cfs.keyspace.metric.casSystemRead);
casSystemWrite = new LatencyMetrics(factory, "CasSystemWrite", cfs.keyspace.metric.casSystemWrite);

casLock = Metrics.meter(factory.createMetricName("CasLock"));
casPrepareAttempts = Metrics.meter(factory.createMetricName("CasPrepareAttempts"));
casProposeAttempts = Metrics.meter(factory.createMetricName("CasProposeAttempts"));
casCommitAttempts = Metrics.meter(factory.createMetricName("CasCommitAttempts"));

droppableTombstoneRatio = createColumnFamilyGauge("DroppableTombstoneRatio", cfs::getDroppableTombstoneRatio);
liveTombstoneRatio = createColumnFamilyGauge("LiveTombstoneRatio", cfs::getLiveTombstoneRatio);
Expand Down
10 changes: 10 additions & 0 deletions src/java/org/apache/cassandra/metrics/KeyspaceMetrics.java
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,12 @@ public class KeyspaceMetrics
public final LatencyMetrics casPropose;
/** CAS Commit metrics */
public final LatencyMetrics casCommit;
/** CAS lock wait metrics: time spent waiting to acquire the per-key Paxos lock */
public final LatencyMetrics casLockWait;
/** CAS Paxos System Read metrics */
public final LatencyMetrics casSystemRead;
/** CAS Paxos System Write metrics */
public final LatencyMetrics casSystemWrite;
/** Number of mutation requests which were not for a row this node owned */
public final Counter invalidMutations;
/** Number of mutation requests which are valid */
Expand Down Expand Up @@ -271,6 +277,10 @@ public Long getValue(ColumnFamilyMetrics metric)
casPrepare = new LatencyMetrics(factory, "CasPrepare");
casPropose = new LatencyMetrics(factory, "CasPropose");
casCommit = new LatencyMetrics(factory, "CasCommit");
casLockWait = new LatencyMetrics(factory, "CasLockWait");
casSystemRead = new LatencyMetrics(factory, "CasSystemRead");
casSystemWrite = new LatencyMetrics(factory, "CasSystemWrite");

invalidMutations = Metrics.counter(factory.createMetricName("InvalidMutations"));
validMutations = Metrics.counter(factory.createMetricName("ValidMutations"));
}
Expand Down
40 changes: 40 additions & 0 deletions src/java/org/apache/cassandra/service/paxos/PaxosState.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,19 +21,24 @@
package org.apache.cassandra.service.paxos;

import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.locks.Lock;

import com.google.common.util.concurrent.Striped;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.cassandra.config.CFMetaData;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.*;
import org.apache.cassandra.db.compaction.CompactionTask;
import org.apache.cassandra.tracing.Tracing;
import org.apache.cassandra.utils.UUIDGen;

public class PaxosState
{
private static final Striped<Lock> LOCKS = Striped.lazyWeakLock(DatabaseDescriptor.getConcurrentWriters() * 1024);
protected static final Logger logger = LoggerFactory.getLogger(PaxosState.class);

private final Commit promised;
private final Commit accepted;
Expand All @@ -57,10 +62,16 @@ public PaxosState(Commit promised, Commit accepted, Commit mostRecentCommit)
public static PrepareResponse prepare(Commit toPrepare)
{
long start = System.nanoTime();
Keyspace.open(toPrepare.update.metadata().ksName).getColumnFamilyStore(toPrepare.update.metadata().cfId).metric.casPrepareAttempts.mark();
try
{
long startWaitLock = System.nanoTime();
logger.debug("CASPrepare: waiting for key lock {} for cf {} in keyspace", hexKey(toPrepare.key), toPrepare.update.metadata().cfName, toPrepare.update.metadata().ksName);
Lock lock = LOCKS.get(toPrepare.key);
lock.lock();
Keyspace.open(toPrepare.update.metadata().ksName).getColumnFamilyStore(toPrepare.update.metadata().cfId).metric.casLock.mark();
Keyspace.open(toPrepare.update.metadata().ksName).getColumnFamilyStore(toPrepare.update.metadata().cfId).metric.casLockWait.addNano(System.nanoTime() - startWaitLock);
logger.debug("CASPrepare: key lock acquired {} for cf {} in keyspace", hexKey(toPrepare.key), toPrepare.update.metadata().cfName, toPrepare.update.metadata().ksName);
try
{
// When preparing, we need to use the same time as "now" (that's the time we use to decide if something
Expand All @@ -69,16 +80,25 @@ public static PrepareResponse prepare(Commit toPrepare)
// amount of re-submit will fix this (because the node on which the commit has expired will have a
// tombstone that hides any re-submit). See CASSANDRA-12043 for details.
long now = UUIDGen.unixTimestamp(toPrepare.ballot);

long readPaxos = System.nanoTime();
PaxosState state = SystemKeyspace.loadPaxosState(toPrepare.key, toPrepare.update.metadata(), now);
Keyspace.open(toPrepare.update.metadata().ksName).getColumnFamilyStore(toPrepare.update.metadata().cfId).metric.casSystemRead.addNano(System.nanoTime() - readPaxos);
if (toPrepare.isAfter(state.promised))
{
Tracing.trace("Promising ballot {}", toPrepare.ballot);
logger.debug("Promising ballot {}", toPrepare.ballot);

long writePaxos = System.nanoTime();
SystemKeyspace.savePaxosPromise(toPrepare);
Keyspace.open(toPrepare.update.metadata().ksName).getColumnFamilyStore(toPrepare.update.metadata().cfId).metric.casSystemWrite.addNano(System.nanoTime() - writePaxos);

return new PrepareResponse(true, state.accepted, state.mostRecentCommit);
}
else
{
Tracing.trace("Promise rejected; {} is not sufficiently newer than {}", toPrepare, state.promised);
logger.debug("Promise rejected; {} is not sufficiently newer than {}", toPrepare, state.promised);
// return the currently promised ballot (not the last accepted one) so the coordinator can make sure it uses newer ballot next time (#5667)
return new PrepareResponse(false, state.promised, state.mostRecentCommit);
}
Expand All @@ -98,23 +118,31 @@ public static PrepareResponse prepare(Commit toPrepare)
public static Boolean propose(Commit proposal)
{
long start = System.nanoTime();
Keyspace.open(proposal.update.metadata().ksName).getColumnFamilyStore(proposal.update.metadata().cfId).metric.casProposeAttempts.mark();
try
{
long startWaitLock = System.nanoTime();
logger.debug("CASPropose: waiting for key lock {} for cf {} in keyspace", hexKey(proposal.key), proposal.update.metadata().cfName, proposal.update.metadata().ksName);
Lock lock = LOCKS.get(proposal.key);
lock.lock();
Keyspace.open(proposal.update.metadata().ksName).getColumnFamilyStore(proposal.update.metadata().cfId).metric.casLock.mark();
Keyspace.open(proposal.update.metadata().ksName).getColumnFamilyStore(proposal.update.metadata().cfId).metric.casLockWait.addNano(System.nanoTime() - startWaitLock);
logger.debug("CASPropose: key lock acquired {} for cf {} in keyspace", hexKey(proposal.key), proposal.update.metadata().cfName, proposal.update.metadata().ksName);
try
{
long now = UUIDGen.unixTimestamp(proposal.ballot);
PaxosState state = SystemKeyspace.loadPaxosState(proposal.key, proposal.update.metadata(), now);
if (proposal.hasBallot(state.promised.ballot) || proposal.isAfter(state.promised))
{
Tracing.trace("Accepting proposal {}", proposal);
logger.debug("Accepting proposal {}", proposal);
SystemKeyspace.savePaxosProposal(proposal);
return true;
}
else
{
Tracing.trace("Rejecting proposal for {} because inProgress is now {}", proposal, state.promised);
logger.debug("Rejecting proposal for {} because inProgress is now {}", proposal, state.promised);
return false;
}
}
Expand All @@ -132,6 +160,7 @@ public static Boolean propose(Commit proposal)
public static void commit(Commit proposal)
{
long start = System.nanoTime();
Keyspace.open(proposal.update.metadata().ksName).getColumnFamilyStore(proposal.update.metadata().cfId).metric.casCommitAttempts.mark();
try
{
// There is no guarantee we will see commits in the right order, because messages
Expand All @@ -144,12 +173,14 @@ public static void commit(Commit proposal)
if (UUIDGen.unixTimestamp(proposal.ballot) >= SystemKeyspace.getTruncatedAt(proposal.update.metadata().cfId))
{
Tracing.trace("Committing proposal {}", proposal);
logger.debug("Committing proposal {}", proposal);
Mutation mutation = proposal.makeMutation();
Keyspace.open(mutation.getKeyspaceName()).apply(mutation, true);
}
else
{
Tracing.trace("Not committing proposal {} as ballot timestamp predates last truncation time", proposal);
logger.debug("Not committing proposal {} as ballot timestamp predates last truncation time", proposal);
}
// We don't need to lock, we're just blindly updating
SystemKeyspace.savePaxosCommit(proposal);
Expand All @@ -159,4 +190,13 @@ public static void commit(Commit proposal)
Keyspace.open(proposal.update.metadata().ksName).getColumnFamilyStore(proposal.update.metadata().cfId).metric.casCommit.addNano(System.nanoTime() - start);
}
}

/**
 * Renders the remaining bytes of {@code key} as an uppercase hexadecimal string,
 * two characters per byte (e.g. {@code 0x0F} becomes {@code "0F"}).
 *
 * The bytes are read through a duplicate of the buffer, so the caller's
 * position and limit are left untouched.
 *
 * @param key buffer whose bytes between position and limit are rendered
 * @return the hex representation; empty if the buffer has no remaining bytes
 */
private static String hexKey(ByteBuffer key)
{
    final String digits = "0123456789ABCDEF";
    ByteBuffer view = key.duplicate();
    StringBuilder hex = new StringBuilder(view.remaining() * 2);
    while (view.hasRemaining())
    {
        // Mask to treat the signed byte as an unsigned value in [0, 255].
        int octet = view.get() & 0xFF;
        hex.append(digits.charAt(octet >>> 4));
        hex.append(digits.charAt(octet & 0x0F));
    }
    return hex.toString();
}
}