From 6794ca269b0be470464ed60813219c732ab21f1d Mon Sep 17 00:00:00 2001 From: Santiago Pericas-Geertsen Date: Wed, 12 Feb 2025 10:11:22 -0500 Subject: [PATCH] Some experiments with IdleInputStream. Signed-off-by: Santiago Pericas-Geertsen --- .../common/socket/IdleInputStream.java | 18 ++--- .../common/socket/IdleInputStreamTest.java | 79 +++++++++++++++++++ 2 files changed, 85 insertions(+), 12 deletions(-) create mode 100644 common/socket/src/test/java/io/helidon/common/socket/IdleInputStreamTest.java diff --git a/common/socket/src/main/java/io/helidon/common/socket/IdleInputStream.java b/common/socket/src/main/java/io/helidon/common/socket/IdleInputStream.java index 33b037c8fe0..b5459e23ad8 100644 --- a/common/socket/src/main/java/io/helidon/common/socket/IdleInputStream.java +++ b/common/socket/src/main/java/io/helidon/common/socket/IdleInputStream.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2023 Oracle and/or its affiliates. + * Copyright (c) 2023, 2025 Oracle and/or its affiliates. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -20,7 +20,6 @@ import java.io.InputStream; import java.io.UncheckedIOException; import java.util.Objects; -import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; @@ -105,6 +104,11 @@ boolean isClosed() { return closed; } + void endIdle() { + idlingThread.cancel(true); + idlingThread = null; + } + private void handle() { try { next = upstream.read(); @@ -116,14 +120,4 @@ private void handle() { throw new UncheckedIOException(e); } } - - private void endIdle() { - try { - idlingThread.get(); - idlingThread = null; - } catch (InterruptedException | ExecutionException e) { - closed = true; - throw new RuntimeException("Exception in socket monitor thread.", e); - } - } } diff --git a/common/socket/src/test/java/io/helidon/common/socket/IdleInputStreamTest.java b/common/socket/src/test/java/io/helidon/common/socket/IdleInputStreamTest.java new file mode 100644 index 00000000000..7884c8c6c8b --- /dev/null +++ b/common/socket/src/test/java/io/helidon/common/socket/IdleInputStreamTest.java @@ -0,0 +1,79 @@ +/* + * Copyright (c) 2025 Oracle and/or its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.helidon.common.socket; + +import java.io.IOException; +import java.io.InputStream; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; + +import org.junit.jupiter.api.Test; + +class IdleInputStreamTest { + + @Test + void testIdleInputStream() throws ExecutionException, InterruptedException { + Barrier endBarrier = new Barrier(); + Barrier startBarrier = new Barrier(); + BarrierInputStream bis = new BarrierInputStream(startBarrier, endBarrier); + IdleInputStream iis = new IdleInputStream(bis, "socketId", "socketId"); + + iis.idle(); + startBarrier.waitOn(); // waits on idle thread to start + iis.endIdle(); + endBarrier.waitOn(); // waits on idle thread interruption + } + + static class BarrierInputStream extends InputStream { + + private final Barrier endBarrier; + private final Barrier startBarrier; + + BarrierInputStream(Barrier startBarrier, Barrier endBarrier) { + this.startBarrier = startBarrier; + this.endBarrier = endBarrier; + } + + @Override + public int read() throws IOException { + try { + startBarrier.retract(); + Barrier nextBarrier = new Barrier(); + nextBarrier.waitOn(); + return 0; + } catch (ExecutionException | InterruptedException e) { + e.printStackTrace(); + endBarrier.retract(); + return 0; + } + } + } + + /** + * A barrier is used to force a thread to wait (block) until it is retracted. + */ + private static class Barrier { + private final CompletableFuture future = new CompletableFuture<>(); + + void waitOn() throws ExecutionException, InterruptedException { + future.get(); + } + + void retract() { + future.complete(null); + } + } +}