diff --git a/examples/webserver/protocols/src/main/proto/strings.proto b/examples/webserver/protocols/src/main/proto/strings.proto index d1a4f7be178..756b2246797 100644 --- a/examples/webserver/protocols/src/main/proto/strings.proto +++ b/examples/webserver/protocols/src/main/proto/strings.proto @@ -20,7 +20,6 @@ option java_package = "io.helidon.examples.grpc.strings"; service StringService { rpc Upper (StringMessage) returns (StringMessage) {} - rpc Lower (StringMessage) returns (StringMessage) {} rpc Split (StringMessage) returns (stream StringMessage) {} rpc Join (stream StringMessage) returns (StringMessage) {} rpc Echo (stream StringMessage) returns (stream StringMessage) {} diff --git a/examples/webserver/protocols/src/test/java/io/helidon/examples/webserver/protocols/GrpcStubTest.java b/examples/webserver/protocols/src/test/java/io/helidon/examples/webserver/protocols/GrpcStubTest.java index 77b6bcf7ed3..3706d4e9f1f 100644 --- a/examples/webserver/protocols/src/test/java/io/helidon/examples/webserver/protocols/GrpcStubTest.java +++ b/examples/webserver/protocols/src/test/java/io/helidon/examples/webserver/protocols/GrpcStubTest.java @@ -145,7 +145,6 @@ public void onCompleted() { void testUnaryUpper() { Channel channel = new GrpcChannel(grpcClient); StringServiceGrpc.StringServiceBlockingStub service = StringServiceGrpc.newBlockingStub(channel); - Strings.StringMessage message = Strings.StringMessage.newBuilder().setText("hello").build(); Strings.StringMessage res = service.upper(newStringMessage("hello")); assertThat(res.getText(), is("HELLO")); } @@ -160,6 +159,16 @@ void testUnaryUpperAsync() throws ExecutionException, InterruptedException, Time assertThat(res.getText(), is("HELLO")); } + @Test + void testServerStreamingSplit() { + Channel channel = new GrpcChannel(grpcClient); + StringServiceGrpc.StringServiceBlockingStub service = StringServiceGrpc.newBlockingStub(channel); + Iterator res = service.split(newStringMessage("hello world")); + assertThat(res.next().getText(), is("hello")); + assertThat(res.next().getText(), is("world")); + assertThat(res.hasNext(), is(false)); + } + @Test void testServerStreamingSplitAsync() throws ExecutionException, InterruptedException, TimeoutException { Channel channel = new GrpcChannel(grpcClient); diff --git a/webclient/grpc/src/main/java/io/helidon/webclient/grpc/GrpcChannel.java b/webclient/grpc/src/main/java/io/helidon/webclient/grpc/GrpcChannel.java index 52a88dde0d9..f885af221f7 100644 --- a/webclient/grpc/src/main/java/io/helidon/webclient/grpc/GrpcChannel.java +++ b/webclient/grpc/src/main/java/io/helidon/webclient/grpc/GrpcChannel.java @@ -16,16 +16,25 @@ package io.helidon.webclient.grpc; +import io.helidon.webclient.api.ClientUri; + import io.grpc.CallOptions; import io.grpc.Channel; import io.grpc.ClientCall; import io.grpc.MethodDescriptor; -import io.helidon.webclient.api.ClientUri; +/** + * Helidon's implementation of a gRPC {@link Channel}. + */ public class GrpcChannel extends Channel { private final GrpcClientImpl grpcClient; + /** + * Creates a new channel from a {@link GrpcClient}. + * + * @param grpcClient the gRPC client + */ public GrpcChannel(GrpcClient grpcClient) { this.grpcClient = (GrpcClientImpl) grpcClient; } diff --git a/webclient/grpc/src/main/java/io/helidon/webclient/grpc/GrpcClientCall.java b/webclient/grpc/src/main/java/io/helidon/webclient/grpc/GrpcClientCall.java index c52fe66a8e6..029c713c0b4 100644 --- a/webclient/grpc/src/main/java/io/helidon/webclient/grpc/GrpcClientCall.java +++ b/webclient/grpc/src/main/java/io/helidon/webclient/grpc/GrpcClientCall.java @@ -27,10 +27,6 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.logging.Logger; -import io.grpc.ClientCall; -import io.grpc.Metadata; -import io.grpc.MethodDescriptor; -import io.grpc.Status; import io.helidon.common.buffers.BufferData; import io.helidon.common.tls.Tls; import io.helidon.http.Header; @@ -54,6 +50,11 @@ import io.helidon.webclient.http2.Http2StreamConfig; import io.helidon.webclient.http2.StreamTimeoutException; +import io.grpc.ClientCall; +import io.grpc.Metadata; +import io.grpc.MethodDescriptor; +import io.grpc.Status; + /** * A gRPC client call handler. The typical order of calls will be: * @@ -162,6 +163,8 @@ public void request(int numMessages) { public void cancel(String message, Throwable cause) { LOGGER.finest(() -> "cancel called " + message); responseListener.onClose(Status.CANCELLED, new Metadata()); + readStreamFuture.cancel(true); + writeStreamFuture.cancel(true); close(); } @@ -262,11 +265,10 @@ private void startStreamingThreads() { private void close() { LOGGER.finest("closing client call"); - readStreamFuture.cancel(true); - writeStreamFuture.cancel(true); sendingQueue.clear(); clientStream.cancel(); connection.close(); + LOGGER.finest("closing client call ends"); } private ClientConnection clientConnection() { diff --git a/webclient/grpc/src/main/java/io/helidon/webclient/grpc/GrpcClientMethodDescriptor.java b/webclient/grpc/src/main/java/io/helidon/webclient/grpc/GrpcClientMethodDescriptor.java index 81a1630dbda..3c35e380477 100644 --- a/webclient/grpc/src/main/java/io/helidon/webclient/grpc/GrpcClientMethodDescriptor.java +++ b/webclient/grpc/src/main/java/io/helidon/webclient/grpc/GrpcClientMethodDescriptor.java @@ -20,14 +20,15 @@ import java.util.Collections; import java.util.List; -import io.grpc.CallCredentials; -import io.grpc.ClientInterceptor; -import io.grpc.MethodDescriptor; import io.helidon.grpc.core.InterceptorPriorities; import io.helidon.grpc.core.MarshallerSupplier; import io.helidon.grpc.core.MethodHandler; import io.helidon.grpc.core.PriorityBag; +import io.grpc.CallCredentials; +import io.grpc.ClientInterceptor; +import io.grpc.MethodDescriptor; + /** * Encapsulates all metadata necessary to define a gRPC method. In addition to wrapping * a {@link io.grpc.MethodDescriptor}, this class also holds the request and response @@ -203,6 +204,11 @@ public MethodDescriptor descriptor() { return (MethodDescriptor) descriptor; } + /** + * Returns the {@link MethodDescriptor.MethodType} of this method. + * + * @return the method type + */ public MethodDescriptor.MethodType type() { return descriptor.getType(); } diff --git a/webclient/grpc/src/main/java/io/helidon/webclient/grpc/GrpcClientProtocol.java b/webclient/grpc/src/main/java/io/helidon/webclient/grpc/GrpcClientProtocol.java index 84e96b1191d..b701edd8f9c 100644 --- a/webclient/grpc/src/main/java/io/helidon/webclient/grpc/GrpcClientProtocol.java +++ b/webclient/grpc/src/main/java/io/helidon/webclient/grpc/GrpcClientProtocol.java @@ -23,6 +23,10 @@ import io.helidon.webclient.http2.Http2ClientConfig; class GrpcClientProtocol { + + private GrpcClientProtocol() { + } + static GrpcClientStream create(SocketContext scoketContext, Http2Settings serverSettings, Http2ClientConfig clientConfig, diff --git a/webclient/grpc/src/main/java/io/helidon/webclient/grpc/GrpcProtocolProvider.java b/webclient/grpc/src/main/java/io/helidon/webclient/grpc/GrpcProtocolProvider.java index 3c63f56ae8e..7ea3122e521 100644 --- a/webclient/grpc/src/main/java/io/helidon/webclient/grpc/GrpcProtocolProvider.java +++ b/webclient/grpc/src/main/java/io/helidon/webclient/grpc/GrpcProtocolProvider.java @@ -19,6 +19,9 @@ import io.helidon.webclient.api.WebClient; import io.helidon.webclient.spi.ClientProtocolProvider; +/** + * Provider for {@link GrpcClient}. + */ public class GrpcProtocolProvider implements ClientProtocolProvider { static final String CONFIG_KEY = "grpc"; diff --git a/webclient/grpc/src/main/java/io/helidon/webclient/grpc/GrpcServiceClient.java b/webclient/grpc/src/main/java/io/helidon/webclient/grpc/GrpcServiceClient.java index c31d3258714..4af0bede8f6 100644 --- a/webclient/grpc/src/main/java/io/helidon/webclient/grpc/GrpcServiceClient.java +++ b/webclient/grpc/src/main/java/io/helidon/webclient/grpc/GrpcServiceClient.java @@ -26,6 +26,7 @@ * @see io.helidon.webclient.grpc.GrpcClient#serviceClient(io.helidon.webclient.grpc.GrpcServiceDescriptor) */ public interface GrpcServiceClient { + /** * Name of the service this client was created for. * @@ -33,19 +34,91 @@ public interface GrpcServiceClient { */ String serviceName(); + /** + * Blocking gRPC unary call. + * + * @param methodName method name + * @param request the request + * @return the response + * @param type of request + * @param type of response + */ ResT unary(String methodName, ReqT request); + /** + * Asynchronous gRPC unary call. + * + * @param methodName method name + * @param request the request + * @param response the response observer + * @param type of request + * @param type of response + */ void unary(String methodName, ReqT request, StreamObserver response); + /** + * Blocking gRPC server stream call. + * + * @param methodName method name + * @param request the request + * @return the response iterator + * @param type of request + * @param type of response + */ Iterator serverStream(String methodName, ReqT request); + /** + * Asynchronous gRPC server stream call. + * + * @param methodName method name + * @param request the request + * @param response the response observer + * @param type of request + * @param type of response + */ void serverStream(String methodName, ReqT request, StreamObserver response); + /** + * Blocking gRPC client stream call. + * + * @param methodName method name + * @param request the request iterator + * @return the response + * @param type of request + * @param type of response + */ ResT clientStream(String methodName, Iterator request); + /** + * Asynchronous gRPC client stream call. + * + * @param methodName method name + * @param response the response observer + * @return the request observer + * @param type of request + * @param type of response + */ StreamObserver clientStream(String methodName, StreamObserver response); + /** + * gRPC bidirectional call using {@link Iterator}. + * + * @param methodName method name + * @param request request iterator + * @return response iterator + * @param type of request + * @param type of response + */ Iterator bidi(String methodName, Iterator request); + /** + * gRPC bidirectional call using {@link StreamObserver}. + * + * @param methodName method name + * @param response the response observer + * @return the request observer + * @param type of request + * @param type of response + */ StreamObserver bidi(String methodName, StreamObserver response); } diff --git a/webclient/grpc/src/main/java/io/helidon/webclient/grpc/GrpcServiceDescriptorBlueprint.java b/webclient/grpc/src/main/java/io/helidon/webclient/grpc/GrpcServiceDescriptorBlueprint.java index 00fff468699..0693528e6fc 100644 --- a/webclient/grpc/src/main/java/io/helidon/webclient/grpc/GrpcServiceDescriptorBlueprint.java +++ b/webclient/grpc/src/main/java/io/helidon/webclient/grpc/GrpcServiceDescriptorBlueprint.java @@ -20,11 +20,12 @@ import java.util.NoSuchElementException; import java.util.Optional; -import io.grpc.CallCredentials; -import io.grpc.ClientInterceptor; import io.helidon.builder.api.Option; import io.helidon.builder.api.Prototype; +import io.grpc.CallCredentials; +import io.grpc.ClientInterceptor; + @Prototype.Blueprint interface GrpcServiceDescriptorBlueprint { diff --git a/webclient/grpc/src/main/java/io/helidon/webclient/grpc/package-info.java b/webclient/grpc/src/main/java/io/helidon/webclient/grpc/package-info.java new file mode 100644 index 00000000000..f7918d9e325 --- /dev/null +++ b/webclient/grpc/src/main/java/io/helidon/webclient/grpc/package-info.java @@ -0,0 +1,20 @@ +/* + * Copyright (c) 2024 Oracle and/or its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * Helidon WebClient HTTP/1.1 Support. + */ +package io.helidon.webclient.grpc; diff --git a/webclient/http2/src/main/java/io/helidon/webclient/http2/Http2Client.java b/webclient/http2/src/main/java/io/helidon/webclient/http2/Http2Client.java index 286731ba1d9..72fc534aab1 100644 --- a/webclient/http2/src/main/java/io/helidon/webclient/http2/Http2Client.java +++ b/webclient/http2/src/main/java/io/helidon/webclient/http2/Http2Client.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2022, 2023 Oracle and/or its affiliates. + * Copyright (c) 2022, 2024 Oracle and/or its affiliates. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. diff --git a/webclient/http2/src/main/java/io/helidon/webclient/http2/Http2ClientConnection.java b/webclient/http2/src/main/java/io/helidon/webclient/http2/Http2ClientConnection.java index 32cb9b903f9..fa92a9e7561 100644 --- a/webclient/http2/src/main/java/io/helidon/webclient/http2/Http2ClientConnection.java +++ b/webclient/http2/src/main/java/io/helidon/webclient/http2/Http2ClientConnection.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2022, 2023 Oracle and/or its affiliates. + * Copyright (c) 2022, 2024 Oracle and/or its affiliates. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -61,6 +61,9 @@ import static java.lang.System.Logger.Level.TRACE; import static java.lang.System.Logger.Level.WARNING; +/** + * Represents an HTTP2 connection on the client. + */ public class Http2ClientConnection { private static final System.Logger LOGGER = System.getLogger(Http2ClientConnection.class.getName()); private static final int FRAME_HEADER_LENGTH = 9; @@ -103,6 +106,14 @@ public class Http2ClientConnection { this.writer = new Http2ConnectionWriter(connection.helidonSocket(), connection.writer(), List.of()); } + /** + * Creates an HTTP2 client connection. + * + * @param http2Client the HTTP2 client + * @param connection the client connection + * @param sendSettings whether to send the settings or not + * @return an HTTP2 client connection + */ public static Http2ClientConnection create(Http2ClientImpl http2Client, ClientConnection connection, boolean sendSettings) { @@ -136,6 +147,11 @@ Http2ClientStream stream(int streamId) { } + /** + * Stream ID sequence. + * + * @return the ID sequence + */ public LockingStreamIdSequence streamIdSequence() { return streamIdSeq; } @@ -151,6 +167,12 @@ Http2ClientStream createStream(Http2StreamConfig config) { return stream; } + /** + * Adds a stream to the connection. + * + * @param streamId the stream ID + * @param stream the stream + */ public void addStream(int streamId, Http2ClientStream stream) { Lock lock = streamsLock.writeLock(); lock.lock(); @@ -161,6 +183,11 @@ public void addStream(int streamId, Http2ClientStream stream) { } } + /** + * Removes a stream from the connection. + * + * @param streamId the stream ID + */ public void removeStream(int streamId) { Lock lock = streamsLock.writeLock(); lock.lock(); @@ -206,6 +233,9 @@ void updateLastStreamId(int lastStreamId) { this.lastStreamId = lastStreamId; } + /** + * Closes this connection. + */ public void close() { this.goAway(0, Http2ErrorCode.NO_ERROR, "Closing connection"); if (state.getAndSet(State.CLOSED) != State.CLOSED) { diff --git a/webclient/http2/src/main/java/io/helidon/webclient/http2/Http2ClientImpl.java b/webclient/http2/src/main/java/io/helidon/webclient/http2/Http2ClientImpl.java index c45c66f44e1..2381459b45e 100644 --- a/webclient/http2/src/main/java/io/helidon/webclient/http2/Http2ClientImpl.java +++ b/webclient/http2/src/main/java/io/helidon/webclient/http2/Http2ClientImpl.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2022, 2023 Oracle and/or its affiliates. + * Copyright (c) 2022, 2024 Oracle and/or its affiliates. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -27,6 +27,9 @@ import io.helidon.webclient.api.WebClient; import io.helidon.webclient.spi.HttpClientSpi; +/** + * Implementation of HTTP2 client. + */ public class Http2ClientImpl implements Http2Client, HttpClientSpi { private final WebClient webClient; private final Http2ClientConfig clientConfig; diff --git a/webclient/http2/src/main/java/io/helidon/webclient/http2/Http2ClientStream.java b/webclient/http2/src/main/java/io/helidon/webclient/http2/Http2ClientStream.java index 9e38315ed86..e979f149678 100644 --- a/webclient/http2/src/main/java/io/helidon/webclient/http2/Http2ClientStream.java +++ b/webclient/http2/src/main/java/io/helidon/webclient/http2/Http2ClientStream.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2022, 2023 Oracle and/or its affiliates. + * Copyright (c) 2022, 2024 Oracle and/or its affiliates. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -53,6 +53,10 @@ import static java.lang.System.Logger.Level.DEBUG; +/** + * Represents an HTTP2 client stream. This class is not intended to be used by + * applications, it is only public internally within Helidon. + */ public class Http2ClientStream implements Http2Stream, ReleasableResource { private static final System.Logger LOGGER = System.getLogger(Http2ClientStream.class.getName()); @@ -182,6 +186,11 @@ void trailers(Http2Headers headers, boolean endOfStream) { trailers.complete(headers.httpHeaders()); } + /** + * Future that shall be completed once trailers are received. + * + * @return the completable future + */ public CompletableFuture trailers() { return trailers; } @@ -190,6 +199,9 @@ boolean hasEntity() { return hasEntity; } + /** + * Cancels this client stream. + */ public void cancel() { if (NON_CANCELABLE.contains(state)) { return; @@ -206,6 +218,9 @@ public void cancel() { } } + /** + * Removes the stream from underlying connection. + */ public void close() { connection.removeStream(streamId); } @@ -227,6 +242,11 @@ BufferData read(int i) { return read(); } + /** + * Reads a buffer data from the stream. + * + * @return the buffer data + */ public BufferData read() { while (state == Http2StreamState.HALF_CLOSED_LOCAL && readState != ReadState.END && hasEntity) { Http2FrameData frameData = readOne(timeout); @@ -258,6 +278,12 @@ Status waitFor100Continue() { return null; } + /** + * Writes HTTP2 headers to the stream. + * + * @param http2Headers the headers + * @param endOfStream end of stream marker + */ public void writeHeaders(Http2Headers http2Headers, boolean endOfStream) { this.state = Http2StreamState.checkAndGetState(this.state, Http2FrameType.HEADERS, true, endOfStream, true); this.readState = readState.check(http2Headers.httpHeaders().contains(HeaderValues.EXPECT_100) @@ -294,6 +320,12 @@ public void writeHeaders(Http2Headers http2Headers, boolean endOfStream) { } } + /** + * Writes a buffer data into the stream. + * + * @param entityBytes buffer data + * @param endOfStream end of stream marker + */ public void writeData(BufferData entityBytes, boolean endOfStream) { Http2FrameHeader frameHeader = Http2FrameHeader.create(entityBytes.available(), Http2FrameTypes.DATA, @@ -305,6 +337,11 @@ public void writeData(BufferData entityBytes, boolean endOfStream) { splitAndWrite(frameData); } + /** + * Reads headers from this stream. + * + * @return the headers + */ public Http2Headers readHeaders() { while (readState == ReadState.HEADERS) { Http2FrameData frameData = readOne(timeout); @@ -315,10 +352,21 @@ public Http2Headers readHeaders() { return currentHeaders; } + /** + * Returns the socket context associated with the stream. + * + * @return the socket context + */ public SocketContext ctx() { return ctx; } + /** + * Reads an HTTP2 frame from the stream. + * + * @param pollTimeout timeout + * @return the data frame + */ public Http2FrameData readOne(Duration pollTimeout) { Http2FrameData frameData = buffer.poll(pollTimeout); diff --git a/webclient/http2/src/main/java/io/helidon/webclient/http2/Http2ConnectionCache.java b/webclient/http2/src/main/java/io/helidon/webclient/http2/Http2ConnectionCache.java index f50d1e36758..623e5315bb5 100644 --- a/webclient/http2/src/main/java/io/helidon/webclient/http2/Http2ConnectionCache.java +++ b/webclient/http2/src/main/java/io/helidon/webclient/http2/Http2ConnectionCache.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2023 Oracle and/or its affiliates. + * Copyright (c) 2023, 2024 Oracle and/or its affiliates. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -29,6 +29,9 @@ import io.helidon.webclient.http1.Http1ClientResponse; import io.helidon.webclient.spi.ClientConnectionCache; +/** + * A cache of HTTP2 connections. + */ public final class Http2ConnectionCache extends ClientConnectionCache { private static final Http2ConnectionCache SHARED = new Http2ConnectionCache(true); private final LruCache http2Supported = LruCache.builder() @@ -41,10 +44,20 @@ private Http2ConnectionCache(boolean shared) { super(shared); } + /** + * Returns a reference to the shared connection cache. + * + * @return shared connection cache + */ public static Http2ConnectionCache shared() { return SHARED; } + /** + * Creates a fresh connection cache. + * + * @return new connection cache + */ public static Http2ConnectionCache create() { return new Http2ConnectionCache(false); } diff --git a/webclient/http2/src/main/java/io/helidon/webclient/http2/Http2StreamConfig.java b/webclient/http2/src/main/java/io/helidon/webclient/http2/Http2StreamConfig.java index 29bc30b5032..61cffcf6b9d 100644 --- a/webclient/http2/src/main/java/io/helidon/webclient/http2/Http2StreamConfig.java +++ b/webclient/http2/src/main/java/io/helidon/webclient/http2/Http2StreamConfig.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2023 Oracle and/or its affiliates. + * Copyright (c) 2023, 2024 Oracle and/or its affiliates. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -18,10 +18,29 @@ import java.time.Duration; +/** + * Configuration for an HTTP2 stream. + */ public interface Http2StreamConfig { + + /** + * Prior knowledge setting. + * + * @return prior knowledge setting + */ boolean priorKnowledge(); + /** + * Stream priority. + * + * @return the stream priority + */ int priority(); + /** + * Read timeout for this stream. + * + * @return the timeout + */ Duration readTimeout(); } diff --git a/webclient/http2/src/main/java/io/helidon/webclient/http2/LockingStreamIdSequence.java b/webclient/http2/src/main/java/io/helidon/webclient/http2/LockingStreamIdSequence.java index 6df24cc34c9..cb6b53c0084 100644 --- a/webclient/http2/src/main/java/io/helidon/webclient/http2/LockingStreamIdSequence.java +++ b/webclient/http2/src/main/java/io/helidon/webclient/http2/LockingStreamIdSequence.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2023 Oracle and/or its affiliates. + * Copyright (c) 2023, 2024 Oracle and/or its affiliates. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -19,6 +19,9 @@ import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; +/** + * A stream ID sequence that uses locks for concurrency. + */ public class LockingStreamIdSequence { private final AtomicInteger streamIdSeq = new AtomicInteger(0);