Skip to content

Commit

Permalink
Cleanup and improve some logging code. (deephaven#4065)
Browse files Browse the repository at this point in the history
Improved as part of the investigation into deephaven#4051
  • Loading branch information
devinrsmith authored Jun 26, 2023
1 parent aee97fd commit 97c492b
Show file tree
Hide file tree
Showing 13 changed files with 154 additions and 171 deletions.
26 changes: 26 additions & 0 deletions IO/src/main/java/io/deephaven/io/log/LogBufferPool.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,35 @@
package io.deephaven.io.log;

import io.deephaven.base.pool.Pool;
import io.deephaven.io.log.impl.LogBufferPoolImpl;

import java.nio.ByteBuffer;

public interface LogBufferPool extends Pool<ByteBuffer> {

/**
* Creates a log buffer pool with at least {@code bufferCount} items. If all of the buffers are in use, additional
* buffers will be created on-demand.
*
* @param bufferCount the buffer count
* @param bufferSize the buffer size
* @return the log buffer pool
*/
static LogBufferPool of(int bufferCount, int bufferSize) {
return new LogBufferPoolLenientImpl(bufferCount, bufferSize);
}

/**
* Creates a log buffer pool with at least {@code bufferCount} items. If all of the buffers are in use, additional
* takes will spin or block until one becomes available.
*
* @param bufferCount the buffer count
* @param bufferSize the buffer size
* @return the log buffer pool
*/
static LogBufferPool ofStrict(int bufferCount, int bufferSize) {
return new LogBufferPoolImpl(bufferCount, bufferSize);
}

ByteBuffer take(int minSize);
}
27 changes: 27 additions & 0 deletions IO/src/main/java/io/deephaven/io/log/LogBufferPoolLenientImpl.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
/**
* Copyright (c) 2016-2022 Deephaven Data Labs and Patent Pending
*/
package io.deephaven.io.log;

import io.deephaven.base.pool.ThreadSafeLenientFixedSizePool;

import java.nio.ByteBuffer;

class LogBufferPoolLenientImpl extends ThreadSafeLenientFixedSizePool<ByteBuffer> implements LogBufferPool {

private final int bufferSize;

public LogBufferPoolLenientImpl(int bufferCount, final int bufferSize) {
super(bufferCount, () -> ByteBuffer.allocate(bufferSize), ByteBuffer::clear);
this.bufferSize = bufferSize;

}

@Override
public ByteBuffer take(int minSize) {
if (minSize > bufferSize) {
throw new UnsupportedOperationException("Not Implemented Yet");
}
return take();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ public class LogBufferPoolImpl extends ThreadSafeFixedSizePool<ByteBuffer> imple
private final int bufferSize;

public LogBufferPoolImpl(int bufferCount, final int bufferSize) {
super(bufferCount, () -> ByteBuffer.allocate(bufferSize), null);
super(bufferCount, () -> ByteBuffer.allocate(bufferSize), ByteBuffer::clear);
this.bufferSize = bufferSize;

}
Expand Down
39 changes: 20 additions & 19 deletions IO/src/main/java/io/deephaven/io/log/impl/LogOutputBaseImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,30 +4,31 @@
package io.deephaven.io.log.impl;

import io.deephaven.base.ArrayUtil;
import io.deephaven.base.log.LogOutput;
import io.deephaven.base.verify.Assert;
import io.deephaven.io.log.LogBufferPool;
import io.deephaven.base.log.LogOutput;
import io.deephaven.io.streams.ByteBufferOutputStream;
import io.deephaven.io.streams.ByteBufferSink;
import org.jetbrains.annotations.VisibleForTesting;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Objects;

public abstract class LogOutputBaseImpl implements LogOutput, ByteBufferSink {
@VisibleForTesting
static final int START_SIZE_BYTES = 128;

// the stream in which we produce our output
protected final ByteBufferOutputStream stream;

// Marker for the end of the header position
private int endOfHeaderPosition;

// where the output buffers come from
@SuppressWarnings("WeakerAccess")
protected final LogBufferPool bufferPool;

// the buffers in this entry
protected ByteBuffer[] buffers;
@SuppressWarnings("WeakerAccess")
protected int bufferCount;
private final LogBufferPool bufferPool;
private ByteBuffer[] buffers;
private int bufferCount;

@SuppressWarnings("WeakerAccess")
public LogOutputBaseImpl(LogBufferPool bufferPool) {
Expand All @@ -49,17 +50,18 @@ public int getEndOfHeaderOffset() {
return endOfHeaderPosition;
}

private ByteBuffer nextBuffer() {
ByteBuffer byteBuffer = bufferPool.take();
private ByteBuffer nextBuffer(int need) {
ByteBuffer byteBuffer = Objects.requireNonNull(bufferPool.take(need));
buffers = ArrayUtil.put(buffers, bufferCount, byteBuffer, ByteBuffer.class);
bufferCount++;
stream.setBuffer(byteBuffer);
return byteBuffer;
}

@Override // from ByteBufferSink
public ByteBuffer acceptBuffer(ByteBuffer b, int need) throws IOException {
return nextBuffer();
public ByteBuffer acceptBuffer(ByteBuffer b, int need) {
assert b == buffers[bufferCount - 1];
return nextBuffer(need);
}

@Override // from ByteBufferSink
Expand All @@ -71,7 +73,7 @@ public void close(ByteBuffer b) throws IOException {
@Override // from LogOutput
public LogOutput start() {
clear();
nextBuffer();
nextBuffer(START_SIZE_BYTES);
return this;
}

Expand All @@ -88,8 +90,8 @@ public int relativeSize() {
@Override // from LogOutput
public int size() {
int byteCount = 0;
for (int nIndex = 0, nLength = bufferCount; nIndex < nLength; nIndex++) {
byteCount += buffers[nIndex].position();
for (int i = 0; i < bufferCount; ++i) {
byteCount += buffers[i].position();
}
return byteCount;
}
Expand All @@ -106,10 +108,9 @@ public ByteBuffer getBuffer(int i) {

@Override // from LogOutput
public LogOutput clear() {
for (int nIndex = 0, nLength = bufferCount; nIndex < nLength; nIndex++) {
buffers[nIndex].clear();
bufferPool.give(buffers[nIndex]);
buffers[nIndex] = null;
for (int i = 0; i < bufferCount; ++i) {
bufferPool.give(buffers[i]);
buffers[i] = null;
}
bufferCount = 0;
stream.setBuffer(null);
Expand Down
4 changes: 2 additions & 2 deletions IO/src/main/java/io/deephaven/io/logger/NullLoggerImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,11 @@

import io.deephaven.base.ArrayUtil;
import io.deephaven.base.ClassUtil;
import io.deephaven.io.log.LogBufferPool;
import io.deephaven.io.log.LogEntry;
import io.deephaven.io.log.LogEntryPool;
import io.deephaven.io.log.LogLevel;
import io.deephaven.io.log.LogSink;
import io.deephaven.io.log.impl.LogBufferPoolImpl;
import io.deephaven.io.log.impl.LogEntryPoolImpl;
import java.io.IOException;
import java.io.UncheckedIOException;
Expand Down Expand Up @@ -41,7 +41,7 @@ private NullLoggerImpl(@NotNull final LogEntryPool logEntryPool,
}

public NullLoggerImpl(@NotNull final LogLevel loggingLevel) {
this(new LogEntryPoolImpl(1024, new LogBufferPoolImpl(2048, 1024)),
this(new LogEntryPoolImpl(1024, LogBufferPool.ofStrict(2048, 1024)),
loggingLevel,
new NullLoggerTimeSource(),
null);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
import io.deephaven.base.ArrayUtil;
import io.deephaven.base.ClassUtil;
import io.deephaven.io.log.*;
import io.deephaven.io.log.impl.LogBufferPoolImpl;
import io.deephaven.io.log.impl.LogEntryPoolImpl;
import org.jetbrains.annotations.NotNull;

Expand Down Expand Up @@ -34,7 +33,7 @@ public static Logger makeLogger(@NotNull final OutputStream outputStream,
@NotNull final LoggerTimeSource timeSource,
@NotNull final TimeZone tz) {
final LogEntryPool logEntryPool =
new LogEntryPoolImpl(entryCount, new LogBufferPoolImpl(bufferCount, bufferSize));
new LogEntryPoolImpl(entryCount, LogBufferPool.ofStrict(bufferCount, bufferSize));
return new ProcessStreamLoggerImpl(logEntryPool, outputStream, loggingLevel, timeSource, tz);
}

Expand Down
25 changes: 2 additions & 23 deletions IO/src/main/java/io/deephaven/io/logger/StreamLoggerImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,33 +16,13 @@
import java.io.IOException;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;

public class StreamLoggerImpl implements Logger {

/**
* Static buffer pool, shared among all SystemOut loggers
*/
private static final Pool<ByteBuffer> buffers = new ThreadSafeLenientFixedSizePool<>(2048,
() -> ByteBuffer.allocate(512),
null);

private static final LogBufferPool logBufferPool = new LogBufferPool() {
@Override
public ByteBuffer take(int minSize) {
return buffers.take();
}

@Override
public ByteBuffer take() {
return buffers.take();
}

@Override
public void give(ByteBuffer item) {
buffers.give(item);
}
};
private static final LogBufferPool logBufferPool = LogBufferPool.of(2048, 512);

/**
* Specialized entries for SystemOut loggers
Expand Down Expand Up @@ -70,7 +50,7 @@ public Entry start(LogSink sink, OutputStream stream, LogLevel level, long curre
*/
private static final Pool<Entry> entries = new ThreadSafeLenientFixedSizePool<>(1024,
() -> new Entry(logBufferPool),
null);
Entry::clear);

/**
* Specialized sink for stream loggers
Expand All @@ -86,7 +66,6 @@ public void write(Entry e) {
} catch (IOException x) {
throw new UncheckedIOException(x);
} finally {
e.clear();
entries.give(e);
}
}
Expand Down
Loading

0 comments on commit 97c492b

Please sign in to comment.