Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

perf: non-blocking sockets #1562

Draft
wants to merge 29 commits into
base: postgresql-dialect
Choose a base branch
from
Draft
Changes from 1 commit
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
e182a20
perf: use non-blocking sockets
olavloite Mar 17, 2024
06e2903
chore: add missing copyright headers
olavloite Mar 17, 2024
c730315
chore: further support for non-blocking sockets
olavloite Mar 19, 2024
a0ba3e4
Merge branch 'postgresql-dialect' into non-blocking-socket
olavloite Mar 24, 2024
d260585
chore: more fixes for non-blocking sockets
olavloite Mar 27, 2024
720dc4e
Merge branch 'postgresql-dialect' into non-blocking-socket
olavloite Mar 30, 2024
020057e
test: fix tests + stop reader thread
olavloite Mar 30, 2024
dccadf6
refactor: listen to all localhost addresses
olavloite Mar 30, 2024
4114b12
chore: use wildcard address
olavloite Mar 30, 2024
6bb83ce
fix: handle copy messages
olavloite Mar 30, 2024
5a0247f
fix: create parent directory
olavloite Mar 30, 2024
c6305d8
chore: accept copy messages in all modes
olavloite Mar 30, 2024
b4ea859
test: skip copy tests + skip uds for SQLAlchemy
olavloite Mar 30, 2024
41e2c0f
test: skip uds for SQLAlchemy
olavloite Mar 30, 2024
c88d5a6
chore: log zero bytes read
olavloite Mar 30, 2024
b9f4f23
test: skip more uds tests
olavloite Mar 30, 2024
ca46056
chore: keep track of time between reads
olavloite Apr 1, 2024
f20f7cf
chore: log more warnings
olavloite Apr 1, 2024
feb8912
chore: more logging
olavloite Apr 1, 2024
e2b1cb7
chore: more logging
olavloite Apr 1, 2024
2495839
test: always loop through selected keys
olavloite Apr 1, 2024
6763a65
chore: more logging
olavloite Apr 1, 2024
7a32a1a
fix: start time before end time
olavloite Apr 1, 2024
df7c41d
fix: only log each 10 seconds
olavloite Apr 1, 2024
5018275
test: wait at most 60 seconds for a read
olavloite Apr 1, 2024
315c59e
test: always select and iterate over keys
olavloite Apr 1, 2024
4a1fd32
chore: go back to conditional select
olavloite Apr 1, 2024
43b63a1
test: bind to loopback
olavloite Apr 1, 2024
d65455a
chore: go back to wildcard address
olavloite Apr 1, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
chore: log zero bytes read
olavloite committed Mar 30, 2024
commit c88d5a60b0628a7a5b23e645bf5c2b68309a0a1b
Original file line number Diff line number Diff line change
@@ -56,22 +56,25 @@
new InetSocketAddress((InetAddress) null, getLocalPort()),
++index);
if (getLocalPort() == 0) {
this.localPort = tcpListener.getServerSocketChannel().socket().getLocalPort();

Check warning on line 59 in src/main/java/com/google/cloud/spanner/pgadapter/NonBlockingProxyServer.java

Codecov / codecov/patch

src/main/java/com/google/cloud/spanner/pgadapter/NonBlockingProxyServer.java#L59

Added line #L59 was not covered by tests
}
listenersBuilder.add(tcpListener);

