From abc7c8f919cc9e32d5961cf86d261c5858e69b1b Mon Sep 17 00:00:00 2001 From: Jorge Esteban Quilcate Otoya Date: Thu, 11 Apr 2024 11:45:37 +0300 Subject: [PATCH] refactor: decouple original size from chunking Chunking is defined by original chunk size, not original content size. This PR removed the scenario where these two sizes are conflated (e.g. passing original size zero to disable chunking, etc.). --- .../tieredstorage/RemoteStorageManager.java | 21 +++--- .../BaseTransformChunkEnumeration.java | 10 ++- .../transform/TransformFinisher.java | 44 +++++++----- .../transform/TransformsEndToEndTest.java | 68 ++++++++++--------- 4 files changed, 79 insertions(+), 64 deletions(-) diff --git a/core/src/main/java/io/aiven/kafka/tieredstorage/RemoteStorageManager.java b/core/src/main/java/io/aiven/kafka/tieredstorage/RemoteStorageManager.java index bac574a43..46c5a7d8f 100644 --- a/core/src/main/java/io/aiven/kafka/tieredstorage/RemoteStorageManager.java +++ b/core/src/main/java/io/aiven/kafka/tieredstorage/RemoteStorageManager.java @@ -457,8 +457,19 @@ InputStream transformIndex(final IndexType indexType, ); } final var transformFinisher = new TransformFinisher(transformEnum, size); + // Getting next element and expecting that it is the only one. + // No need to get a sequenced input stream final var inputStream = transformFinisher.nextElement(); - segmentIndexBuilder.add(indexType, singleChunk(transformFinisher.chunkIndex()).range().size()); + final var chunkIndex = transformFinisher.chunkIndex(); + // by getting a chunk index, means that the transformation is completed. + if (chunkIndex == null) { + throw new IllegalStateException("Chunking disabled when single chunk is expected"); + } + if (chunkIndex.chunks().size() != 1) { + // not expected, as next element run once. But for safety + throw new IllegalStateException("Number of chunks different than 1, single chunk is expected"); + } + segmentIndexBuilder.add(indexType, chunkIndex.chunks().get(0).range().size()); return inputStream; } else { segmentIndexBuilder.add(indexType, 0); @@ -466,14 +477,6 @@ InputStream transformIndex(final IndexType indexType, } } - private Chunk singleChunk(final ChunkIndex chunkIndex) { - final var chunks = chunkIndex.chunks(); - if (chunks.size() != 1) { - throw new IllegalStateException("Single chunk expected when transforming indexes"); - } - return chunks.get(0); - } - void uploadManifest(final RemoteLogSegmentMetadata remoteLogSegmentMetadata, final ChunkIndex chunkIndex, final SegmentIndexesV1 segmentIndexes, diff --git a/core/src/main/java/io/aiven/kafka/tieredstorage/transform/BaseTransformChunkEnumeration.java b/core/src/main/java/io/aiven/kafka/tieredstorage/transform/BaseTransformChunkEnumeration.java index 3c781eb81..fde97243a 100644 --- a/core/src/main/java/io/aiven/kafka/tieredstorage/transform/BaseTransformChunkEnumeration.java +++ b/core/src/main/java/io/aiven/kafka/tieredstorage/transform/BaseTransformChunkEnumeration.java @@ -32,12 +32,10 @@ public class BaseTransformChunkEnumeration implements TransformChunkEnumeration private byte[] chunk = null; - public BaseTransformChunkEnumeration(final InputStream inputStream) { - this.inputStream = Objects.requireNonNull(inputStream, "inputStream cannot be null"); - - this.originalChunkSize = 0; - } - + /** + * @param inputStream original content + * @param originalChunkSize chunk size from the original content. If zero, it disables chunking. + */ public BaseTransformChunkEnumeration(final InputStream inputStream, final int originalChunkSize) { this.inputStream = Objects.requireNonNull(inputStream, "inputStream cannot be null"); diff --git a/core/src/main/java/io/aiven/kafka/tieredstorage/transform/TransformFinisher.java b/core/src/main/java/io/aiven/kafka/tieredstorage/transform/TransformFinisher.java index 6f1d9c50f..44c6b50cb 100644 --- a/core/src/main/java/io/aiven/kafka/tieredstorage/transform/TransformFinisher.java +++ b/core/src/main/java/io/aiven/kafka/tieredstorage/transform/TransformFinisher.java @@ -27,8 +27,6 @@ import io.aiven.kafka.tieredstorage.manifest.index.FixedSizeChunkIndexBuilder; import io.aiven.kafka.tieredstorage.manifest.index.VariableSizeChunkIndexBuilder; -// TODO test transforms and detransforms with property-based tests - /** * The transformation finisher. * @@ -36,34 +34,46 @@ * so that it could be used in {@link SequenceInputStream}. * *

