Skip to content

Commit

Permalink
Harmonization [commit will be amended!]
Browse files Browse the repository at this point in the history
  • Loading branch information
michaelkoepf committed Feb 7, 2025
1 parent 4facea3 commit 0179de1
Show file tree
Hide file tree
Showing 4 changed files with 486 additions and 160 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@

package com.alibaba.fluss.client.write;

import com.alibaba.fluss.config.MemorySize;
import com.alibaba.fluss.exception.BufferExhaustedException;
import com.alibaba.fluss.exception.FlussRuntimeException;
import com.alibaba.fluss.memory.MemorySegment;
Expand All @@ -29,16 +28,13 @@
import java.util.Deque;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Condition;

import static com.alibaba.fluss.testutils.common.CommonTestUtils.retry;
import static com.alibaba.fluss.utils.function.ThrowingRunnable.unchecked;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;

Expand All @@ -47,70 +43,6 @@
public class WriterMemoryBufferTest {
private final long maxBlockTimeMs = 10;

@Test
void testSimple() throws Exception {
// test the simple non-blocking allocation paths.
long totalMemory = MemorySize.parse("64kb").getBytes();
int batchSize = (int) MemorySize.parse("1kb").getBytes();

WriterMemoryBuffer writerMemoryBuffer = new WriterMemoryBuffer(totalMemory, batchSize);
MemorySegment segment = writerMemoryBuffer.allocate(batchSize, maxBlockTimeMs);
assertThat(segment.size()).isEqualTo(batchSize);
assertThat(writerMemoryBuffer.getUnallocatedMemory()).isEqualTo(totalMemory - batchSize);
assertThat(writerMemoryBuffer.getAvailableMemory()).isEqualTo(totalMemory - batchSize);
segment.putInt(0, 1);
writerMemoryBuffer.deallocate(segment);
assertThat(writerMemoryBuffer.getAvailableMemory()).isEqualTo(totalMemory);
assertThat(writerMemoryBuffer.getUnallocatedMemory()).isEqualTo(totalMemory - batchSize);

segment = writerMemoryBuffer.allocate(batchSize, maxBlockTimeMs);
writerMemoryBuffer.deallocate(segment);
assertThat(writerMemoryBuffer.getAvailableMemory()).isEqualTo(totalMemory);
assertThat(writerMemoryBuffer.getUnallocatedMemory()).isEqualTo(totalMemory - batchSize);

segment = writerMemoryBuffer.allocate(2 * batchSize, maxBlockTimeMs);
writerMemoryBuffer.deallocate(segment);
assertThat(writerMemoryBuffer.getAvailableMemory()).isEqualTo(totalMemory);
assertThat(writerMemoryBuffer.getUnallocatedMemory()).isEqualTo(totalMemory - batchSize);
}

@Test
void testCanAllocateMoreMemoryThanHave() throws Exception {
WriterMemoryBuffer pool = new WriterMemoryBuffer(1024, 512);
MemorySegment segment = pool.allocate(1024, maxBlockTimeMs);
assertThat(segment.size()).isEqualTo(1024);
pool.deallocate(segment);
assertThatThrownBy(() -> pool.allocate(1025, maxBlockTimeMs))
.isInstanceOf(IllegalArgumentException.class)
.hasMessageContaining("Attempt to allocate memory segment size");
}

@Test
void testDelayedAllocation() throws Exception {
WriterMemoryBuffer pool = new WriterMemoryBuffer(5 * 1024, 1024);
MemorySegment segment = pool.allocate(1024, maxBlockTimeMs);
CountDownLatch doDealloc = asyncDeallocate(pool, segment);
CountDownLatch allocation = asyncAllocate(pool, 5 * 1024);
assertThat(allocation.getCount()).isEqualTo(1);
doDealloc.countDown(); // return the memory.
assertThat(allocation.await(1, TimeUnit.SECONDS)).isTrue();
}

@Test
void testBufferExhaustedExceptionIsThrown() throws Exception {
// If there is not enough memory to allocate and the elapsed time is greater than the max
// specified block time.
WriterMemoryBuffer pool = new WriterMemoryBuffer(2, 1);
pool.allocate(1, maxBlockTimeMs);
assertThatThrownBy(() -> pool.allocate(2, maxBlockTimeMs))
.isInstanceOf(BufferExhaustedException.class)
.hasMessageContaining(
"Failed to allocate 2 bytes within the configured max blocking time 10 ms."
+ " Total memory: 2 bytes. Available memory: 1 bytes. page size: 1 bytes");
assertThat(pool.queued()).isEqualTo(0);
assertThat(pool.getAvailableMemory()).isEqualTo(1);
}

@Test
void testCleanupMemoryAvailabilityWaiterOnInterruption() throws Exception {
WriterMemoryBuffer pool = new WriterMemoryBuffer(2, 1);
Expand Down Expand Up @@ -223,21 +155,6 @@ protected MemorySegment allocateMemorySegment(int size) {
assertThat(pool.getAvailableMemory()).isEqualTo(1024);
}

@Test
void testCloseAllocations() throws Exception {
WriterMemoryBuffer pool = new WriterMemoryBuffer(10, 1);
MemorySegment segment = pool.allocate(1, Long.MAX_VALUE);

// Close the memory segment pool. This should prevent any further allocations.
pool.close();

assertThatThrownBy(() -> pool.allocate(1, maxBlockTimeMs))
.isInstanceOf(FlussRuntimeException.class);

// ensure de-allocation still works.
pool.deallocate(segment);
}

@Test
void testCloseNotifyWaiters() throws Exception {
final int numWorkers = 2;
Expand Down Expand Up @@ -316,35 +233,4 @@ public void run() {
}
}
}

private CountDownLatch asyncDeallocate(
final WriterMemoryBuffer pool, final MemorySegment segment) {
final CountDownLatch latch = new CountDownLatch(1);
Thread thread =
new Thread(
unchecked(
() -> {
latch.await();
pool.deallocate(segment);
}));
thread.start();
return latch;
}

private CountDownLatch asyncAllocate(final WriterMemoryBuffer pool, final int size) {
final CountDownLatch completed = new CountDownLatch(1);
Thread thread =
new Thread(
() -> {
try {
pool.allocate(size, maxBlockTimeMs);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
completed.countDown();
}
});
thread.start();
return completed;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -252,6 +252,14 @@ public class ConfigOptions {
+ SERVER_BUFFER_MEMORY_SIZE.key()
+ "').");

// TODO: #323/#324
// noDefaultValue() leads to NPE in some code parts, hence we default to 0b
public static final ConfigOption<MemorySize> SERVER_BUFFER_PER_REQUEST_MEMORY_SIZE =
key("server.buffer.per-request-memory-size")
.memoryType()
.defaultValue(MemorySize.parse("0b"))
.withDescription("");

// ------------------------------------------------------------------
// ZooKeeper Settings
// ------------------------------------------------------------------
Expand Down Expand Up @@ -620,6 +628,14 @@ public class ConfigOptions {
"The writer or walBuilder will attempt to batch records together into one batch for"
+ " the same bucket. This helps performance on both the client and the server.");

// TODO: #323/#324
// noDefaultValue() leads to NPE in some code parts, hence we default to 0b
public static final ConfigOption<MemorySize> CLIENT_WRITER_PER_REQUEST_MEMORY_SIZE =
key("client.writer.per-request-memory-size")
.memoryType()
.defaultValue(MemorySize.parse("0b"))
.withDescription("");

@Deprecated
public static final ConfigOption<MemorySize> CLIENT_WRITER_LEGACY_BATCH_SIZE =
key("client.writer.legacy.batch-size")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import com.alibaba.fluss.annotation.VisibleForTesting;
import com.alibaba.fluss.config.ConfigOptions;
import com.alibaba.fluss.config.Configuration;
import com.alibaba.fluss.config.MemorySize;
import com.alibaba.fluss.exception.FlussRuntimeException;

import javax.annotation.concurrent.GuardedBy;
Expand All @@ -45,7 +46,6 @@
@ThreadSafe
public class LazyMemorySegmentPool implements MemorySegmentPool, Closeable {

private static final long PER_REQUEST_MEMORY_SIZE = 16 * 1024 * 1024;
private static final long DEFAULT_WAIT_TIMEOUT_MS = Long.MAX_VALUE;

/** The lock to guard the memory pool. */
Expand All @@ -54,37 +54,46 @@ public class LazyMemorySegmentPool implements MemorySegmentPool, Closeable {
@GuardedBy("lock")
private final List<MemorySegment> cachePages;

@VisibleForTesting
@GuardedBy("lock")
private final Deque<Condition> waiters;
final Deque<Condition> waiters;

private final int pageSize;
private final int maxPages;
private final int perRequestPages;
@VisibleForTesting final int perRequestPages;
private final long maxTimeToBlockMs;

@GuardedBy("lock")
private boolean closed;

private int pageUsage;

LazyMemorySegmentPool(int maxPages, int pageSize, long maxTimeToBlockMs) {
@VisibleForTesting
LazyMemorySegmentPool(
int maxPages, int pageSize, long maxTimeToBlockMs, long perRequestMemorySize) {
checkArgument(maxPages > 0, "MaxPages for LazyMemorySegmentPool should be greater than 0.");
checkArgument(
pageSize >= 64,
"Page size should be greater than 64 bytes to include the record batch header, but is "
+ pageSize
+ " bytes.");
checkArgument(
PER_REQUEST_MEMORY_SIZE > pageSize,
perRequestMemorySize >= pageSize,
String.format(
"Page size should be less than PER_REQUEST_MEMORY_SIZE. Page size is:"
+ " %s KB, PER_REQUEST_MEMORY_SIZE is %s KB.",
pageSize / 1024, PER_REQUEST_MEMORY_SIZE / 1024));
"Page size should be less than or equal to per request memory size. Page size is:"
+ " %s KB, per request memory size is %s KB.",
pageSize / 1024, perRequestMemorySize / 1024));
checkArgument(
perRequestMemorySize <= ((long) pageSize * maxPages),
String.format(
"Per request memory size must be less than or equal to total memory (page size * max pages). Per request memory size is:"
+ " %s KB, total memory is %s KB.",
perRequestMemorySize / 1024, ((long) pageSize * maxPages) / 1024));
this.cachePages = new ArrayList<>();
this.pageUsage = 0;
this.maxPages = maxPages;
this.pageSize = pageSize;
this.perRequestPages = Math.max(1, (int) (PER_REQUEST_MEMORY_SIZE / pageSize()));
this.perRequestPages = Math.max(1, (int) (perRequestMemorySize / pageSize()));

this.closed = false;
this.waiters = new ArrayDeque<>();
Expand All @@ -102,34 +111,33 @@ public static LazyMemorySegmentPool createWriterBufferPool(Configuration conf) {
totalBytes,
ConfigOptions.CLIENT_WRITER_BATCH_SIZE.key(),
batchSize));

int pageSize = (int) conf.get(ConfigOptions.CLIENT_WRITER_BUFFER_PAGE_SIZE).getBytes();
long perRequestMemorySize =
conf.getOptional(ConfigOptions.CLIENT_WRITER_PER_REQUEST_MEMORY_SIZE)
.filter(s -> s.getBytes() > 0)
.map(MemorySize::getBytes)
.orElse((long) pageSize);
int segmentCount = (int) (totalBytes / pageSize);
return new LazyMemorySegmentPool(segmentCount, pageSize, DEFAULT_WAIT_TIMEOUT_MS);
return new LazyMemorySegmentPool(
segmentCount, pageSize, DEFAULT_WAIT_TIMEOUT_MS, perRequestMemorySize);
}

public static LazyMemorySegmentPool createServerBufferPool(Configuration conf) {
long totalBytes = conf.get(ConfigOptions.SERVER_BUFFER_MEMORY_SIZE).getBytes();
int pageSize = (int) conf.get(ConfigOptions.SERVER_BUFFER_PAGE_SIZE).getBytes();
long perRequestMemorySize =
conf.getOptional(ConfigOptions.SERVER_BUFFER_PER_REQUEST_MEMORY_SIZE)
.filter(s -> s.getBytes() > 0)
.map(MemorySize::getBytes)
.orElse((long) pageSize);
int segmentCount = (int) (totalBytes / pageSize);
return new LazyMemorySegmentPool(segmentCount, pageSize, DEFAULT_WAIT_TIMEOUT_MS);
return new LazyMemorySegmentPool(
segmentCount, pageSize, DEFAULT_WAIT_TIMEOUT_MS, perRequestMemorySize);
}

@Override
public MemorySegment nextSegment() throws IOException {
return inLock(
lock,
() -> {
checkClosed();
if (freePages() == 0) {
waitForSegment(1);
}

lazilyAllocatePages(freePages());

this.pageUsage++;
return cachePages.remove(this.cachePages.size() - 1);
});
return inLock(lock, () -> allocatePages(1).get(0));
}

@Override
Expand All @@ -138,8 +146,9 @@ public List<MemorySegment> allocatePages(int requiredPages) throws IOException {
lock,
() -> {
checkClosed();

if (freePages() < requiredPages) {
waitForSegment(requiredPages);
waitForSegmentOrElseThrow(requiredPages);
}

lazilyAllocatePages(requiredPages);
Expand All @@ -156,16 +165,25 @@ private List<MemorySegment> drain(int numPages) {
return pages;
}

private void lazilyAllocatePages(int required) {
if (cachePages.isEmpty()) {
int numPages = Math.min(required, perRequestPages);
@VisibleForTesting
protected void lazilyAllocatePages(int required) {
if (cachePages.size() < required) {
int numPages = Math.max(required - cachePages.size(), perRequestPages);
for (int i = 0; i < numPages; i++) {
cachePages.add(MemorySegment.allocateHeapMemory(pageSize));
}
}
}

private void waitForSegment(int requiredPages) throws EOFException {
private void waitForSegmentOrElseThrow(int requiredPages) throws EOFException {
if (maxPages < requiredPages) { // immediately fail if the request is impossible to satisfy
throw new EOFException(
String.format(
"Allocation request cannot be satisfied because the number of maximum available pages is "
+ "exceeded. Available pages: %d. Requested pages: %d",
this.maxPages, requiredPages));
}

Condition moreMemory = lock.newCondition();
waiters.addLast(moreMemory);
try {
Expand All @@ -181,7 +199,7 @@ private void waitForSegment(int requiredPages) throws EOFException {
+ pageSize
+ " bytes. Available pages: "
+ freePages()
+ ". Request pages: "
+ ". Requested pages: "
+ requiredPages);
}
checkClosed();
Expand All @@ -199,6 +217,7 @@ public int pageSize() {
return pageSize;
}

@Override
public int totalSize() {
return maxPages * pageSize;
}
Expand All @@ -216,10 +235,11 @@ public void returnAll(List<MemorySegment> memory) {
inLock(
lock,
() -> {
pageUsage -= memory.size();
if (this.pageUsage < 0) {
final int newPageUsage = pageUsage - memory.size();
if (newPageUsage < 0) {
throw new RuntimeException("Return too more memories.");
}
pageUsage = newPageUsage;
cachePages.addAll(memory);
for (int i = 0; i < memory.size() && !waiters.isEmpty(); i++) {
waiters.pollFirst().signal();
Expand All @@ -237,6 +257,7 @@ public long availableMemory() {
return ((long) freePages()) * pageSize;
}

@Override
public void close() {
inLock(
lock,
Expand Down
Loading

0 comments on commit 0179de1

Please sign in to comment.