if (optionsMetadata.isDomainSocketEnabled()) {
File tempDir = new File(optionsMetadata.getSocketFile(getLocalPort()));
if (tempDir.getParentFile() != null && !tempDir.getParentFile().exists()) {
File socketFile = new File(optionsMetadata.getSocketFile(getLocalPort()));
if (socketFile.getParentFile() != null && !socketFile.getParentFile().exists()) {
//noinspection ResultOfMethodCallIgnored
tempDir.getParentFile().mkdirs();
socketFile.getParentFile().mkdirs();
}
if (socketFile.exists() && !socketFile.delete()) {
throw new IOException("Failed to re-create socket file");

Check warning on line 70 in src/main/java/com/google/cloud/spanner/pgadapter/NonBlockingProxyServer.java

Codecov / codecov/patch

src/main/java/com/google/cloud/spanner/pgadapter/NonBlockingProxyServer.java#L70

Added line #L70 was not covered by tests
}
NonBlockingServerListener listener =
createServerListener(
AFUNIXSelectorProvider.provider().openSelector(),
AFUNIXSelectorProvider.provider().openSelector(),
AFUNIXServerSocketChannel.open(),
AFUNIXSocketAddress.of(tempDir),
AFUNIXSocketAddress.of(socketFile),
++index);
listenersBuilder.add(listener);
}
@@ -107,9 +110,9 @@
}
notifyStarted();
logger.log(Level.INFO, "Non-blocking server started");
} catch (Throwable throwable) {
logger.log(Level.WARNING, "Non-blocking server failed to start", throwable);
notifyFailed(throwable);

Check warning on line 115 in src/main/java/com/google/cloud/spanner/pgadapter/NonBlockingProxyServer.java

Codecov / codecov/patch

src/main/java/com/google/cloud/spanner/pgadapter/NonBlockingProxyServer.java#L113-L115

Added lines #L113 - L115 were not covered by tests
}
}

@@ -119,8 +122,8 @@
try {
listener.getAcceptSelector().close();
logger.log(Level.INFO, "Closed listening selector {}", listener.getAcceptSelector());
} catch (IOException ioException) {
logger.log(Level.WARNING, "Failed to close selector", ioException);

Check warning on line 126 in src/main/java/com/google/cloud/spanner/pgadapter/NonBlockingProxyServer.java

Codecov / codecov/patch

src/main/java/com/google/cloud/spanner/pgadapter/NonBlockingProxyServer.java#L125-L126

Added lines #L125 - L126 were not covered by tests
}
}
for (ConnectionHandler handler : getConnectionHandlers()) {
@@ -129,14 +132,14 @@
logger.log(Level.INFO, "Terminated all active connections");
try {
SpannerPool.closeSpannerPool();
} catch (Throwable ignore) {

Check warning on line 135 in src/main/java/com/google/cloud/spanner/pgadapter/NonBlockingProxyServer.java

Codecov / codecov/patch

src/main/java/com/google/cloud/spanner/pgadapter/NonBlockingProxyServer.java#L135

Added line #L135 was not covered by tests
}
for (NonBlockingServerListener listener : serverListeners) {
try {
listener.getServerSocketChannel().close();
logger.log(Level.INFO, "Closed listening socket {}", listener.getServerSocketChannel());
} catch (IOException ioException) {
logger.log(Level.WARNING, "Failed to close socket", ioException);

Check warning on line 142 in src/main/java/com/google/cloud/spanner/pgadapter/NonBlockingProxyServer.java

Codecov / codecov/patch

src/main/java/com/google/cloud/spanner/pgadapter/NonBlockingProxyServer.java#L141-L142

Added lines #L141 - L142 were not covered by tests
}
}
notifyStopped();
Original file line number Diff line number Diff line change
@@ -55,9 +55,9 @@
new Listener() {
@Override
public void failed(State from, Throwable failure) {
super.failed(from, failure);
running.set(false);
}

Check warning on line 60 in src/main/java/com/google/cloud/spanner/pgadapter/NonBlockingSocketReader.java

Codecov / codecov/patch

src/main/java/com/google/cloud/spanner/pgadapter/NonBlockingSocketReader.java#L58-L60

Added lines #L58 - L60 were not covered by tests

@Override
public void terminated(State from) {
@@ -92,8 +92,8 @@
keys.clear();
}
}
} catch (IOException ioException) {
logger.log(Level.WARNING, "selectNow for reader failed", ioException);

Check warning on line 96 in src/main/java/com/google/cloud/spanner/pgadapter/NonBlockingSocketReader.java

Codecov / codecov/patch

src/main/java/com/google/cloud/spanner/pgadapter/NonBlockingSocketReader.java#L95-L96

Added lines #L95 - L96 were not covered by tests
}
}