It's responsible for building the chunk index. + * The chunk index is empty (i.e. null) if chunking has been disabled (i.e. chunk size is zero), + * but could also have a single chunk if the chunk size is equal or higher to the original file size. + * Otherwise, the chunk index will contain more than one chunk. */ public class TransformFinisher implements Enumeration { private final TransformChunkEnumeration inner; private final AbstractChunkIndexBuilder chunkIndexBuilder; - private final int originalFileSize; private ChunkIndex chunkIndex = null; - public TransformFinisher(final TransformChunkEnumeration inner) { - this(inner, 0); - } - - public TransformFinisher(final TransformChunkEnumeration inner, final int originalFileSize) { + public TransformFinisher( + final TransformChunkEnumeration inner, + final int originalFileSize + ) { this.inner = Objects.requireNonNull(inner, "inner cannot be null"); - this.originalFileSize = originalFileSize; if (originalFileSize < 0) { throw new IllegalArgumentException( "originalFileSize must be non-negative, " + originalFileSize + " given"); } + this.chunkIndexBuilder = chunkIndexBuilder(inner, inner.originalChunkSize(), originalFileSize); + } + + private static AbstractChunkIndexBuilder chunkIndexBuilder( + final TransformChunkEnumeration inner, + final int originalChunkSize, + final int originalFileSize + ) { final Integer transformedChunkSize = inner.transformedChunkSize(); - if (originalFileSize == 0) { - this.chunkIndexBuilder = null; - } else if (transformedChunkSize == null) { - this.chunkIndexBuilder = new VariableSizeChunkIndexBuilder(inner.originalChunkSize(), originalFileSize); + if (transformedChunkSize == null) { + return new VariableSizeChunkIndexBuilder( + originalChunkSize, + originalFileSize + ); } else { - this.chunkIndexBuilder = new FixedSizeChunkIndexBuilder( - inner.originalChunkSize(), originalFileSize, transformedChunkSize); + return new FixedSizeChunkIndexBuilder( + originalChunkSize, + originalFileSize, + transformedChunkSize + ); } } @@ -75,7 +85,7 @@ public boolean hasMoreElements() { @Override public InputStream nextElement() { final var chunk = inner.nextElement(); - if (chunkIndexBuilder != null) { + if (inner.originalChunkSize() > 0) { if (hasMoreElements()) { this.chunkIndexBuilder.addChunk(chunk.length); } else { @@ -87,7 +97,7 @@ public InputStream nextElement() { } public ChunkIndex chunkIndex() { - if (chunkIndex == null && originalFileSize > 0) { + if (chunkIndex == null && inner.originalChunkSize() > 0) { throw new IllegalStateException("Chunk index was not built, was finisher used?"); } return this.chunkIndex; diff --git a/core/src/test/java/io/aiven/kafka/tieredstorage/transform/TransformsEndToEndTest.java b/core/src/test/java/io/aiven/kafka/tieredstorage/transform/TransformsEndToEndTest.java index cad7355d0..21c0dff1e 100644 --- a/core/src/test/java/io/aiven/kafka/tieredstorage/transform/TransformsEndToEndTest.java +++ b/core/src/test/java/io/aiven/kafka/tieredstorage/transform/TransformsEndToEndTest.java @@ -70,39 +70,43 @@ void compressionAndEncryption(final int chunkSize) throws IOException { private void test(final int chunkSize, final boolean compression, final boolean encryption) throws IOException { // Transform. - TransformChunkEnumeration transformEnum = new BaseTransformChunkEnumeration( - new ByteArrayInputStream(original), chunkSize); - if (compression) { - transformEnum = new CompressionChunkEnumeration(transformEnum); - } - if (encryption) { - transformEnum = new EncryptionChunkEnumeration(transformEnum, AesKeyAwareTest::encryptionCipherSupplier); - } - final var transformFinisher = chunkSize == 0 - ? new TransformFinisher(transformEnum) - : new TransformFinisher(transformEnum, ORIGINAL_SIZE); - final byte[] uploadedData; - final ChunkIndex chunkIndex; - try (final var sis = transformFinisher.toInputStream()) { - uploadedData = sis.readAllBytes(); - chunkIndex = transformFinisher.chunkIndex(); - } + try (final var inputStream = new ByteArrayInputStream(original)) { + TransformChunkEnumeration transformEnum = new BaseTransformChunkEnumeration(inputStream, chunkSize); + if (compression) { + transformEnum = new CompressionChunkEnumeration(transformEnum); + } + if (encryption) { + transformEnum = new EncryptionChunkEnumeration( + transformEnum, + AesKeyAwareTest::encryptionCipherSupplier + ); + } + final var transformFinisher = new TransformFinisher(transformEnum, ORIGINAL_SIZE); + final byte[] uploadedData; + final ChunkIndex chunkIndex; + try (final var sis = transformFinisher.toInputStream()) { + uploadedData = sis.readAllBytes(); + chunkIndex = transformFinisher.chunkIndex(); + } - // Detransform. - DetransformChunkEnumeration detransformEnum = chunkIndex == null - ? new BaseDetransformChunkEnumeration(new ByteArrayInputStream(uploadedData)) - : new BaseDetransformChunkEnumeration(new ByteArrayInputStream(uploadedData), chunkIndex.chunks()); - if (encryption) { - detransformEnum = new DecryptionChunkEnumeration( - detransformEnum, ivSize, AesKeyAwareTest::decryptionCipherSupplier); - } - if (compression) { - detransformEnum = new DecompressionChunkEnumeration(detransformEnum); - } - final var detransformFinisher = new DetransformFinisher(detransformEnum); - try (final var sis = detransformFinisher.toInputStream()) { - final byte[] downloaded = sis.readAllBytes(); - assertThat(downloaded).isEqualTo(original); + // Detransform. + try (final var uploadedStream = new ByteArrayInputStream(uploadedData)) { + DetransformChunkEnumeration detransformEnum = chunkIndex == null + ? new BaseDetransformChunkEnumeration(uploadedStream) + : new BaseDetransformChunkEnumeration(uploadedStream, chunkIndex.chunks()); + if (encryption) { + detransformEnum = new DecryptionChunkEnumeration( + detransformEnum, ivSize, AesKeyAwareTest::decryptionCipherSupplier); + } + if (compression) { + detransformEnum = new DecompressionChunkEnumeration(detransformEnum); + } + final var detransformFinisher = new DetransformFinisher(detransformEnum); + try (final var sis = detransformFinisher.toInputStream()) { + final byte[] downloaded = sis.readAllBytes(); + assertThat(downloaded).isEqualTo(original); + } + } } } }