Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP][test] Harmonization of LazyMemorySegmentPoolTest and WriterMemoryBufferTest #324

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@

package com.alibaba.fluss.client.write;

import com.alibaba.fluss.config.MemorySize;
import com.alibaba.fluss.exception.BufferExhaustedException;
import com.alibaba.fluss.exception.FlussRuntimeException;
import com.alibaba.fluss.memory.MemorySegment;
Expand All @@ -29,16 +28,13 @@
import java.util.Deque;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Condition;

import static com.alibaba.fluss.testutils.common.CommonTestUtils.retry;
import static com.alibaba.fluss.utils.function.ThrowingRunnable.unchecked;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;

Expand All @@ -47,70 +43,6 @@
public class WriterMemoryBufferTest {
private final long maxBlockTimeMs = 10;

@Test
void testSimple() throws Exception {
// test the simple non-blocking allocation paths.
long totalMemory = MemorySize.parse("64kb").getBytes();
int batchSize = (int) MemorySize.parse("1kb").getBytes();

WriterMemoryBuffer writerMemoryBuffer = new WriterMemoryBuffer(totalMemory, batchSize);
MemorySegment segment = writerMemoryBuffer.allocate(batchSize, maxBlockTimeMs);
assertThat(segment.size()).isEqualTo(batchSize);
assertThat(writerMemoryBuffer.getUnallocatedMemory()).isEqualTo(totalMemory - batchSize);
assertThat(writerMemoryBuffer.getAvailableMemory()).isEqualTo(totalMemory - batchSize);
segment.putInt(0, 1);
writerMemoryBuffer.deallocate(segment);
assertThat(writerMemoryBuffer.getAvailableMemory()).isEqualTo(totalMemory);
assertThat(writerMemoryBuffer.getUnallocatedMemory()).isEqualTo(totalMemory - batchSize);

segment = writerMemoryBuffer.allocate(batchSize, maxBlockTimeMs);
writerMemoryBuffer.deallocate(segment);
assertThat(writerMemoryBuffer.getAvailableMemory()).isEqualTo(totalMemory);
assertThat(writerMemoryBuffer.getUnallocatedMemory()).isEqualTo(totalMemory - batchSize);

segment = writerMemoryBuffer.allocate(2 * batchSize, maxBlockTimeMs);
writerMemoryBuffer.deallocate(segment);
assertThat(writerMemoryBuffer.getAvailableMemory()).isEqualTo(totalMemory);
assertThat(writerMemoryBuffer.getUnallocatedMemory()).isEqualTo(totalMemory - batchSize);
}

@Test
void testCanAllocateMoreMemoryThanHave() throws Exception {
WriterMemoryBuffer pool = new WriterMemoryBuffer(1024, 512);
MemorySegment segment = pool.allocate(1024, maxBlockTimeMs);
assertThat(segment.size()).isEqualTo(1024);
pool.deallocate(segment);
assertThatThrownBy(() -> pool.allocate(1025, maxBlockTimeMs))
.isInstanceOf(IllegalArgumentException.class)
.hasMessageContaining("Attempt to allocate memory segment size");
}

@Test
void testDelayedAllocation() throws Exception {
WriterMemoryBuffer pool = new WriterMemoryBuffer(5 * 1024, 1024);
MemorySegment segment = pool.allocate(1024, maxBlockTimeMs);
CountDownLatch doDealloc = asyncDeallocate(pool, segment);
CountDownLatch allocation = asyncAllocate(pool, 5 * 1024);
assertThat(allocation.getCount()).isEqualTo(1);
doDealloc.countDown(); // return the memory.
assertThat(allocation.await(1, TimeUnit.SECONDS)).isTrue();
}

@Test
void testBufferExhaustedExceptionIsThrown() throws Exception {
// If there is not enough memory to allocate and the elapsed time is greater than the max
// specified block time.
WriterMemoryBuffer pool = new WriterMemoryBuffer(2, 1);
pool.allocate(1, maxBlockTimeMs);
assertThatThrownBy(() -> pool.allocate(2, maxBlockTimeMs))
.isInstanceOf(BufferExhaustedException.class)
.hasMessageContaining(
"Failed to allocate 2 bytes within the configured max blocking time 10 ms."
+ " Total memory: 2 bytes. Available memory: 1 bytes. page size: 1 bytes");
assertThat(pool.queued()).isEqualTo(0);
assertThat(pool.getAvailableMemory()).isEqualTo(1);
}

@Test
void testCleanupMemoryAvailabilityWaiterOnInterruption() throws Exception {
WriterMemoryBuffer pool = new WriterMemoryBuffer(2, 1);
Expand Down Expand Up @@ -223,21 +155,6 @@ protected MemorySegment allocateMemorySegment(int size) {
assertThat(pool.getAvailableMemory()).isEqualTo(1024);
}

@Test
void testCloseAllocations() throws Exception {
WriterMemoryBuffer pool = new WriterMemoryBuffer(10, 1);
MemorySegment segment = pool.allocate(1, Long.MAX_VALUE);

// Close the memory segment pool. This should prevent any further allocations.
pool.close();

assertThatThrownBy(() -> pool.allocate(1, maxBlockTimeMs))
.isInstanceOf(FlussRuntimeException.class);

// ensure de-allocation still works.
pool.deallocate(segment);
}

@Test
void testCloseNotifyWaiters() throws Exception {
final int numWorkers = 2;
Expand Down Expand Up @@ -316,35 +233,4 @@ public void run() {
}
}
}

private CountDownLatch asyncDeallocate(
final WriterMemoryBuffer pool, final MemorySegment segment) {
final CountDownLatch latch = new CountDownLatch(1);
Thread thread =
new Thread(
unchecked(
() -> {
latch.await();
pool.deallocate(segment);
}));
thread.start();
return latch;
}

private CountDownLatch asyncAllocate(final WriterMemoryBuffer pool, final int size) {
final CountDownLatch completed = new CountDownLatch(1);
Thread thread =
new Thread(
() -> {
try {
pool.allocate(size, maxBlockTimeMs);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
completed.countDown();
}
});
thread.start();
return completed;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -252,6 +252,14 @@ public class ConfigOptions {
+ SERVER_BUFFER_MEMORY_SIZE.key()
+ "').");

// TODO: #323/#324
// noDefaultValue() leads to NPE in some code parts, hence we default to 0b
public static final ConfigOption<MemorySize> SERVER_BUFFER_PER_REQUEST_MEMORY_SIZE =
key("server.buffer.per-request-memory-size")
.memoryType()
.defaultValue(MemorySize.parse("0b"))
.withDescription("");

// ------------------------------------------------------------------
// ZooKeeper Settings
// ------------------------------------------------------------------
Expand Down Expand Up @@ -620,6 +628,14 @@ public class ConfigOptions {
"The writer or walBuilder will attempt to batch records together into one batch for"
+ " the same bucket. This helps performance on both the client and the server.");

// TODO: #323/#324
// noDefaultValue() leads to NPE in some code parts, hence we default to 0b
public static final ConfigOption<MemorySize> CLIENT_WRITER_PER_REQUEST_MEMORY_SIZE =
key("client.writer.per-request-memory-size")
.memoryType()
.defaultValue(MemorySize.parse("0b"))
.withDescription("");

@Deprecated
public static final ConfigOption<MemorySize> CLIENT_WRITER_LEGACY_BATCH_SIZE =
key("client.writer.legacy.batch-size")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import com.alibaba.fluss.annotation.VisibleForTesting;
import com.alibaba.fluss.config.ConfigOptions;
import com.alibaba.fluss.config.Configuration;
import com.alibaba.fluss.config.MemorySize;
import com.alibaba.fluss.exception.FlussRuntimeException;

import javax.annotation.concurrent.GuardedBy;
Expand All @@ -45,7 +46,6 @@
@ThreadSafe
public class LazyMemorySegmentPool implements MemorySegmentPool, Closeable {

private static final long PER_REQUEST_MEMORY_SIZE = 16 * 1024 * 1024;
private static final long DEFAULT_WAIT_TIMEOUT_MS = Long.MAX_VALUE;

/** The lock to guard the memory pool. */
Expand All @@ -54,37 +54,46 @@ public class LazyMemorySegmentPool implements MemorySegmentPool, Closeable {
@GuardedBy("lock")
private final List<MemorySegment> cachePages;

@VisibleForTesting
@GuardedBy("lock")
private final Deque<Condition> waiters;
final Deque<Condition> waiters;

private final int pageSize;
private final int maxPages;
private final int perRequestPages;
@VisibleForTesting final int perRequestPages;
private final long maxTimeToBlockMs;

@GuardedBy("lock")
private boolean closed;

private int pageUsage;

LazyMemorySegmentPool(int maxPages, int pageSize, long maxTimeToBlockMs) {
@VisibleForTesting
LazyMemorySegmentPool(
int maxPages, int pageSize, long maxTimeToBlockMs, long perRequestMemorySize) {
checkArgument(maxPages > 0, "MaxPages for LazyMemorySegmentPool should be greater than 0.");
checkArgument(
pageSize >= 64,
"Page size should be greater than 64 bytes to include the record batch header, but is "
+ pageSize
+ " bytes.");
checkArgument(
PER_REQUEST_MEMORY_SIZE > pageSize,
perRequestMemorySize >= pageSize,
String.format(
"Page size should be less than PER_REQUEST_MEMORY_SIZE. Page size is:"
+ " %s KB, PER_REQUEST_MEMORY_SIZE is %s KB.",
pageSize / 1024, PER_REQUEST_MEMORY_SIZE / 1024));
"Page size should be less than or equal to per request memory size. Page size is:"
+ " %s KB, per request memory size is %s KB.",
pageSize / 1024, perRequestMemorySize / 1024));
checkArgument(
perRequestMemorySize <= ((long) pageSize * maxPages),
String.format(
"Per request memory size must be less than or equal to total memory (page size * max pages). Per request memory size is:"
+ " %s KB, total memory is %s KB.",
perRequestMemorySize / 1024, ((long) pageSize * maxPages) / 1024));
this.cachePages = new ArrayList<>();
this.pageUsage = 0;
this.maxPages = maxPages;
this.pageSize = pageSize;
this.perRequestPages = Math.max(1, (int) (PER_REQUEST_MEMORY_SIZE / pageSize()));
this.perRequestPages = Math.max(1, (int) (perRequestMemorySize / pageSize()));

this.closed = false;
this.waiters = new ArrayDeque<>();
Expand All @@ -102,34 +111,33 @@ public static LazyMemorySegmentPool createWriterBufferPool(Configuration conf) {
totalBytes,
ConfigOptions.CLIENT_WRITER_BATCH_SIZE.key(),
batchSize));

int pageSize = (int) conf.get(ConfigOptions.CLIENT_WRITER_BUFFER_PAGE_SIZE).getBytes();
long perRequestMemorySize =
conf.getOptional(ConfigOptions.CLIENT_WRITER_PER_REQUEST_MEMORY_SIZE)
.filter(s -> s.getBytes() > 0)
.map(MemorySize::getBytes)
.orElse((long) pageSize);
int segmentCount = (int) (totalBytes / pageSize);
return new LazyMemorySegmentPool(segmentCount, pageSize, DEFAULT_WAIT_TIMEOUT_MS);
return new LazyMemorySegmentPool(
segmentCount, pageSize, DEFAULT_WAIT_TIMEOUT_MS, perRequestMemorySize);
}

public static LazyMemorySegmentPool createServerBufferPool(Configuration conf) {
long totalBytes = conf.get(ConfigOptions.SERVER_BUFFER_MEMORY_SIZE).getBytes();
int pageSize = (int) conf.get(ConfigOptions.SERVER_BUFFER_PAGE_SIZE).getBytes();
long perRequestMemorySize =
conf.getOptional(ConfigOptions.SERVER_BUFFER_PER_REQUEST_MEMORY_SIZE)
.filter(s -> s.getBytes() > 0)
.map(MemorySize::getBytes)
.orElse((long) pageSize);
int segmentCount = (int) (totalBytes / pageSize);
return new LazyMemorySegmentPool(segmentCount, pageSize, DEFAULT_WAIT_TIMEOUT_MS);
return new LazyMemorySegmentPool(
segmentCount, pageSize, DEFAULT_WAIT_TIMEOUT_MS, perRequestMemorySize);
}

@Override
public MemorySegment nextSegment() throws IOException {
return inLock(
lock,
() -> {
checkClosed();
if (freePages() == 0) {
waitForSegment(1);
}

lazilyAllocatePages(freePages());

this.pageUsage++;
return cachePages.remove(this.cachePages.size() - 1);
});
return inLock(lock, () -> allocatePages(1).get(0));
}

@Override
Expand All @@ -138,8 +146,9 @@ public List<MemorySegment> allocatePages(int requiredPages) throws IOException {
lock,
() -> {
checkClosed();

if (freePages() < requiredPages) {
waitForSegment(requiredPages);
waitForSegmentOrElseThrow(requiredPages);
}

lazilyAllocatePages(requiredPages);
Expand All @@ -156,16 +165,25 @@ private List<MemorySegment> drain(int numPages) {
return pages;
}

private void lazilyAllocatePages(int required) {
if (cachePages.isEmpty()) {
int numPages = Math.min(required, perRequestPages);
@VisibleForTesting
protected void lazilyAllocatePages(int required) {
if (cachePages.size() < required) {
int numPages = Math.max(required - cachePages.size(), perRequestPages);
for (int i = 0; i < numPages; i++) {
cachePages.add(MemorySegment.allocateHeapMemory(pageSize));
}
}
}

private void waitForSegment(int requiredPages) throws EOFException {
private void waitForSegmentOrElseThrow(int requiredPages) throws EOFException {
if (maxPages < requiredPages) { // immediately fail if the request is impossible to satisfy
throw new EOFException(
String.format(
"Allocation request cannot be satisfied because the number of maximum available pages is "
+ "exceeded. Available pages: %d. Requested pages: %d",
this.maxPages, requiredPages));
}

Condition moreMemory = lock.newCondition();
waiters.addLast(moreMemory);
try {
Expand All @@ -181,7 +199,7 @@ private void waitForSegment(int requiredPages) throws EOFException {
+ pageSize
+ " bytes. Available pages: "
+ freePages()
+ ". Request pages: "
+ ". Requested pages: "
+ requiredPages);
}
checkClosed();
Expand All @@ -199,6 +217,7 @@ public int pageSize() {
return pageSize;
}

@Override
public int totalSize() {
return maxPages * pageSize;
}
Expand All @@ -216,10 +235,11 @@ public void returnAll(List<MemorySegment> memory) {
inLock(
lock,
() -> {
pageUsage -= memory.size();
if (this.pageUsage < 0) {
final int newPageUsage = pageUsage - memory.size();
if (newPageUsage < 0) {
throw new RuntimeException("Return too more memories.");
}
pageUsage = newPageUsage;
cachePages.addAll(memory);
for (int i = 0; i < memory.size() && !waiters.isEmpty(); i++) {
waiters.pollFirst().signal();
Expand All @@ -237,6 +257,7 @@ public long availableMemory() {
return ((long) freePages()) * pageSize;
}

@Override
public void close() {
inLock(
lock,
Expand Down
Loading
Loading