From 694029965821f9b33255f4bb5f50af97c0447460 Mon Sep 17 00:00:00 2001 From: Michael Koepf <47541996+michaelkoepf@users.noreply.github.com> Date: Tue, 14 Jan 2025 18:21:32 +0100 Subject: [PATCH] Harmonization [commit will be amended!] --- .../client/write/WriterMemoryBufferTest.java | 54 ------- .../fluss/memory/LazyMemorySegmentPool.java | 49 +++++-- .../memory/LazyMemorySegmentPoolTest.java | 138 ++++++++++++++++-- 3 files changed, 163 insertions(+), 78 deletions(-) diff --git a/fluss-client/src/test/java/com/alibaba/fluss/client/write/WriterMemoryBufferTest.java b/fluss-client/src/test/java/com/alibaba/fluss/client/write/WriterMemoryBufferTest.java index 426df82e1..fba25fbec 100644 --- a/fluss-client/src/test/java/com/alibaba/fluss/client/write/WriterMemoryBufferTest.java +++ b/fluss-client/src/test/java/com/alibaba/fluss/client/write/WriterMemoryBufferTest.java @@ -16,7 +16,6 @@ package com.alibaba.fluss.client.write; -import com.alibaba.fluss.config.MemorySize; import com.alibaba.fluss.exception.BufferExhaustedException; import com.alibaba.fluss.exception.FlussRuntimeException; import com.alibaba.fluss.memory.MemorySegment; @@ -47,44 +46,6 @@ public class WriterMemoryBufferTest { private final long maxBlockTimeMs = 10; - @Test - void testSimple() throws Exception { - // test the simple non-blocking allocation paths. - long totalMemory = MemorySize.parse("64kb").getBytes(); - int batchSize = (int) MemorySize.parse("1kb").getBytes(); - - WriterMemoryBuffer writerMemoryBuffer = new WriterMemoryBuffer(totalMemory, batchSize); - MemorySegment segment = writerMemoryBuffer.allocate(batchSize, maxBlockTimeMs); - assertThat(segment.size()).isEqualTo(batchSize); - assertThat(writerMemoryBuffer.getUnallocatedMemory()).isEqualTo(totalMemory - batchSize); - assertThat(writerMemoryBuffer.getAvailableMemory()).isEqualTo(totalMemory - batchSize); - segment.putInt(0, 1); - writerMemoryBuffer.deallocate(segment); - assertThat(writerMemoryBuffer.getAvailableMemory()).isEqualTo(totalMemory); - assertThat(writerMemoryBuffer.getUnallocatedMemory()).isEqualTo(totalMemory - batchSize); - - segment = writerMemoryBuffer.allocate(batchSize, maxBlockTimeMs); - writerMemoryBuffer.deallocate(segment); - assertThat(writerMemoryBuffer.getAvailableMemory()).isEqualTo(totalMemory); - assertThat(writerMemoryBuffer.getUnallocatedMemory()).isEqualTo(totalMemory - batchSize); - - segment = writerMemoryBuffer.allocate(2 * batchSize, maxBlockTimeMs); - writerMemoryBuffer.deallocate(segment); - assertThat(writerMemoryBuffer.getAvailableMemory()).isEqualTo(totalMemory); - assertThat(writerMemoryBuffer.getUnallocatedMemory()).isEqualTo(totalMemory - batchSize); - } - - @Test - void testCanAllocateMoreMemoryThanHave() throws Exception { - WriterMemoryBuffer pool = new WriterMemoryBuffer(1024, 512); - MemorySegment segment = pool.allocate(1024, maxBlockTimeMs); - assertThat(segment.size()).isEqualTo(1024); - pool.deallocate(segment); - assertThatThrownBy(() -> pool.allocate(1025, maxBlockTimeMs)) - .isInstanceOf(IllegalArgumentException.class) - .hasMessageContaining("Attempt to allocate memory segment size"); - } - @Test void testDelayedAllocation() throws Exception { WriterMemoryBuffer pool = new WriterMemoryBuffer(5 * 1024, 1024); @@ -223,21 +184,6 @@ protected MemorySegment allocateMemorySegment(int size) { assertThat(pool.getAvailableMemory()).isEqualTo(1024); } - @Test - void testCloseAllocations() throws Exception { - WriterMemoryBuffer pool = new WriterMemoryBuffer(10, 1); - MemorySegment segment = pool.allocate(1, Long.MAX_VALUE); - - // Close the memory segment pool. This should prevent any further allocations. - pool.close(); - - assertThatThrownBy(() -> pool.allocate(1, maxBlockTimeMs)) - .isInstanceOf(FlussRuntimeException.class); - - // ensure de-allocation still works. - pool.deallocate(segment); - } - @Test void testCloseNotifyWaiters() throws Exception { final int numWorkers = 2; diff --git a/fluss-common/src/main/java/com/alibaba/fluss/memory/LazyMemorySegmentPool.java b/fluss-common/src/main/java/com/alibaba/fluss/memory/LazyMemorySegmentPool.java index 0ddeb45c9..b669e4cbb 100644 --- a/fluss-common/src/main/java/com/alibaba/fluss/memory/LazyMemorySegmentPool.java +++ b/fluss-common/src/main/java/com/alibaba/fluss/memory/LazyMemorySegmentPool.java @@ -24,7 +24,6 @@ import javax.annotation.concurrent.GuardedBy; import javax.annotation.concurrent.ThreadSafe; - import java.io.Closeable; import java.io.EOFException; import java.io.IOException; @@ -45,7 +44,7 @@ @ThreadSafe public class LazyMemorySegmentPool implements MemorySegmentPool, Closeable { - private static final long PER_REQUEST_MEMORY_SIZE = 16 * 1024 * 1024; + private static final long DEFAULT_PER_REQUEST_MEMORY_SIZE = 16 * 1024 * 1024; private static final long DEFAULT_WAIT_TIMEOUT_MS = Long.MAX_VALUE; /** The lock to guard the memory pool. */ @@ -67,7 +66,9 @@ public class LazyMemorySegmentPool implements MemorySegmentPool, Closeable { private int pageUsage; - LazyMemorySegmentPool(int maxPages, int pageSize, long maxTimeToBlockMs) { + @VisibleForTesting + LazyMemorySegmentPool( + int maxPages, int pageSize, long maxTimeToBlockMs, long perRequestMemorySize) { checkArgument(maxPages > 0, "MaxPages for LazyMemorySegmentPool should be greater than 0."); checkArgument( pageSize >= 64, @@ -75,16 +76,22 @@ public class LazyMemorySegmentPool implements MemorySegmentPool, Closeable { + pageSize + " bytes."); checkArgument( - PER_REQUEST_MEMORY_SIZE > pageSize, + perRequestMemorySize >= pageSize, + String.format( + "Page size should be less than or equal to per request memory size. Page size is:" + + " %s KB, per request memory size is %s KB.", + pageSize / 1024, perRequestMemorySize / 1024)); + checkArgument( + perRequestMemorySize <= ((long) pageSize * maxPages), String.format( - "Page size should be less than PER_REQUEST_MEMORY_SIZE. Page size is:" - + " %s KB, PER_REQUEST_MEMORY_SIZE is %s KB.", - pageSize / 1024, PER_REQUEST_MEMORY_SIZE / 1024)); + "Per request memory size must be less than or equal to total memory (page size * max pages). Per request memory size is:" + + " %s KB, total memory is %s KB.", + perRequestMemorySize / 1024, ((long) pageSize * maxPages) / 1024)); this.cachePages = new ArrayList<>(); this.pageUsage = 0; this.maxPages = maxPages; this.pageSize = pageSize; - this.perRequestPages = Math.max(1, (int) (PER_REQUEST_MEMORY_SIZE / pageSize())); + this.perRequestPages = Math.max(1, (int) (perRequestMemorySize / pageSize())); this.closed = false; this.waiters = new ArrayDeque<>(); @@ -105,14 +112,16 @@ public static LazyMemorySegmentPool createWriterBufferPool(Configuration conf) { int pageSize = (int) conf.get(ConfigOptions.CLIENT_WRITER_BUFFER_PAGE_SIZE).getBytes(); int segmentCount = (int) (totalBytes / pageSize); - return new LazyMemorySegmentPool(segmentCount, pageSize, DEFAULT_WAIT_TIMEOUT_MS); + return new LazyMemorySegmentPool( + segmentCount, pageSize, DEFAULT_WAIT_TIMEOUT_MS, DEFAULT_PER_REQUEST_MEMORY_SIZE); } public static LazyMemorySegmentPool createServerBufferPool(Configuration conf) { long totalBytes = conf.get(ConfigOptions.SERVER_BUFFER_MEMORY_SIZE).getBytes(); int pageSize = (int) conf.get(ConfigOptions.SERVER_BUFFER_PAGE_SIZE).getBytes(); int segmentCount = (int) (totalBytes / pageSize); - return new LazyMemorySegmentPool(segmentCount, pageSize, DEFAULT_WAIT_TIMEOUT_MS); + return new LazyMemorySegmentPool( + segmentCount, pageSize, DEFAULT_WAIT_TIMEOUT_MS, DEFAULT_PER_REQUEST_MEMORY_SIZE); } @Override @@ -121,8 +130,9 @@ public MemorySegment nextSegment() throws IOException { lock, () -> { checkClosed(); + if (freePages() == 0) { - waitForSegment(1); + waitForSegmentIfSatisfiableOrElseThrow(1); } lazilyAllocatePages(freePages()); @@ -138,8 +148,9 @@ public List allocatePages(int requiredPages) throws IOException { lock, () -> { checkClosed(); + if (freePages() < requiredPages) { - waitForSegment(requiredPages); + waitForSegmentIfSatisfiableOrElseThrow(requiredPages); } lazilyAllocatePages(requiredPages); @@ -157,15 +168,23 @@ private List drain(int numPages) { } private void lazilyAllocatePages(int required) { - if (cachePages.isEmpty()) { - int numPages = Math.min(required, perRequestPages); + if (cachePages.size() < required) { + int numPages = Math.max(required - cachePages.size(), perRequestPages); for (int i = 0; i < numPages; i++) { cachePages.add(MemorySegment.allocateHeapMemory(pageSize)); } } } - private void waitForSegment(int requiredPages) throws EOFException { + private void waitForSegmentIfSatisfiableOrElseThrow(int requiredPages) throws EOFException { + if (maxPages < requiredPages) { // do not wait if the request is impossible to satisfy + throw new EOFException( + String.format( + "Allocation request cannot be satisfied because the number of maximum available pages is " + + "exceeded. Available pages: %d. Request pages: %d", + this.maxPages, requiredPages)); + } + Condition moreMemory = lock.newCondition(); waiters.addLast(moreMemory); try { diff --git a/fluss-common/src/test/java/com/alibaba/fluss/memory/LazyMemorySegmentPoolTest.java b/fluss-common/src/test/java/com/alibaba/fluss/memory/LazyMemorySegmentPoolTest.java index eaf36fd83..48d945ae8 100644 --- a/fluss-common/src/test/java/com/alibaba/fluss/memory/LazyMemorySegmentPoolTest.java +++ b/fluss-common/src/test/java/com/alibaba/fluss/memory/LazyMemorySegmentPoolTest.java @@ -16,7 +16,12 @@ package com.alibaba.fluss.memory; +import com.alibaba.fluss.config.MemorySize; +import com.alibaba.fluss.exception.FlussRuntimeException; + import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; import java.io.EOFException; import java.io.IOException; @@ -32,21 +37,130 @@ /** Test for {@link com.alibaba.fluss.memory.LazyMemorySegmentPool}. */ public class LazyMemorySegmentPoolTest { + @ParameterizedTest + @ValueSource(ints = {1, 2, 7, 13, 19}) // >= 1 + void testSimpleAllocationAndReturn(int perRequestMemorySizeFactor) throws Exception { + // test the simple non-blocking allocation paths. + int totalMemory = Long.valueOf(MemorySize.parse("64kb").getBytes()).intValue(); + int batchSize = (int) MemorySize.parse("1kb").getBytes(); + int perRequestMemorySize = + Long.valueOf(((long) perRequestMemorySizeFactor * batchSize)).intValue(); + int totalSegmentCount = totalMemory / batchSize; + int expectedPerRequestSegmentCount = + Long.valueOf((((long) perRequestMemorySizeFactor * batchSize) / batchSize)) + .intValue(); + + LazyMemorySegmentPool pool = + buildLazyMemorySegmentSource( + totalSegmentCount, batchSize, 10, perRequestMemorySize); + + // allocate one page and check the segment and the memory pool + List maybeSegments = pool.allocatePages(1); + assertThat(maybeSegments.size()).isEqualTo(1); + MemorySegment segment = maybeSegments.get(0); + assertThat(segment.size()).isEqualTo(batchSize); + assertThat(pool.availableMemory()).isEqualTo(totalMemory - batchSize); + assertThat(pool.getAllCachePages().size()).isEqualTo(expectedPerRequestSegmentCount - 1); + segment.putInt(0, 1); + assertThat(segment.getInt(0)).isEqualTo(1); + + // return the page to the memory pool and check the memory pool + pool.returnPage(segment); + assertThat(pool.availableMemory()).isEqualTo(totalMemory); + assertThat(pool.getAllCachePages().size()).isEqualTo(expectedPerRequestSegmentCount); + + // allocate another page and check the segment and the memory pool + maybeSegments = pool.allocatePages(1); + assertThat(maybeSegments.size()).isEqualTo(1); + segment = maybeSegments.get(0); + assertThat(segment.size()).isEqualTo(batchSize); + assertThat(pool.availableMemory()).isEqualTo(totalMemory - batchSize); + assertThat(pool.freePages()).isEqualTo(totalSegmentCount - 1); + assertThat(pool.getAllCachePages().size()).isEqualTo(expectedPerRequestSegmentCount - 1); + + // return the page to the memory pool and check the memory pool + pool.returnPage(segment); + assertThat(pool.availableMemory()).isEqualTo(totalMemory); + assertThat(pool.freePages()).isEqualTo(totalSegmentCount); + assertThat(pool.getAllCachePages().size()).isEqualTo(expectedPerRequestSegmentCount); + + // allocate another 2 pages and check the segments and the memory pool + maybeSegments = pool.allocatePages(2); + assertThat(maybeSegments.size()).isEqualTo(2); + List segments = maybeSegments; + assertThat(pool.availableMemory()).isEqualTo(totalMemory - (2L * batchSize)); + assertThat(pool.freePages()).isEqualTo(totalSegmentCount - 2); + assertThat(pool.getAllCachePages().size()) + .isEqualTo(Math.max(expectedPerRequestSegmentCount - 2, 0)); + pool.returnAll(segments); + assertThat(pool.availableMemory()).isEqualTo(totalMemory); + assertThat(pool.freePages()).isEqualTo(totalSegmentCount); + assertThat(pool.getAllCachePages().size()) + .isEqualTo(Math.max(expectedPerRequestSegmentCount, 2)); + } + + @Test + void testCannotAllocateMorePagesThanAvailable() throws Exception { + LazyMemorySegmentPool pool = buildLazyMemorySegmentSource(2, 512, 10, 512); + List segment = pool.allocatePages(2); + assertThat(segment.size()).isEqualTo(2); + assertThat(pool.availableMemory()).isEqualTo(0); + assertThat(pool.freePages()).isEqualTo(0); + assertThat(pool.getAllCachePages().size()).isEqualTo(0); + + pool.returnAll(segment); + assertThat(pool.availableMemory()).isEqualTo(1024); + assertThat(pool.freePages()).isEqualTo(2); + assertThat(pool.getAllCachePages().size()).isEqualTo(2); + + assertThatThrownBy(() -> pool.allocatePages(3)) + .isInstanceOf(EOFException.class) + .hasMessageContaining("Available pages: 2. Request pages: 3"); + } + + @Test + void testCloseClearsCachedPages() throws Exception { + LazyMemorySegmentPool pool = buildLazyMemorySegmentSource(1, 64, 100, 64); + pool.returnPage(pool.nextSegment()); + assertThat(pool.getAllCachePages().size()).isGreaterThan(0); + pool.close(); + assertThat(pool.getAllCachePages()).isEmpty(); + } + + @Test + void testClosePreventsFurtherAllocationButAllowsReturningSegments() throws Exception { + LazyMemorySegmentPool pool = buildLazyMemorySegmentSource(1, 512, Long.MAX_VALUE, 512); + List segment = pool.allocatePages(1); + + // Close the memory segment pool. This should prevent any further allocations. + pool.close(); + + assertThatThrownBy(() -> pool.allocatePages(1)).isInstanceOf(FlussRuntimeException.class); + assertThatThrownBy(pool::nextSegment).isInstanceOf(FlussRuntimeException.class); + + // ensure de-allocation still works. + pool.returnAll(segment); + } + @Test void testNextSegmentWaiter() throws Exception { - LazyMemorySegmentPool source = buildLazyMemorySegmentSource(10, 64); + LazyMemorySegmentPool source = buildLazyMemorySegmentSource(10, 64, 100, 64); assertThat(source.pageSize()).isEqualTo(64); + assertThat(source.availableMemory()).isEqualTo(64 * 10); assertThat(source.freePages()).isEqualTo(10); MemorySegment ms1 = source.nextSegment(); + assertThat(source.availableMemory()).isEqualTo(64 * 9); assertThat(source.freePages()).isEqualTo(9); MemorySegment ms2 = source.nextSegment(); + assertThat(source.availableMemory()).isEqualTo(64 * 8); assertThat(source.freePages()).isEqualTo(8); for (int i = 0; i < 8; i++) { source.nextSegment(); } + assertThat(source.availableMemory()).isEqualTo(0); assertThat(source.freePages()).isEqualTo(0); assertThatThrownBy(source::nextSegment) @@ -64,20 +178,24 @@ void testNextSegmentWaiter() throws Exception { @Test void testIllegalArgument() { - assertThatThrownBy(() -> buildLazyMemorySegmentSource(0, 64)) + assertThatThrownBy(() -> buildLazyMemorySegmentSource(0, 64, 100, 64)) .isInstanceOf(IllegalArgumentException.class) .hasMessage("MaxPages for LazyMemorySegmentPool should be greater than 0."); - assertThatThrownBy(() -> buildLazyMemorySegmentSource(10, 32 * 1024 * 1024)) + assertThatThrownBy( + () -> + buildLazyMemorySegmentSource( + 10, 32 * 1024 * 1024, 100, (32 * 1024 * 1024) / 2)) .isInstanceOf(IllegalArgumentException.class) .hasMessage( - "Page size should be less than PER_REQUEST_MEMORY_SIZE. " - + "Page size is: 32768 KB, PER_REQUEST_MEMORY_SIZE is 16384 KB."); - assertThatThrownBy(() -> buildLazyMemorySegmentSource(10, 30)) + "Page size should be less than or equal to per request memory size. " + + "Page size is: 32768 KB, per request memory size is 16384 KB."); + assertThatThrownBy(() -> buildLazyMemorySegmentSource(10, 30, 100, 30)) .isInstanceOf(IllegalArgumentException.class) .hasMessage( "Page size should be greater than 64 bytes to include the record batch header, but is 30 bytes."); - LazyMemorySegmentPool lazyMemorySegmentPool = buildLazyMemorySegmentSource(10, 100); + LazyMemorySegmentPool lazyMemorySegmentPool = + buildLazyMemorySegmentSource(10, 100, 100, 100); assertThatThrownBy( () -> lazyMemorySegmentPool.returnAll( @@ -87,8 +205,10 @@ void testIllegalArgument() { .hasMessage("Return too more memories."); } - private LazyMemorySegmentPool buildLazyMemorySegmentSource(int maxPages, int pageSize) { - return new LazyMemorySegmentPool(maxPages, pageSize, 100); + private LazyMemorySegmentPool buildLazyMemorySegmentSource( + int maxPages, int pageSize, long maxTimeToBlockMs, int perRequestMemorySize) { + return new LazyMemorySegmentPool( + maxPages, pageSize, maxTimeToBlockMs, perRequestMemorySize); } private CountDownLatch asyncReturnAll(