From 0179de10de772241564d5eba6996d7e85fed7ed4 Mon Sep 17 00:00:00 2001 From: Michael Koepf <47541996+michaelkoepf@users.noreply.github.com> Date: Tue, 14 Jan 2025 18:21:32 +0100 Subject: [PATCH] Harmonization [commit will be amended!] --- .../client/write/WriterMemoryBufferTest.java | 114 ----- .../alibaba/fluss/config/ConfigOptions.java | 16 + .../fluss/memory/LazyMemorySegmentPool.java | 87 ++-- .../memory/LazyMemorySegmentPoolTest.java | 429 +++++++++++++++++- 4 files changed, 486 insertions(+), 160 deletions(-) diff --git a/fluss-client/src/test/java/com/alibaba/fluss/client/write/WriterMemoryBufferTest.java b/fluss-client/src/test/java/com/alibaba/fluss/client/write/WriterMemoryBufferTest.java index 426df82e1..311ea8c33 100644 --- a/fluss-client/src/test/java/com/alibaba/fluss/client/write/WriterMemoryBufferTest.java +++ b/fluss-client/src/test/java/com/alibaba/fluss/client/write/WriterMemoryBufferTest.java @@ -16,7 +16,6 @@ package com.alibaba.fluss.client.write; -import com.alibaba.fluss.config.MemorySize; import com.alibaba.fluss.exception.BufferExhaustedException; import com.alibaba.fluss.exception.FlussRuntimeException; import com.alibaba.fluss.memory.MemorySegment; @@ -29,16 +28,13 @@ import java.util.Deque; import java.util.List; import java.util.concurrent.Callable; -import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; -import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.locks.Condition; import static com.alibaba.fluss.testutils.common.CommonTestUtils.retry; -import static com.alibaba.fluss.utils.function.ThrowingRunnable.unchecked; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; @@ -47,70 +43,6 @@ public class WriterMemoryBufferTest { private final long maxBlockTimeMs = 10; - @Test - void testSimple() throws Exception { - // test the simple non-blocking allocation paths. - long totalMemory = MemorySize.parse("64kb").getBytes(); - int batchSize = (int) MemorySize.parse("1kb").getBytes(); - - WriterMemoryBuffer writerMemoryBuffer = new WriterMemoryBuffer(totalMemory, batchSize); - MemorySegment segment = writerMemoryBuffer.allocate(batchSize, maxBlockTimeMs); - assertThat(segment.size()).isEqualTo(batchSize); - assertThat(writerMemoryBuffer.getUnallocatedMemory()).isEqualTo(totalMemory - batchSize); - assertThat(writerMemoryBuffer.getAvailableMemory()).isEqualTo(totalMemory - batchSize); - segment.putInt(0, 1); - writerMemoryBuffer.deallocate(segment); - assertThat(writerMemoryBuffer.getAvailableMemory()).isEqualTo(totalMemory); - assertThat(writerMemoryBuffer.getUnallocatedMemory()).isEqualTo(totalMemory - batchSize); - - segment = writerMemoryBuffer.allocate(batchSize, maxBlockTimeMs); - writerMemoryBuffer.deallocate(segment); - assertThat(writerMemoryBuffer.getAvailableMemory()).isEqualTo(totalMemory); - assertThat(writerMemoryBuffer.getUnallocatedMemory()).isEqualTo(totalMemory - batchSize); - - segment = writerMemoryBuffer.allocate(2 * batchSize, maxBlockTimeMs); - writerMemoryBuffer.deallocate(segment); - assertThat(writerMemoryBuffer.getAvailableMemory()).isEqualTo(totalMemory); - assertThat(writerMemoryBuffer.getUnallocatedMemory()).isEqualTo(totalMemory - batchSize); - } - - @Test - void testCanAllocateMoreMemoryThanHave() throws Exception { - WriterMemoryBuffer pool = new WriterMemoryBuffer(1024, 512); - MemorySegment segment = pool.allocate(1024, maxBlockTimeMs); - assertThat(segment.size()).isEqualTo(1024); - pool.deallocate(segment); - assertThatThrownBy(() -> pool.allocate(1025, maxBlockTimeMs)) - .isInstanceOf(IllegalArgumentException.class) - .hasMessageContaining("Attempt to allocate memory segment size"); - } - - @Test - void testDelayedAllocation() throws Exception { - WriterMemoryBuffer pool = new WriterMemoryBuffer(5 * 1024, 1024); - MemorySegment segment = pool.allocate(1024, maxBlockTimeMs); - CountDownLatch doDealloc = asyncDeallocate(pool, segment); - CountDownLatch allocation = asyncAllocate(pool, 5 * 1024); - assertThat(allocation.getCount()).isEqualTo(1); - doDealloc.countDown(); // return the memory. - assertThat(allocation.await(1, TimeUnit.SECONDS)).isTrue(); - } - - @Test - void testBufferExhaustedExceptionIsThrown() throws Exception { - // If there is not enough memory to allocate and the elapsed time is greater than the max - // specified block time. - WriterMemoryBuffer pool = new WriterMemoryBuffer(2, 1); - pool.allocate(1, maxBlockTimeMs); - assertThatThrownBy(() -> pool.allocate(2, maxBlockTimeMs)) - .isInstanceOf(BufferExhaustedException.class) - .hasMessageContaining( - "Failed to allocate 2 bytes within the configured max blocking time 10 ms." - + " Total memory: 2 bytes. Available memory: 1 bytes. page size: 1 bytes"); - assertThat(pool.queued()).isEqualTo(0); - assertThat(pool.getAvailableMemory()).isEqualTo(1); - } - @Test void testCleanupMemoryAvailabilityWaiterOnInterruption() throws Exception { WriterMemoryBuffer pool = new WriterMemoryBuffer(2, 1); @@ -223,21 +155,6 @@ protected MemorySegment allocateMemorySegment(int size) { assertThat(pool.getAvailableMemory()).isEqualTo(1024); } - @Test - void testCloseAllocations() throws Exception { - WriterMemoryBuffer pool = new WriterMemoryBuffer(10, 1); - MemorySegment segment = pool.allocate(1, Long.MAX_VALUE); - - // Close the memory segment pool. This should prevent any further allocations. - pool.close(); - - assertThatThrownBy(() -> pool.allocate(1, maxBlockTimeMs)) - .isInstanceOf(FlussRuntimeException.class); - - // ensure de-allocation still works. - pool.deallocate(segment); - } - @Test void testCloseNotifyWaiters() throws Exception { final int numWorkers = 2; @@ -316,35 +233,4 @@ public void run() { } } } - - private CountDownLatch asyncDeallocate( - final WriterMemoryBuffer pool, final MemorySegment segment) { - final CountDownLatch latch = new CountDownLatch(1); - Thread thread = - new Thread( - unchecked( - () -> { - latch.await(); - pool.deallocate(segment); - })); - thread.start(); - return latch; - } - - private CountDownLatch asyncAllocate(final WriterMemoryBuffer pool, final int size) { - final CountDownLatch completed = new CountDownLatch(1); - Thread thread = - new Thread( - () -> { - try { - pool.allocate(size, maxBlockTimeMs); - } catch (InterruptedException e) { - e.printStackTrace(); - } finally { - completed.countDown(); - } - }); - thread.start(); - return completed; - } } diff --git a/fluss-common/src/main/java/com/alibaba/fluss/config/ConfigOptions.java b/fluss-common/src/main/java/com/alibaba/fluss/config/ConfigOptions.java index 8dde8a3f5..59db045b7 100644 --- a/fluss-common/src/main/java/com/alibaba/fluss/config/ConfigOptions.java +++ b/fluss-common/src/main/java/com/alibaba/fluss/config/ConfigOptions.java @@ -252,6 +252,14 @@ public class ConfigOptions { + SERVER_BUFFER_MEMORY_SIZE.key() + "')."); + // TODO: #323/#324 + // noDefaultValue() leads to NPE in some code parts, hence we default to 0b + public static final ConfigOption SERVER_BUFFER_PER_REQUEST_MEMORY_SIZE = + key("server.buffer.per-request-memory-size") + .memoryType() + .defaultValue(MemorySize.parse("0b")) + .withDescription(""); + // ------------------------------------------------------------------ // ZooKeeper Settings // ------------------------------------------------------------------ @@ -620,6 +628,14 @@ public class ConfigOptions { "The writer or walBuilder will attempt to batch records together into one batch for" + " the same bucket. This helps performance on both the client and the server."); + // TODO: #323/#324 + // noDefaultValue() leads to NPE in some code parts, hence we default to 0b + public static final ConfigOption CLIENT_WRITER_PER_REQUEST_MEMORY_SIZE = + key("client.writer.per-request-memory-size") + .memoryType() + .defaultValue(MemorySize.parse("0b")) + .withDescription(""); + @Deprecated public static final ConfigOption CLIENT_WRITER_LEGACY_BATCH_SIZE = key("client.writer.legacy.batch-size") diff --git a/fluss-common/src/main/java/com/alibaba/fluss/memory/LazyMemorySegmentPool.java b/fluss-common/src/main/java/com/alibaba/fluss/memory/LazyMemorySegmentPool.java index 0ddeb45c9..351963f43 100644 --- a/fluss-common/src/main/java/com/alibaba/fluss/memory/LazyMemorySegmentPool.java +++ b/fluss-common/src/main/java/com/alibaba/fluss/memory/LazyMemorySegmentPool.java @@ -20,6 +20,7 @@ import com.alibaba.fluss.annotation.VisibleForTesting; import com.alibaba.fluss.config.ConfigOptions; import com.alibaba.fluss.config.Configuration; +import com.alibaba.fluss.config.MemorySize; import com.alibaba.fluss.exception.FlussRuntimeException; import javax.annotation.concurrent.GuardedBy; @@ -45,7 +46,6 @@ @ThreadSafe public class LazyMemorySegmentPool implements MemorySegmentPool, Closeable { - private static final long PER_REQUEST_MEMORY_SIZE = 16 * 1024 * 1024; private static final long DEFAULT_WAIT_TIMEOUT_MS = Long.MAX_VALUE; /** The lock to guard the memory pool. */ @@ -54,12 +54,13 @@ public class LazyMemorySegmentPool implements MemorySegmentPool, Closeable { @GuardedBy("lock") private final List cachePages; + @VisibleForTesting @GuardedBy("lock") - private final Deque waiters; + final Deque waiters; private final int pageSize; private final int maxPages; - private final int perRequestPages; + @VisibleForTesting final int perRequestPages; private final long maxTimeToBlockMs; @GuardedBy("lock") @@ -67,7 +68,9 @@ public class LazyMemorySegmentPool implements MemorySegmentPool, Closeable { private int pageUsage; - LazyMemorySegmentPool(int maxPages, int pageSize, long maxTimeToBlockMs) { + @VisibleForTesting + LazyMemorySegmentPool( + int maxPages, int pageSize, long maxTimeToBlockMs, long perRequestMemorySize) { checkArgument(maxPages > 0, "MaxPages for LazyMemorySegmentPool should be greater than 0."); checkArgument( pageSize >= 64, @@ -75,16 +78,22 @@ public class LazyMemorySegmentPool implements MemorySegmentPool, Closeable { + pageSize + " bytes."); checkArgument( - PER_REQUEST_MEMORY_SIZE > pageSize, + perRequestMemorySize >= pageSize, String.format( - "Page size should be less than PER_REQUEST_MEMORY_SIZE. Page size is:" - + " %s KB, PER_REQUEST_MEMORY_SIZE is %s KB.", - pageSize / 1024, PER_REQUEST_MEMORY_SIZE / 1024)); + "Page size should be less than or equal to per request memory size. Page size is:" + + " %s KB, per request memory size is %s KB.", + pageSize / 1024, perRequestMemorySize / 1024)); + checkArgument( + perRequestMemorySize <= ((long) pageSize * maxPages), + String.format( + "Per request memory size must be less than or equal to total memory (page size * max pages). Per request memory size is:" + + " %s KB, total memory is %s KB.", + perRequestMemorySize / 1024, ((long) pageSize * maxPages) / 1024)); this.cachePages = new ArrayList<>(); this.pageUsage = 0; this.maxPages = maxPages; this.pageSize = pageSize; - this.perRequestPages = Math.max(1, (int) (PER_REQUEST_MEMORY_SIZE / pageSize())); + this.perRequestPages = Math.max(1, (int) (perRequestMemorySize / pageSize())); this.closed = false; this.waiters = new ArrayDeque<>(); @@ -102,34 +111,33 @@ public static LazyMemorySegmentPool createWriterBufferPool(Configuration conf) { totalBytes, ConfigOptions.CLIENT_WRITER_BATCH_SIZE.key(), batchSize)); - int pageSize = (int) conf.get(ConfigOptions.CLIENT_WRITER_BUFFER_PAGE_SIZE).getBytes(); + long perRequestMemorySize = + conf.getOptional(ConfigOptions.CLIENT_WRITER_PER_REQUEST_MEMORY_SIZE) + .filter(s -> s.getBytes() > 0) + .map(MemorySize::getBytes) + .orElse((long) pageSize); int segmentCount = (int) (totalBytes / pageSize); - return new LazyMemorySegmentPool(segmentCount, pageSize, DEFAULT_WAIT_TIMEOUT_MS); + return new LazyMemorySegmentPool( + segmentCount, pageSize, DEFAULT_WAIT_TIMEOUT_MS, perRequestMemorySize); } public static LazyMemorySegmentPool createServerBufferPool(Configuration conf) { long totalBytes = conf.get(ConfigOptions.SERVER_BUFFER_MEMORY_SIZE).getBytes(); int pageSize = (int) conf.get(ConfigOptions.SERVER_BUFFER_PAGE_SIZE).getBytes(); + long perRequestMemorySize = + conf.getOptional(ConfigOptions.SERVER_BUFFER_PER_REQUEST_MEMORY_SIZE) + .filter(s -> s.getBytes() > 0) + .map(MemorySize::getBytes) + .orElse((long) pageSize); int segmentCount = (int) (totalBytes / pageSize); - return new LazyMemorySegmentPool(segmentCount, pageSize, DEFAULT_WAIT_TIMEOUT_MS); + return new LazyMemorySegmentPool( + segmentCount, pageSize, DEFAULT_WAIT_TIMEOUT_MS, perRequestMemorySize); } @Override public MemorySegment nextSegment() throws IOException { - return inLock( - lock, - () -> { - checkClosed(); - if (freePages() == 0) { - waitForSegment(1); - } - - lazilyAllocatePages(freePages()); - - this.pageUsage++; - return cachePages.remove(this.cachePages.size() - 1); - }); + return inLock(lock, () -> allocatePages(1).get(0)); } @Override @@ -138,8 +146,9 @@ public List allocatePages(int requiredPages) throws IOException { lock, () -> { checkClosed(); + if (freePages() < requiredPages) { - waitForSegment(requiredPages); + waitForSegmentOrElseThrow(requiredPages); } lazilyAllocatePages(requiredPages); @@ -156,16 +165,25 @@ private List drain(int numPages) { return pages; } - private void lazilyAllocatePages(int required) { - if (cachePages.isEmpty()) { - int numPages = Math.min(required, perRequestPages); + @VisibleForTesting + protected void lazilyAllocatePages(int required) { + if (cachePages.size() < required) { + int numPages = Math.max(required - cachePages.size(), perRequestPages); for (int i = 0; i < numPages; i++) { cachePages.add(MemorySegment.allocateHeapMemory(pageSize)); } } } - private void waitForSegment(int requiredPages) throws EOFException { + private void waitForSegmentOrElseThrow(int requiredPages) throws EOFException { + if (maxPages < requiredPages) { // immediately fail if the request is impossible to satisfy + throw new EOFException( + String.format( + "Allocation request cannot be satisfied because the number of maximum available pages is " + + "exceeded. Available pages: %d. Requested pages: %d", + this.maxPages, requiredPages)); + } + Condition moreMemory = lock.newCondition(); waiters.addLast(moreMemory); try { @@ -181,7 +199,7 @@ private void waitForSegment(int requiredPages) throws EOFException { + pageSize + " bytes. Available pages: " + freePages() - + ". Request pages: " + + ". Requested pages: " + requiredPages); } checkClosed(); @@ -199,6 +217,7 @@ public int pageSize() { return pageSize; } + @Override public int totalSize() { return maxPages * pageSize; } @@ -216,10 +235,11 @@ public void returnAll(List memory) { inLock( lock, () -> { - pageUsage -= memory.size(); - if (this.pageUsage < 0) { + final int newPageUsage = pageUsage - memory.size(); + if (newPageUsage < 0) { throw new RuntimeException("Return too more memories."); } + pageUsage = newPageUsage; cachePages.addAll(memory); for (int i = 0; i < memory.size() && !waiters.isEmpty(); i++) { waiters.pollFirst().signal(); @@ -237,6 +257,7 @@ public long availableMemory() { return ((long) freePages()) * pageSize; } + @Override public void close() { inLock( lock, diff --git a/fluss-common/src/test/java/com/alibaba/fluss/memory/LazyMemorySegmentPoolTest.java b/fluss-common/src/test/java/com/alibaba/fluss/memory/LazyMemorySegmentPoolTest.java index eaf36fd83..7eb179045 100644 --- a/fluss-common/src/test/java/com/alibaba/fluss/memory/LazyMemorySegmentPoolTest.java +++ b/fluss-common/src/test/java/com/alibaba/fluss/memory/LazyMemorySegmentPoolTest.java @@ -16,15 +16,31 @@ package com.alibaba.fluss.memory; +import com.alibaba.fluss.config.ConfigOptions; +import com.alibaba.fluss.config.Configuration; +import com.alibaba.fluss.config.MemorySize; +import com.alibaba.fluss.exception.FlussRuntimeException; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Nested; import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; import java.io.EOFException; import java.io.IOException; +import java.time.Duration; import java.util.Arrays; +import java.util.Collections; +import java.util.Deque; import java.util.List; +import java.util.concurrent.Callable; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.Condition; +import static com.alibaba.fluss.testutils.common.CommonTestUtils.retry; import static com.alibaba.fluss.utils.function.ThrowingRunnable.unchecked; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; @@ -32,28 +48,374 @@ /** Test for {@link com.alibaba.fluss.memory.LazyMemorySegmentPool}. */ public class LazyMemorySegmentPoolTest { + @Nested + class FactoryMethodTest { + + private Configuration conf; + + @BeforeEach + void setUp() throws Exception { + conf = new Configuration(); + } + + @Test + void testCreateWriterBufferPoolWithDefaultValuesReturnsCorrectInstance() { + LazyMemorySegmentPool pool = LazyMemorySegmentPool.createWriterBufferPool(conf); + assertThat(pool).isNotNull(); + assertThat(pool.totalSize()) + .isEqualTo(conf.get(ConfigOptions.CLIENT_WRITER_BUFFER_MEMORY_SIZE).getBytes()); + assertThat(pool.pageSize()) + .isEqualTo(conf.get(ConfigOptions.CLIENT_WRITER_BUFFER_PAGE_SIZE).getBytes()); + assertThat(pool.availableMemory()) + .isEqualTo(conf.get(ConfigOptions.CLIENT_WRITER_BUFFER_MEMORY_SIZE).getBytes()); + assertThat(pool.freePages()) + .isEqualTo( + conf.get(ConfigOptions.CLIENT_WRITER_BUFFER_MEMORY_SIZE).getBytes() + / conf.get(ConfigOptions.CLIENT_WRITER_BUFFER_PAGE_SIZE) + .getBytes()); + assertThat(pool.getAllCachePages().size()).isEqualTo(0); + assertThat(pool.perRequestPages).isEqualTo(1); + } + + @ParameterizedTest + @ValueSource(ints = {1, 2, 7, 13, 19}) // >= 1 + void testCreateReaderBufferPoolWithCustomValuesReturnsCorrectInstance( + int perRequestMemorySizeFactor) { + conf.set(ConfigOptions.CLIENT_WRITER_BUFFER_MEMORY_SIZE, MemorySize.parse("128kb")); + conf.set(ConfigOptions.CLIENT_WRITER_BATCH_SIZE, MemorySize.parse("64kb")); + conf.set(ConfigOptions.CLIENT_WRITER_BUFFER_PAGE_SIZE, MemorySize.parse("2kb")); + conf.set( + ConfigOptions.CLIENT_WRITER_PER_REQUEST_MEMORY_SIZE, + MemorySize.parse((perRequestMemorySizeFactor * 2) + "kb")); + + LazyMemorySegmentPool pool = LazyMemorySegmentPool.createWriterBufferPool(conf); + assertThat(pool).isNotNull(); + assertThat(pool.totalSize()) + .isEqualTo(conf.get(ConfigOptions.CLIENT_WRITER_BUFFER_MEMORY_SIZE).getBytes()); + assertThat(pool.pageSize()) + .isEqualTo(conf.get(ConfigOptions.CLIENT_WRITER_BUFFER_PAGE_SIZE).getBytes()); + assertThat(pool.availableMemory()) + .isEqualTo(conf.get(ConfigOptions.CLIENT_WRITER_BUFFER_MEMORY_SIZE).getBytes()); + assertThat(pool.freePages()) + .isEqualTo( + conf.get(ConfigOptions.CLIENT_WRITER_BUFFER_MEMORY_SIZE).getBytes() + / conf.get(ConfigOptions.CLIENT_WRITER_BUFFER_PAGE_SIZE) + .getBytes()); + assertThat(pool.getAllCachePages().size()).isEqualTo(0); + assertThat(pool.perRequestPages).isEqualTo(perRequestMemorySizeFactor); + } + + @Test + void testCreateWriterBufferPoolIllegalArguments() { + conf.set(ConfigOptions.CLIENT_WRITER_BUFFER_MEMORY_SIZE, MemorySize.parse("96kb")); + conf.set(ConfigOptions.CLIENT_WRITER_BATCH_SIZE, MemorySize.parse("64kb")); + conf.set(ConfigOptions.CLIENT_WRITER_BUFFER_PAGE_SIZE, MemorySize.parse("4kb")); + conf.set(ConfigOptions.CLIENT_WRITER_PER_REQUEST_MEMORY_SIZE, MemorySize.parse("8kb")); + + assertThatThrownBy(() -> LazyMemorySegmentPool.createWriterBufferPool(conf)) + .isInstanceOf(IllegalArgumentException.class) + .hasMessageContaining("should be at least twice of batch size"); + } + + @Test + void testCreateServerBufferPoolWithDefaultValuesReturnsCorrectInstance() { + LazyMemorySegmentPool pool = LazyMemorySegmentPool.createServerBufferPool(conf); + assertThat(pool).isNotNull(); + assertThat(pool.totalSize()) + .isEqualTo(conf.get(ConfigOptions.SERVER_BUFFER_MEMORY_SIZE).getBytes()); + assertThat(pool.pageSize()) + .isEqualTo(conf.get(ConfigOptions.SERVER_BUFFER_PAGE_SIZE).getBytes()); + assertThat(pool.availableMemory()) + .isEqualTo(conf.get(ConfigOptions.SERVER_BUFFER_MEMORY_SIZE).getBytes()); + assertThat(pool.freePages()) + .isEqualTo( + conf.get(ConfigOptions.SERVER_BUFFER_MEMORY_SIZE).getBytes() + / conf.get(ConfigOptions.SERVER_BUFFER_PAGE_SIZE).getBytes()); + assertThat(pool.getAllCachePages().size()).isEqualTo(0); + assertThat(pool.perRequestPages).isEqualTo(1); + } + + @ParameterizedTest + @ValueSource(ints = {1, 2, 7, 13, 19}) // >= 1 + void testCreateServerBufferPoolWithCustomValuesReturnsCorrectInstance( + int perRequestMemorySizeFactor) { + conf.set(ConfigOptions.SERVER_BUFFER_MEMORY_SIZE, MemorySize.parse("128kb")); + conf.set(ConfigOptions.SERVER_BUFFER_PAGE_SIZE, MemorySize.parse("2kb")); + conf.set( + ConfigOptions.SERVER_BUFFER_PER_REQUEST_MEMORY_SIZE, + MemorySize.parse(perRequestMemorySizeFactor * 2 + "kb")); + + LazyMemorySegmentPool pool = LazyMemorySegmentPool.createServerBufferPool(conf); + assertThat(pool).isNotNull(); + assertThat(pool.totalSize()) + .isEqualTo(conf.get(ConfigOptions.SERVER_BUFFER_MEMORY_SIZE).getBytes()); + assertThat(pool.pageSize()) + .isEqualTo(conf.get(ConfigOptions.SERVER_BUFFER_PAGE_SIZE).getBytes()); + assertThat(pool.availableMemory()) + .isEqualTo(conf.get(ConfigOptions.SERVER_BUFFER_MEMORY_SIZE).getBytes()); + assertThat(pool.freePages()) + .isEqualTo( + conf.get(ConfigOptions.SERVER_BUFFER_MEMORY_SIZE).getBytes() + / conf.get(ConfigOptions.SERVER_BUFFER_PAGE_SIZE).getBytes()); + assertThat(pool.getAllCachePages().size()).isEqualTo(0); + assertThat(pool.perRequestPages).isEqualTo(perRequestMemorySizeFactor); + } + } + + @Test + void testCloseClearsCachedPages() throws Exception { + LazyMemorySegmentPool pool = buildLazyMemorySegmentSource(1, 64, 100, 64); + pool.returnPage(pool.nextSegment()); + assertThat(pool.getAllCachePages().size()).isGreaterThan(0); + pool.close(); + assertThat(pool.getAllCachePages()).isEmpty(); + } + + @ParameterizedTest + @ValueSource(ints = {1, 2, 7, 13, 19}) // >= 1 + void testSimpleAllocateAndReturnPages(int perRequestMemorySizeFactor) throws Exception { + // test the simple non-blocking allocation paths. + int totalMemory = Long.valueOf(MemorySize.parse("64kb").getBytes()).intValue(); + int pageSize = (int) MemorySize.parse("1kb").getBytes(); + int perRequestMemorySize = + Long.valueOf(((long) perRequestMemorySizeFactor * pageSize)).intValue(); + int totalSegmentCount = totalMemory / pageSize; + int expectedPerRequestSegmentCount = + Long.valueOf((((long) perRequestMemorySizeFactor * pageSize) / pageSize)) + .intValue(); + + LazyMemorySegmentPool pool = + buildLazyMemorySegmentSource(totalSegmentCount, pageSize, 10, perRequestMemorySize); + + // allocate one page and check the segment and the memory pool + List maybeSegments = pool.allocatePages(1); + assertThat(maybeSegments.size()).isEqualTo(1); + MemorySegment segment = maybeSegments.get(0); + assertThat(segment.size()).isEqualTo(pageSize); + assertThat(pool.availableMemory()).isEqualTo(totalMemory - pageSize); + assertThat(pool.getAllCachePages().size()).isEqualTo(expectedPerRequestSegmentCount - 1); + segment.putInt(0, 1); + assertThat(segment.getInt(0)).isEqualTo(1); + + // return the page to the memory pool and check the memory pool + pool.returnPage(segment); + assertThat(pool.availableMemory()).isEqualTo(totalMemory); + assertThat(pool.getAllCachePages().size()).isEqualTo(expectedPerRequestSegmentCount); + + // allocate another page and check the segment and the memory pool + maybeSegments = pool.allocatePages(1); + assertThat(maybeSegments.size()).isEqualTo(1); + segment = maybeSegments.get(0); + assertThat(segment.size()).isEqualTo(pageSize); + assertThat(pool.availableMemory()).isEqualTo(totalMemory - pageSize); + assertThat(pool.freePages()).isEqualTo(totalSegmentCount - 1); + assertThat(pool.getAllCachePages().size()).isEqualTo(expectedPerRequestSegmentCount - 1); + + // return the page to the memory pool and check the memory pool + pool.returnPage(segment); + assertThat(pool.availableMemory()).isEqualTo(totalMemory); + assertThat(pool.freePages()).isEqualTo(totalSegmentCount); + assertThat(pool.getAllCachePages().size()).isEqualTo(expectedPerRequestSegmentCount); + + // allocate another 2 pages and check the segments and the memory pool + maybeSegments = pool.allocatePages(2); + assertThat(maybeSegments.size()).isEqualTo(2); + List segments = maybeSegments; + assertThat(pool.availableMemory()).isEqualTo(totalMemory - (2L * pageSize)); + assertThat(pool.freePages()).isEqualTo(totalSegmentCount - 2); + assertThat(pool.getAllCachePages().size()) + .isEqualTo(Math.max(expectedPerRequestSegmentCount - 2, 0)); + pool.returnAll(segments); + assertThat(pool.availableMemory()).isEqualTo(totalMemory); + assertThat(pool.freePages()).isEqualTo(totalSegmentCount); + assertThat(pool.getAllCachePages().size()) + .isEqualTo(Math.max(expectedPerRequestSegmentCount, 2)); + } + + @Test + void testClosePreventsFurtherAllocationButAllowsReturningPages() throws Exception { + LazyMemorySegmentPool pool = buildLazyMemorySegmentSource(1, 512, Long.MAX_VALUE, 512); + List segment = pool.allocatePages(1); + + // Close the memory segment pool. This should prevent any further allocations. + pool.close(); + + assertThatThrownBy(() -> pool.allocatePages(1)).isInstanceOf(FlussRuntimeException.class); + assertThatThrownBy(pool::nextSegment).isInstanceOf(FlussRuntimeException.class); + + // ensure de-allocation still works. + pool.returnAll(segment); + } + + @Test + void testCannotAllocateMorePagesThanAvailable() throws Exception { + LazyMemorySegmentPool pool = buildLazyMemorySegmentSource(2, 512, 10, 512); + List segment = pool.allocatePages(2); + assertThat(segment.size()).isEqualTo(2); + assertThat(pool.availableMemory()).isEqualTo(0); + assertThat(pool.freePages()).isEqualTo(0); + assertThat(pool.getAllCachePages().size()).isEqualTo(0); + + pool.returnAll(segment); + assertThat(pool.availableMemory()).isEqualTo(1024); + assertThat(pool.freePages()).isEqualTo(2); + assertThat(pool.getAllCachePages().size()).isEqualTo(2); + + assertThatThrownBy(() -> pool.allocatePages(3)) + .isInstanceOf(EOFException.class) + .hasMessageContaining("Available pages: 2. Requested pages: 3"); + } + + @Test + void testDelayedAllocationRequestingMultiplePages() throws Exception { + LazyMemorySegmentPool pool = buildLazyMemorySegmentSource(5, 1024, Long.MAX_VALUE, 1024); + MemorySegment segment = pool.nextSegment(); + CountDownLatch doDealloc = asyncReturnAll(pool, Collections.singletonList(segment)); + // should block until memory is returned + CountDownLatch allocation = asyncAllocatePages(pool, 5); + assertThat(allocation.getCount()).isEqualTo(1); + // return the memory + doDealloc.countDown(); + // give pool enough time to return memory to make the test stable + assertThat(allocation.await(2, TimeUnit.SECONDS)).isTrue(); + assertThat(pool.availableMemory()).isEqualTo(0); + } + + @Test + void testUnsatisfiableRequestDoesNotBlockAndThrowsException() throws Exception { + LazyMemorySegmentPool pool = buildLazyMemorySegmentSource(1, 1024, Long.MAX_VALUE, 1024); + assertThatThrownBy(() -> pool.allocatePages(2)) + .isInstanceOf(EOFException.class) + .hasMessageContaining("Available pages: 1. Requested pages: 2"); + assertThat(pool.queued()).isEqualTo(0); + assertThat(pool.freePages()).isEqualTo(1); + } + + @ParameterizedTest + @ValueSource(ints = {10, 100, 500, 1000, 2500}) + void testBlockTimeIsRespected(int maxTimeToBlockMs) throws Exception { + LazyMemorySegmentPool pool = buildLazyMemorySegmentSource(1, 1024, maxTimeToBlockMs, 1024); + pool.nextSegment(); + + long start = System.currentTimeMillis(); + assertThatThrownBy( + () -> + // will block for at least maxTimeToBlockMs + pool.allocatePages(1)) + .isInstanceOf(EOFException.class) + .hasMessageContaining("Available pages: 0. Requested pages: 1"); + assertThat(System.currentTimeMillis() - start).isGreaterThanOrEqualTo(maxTimeToBlockMs); + + assertThat(pool.queued()).isEqualTo(0); + assertThat(pool.freePages()).isEqualTo(0); + } + + @Test + void testCleanupMemoryAvailabilityWaiterOnInterruption() throws Exception { + LazyMemorySegmentPool pool = buildLazyMemorySegmentSource(2, 1024, 5000, 1024); + pool.allocatePages(1); + Thread t1 = new Thread(new MemorySegmentPoolAllocator(pool)); + Thread t2 = new Thread(new MemorySegmentPoolAllocator(pool)); + // start thread t1 which will try to allocate more memory on to the memory segment + t1.start(); + // retry until condition variable c1 associated with pool.allocate() by thread t1 inserted + // in the waiters queue. + retry(Duration.ofSeconds(1), () -> assertThat(pool.queued()).isEqualTo(1)); + Deque waiters = pool.waiters; + // get the condition object associated with pool.allocate() by thread t1. + Condition c1 = waiters.getFirst(); + // start thread t2 which will try to allocate more memory on to the memory segment pool. + t2.start(); + // retry until condition variable c2 associated with pool.allocate() by thread t2 inserted + // in the waiters queue. The waiters queue will have 2 entries c1 and c2. + retry(Duration.ofSeconds(1), () -> assertThat(pool.queued()).isEqualTo(2)); + t1.interrupt(); + // retry until queue has only 1 entry + retry(Duration.ofSeconds(1), () -> assertThat(pool.queued()).isEqualTo(1)); + // get the condition object associated with allocate() by thread t2 + Condition c2 = waiters.getLast(); + t2.interrupt(); + assertThat(c1).isNotEqualTo(c2); + t1.join(); + t2.join(); + // both allocate() called by threads t1 and t2 should have been interrupted and the waiters + // queue should be empty. + assertThat(pool.queued()).isEqualTo(0); + } + + @Test + void testOutOfMemoryOnAllocation() { + LazyMemorySegmentPool pool = + new LazyMemorySegmentPool(1, 1024, 10, 1024) { + @Override + protected void lazilyAllocatePages(int required) { + throw new OutOfMemoryError(); + } + }; + + assertThatThrownBy(() -> pool.allocatePages(1)).isInstanceOf(OutOfMemoryError.class); + assertThatThrownBy(pool::nextSegment).isInstanceOf(OutOfMemoryError.class); + + assertThat(pool.availableMemory()).isEqualTo(1024); + } + + @Test + void testCloseNotifyWaiters() throws Exception { + final int numWorkers = 2; + + LazyMemorySegmentPool pool = new LazyMemorySegmentPool(1, 64, Long.MAX_VALUE, 64); + MemorySegment segment = pool.allocatePages(1).get(0); + + ExecutorService executor = Executors.newFixedThreadPool(numWorkers); + Callable work = + () -> { + assertThatThrownBy(() -> pool.allocatePages(1)) + .isInstanceOf(FlussRuntimeException.class); + assertThatThrownBy(pool::nextSegment).isInstanceOf(FlussRuntimeException.class); + return null; + }; + + for (int i = 0; i < numWorkers; ++i) { + executor.submit(work); + } + + retry(Duration.ofSeconds(15), () -> assertThat(pool.queued()).isEqualTo(numWorkers)); + + // Close the buffer pool. This should notify all waiters. + pool.close(); + + retry(Duration.ofSeconds(15), () -> assertThat(pool.queued()).isEqualTo(0)); + + pool.returnPage(segment); + assertThat(pool.availableMemory()).isEqualTo(64); + } + @Test void testNextSegmentWaiter() throws Exception { - LazyMemorySegmentPool source = buildLazyMemorySegmentSource(10, 64); + LazyMemorySegmentPool source = buildLazyMemorySegmentSource(10, 64, 100, 64); assertThat(source.pageSize()).isEqualTo(64); + assertThat(source.availableMemory()).isEqualTo(64 * 10); assertThat(source.freePages()).isEqualTo(10); MemorySegment ms1 = source.nextSegment(); + assertThat(source.availableMemory()).isEqualTo(64 * 9); assertThat(source.freePages()).isEqualTo(9); MemorySegment ms2 = source.nextSegment(); + assertThat(source.availableMemory()).isEqualTo(64 * 8); assertThat(source.freePages()).isEqualTo(8); for (int i = 0; i < 8; i++) { source.nextSegment(); } + assertThat(source.availableMemory()).isEqualTo(0); assertThat(source.freePages()).isEqualTo(0); assertThatThrownBy(source::nextSegment) .isInstanceOf(EOFException.class) .hasMessage( "Failed to allocate new segment within the configured max blocking time 100 ms. " - + "Total memory: 640 bytes. Page size: 64 bytes. Available pages: 0. Request pages: 1"); + + "Total memory: 640 bytes. Page size: 64 bytes. Available pages: 0. Requested pages: 1"); CountDownLatch returnAllLatch = asyncReturnAll(source, Arrays.asList(ms1, ms2)); CountDownLatch getNextSegmentLatch = asyncGetNextSegment(source); @@ -63,21 +425,25 @@ void testNextSegmentWaiter() throws Exception { } @Test - void testIllegalArgument() { - assertThatThrownBy(() -> buildLazyMemorySegmentSource(0, 64)) + void testConstructorIllegalArgument() { + assertThatThrownBy(() -> buildLazyMemorySegmentSource(0, 64, 100, 64)) .isInstanceOf(IllegalArgumentException.class) .hasMessage("MaxPages for LazyMemorySegmentPool should be greater than 0."); - assertThatThrownBy(() -> buildLazyMemorySegmentSource(10, 32 * 1024 * 1024)) + assertThatThrownBy( + () -> + buildLazyMemorySegmentSource( + 10, 32 * 1024 * 1024, 100, (32 * 1024 * 1024) / 2)) .isInstanceOf(IllegalArgumentException.class) .hasMessage( - "Page size should be less than PER_REQUEST_MEMORY_SIZE. " - + "Page size is: 32768 KB, PER_REQUEST_MEMORY_SIZE is 16384 KB."); - assertThatThrownBy(() -> buildLazyMemorySegmentSource(10, 30)) + "Page size should be less than or equal to per request memory size. " + + "Page size is: 32768 KB, per request memory size is 16384 KB."); + assertThatThrownBy(() -> buildLazyMemorySegmentSource(10, 30, 100, 30)) .isInstanceOf(IllegalArgumentException.class) .hasMessage( "Page size should be greater than 64 bytes to include the record batch header, but is 30 bytes."); - LazyMemorySegmentPool lazyMemorySegmentPool = buildLazyMemorySegmentSource(10, 100); + LazyMemorySegmentPool lazyMemorySegmentPool = + buildLazyMemorySegmentSource(10, 100, 100, 100); assertThatThrownBy( () -> lazyMemorySegmentPool.returnAll( @@ -87,11 +453,28 @@ void testIllegalArgument() { .hasMessage("Return too more memories."); } - private LazyMemorySegmentPool buildLazyMemorySegmentSource(int maxPages, int pageSize) { - return new LazyMemorySegmentPool(maxPages, pageSize, 100); + private static LazyMemorySegmentPool buildLazyMemorySegmentSource( + int maxPages, int pageSize, long maxTimeToBlockMs, int perRequestMemorySize) { + return new LazyMemorySegmentPool( + maxPages, pageSize, maxTimeToBlockMs, perRequestMemorySize); + } + + private static class MemorySegmentPoolAllocator implements Runnable { + LazyMemorySegmentPool pool; + long maxBlockTimeMs; + + MemorySegmentPoolAllocator(LazyMemorySegmentPool pool) { + this.pool = pool; + } + + @Override + public void run() { + assertThatThrownBy(() -> pool.allocatePages(2)) + .isInstanceOfAny(FlussRuntimeException.class, InterruptedException.class); + } } - private CountDownLatch asyncReturnAll( + private static CountDownLatch asyncReturnAll( LazyMemorySegmentPool source, List segments) { CountDownLatch latch = new CountDownLatch(1); Thread thread = @@ -105,7 +488,7 @@ private CountDownLatch asyncReturnAll( return latch; } - private CountDownLatch asyncGetNextSegment(LazyMemorySegmentPool source) { + private static CountDownLatch asyncGetNextSegment(LazyMemorySegmentPool source) { final CountDownLatch completed = new CountDownLatch(1); Thread thread = new Thread( @@ -123,4 +506,24 @@ private CountDownLatch asyncGetNextSegment(LazyMemorySegmentPool source) { thread.start(); return completed; } + + private static CountDownLatch asyncAllocatePages( + LazyMemorySegmentPool source, int requiredPages) { + final CountDownLatch completed = new CountDownLatch(1); + Thread thread = + new Thread( + () -> { + try { + try { + source.allocatePages(requiredPages); + } catch (IOException e) { + throw new RuntimeException(e); + } + } finally { + completed.countDown(); + } + }); + thread.start(); + return completed; + } }