@@ -104,15 +104,15 @@
}
SocketChannel channel = (SocketChannel) key.channel();
if (!(channel.isOpen() && channel.isConnected())) {
return;

Check warning on line 107 in src/main/java/com/google/cloud/spanner/pgadapter/NonBlockingSocketReader.java

Codecov / codecov/patch

src/main/java/com/google/cloud/spanner/pgadapter/NonBlockingSocketReader.java#L107

Added line #L107 was not covered by tests
}

try {
if (handler.getStatus() == ConnectionStatus.UNAUTHENTICATED) {
ByteBuffer lengthBuffer = read(4, channel);
ByteBuffer lengthBuffer = read(4, channel, true);
lengthBuffer.rewind();
int length = ByteConverter.int4(lengthBuffer.array(), 0);
ByteBuffer dataBuffer = read(length, lengthBuffer, channel);
ByteBuffer dataBuffer = read(length, lengthBuffer, channel, true);
dataBuffer.rewind();
try (ByteBufferInputStream inputStream = new ByteBufferInputStream(dataBuffer)) {
handler.setRawInputStream(inputStream);
@@ -121,13 +121,14 @@
}
} else {
// All control messages has a 1-byte type + 4 byte length.
ByteBuffer headerBuffer = read(handler.getHeaderBuffer(), channel);
ByteBuffer headerBuffer = read(handler.getHeaderBuffer(), channel, false);
byte[] dst = new byte[4];
headerBuffer.position(1);
headerBuffer.get(dst);
int length = ByteConverter.int4(dst, 0);
headerBuffer.rewind();
ByteBuffer message = read(handler.getMessageBuffer(length + 1), headerBuffer, channel);
ByteBuffer message =
read(handler.getMessageBuffer(length + 1), headerBuffer, channel, false);
message.rewind();
try (ByteBufferInputStream inputStream = new ByteBufferInputStream(message)) {
handler.setRawInputStream(inputStream);
@@ -154,28 +155,39 @@
}
}

private static ByteBuffer read(int length, SocketChannel channel) throws IOException {
return read(length, null, channel);
private static ByteBuffer read(int length, SocketChannel channel, boolean bootstrap)
throws IOException {
return read(length, null, channel, bootstrap);
}

private static ByteBuffer read(ByteBuffer destination, SocketChannel channel) throws IOException {
return read(destination, null, channel);
private static ByteBuffer read(ByteBuffer destination, SocketChannel channel, boolean bootstrap)
throws IOException {
return read(destination, null, channel, bootstrap);
}

private static ByteBuffer read(int length, ByteBuffer header, SocketChannel channel)
throws IOException {
private static ByteBuffer read(
int length, ByteBuffer header, SocketChannel channel, boolean bootstrap) throws IOException {
ByteBuffer destination = ByteBuffer.allocate(length);
return read(destination, header, channel);
return read(destination, header, channel, bootstrap);
}

private static ByteBuffer read(ByteBuffer destination, ByteBuffer header, SocketChannel channel)
private static ByteBuffer read(
ByteBuffer destination, ByteBuffer header, SocketChannel channel, boolean bootstrap)
throws IOException {
if (header != null) {
destination.put(header);
}
int read;
int read, zeroBytesCounter = 0;
do {
read = channel.read(destination);
if (read == 0) {
zeroBytesCounter++;
if (zeroBytesCounter % 1000 == 0) {
System.out.println("Read zero bytes " + zeroBytesCounter + " times");
System.out.println("Expecting " + destination.capacity() + " bytes");
System.out.println("Remaining " + destination.remaining() + " bytes");
}
}
} while (read > -1 && destination.hasRemaining());
if (read == -1) {
throw new EOFException();
Original file line number Diff line number Diff line change
@@ -22,7 +22,6 @@
import com.google.cloud.spanner.Statement;
import com.google.cloud.spanner.pgadapter.python.PythonTest;
import com.google.cloud.spanner.pgadapter.wireprotocol.QueryMessage;
import com.google.cloud.spanner.pgadapter.wireprotocol.WireMessage;
import com.google.common.collect.ImmutableList;
import com.google.protobuf.AbstractMessage;
import com.google.protobuf.ByteString;
Original file line number Diff line number Diff line change
@@ -54,7 +54,7 @@ public class SqlAlchemyOrmTest extends AbstractMockServerTest {

@Parameters(name = "host = {0}")
public static List<Object[]> data() {
return ImmutableList.of(new Object[] {"localhost"}/*, new Object[] {""}*/);
return ImmutableList.of(new Object[] {"localhost"} /*, new Object[] {""}*/);
}

@BeforeClass

Unchanged files with check annotations Beta

@Override
void createSSLSocket() throws IOException {
throw new IOException("SSL is not supported for non-blocking connection handlers");

Check warning on line 59 in src/main/java/com/google/cloud/spanner/pgadapter/NonBlockingConnectionHandler.java

Codecov / codecov/patch

src/main/java/com/google/cloud/spanner/pgadapter/NonBlockingConnectionHandler.java#L59

Added line #L59 was not covered by tests
}
ByteBuffer getHeaderBuffer() {
protected void closeSocket() throws IOException {
try {
this.channel.close();
} catch (IOException ioException) {
ioException.printStackTrace();
throw ioException;
} catch (Throwable t) {
t.printStackTrace();

Check warning on line 99 in src/main/java/com/google/cloud/spanner/pgadapter/NonBlockingConnectionHandler.java

Codecov / codecov/patch

src/main/java/com/google/cloud/spanner/pgadapter/NonBlockingConnectionHandler.java#L95-L99

Added lines #L95 - L99 were not covered by tests
}
}
while (true) {
ControlMessage message = this.controlMessages.take();
if (message instanceof FlushMessage || message instanceof SyncMessage) {
continue;

Check warning on line 122 in src/main/java/com/google/cloud/spanner/pgadapter/NonBlockingConnectionHandler.java

Codecov / codecov/patch

src/main/java/com/google/cloud/spanner/pgadapter/NonBlockingConnectionHandler.java#L122

Added line #L122 was not covered by tests
}
return message;
}
// the server is shutting down.
logger.log(Level.INFO, "Listener shutting down");
break;
} catch (Throwable unexpectedError) {
logger.warning(

Check warning on line 78 in src/main/java/com/google/cloud/spanner/pgadapter/NonBlockingServerListener.java

Codecov / codecov/patch

src/main/java/com/google/cloud/spanner/pgadapter/NonBlockingServerListener.java#L77-L78

Added lines #L77 - L78 were not covered by tests
"Unexpected error while listening for incoming connections: "
+ unexpectedError.getMessage());

Check warning on line 80 in src/main/java/com/google/cloud/spanner/pgadapter/NonBlockingServerListener.java

Codecov / codecov/patch

src/main/java/com/google/cloud/spanner/pgadapter/NonBlockingServerListener.java#L80

Added line #L80 was not covered by tests
}
}
}
try {
OptionsMetadata optionsMetadata = extractMetadata(args, System.out);
OpenTelemetry openTelemetry = setupOpenTelemetry(optionsMetadata);
ProxyServer server = new NonBlockingProxyServer(optionsMetadata, openTelemetry);

Check warning on line 55 in src/main/java/com/google/cloud/spanner/pgadapter/Server.java

Codecov / codecov/patch

src/main/java/com/google/cloud/spanner/pgadapter/Server.java#L55

Added line #L55 was not covered by tests
server.startServer();
} catch (Exception e) {
printError(e, System.err, System.out);
@Override
public int read() throws IOException {
if (!buffer.hasRemaining()) {
return -1;

Check warning on line 33 in src/main/java/com/google/cloud/spanner/pgadapter/metadata/ByteBufferInputStream.java

Codecov / codecov/patch

src/main/java/com/google/cloud/spanner/pgadapter/metadata/ByteBufferInputStream.java#L33

Added line #L33 was not covered by tests
}
return Byte.toUnsignedInt(buffer.get());

Check warning on line 35 in src/main/java/com/google/cloud/spanner/pgadapter/metadata/ByteBufferInputStream.java

Codecov / codecov/patch

src/main/java/com/google/cloud/spanner/pgadapter/metadata/ByteBufferInputStream.java#L35

Added line #L35 was not covered by tests
}
@Override
public int read(@Nonnull byte[] destination, int offset, int len) throws IOException {
if (!buffer.hasRemaining()) {
return -1;

Check warning on line 41 in src/main/java/com/google/cloud/spanner/pgadapter/metadata/ByteBufferInputStream.java

Codecov / codecov/patch

src/main/java/com/google/cloud/spanner/pgadapter/metadata/ByteBufferInputStream.java#L41

Added line #L41 was not covered by tests
}
int actualLength = Math.min(len, buffer.remaining());
@Override
public void write(int b) throws IOException {
if (b1 == null) {
b1 = new byte[1];

Check warning on line 34 in src/main/java/com/google/cloud/spanner/pgadapter/metadata/ChannelOutputStream.java

Codecov / codecov/patch

src/main/java/com/google/cloud/spanner/pgadapter/metadata/ChannelOutputStream.java#L34

Added line #L34 was not covered by tests
}
b1[0] = (byte) b;
write(b1, 0, 1);
}

Check warning on line 38 in src/main/java/com/google/cloud/spanner/pgadapter/metadata/ChannelOutputStream.java

Codecov / codecov/patch

src/main/java/com/google/cloud/spanner/pgadapter/metadata/ChannelOutputStream.java#L36-L38

Added lines #L36 - L38 were not covered by tests
@Override
public void write(byte[] b, int off, int len) throws IOException {
@Override
public int read() throws IOException {
if (delegate == null) {
throw new IOException("No delegate connected");

Check warning on line 36 in src/main/java/com/google/cloud/spanner/pgadapter/metadata/ForwardingInputStream.java

Codecov / codecov/patch

src/main/java/com/google/cloud/spanner/pgadapter/metadata/ForwardingInputStream.java#L36

Added line #L36 was not covered by tests
}
return delegate.read();

Check warning on line 38 in src/main/java/com/google/cloud/spanner/pgadapter/metadata/ForwardingInputStream.java

Codecov / codecov/patch

src/main/java/com/google/cloud/spanner/pgadapter/metadata/ForwardingInputStream.java#L38

Added line #L38 was not covered by tests
}
@Override
public int read(byte[] b, int off, int len) throws IOException {
if (delegate == null) {
throw new IOException("No delegate connected");

Check warning on line 44 in src/main/java/com/google/cloud/spanner/pgadapter/metadata/ForwardingInputStream.java

Codecov / codecov/patch

src/main/java/com/google/cloud/spanner/pgadapter/metadata/ForwardingInputStream.java#L44

Added line #L44 was not covered by tests
}
return delegate.read(b, off, len);
}
case PasswordMessage.IDENTIFIER:
return new PasswordMessage(connection, connection.getConnectionParameters());
default:
throw new IllegalStateException(String.format("Unknown message: %c", nextMsg));

Check warning on line 150 in src/main/java/com/google/cloud/spanner/pgadapter/wireprotocol/ControlMessage.java

Codecov / codecov/patch

src/main/java/com/google/cloud/spanner/pgadapter/wireprotocol/ControlMessage.java#L150

Added line #L150 was not covered by tests
}
} else {
switch (nextMsg) {
}
}
return rows;
} catch (InterruptedException interruptedException) {
throw PGExceptionFactory.newQueryCancelledException();

Check warning on line 572 in src/main/java/com/google/cloud/spanner/pgadapter/wireprotocol/ControlMessage.java

Codecov / codecov/patch

src/main/java/com/google/cloud/spanner/pgadapter/wireprotocol/ControlMessage.java#L571-L572

Added lines #L571 - L572 were not covered by tests
} finally {
if (converter != null) {
converter.close();