diff --git a/common/testing/http-junit5/src/main/java/io/helidon/common/testing/http/junit5/SocketHttpClient.java b/common/testing/http-junit5/src/main/java/io/helidon/common/testing/http/junit5/SocketHttpClient.java index 92ec6aed91a..75838cd1710 100644 --- a/common/testing/http-junit5/src/main/java/io/helidon/common/testing/http/junit5/SocketHttpClient.java +++ b/common/testing/http-junit5/src/main/java/io/helidon/common/testing/http/junit5/SocketHttpClient.java @@ -605,6 +605,15 @@ public SocketHttpClient sendChunk(String payload) throws IOException { return this; } + /** + * Provides access to underlying socket reader. + * + * @return the reader + */ + public BufferedReader socketReader() { + return socketReader; + } + /** * Override this to send a specific payload. * diff --git a/webclient/sse/src/main/java/io/helidon/webclient/sse/SseSourceHandlerProvider.java b/webclient/sse/src/main/java/io/helidon/webclient/sse/SseSourceHandlerProvider.java index 502bb1f8989..e8758e16977 100644 --- a/webclient/sse/src/main/java/io/helidon/webclient/sse/SseSourceHandlerProvider.java +++ b/webclient/sse/src/main/java/io/helidon/webclient/sse/SseSourceHandlerProvider.java @@ -25,6 +25,7 @@ import java.time.Duration; import io.helidon.common.GenericType; +import io.helidon.common.buffers.DataReader; import io.helidon.common.media.type.MediaTypes; import io.helidon.http.media.MediaContext; import io.helidon.http.sse.SseEvent; @@ -93,6 +94,9 @@ public > void handle(X source, HttpClientResponse res } } + source.onClose(); + } catch (DataReader.InsufficientDataAvailableException e) { + // normal SSE termination when connection closed by server source.onClose(); } catch (IOException e) { source.onError(e); diff --git a/webserver/sse/src/main/java/io/helidon/webserver/sse/DataWriterSseSink.java b/webserver/sse/src/main/java/io/helidon/webserver/sse/DataWriterSseSink.java new file mode 100644 index 00000000000..9457b369291 --- /dev/null +++ b/webserver/sse/src/main/java/io/helidon/webserver/sse/DataWriterSseSink.java @@ -0,0 +1,182 @@ +/* + * Copyright (c) 2024 Oracle and/or its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.helidon.webserver.sse; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.util.Optional; + +import io.helidon.common.GenericType; +import io.helidon.common.buffers.BufferData; +import io.helidon.common.media.type.MediaType; +import io.helidon.common.media.type.MediaTypes; +import io.helidon.http.DateTime; +import io.helidon.http.Header; +import io.helidon.http.HeaderNames; +import io.helidon.http.HttpMediaType; +import io.helidon.http.ServerResponseHeaders; +import io.helidon.http.Status; +import io.helidon.http.WritableHeaders; +import io.helidon.http.media.EntityWriter; +import io.helidon.http.media.MediaContext; +import io.helidon.http.sse.SseEvent; +import io.helidon.webserver.ConnectionContext; +import io.helidon.webserver.ServerConnectionException; +import io.helidon.webserver.http.ServerResponse; +import io.helidon.webserver.http.spi.SinkProviderContext; + +import static io.helidon.http.HeaderValues.CONTENT_TYPE_EVENT_STREAM; +import static io.helidon.http.HeaderValues.create; + +/** + * Implementation of an SSE sink. Emits {@link SseEvent}s. + */ +class DataWriterSseSink implements SseSink { + + /** + * Type of SSE event sinks. + */ + public static final GenericType TYPE = GenericType.create(DataWriterSseSink.class); + + private static final Header CACHE_NO_CACHE_ONLY = create(HeaderNames.CACHE_CONTROL, "no-cache"); + private static final byte[] SSE_NL = "\n".getBytes(StandardCharsets.UTF_8); + private static final byte[] SSE_ID = "id:".getBytes(StandardCharsets.UTF_8); + private static final byte[] SSE_DATA = "data:".getBytes(StandardCharsets.UTF_8); + private static final byte[] SSE_EVENT = "event:".getBytes(StandardCharsets.UTF_8); + private static final byte[] SSE_COMMENT = ":".getBytes(StandardCharsets.UTF_8); + private static final byte[] OK_200 = "HTTP/1.1 200 OK\r\n".getBytes(StandardCharsets.UTF_8); + private static final byte[] DATE = "Date: ".getBytes(StandardCharsets.UTF_8); + private static final WritableHeaders EMPTY_HEADERS = WritableHeaders.create(); + + private final ServerResponse response; + private final ConnectionContext ctx; + private final MediaContext mediaContext; + private final Runnable closeRunnable; + + DataWriterSseSink(SinkProviderContext context) { + this.response = context.serverResponse(); + this.ctx = context.connectionContext(); + this.mediaContext = ctx.listenerContext().mediaContext(); + this.closeRunnable = context.closeRunnable(); + writeStatusAndHeaders(); + } + + @Override + public DataWriterSseSink emit(SseEvent sseEvent) { + BufferData bufferData = BufferData.growing(512); + + Optional comment = sseEvent.comment(); + if (comment.isPresent()) { + bufferData.write(SSE_COMMENT); + bufferData.write(comment.get().getBytes(StandardCharsets.UTF_8)); + bufferData.write(SSE_NL); + } + Optional id = sseEvent.id(); + if (id.isPresent()) { + bufferData.write(SSE_ID); + bufferData.write(id.get().getBytes(StandardCharsets.UTF_8)); + bufferData.write(SSE_NL); + } + Optional name = sseEvent.name(); + if (name.isPresent()) { + bufferData.write(SSE_EVENT); + bufferData.write(name.get().getBytes(StandardCharsets.UTF_8)); + bufferData.write(SSE_NL); + } + Object data = sseEvent.data(); + if (data != null) { + bufferData.write(SSE_DATA); + byte[] bytes = serializeData(data, sseEvent.mediaType().orElse(MediaTypes.TEXT_PLAIN)); + bufferData.write(bytes); + bufferData.write(SSE_NL); + } + bufferData.write(SSE_NL); + + // write event to the network + ctx.dataWriter().writeNow(bufferData); + return this; + } + + @Override + public void close() { + closeRunnable.run(); + ctx.serverSocket().close(); + } + + void writeStatusAndHeaders() { + ServerResponseHeaders headers = response.headers(); + + // verify response has no status or content type + HttpMediaType ct = headers.contentType().orElse(null); + if (response.status().code() != Status.OK_200.code() + || ct != null && !CONTENT_TYPE_EVENT_STREAM.values().equals(ct.mediaType().text())) { + throw new IllegalStateException("ServerResponse instance cannot be used to create SseSink"); + } + + // start writing status line + BufferData buffer = BufferData.growing(256); + buffer.write(OK_200); + + // serialize a date header if not included + if (!headers.contains(HeaderNames.DATE)) { + buffer.write(DATE); + byte[] dateBytes = DateTime.http1Bytes(); + buffer.write(dateBytes); + } + + // set up and write headers + if (ct == null) { + headers.add(CONTENT_TYPE_EVENT_STREAM); + } + headers.set(CACHE_NO_CACHE_ONLY); + for (Header header : headers) { + header.writeHttp1Header(buffer); + } + + // complete heading + buffer.write('\r'); // "\r\n" - empty line after headers + buffer.write('\n'); + + // write response heading to the network + ctx.dataWriter().writeNow(buffer); + } + + private byte[] serializeData(Object object, MediaType mediaType) { + if (object instanceof byte[] bytes) { + return bytes; + } else if (mediaContext != null) { + try (ByteArrayOutputStream baos = new ByteArrayOutputStream()) { + if (object instanceof String str && mediaType.equals(MediaTypes.TEXT_PLAIN)) { + EntityWriter writer = mediaContext.writer(GenericType.STRING, EMPTY_HEADERS, EMPTY_HEADERS); + writer.write(GenericType.STRING, str, baos, EMPTY_HEADERS, EMPTY_HEADERS); + } else { + GenericType type = GenericType.create(object); + WritableHeaders resHeaders = WritableHeaders.create(); + resHeaders.set(HeaderNames.CONTENT_TYPE, mediaType.text()); + EntityWriter writer = mediaContext.writer(type, EMPTY_HEADERS, resHeaders); + writer.write(type, object, baos, EMPTY_HEADERS, resHeaders); + } + return baos.toByteArray(); + } catch (IOException e) { + throw new ServerConnectionException("Failed to write SSE event", e); + + } + } + throw new IllegalStateException("Unable to serialize SSE event without a media context"); + } +} diff --git a/webserver/sse/src/main/java/io/helidon/webserver/sse/OutputStreamSseSink.java b/webserver/sse/src/main/java/io/helidon/webserver/sse/OutputStreamSseSink.java new file mode 100644 index 00000000000..9b6db97623f --- /dev/null +++ b/webserver/sse/src/main/java/io/helidon/webserver/sse/OutputStreamSseSink.java @@ -0,0 +1,116 @@ +/* + * Copyright (c) 2023, 2024 Oracle and/or its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.helidon.webserver.sse; + +import java.io.IOException; +import java.io.OutputStream; +import java.io.UncheckedIOException; +import java.nio.charset.StandardCharsets; +import java.util.Optional; +import java.util.function.BiConsumer; + +import io.helidon.common.GenericType; +import io.helidon.common.media.type.MediaType; +import io.helidon.common.media.type.MediaTypes; +import io.helidon.http.HttpMediaType; +import io.helidon.http.Status; +import io.helidon.http.sse.SseEvent; +import io.helidon.webserver.http.ServerResponse; + +import static io.helidon.http.HeaderValues.CONTENT_TYPE_EVENT_STREAM; + +/** + * Deprecated implementation of an SSE sink. Emits {@link SseEvent}s. + * + * @deprecated Should use {@link io.helidon.webserver.sse.DataWriterSseSink}. + */ +@Deprecated(since = "4.1.2", forRemoval = true) +class OutputStreamSseSink implements SseSink { + + /** + * Type of SSE event sinks. + */ + public static final GenericType TYPE = GenericType.create(DataWriterSseSink.class); + + private static final byte[] SSE_NL = "\n".getBytes(StandardCharsets.UTF_8); + private static final byte[] SSE_ID = "id:".getBytes(StandardCharsets.UTF_8); + private static final byte[] SSE_DATA = "data:".getBytes(StandardCharsets.UTF_8); + private static final byte[] SSE_EVENT = "event:".getBytes(StandardCharsets.UTF_8); + private static final byte[] SSE_COMMENT = ":".getBytes(StandardCharsets.UTF_8); + + private final BiConsumer eventConsumer; + private final Runnable closeRunnable; + private final OutputStream outputStream; + + OutputStreamSseSink(ServerResponse serverResponse, BiConsumer eventConsumer, Runnable closeRunnable) { + // Verify response has no status or content type + HttpMediaType ct = serverResponse.headers().contentType().orElse(null); + if (serverResponse.status().code() != Status.OK_200.code() + || ct != null && !CONTENT_TYPE_EVENT_STREAM.values().equals(ct.mediaType().text())) { + throw new IllegalStateException("ServerResponse instance cannot be used to create SseResponse"); + } + + // Ensure content type set for SSE + if (ct == null) { + serverResponse.headers().add(CONTENT_TYPE_EVENT_STREAM); + } + + this.outputStream = serverResponse.outputStream(); + this.eventConsumer = eventConsumer; + this.closeRunnable = closeRunnable; + } + + @Override + public OutputStreamSseSink emit(SseEvent sseEvent) { + try { + Optional comment = sseEvent.comment(); + if (comment.isPresent()) { + outputStream.write(SSE_COMMENT); + outputStream.write(comment.get().getBytes(StandardCharsets.UTF_8)); + outputStream.write(SSE_NL); + } + Optional id = sseEvent.id(); + if (id.isPresent()) { + outputStream.write(SSE_ID); + outputStream.write(id.get().getBytes(StandardCharsets.UTF_8)); + outputStream.write(SSE_NL); + } + Optional name = sseEvent.name(); + if (name.isPresent()) { + outputStream.write(SSE_EVENT); + outputStream.write(name.get().getBytes(StandardCharsets.UTF_8)); + outputStream.write(SSE_NL); + } + Object data = sseEvent.data(); + if (data != SseEvent.NO_DATA) { + outputStream.write(SSE_DATA); + eventConsumer.accept(data, sseEvent.mediaType().orElse(MediaTypes.TEXT_PLAIN)); + outputStream.write(SSE_NL); + } + outputStream.write(SSE_NL); + outputStream.flush(); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + return this; + } + + @Override + public void close() { + closeRunnable.run(); + } +} diff --git a/webserver/sse/src/main/java/io/helidon/webserver/sse/SseSink.java b/webserver/sse/src/main/java/io/helidon/webserver/sse/SseSink.java index 8cae4685a3e..aa89e5110c7 100644 --- a/webserver/sse/src/main/java/io/helidon/webserver/sse/SseSink.java +++ b/webserver/sse/src/main/java/io/helidon/webserver/sse/SseSink.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2023, 2024 Oracle and/or its affiliates. + * Copyright (c) 2024 Oracle and/or its affiliates. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -13,102 +13,34 @@ * See the License for the specific language governing permissions and * limitations under the License. */ - package io.helidon.webserver.sse; -import java.io.IOException; -import java.io.OutputStream; -import java.io.UncheckedIOException; -import java.nio.charset.StandardCharsets; -import java.util.Optional; -import java.util.function.BiConsumer; - import io.helidon.common.GenericType; -import io.helidon.common.media.type.MediaType; -import io.helidon.common.media.type.MediaTypes; -import io.helidon.http.HttpMediaType; -import io.helidon.http.Status; import io.helidon.http.sse.SseEvent; -import io.helidon.webserver.http.ServerResponse; import io.helidon.webserver.http.spi.Sink; -import static io.helidon.http.HeaderValues.CONTENT_TYPE_EVENT_STREAM; - /** - * Implementation of an SSE sink. Emits {@link SseEvent}s. + * A sink for SSE events. */ -public class SseSink implements Sink { +public interface SseSink extends Sink { /** * Type of SSE event sinks. */ - public static final GenericType TYPE = GenericType.create(SseSink.class); - - private static final byte[] SSE_NL = "\n".getBytes(StandardCharsets.UTF_8); - private static final byte[] SSE_ID = "id:".getBytes(StandardCharsets.UTF_8); - private static final byte[] SSE_DATA = "data:".getBytes(StandardCharsets.UTF_8); - private static final byte[] SSE_EVENT = "event:".getBytes(StandardCharsets.UTF_8); - private static final byte[] SSE_COMMENT = ":".getBytes(StandardCharsets.UTF_8); - - private final BiConsumer eventConsumer; - private final Runnable closeRunnable; - private final OutputStream outputStream; - - SseSink(ServerResponse serverResponse, BiConsumer eventConsumer, Runnable closeRunnable) { - // Verify response has no status or content type - HttpMediaType ct = serverResponse.headers().contentType().orElse(null); - if (serverResponse.status().code() != Status.OK_200.code() - || ct != null && !CONTENT_TYPE_EVENT_STREAM.values().equals(ct.mediaType().text())) { - throw new IllegalStateException("ServerResponse instance cannot be used to create SseResponse"); - } - - // Ensure content type set for SSE - if (ct == null) { - serverResponse.headers().add(CONTENT_TYPE_EVENT_STREAM); - } - - this.outputStream = serverResponse.outputStream(); - this.eventConsumer = eventConsumer; - this.closeRunnable = closeRunnable; - } + GenericType TYPE = GenericType.create(SseSink.class); + /** + * Emits an event using to the sink. + * + * @param event the event to emit + * @return this sink + */ @Override - public SseSink emit(SseEvent sseEvent) { - try { - Optional comment = sseEvent.comment(); - if (comment.isPresent()) { - outputStream.write(SSE_COMMENT); - outputStream.write(comment.get().getBytes(StandardCharsets.UTF_8)); - outputStream.write(SSE_NL); - } - Optional id = sseEvent.id(); - if (id.isPresent()) { - outputStream.write(SSE_ID); - outputStream.write(id.get().getBytes(StandardCharsets.UTF_8)); - outputStream.write(SSE_NL); - } - Optional name = sseEvent.name(); - if (name.isPresent()) { - outputStream.write(SSE_EVENT); - outputStream.write(name.get().getBytes(StandardCharsets.UTF_8)); - outputStream.write(SSE_NL); - } - Object data = sseEvent.data(); - if (data != SseEvent.NO_DATA) { - outputStream.write(SSE_DATA); - eventConsumer.accept(data, sseEvent.mediaType().orElse(MediaTypes.TEXT_PLAIN)); - outputStream.write(SSE_NL); - } - outputStream.write(SSE_NL); - outputStream.flush(); - } catch (IOException e) { - throw new UncheckedIOException(e); - } - return this; - } + SseSink emit(SseEvent event); + /** + * Close SSE sink. + */ @Override - public void close() { - closeRunnable.run(); - } + void close(); } diff --git a/webserver/sse/src/main/java/io/helidon/webserver/sse/SseSinkProvider.java b/webserver/sse/src/main/java/io/helidon/webserver/sse/SseSinkProvider.java index 466a1d5d126..f83625fc37f 100644 --- a/webserver/sse/src/main/java/io/helidon/webserver/sse/SseSinkProvider.java +++ b/webserver/sse/src/main/java/io/helidon/webserver/sse/SseSinkProvider.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2023 Oracle and/or its affiliates. + * Copyright (c) 2023, 2024 Oracle and/or its affiliates. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -26,9 +26,12 @@ import io.helidon.webserver.http.ServerResponse; import io.helidon.webserver.http.spi.Sink; import io.helidon.webserver.http.spi.SinkProvider; +import io.helidon.webserver.http.spi.SinkProviderContext; /** * Sink provider for SSE type. + * + * @see io.helidon.webserver.http.spi.SinkProvider */ public class SseSinkProvider implements SinkProvider { @@ -37,10 +40,34 @@ public boolean supports(GenericType> type, ServerRequest reque return SseSink.TYPE.equals(type) && request.headers().isAccepted(MediaTypes.TEXT_EVENT_STREAM); } + /** + * Creates a Sink for SSE events. + * + * @param context the context + * @return newly created sink + * @param type of sink + */ @Override @SuppressWarnings("unchecked") - public > X create(ServerResponse response, BiConsumer eventConsumer, - Runnable closeRunnable) { - return (X) new SseSink(response, eventConsumer, closeRunnable); + public > X create(SinkProviderContext context) { + return (X) new DataWriterSseSink(context); + } + + /** + * Creates a Sink for SSE events. + * + * @param response the HTTP response + * @param eventConsumer an event consumer + * @param closeRunnable a runnable to call on close + * @param type of sink + * @return newly created sink + * @deprecated replaced by {@link #create(SinkProviderContext)} + */ + @Override + @Deprecated(since = "4.1.2", forRemoval = true) + public > X create(ServerResponse response, + BiConsumer eventConsumer, + Runnable closeRunnable) { + return (X) new OutputStreamSseSink(response, eventConsumer, closeRunnable); } } diff --git a/webserver/sse/src/main/java/module-info.java b/webserver/sse/src/main/java/module-info.java index 571dbf8ebf3..5bd5d8ab346 100644 --- a/webserver/sse/src/main/java/module-info.java +++ b/webserver/sse/src/main/java/module-info.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2023 Oracle and/or its affiliates. + * Copyright (c) 2023, 2024 Oracle and/or its affiliates. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -30,6 +30,7 @@ requires static io.helidon.common.features.api; requires transitive io.helidon.common; + requires transitive io.helidon.common.socket; requires transitive io.helidon.http.sse; requires transitive io.helidon.webserver; diff --git a/webserver/testing/junit5/junit5/src/main/java/io/helidon/webserver/testing/junit5/DirectClientServerContext.java b/webserver/testing/junit5/junit5/src/main/java/io/helidon/webserver/testing/junit5/DirectClientServerContext.java index 03b59ad0f31..e74626ebf81 100644 --- a/webserver/testing/junit5/junit5/src/main/java/io/helidon/webserver/testing/junit5/DirectClientServerContext.java +++ b/webserver/testing/junit5/junit5/src/main/java/io/helidon/webserver/testing/junit5/DirectClientServerContext.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2023 Oracle and/or its affiliates. + * Copyright (c) 2023, 2024 Oracle and/or its affiliates. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -138,4 +138,9 @@ public DirectHandlers directHandlers() { public ListenerConfig config() { return listenerConfiguration; } + + @Override + public HelidonSocket serverSocket() { + return serverSocket; + } } diff --git a/webserver/tests/sse/pom.xml b/webserver/tests/sse/pom.xml index 81553b54df9..239ba778edc 100644 --- a/webserver/tests/sse/pom.xml +++ b/webserver/tests/sse/pom.xml @@ -28,11 +28,18 @@ Helidon WebServer Tests SSE WebServer SSE tests + + io.helidon.webserver.tests.sse.Main + + + + io.helidon.webserver + helidon-webserver + io.helidon.webserver helidon-webserver-sse - test io.helidon.webclient @@ -70,4 +77,51 @@ test + + + + + org.apache.maven.plugins + maven-resources-plugin + ${version.plugin.resources} + + + org.apache.maven.plugins + maven-dependency-plugin + ${version.plugin.dependency} + + + copy-libs + prepare-package + + copy-dependencies + + + ${project.build.directory}/libs + false + false + true + true + runtime + + + + + + org.apache.maven.plugins + maven-jar-plugin + ${version.plugin.jar} + + + + true + libs + ${mainClass} + false + + + + + + diff --git a/webserver/tests/sse/src/main/java/io/helidon/webserver/tests/sse/Main.java b/webserver/tests/sse/src/main/java/io/helidon/webserver/tests/sse/Main.java new file mode 100644 index 00000000000..65de0bfac5d --- /dev/null +++ b/webserver/tests/sse/src/main/java/io/helidon/webserver/tests/sse/Main.java @@ -0,0 +1,47 @@ +/* + * Copyright (c) 2024 Oracle and/or its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.helidon.webserver.tests.sse; + +import java.util.stream.IntStream; + +import io.helidon.http.sse.SseEvent; +import io.helidon.webserver.WebServer; +import io.helidon.webserver.http.HttpRouting; +import io.helidon.webserver.http.ServerRequest; +import io.helidon.webserver.http.ServerResponse; +import io.helidon.webserver.sse.SseSink; + +/** + * Simple SSE server that can be used to manually test interop with other + * clients such as Postman. + */ +public class Main { + + static void sse(ServerRequest req, ServerResponse res) { + try (SseSink sseSink = res.sink(SseSink.TYPE)) { + IntStream.range(0, 1000).forEach(i -> sseSink.emit(SseEvent.create("hello world " + i))); + } + } + + public static void main(String[] args) { + WebServer.builder() + .port(8080) + .routing(HttpRouting.builder().get("/sse", Main::sse)) + .build() + .start(); + } +} diff --git a/webserver/tests/sse/src/test/java/io/helidon/webserver/tests/sse/SimpleSseClient.java b/webserver/tests/sse/src/test/java/io/helidon/webserver/tests/sse/SimpleSseClient.java new file mode 100644 index 00000000000..967b0552157 --- /dev/null +++ b/webserver/tests/sse/src/test/java/io/helidon/webserver/tests/sse/SimpleSseClient.java @@ -0,0 +1,132 @@ +/* + * Copyright (c) 2024 Oracle and/or its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.helidon.webserver.tests.sse; + +import java.io.BufferedReader; +import java.io.IOException; +import java.time.Duration; +import java.util.Collections; + +import io.helidon.common.testing.http.junit5.SocketHttpClient; +import io.helidon.http.Method; + +class SimpleSseClient implements AutoCloseable { + + public enum State { + DISCONNECTED, + CONNECTED, + HEADERS_READ, + ERROR + } + + private State state = State.DISCONNECTED; + private BufferedReader reader; + private final String path; + private final SocketHttpClient client; + + public static SimpleSseClient create(int port, String path) { + return create("localhost", port, path, Duration.ofSeconds(10)); + } + + public static SimpleSseClient create(int port, String path, Duration timeout) { + return create("localhost", port, path, timeout); + } + + public static SimpleSseClient create(String host, int port, String path, Duration timeout) { + return new SimpleSseClient("localhost", port, path, timeout); + } + + private SimpleSseClient(String host, int port, String path, Duration timeout) { + this.path = path; + this.client = SocketHttpClient.create(host, port, timeout); + } + + public String nextEvent() { + ensureConnected(); + ensureHeadersRead(); + + try { + String line; + StringBuilder event = new StringBuilder(); + while ((line = reader.readLine()) != null) { + if (line.isEmpty()) { + return event.toString(); + } + if (!event.isEmpty()) { + event.append("\n"); + } + event.append(line); + } + if (event.isEmpty()) { + return null; + } + state = State.ERROR; + throw new RuntimeException("Unable to parse response"); + } catch (IOException e) { + state = State.ERROR; + throw new RuntimeException(e); + } + } + + @Override + public void close() throws Exception { + client.close(); + state = State.DISCONNECTED; + + } + + public State state() { + return state; + } + + private void ensureConnected() { + if (state == State.DISCONNECTED) { + client.request(Method.GET.toString(), + path, + "HTTP/1.1", + "localhost", + Collections.emptyList(), + null); + reader = client.socketReader(); + state = State.CONNECTED; + } + } + + private void ensureHeadersRead() { + if (state == State.CONNECTED) { + try { + String line; + while ((line = reader.readLine()) != null) { + if (line.isEmpty()) { + state = State.HEADERS_READ; + return; + } + line = line.toLowerCase(); + if (line.contains("http/1.1") && !line.contains("200")) { + throw new RuntimeException("Invalid status code in response"); + } else if (line.contains("content-type") && !line.contains("text/event-stream")) { + throw new RuntimeException("Invalid content-type in response"); + } + } + state = State.ERROR; + throw new RuntimeException("Unable to parse response"); + } catch (IOException e) { + state = State.ERROR; + throw new RuntimeException(e); + } + } + } +} diff --git a/webserver/tests/sse/src/test/java/io/helidon/webserver/tests/sse/SseBaseTest.java b/webserver/tests/sse/src/test/java/io/helidon/webserver/tests/sse/SseBaseTest.java index c25552ecb46..8254f8e4e51 100644 --- a/webserver/tests/sse/src/test/java/io/helidon/webserver/tests/sse/SseBaseTest.java +++ b/webserver/tests/sse/src/test/java/io/helidon/webserver/tests/sse/SseBaseTest.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2023 Oracle and/or its affiliates. + * Copyright (c) 2023, 2024 Oracle and/or its affiliates. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -17,14 +17,34 @@ package io.helidon.webserver.tests.sse; import io.helidon.http.sse.SseEvent; -import io.helidon.webserver.sse.SseSink; +import io.helidon.webserver.WebServer; import io.helidon.webserver.http.ServerRequest; import io.helidon.webserver.http.ServerResponse; +import io.helidon.webserver.sse.SseSink; + import jakarta.json.Json; import jakarta.json.JsonObject; +import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.CoreMatchers.nullValue; +import static org.hamcrest.MatcherAssert.assertThat; + class SseBaseTest { + private final WebServer webServer; + + SseBaseTest() { + this.webServer = null; + } + + SseBaseTest(WebServer webServer) { + this.webServer = webServer; + } + + protected WebServer webServer() { + return webServer; + } + static void sseString1(ServerRequest req, ServerResponse res) { try (SseSink sseSink = res.sink(SseSink.TYPE)) { sseSink.emit(SseEvent.create("hello")) @@ -96,4 +116,14 @@ static void sseIdComment(ServerRequest req, ServerResponse res) { sseSink.emit(event); } } + + protected void testSse(String path, String... events) throws Exception { + assert webServer != null; + try (SimpleSseClient sseClient = SimpleSseClient.create(webServer.port(), path)) { + for (String e : events) { + assertThat(sseClient.nextEvent(), is(e)); + } + assertThat(sseClient.nextEvent(), is(nullValue())); + } + } } diff --git a/webserver/tests/sse/src/test/java/io/helidon/webserver/tests/sse/SseClientTest.java b/webserver/tests/sse/src/test/java/io/helidon/webserver/tests/sse/SseClientTest.java index a812b102841..f039353cca4 100644 --- a/webserver/tests/sse/src/test/java/io/helidon/webserver/tests/sse/SseClientTest.java +++ b/webserver/tests/sse/src/test/java/io/helidon/webserver/tests/sse/SseClientTest.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2023 Oracle and/or its affiliates. + * Copyright (c) 2023, 2024 Oracle and/or its affiliates. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -24,6 +24,7 @@ import io.helidon.webclient.http1.Http1Client; import io.helidon.webclient.http1.Http1ClientResponse; import io.helidon.webclient.sse.SseSource; +import io.helidon.webserver.WebServer; import io.helidon.webserver.http.HttpRules; import io.helidon.webserver.http.ServerRequest; import io.helidon.webserver.http.ServerResponse; @@ -44,7 +45,8 @@ class SseClientTest extends SseBaseTest { private final Http1Client client; - SseClientTest(Http1Client client) { + SseClientTest(WebServer webServer, Http1Client client) { + super(webServer); this.client = client; } diff --git a/webserver/tests/sse/src/test/java/io/helidon/webserver/tests/sse/SseServerMediaTest.java b/webserver/tests/sse/src/test/java/io/helidon/webserver/tests/sse/SseServerMediaTest.java index 0ee68a6bc8c..b22f446530d 100644 --- a/webserver/tests/sse/src/test/java/io/helidon/webserver/tests/sse/SseServerMediaTest.java +++ b/webserver/tests/sse/src/test/java/io/helidon/webserver/tests/sse/SseServerMediaTest.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2023 Oracle and/or its affiliates. + * Copyright (c) 2023, 2024 Oracle and/or its affiliates. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -26,7 +26,6 @@ import io.helidon.common.config.Config; import io.helidon.http.Headers; import io.helidon.http.HttpMediaType; -import io.helidon.http.Status; import io.helidon.http.WritableHeaders; import io.helidon.http.media.EntityWriter; import io.helidon.http.media.MediaSupport; @@ -34,7 +33,7 @@ import io.helidon.http.media.spi.MediaSupportProvider; import io.helidon.http.sse.SseEvent; import io.helidon.webclient.http1.Http1Client; -import io.helidon.webclient.http1.Http1ClientResponse; +import io.helidon.webserver.WebServer; import io.helidon.webserver.http.HttpRules; import io.helidon.webserver.http.ServerRequest; import io.helidon.webserver.http.ServerResponse; @@ -44,23 +43,20 @@ import org.junit.jupiter.api.Test; -import static io.helidon.http.HeaderValues.ACCEPT_EVENT_STREAM; -import static org.hamcrest.CoreMatchers.is; -import static org.hamcrest.MatcherAssert.assertThat; - /** * Test that shows how to serialize an individual SSE event using a custom * {@link io.helidon.http.media.spi.MediaSupportProvider} and a user-defined * media type. Each SSE event can be given a different media type. */ @ServerTest -class SseServerMediaTest { +class SseServerMediaTest extends SseBaseTest { private static final HttpMediaType MY_PLAIN_TEXT = HttpMediaType.create("text/my_plain"); private final Http1Client client; - SseServerMediaTest(Http1Client client) { + SseServerMediaTest(WebServer webServer, Http1Client client) { + super(webServer); this.client = client; } @@ -70,8 +66,8 @@ static void routing(HttpRules rules) { } @Test - void testSseJson() { - testSse("/sse", "data:HELLO\n\ndata:world\n\n"); + void testSseJson() throws Exception { + testSse("/sse", "data:HELLO", "data:world"); } private static void sse(ServerRequest req, ServerResponse res) { @@ -81,13 +77,6 @@ private static void sse(ServerRequest req, ServerResponse res) { } } - private void testSse(String path, String result) { - try (Http1ClientResponse response = client.get(path).header(ACCEPT_EVENT_STREAM).request()) { - assertThat(response.status(), is(Status.OK_200)); - assertThat(response.as(String.class), is(result)); - } - } - @SuppressWarnings("unchecked") public static class MyStringSupport extends StringSupport { @@ -143,6 +132,10 @@ private void write(String toWrite, } } + /** + * Provider for {@link io.helidon.webserver.tests.sse.SseServerMediaTest.MyStringSupport}, + * loaded as a service. + */ public static class MyStringSupportProvider implements MediaSupportProvider, Weighted { @Override diff --git a/webserver/tests/sse/src/test/java/io/helidon/webserver/tests/sse/SseServerTest.java b/webserver/tests/sse/src/test/java/io/helidon/webserver/tests/sse/SseServerTest.java index cf49ddbe2d8..81a71df14a4 100644 --- a/webserver/tests/sse/src/test/java/io/helidon/webserver/tests/sse/SseServerTest.java +++ b/webserver/tests/sse/src/test/java/io/helidon/webserver/tests/sse/SseServerTest.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2023 Oracle and/or its affiliates. + * Copyright (c) 2023, 2024 Oracle and/or its affiliates. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -19,13 +19,13 @@ import io.helidon.http.Status; import io.helidon.webclient.http1.Http1Client; import io.helidon.webclient.http1.Http1ClientResponse; +import io.helidon.webserver.WebServer; import io.helidon.webserver.http.HttpRules; import io.helidon.webserver.testing.junit5.ServerTest; import io.helidon.webserver.testing.junit5.SetUpRoute; import org.junit.jupiter.api.Test; -import static io.helidon.http.HeaderValues.ACCEPT_EVENT_STREAM; import static io.helidon.http.HeaderValues.ACCEPT_JSON; import static org.hamcrest.CoreMatchers.is; import static org.hamcrest.MatcherAssert.assertThat; @@ -33,10 +33,8 @@ @ServerTest class SseServerTest extends SseBaseTest { - private final Http1Client client; - - SseServerTest(Http1Client client) { - this.client = client; + SseServerTest(WebServer webServer) { + super(webServer); } @SetUpRoute @@ -50,49 +48,43 @@ static void routing(HttpRules rules) { } @Test - void testSseString1() { - testSse("/sseString1", "data:hello\n\ndata:world\n\n"); + void testSseString1() throws Exception { + testSse("/sseString1", "data:hello", "data:world"); } @Test - void testSseString2() { - testSse("/sseString2", "data:1\n\ndata:2\n\ndata:3\n\n"); + void testSseString2() throws Exception { + testSse("/sseString2", "data:1", "data:2", "data:3"); } @Test - void testSseJson1() { - testSse("/sseJson1", "data:{\"hello\":\"world\"}\n\n"); + void testSseJson1() throws Exception { + testSse("/sseJson1", "data:{\"hello\":\"world\"}"); } @Test - void testSseJson2() { - testSse("/sseJson2", "data:{\"hello\":\"world\"}\n\n"); + void testSseJson2() throws Exception { + testSse("/sseJson2", "data:{\"hello\":\"world\"}"); } @Test - void testSseMixed() { - testSse("/sseMixed", - "data:hello\n\ndata:world\n\n" + - "data:{\"hello\":\"world\"}\n\n" + - "data:{\"hello\":\"world\"}\n\n"); + void testSseMixed() throws Exception { + testSse("/sseMixed", "data:hello", "data:world", + "data:{\"hello\":\"world\"}", "data:{\"hello\":\"world\"}"); } @Test - void testIdComment() { - testSse("/sseIdComment", ":This is a comment\nid:1\ndata:hello\n\n"); + void testIdComment() throws Exception { + testSse("/sseIdComment", ":This is a comment\nid:1\ndata:hello"); } @Test void testWrongAcceptType() { + Http1Client client = Http1Client.builder() + .baseUri("http://localhost:" + webServer().port()) + .build(); try (Http1ClientResponse response = client.get("/sseString1").header(ACCEPT_JSON).request()) { assertThat(response.status(), is(Status.NOT_ACCEPTABLE_406)); } } - - private void testSse(String path, String result) { - try (Http1ClientResponse response = client.get(path).header(ACCEPT_EVENT_STREAM).request()) { - assertThat(response.status(), is(Status.OK_200)); - assertThat(response.as(String.class), is(result)); - } - } } diff --git a/webserver/webserver/src/main/java/io/helidon/webserver/ConnectionContext.java b/webserver/webserver/src/main/java/io/helidon/webserver/ConnectionContext.java index 1c14c051e91..c267d69dfa3 100644 --- a/webserver/webserver/src/main/java/io/helidon/webserver/ConnectionContext.java +++ b/webserver/webserver/src/main/java/io/helidon/webserver/ConnectionContext.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2022, 2023 Oracle and/or its affiliates. + * Copyright (c) 2022, 2024 Oracle and/or its affiliates. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -21,6 +21,7 @@ import io.helidon.common.buffers.DataReader; import io.helidon.common.buffers.DataWriter; +import io.helidon.common.socket.HelidonSocket; import io.helidon.common.socket.SocketContext; /** @@ -58,7 +59,7 @@ public interface ConnectionContext extends SocketContext { /** * Router that may contain routings of different types (HTTP, WebSocket, grpc). * - * @return rouer + * @return the router */ Router router(); @@ -71,4 +72,13 @@ public interface ConnectionContext extends SocketContext { default Optional proxyProtocolData() { return Optional.empty(); } + + /** + * The underlying network socket for the connection. + * + * @return the socket + */ + default HelidonSocket serverSocket() { + throw new UnsupportedOperationException("Not supported"); + } } diff --git a/webserver/webserver/src/main/java/io/helidon/webserver/ConnectionHandler.java b/webserver/webserver/src/main/java/io/helidon/webserver/ConnectionHandler.java index ca81fb4dfe2..abe56f67b76 100644 --- a/webserver/webserver/src/main/java/io/helidon/webserver/ConnectionHandler.java +++ b/webserver/webserver/src/main/java/io/helidon/webserver/ConnectionHandler.java @@ -244,6 +244,11 @@ public Optional proxyProtocolData() { return Optional.ofNullable(proxyProtocolData); } + @Override + public HelidonSocket serverSocket() { + return helidonSocket; + } + private ServerConnection identifyConnection() { // if just one candidate, take a chance with it if (providerCandidates.size() == 1) { diff --git a/webserver/webserver/src/main/java/io/helidon/webserver/http/spi/SinkProvider.java b/webserver/webserver/src/main/java/io/helidon/webserver/http/spi/SinkProvider.java index bae84c1f343..53b1c7dd150 100644 --- a/webserver/webserver/src/main/java/io/helidon/webserver/http/spi/SinkProvider.java +++ b/webserver/webserver/src/main/java/io/helidon/webserver/http/spi/SinkProvider.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2023 Oracle and/or its affiliates. + * Copyright (c) 2023, 2024 Oracle and/or its affiliates. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -39,6 +39,17 @@ public interface SinkProvider { */ boolean supports(GenericType> type, ServerRequest request); + /** + * Creates a sink using this provider. + * + * @param context a context for a sync provider + * @param type of sink + * @return newly created sink + */ + default > X create(SinkProviderContext context) { + throw new UnsupportedOperationException("Not implemented"); + } + /** * Creates a sink using this provider. * @@ -47,7 +58,10 @@ public interface SinkProvider { * @param closeRunnable a runnable to call on close * @param type of sink * @return newly created sink + * @deprecated replaced by {@link #create(SinkProviderContext)} */ - > X create(ServerResponse response, BiConsumer eventConsumer, + @Deprecated(since = "4.1.2", forRemoval = true) + > X create(ServerResponse response, + BiConsumer eventConsumer, Runnable closeRunnable); } diff --git a/webserver/webserver/src/main/java/io/helidon/webserver/http/spi/SinkProviderContext.java b/webserver/webserver/src/main/java/io/helidon/webserver/http/spi/SinkProviderContext.java new file mode 100644 index 00000000000..7df4030d486 --- /dev/null +++ b/webserver/webserver/src/main/java/io/helidon/webserver/http/spi/SinkProviderContext.java @@ -0,0 +1,47 @@ +/* + * Copyright (c) 2024 Oracle and/or its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.helidon.webserver.http.spi; + +import io.helidon.webserver.ConnectionContext; +import io.helidon.webserver.http.ServerResponse; + +/** + * A context for {@link io.helidon.webserver.http.spi.SinkProvider}s supplied + * at creation time. + */ +public interface SinkProviderContext { + + /** + * Obtains the server response associated with this context. + * + * @return the server response + */ + ServerResponse serverResponse(); + + /** + * Obtains access to the connection context. + * + * @return the connection context + */ + ConnectionContext connectionContext(); + + /** + * Runnable to execute to close the response. + * + * @return the close runnable + */ + Runnable closeRunnable(); +} diff --git a/webserver/webserver/src/main/java/io/helidon/webserver/http1/Http1ServerResponse.java b/webserver/webserver/src/main/java/io/helidon/webserver/http1/Http1ServerResponse.java index 40e6d09b228..330611ae76b 100644 --- a/webserver/webserver/src/main/java/io/helidon/webserver/http1/Http1ServerResponse.java +++ b/webserver/webserver/src/main/java/io/helidon/webserver/http1/Http1ServerResponse.java @@ -45,9 +45,11 @@ import io.helidon.http.media.MediaContext; import io.helidon.webserver.ConnectionContext; import io.helidon.webserver.ServerConnectionException; +import io.helidon.webserver.http.ServerResponse; import io.helidon.webserver.http.ServerResponseBase; import io.helidon.webserver.http.spi.Sink; import io.helidon.webserver.http.spi.SinkProvider; +import io.helidon.webserver.http.spi.SinkProviderContext; /** * An HTTP/1 server response. @@ -261,7 +263,31 @@ public void commit() { public > X sink(GenericType sinkType) { for (SinkProvider p : SINK_PROVIDERS) { if (p.supports(sinkType, request)) { - return (X) p.create(this, this::handleSinkData, this::commit); + try { + return (X) p.create(new SinkProviderContext() { + @Override + public ServerResponse serverResponse() { + return Http1ServerResponse.this; + } + + @Override + public ConnectionContext connectionContext() { + return Http1ServerResponse.this.ctx; + } + + @Override + public Runnable closeRunnable() { + return () -> { + Http1ServerResponse.this.isSent = true; + afterSend(); + request.reset(); + }; + } + }); + } catch (UnsupportedOperationException e) { + // deprecated - will be removed in 5.x + return (X) p.create(this, this::handleSinkData, this::commit); + } } } // Request not acceptable if provider not found