From 578edc368a4276e9a82000dd2ba71273807bd6b5 Mon Sep 17 00:00:00 2001 From: Santiago Pericas-Geertsen Date: Thu, 26 Sep 2024 10:04:23 -0400 Subject: [PATCH] Smart async writer in webserver (#9191) New type of writer that can switch from async to sync writing dynamically based on average async queue size heuristics. --- .../common/socket/SmartSocketWriter.java | 62 +++++++++++++++++++ .../helidon/common/socket/SocketWriter.java | 10 ++- .../common/socket/SocketWriterAsync.java | 19 +++++- .../helidon/webserver/ConnectionHandler.java | 6 +- .../webserver/ListenerConfigBlueprint.java | 11 ++++ 5 files changed, 101 insertions(+), 7 deletions(-) create mode 100644 common/socket/src/main/java/io/helidon/common/socket/SmartSocketWriter.java diff --git a/common/socket/src/main/java/io/helidon/common/socket/SmartSocketWriter.java b/common/socket/src/main/java/io/helidon/common/socket/SmartSocketWriter.java new file mode 100644 index 00000000000..1d310535b4b --- /dev/null +++ b/common/socket/src/main/java/io/helidon/common/socket/SmartSocketWriter.java @@ -0,0 +1,62 @@ +/* + * Copyright (c) 2024 Oracle and/or its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.helidon.common.socket; + +import java.util.concurrent.ExecutorService; + +import io.helidon.common.buffers.BufferData; + +/** + * A special socket write that starts async but may switch to sync mode if it + * detects that the async queue size is below {@link #QUEUE_SIZE_THRESHOLD}. + * If it switches to sync mode, it shall never return back to async mode. + */ +public class SmartSocketWriter extends SocketWriter { + private static final long WINDOW_SIZE = 1000; + private static final double QUEUE_SIZE_THRESHOLD = 2.0; + + private final SocketWriterAsync asyncWriter; + private volatile long windowIndex; + private volatile boolean asyncMode; + + SmartSocketWriter(ExecutorService executor, HelidonSocket socket, int writeQueueLength) { + super(socket); + this.asyncWriter = new SocketWriterAsync(executor, socket, writeQueueLength); + this.asyncMode = true; + this.windowIndex = 0L; + } + + @Override + public void write(BufferData... buffers) { + for (BufferData buffer : buffers) { + write(buffer); + } + } + + @Override + public void write(BufferData buffer) { + if (asyncMode) { + asyncWriter.write(buffer); + if (++windowIndex % WINDOW_SIZE == 0 && asyncWriter.avgQueueSize() < QUEUE_SIZE_THRESHOLD) { + asyncMode = false; + } + } else { + asyncWriter.drainQueue(); + writeNow(buffer); // blocking write + } + } +} diff --git a/common/socket/src/main/java/io/helidon/common/socket/SocketWriter.java b/common/socket/src/main/java/io/helidon/common/socket/SocketWriter.java index 7d2480c02e3..f5323ab6360 100644 --- a/common/socket/src/main/java/io/helidon/common/socket/SocketWriter.java +++ b/common/socket/src/main/java/io/helidon/common/socket/SocketWriter.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2022, 2023 Oracle and/or its affiliates. + * Copyright (c) 2022, 2024 Oracle and/or its affiliates. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -44,15 +44,19 @@ protected SocketWriter(HelidonSocket socket) { * @param socket socket to write to * @param writeQueueLength maximal number of queued writes, write operation will block if the queue is full; if set to * {code 1} or lower, write queue is disabled and writes are direct to socket (blocking) + * @param smartAsyncWrites flag to enable smart async writes, see {@link io.helidon.common.socket.SmartSocketWriter} * @return a new socket writer */ public static SocketWriter create(ExecutorService executor, HelidonSocket socket, - int writeQueueLength) { + int writeQueueLength, + boolean smartAsyncWrites) { if (writeQueueLength <= 1) { return new SocketWriterDirect(socket); } else { - return new SocketWriterAsync(executor, socket, writeQueueLength); + return smartAsyncWrites + ? new SmartSocketWriter(executor, socket, writeQueueLength) + : new SocketWriterAsync(executor, socket, writeQueueLength); } } diff --git a/common/socket/src/main/java/io/helidon/common/socket/SocketWriterAsync.java b/common/socket/src/main/java/io/helidon/common/socket/SocketWriterAsync.java index 764e273170a..a663191f414 100644 --- a/common/socket/src/main/java/io/helidon/common/socket/SocketWriterAsync.java +++ b/common/socket/src/main/java/io/helidon/common/socket/SocketWriterAsync.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2022 Oracle and/or its affiliates. + * Copyright (c) 2022, 2024 Oracle and/or its affiliates. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -33,6 +33,7 @@ class SocketWriterAsync extends SocketWriter implements DataWriter { private static final System.Logger LOGGER = System.getLogger(SocketWriterAsync.class.getName()); private static final BufferData CLOSING_TOKEN = BufferData.empty(); + private final ExecutorService executor; private final ArrayBlockingQueue writeQueue; private final CountDownLatch cdl = new CountDownLatch(1); @@ -40,6 +41,7 @@ class SocketWriterAsync extends SocketWriter implements DataWriter { private volatile Throwable caught; private volatile boolean run = true; private Thread thread; + private double avgQueueSize; /** * A new socket writer. @@ -116,7 +118,8 @@ private void run() { CompositeBufferData toWrite = BufferData.createComposite(writeQueue.take()); // wait if the queue is empty // we only want to read a certain amount of data, if somebody writes huge amounts // we could spin here forever and run out of memory - for (int i = 0; i < 1000; i++) { + int queueSize = 1; + for (; queueSize <= 1000; queueSize++) { BufferData newBuf = writeQueue.poll(); // drain ~all elements from the queue, don't wait. if (newBuf == null) { break; @@ -124,6 +127,7 @@ private void run() { toWrite.add(newBuf); } writeNow(toWrite); + avgQueueSize = (avgQueueSize + queueSize) / 2.0; } cdl.countDown(); } catch (Throwable e) { @@ -141,4 +145,15 @@ private void checkRunning() { throw new SocketWriterException(caught); } } + + void drainQueue() { + BufferData buffer; + while ((buffer = writeQueue.poll()) != null) { + writeNow(buffer); + } + } + + double avgQueueSize() { + return avgQueueSize; + } } diff --git a/webserver/webserver/src/main/java/io/helidon/webserver/ConnectionHandler.java b/webserver/webserver/src/main/java/io/helidon/webserver/ConnectionHandler.java index ab049a295d1..ca81fb4dfe2 100644 --- a/webserver/webserver/src/main/java/io/helidon/webserver/ConnectionHandler.java +++ b/webserver/webserver/src/main/java/io/helidon/webserver/ConnectionHandler.java @@ -132,8 +132,10 @@ public final void run() { } reader = new DataReader(new MapExceptionDataSupplier(helidonSocket)); - writer = SocketWriter.create(listenerContext.executor(), helidonSocket, - listenerContext.config().writeQueueLength()); + writer = SocketWriter.create(listenerContext.executor(), + helidonSocket, + listenerConfig.writeQueueLength(), + listenerConfig.smartAsyncWrites()); } catch (Exception e) { throw e instanceof RuntimeException re ? re : new RuntimeException(e); // see ServerListener } diff --git a/webserver/webserver/src/main/java/io/helidon/webserver/ListenerConfigBlueprint.java b/webserver/webserver/src/main/java/io/helidon/webserver/ListenerConfigBlueprint.java index 66048e5720a..0894d00bc8c 100644 --- a/webserver/webserver/src/main/java/io/helidon/webserver/ListenerConfigBlueprint.java +++ b/webserver/webserver/src/main/java/io/helidon/webserver/ListenerConfigBlueprint.java @@ -162,6 +162,17 @@ interface ListenerConfigBlueprint { @Option.DefaultInt(0) int writeQueueLength(); + /** + * If enabled and {@link #writeQueueLength()} is greater than 1, then + * start with async writes but possibly switch to sync writes if + * async queue size is always below a certain threshold. + * + * @return smart async setting + */ + @Option.Configured + @Option.DefaultBoolean(false) + boolean smartAsyncWrites(); + /** * Initial buffer size in bytes of {@link java.io.BufferedOutputStream} created internally to * write data to a socket connection. Default is {@code 4096}.