From 7ef2f27202ba7abe37a53048235bedf7a2dde1af Mon Sep 17 00:00:00 2001 From: Will Dey Date: Thu, 17 Oct 2024 00:55:37 -0400 Subject: [PATCH 01/19] compaction task write-ahead log --- .../db/compaction/CompactionTask.java | 181 +++++++++--------- 1 file changed, 91 insertions(+), 90 deletions(-) diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionTask.java b/src/java/org/apache/cassandra/db/compaction/CompactionTask.java index 89ea4d36ee..c158695c1e 100644 --- a/src/java/org/apache/cassandra/db/compaction/CompactionTask.java +++ b/src/java/org/apache/cassandra/db/compaction/CompactionTask.java @@ -25,6 +25,7 @@ import java.util.UUID; import java.util.concurrent.TimeUnit; import java.util.function.BiFunction; +import java.util.stream.Collectors; import com.google.common.base.Predicate; import com.google.common.collect.Iterables; @@ -156,116 +157,116 @@ public boolean apply(SSTableReader sstable) long totalKeysWritten = 0; long estimatedKeys = 0; - try (CompactionController controller = getCompactionController(transaction.originals())) + CompactionController controller = getCompactionController(transaction.originals()); + Set actuallyCompact = Sets.difference(transaction.originals(), controller.getFullyExpiredSSTables()); + + SSTableFormat.Type sstableFormat = getFormatType(transaction.originals()); + + + // SSTableScanners need to be closed before markCompactedSSTablesReplaced call as scanners contain references + // to both ifile and dfile and SSTR will throw deletion errors on Windows if it tries to delete before scanner is closed. + // See CASSANDRA-8019 and CASSANDRA-8399 + boolean abortFailed = false; + UUID taskId = null; + String taskIdLoggerMsg; + List newSStables; + AbstractCompactionIterable ci = null; + Refs refs = Refs.ref(actuallyCompact); + try (AbstractCompactionStrategy.ScannerList scanners = strategy.getScanners(actuallyCompact)) { - Set actuallyCompact = Sets.difference(transaction.originals(), controller.getFullyExpiredSSTables()); - - SSTableFormat.Type sstableFormat = getFormatType(transaction.originals()); - - - // SSTableScanners need to be closed before markCompactedSSTablesReplaced call as scanners contain references - // to both ifile and dfile and SSTR will throw deletion errors on Windows if it tries to delete before scanner is closed. - // See CASSANDRA-8019 and CASSANDRA-8399 - boolean abortFailed = false; - UUID taskId = null; - String taskIdLoggerMsg; - List newSStables; - AbstractCompactionIterable ci = null; - try (Refs refs = Refs.ref(actuallyCompact); - AbstractCompactionStrategy.ScannerList scanners = strategy.getScanners(actuallyCompact)) + taskId = offline ? null : SystemKeyspace.startCompaction(cfs, transaction.originals()); + taskIdLoggerMsg = taskId == null ? UUIDGen.getTimeUUID().toString() : taskId.toString(); + logger.debug("Compacting ({}) {}", taskIdLoggerMsg, ssTableLoggerMsg); + ci = new CompactionIterable(compactionType, scanners.scanners, controller, sstableFormat, taskId); + try (CloseableIterator iter = ci.iterator()) { - taskId = offline ? null : SystemKeyspace.startCompaction(cfs, transaction.originals()); - taskIdLoggerMsg = taskId == null ? UUIDGen.getTimeUUID().toString() : taskId.toString(); - logger.debug("Compacting ({}) {}", taskIdLoggerMsg, ssTableLoggerMsg); - ci = new CompactionIterable(compactionType, scanners.scanners, controller, sstableFormat, taskId); - try (CloseableIterator iter = ci.iterator()) - { - long lastCheckObsoletion = start; + long lastCheckObsoletion = start; - if (!controller.cfs.getCompactionStrategy().isActive) - throw new CompactionInterruptedException(ci.getCompactionInfo()); + if (!controller.cfs.getCompactionStrategy().isActive) + throw new CompactionInterruptedException(ci.getCompactionInfo()); - if (collector != null) - collector.beginCompaction(ci); + if (collector != null) + collector.beginCompaction(ci); - boolean readyToFinish = false; - try (CompactionAwareWriter writer = getCompactionAwareWriter(cfs, transaction, actuallyCompact)) + boolean readyToFinish = false; + try (CompactionAwareWriter writer = getCompactionAwareWriter(cfs, transaction, actuallyCompact)) + { + estimatedKeys = writer.estimatedKeys(); + while (iter.hasNext()) { - estimatedKeys = writer.estimatedKeys(); - while (iter.hasNext()) + if (ci.isStopRequested()) + throw new CompactionInterruptedException(ci.getCompactionInfo()); + + try (AbstractCompactedRow row = iter.next()) { - if (ci.isStopRequested()) - throw new CompactionInterruptedException(ci.getCompactionInfo()); + if (writer.append(row)) + totalKeysWritten++; - try (AbstractCompactedRow row = iter.next()) + if (System.nanoTime() - lastCheckObsoletion > TimeUnit.MINUTES.toNanos(1L)) { - if (writer.append(row)) - totalKeysWritten++; - - if (System.nanoTime() - lastCheckObsoletion > TimeUnit.MINUTES.toNanos(1L)) - { - controller.maybeRefreshOverlaps(); - lastCheckObsoletion = System.nanoTime(); - } + controller.maybeRefreshOverlaps(); + lastCheckObsoletion = System.nanoTime(); } } + } - readyToFinish = true; - newSStables = writer.finish(); - } catch (Exception e) { - CompactionException exception = new CompactionException(taskIdLoggerMsg, ssTableLoggerMsg.toString(), e); - if (readyToFinish && e.getSuppressed() != null && e.getSuppressed().length != 0) - { - abortFailed = true; - logger.warn("CompactionAwareWriter failed to close correctly for {}/{}. This compaction won't be removed from " + - "system.compactions_in_progress to ensure sstable cleanup on startup proceeds correctly in case some " + - "compaction-product sstables are marked final while others remain tmp", - cfs.keyspace.getName(), cfs.name, e); - } - throw exception; + readyToFinish = true; + newSStables = writer.finish(); + } catch (Exception e) { + CompactionException exception = new CompactionException(taskIdLoggerMsg, ssTableLoggerMsg.toString(), e); + if (readyToFinish && e.getSuppressed() != null && e.getSuppressed().length != 0) + { + abortFailed = true; + logger.warn("CompactionAwareWriter failed to close correctly for {}/{}. This compaction won't be removed from " + + "system.compactions_in_progress to ensure sstable cleanup on startup proceeds correctly in case some " + + "compaction-product sstables are marked final while others remain tmp", + cfs.keyspace.getName(), cfs.name, e); } + throw exception; } } - finally - { - Directories.removeExpectedSpaceUsedByCompaction(expectedWriteSize, CONSIDER_CONCURRENT_COMPACTIONS); - if (taskId != null && (!abortFailed)) - SystemKeyspace.finishCompaction(taskId); + } + finally + { + Directories.removeExpectedSpaceUsedByCompaction(expectedWriteSize, CONSIDER_CONCURRENT_COMPACTIONS); + if (taskId != null && (!abortFailed)) + SystemKeyspace.finishCompaction(taskId); - if (collector != null && ci != null) - collector.finishCompaction(ci); - } + if (collector != null && ci != null) + collector.finishCompaction(ci); + } - ColumnFamilyStoreManager.instance.markForDeletion(cfs.metadata, transaction.logged.obsoleteDescriptors()); + ColumnFamilyStoreManager.instance.markForDeletion(cfs.metadata, transaction.logged.obsoleteDescriptors()); + refs.close(); + controller.close(); - // log a bunch of statistics about the result and save to system table compaction_history - long dTime = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start); - long startsize = SSTableReader.getTotalBytes(transaction.originals()); - long endsize = SSTableReader.getTotalBytes(newSStables); - double ratio = (double) endsize / (double) startsize; + // log a bunch of statistics about the result and save to system table compaction_history + long dTime = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start); + long startsize = SSTableReader.getTotalBytes(transaction.originals()); + long endsize = SSTableReader.getTotalBytes(newSStables); + double ratio = (double) endsize / (double) startsize; - StringBuilder newSSTableNames = new StringBuilder(); - for (SSTableReader reader : newSStables) - newSSTableNames.append(reader.descriptor.baseFilename()).append(","); + StringBuilder newSSTableNames = new StringBuilder(); + for (SSTableReader reader : newSStables) + newSSTableNames.append(reader.descriptor.baseFilename()).append(","); - if (offline) - { - Refs.release(Refs.selfRefs(newSStables)); - } - else - { - double mbps = dTime > 0 ? (double) endsize / (1024 * 1024) / ((double) dTime / 1000) : 0; - long totalSourceRows = 0; - String mergeSummary = updateCompactionHistory(cfs.keyspace.getName(), cfs.getColumnFamilyName(), ci, startsize, endsize); - logger.debug(String.format("Compacted (%s) %d sstables to [%s] to level=%d. %,d bytes to %,d (~%d%% of original) in %,dms = %fMB/s. %,d total partitions merged to %,d. Partition merge counts were {%s}", - taskIdLoggerMsg, transaction.originals().size(), newSSTableNames.toString(), getLevel(), startsize, endsize, (int) (ratio * 100), dTime, mbps, totalSourceRows, totalKeysWritten, mergeSummary)); - logger.trace(String.format("CF Total Bytes Compacted: %,d", CompactionTask.addToTotalBytesCompacted(endsize))); - logger.trace("Actual #keys: {}, Estimated #keys:{}, Err%: {}", totalKeysWritten, estimatedKeys, ((double) (totalKeysWritten - estimatedKeys) / totalKeysWritten)); - - // update the metrics - cfs.metric.compactionBytesWritten.inc(endsize); - cfs.metric.compactionsCompleted.inc(); - } + if (offline) + { + Refs.release(Refs.selfRefs(newSStables)); + } + else + { + double mbps = dTime > 0 ? (double) endsize / (1024 * 1024) / ((double) dTime / 1000) : 0; + long totalSourceRows = 0; + String mergeSummary = updateCompactionHistory(cfs.keyspace.getName(), cfs.getColumnFamilyName(), ci, startsize, endsize); + logger.debug(String.format("Compacted (%s) %d sstables to [%s] to level=%d. %,d bytes to %,d (~%d%% of original) in %,dms = %fMB/s. %,d total partitions merged to %,d. Partition merge counts were {%s}", + taskIdLoggerMsg, transaction.originals().size(), newSSTableNames.toString(), getLevel(), startsize, endsize, (int) (ratio * 100), dTime, mbps, totalSourceRows, totalKeysWritten, mergeSummary)); + logger.trace(String.format("CF Total Bytes Compacted: %,d", CompactionTask.addToTotalBytesCompacted(endsize))); + logger.trace("Actual #keys: {}, Estimated #keys:{}, Err%: {}", totalKeysWritten, estimatedKeys, ((double) (totalKeysWritten - estimatedKeys) / totalKeysWritten)); + + // update the metrics + cfs.metric.compactionBytesWritten.inc(endsize); + cfs.metric.compactionsCompleted.inc(); } } From 3a242cc820d265e67949b327680a39e3b7bf9bf6 Mon Sep 17 00:00:00 2001 From: Will Dey Date: Thu, 17 Oct 2024 01:03:36 -0400 Subject: [PATCH 02/19] define interface --- .../com/palantir/cassandra/db/ColumnFamilyStoreManager.java | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/src/java/com/palantir/cassandra/db/ColumnFamilyStoreManager.java b/src/java/com/palantir/cassandra/db/ColumnFamilyStoreManager.java index 291aa37f01..17f456b196 100644 --- a/src/java/com/palantir/cassandra/db/ColumnFamilyStoreManager.java +++ b/src/java/com/palantir/cassandra/db/ColumnFamilyStoreManager.java @@ -25,6 +25,7 @@ import com.palantir.cassandra.db.compaction.IColumnFamilyStoreWriteAheadLogger; import org.apache.cassandra.config.CFMetaData; import org.apache.cassandra.io.sstable.Descriptor; +import org.apache.cassandra.utils.Pair; public class ColumnFamilyStoreManager implements IColumnFamilyStoreValidator, IColumnFamilyStoreWriteAheadLogger @@ -77,4 +78,8 @@ public void markForDeletion(CFMetaData cfMetaData, Set descriptors) { writeAheadLogger.markForDeletion(cfMetaData, descriptors); } + + public synchronized void markForDeletion(CFMetaData cfMetaData, Set collect) + { + } } From f94dc767398451c143fffff5beaf8b9e0abf71d8 Mon Sep 17 00:00:00 2001 From: Will Dey Date: Thu, 17 Oct 2024 01:07:03 -0400 Subject: [PATCH 03/19] first roll cutover --- .../palantir/cassandra/db/ColumnFamilyStoreManager.java | 9 ++++++++- .../org/apache/cassandra/service/CassandraDaemon.java | 5 ++++- 2 files changed, 12 insertions(+), 2 deletions(-) diff --git a/src/java/com/palantir/cassandra/db/ColumnFamilyStoreManager.java b/src/java/com/palantir/cassandra/db/ColumnFamilyStoreManager.java index 17f456b196..d637624f24 100644 --- a/src/java/com/palantir/cassandra/db/ColumnFamilyStoreManager.java +++ b/src/java/com/palantir/cassandra/db/ColumnFamilyStoreManager.java @@ -79,7 +79,14 @@ public void markForDeletion(CFMetaData cfMetaData, Set descriptors) writeAheadLogger.markForDeletion(cfMetaData, descriptors); } - public synchronized void markForDeletion(CFMetaData cfMetaData, Set collect) + public void markForDeletion(CFMetaData cfMetaData, Set collect) { + // consumer should synchronize on ks+cf + throw new UnsupportedOperationException("not yet implemented"); + } + + public boolean shouldRemoveUnusedSstables() { + // TODO(wdey): delegate + return true; } } diff --git a/src/java/org/apache/cassandra/service/CassandraDaemon.java b/src/java/org/apache/cassandra/service/CassandraDaemon.java index 16ead7921e..cfd311e065 100644 --- a/src/java/org/apache/cassandra/service/CassandraDaemon.java +++ b/src/java/org/apache/cassandra/service/CassandraDaemon.java @@ -61,6 +61,7 @@ import com.palantir.cassandra.concurrent.LocalReadRunnableTimeoutWatcher; import com.palantir.cassandra.db.BootstrappingSafetyException; +import com.palantir.cassandra.db.ColumnFamilyStoreManager; import com.palantir.cassandra.settings.DisableClientInterfaceSetting; import com.palantir.logsafe.Preconditions; import com.palantir.logsafe.Safe; @@ -290,7 +291,9 @@ private void completeSetupMayThrowSstableException() { for (CFMetaData cfm : Schema.instance.getKeyspaceMetaData(keyspaceName).values()) { - ColumnFamilyStore.removeUnusedSstables(cfm, unfinishedCompactions.getOrDefault(cfm.ksAndCFName, ImmutableMap.of())); + if (ColumnFamilyStoreManager.instance.shouldRemoveUnusedSstables()) { + ColumnFamilyStore.removeUnusedSstables(cfm, unfinishedCompactions.getOrDefault(cfm.ksAndCFName, ImmutableMap.of())); + } ColumnFamilyStore.scrubDataDirectories(cfm); } } From d1d15283a2ca48baebe414797b50e77256786096 Mon Sep 17 00:00:00 2001 From: Will Dey Date: Thu, 17 Oct 2024 01:10:05 -0400 Subject: [PATCH 04/19] warning is incorrect --- src/java/org/apache/cassandra/db/compaction/CompactionTask.java | 1 + 1 file changed, 1 insertion(+) diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionTask.java b/src/java/org/apache/cassandra/db/compaction/CompactionTask.java index c158695c1e..8e9154bdd9 100644 --- a/src/java/org/apache/cassandra/db/compaction/CompactionTask.java +++ b/src/java/org/apache/cassandra/db/compaction/CompactionTask.java @@ -101,6 +101,7 @@ public boolean reduceScopeForLimitedSpace(long expectedSize) * which are properly serialized. * Caller is in charge of marking/unmarking the sstables as compacting. */ + @SuppressWarnings("resource") // It is dangerous to close refs for a failed transaction protected void runMayThrow() throws Exception { // The collection of sstables passed may be empty (but not null); even if From 052e5d9f4c97c467fbddd255090b3882d027edef Mon Sep 17 00:00:00 2001 From: Will Dey Date: Thu, 17 Oct 2024 18:16:10 -0400 Subject: [PATCH 05/19] bump test timeout --- .../org/apache/cassandra/io/sstable/SSTableRewriterTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java b/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java index 1dbf22b485..6839dd2f50 100644 --- a/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java +++ b/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java @@ -938,7 +938,7 @@ public static void truncate(ColumnFamilyStore cfs) { cfs.truncateBlocking(); SSTableDeletingTask.waitForDeletions(); - Uninterruptibles.sleepUninterruptibly(10L,TimeUnit.MILLISECONDS); + Uninterruptibles.sleepUninterruptibly(1L,TimeUnit.SECONDS); assertEquals(0, cfs.metric.liveDiskSpaceUsed.getCount()); assertEquals(0, cfs.metric.totalDiskSpaceUsed.getCount()); validateCFS(cfs); From f499efad6cf368c5e28f28ff0b67c94f75585f8f Mon Sep 17 00:00:00 2001 From: Will Dey Date: Thu, 17 Oct 2024 18:18:35 -0400 Subject: [PATCH 06/19] stop asserting no sstables --- .../org/apache/cassandra/io/sstable/SSTableRewriterTest.java | 3 --- 1 file changed, 3 deletions(-) diff --git a/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java b/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java index 6839dd2f50..ba6bd31bcb 100644 --- a/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java +++ b/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java @@ -938,9 +938,6 @@ public static void truncate(ColumnFamilyStore cfs) { cfs.truncateBlocking(); SSTableDeletingTask.waitForDeletions(); - Uninterruptibles.sleepUninterruptibly(1L,TimeUnit.SECONDS); - assertEquals(0, cfs.metric.liveDiskSpaceUsed.getCount()); - assertEquals(0, cfs.metric.totalDiskSpaceUsed.getCount()); validateCFS(cfs); } From 0de53614c46bab92c5326fda9c6766e254749392 Mon Sep 17 00:00:00 2001 From: Will Dey Date: Fri, 18 Oct 2024 00:36:30 -0400 Subject: [PATCH 07/19] more unassertions --- .../org/apache/cassandra/io/sstable/SSTableRewriterTest.java | 1 - 1 file changed, 1 deletion(-) diff --git a/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java b/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java index ba6bd31bcb..5d57a6dce4 100644 --- a/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java +++ b/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java @@ -938,7 +938,6 @@ public static void truncate(ColumnFamilyStore cfs) { cfs.truncateBlocking(); SSTableDeletingTask.waitForDeletions(); - validateCFS(cfs); } public static SSTableReader writeFile(ColumnFamilyStore cfs, int count) From 874d608a025fbfef7857030f1159e95b0231f2a7 Mon Sep 17 00:00:00 2001 From: Will Dey Date: Fri, 18 Oct 2024 09:45:51 -0400 Subject: [PATCH 08/19] even more --- .../org/apache/cassandra/io/sstable/SSTableRewriterTest.java | 1 - 1 file changed, 1 deletion(-) diff --git a/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java b/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java index 5d57a6dce4..7936b529af 100644 --- a/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java +++ b/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java @@ -727,7 +727,6 @@ private void testAbortHelper(boolean earlyException, boolean offline) throws Exc filecount = assertFileCounts(s.descriptor.directory.list(), 0, 0); } - assertEquals(0, filecount); truncate(cfs); } From 7faebe2b1ef27fd55fc64edb92651ebfbee48613 Mon Sep 17 00:00:00 2001 From: Will Dey Date: Fri, 18 Oct 2024 17:21:06 -0400 Subject: [PATCH 09/19] NON_COMPACTING flag --- src/java/org/apache/cassandra/db/ColumnFamilyStore.java | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java index 60f2d25dd5..31291b16b1 100644 --- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java +++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java @@ -107,8 +107,10 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean { private static final Logger logger = LoggerFactory.getLogger(ColumnFamilyStore.class); + private static final boolean DRY_RUN_NON_COMPACTING_CLEANUP = Boolean.parseBoolean(System.getProperty( + "palantir_cassandra.dry_run_non_compacting_cleanup", "true")); private static final boolean DRY_RUN_NON_COMPACTING_UNUSED_SSTABLE_CLEANUP = Boolean.getBoolean( - "palantir_cassandra.dry_run_non_compacting_unused_sstable_cleanup"); + "palantir_cassandra.dry_run_non_compacting_unused_sstable_cleanup"); private static final ExecutorService flushExecutor = new JMXEnabledThreadPoolExecutor(DatabaseDescriptor.getFlushWriters(), StageManager.KEEPALIVE, @@ -763,7 +765,8 @@ public static void removeUnusedSstables(CFMetaData metadata, Map Descriptor desc = sstableFiles.getKey(); if (completedAncestors.contains(desc.generation)) { - if (DRY_RUN_NON_COMPACTING_UNUSED_SSTABLE_CLEANUP && unfinishedCompactions.isEmpty()) + if (DRY_RUN_NON_COMPACTING_CLEANUP + || (DRY_RUN_NON_COMPACTING_UNUSED_SSTABLE_CLEANUP && unfinishedCompactions.isEmpty())) { logger.warn("Would have deleted leftover compaction ancestor", UnsafeArg.of("desc", desc), SafeArg.of("keyspace", desc.ksname), SafeArg.of("cf", desc.cfname), From 67885fe7fbda286faf226140ad2f407707eb0c0f Mon Sep 17 00:00:00 2001 From: Will Dey Date: Fri, 18 Oct 2024 19:09:19 -0400 Subject: [PATCH 10/19] fix tests --- src/java/org/apache/cassandra/db/ColumnFamilyStore.java | 3 ++- test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java | 1 + 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java index 31291b16b1..0426a9d436 100644 --- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java +++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java @@ -107,7 +107,8 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean { private static final Logger logger = LoggerFactory.getLogger(ColumnFamilyStore.class); - private static final boolean DRY_RUN_NON_COMPACTING_CLEANUP = Boolean.parseBoolean(System.getProperty( + @VisibleForTesting + static boolean DRY_RUN_NON_COMPACTING_CLEANUP = Boolean.parseBoolean(System.getProperty( "palantir_cassandra.dry_run_non_compacting_cleanup", "true")); private static final boolean DRY_RUN_NON_COMPACTING_UNUSED_SSTABLE_CLEANUP = Boolean.getBoolean( "palantir_cassandra.dry_run_non_compacting_unused_sstable_cleanup"); diff --git a/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java b/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java index a4f25844ce..c9fe96c617 100644 --- a/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java +++ b/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java @@ -1959,6 +1959,7 @@ protected SSTableWriter getWriter() @Test public void testRemoveUnusedSstablesOnlyRemovesFiltered() throws IOException { + ColumnFamilyStore.DRY_RUN_NON_COMPACTING_CLEANUP = false; final String ks = KEYSPACE1; final String cf = CF_STANDARD7; From 86eec404d4f8a2191f292f53ab2cdfe610e08004 Mon Sep 17 00:00:00 2001 From: Will Dey Date: Mon, 21 Oct 2024 11:29:11 -0400 Subject: [PATCH 11/19] add shouldSkipAncestorCleanup --- src/java/org/apache/cassandra/db/ColumnFamilyStore.java | 5 +---- test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java | 1 - 2 files changed, 1 insertion(+), 5 deletions(-) diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java index 0426a9d436..17dee2312c 100644 --- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java +++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java @@ -107,9 +107,6 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean { private static final Logger logger = LoggerFactory.getLogger(ColumnFamilyStore.class); - @VisibleForTesting - static boolean DRY_RUN_NON_COMPACTING_CLEANUP = Boolean.parseBoolean(System.getProperty( - "palantir_cassandra.dry_run_non_compacting_cleanup", "true")); private static final boolean DRY_RUN_NON_COMPACTING_UNUSED_SSTABLE_CLEANUP = Boolean.getBoolean( "palantir_cassandra.dry_run_non_compacting_unused_sstable_cleanup"); @@ -766,7 +763,7 @@ public static void removeUnusedSstables(CFMetaData metadata, Map Descriptor desc = sstableFiles.getKey(); if (completedAncestors.contains(desc.generation)) { - if (DRY_RUN_NON_COMPACTING_CLEANUP + if (ColumnFamilyStoreManager.instance.shouldSkipAncestorCleanup() || (DRY_RUN_NON_COMPACTING_UNUSED_SSTABLE_CLEANUP && unfinishedCompactions.isEmpty())) { logger.warn("Would have deleted leftover compaction ancestor", UnsafeArg.of("desc", desc), diff --git a/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java b/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java index c9fe96c617..a4f25844ce 100644 --- a/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java +++ b/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java @@ -1959,7 +1959,6 @@ protected SSTableWriter getWriter() @Test public void testRemoveUnusedSstablesOnlyRemovesFiltered() throws IOException { - ColumnFamilyStore.DRY_RUN_NON_COMPACTING_CLEANUP = false; final String ks = KEYSPACE1; final String cf = CF_STANDARD7; From 88e1ae609b5fee17fb920c3a50fe39945aefad5b Mon Sep 17 00:00:00 2001 From: Will Dey Date: Mon, 21 Oct 2024 13:16:58 -0400 Subject: [PATCH 12/19] Release refs if fail before readyToFinish --- .../org/apache/cassandra/db/compaction/CompactionTask.java | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionTask.java b/src/java/org/apache/cassandra/db/compaction/CompactionTask.java index 8e9154bdd9..fa310f5440 100644 --- a/src/java/org/apache/cassandra/db/compaction/CompactionTask.java +++ b/src/java/org/apache/cassandra/db/compaction/CompactionTask.java @@ -173,6 +173,7 @@ public boolean apply(SSTableReader sstable) List newSStables; AbstractCompactionIterable ci = null; Refs refs = Refs.ref(actuallyCompact); + boolean readyToFinish = false; try (AbstractCompactionStrategy.ScannerList scanners = strategy.getScanners(actuallyCompact)) { taskId = offline ? null : SystemKeyspace.startCompaction(cfs, transaction.originals()); @@ -189,7 +190,6 @@ public boolean apply(SSTableReader sstable) if (collector != null) collector.beginCompaction(ci); - boolean readyToFinish = false; try (CompactionAwareWriter writer = getCompactionAwareWriter(cfs, transaction, actuallyCompact)) { estimatedKeys = writer.estimatedKeys(); @@ -229,6 +229,10 @@ public boolean apply(SSTableReader sstable) } finally { + if (!readyToFinish) { + // TODO(wdey): refactor all of the trys + try (Refs closedRefs = refs; CompactionController closedController = controller) {} + } Directories.removeExpectedSpaceUsedByCompaction(expectedWriteSize, CONSIDER_CONCURRENT_COMPACTIONS); if (taskId != null && (!abortFailed)) SystemKeyspace.finishCompaction(taskId); From 0d6280f7b899b3bd2be0ea5757797814a9b23c2a Mon Sep 17 00:00:00 2001 From: Will Dey Date: Mon, 21 Oct 2024 13:18:18 -0400 Subject: [PATCH 13/19] re-enable test assertions --- .../org/apache/cassandra/io/sstable/SSTableRewriterTest.java | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java b/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java index 7936b529af..1dbf22b485 100644 --- a/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java +++ b/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java @@ -727,6 +727,7 @@ private void testAbortHelper(boolean earlyException, boolean offline) throws Exc filecount = assertFileCounts(s.descriptor.directory.list(), 0, 0); } + assertEquals(0, filecount); truncate(cfs); } @@ -937,6 +938,10 @@ public static void truncate(ColumnFamilyStore cfs) { cfs.truncateBlocking(); SSTableDeletingTask.waitForDeletions(); + Uninterruptibles.sleepUninterruptibly(10L,TimeUnit.MILLISECONDS); + assertEquals(0, cfs.metric.liveDiskSpaceUsed.getCount()); + assertEquals(0, cfs.metric.totalDiskSpaceUsed.getCount()); + validateCFS(cfs); } public static SSTableReader writeFile(ColumnFamilyStore cfs, int count) From 42d381e43ecee4348f8316f636693cf94e9848b2 Mon Sep 17 00:00:00 2001 From: Will Dey Date: Mon, 21 Oct 2024 14:26:11 -0400 Subject: [PATCH 14/19] use obsolescence from LifecycleTransaction --- .../org/apache/cassandra/db/lifecycle/LifecycleTransaction.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/java/org/apache/cassandra/db/lifecycle/LifecycleTransaction.java b/src/java/org/apache/cassandra/db/lifecycle/LifecycleTransaction.java index 54ebfd2083..2657f2cf3b 100644 --- a/src/java/org/apache/cassandra/db/lifecycle/LifecycleTransaction.java +++ b/src/java/org/apache/cassandra/db/lifecycle/LifecycleTransaction.java @@ -60,7 +60,7 @@ public static class State // readers that are either brand new, update a previous new reader, or update one of the original readers final Set update = new HashSet<>(); // disjoint from update, represents a subset of originals that is no longer needed - final Set obsolete = new HashSet<>(); + public final Set obsolete = new HashSet<>(); void log(State staged) { From 116c9b9f7e9bb25ce0d330c2ff8d7bbee9f9de99 Mon Sep 17 00:00:00 2001 From: Will Dey Date: Tue, 22 Oct 2024 15:27:45 -0400 Subject: [PATCH 15/19] tests --- .../cassandra/db/ColumnFamilyStoreTest.java | 64 ++++++++++++++++++- 1 file changed, 63 insertions(+), 1 deletion(-) diff --git a/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java b/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java index a4f25844ce..51cd4d4fcf 100644 --- a/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java +++ b/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java @@ -118,6 +118,7 @@ public class ColumnFamilyStoreTest public static final String CF_STANDARD5 = "Standard5"; public static final String CF_STANDARD6 = "Standard6"; public static final String CF_STANDARD7 = "Standard7"; + public static final String CF_STANDARD8 = "Standard8"; public static final String CF_STANDARDINT = "StandardInteger1"; public static final String CF_SUPER1 = "Super1"; public static final String CF_SUPER6 = "Super6"; @@ -148,6 +149,7 @@ public static void defineSchema() throws ConfigurationException SchemaLoader.standardCFMD(KEYSPACE1, CF_STANDARD5), SchemaLoader.standardCFMD(KEYSPACE1, CF_STANDARD6), SchemaLoader.standardCFMD(KEYSPACE1, CF_STANDARD7), + SchemaLoader.standardCFMD(KEYSPACE1, CF_STANDARD8), SchemaLoader.indexCFMD(KEYSPACE1, CF_INDEX1, true), SchemaLoader.indexCFMD(KEYSPACE1, CF_INDEX2, false), SchemaLoader.superCFMD(KEYSPACE1, CF_SUPER1, LongType.instance), @@ -1984,7 +1986,8 @@ public void testRemoveUnusedSstablesOnlyRemovesFiltered() throws IOException .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); }; - try { + try + { ColumnFamilyStoreManager.instance.registerValidator(validator); ColumnFamilyStore.removeUnusedSstables(cfmeta, ImmutableMap.of()); } @@ -2002,6 +2005,65 @@ public void testRemoveUnusedSstablesOnlyRemovesFiltered() throws IOException assertEquals(expected, sstables.keySet()); } + @Test + public void testShouldSkipAncestorCleanupSkipsAncestorCleanup() throws IOException + { + final String ks = KEYSPACE1; + final String cf = CF_STANDARD8; + + final CFMetaData cfmeta = Schema.instance.getCFMetaData(ks, cf); + Keyspace.open(KEYSPACE1).getColumnFamilyStore(cf).disableAutoCompaction(); + Directories dir = new Directories(cfmeta); + + int gen1 = writeNextGenerationSstable(ImmutableSet.of(), dir, cfmeta); + int gen2 = writeNextGenerationSstable(ImmutableSet.of(), dir, cfmeta); + int gen3 = writeNextGenerationSstable(ImmutableSet.of(gen1, gen2), dir, cfmeta); + int gen4 = writeNextGenerationSstable(ImmutableSet.of(), dir, cfmeta); + int gen5 = writeNextGenerationSstable(ImmutableSet.of(gen4), dir, cfmeta); + + Map> sstables = dir.sstableLister().list(); + Descriptor sstable3Desc = sstables.keySet().iterator().next().withGeneration(gen3); + assertEquals(5, sstables.size()); + assertTrue(sstables.containsKey(sstable3Desc)); + + IColumnFamilyStoreValidator validator = new IColumnFamilyStoreValidator() + { + @Override + public Map> filterValidAncestors(CFMetaData _cfMetaData, Map> sstableToCompletedAncestors, Map _unfinishedCompactions) + { + Set allowedGenerations = ImmutableSet.of(gen1, gen2, gen4, gen5); + return sstableToCompletedAncestors.entrySet().stream() + .filter(entry -> allowedGenerations.contains(entry.getKey().generation)) + .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); + } + + @Override + public boolean shouldSkipAncestorCleanup() + { + return true; + } + }; + + try + { + ColumnFamilyStoreManager.instance.registerValidator(validator); + ColumnFamilyStore.removeUnusedSstables(cfmeta, ImmutableMap.of()); + } + finally + { + ColumnFamilyStoreManager.instance.unregisterValidator(); + } + + sstables = dir.sstableLister().list(); + ImmutableSet expected = ImmutableSet.of( + sstable3Desc.withGeneration(gen1), + sstable3Desc.withGeneration(gen2), + sstable3Desc.withGeneration(gen3), + sstable3Desc.withGeneration(gen4), + sstable3Desc.withGeneration(gen5)); + assertEquals(expected, sstables.keySet()); + } + @Test public void testLoadNewSSTablesAvoidsOverwrites() throws Throwable { From 64c6959018f678be2d1c6f906b507019c6303351 Mon Sep 17 00:00:00 2001 From: Will Dey Date: Wed, 23 Oct 2024 21:10:42 -0400 Subject: [PATCH 16/19] comment --- .../org/apache/cassandra/db/lifecycle/LifecycleTransaction.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/java/org/apache/cassandra/db/lifecycle/LifecycleTransaction.java b/src/java/org/apache/cassandra/db/lifecycle/LifecycleTransaction.java index 2657f2cf3b..54ebfd2083 100644 --- a/src/java/org/apache/cassandra/db/lifecycle/LifecycleTransaction.java +++ b/src/java/org/apache/cassandra/db/lifecycle/LifecycleTransaction.java @@ -60,7 +60,7 @@ public static class State // readers that are either brand new, update a previous new reader, or update one of the original readers final Set update = new HashSet<>(); // disjoint from update, represents a subset of originals that is no longer needed - public final Set obsolete = new HashSet<>(); + final Set obsolete = new HashSet<>(); void log(State staged) { From 9153a57a07590c6ef421a5e787da76aa82997ef4 Mon Sep 17 00:00:00 2001 From: Will Dey Date: Wed, 23 Oct 2024 21:12:49 -0400 Subject: [PATCH 17/19] comment --- .../com/palantir/cassandra/db/ColumnFamilyStoreManager.java | 1 - src/java/org/apache/cassandra/db/compaction/CompactionTask.java | 1 - src/java/org/apache/cassandra/service/CassandraDaemon.java | 2 +- 3 files changed, 1 insertion(+), 3 deletions(-) diff --git a/src/java/com/palantir/cassandra/db/ColumnFamilyStoreManager.java b/src/java/com/palantir/cassandra/db/ColumnFamilyStoreManager.java index d637624f24..86669cbed0 100644 --- a/src/java/com/palantir/cassandra/db/ColumnFamilyStoreManager.java +++ b/src/java/com/palantir/cassandra/db/ColumnFamilyStoreManager.java @@ -25,7 +25,6 @@ import com.palantir.cassandra.db.compaction.IColumnFamilyStoreWriteAheadLogger; import org.apache.cassandra.config.CFMetaData; import org.apache.cassandra.io.sstable.Descriptor; -import org.apache.cassandra.utils.Pair; public class ColumnFamilyStoreManager implements IColumnFamilyStoreValidator, IColumnFamilyStoreWriteAheadLogger diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionTask.java b/src/java/org/apache/cassandra/db/compaction/CompactionTask.java index fa310f5440..c13aa0c737 100644 --- a/src/java/org/apache/cassandra/db/compaction/CompactionTask.java +++ b/src/java/org/apache/cassandra/db/compaction/CompactionTask.java @@ -25,7 +25,6 @@ import java.util.UUID; import java.util.concurrent.TimeUnit; import java.util.function.BiFunction; -import java.util.stream.Collectors; import com.google.common.base.Predicate; import com.google.common.collect.Iterables; diff --git a/src/java/org/apache/cassandra/service/CassandraDaemon.java b/src/java/org/apache/cassandra/service/CassandraDaemon.java index cfd311e065..529e6ef1fb 100644 --- a/src/java/org/apache/cassandra/service/CassandraDaemon.java +++ b/src/java/org/apache/cassandra/service/CassandraDaemon.java @@ -291,7 +291,7 @@ private void completeSetupMayThrowSstableException() { for (CFMetaData cfm : Schema.instance.getKeyspaceMetaData(keyspaceName).values()) { - if (ColumnFamilyStoreManager.instance.shouldRemoveUnusedSstables()) { + if (ColumnFamilyStoreManager.instance.shouldRemoveUnusedSstablesBasedOnAncestorMetadata()) { ColumnFamilyStore.removeUnusedSstables(cfm, unfinishedCompactions.getOrDefault(cfm.ksAndCFName, ImmutableMap.of())); } ColumnFamilyStore.scrubDataDirectories(cfm); From 41a8aba57a40afc911cf35b13be042b9ba912a29 Mon Sep 17 00:00:00 2001 From: Will Dey Date: Thu, 31 Oct 2024 10:40:13 -0400 Subject: [PATCH 18/19] dual writing wal --- .../db/ColumnFamilyStoreManager.java | 6 -- .../db/CompactionsInProgressFlusher.java | 29 +++++++--- .../cassandra/db/ColumnFamilyStore.java | 6 +- .../apache/cassandra/db/SystemKeyspace.java | 57 +++++++++++++++---- .../db/compaction/CompactionTask.java | 6 +- .../cassandra/service/CassandraDaemon.java | 5 +- .../cassandra/db/ColumnFamilyStoreTest.java | 6 +- .../db/compaction/CompactionsTest.java | 10 ++-- 8 files changed, 83 insertions(+), 42 deletions(-) diff --git a/src/java/com/palantir/cassandra/db/ColumnFamilyStoreManager.java b/src/java/com/palantir/cassandra/db/ColumnFamilyStoreManager.java index 86669cbed0..d6da5c7be7 100644 --- a/src/java/com/palantir/cassandra/db/ColumnFamilyStoreManager.java +++ b/src/java/com/palantir/cassandra/db/ColumnFamilyStoreManager.java @@ -78,12 +78,6 @@ public void markForDeletion(CFMetaData cfMetaData, Set descriptors) writeAheadLogger.markForDeletion(cfMetaData, descriptors); } - public void markForDeletion(CFMetaData cfMetaData, Set collect) - { - // consumer should synchronize on ks+cf - throw new UnsupportedOperationException("not yet implemented"); - } - public boolean shouldRemoveUnusedSstables() { // TODO(wdey): delegate return true; diff --git a/src/java/com/palantir/cassandra/db/CompactionsInProgressFlusher.java b/src/java/com/palantir/cassandra/db/CompactionsInProgressFlusher.java index 4f1881e86b..29110bc200 100644 --- a/src/java/com/palantir/cassandra/db/CompactionsInProgressFlusher.java +++ b/src/java/com/palantir/cassandra/db/CompactionsInProgressFlusher.java @@ -16,6 +16,7 @@ package com.palantir.cassandra.db; +import java.util.EnumMap; import java.util.function.Supplier; import org.apache.cassandra.db.Keyspace; @@ -52,15 +53,25 @@ public class CompactionsInProgressFlusher { private static final boolean COALESCE_FLUSHES = Boolean.getBoolean("palantir_cassandra.coalesce_cip_flushes"); - public static final CompactionsInProgressFlusher INSTANCE = new CompactionsInProgressFlusher(); - - private final Supplier flusher = () -> FBUtilities.waitOnFuture( - Keyspace.open(SystemKeyspace.NAME) - .getColumnFamilyStore(SystemKeyspace.COMPACTIONS_IN_PROGRESS) - .forceFlush("CompactionsInProgressFlusher")); - private final Supplier coalescingFlusher = new CoalescingSupplier(flusher); - - private CompactionsInProgressFlusher() { } + public static final EnumMap INSTANCES = new EnumMap<>(SystemKeyspace.CompactionsInProgressTable.class); + + static { + INSTANCES.put(SystemKeyspace.CompactionsInProgressTable.DEFAULT, new CompactionsInProgressFlusher(SystemKeyspace.CompactionsInProgressTable.DEFAULT)); + INSTANCES.put(SystemKeyspace.CompactionsInProgressTable.WAL, new CompactionsInProgressFlusher(SystemKeyspace.CompactionsInProgressTable.WAL)); + } + + private final SystemKeyspace.CompactionsInProgressTable compactionsInProgressTable; + private final Supplier flusher; + private final Supplier coalescingFlusher; + + private CompactionsInProgressFlusher(SystemKeyspace.CompactionsInProgressTable compactionsInProgressTable) { + this.compactionsInProgressTable = compactionsInProgressTable; + flusher = () -> FBUtilities.waitOnFuture( + Keyspace.open(SystemKeyspace.NAME) + .getColumnFamilyStore(compactionsInProgressTable.toString()) + .forceFlush("CompactionsInProgressFlusher")); + coalescingFlusher = new CoalescingSupplier<>(flusher); + } public ReplayPosition forceBlockingFlush() { if (COALESCE_FLUSHES) { diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java index 17dee2312c..fbca406966 100644 --- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java +++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java @@ -755,7 +755,7 @@ public static void removeUnusedSstables(CFMetaData metadata, Map completedAncestors.addAll(ancestors); } } - cleanedUnfinishedCompactions.forEach(SystemKeyspace::finishCompaction); + cleanedUnfinishedCompactions.forEach(uuid -> SystemKeyspace.finishCompaction(uuid, SystemKeyspace.CompactionsInProgressTable.DEFAULT)); // remove old sstables from compactions that did complete for (Map.Entry> sstableFiles : directories.sstableLister().list().entrySet()) @@ -763,7 +763,7 @@ public static void removeUnusedSstables(CFMetaData metadata, Map Descriptor desc = sstableFiles.getKey(); if (completedAncestors.contains(desc.generation)) { - if (ColumnFamilyStoreManager.instance.shouldSkipAncestorCleanup() + if (ColumnFamilyStoreManager.instance.shouldSkipAncestorCleanupBasedOnAncestorMetadata() || (DRY_RUN_NON_COMPACTING_UNUSED_SSTABLE_CLEANUP && unfinishedCompactions.isEmpty())) { logger.warn("Would have deleted leftover compaction ancestor", UnsafeArg.of("desc", desc), @@ -777,7 +777,7 @@ public static void removeUnusedSstables(CFMetaData metadata, Map SafeArg.of("generation", desc.generation), ancestorsArg); SSTable.delete(desc, sstableFiles.getValue()); Optional.ofNullable(unfinishedCompactions.get(desc.generation)) - .ifPresent(SystemKeyspace::finishCompaction); + .ifPresent(uuid -> SystemKeyspace.finishCompaction(uuid, SystemKeyspace.CompactionsInProgressTable.DEFAULT)); } } } diff --git a/src/java/org/apache/cassandra/db/SystemKeyspace.java b/src/java/org/apache/cassandra/db/SystemKeyspace.java index de2e89d94b..7ccd45c70e 100644 --- a/src/java/org/apache/cassandra/db/SystemKeyspace.java +++ b/src/java/org/apache/cassandra/db/SystemKeyspace.java @@ -95,6 +95,24 @@ public final class SystemKeyspace public static final String SIZE_ESTIMATES = "size_estimates"; public static final String AVAILABLE_RANGES = "available_ranges"; + public enum CompactionsInProgressTable + { + DEFAULT("compactions_in_progress"), + WAL("post_wal_compactions_in_progress"); + + private final String name; + + CompactionsInProgressTable(String name) + { + this.name = name; + } + + @Override + public String toString() { + return name; + } + } + public static final CFMetaData Hints = compile(HINTS, "hints awaiting delivery", @@ -212,8 +230,20 @@ private static int getCompactionsInProgresStcsMaxThreshold() { } return val; } - private static int compactionsInProgressMaxCompactionThreshold = getCompactionsInProgresStcsMaxThreshold(); - private static final CFMetaData CompactionsInProgress = + private static final int compactionsInProgressMaxCompactionThreshold = getCompactionsInProgresStcsMaxThreshold(); + private static final CFMetaData DefaultCompactionsInProgress = + compile(COMPACTIONS_IN_PROGRESS, + "unfinished compactions", + "CREATE TABLE %s (" + + "id uuid," + + "columnfamily_name text," + + "inputs set," + + "keyspace_name text," + + "PRIMARY KEY ((id)))") + .maxCompactionThreshold(compactionsInProgressMaxCompactionThreshold) + .compactionStrategyClass(SizeTieredCompactionStrategy.class) + .compactionStrategyOptions(Collections.singletonMap("max_threshold", Integer.toString(compactionsInProgressMaxCompactionThreshold))); + private static final CFMetaData WalCompactionsInProgress = compile(COMPACTIONS_IN_PROGRESS, "unfinished compactions", "CREATE TABLE %s (" @@ -290,7 +320,8 @@ public static KSMetaData definition() Peers, PeerEvents, RangeXfers, - CompactionsInProgress, + DefaultCompactionsInProgress, + WalCompactionsInProgress, CompactionHistory, SSTableActivity, SizeEstimates, @@ -370,8 +401,10 @@ public Integer apply(SSTableReader sstable) } }); String req = "INSERT INTO system.%s (id, keyspace_name, columnfamily_name, inputs) VALUES (?, ?, ?, ?)"; - executeInternal(String.format(req, COMPACTIONS_IN_PROGRESS), compactionId, cfs.keyspace.getName(), cfs.name, Sets.newHashSet(generations)); - CompactionsInProgressFlusher.INSTANCE.forceBlockingFlush(); + CompactionsInProgressFlusher.INSTANCES.forEach((table, flusher) -> { + executeInternal(String.format(req, table.toString()), compactionId, cfs.keyspace.getName(), cfs.name, Sets.newHashSet(generations)); + flusher.forceBlockingFlush(); + }); return compactionId; } @@ -380,22 +413,22 @@ public Integer apply(SSTableReader sstable) * to complete successfully for this to be called. * @param taskId what was returned from {@code startCompaction} */ - public static void finishCompaction(UUID taskId) + public static void finishCompaction(UUID taskId, CompactionsInProgressTable table) { assert taskId != null; - executeInternal(String.format("DELETE FROM system.%s WHERE id = ?", COMPACTIONS_IN_PROGRESS), taskId); - CompactionsInProgressFlusher.INSTANCE.forceBlockingFlush(); + executeInternal(String.format("DELETE FROM system.%s WHERE id = ?", table.toString()), taskId); + CompactionsInProgressFlusher.INSTANCES.get(table).forceBlockingFlush(); } /** * Returns a Map whose keys are KS.CF pairs and whose values are maps from sstable generation numbers to the * task ID of the compaction they were participating in. */ - public static Map, Map> getUnfinishedCompactions() + public static Map, Map> getUnfinishedCompactions(CompactionsInProgressTable table) { String req = "SELECT * FROM system.%s"; - UntypedResultSet resultSet = executeInternal(String.format(req, COMPACTIONS_IN_PROGRESS)); + UntypedResultSet resultSet = executeInternal(String.format(req, table.toString())); Map, Map> unfinishedCompactions = new HashMap<>(); for (UntypedResultSet.Row row : resultSet) @@ -418,9 +451,9 @@ public static Map, Map> getUnfinishedCompact return unfinishedCompactions; } - public static void discardCompactionsInProgress() + public static void discardCompactionsInProgress(CompactionsInProgressTable table) { - ColumnFamilyStore compactionLog = Keyspace.open(NAME).getColumnFamilyStore(COMPACTIONS_IN_PROGRESS); + ColumnFamilyStore compactionLog = Keyspace.open(NAME).getColumnFamilyStore(table.toString()); compactionLog.truncateBlocking(false); } diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionTask.java b/src/java/org/apache/cassandra/db/compaction/CompactionTask.java index c13aa0c737..742b707dd9 100644 --- a/src/java/org/apache/cassandra/db/compaction/CompactionTask.java +++ b/src/java/org/apache/cassandra/db/compaction/CompactionTask.java @@ -234,7 +234,7 @@ public boolean apply(SSTableReader sstable) } Directories.removeExpectedSpaceUsedByCompaction(expectedWriteSize, CONSIDER_CONCURRENT_COMPACTIONS); if (taskId != null && (!abortFailed)) - SystemKeyspace.finishCompaction(taskId); + SystemKeyspace.finishCompaction(taskId, SystemKeyspace.CompactionsInProgressTable.DEFAULT); if (collector != null && ci != null) collector.finishCompaction(ci); @@ -243,6 +243,10 @@ public boolean apply(SSTableReader sstable) ColumnFamilyStoreManager.instance.markForDeletion(cfs.metadata, transaction.logged.obsoleteDescriptors()); refs.close(); controller.close(); + if (taskId != null) + { + SystemKeyspace.finishCompaction(taskId, SystemKeyspace.CompactionsInProgressTable.WAL); + } // log a bunch of statistics about the result and save to system table compaction_history long dTime = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start); diff --git a/src/java/org/apache/cassandra/service/CassandraDaemon.java b/src/java/org/apache/cassandra/service/CassandraDaemon.java index 529e6ef1fb..5825204bef 100644 --- a/src/java/org/apache/cassandra/service/CassandraDaemon.java +++ b/src/java/org/apache/cassandra/service/CassandraDaemon.java @@ -64,7 +64,6 @@ import com.palantir.cassandra.db.ColumnFamilyStoreManager; import com.palantir.cassandra.settings.DisableClientInterfaceSetting; import com.palantir.logsafe.Preconditions; -import com.palantir.logsafe.Safe; import com.palantir.logsafe.SafeArg; import org.apache.cassandra.config.ColumnDefinition; import org.apache.cassandra.config.Config; @@ -282,7 +281,7 @@ private void completeSetupMayThrowSstableException() { } } - Map, Map> unfinishedCompactions = SystemKeyspace.getUnfinishedCompactions(); + Map, Map> unfinishedCompactions = SystemKeyspace.getUnfinishedCompactions(SystemKeyspace.CompactionsInProgressTable.DEFAULT); for (String keyspaceName : Schema.instance.getKeyspaces()) { // Skip system as we'll already clean it after the other tables @@ -297,7 +296,7 @@ private void completeSetupMayThrowSstableException() { ColumnFamilyStore.scrubDataDirectories(cfm); } } - SystemKeyspace.discardCompactionsInProgress(); + SystemKeyspace.discardCompactionsInProgress(SystemKeyspace.CompactionsInProgressTable.DEFAULT); Keyspace.setInitialized(); diff --git a/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java b/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java index 51cd4d4fcf..643fd5e9d9 100644 --- a/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java +++ b/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java @@ -1885,8 +1885,8 @@ protected SSTableWriter getWriter() SSTableReader sstable2 = SSTableReader.open(sstable1.descriptor); UUID compactionTaskID = SystemKeyspace.startCompaction( - Keyspace.open(ks).getColumnFamilyStore(cf), - Collections.singleton(sstable2)); + Keyspace.open(ks).getColumnFamilyStore(cf), + Collections.singleton(sstable2)); Map unfinishedCompaction = new HashMap<>(); unfinishedCompaction.put(sstable1.descriptor.generation, compactionTaskID); @@ -1897,7 +1897,7 @@ protected SSTableWriter getWriter() assertEquals(1, sstables.size()); assertTrue(sstables.containsKey(sstable1.descriptor)); - Map, Map> unfinished = SystemKeyspace.getUnfinishedCompactions(); + Map, Map> unfinished = SystemKeyspace.getUnfinishedCompactions(SystemKeyspace.CompactionsInProgressTable.DEFAULT); assertTrue(unfinished.isEmpty()); sstable1.selfRef().release(); sstable2.selfRef().release(); diff --git a/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java b/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java index 2b4b2032a8..7871c78c6d 100644 --- a/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java +++ b/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java @@ -474,7 +474,7 @@ public void testRangeTombstones() @Test public void testCompactionLog() throws Exception { - SystemKeyspace.discardCompactionsInProgress(); + SystemKeyspace.discardCompactionsInProgress(SystemKeyspace.CompactionsInProgressTable.DEFAULT); String cf = "Standard4"; ColumnFamilyStore cfs = Keyspace.open(KEYSPACE1).getColumnFamilyStore(cf); @@ -491,12 +491,12 @@ public Integer apply(SSTableReader sstable) } })); UUID taskId = SystemKeyspace.startCompaction(cfs, sstables); - Map, Map> compactionLogs = SystemKeyspace.getUnfinishedCompactions(); + Map, Map> compactionLogs = SystemKeyspace.getUnfinishedCompactions(SystemKeyspace.CompactionsInProgressTable.DEFAULT); Set unfinishedCompactions = compactionLogs.get(Pair.create(KEYSPACE1, cf)).keySet(); assertTrue(unfinishedCompactions.containsAll(generations)); - SystemKeyspace.finishCompaction(taskId); - compactionLogs = SystemKeyspace.getUnfinishedCompactions(); + SystemKeyspace.finishCompaction(taskId, SystemKeyspace.CompactionsInProgressTable.DEFAULT); + compactionLogs = SystemKeyspace.getUnfinishedCompactions(SystemKeyspace.CompactionsInProgressTable.DEFAULT); assertFalse(compactionLogs.containsKey(Pair.create(KEYSPACE1, cf))); } @@ -590,7 +590,7 @@ public void incompletedCompactionAbortNotRemovedFromCompactionsInProgress() thro .stream().map(desc -> desc.generation).collect(Collectors.toSet()); assertEquals(nonTmp, actualNonTmp); - Map, Map> compactionLogs = SystemKeyspace.getUnfinishedCompactions(); + Map, Map> compactionLogs = SystemKeyspace.getUnfinishedCompactions(SystemKeyspace.CompactionsInProgressTable.DEFAULT); Pair pair = Pair.create(KEYSPACE1, cfName); assertTrue(compactionLogs.containsKey(pair)); From aaa616dcd00bb131806db5c6d2473bab6cd4866c Mon Sep 17 00:00:00 2001 From: Will Dey Date: Thu, 31 Oct 2024 15:19:43 -0400 Subject: [PATCH 19/19] rename --- test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java b/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java index 643fd5e9d9..bffaaf8342 100644 --- a/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java +++ b/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java @@ -2038,7 +2038,7 @@ public Map> filterValidAncestors(CFMetaData _cfMetaData } @Override - public boolean shouldSkipAncestorCleanup() + public boolean shouldSkipAncestorCleanupBasedOnAncestorMetadata() { return true; }