Skip to content

Commit

Permalink
Harmonization [commit will be amended!]
Browse files Browse the repository at this point in the history
  • Loading branch information
michaelkoepf committed Jan 14, 2025
1 parent d0b466f commit 6940299
Show file tree
Hide file tree
Showing 3 changed files with 163 additions and 78 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@

package com.alibaba.fluss.client.write;

import com.alibaba.fluss.config.MemorySize;
import com.alibaba.fluss.exception.BufferExhaustedException;
import com.alibaba.fluss.exception.FlussRuntimeException;
import com.alibaba.fluss.memory.MemorySegment;
Expand Down Expand Up @@ -47,44 +46,6 @@
public class WriterMemoryBufferTest {
private final long maxBlockTimeMs = 10;

@Test
void testSimple() throws Exception {
// test the simple non-blocking allocation paths.
long totalMemory = MemorySize.parse("64kb").getBytes();
int batchSize = (int) MemorySize.parse("1kb").getBytes();

WriterMemoryBuffer writerMemoryBuffer = new WriterMemoryBuffer(totalMemory, batchSize);
MemorySegment segment = writerMemoryBuffer.allocate(batchSize, maxBlockTimeMs);
assertThat(segment.size()).isEqualTo(batchSize);
assertThat(writerMemoryBuffer.getUnallocatedMemory()).isEqualTo(totalMemory - batchSize);
assertThat(writerMemoryBuffer.getAvailableMemory()).isEqualTo(totalMemory - batchSize);
segment.putInt(0, 1);
writerMemoryBuffer.deallocate(segment);
assertThat(writerMemoryBuffer.getAvailableMemory()).isEqualTo(totalMemory);
assertThat(writerMemoryBuffer.getUnallocatedMemory()).isEqualTo(totalMemory - batchSize);

segment = writerMemoryBuffer.allocate(batchSize, maxBlockTimeMs);
writerMemoryBuffer.deallocate(segment);
assertThat(writerMemoryBuffer.getAvailableMemory()).isEqualTo(totalMemory);
assertThat(writerMemoryBuffer.getUnallocatedMemory()).isEqualTo(totalMemory - batchSize);

segment = writerMemoryBuffer.allocate(2 * batchSize, maxBlockTimeMs);
writerMemoryBuffer.deallocate(segment);
assertThat(writerMemoryBuffer.getAvailableMemory()).isEqualTo(totalMemory);
assertThat(writerMemoryBuffer.getUnallocatedMemory()).isEqualTo(totalMemory - batchSize);
}

@Test
void testCanAllocateMoreMemoryThanHave() throws Exception {
WriterMemoryBuffer pool = new WriterMemoryBuffer(1024, 512);
MemorySegment segment = pool.allocate(1024, maxBlockTimeMs);
assertThat(segment.size()).isEqualTo(1024);
pool.deallocate(segment);
assertThatThrownBy(() -> pool.allocate(1025, maxBlockTimeMs))
.isInstanceOf(IllegalArgumentException.class)
.hasMessageContaining("Attempt to allocate memory segment size");
}

@Test
void testDelayedAllocation() throws Exception {
WriterMemoryBuffer pool = new WriterMemoryBuffer(5 * 1024, 1024);
Expand Down Expand Up @@ -223,21 +184,6 @@ protected MemorySegment allocateMemorySegment(int size) {
assertThat(pool.getAvailableMemory()).isEqualTo(1024);
}

@Test
void testCloseAllocations() throws Exception {
WriterMemoryBuffer pool = new WriterMemoryBuffer(10, 1);
MemorySegment segment = pool.allocate(1, Long.MAX_VALUE);

// Close the memory segment pool. This should prevent any further allocations.
pool.close();

assertThatThrownBy(() -> pool.allocate(1, maxBlockTimeMs))
.isInstanceOf(FlussRuntimeException.class);

// ensure de-allocation still works.
pool.deallocate(segment);
}

@Test
void testCloseNotifyWaiters() throws Exception {
final int numWorkers = 2;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@

import javax.annotation.concurrent.GuardedBy;
import javax.annotation.concurrent.ThreadSafe;

import java.io.Closeable;
import java.io.EOFException;
import java.io.IOException;
Expand All @@ -45,7 +44,7 @@
@ThreadSafe
public class LazyMemorySegmentPool implements MemorySegmentPool, Closeable {

private static final long PER_REQUEST_MEMORY_SIZE = 16 * 1024 * 1024;
private static final long DEFAULT_PER_REQUEST_MEMORY_SIZE = 16 * 1024 * 1024;
private static final long DEFAULT_WAIT_TIMEOUT_MS = Long.MAX_VALUE;

/** The lock to guard the memory pool. */
Expand All @@ -67,24 +66,32 @@ public class LazyMemorySegmentPool implements MemorySegmentPool, Closeable {

private int pageUsage;

LazyMemorySegmentPool(int maxPages, int pageSize, long maxTimeToBlockMs) {
@VisibleForTesting
LazyMemorySegmentPool(
int maxPages, int pageSize, long maxTimeToBlockMs, long perRequestMemorySize) {
checkArgument(maxPages > 0, "MaxPages for LazyMemorySegmentPool should be greater than 0.");
checkArgument(
pageSize >= 64,
"Page size should be greater than 64 bytes to include the record batch header, but is "
+ pageSize
+ " bytes.");
checkArgument(
PER_REQUEST_MEMORY_SIZE > pageSize,
perRequestMemorySize >= pageSize,
String.format(
"Page size should be less than or equal to per request memory size. Page size is:"
+ " %s KB, per request memory size is %s KB.",
pageSize / 1024, perRequestMemorySize / 1024));
checkArgument(
perRequestMemorySize <= ((long) pageSize * maxPages),
String.format(
"Page size should be less than PER_REQUEST_MEMORY_SIZE. Page size is:"
+ " %s KB, PER_REQUEST_MEMORY_SIZE is %s KB.",
pageSize / 1024, PER_REQUEST_MEMORY_SIZE / 1024));
"Per request memory size must be less than or equal to total memory (page size * max pages). Per request memory size is:"
+ " %s KB, total memory is %s KB.",
perRequestMemorySize / 1024, ((long) pageSize * maxPages) / 1024));
this.cachePages = new ArrayList<>();
this.pageUsage = 0;
this.maxPages = maxPages;
this.pageSize = pageSize;
this.perRequestPages = Math.max(1, (int) (PER_REQUEST_MEMORY_SIZE / pageSize()));
this.perRequestPages = Math.max(1, (int) (perRequestMemorySize / pageSize()));

this.closed = false;
this.waiters = new ArrayDeque<>();
Expand All @@ -105,14 +112,16 @@ public static LazyMemorySegmentPool createWriterBufferPool(Configuration conf) {

int pageSize = (int) conf.get(ConfigOptions.CLIENT_WRITER_BUFFER_PAGE_SIZE).getBytes();
int segmentCount = (int) (totalBytes / pageSize);
return new LazyMemorySegmentPool(segmentCount, pageSize, DEFAULT_WAIT_TIMEOUT_MS);
return new LazyMemorySegmentPool(
segmentCount, pageSize, DEFAULT_WAIT_TIMEOUT_MS, DEFAULT_PER_REQUEST_MEMORY_SIZE);
}

public static LazyMemorySegmentPool createServerBufferPool(Configuration conf) {
long totalBytes = conf.get(ConfigOptions.SERVER_BUFFER_MEMORY_SIZE).getBytes();
int pageSize = (int) conf.get(ConfigOptions.SERVER_BUFFER_PAGE_SIZE).getBytes();
int segmentCount = (int) (totalBytes / pageSize);
return new LazyMemorySegmentPool(segmentCount, pageSize, DEFAULT_WAIT_TIMEOUT_MS);
return new LazyMemorySegmentPool(
segmentCount, pageSize, DEFAULT_WAIT_TIMEOUT_MS, DEFAULT_PER_REQUEST_MEMORY_SIZE);
}

@Override
Expand All @@ -121,8 +130,9 @@ public MemorySegment nextSegment() throws IOException {
lock,
() -> {
checkClosed();

if (freePages() == 0) {
waitForSegment(1);
waitForSegmentIfSatisfiableOrElseThrow(1);
}

lazilyAllocatePages(freePages());
Expand All @@ -138,8 +148,9 @@ public List<MemorySegment> allocatePages(int requiredPages) throws IOException {
lock,
() -> {
checkClosed();

if (freePages() < requiredPages) {
waitForSegment(requiredPages);
waitForSegmentIfSatisfiableOrElseThrow(requiredPages);
}

lazilyAllocatePages(requiredPages);
Expand All @@ -157,15 +168,23 @@ private List<MemorySegment> drain(int numPages) {
}

private void lazilyAllocatePages(int required) {
if (cachePages.isEmpty()) {
int numPages = Math.min(required, perRequestPages);
if (cachePages.size() < required) {
int numPages = Math.max(required - cachePages.size(), perRequestPages);
for (int i = 0; i < numPages; i++) {
cachePages.add(MemorySegment.allocateHeapMemory(pageSize));
}
}
}

private void waitForSegment(int requiredPages) throws EOFException {
private void waitForSegmentIfSatisfiableOrElseThrow(int requiredPages) throws EOFException {
if (maxPages < requiredPages) { // do not wait if the request is impossible to satisfy
throw new EOFException(
String.format(
"Allocation request cannot be satisfied because the number of maximum available pages is "
+ "exceeded. Available pages: %d. Request pages: %d",
this.maxPages, requiredPages));
}

Condition moreMemory = lock.newCondition();
waiters.addLast(moreMemory);
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,12 @@

package com.alibaba.fluss.memory;

import com.alibaba.fluss.config.MemorySize;
import com.alibaba.fluss.exception.FlussRuntimeException;

import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;

import java.io.EOFException;
import java.io.IOException;
Expand All @@ -32,21 +37,130 @@
/** Test for {@link com.alibaba.fluss.memory.LazyMemorySegmentPool}. */
public class LazyMemorySegmentPoolTest {

@ParameterizedTest
@ValueSource(ints = {1, 2, 7, 13, 19}) // >= 1
void testSimpleAllocationAndReturn(int perRequestMemorySizeFactor) throws Exception {
// test the simple non-blocking allocation paths.
int totalMemory = Long.valueOf(MemorySize.parse("64kb").getBytes()).intValue();
int batchSize = (int) MemorySize.parse("1kb").getBytes();
int perRequestMemorySize =
Long.valueOf(((long) perRequestMemorySizeFactor * batchSize)).intValue();
int totalSegmentCount = totalMemory / batchSize;
int expectedPerRequestSegmentCount =
Long.valueOf((((long) perRequestMemorySizeFactor * batchSize) / batchSize))
.intValue();

LazyMemorySegmentPool pool =
buildLazyMemorySegmentSource(
totalSegmentCount, batchSize, 10, perRequestMemorySize);

// allocate one page and check the segment and the memory pool
List<MemorySegment> maybeSegments = pool.allocatePages(1);
assertThat(maybeSegments.size()).isEqualTo(1);
MemorySegment segment = maybeSegments.get(0);
assertThat(segment.size()).isEqualTo(batchSize);
assertThat(pool.availableMemory()).isEqualTo(totalMemory - batchSize);
assertThat(pool.getAllCachePages().size()).isEqualTo(expectedPerRequestSegmentCount - 1);
segment.putInt(0, 1);
assertThat(segment.getInt(0)).isEqualTo(1);

// return the page to the memory pool and check the memory pool
pool.returnPage(segment);
assertThat(pool.availableMemory()).isEqualTo(totalMemory);
assertThat(pool.getAllCachePages().size()).isEqualTo(expectedPerRequestSegmentCount);

// allocate another page and check the segment and the memory pool
maybeSegments = pool.allocatePages(1);
assertThat(maybeSegments.size()).isEqualTo(1);
segment = maybeSegments.get(0);
assertThat(segment.size()).isEqualTo(batchSize);
assertThat(pool.availableMemory()).isEqualTo(totalMemory - batchSize);
assertThat(pool.freePages()).isEqualTo(totalSegmentCount - 1);
assertThat(pool.getAllCachePages().size()).isEqualTo(expectedPerRequestSegmentCount - 1);

// return the page to the memory pool and check the memory pool
pool.returnPage(segment);
assertThat(pool.availableMemory()).isEqualTo(totalMemory);
assertThat(pool.freePages()).isEqualTo(totalSegmentCount);
assertThat(pool.getAllCachePages().size()).isEqualTo(expectedPerRequestSegmentCount);

// allocate another 2 pages and check the segments and the memory pool
maybeSegments = pool.allocatePages(2);
assertThat(maybeSegments.size()).isEqualTo(2);
List<MemorySegment> segments = maybeSegments;
assertThat(pool.availableMemory()).isEqualTo(totalMemory - (2L * batchSize));
assertThat(pool.freePages()).isEqualTo(totalSegmentCount - 2);
assertThat(pool.getAllCachePages().size())
.isEqualTo(Math.max(expectedPerRequestSegmentCount - 2, 0));
pool.returnAll(segments);
assertThat(pool.availableMemory()).isEqualTo(totalMemory);
assertThat(pool.freePages()).isEqualTo(totalSegmentCount);
assertThat(pool.getAllCachePages().size())
.isEqualTo(Math.max(expectedPerRequestSegmentCount, 2));
}

@Test
void testCannotAllocateMorePagesThanAvailable() throws Exception {
LazyMemorySegmentPool pool = buildLazyMemorySegmentSource(2, 512, 10, 512);
List<MemorySegment> segment = pool.allocatePages(2);
assertThat(segment.size()).isEqualTo(2);
assertThat(pool.availableMemory()).isEqualTo(0);
assertThat(pool.freePages()).isEqualTo(0);
assertThat(pool.getAllCachePages().size()).isEqualTo(0);

pool.returnAll(segment);
assertThat(pool.availableMemory()).isEqualTo(1024);
assertThat(pool.freePages()).isEqualTo(2);
assertThat(pool.getAllCachePages().size()).isEqualTo(2);

assertThatThrownBy(() -> pool.allocatePages(3))
.isInstanceOf(EOFException.class)
.hasMessageContaining("Available pages: 2. Request pages: 3");
}

@Test
void testCloseClearsCachedPages() throws Exception {
LazyMemorySegmentPool pool = buildLazyMemorySegmentSource(1, 64, 100, 64);
pool.returnPage(pool.nextSegment());
assertThat(pool.getAllCachePages().size()).isGreaterThan(0);
pool.close();
assertThat(pool.getAllCachePages()).isEmpty();
}

@Test
void testClosePreventsFurtherAllocationButAllowsReturningSegments() throws Exception {
LazyMemorySegmentPool pool = buildLazyMemorySegmentSource(1, 512, Long.MAX_VALUE, 512);
List<MemorySegment> segment = pool.allocatePages(1);

// Close the memory segment pool. This should prevent any further allocations.
pool.close();

assertThatThrownBy(() -> pool.allocatePages(1)).isInstanceOf(FlussRuntimeException.class);
assertThatThrownBy(pool::nextSegment).isInstanceOf(FlussRuntimeException.class);

// ensure de-allocation still works.
pool.returnAll(segment);
}

@Test
void testNextSegmentWaiter() throws Exception {
LazyMemorySegmentPool source = buildLazyMemorySegmentSource(10, 64);
LazyMemorySegmentPool source = buildLazyMemorySegmentSource(10, 64, 100, 64);
assertThat(source.pageSize()).isEqualTo(64);
assertThat(source.availableMemory()).isEqualTo(64 * 10);
assertThat(source.freePages()).isEqualTo(10);

MemorySegment ms1 = source.nextSegment();
assertThat(source.availableMemory()).isEqualTo(64 * 9);
assertThat(source.freePages()).isEqualTo(9);

MemorySegment ms2 = source.nextSegment();
assertThat(source.availableMemory()).isEqualTo(64 * 8);
assertThat(source.freePages()).isEqualTo(8);

for (int i = 0; i < 8; i++) {
source.nextSegment();
}
assertThat(source.availableMemory()).isEqualTo(0);
assertThat(source.freePages()).isEqualTo(0);

assertThatThrownBy(source::nextSegment)
Expand All @@ -64,20 +178,24 @@ void testNextSegmentWaiter() throws Exception {

@Test
void testIllegalArgument() {
assertThatThrownBy(() -> buildLazyMemorySegmentSource(0, 64))
assertThatThrownBy(() -> buildLazyMemorySegmentSource(0, 64, 100, 64))
.isInstanceOf(IllegalArgumentException.class)
.hasMessage("MaxPages for LazyMemorySegmentPool should be greater than 0.");
assertThatThrownBy(() -> buildLazyMemorySegmentSource(10, 32 * 1024 * 1024))
assertThatThrownBy(
() ->
buildLazyMemorySegmentSource(
10, 32 * 1024 * 1024, 100, (32 * 1024 * 1024) / 2))
.isInstanceOf(IllegalArgumentException.class)
.hasMessage(
"Page size should be less than PER_REQUEST_MEMORY_SIZE. "
+ "Page size is: 32768 KB, PER_REQUEST_MEMORY_SIZE is 16384 KB.");
assertThatThrownBy(() -> buildLazyMemorySegmentSource(10, 30))
"Page size should be less than or equal to per request memory size. "
+ "Page size is: 32768 KB, per request memory size is 16384 KB.");
assertThatThrownBy(() -> buildLazyMemorySegmentSource(10, 30, 100, 30))
.isInstanceOf(IllegalArgumentException.class)
.hasMessage(
"Page size should be greater than 64 bytes to include the record batch header, but is 30 bytes.");

LazyMemorySegmentPool lazyMemorySegmentPool = buildLazyMemorySegmentSource(10, 100);
LazyMemorySegmentPool lazyMemorySegmentPool =
buildLazyMemorySegmentSource(10, 100, 100, 100);
assertThatThrownBy(
() ->
lazyMemorySegmentPool.returnAll(
Expand All @@ -87,8 +205,10 @@ void testIllegalArgument() {
.hasMessage("Return too more memories.");
}

private LazyMemorySegmentPool buildLazyMemorySegmentSource(int maxPages, int pageSize) {
return new LazyMemorySegmentPool(maxPages, pageSize, 100);
private LazyMemorySegmentPool buildLazyMemorySegmentSource(
int maxPages, int pageSize, long maxTimeToBlockMs, int perRequestMemorySize) {
return new LazyMemorySegmentPool(
maxPages, pageSize, maxTimeToBlockMs, perRequestMemorySize);
}

private CountDownLatch asyncReturnAll(
Expand Down

0 comments on commit 6940299

Please sign in to comment.