Skip to content

Commit

Permalink
enable wal
Browse files Browse the repository at this point in the history
  • Loading branch information
wi11dey committed Nov 7, 2024
1 parent 6199665 commit 12d80d8
Showing 1 changed file with 103 additions and 98 deletions.
201 changes: 103 additions & 98 deletions src/java/org/apache/cassandra/db/compaction/CompactionTask.java
Original file line number Diff line number Diff line change
Expand Up @@ -156,122 +156,127 @@ public boolean apply(SSTableReader sstable)
long totalKeysWritten = 0;

long estimatedKeys = 0;
try (CompactionController controller = getCompactionController(transaction.originals()))
CompactionController controller = getCompactionController(transaction.originals());
Set<SSTableReader> actuallyCompact = Sets.difference(transaction.originals(), controller.getFullyExpiredSSTables());

SSTableFormat.Type sstableFormat = getFormatType(transaction.originals());

// SSTableScanners need to be closed before markCompactedSSTablesReplaced call as scanners contain references
// to both ifile and dfile and SSTR will throw deletion errors on Windows if it tries to delete before scanner is closed.
// See CASSANDRA-8019 and CASSANDRA-8399
UUID taskId = null;
String taskIdLoggerMsg;
List<SSTableReader> newSStables;
AbstractCompactionIterable ci = null;
Refs<SSTableReader> refs = Refs.ref(actuallyCompact);
boolean readyToFinish = false;
try (AbstractCompactionStrategy.ScannerList scanners = strategy.getScanners(actuallyCompact))
{
Set<SSTableReader> actuallyCompact = Sets.difference(transaction.originals(), controller.getFullyExpiredSSTables());

SSTableFormat.Type sstableFormat = getFormatType(transaction.originals());


// SSTableScanners need to be closed before markCompactedSSTablesReplaced call as scanners contain references
// to both ifile and dfile and SSTR will throw deletion errors on Windows if it tries to delete before scanner is closed.
// See CASSANDRA-8019 and CASSANDRA-8399
UUID taskId = null;
String taskIdLoggerMsg;
List<SSTableReader> newSStables;
AbstractCompactionIterable ci = null;
try (Refs<SSTableReader> refs = Refs.ref(actuallyCompact);
AbstractCompactionStrategy.ScannerList scanners = strategy.getScanners(actuallyCompact))
taskId = offline ? null : SystemKeyspace.startCompaction(cfs, transaction.originals());
taskIdLoggerMsg = taskId == null ? UUIDGen.getTimeUUID().toString() : taskId.toString();
logger.debug("Compacting ({}) {}", taskIdLoggerMsg, ssTableLoggerMsg);
ci = new CompactionIterable(compactionType, scanners.scanners, controller, sstableFormat, taskId);
try (CloseableIterator<AbstractCompactedRow> iter = ci.iterator())
{
taskId = offline ? null : SystemKeyspace.startCompaction(cfs, transaction.originals());
taskIdLoggerMsg = taskId == null ? UUIDGen.getTimeUUID().toString() : taskId.toString();
logger.debug("Compacting ({}) {}", taskIdLoggerMsg, ssTableLoggerMsg);
ci = new CompactionIterable(compactionType, scanners.scanners, controller, sstableFormat, taskId);
try (CloseableIterator<AbstractCompactedRow> iter = ci.iterator())
{
long lastCheckObsoletion = start;
long lastCheckObsoletion = start;

if (!controller.cfs.getCompactionStrategy().isActive)
throw new CompactionInterruptedException(ci.getCompactionInfo());
if (!controller.cfs.getCompactionStrategy().isActive)
throw new CompactionInterruptedException(ci.getCompactionInfo());

if (collector != null)
collector.beginCompaction(ci);
if (collector != null)
collector.beginCompaction(ci);

boolean readyToFinish = false;
try (CompactionAwareWriter writer = getCompactionAwareWriter(cfs, transaction, actuallyCompact))
try (CompactionAwareWriter writer = getCompactionAwareWriter(cfs, transaction, actuallyCompact))
{
estimatedKeys = writer.estimatedKeys();
while (iter.hasNext())
{
estimatedKeys = writer.estimatedKeys();
while (iter.hasNext())
if (ci.isStopRequested())
throw new CompactionInterruptedException(ci.getCompactionInfo());

try (AbstractCompactedRow row = iter.next())
{
if (ci.isStopRequested())
throw new CompactionInterruptedException(ci.getCompactionInfo());
if (writer.append(row))
totalKeysWritten++;

try (AbstractCompactedRow row = iter.next())
if (System.nanoTime() - lastCheckObsoletion > TimeUnit.MINUTES.toNanos(1L))
{
if (writer.append(row))
totalKeysWritten++;

if (System.nanoTime() - lastCheckObsoletion > TimeUnit.MINUTES.toNanos(1L))
{
controller.maybeRefreshOverlaps();
lastCheckObsoletion = System.nanoTime();
}
controller.maybeRefreshOverlaps();
lastCheckObsoletion = System.nanoTime();
}
}
}

readyToFinish = true;
newSStables = writer.finish();
} catch (Exception e) {
CompactionException exception = new CompactionException(taskIdLoggerMsg, ssTableLoggerMsg.toString(), e);
if (readyToFinish && e.getSuppressed() != null && e.getSuppressed().length != 0)
{
logger.error("CompactionAwareWriter failed to close correctly for {}/{}. Continuing to compact now can cause resurrection. Exiting",
cfs.keyspace.getName(), cfs.name, e);
System.exit(1);
}
throw exception;
readyToFinish = true;
newSStables = writer.finish();
} catch (Exception e) {
CompactionException exception = new CompactionException(taskIdLoggerMsg, ssTableLoggerMsg.toString(), e);
if (readyToFinish && e.getSuppressed() != null && e.getSuppressed().length != 0)
{
logger.error("CompactionAwareWriter failed to close correctly for {}/{}. Continuing to compact now can cause resurrection. Exiting",
cfs.keyspace.getName(), cfs.name, e);
System.exit(1);
}
throw exception;
}
}
finally
{
Directories.removeExpectedSpaceUsedByCompaction(expectedWriteSize, CONSIDER_CONCURRENT_COMPACTIONS);
if (taskId != null)
SystemKeyspace.finishCompaction(taskId);

if (collector != null && ci != null)
collector.finishCompaction(ci);
}

try
{
ColumnFamilyStoreManager.instance.markForDeletion(cfs.metadata, transaction.logged.obsoleteDescriptors());
}
catch (Exception e)
{
logger.error("Failed to write to the write-ahead log for {}/{}. Continuing to compact now can cause resurrection. Exiting",
cfs.keyspace.getName(), cfs.name, e);
System.exit(1);
}
finally
{
if (!readyToFinish) {
// TODO(wdey): refactor all of the trys
try (Refs closedRefs = refs; CompactionController closedController = controller) {}
}
Directories.removeExpectedSpaceUsedByCompaction(expectedWriteSize, CONSIDER_CONCURRENT_COMPACTIONS);
if (taskId != null)
SystemKeyspace.finishCompaction(taskId);

// log a bunch of statistics about the result and save to system table compaction_history
long dTime = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
long startsize = SSTableReader.getTotalBytes(transaction.originals());
long endsize = SSTableReader.getTotalBytes(newSStables);
double ratio = (double) endsize / (double) startsize;

StringBuilder newSSTableNames = new StringBuilder();
for (SSTableReader reader : newSStables)
newSSTableNames.append(reader.descriptor.baseFilename()).append(",");
if (collector != null && ci != null)
collector.finishCompaction(ci);
}

if (offline)
{
Refs.release(Refs.selfRefs(newSStables));
}
else
{
double mbps = dTime > 0 ? (double) endsize / (1024 * 1024) / ((double) dTime / 1000) : 0;
long totalSourceRows = 0;
String mergeSummary = updateCompactionHistory(cfs.keyspace.getName(), cfs.getColumnFamilyName(), ci, startsize, endsize);
logger.debug(String.format("Compacted (%s) %d sstables to [%s] to level=%d. %,d bytes to %,d (~%d%% of original) in %,dms = %fMB/s. %,d total partitions merged to %,d. Partition merge counts were {%s}",
taskIdLoggerMsg, transaction.originals().size(), newSSTableNames.toString(), getLevel(), startsize, endsize, (int) (ratio * 100), dTime, mbps, totalSourceRows, totalKeysWritten, mergeSummary));
logger.trace(String.format("CF Total Bytes Compacted: %,d", CompactionTask.addToTotalBytesCompacted(endsize)));
logger.trace("Actual #keys: {}, Estimated #keys:{}, Err%: {}", totalKeysWritten, estimatedKeys, ((double) (totalKeysWritten - estimatedKeys) / totalKeysWritten));

// update the metrics
cfs.metric.compactionBytesWritten.inc(endsize);
cfs.metric.compactionsCompleted.inc();
}
try
{
ColumnFamilyStoreManager.instance.markForDeletion(cfs.metadata, transaction.logged.obsoleteDescriptors());
}
catch (Exception e)
{
logger.error("Failed to write to the write-ahead log for {}/{}. Continuing to compact now can cause resurrection. Exiting",
cfs.keyspace.getName(), cfs.name, e);
System.exit(1);
}
refs.close();
controller.close();
if (taskId != null)
SystemKeyspace.finishCompaction(taskId);

// log a bunch of statistics about the result and save to system table compaction_history
long dTime = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
long startsize = SSTableReader.getTotalBytes(transaction.originals());
long endsize = SSTableReader.getTotalBytes(newSStables);
double ratio = (double) endsize / (double) startsize;

StringBuilder newSSTableNames = new StringBuilder();
for (SSTableReader reader : newSStables)
newSSTableNames.append(reader.descriptor.baseFilename()).append(",");

if (offline)
{
Refs.release(Refs.selfRefs(newSStables));
}
else
{
double mbps = dTime > 0 ? (double) endsize / (1024 * 1024) / ((double) dTime / 1000) : 0;
long totalSourceRows = 0;
String mergeSummary = updateCompactionHistory(cfs.keyspace.getName(), cfs.getColumnFamilyName(), ci, startsize, endsize);
logger.debug(String.format("Compacted (%s) %d sstables to [%s] to level=%d. %,d bytes to %,d (~%d%% of original) in %,dms = %fMB/s. %,d total partitions merged to %,d. Partition merge counts were {%s}",
taskIdLoggerMsg, transaction.originals().size(), newSSTableNames.toString(), getLevel(), startsize, endsize, (int) (ratio * 100), dTime, mbps, totalSourceRows, totalKeysWritten, mergeSummary));
logger.trace(String.format("CF Total Bytes Compacted: %,d", CompactionTask.addToTotalBytesCompacted(endsize)));
logger.trace("Actual #keys: {}, Estimated #keys:{}, Err%: {}", totalKeysWritten, estimatedKeys, ((double) (totalKeysWritten - estimatedKeys) / totalKeysWritten));

// update the metrics
cfs.metric.compactionBytesWritten.inc(endsize);
cfs.metric.compactionsCompleted.inc();
}
}

Expand Down

0 comments on commit 12d80d8

Please sign in to comment.