From 32463ef52a09d564ce5c1025262e7882d409646a Mon Sep 17 00:00:00 2001 From: scottf Date: Thu, 18 Apr 2024 22:18:29 -0400 Subject: [PATCH 1/3] Copy message just as part of publish to keep headers mutable until then. --- .../java/io/nats/client/impl/Headers.java | 62 +++++++++--- .../io/nats/client/impl/IncomingMessage.java | 16 +++- .../io/nats/client/impl/NatsConnection.java | 55 +++++------ .../client/impl/NatsConnectionWriter.java | 11 ++- .../io/nats/client/impl/NatsJetStream.java | 36 +++---- .../nats/client/impl/NatsJetStreamImpl.java | 4 +- .../java/io/nats/client/impl/NatsMessage.java | 96 +++++++++---------- .../client/impl/NatsPublishableMessage.java | 80 ++++++++++++++++ .../io/nats/client/impl/ProtocolMessage.java | 21 ++-- .../nats/client/impl/PushMessageManager.java | 2 +- .../io/nats/client/impl/HeadersTests.java | 10 -- .../nats/client/impl/MessageManagerTests.java | 2 +- .../io/nats/client/impl/NatsMessageTests.java | 50 +++++++--- 13 files changed, 297 insertions(+), 148 deletions(-) create mode 100644 src/main/java/io/nats/client/impl/NatsPublishableMessage.java diff --git a/src/main/java/io/nats/client/impl/Headers.java b/src/main/java/io/nats/client/impl/Headers.java index dba1d62ef..510fe323b 100644 --- a/src/main/java/io/nats/client/impl/Headers.java +++ b/src/main/java/io/nats/client/impl/Headers.java @@ -92,10 +92,13 @@ public Headers(Headers headers, boolean readOnly, String[] keysNotToCopy) { * -or- if any value contains invalid characters */ public Headers add(String key, String... values) { - if (values != null) { - _add(key, Arrays.asList(values)); + if (readOnly) { + throw new UnsupportedOperationException(); } - return this; + if (values == null || values.length == 0) { + return this; + } + return _add(key, Arrays.asList(values)); } /** @@ -109,12 +112,17 @@ public Headers add(String key, String... values) { * -or- if any value contains invalid characters */ public Headers add(String key, Collection values) { - _add(key, values); - return this; + if (readOnly) { + throw new UnsupportedOperationException(); + } + if (values == null || values.isEmpty()) { + return this; + } + return _add(key, values); } // the add delegate - private void _add(String key, Collection values) { + private Headers _add(String key, Collection values) { if (values != null) { Checker checked = new Checker(key, values); if (checked.hasValues()) { @@ -129,6 +137,7 @@ private void _add(String key, Collection values) { serialized = null; // since the data changed, clear this so it's rebuilt } } + return this; } /** @@ -143,10 +152,13 @@ private void _add(String key, Collection values) { * -or- if any value contains invalid characters */ public Headers put(String key, String... values) { - if (values != null) { - _put(key, Arrays.asList(values)); + if (readOnly) { + throw new UnsupportedOperationException(); } - return this; + if (values == null || values.length == 0) { + return this; + } + return _put(key, Arrays.asList(values)); } /** @@ -161,8 +173,13 @@ public Headers put(String key, String... values) { * -or- if any value contains invalid characters */ public Headers put(String key, Collection values) { - _put(key, values); - return this; + if (readOnly) { + throw new UnsupportedOperationException(); + } + if (values == null || values.isEmpty()) { + return this; + } + return _put(key, values); } /** @@ -173,14 +190,20 @@ public Headers put(String key, Collection values) { * @return the Headers object */ public Headers put(Map> map) { + if (readOnly) { + throw new UnsupportedOperationException(); + } + if (map == null || map.isEmpty()) { + return this; + } for (String key : map.keySet() ) { - put(key, map.get(key)); + _put(key, map.get(key)); } return this; } - // the put delegate that all puts call - private void _put(String key, Collection values) { + // the put delegate + private Headers _put(String key, Collection values) { if (key == null || key.isEmpty()) { throw new IllegalArgumentException("Key cannot be null or empty."); } @@ -195,6 +218,7 @@ private void _put(String key, Collection values) { serialized = null; // since the data changed, clear this so it's rebuilt } } + return this; } /** @@ -203,6 +227,9 @@ private void _put(String key, Collection values) { * @param keys the key or keys to remove */ public void remove(String... keys) { + if (readOnly) { + throw new UnsupportedOperationException(); + } for (String key : keys) { _remove(key); } @@ -215,12 +242,16 @@ public void remove(String... keys) { * @param keys the key or keys to remove */ public void remove(Collection keys) { + if (readOnly) { + throw new UnsupportedOperationException(); + } for (String key : keys) { _remove(key); } serialized = null; // since the data changed, clear this so it's rebuilt } + // the remove delegate private void _remove(String key) { // if the values had a key, then the data length had a length if (valuesMap.remove(key) != null) { @@ -250,6 +281,9 @@ public boolean isEmpty() { * Removes all the keys The object map will be empty after this call returns. */ public void clear() { + if (readOnly) { + throw new UnsupportedOperationException(); + } valuesMap.clear(); lengthMap.clear(); dataLength = 0; diff --git a/src/main/java/io/nats/client/impl/IncomingMessage.java b/src/main/java/io/nats/client/impl/IncomingMessage.java index a108f0af2..002bc70c9 100644 --- a/src/main/java/io/nats/client/impl/IncomingMessage.java +++ b/src/main/java/io/nats/client/impl/IncomingMessage.java @@ -13,13 +13,27 @@ package io.nats.client.impl; +import io.nats.client.support.ByteArrayBuilder; + public class IncomingMessage extends NatsMessage { - IncomingMessage() {} + IncomingMessage() { + super((byte[])null); + } IncomingMessage(byte[] data) { super(data); } + @Override + protected void calculate() { + // intentionally does nothing + } + + @Override + ByteArrayBuilder getProtocolBab() { + throw new IllegalStateException("getProtocolBab not supported for this type of message."); + } + @Override byte[] getProtocolBytes() { throw new IllegalStateException("getProtocolBytes not supported for this type of message."); diff --git a/src/main/java/io/nats/client/impl/NatsConnection.java b/src/main/java/io/nats/client/impl/NatsConnection.java index ebf62f16c..5d830ef4f 100644 --- a/src/main/java/io/nats/client/impl/NatsConnection.java +++ b/src/main/java/io/nats/client/impl/NatsConnection.java @@ -836,7 +836,7 @@ void cleanUpPongQueue() { */ @Override public void publish(String subject, byte[] body) { - publishInternal(subject, null, null, body); + publishInternal(subject, null, null, body, true); } /** @@ -844,7 +844,7 @@ public void publish(String subject, byte[] body) { */ @Override public void publish(String subject, Headers headers, byte[] body) { - publishInternal(subject, null, headers, body); + publishInternal(subject, null, headers, body, true); } /** @@ -852,7 +852,7 @@ public void publish(String subject, Headers headers, byte[] body) { */ @Override public void publish(String subject, String replyTo, byte[] body) { - publishInternal(subject, replyTo, null, body); + publishInternal(subject, replyTo, null, body, true); } /** @@ -860,7 +860,7 @@ public void publish(String subject, String replyTo, byte[] body) { */ @Override public void publish(String subject, String replyTo, Headers headers, byte[] body) { - publishInternal(subject, replyTo, headers, body); + publishInternal(subject, replyTo, headers, body, true); } /** @@ -869,12 +869,15 @@ public void publish(String subject, String replyTo, Headers headers, byte[] body @Override public void publish(Message message) { validateNotNull(message, "Message"); - publishInternal(message.getSubject(), message.getReplyTo(), message.getHeaders(), message.getData()); + publishInternal(message.getSubject(), message.getReplyTo(), message.getHeaders(), message.getData(), false); } - void publishInternal(String subject, String replyTo, Headers headers, byte[] data) { - checkIfNeedsHeaderSupport(headers); + void publishInternal(String subject, String replyTo, Headers headers, byte[] data, boolean validateSubRep) { checkPayloadSize(data); + NatsPublishableMessage npm = new NatsPublishableMessage(subject, replyTo, headers, data, validateSubRep); + if (npm.hasHeaders && !serverInfo.get().isHeadersSupported()) { + throw new IllegalArgumentException("Headers are not supported by the server, version: " + serverInfo.get().getVersion()); + } if (isClosed()) { throw new IllegalStateException("Connection is Closed"); @@ -882,22 +885,14 @@ void publishInternal(String subject, String replyTo, Headers headers, byte[] dat throw new IllegalStateException("Connection is Draining"); // Ok to publish while waiting on subs } - NatsMessage nm = new NatsMessage(subject, replyTo, new Headers(headers), data); - Connection.Status stat = this.status; if ((stat == Status.RECONNECTING || stat == Status.DISCONNECTED) - && !this.writer.canQueueDuringReconnect(nm)) { + && !this.writer.canQueueDuringReconnect(npm)) { throw new IllegalStateException( "Unable to queue any more messages during reconnect, max buffer is " + options.getReconnectBufferSize()); } - queueOutgoing(nm); - } - private void checkIfNeedsHeaderSupport(Headers headers) { - if (headers != null && !headers.isEmpty() && !serverInfo.get().isHeadersSupported()) { - throw new IllegalArgumentException( - "Headers are not supported by the server, version: " + serverInfo.get().getVersion()); - } + queueOutgoing(npm); } private void checkPayloadSize(byte[] body) { @@ -1099,7 +1094,7 @@ else if (future.isDone()) { */ @Override public Message request(String subject, byte[] body, Duration timeout) throws InterruptedException { - return requestInternal(subject, null, body, timeout, cancelAction); + return requestInternal(subject, null, body, timeout, cancelAction, true); } /** @@ -1107,7 +1102,7 @@ public Message request(String subject, byte[] body, Duration timeout) throws Int */ @Override public Message request(String subject, Headers headers, byte[] body, Duration timeout) throws InterruptedException { - return requestInternal(subject, headers, body, timeout, cancelAction); + return requestInternal(subject, headers, body, timeout, cancelAction, true); } /** @@ -1116,11 +1111,11 @@ public Message request(String subject, Headers headers, byte[] body, Duration ti @Override public Message request(Message message, Duration timeout) throws InterruptedException { validateNotNull(message, "Message"); - return requestInternal(message.getSubject(), message.getHeaders(), message.getData(), timeout, cancelAction); + return requestInternal(message.getSubject(), message.getHeaders(), message.getData(), timeout, cancelAction, false); } - Message requestInternal(String subject, Headers headers, byte[] data, Duration timeout, CancelAction cancelAction) throws InterruptedException { - CompletableFuture incoming = requestFutureInternal(subject, headers, data, timeout, cancelAction); + Message requestInternal(String subject, Headers headers, byte[] data, Duration timeout, CancelAction cancelAction, boolean validateSubRep) throws InterruptedException { + CompletableFuture incoming = requestFutureInternal(subject, headers, data, timeout, cancelAction, validateSubRep); try { return incoming.get(timeout.toNanos(), TimeUnit.NANOSECONDS); } catch (TimeoutException | ExecutionException | CancellationException e) { @@ -1133,7 +1128,7 @@ Message requestInternal(String subject, Headers headers, byte[] data, Duration t */ @Override public CompletableFuture request(String subject, byte[] body) { - return requestFutureInternal(subject, null, body, null, cancelAction); + return requestFutureInternal(subject, null, body, null, cancelAction, true); } /** @@ -1141,7 +1136,7 @@ public CompletableFuture request(String subject, byte[] body) { */ @Override public CompletableFuture request(String subject, Headers headers, byte[] body) { - return requestFutureInternal(subject, headers, body, null, cancelAction); + return requestFutureInternal(subject, headers, body, null, cancelAction, true); } /** @@ -1149,7 +1144,7 @@ public CompletableFuture request(String subject, Headers headers, byte[ */ @Override public CompletableFuture requestWithTimeout(String subject, byte[] body, Duration timeout) { - return requestFutureInternal(subject, null, body, timeout, cancelAction); + return requestFutureInternal(subject, null, body, timeout, cancelAction, true); } /** @@ -1157,7 +1152,7 @@ public CompletableFuture requestWithTimeout(String subject, byte[] body */ @Override public CompletableFuture requestWithTimeout(String subject, Headers headers, byte[] body, Duration timeout) { - return requestFutureInternal(subject, headers, body, timeout, cancelAction); + return requestFutureInternal(subject, headers, body, timeout, cancelAction, true); } /** @@ -1166,7 +1161,7 @@ public CompletableFuture requestWithTimeout(String subject, Headers hea @Override public CompletableFuture requestWithTimeout(Message message, Duration timeout) { validateNotNull(message, "Message"); - return requestFutureInternal(message.getSubject(), message.getHeaders(), message.getData(), timeout, cancelAction); + return requestFutureInternal(message.getSubject(), message.getHeaders(), message.getData(), timeout, cancelAction, false); } /** @@ -1175,10 +1170,10 @@ public CompletableFuture requestWithTimeout(Message message, Duration t @Override public CompletableFuture request(Message message) { validateNotNull(message, "Message"); - return requestFutureInternal(message.getSubject(), message.getHeaders(), message.getData(), null, cancelAction); + return requestFutureInternal(message.getSubject(), message.getHeaders(), message.getData(), null, cancelAction, false); } - CompletableFuture requestFutureInternal(String subject, Headers headers, byte[] data, Duration futureTimeout, CancelAction cancelAction) { + CompletableFuture requestFutureInternal(String subject, Headers headers, byte[] data, Duration futureTimeout, CancelAction cancelAction, boolean validateSubRep) { checkPayloadSize(data); if (isClosed()) { @@ -1230,7 +1225,7 @@ CompletableFuture requestFutureInternal(String subject, Headers headers responsesAwaiting.put(sub.getSID(), future); } - publishInternal(subject, responseInbox, headers, data); + publishInternal(subject, responseInbox, headers, data, validateSubRep); writer.flushBuffer(); statistics.incrementRequestsSent(); diff --git a/src/main/java/io/nats/client/impl/NatsConnectionWriter.java b/src/main/java/io/nats/client/impl/NatsConnectionWriter.java index 8e1ecf349..e421668d0 100644 --- a/src/main/java/io/nats/client/impl/NatsConnectionWriter.java +++ b/src/main/java/io/nats/client/impl/NatsConnectionWriter.java @@ -15,6 +15,7 @@ import io.nats.client.Options; import io.nats.client.StatisticsCollector; +import io.nats.client.support.ByteArrayBuilder; import java.io.IOException; import java.nio.BufferOverflowException; @@ -125,8 +126,7 @@ Future stop() { // Clear old ping/pong requests this.outgoing.filter((msg) -> msg.isProtocol() && - (msg.protocolBab.equals(OP_PING_BYTES) || msg.protocolBab.equals(OP_PONG_BYTES))); - + (msg.getProtocolBab().equals(OP_PING_BYTES) || msg.getProtocolBab().equals(OP_PONG_BYTES))); } finally { this.startStopLock.unlock(); @@ -162,9 +162,10 @@ void sendMessageBatch(NatsMessage msg, DataPort dataPort, StatisticsCollector st } } - int blen = msg.protocolBab.length(); - System.arraycopy(msg.protocolBab.internalArray(), 0, sendBuffer, sendPosition, blen); - sendPosition += blen; + ByteArrayBuilder bab = msg.getProtocolBab(); + int babLen = bab.length(); + System.arraycopy(bab.internalArray(), 0, sendBuffer, sendPosition, babLen); + sendPosition += babLen; sendBuffer[sendPosition++] = CR; sendBuffer[sendPosition++] = LF; diff --git a/src/main/java/io/nats/client/impl/NatsJetStream.java b/src/main/java/io/nats/client/impl/NatsJetStream.java index 27660ccc2..edce8d1c9 100644 --- a/src/main/java/io/nats/client/impl/NatsJetStream.java +++ b/src/main/java/io/nats/client/impl/NatsJetStream.java @@ -47,7 +47,7 @@ public NatsJetStream(NatsConnection connection, JetStreamOptions jsOptions) thro */ @Override public PublishAck publish(String subject, byte[] body) throws IOException, JetStreamApiException { - return publishSyncInternal(subject, null, body, null); + return publishSyncInternal(subject, null, body, null, true); } /** @@ -55,7 +55,7 @@ public PublishAck publish(String subject, byte[] body) throws IOException, JetSt */ @Override public PublishAck publish(String subject, Headers headers, byte[] body) throws IOException, JetStreamApiException { - return publishSyncInternal(subject, headers, body, null); + return publishSyncInternal(subject, headers, body, null, true); } /** @@ -63,7 +63,7 @@ public PublishAck publish(String subject, Headers headers, byte[] body) throws I */ @Override public PublishAck publish(String subject, byte[] body, PublishOptions options) throws IOException, JetStreamApiException { - return publishSyncInternal(subject, null, body, options); + return publishSyncInternal(subject, null, body, options, true); } /** @@ -71,7 +71,7 @@ public PublishAck publish(String subject, byte[] body, PublishOptions options) t */ @Override public PublishAck publish(String subject, Headers headers, byte[] body, PublishOptions options) throws IOException, JetStreamApiException { - return publishSyncInternal(subject, headers, body, options); + return publishSyncInternal(subject, headers, body, options, true); } /** @@ -80,7 +80,7 @@ public PublishAck publish(String subject, Headers headers, byte[] body, PublishO @Override public PublishAck publish(Message message) throws IOException, JetStreamApiException { validateNotNull(message, "Message"); - return publishSyncInternal(message.getSubject(), message.getHeaders(), message.getData(), null); + return publishSyncInternal(message.getSubject(), message.getHeaders(), message.getData(), null, false); } /** @@ -89,7 +89,7 @@ public PublishAck publish(Message message) throws IOException, JetStreamApiExcep @Override public PublishAck publish(Message message, PublishOptions options) throws IOException, JetStreamApiException { validateNotNull(message, "Message"); - return publishSyncInternal(message.getSubject(), message.getHeaders(), message.getData(), options); + return publishSyncInternal(message.getSubject(), message.getHeaders(), message.getData(), options, false); } /** @@ -97,7 +97,7 @@ public PublishAck publish(Message message, PublishOptions options) throws IOExce */ @Override public CompletableFuture publishAsync(String subject, byte[] body) { - return publishAsyncInternal(subject, null, body, null, null); + return publishAsyncInternal(subject, null, body, null, null, true); } /** @@ -105,7 +105,7 @@ public CompletableFuture publishAsync(String subject, byte[] body) { */ @Override public CompletableFuture publishAsync(String subject, Headers headers, byte[] body) { - return publishAsyncInternal(subject, headers, body, null, null); + return publishAsyncInternal(subject, headers, body, null, null, true); } /** @@ -113,7 +113,7 @@ public CompletableFuture publishAsync(String subject, Headers header */ @Override public CompletableFuture publishAsync(String subject, byte[] body, PublishOptions options) { - return publishAsyncInternal(subject, null, body, options, null); + return publishAsyncInternal(subject, null, body, options, null, true); } /** @@ -121,7 +121,7 @@ public CompletableFuture publishAsync(String subject, byte[] body, P */ @Override public CompletableFuture publishAsync(String subject, Headers headers, byte[] body, PublishOptions options) { - return publishAsyncInternal(subject, headers, body, options, null); + return publishAsyncInternal(subject, headers, body, options, null, true); } /** @@ -130,7 +130,7 @@ public CompletableFuture publishAsync(String subject, Headers header @Override public CompletableFuture publishAsync(Message message) { validateNotNull(message, "Message"); - return publishAsyncInternal(message.getSubject(), message.getHeaders(), message.getData(), null, null); + return publishAsyncInternal(message.getSubject(), message.getHeaders(), message.getData(), null, null, false); } /** @@ -139,32 +139,32 @@ public CompletableFuture publishAsync(Message message) { @Override public CompletableFuture publishAsync(Message message, PublishOptions options) { validateNotNull(message, "Message"); - return publishAsyncInternal(message.getSubject(), message.getHeaders(), message.getData(), options, null); + return publishAsyncInternal(message.getSubject(), message.getHeaders(), message.getData(), options, null, false); } - private PublishAck publishSyncInternal(String subject, Headers headers, byte[] data, PublishOptions options) throws IOException, JetStreamApiException { + private PublishAck publishSyncInternal(String subject, Headers headers, byte[] data, PublishOptions options, boolean validateSubRep) throws IOException, JetStreamApiException { Headers merged = mergePublishOptions(headers, options); if (jso.isPublishNoAck()) { - conn.publishInternal(subject, null, merged, data); + conn.publishInternal(subject, null, merged, data, validateSubRep); return null; } Duration timeout = options == null ? jso.getRequestTimeout() : options.getStreamTimeout(); - Message resp = makeInternalRequestResponseRequired(subject, merged, data, timeout, CancelAction.COMPLETE); + Message resp = makeInternalRequestResponseRequired(subject, merged, data, timeout, CancelAction.COMPLETE, validateSubRep); return processPublishResponse(resp, options); } - private CompletableFuture publishAsyncInternal(String subject, Headers headers, byte[] data, PublishOptions options, Duration knownTimeout) { + private CompletableFuture publishAsyncInternal(String subject, Headers headers, byte[] data, PublishOptions options, Duration knownTimeout, boolean validateSubRep) { Headers merged = mergePublishOptions(headers, options); if (jso.isPublishNoAck()) { - conn.publishInternal(subject, null, merged, data); + conn.publishInternal(subject, null, merged, data, validateSubRep); return null; } - CompletableFuture future = conn.requestFutureInternal(subject, merged, data, knownTimeout, CancelAction.COMPLETE); + CompletableFuture future = conn.requestFutureInternal(subject, merged, data, knownTimeout, CancelAction.COMPLETE, validateSubRep); return future.thenCompose(resp -> { try { diff --git a/src/main/java/io/nats/client/impl/NatsJetStreamImpl.java b/src/main/java/io/nats/client/impl/NatsJetStreamImpl.java index f25388e00..ad8eea3da 100644 --- a/src/main/java/io/nats/client/impl/NatsJetStreamImpl.java +++ b/src/main/java/io/nats/client/impl/NatsJetStreamImpl.java @@ -236,9 +236,9 @@ Message makeRequestResponseRequired(String subject, byte[] bytes, Duration timeo } } - Message makeInternalRequestResponseRequired(String subject, Headers headers, byte[] data, Duration timeout, CancelAction cancelAction) throws IOException { + Message makeInternalRequestResponseRequired(String subject, Headers headers, byte[] data, Duration timeout, CancelAction cancelAction, boolean validateSubRep) throws IOException { try { - return responseRequired(conn.requestInternal(subject, headers, data, timeout, cancelAction)); + return responseRequired(conn.requestInternal(subject, headers, data, timeout, cancelAction, validateSubRep)); } catch (InterruptedException e) { throw new IOException(e); } diff --git a/src/main/java/io/nats/client/impl/NatsMessage.java b/src/main/java/io/nats/client/impl/NatsMessage.java index 8cc17fae5..06f80fd2a 100644 --- a/src/main/java/io/nats/client/impl/NatsMessage.java +++ b/src/main/java/io/nats/client/impl/NatsMessage.java @@ -37,7 +37,6 @@ public class NatsMessage implements Message { protected String subject; protected String replyTo; protected byte[] data; - protected boolean utf8mode; protected Headers headers; // incoming specific : subject, replyTo, data and these fields @@ -45,11 +44,11 @@ public class NatsMessage implements Message { protected int controlLineLength; // protocol specific : just this field - protected ByteArrayBuilder protocolBab; + ByteArrayBuilder protocolBab; // housekeeping - protected int sizeInBytes = -1; - protected int headerLen = 0; + protected int sizeInBytes; + protected int headerLen; protected int dataLen; protected NatsSubscription subscription; @@ -70,16 +69,14 @@ protected NatsMessage(byte[] data) { dataLen = this.data.length; } - @Deprecated // Plans are to remove allowing utf8-mode + @Deprecated // utf8-mode is ignored public NatsMessage(String subject, String replyTo, byte[] data, boolean utf8mode) { this(subject, replyTo, null, data); - this.utf8mode = utf8mode; } - @Deprecated // Plans are to remove allowing utf8-mode + @Deprecated // utf8-mode is ignored public NatsMessage(String subject, String replyTo, Headers headers, byte[] data, boolean utf8mode) { this(subject, replyTo, headers, data); - this.utf8mode = utf8mode; } public NatsMessage(String subject, String replyTo, byte[] data) { @@ -90,36 +87,41 @@ public NatsMessage(String subject, String replyTo, Headers headers, byte[] data) this(data); this.subject = validateSubject(subject, true); this.replyTo = validateReplyTo(replyTo, false); - this.headers = readOnlyOf(headers); - this.utf8mode = false; - finishConstruct(); + this.headers = headers; } public NatsMessage(Message message) { this(message.getData()); this.subject = message.getSubject(); this.replyTo = message.getReplyTo(); - this.headers = readOnlyOf(message.getHeaders()); - this.utf8mode = message.isUtf8mode(); - finishConstruct(); + this.headers = message.getHeaders(); } - private static Headers readOnlyOf(Headers headers) { - if (headers == null || headers.isReadOnly()) { - return headers; - } - return new Headers(headers, true, null); + // ---------------------------------------------------------------------------------------------------- + // Client and Message Internal Methods + // ---------------------------------------------------------------------------------------------------- + boolean isProtocol() { + return false; // overridden in NatsMessage.ProtocolMessage } - protected void finishConstruct() { + private static final Headers EMPTY_READ_ONLY = new Headers(null, true, null); + + protected void calculate() { int replyToLen = replyTo == null ? 0 : replyTo.length(); - if (headers != null && !headers.isEmpty()) { - headerLen = headers.serializedLength(); + // headers get frozen (read only) at this point + if (headers == null) { + headerLen = 0; } - else { + else if (headers.isEmpty()) { + headers = EMPTY_READ_ONLY; headerLen = 0; } + else { + headers = headers.isReadOnly() ? headers : new Headers(headers, true, null); + headerLen = headers.serializedLength(); + } + int headerAndDataLen = headerLen + dataLen; // initialize the builder with a reasonable length, preventing resize in 99.9% of the cases @@ -155,25 +157,39 @@ protected void finishConstruct() { sizeInBytes = controlLineLength + headerAndDataLen + 2; // The 2nd CRLFs } - // ---------------------------------------------------------------------------------------------------- - // Client and Message Internal Methods - // ---------------------------------------------------------------------------------------------------- - long getSizeInBytes() { - return sizeInBytes; + ByteArrayBuilder getProtocolBab() { + calculate(); + return protocolBab; } - boolean isProtocol() { - return false; // overridden in NatsMessage.ProtocolMessage + long getSizeInBytes() { + calculate(); + return sizeInBytes; } byte[] getProtocolBytes() { + calculate(); return protocolBab.toByteArray(); } int getControlLineLength() { + calculate(); return controlLineLength; } + /** + * @param destPosition the position index in destination byte array to start + * @param dest is the byte array to write to + * @return the length of the header + */ + int copyNotEmptyHeaders(int destPosition, byte[] dest) { + calculate(); + if (headerLen > 0) { + return headers.serializeToArray(destPosition, dest); + } + return 0; + } + void setSubscription(NatsSubscription sub) { subscription = sub; } @@ -217,18 +233,6 @@ public String getReplyTo() { return replyTo; } - /** - * @param destPosition the position index in destination byte array to start - * @param dest the byte array to write to - * @return the length of the header - */ - int copyNotEmptyHeaders(int destPosition, byte[] dest) { - if (headers != null && !headers.isEmpty()) { - return headers.serializeToArray(destPosition, dest); - } - return 0; - } - /** * {@inheritDoc} */ @@ -274,7 +278,7 @@ public byte[] getData() { */ @Override public boolean isUtf8mode() { - return utf8mode; + return false; } /** @@ -386,7 +390,6 @@ String toDetailString() { "\n subject='" + subject + '\'' + "\n replyTo='" + replyToString() + '\'' + "\n data=" + dataToString() + - "\n utf8mode=" + utf8mode + "\n headers=" + headersToString() + "\n sid='" + sid + '\'' + "\n protocolBytes=" + protocolBytesToString() + @@ -403,7 +406,6 @@ private String headersToString() { } private String dataToString() { - if (data.length == 0) { return ""; } @@ -444,7 +446,6 @@ public static class Builder { private String replyTo; private Headers headers; private byte[] data; - private boolean utf8mode; /** * Set the subject @@ -524,7 +525,6 @@ public Builder data(final byte[] data) { */ @Deprecated public Builder utf8mode(final boolean utf8mode) { - this.utf8mode = utf8mode; return this; } @@ -534,7 +534,7 @@ public Builder utf8mode(final boolean utf8mode) { * @return the {@code NatsMessage} */ public NatsMessage build() { - return new NatsMessage(subject, replyTo, headers, data, utf8mode); + return new NatsMessage(subject, replyTo, headers, data); } } } diff --git a/src/main/java/io/nats/client/impl/NatsPublishableMessage.java b/src/main/java/io/nats/client/impl/NatsPublishableMessage.java new file mode 100644 index 000000000..fcde8b17c --- /dev/null +++ b/src/main/java/io/nats/client/impl/NatsPublishableMessage.java @@ -0,0 +1,80 @@ +// Copyright 2015-2022 The NATS Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at: +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package io.nats.client.impl; + +import io.nats.client.support.ByteArrayBuilder; + +import static io.nats.client.support.Validator.validateReplyTo; +import static io.nats.client.support.Validator.validateSubject; + +class NatsPublishableMessage extends NatsMessage { + final boolean hasHeaders; + + public NatsPublishableMessage(boolean hasHeaders) { + this.hasHeaders = hasHeaders; + } + + public NatsPublishableMessage(String subject, String replyTo, Headers headers, byte[] data, boolean validateSubRep) { + super(data); + this.subject = validateSubRep ? validateSubject(subject, true) : subject; + this.replyTo = validateSubRep ? validateReplyTo(replyTo, false) : replyTo; + if (headers == null || headers.isEmpty()) { + hasHeaders = false; + } + else { + hasHeaders = true; + headers = headers.isReadOnly() ? headers : new Headers(headers, true, null); + } + this.headers = new Headers(headers, false, null); + calculate(); + } + + @Override + ByteArrayBuilder getProtocolBab() { + // compared to base class, skips calling calculate() + return protocolBab; + } + + @Override + long getSizeInBytes() { + // compared to base class, skips calling calculate() + return sizeInBytes; + } + + @Override + byte[] getProtocolBytes() { + // compared to base class, skips calling calculate() + return protocolBab.toByteArray(); + } + + @Override + int getControlLineLength() { + // compared to base class, skips calling calculate() + return controlLineLength; + } + + /** + * @param destPosition the position index in destination byte array to start + * @param dest is the byte array to write to + * @return the length of the header + */ + @Override + int copyNotEmptyHeaders(int destPosition, byte[] dest) { + // compared to base class, skips calling calculate() + if (headerLen > 0) { + return headers.serializeToArray(destPosition, dest); + } + return 0; + } +} diff --git a/src/main/java/io/nats/client/impl/ProtocolMessage.java b/src/main/java/io/nats/client/impl/ProtocolMessage.java index 5bad98a5d..b4fa91ddd 100644 --- a/src/main/java/io/nats/client/impl/ProtocolMessage.java +++ b/src/main/java/io/nats/client/impl/ProtocolMessage.java @@ -16,18 +16,27 @@ import io.nats.client.support.ByteArrayBuilder; // ---------------------------------------------------------------------------------------------------- -// Protocol message is a special version of a NatsMessage +// Protocol message is a special version of a NatsPublishableMessage extends NatsMessage // ---------------------------------------------------------------------------------------------------- -class ProtocolMessage extends NatsMessage { +class ProtocolMessage extends NatsPublishableMessage { private static final ByteArrayBuilder EMPTY_BAB = new ByteArrayBuilder(); ProtocolMessage(ByteArrayBuilder babProtocol) { + super(false); protocolBab = babProtocol; sizeInBytes = controlLineLength = protocolBab.length() + 2; // CRLF, protocol doesn't have data } ProtocolMessage(byte[] protocol) { - this(protocol == null ? EMPTY_BAB : new ByteArrayBuilder(protocol)); + super(false); + protocolBab = new ByteArrayBuilder(protocol); + sizeInBytes = controlLineLength = protocolBab.length() + 2; // CRLF, protocol doesn't have data + } + + ProtocolMessage(ProtocolMessage pm) { + super(false); + protocolBab = pm.protocolBab; + sizeInBytes = controlLineLength = pm.sizeInBytes; } @Override @@ -35,8 +44,8 @@ boolean isProtocol() { return true; } - ProtocolMessage(ProtocolMessage pm) { - protocolBab = pm.protocolBab; - sizeInBytes = pm.sizeInBytes; + @Override + int copyNotEmptyHeaders(int destPosition, byte[] dest) { + return 0; // until a protocol messages gets headers, might as well shortcut this. } } diff --git a/src/main/java/io/nats/client/impl/PushMessageManager.java b/src/main/java/io/nats/client/impl/PushMessageManager.java index e59ed3b25..83cde9db2 100644 --- a/src/main/java/io/nats/client/impl/PushMessageManager.java +++ b/src/main/java/io/nats/client/impl/PushMessageManager.java @@ -124,7 +124,7 @@ private void processFlowControl(String fcSubject, FlowControlSource source) { // we may get multiple fc/hb messages with the same reply // only need to post to that subject once if (fcSubject != null && !fcSubject.equals(lastFcSubject)) { - conn.publishInternal(fcSubject, null, null, null); + conn.publishInternal(fcSubject, null, null, null, false); lastFcSubject = fcSubject; // set after publish in case the pub fails conn.executeCallback((c, el) -> el.flowControlProcessed(c, sub, fcSubject, source)); } diff --git a/src/test/java/io/nats/client/impl/HeadersTests.java b/src/test/java/io/nats/client/impl/HeadersTests.java index 2fd5e31fb..e7f96bf35 100644 --- a/src/test/java/io/nats/client/impl/HeadersTests.java +++ b/src/test/java/io/nats/client/impl/HeadersTests.java @@ -1,6 +1,5 @@ package io.nats.client.impl; -import io.nats.client.Message; import io.nats.client.support.IncomingHeadersProcessor; import io.nats.client.support.Status; import io.nats.client.support.Token; @@ -221,15 +220,6 @@ public void testReadOnly() { assertThrows(UnsupportedOperationException.class, () -> headers1.remove(KEY1)); assertThrows(UnsupportedOperationException.class, headers1::clear); assertEquals(VAL1, headers1.getFirst(KEY1)); - - Message m = new NatsMessage("subject", null, notRO, null); - Headers headers2 = m.getHeaders(); - assertTrue(headers2.isReadOnly()); - assertThrows(UnsupportedOperationException.class, () -> headers2.put(KEY1, VAL2)); - assertThrows(UnsupportedOperationException.class, () -> headers2.put(KEY1, VAL2)); - assertThrows(UnsupportedOperationException.class, () -> headers2.remove(KEY1)); - assertThrows(UnsupportedOperationException.class, headers2::clear); - assertEquals(VAL1, headers2.getFirst(KEY1)); } @Test diff --git a/src/test/java/io/nats/client/impl/MessageManagerTests.java b/src/test/java/io/nats/client/impl/MessageManagerTests.java index 26be707e0..504fb033b 100644 --- a/src/test/java/io/nats/client/impl/MessageManagerTests.java +++ b/src/test/java/io/nats/client/impl/MessageManagerTests.java @@ -582,7 +582,7 @@ public MockPublishInternal(Options options) { } @Override - void publishInternal(String subject, String replyTo, Headers headers, byte[] data) { + void publishInternal(String subject, String replyTo, Headers headers, byte[] data, boolean validate) { fcSubject = subject; ++pubCount; } diff --git a/src/test/java/io/nats/client/impl/NatsMessageTests.java b/src/test/java/io/nats/client/impl/NatsMessageTests.java index 5b8286357..d6bcac2ea 100644 --- a/src/test/java/io/nats/client/impl/NatsMessageTests.java +++ b/src/test/java/io/nats/client/impl/NatsMessageTests.java @@ -23,11 +23,9 @@ import java.util.List; import static io.nats.client.utils.ResourceUtils.dataAsLines; -import static io.nats.client.utils.TestBase.assertByteArraysEqual; -import static io.nats.client.utils.TestBase.standardConnectionWait; import static org.junit.jupiter.api.Assertions.*; -public class NatsMessageTests { +public class NatsMessageTests extends JetStreamTestBase { @Test public void testSizeOnProtocolMessage() { NatsMessage msg = new ProtocolMessage("PING".getBytes()); @@ -246,26 +244,25 @@ public void miscCoverage() { assertNull(m.getHeaders()); assertNotNull(m.toString()); // COVERAGE - ProtocolMessage pm = new ProtocolMessage((byte[])null); - assertNotNull(pm.protocolBab); - assertEquals(0, pm.protocolBab.length()); + ProtocolMessage pm = new ProtocolMessage(new byte[0]); + assertNotNull(pm.getProtocolBab()); + assertEquals(0, pm.getProtocolBab().length()); + assertEquals(2, pm.getSizeInBytes()); + assertEquals(2, pm.getControlLineLength()); IncomingMessage scm = new IncomingMessage() {}; - assertNull(scm.protocolBab); + assertEquals(0, scm.getSizeInBytes()); + assertThrows(IllegalStateException.class, scm::getProtocolBab); assertThrows(IllegalStateException.class, scm::getProtocolBytes); assertThrows(IllegalStateException.class, scm::getControlLineLength); // coverage coverage coverage //noinspection deprecation NatsMessage nmCov = new NatsMessage("sub", "reply", null, true); + nmCov.calculate(); assertTrue(nmCov.toDetailString().contains("PUB sub reply 0")); assertTrue(nmCov.toDetailString().contains("next=No")); - - nmCov.protocolBab = null; - nmCov.next = nmCov; - assertTrue(nmCov.toDetailString().contains("protocolBytes=null")); - assertTrue(nmCov.toDetailString().contains("next=Yes")); } @Test @@ -307,4 +304,33 @@ private NatsMessage testMessage() { .data("data", StandardCharsets.US_ASCII) .build(); } + + @Test + public void testHeadersBecomeImmutable() throws Exception { + jsServer.run(connection -> { + String subject = subject(); + Subscription sub = connection.subscribe(subject); + + Headers h = new Headers(); + h.put("one", "A"); + Message m = new NatsMessage(subject, null, h, null); + connection.publish(m); + Message incoming = sub.nextMessage(1000); + assertEquals(1, incoming.getHeaders().size()); + + // headers are no longer copied, just referenced + // so this will affect the message which is the same + // as the local copy + h.put("two", "B"); + connection.publish(m); + incoming = sub.nextMessage(1000); + assertEquals(2, incoming.getHeaders().size()); + + // also if you get the headers from the message + m.getHeaders().put("three", "C"); + connection.publish(m); + incoming = sub.nextMessage(1000); + assertEquals(3, incoming.getHeaders().size()); + }); + } } \ No newline at end of file From 5f3b85d3a77f374e76c40f3e2d1c8ba4cb46aa8b Mon Sep 17 00:00:00 2001 From: scottf Date: Thu, 18 Apr 2024 22:19:42 -0400 Subject: [PATCH 2/3] rename test method --- .../io/nats/client/impl/NatsMessageTests.java | 48 +++++++++---------- 1 file changed, 24 insertions(+), 24 deletions(-) diff --git a/src/test/java/io/nats/client/impl/NatsMessageTests.java b/src/test/java/io/nats/client/impl/NatsMessageTests.java index d6bcac2ea..94edc0c55 100644 --- a/src/test/java/io/nats/client/impl/NatsMessageTests.java +++ b/src/test/java/io/nats/client/impl/NatsMessageTests.java @@ -306,31 +306,31 @@ private NatsMessage testMessage() { } @Test - public void testHeadersBecomeImmutable() throws Exception { + public void testHeadersMutableBeforePublish() throws Exception { jsServer.run(connection -> { - String subject = subject(); - Subscription sub = connection.subscribe(subject); - - Headers h = new Headers(); - h.put("one", "A"); - Message m = new NatsMessage(subject, null, h, null); - connection.publish(m); - Message incoming = sub.nextMessage(1000); - assertEquals(1, incoming.getHeaders().size()); - - // headers are no longer copied, just referenced - // so this will affect the message which is the same - // as the local copy - h.put("two", "B"); - connection.publish(m); - incoming = sub.nextMessage(1000); - assertEquals(2, incoming.getHeaders().size()); - - // also if you get the headers from the message - m.getHeaders().put("three", "C"); - connection.publish(m); - incoming = sub.nextMessage(1000); - assertEquals(3, incoming.getHeaders().size()); + String subject = subject(); + Subscription sub = connection.subscribe(subject); + + Headers h = new Headers(); + h.put("one", "A"); + Message m = new NatsMessage(subject, null, h, null); + connection.publish(m); + Message incoming = sub.nextMessage(1000); + assertEquals(1, incoming.getHeaders().size()); + + // headers are no longer copied, just referenced + // so this will affect the message which is the same + // as the local copy + h.put("two", "B"); + connection.publish(m); + incoming = sub.nextMessage(1000); + assertEquals(2, incoming.getHeaders().size()); + + // also if you get the headers from the message + m.getHeaders().put("three", "C"); + connection.publish(m); + incoming = sub.nextMessage(1000); + assertEquals(3, incoming.getHeaders().size()); }); } } \ No newline at end of file From 4577fa9e144494d63d3ba272238b5cc5095cf14a Mon Sep 17 00:00:00 2001 From: scottf Date: Thu, 18 Apr 2024 22:20:24 -0400 Subject: [PATCH 3/3] rename test method --- .../io/nats/client/impl/NatsMessageTests.java | 46 +++++++++---------- 1 file changed, 23 insertions(+), 23 deletions(-) diff --git a/src/test/java/io/nats/client/impl/NatsMessageTests.java b/src/test/java/io/nats/client/impl/NatsMessageTests.java index 94edc0c55..4422d379f 100644 --- a/src/test/java/io/nats/client/impl/NatsMessageTests.java +++ b/src/test/java/io/nats/client/impl/NatsMessageTests.java @@ -308,29 +308,29 @@ private NatsMessage testMessage() { @Test public void testHeadersMutableBeforePublish() throws Exception { jsServer.run(connection -> { - String subject = subject(); - Subscription sub = connection.subscribe(subject); - - Headers h = new Headers(); - h.put("one", "A"); - Message m = new NatsMessage(subject, null, h, null); - connection.publish(m); - Message incoming = sub.nextMessage(1000); - assertEquals(1, incoming.getHeaders().size()); - - // headers are no longer copied, just referenced - // so this will affect the message which is the same - // as the local copy - h.put("two", "B"); - connection.publish(m); - incoming = sub.nextMessage(1000); - assertEquals(2, incoming.getHeaders().size()); - - // also if you get the headers from the message - m.getHeaders().put("three", "C"); - connection.publish(m); - incoming = sub.nextMessage(1000); - assertEquals(3, incoming.getHeaders().size()); + String subject = subject(); + Subscription sub = connection.subscribe(subject); + + Headers h = new Headers(); + h.put("one", "A"); + Message m = new NatsMessage(subject, null, h, null); + connection.publish(m); + Message incoming = sub.nextMessage(1000); + assertEquals(1, incoming.getHeaders().size()); + + // headers are no longer copied, just referenced + // so this will affect the message which is the same + // as the local copy + h.put("two", "B"); + connection.publish(m); + incoming = sub.nextMessage(1000); + assertEquals(2, incoming.getHeaders().size()); + + // also if you get the headers from the message + m.getHeaders().put("three", "C"); + connection.publish(m); + incoming = sub.nextMessage(1000); + assertEquals(3, incoming.getHeaders().size()); }); } } \ No newline at end of file