diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionTask.java b/src/java/org/apache/cassandra/db/compaction/CompactionTask.java index 4e63304976..3168cd2824 100644 --- a/src/java/org/apache/cassandra/db/compaction/CompactionTask.java +++ b/src/java/org/apache/cassandra/db/compaction/CompactionTask.java @@ -156,122 +156,127 @@ public boolean apply(SSTableReader sstable) long totalKeysWritten = 0; long estimatedKeys = 0; - try (CompactionController controller = getCompactionController(transaction.originals())) + CompactionController controller = getCompactionController(transaction.originals()); + Set actuallyCompact = Sets.difference(transaction.originals(), controller.getFullyExpiredSSTables()); + + SSTableFormat.Type sstableFormat = getFormatType(transaction.originals()); + + // SSTableScanners need to be closed before markCompactedSSTablesReplaced call as scanners contain references + // to both ifile and dfile and SSTR will throw deletion errors on Windows if it tries to delete before scanner is closed. + // See CASSANDRA-8019 and CASSANDRA-8399 + UUID taskId = null; + String taskIdLoggerMsg; + List newSStables; + AbstractCompactionIterable ci = null; + Refs refs = Refs.ref(actuallyCompact); + boolean readyToFinish = false; + try (AbstractCompactionStrategy.ScannerList scanners = strategy.getScanners(actuallyCompact)) { - Set actuallyCompact = Sets.difference(transaction.originals(), controller.getFullyExpiredSSTables()); - - SSTableFormat.Type sstableFormat = getFormatType(transaction.originals()); - - - // SSTableScanners need to be closed before markCompactedSSTablesReplaced call as scanners contain references - // to both ifile and dfile and SSTR will throw deletion errors on Windows if it tries to delete before scanner is closed. - // See CASSANDRA-8019 and CASSANDRA-8399 - UUID taskId = null; - String taskIdLoggerMsg; - List newSStables; - AbstractCompactionIterable ci = null; - try (Refs refs = Refs.ref(actuallyCompact); - AbstractCompactionStrategy.ScannerList scanners = strategy.getScanners(actuallyCompact)) + taskId = offline ? null : SystemKeyspace.startCompaction(cfs, transaction.originals()); + taskIdLoggerMsg = taskId == null ? UUIDGen.getTimeUUID().toString() : taskId.toString(); + logger.debug("Compacting ({}) {}", taskIdLoggerMsg, ssTableLoggerMsg); + ci = new CompactionIterable(compactionType, scanners.scanners, controller, sstableFormat, taskId); + try (CloseableIterator iter = ci.iterator()) { - taskId = offline ? null : SystemKeyspace.startCompaction(cfs, transaction.originals()); - taskIdLoggerMsg = taskId == null ? UUIDGen.getTimeUUID().toString() : taskId.toString(); - logger.debug("Compacting ({}) {}", taskIdLoggerMsg, ssTableLoggerMsg); - ci = new CompactionIterable(compactionType, scanners.scanners, controller, sstableFormat, taskId); - try (CloseableIterator iter = ci.iterator()) - { - long lastCheckObsoletion = start; + long lastCheckObsoletion = start; - if (!controller.cfs.getCompactionStrategy().isActive) - throw new CompactionInterruptedException(ci.getCompactionInfo()); + if (!controller.cfs.getCompactionStrategy().isActive) + throw new CompactionInterruptedException(ci.getCompactionInfo()); - if (collector != null) - collector.beginCompaction(ci); + if (collector != null) + collector.beginCompaction(ci); - boolean readyToFinish = false; - try (CompactionAwareWriter writer = getCompactionAwareWriter(cfs, transaction, actuallyCompact)) + try (CompactionAwareWriter writer = getCompactionAwareWriter(cfs, transaction, actuallyCompact)) + { + estimatedKeys = writer.estimatedKeys(); + while (iter.hasNext()) { - estimatedKeys = writer.estimatedKeys(); - while (iter.hasNext()) + if (ci.isStopRequested()) + throw new CompactionInterruptedException(ci.getCompactionInfo()); + + try (AbstractCompactedRow row = iter.next()) { - if (ci.isStopRequested()) - throw new CompactionInterruptedException(ci.getCompactionInfo()); + if (writer.append(row)) + totalKeysWritten++; - try (AbstractCompactedRow row = iter.next()) + if (System.nanoTime() - lastCheckObsoletion > TimeUnit.MINUTES.toNanos(1L)) { - if (writer.append(row)) - totalKeysWritten++; - - if (System.nanoTime() - lastCheckObsoletion > TimeUnit.MINUTES.toNanos(1L)) - { - controller.maybeRefreshOverlaps(); - lastCheckObsoletion = System.nanoTime(); - } + controller.maybeRefreshOverlaps(); + lastCheckObsoletion = System.nanoTime(); } } + } - readyToFinish = true; - newSStables = writer.finish(); - } catch (Exception e) { - CompactionException exception = new CompactionException(taskIdLoggerMsg, ssTableLoggerMsg.toString(), e); - if (readyToFinish && e.getSuppressed() != null && e.getSuppressed().length != 0) - { - logger.error("CompactionAwareWriter failed to close correctly for {}/{}. Continuing to compact now can cause resurrection. Exiting", - cfs.keyspace.getName(), cfs.name, e); - System.exit(1); - } - throw exception; + readyToFinish = true; + newSStables = writer.finish(); + } catch (Exception e) { + CompactionException exception = new CompactionException(taskIdLoggerMsg, ssTableLoggerMsg.toString(), e); + if (readyToFinish && e.getSuppressed() != null && e.getSuppressed().length != 0) + { + logger.error("CompactionAwareWriter failed to close correctly for {}/{}. Continuing to compact now can cause resurrection. Exiting", + cfs.keyspace.getName(), cfs.name, e); + System.exit(1); } + throw exception; } } - finally - { - Directories.removeExpectedSpaceUsedByCompaction(expectedWriteSize, CONSIDER_CONCURRENT_COMPACTIONS); - if (taskId != null) - SystemKeyspace.finishCompaction(taskId); - - if (collector != null && ci != null) - collector.finishCompaction(ci); - } - - try - { - ColumnFamilyStoreManager.instance.markForDeletion(cfs.metadata, transaction.logged.obsoleteDescriptors()); - } - catch (Exception e) - { - logger.error("Failed to write to the write-ahead log for {}/{}. Continuing to compact now can cause resurrection. Exiting", - cfs.keyspace.getName(), cfs.name, e); - System.exit(1); + } + finally + { + if (!readyToFinish) { + // TODO(wdey): refactor all of the trys + try (Refs closedRefs = refs; CompactionController closedController = controller) {} } + Directories.removeExpectedSpaceUsedByCompaction(expectedWriteSize, CONSIDER_CONCURRENT_COMPACTIONS); + if (taskId != null) + SystemKeyspace.finishCompaction(taskId); - // log a bunch of statistics about the result and save to system table compaction_history - long dTime = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start); - long startsize = SSTableReader.getTotalBytes(transaction.originals()); - long endsize = SSTableReader.getTotalBytes(newSStables); - double ratio = (double) endsize / (double) startsize; - - StringBuilder newSSTableNames = new StringBuilder(); - for (SSTableReader reader : newSStables) - newSSTableNames.append(reader.descriptor.baseFilename()).append(","); + if (collector != null && ci != null) + collector.finishCompaction(ci); + } - if (offline) - { - Refs.release(Refs.selfRefs(newSStables)); - } - else - { - double mbps = dTime > 0 ? (double) endsize / (1024 * 1024) / ((double) dTime / 1000) : 0; - long totalSourceRows = 0; - String mergeSummary = updateCompactionHistory(cfs.keyspace.getName(), cfs.getColumnFamilyName(), ci, startsize, endsize); - logger.debug(String.format("Compacted (%s) %d sstables to [%s] to level=%d. %,d bytes to %,d (~%d%% of original) in %,dms = %fMB/s. %,d total partitions merged to %,d. Partition merge counts were {%s}", - taskIdLoggerMsg, transaction.originals().size(), newSSTableNames.toString(), getLevel(), startsize, endsize, (int) (ratio * 100), dTime, mbps, totalSourceRows, totalKeysWritten, mergeSummary)); - logger.trace(String.format("CF Total Bytes Compacted: %,d", CompactionTask.addToTotalBytesCompacted(endsize))); - logger.trace("Actual #keys: {}, Estimated #keys:{}, Err%: {}", totalKeysWritten, estimatedKeys, ((double) (totalKeysWritten - estimatedKeys) / totalKeysWritten)); - - // update the metrics - cfs.metric.compactionBytesWritten.inc(endsize); - cfs.metric.compactionsCompleted.inc(); - } + try + { + ColumnFamilyStoreManager.instance.markForDeletion(cfs.metadata, transaction.logged.obsoleteDescriptors()); + } + catch (Exception e) + { + logger.error("Failed to write to the write-ahead log for {}/{}. Continuing to compact now can cause resurrection. Exiting", + cfs.keyspace.getName(), cfs.name, e); + System.exit(1); + } + refs.close(); + controller.close(); + if (taskId != null) + SystemKeyspace.finishCompaction(taskId); + + // log a bunch of statistics about the result and save to system table compaction_history + long dTime = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start); + long startsize = SSTableReader.getTotalBytes(transaction.originals()); + long endsize = SSTableReader.getTotalBytes(newSStables); + double ratio = (double) endsize / (double) startsize; + + StringBuilder newSSTableNames = new StringBuilder(); + for (SSTableReader reader : newSStables) + newSSTableNames.append(reader.descriptor.baseFilename()).append(","); + + if (offline) + { + Refs.release(Refs.selfRefs(newSStables)); + } + else + { + double mbps = dTime > 0 ? (double) endsize / (1024 * 1024) / ((double) dTime / 1000) : 0; + long totalSourceRows = 0; + String mergeSummary = updateCompactionHistory(cfs.keyspace.getName(), cfs.getColumnFamilyName(), ci, startsize, endsize); + logger.debug(String.format("Compacted (%s) %d sstables to [%s] to level=%d. %,d bytes to %,d (~%d%% of original) in %,dms = %fMB/s. %,d total partitions merged to %,d. Partition merge counts were {%s}", + taskIdLoggerMsg, transaction.originals().size(), newSSTableNames.toString(), getLevel(), startsize, endsize, (int) (ratio * 100), dTime, mbps, totalSourceRows, totalKeysWritten, mergeSummary)); + logger.trace(String.format("CF Total Bytes Compacted: %,d", CompactionTask.addToTotalBytesCompacted(endsize))); + logger.trace("Actual #keys: {}, Estimated #keys:{}, Err%: {}", totalKeysWritten, estimatedKeys, ((double) (totalKeysWritten - estimatedKeys) / totalKeysWritten)); + + // update the metrics + cfs.metric.compactionBytesWritten.inc(endsize); + cfs.metric.compactionsCompleted.inc(); } }