From 5a4ebfef1c056cdbfe18bfcfc8fec28f593908a1 Mon Sep 17 00:00:00 2001 From: scottf Date: Mon, 6 Jan 2025 18:02:33 -0500 Subject: [PATCH 1/7] Reader Listener A way to debug incoming messages before they are processed --- src/main/java/io/nats/client/Options.java | 29 ++++++++++++++++++- .../java/io/nats/client/ReadListener.java | 19 ++++++++++++ .../client/impl/NatsConnectionReader.java | 25 +++++++++++++++- .../client/impl/NatsConnectionWriter.java | 2 +- 4 files changed, 72 insertions(+), 3 deletions(-) create mode 100644 src/main/java/io/nats/client/ReadListener.java diff --git a/src/main/java/io/nats/client/Options.java b/src/main/java/io/nats/client/Options.java index 4e812dd45..42ac19ce5 100644 --- a/src/main/java/io/nats/client/Options.java +++ b/src/main/java/io/nats/client/Options.java @@ -517,6 +517,11 @@ public class Options { * {@link Builder#callbackThreadFactory(ThreadFactory) callbackThreadFactory}. */ public static final String PROP_CALLBACK_THREAD_FACTORY_CLASS = "callback.thread.factory.class"; + /** + * Property used to set class name for the Callback Thread Factory + * {@link Builder#callbackThreadFactory(ThreadFactory) callbackThreadFactory}. + */ + public static final String PROP_READ_LISTENER_CLASS = "read.listener.class"; // ---------------------------------------------------------------------------------------------------- // PROTOCOL CONNECT OPTION CONSTANTS @@ -658,6 +663,7 @@ public class Options { private final ErrorListener errorListener; private final TimeTraceLogger timeTraceLogger; private final ConnectionListener connectionListener; + private ReadListener readListener; private final StatisticsCollector statisticsCollector; private final String dataPortType; @@ -779,6 +785,7 @@ public static class Builder { private ErrorListener errorListener = null; private TimeTraceLogger timeTraceLogger = null; private ConnectionListener connectionListener = null; + private ReadListener readListener = null; private StatisticsCollector statisticsCollector = null; private String dataPortType = DEFAULT_DATA_PORT_TYPE; private ExecutorService executor; @@ -896,6 +903,7 @@ public Builder properties(Properties props) { classnameProperty(props, PROP_ERROR_LISTENER, o -> this.errorListener = (ErrorListener) o); classnameProperty(props, PROP_TIME_TRACE_LOGGER, o -> this.timeTraceLogger = (TimeTraceLogger) o); classnameProperty(props, PROP_CONNECTION_CB, o -> this.connectionListener = (ConnectionListener) o); + classnameProperty(props, PROP_READ_LISTENER_CLASS, o -> this.readListener = (ReadListener) o); classnameProperty(props, PROP_STATISTICS_COLLECTOR, o -> this.statisticsCollector = (StatisticsCollector) o); stringProperty(props, PROP_DATA_PORT_TYPE, s -> this.dataPortType = s); @@ -914,7 +922,6 @@ public Builder properties(Properties props) { classnameProperty(props, PROP_EXECUTOR_SERVICE_CLASS, o -> this.executor = (ExecutorService) o); classnameProperty(props, PROP_CONNECT_THREAD_FACTORY_CLASS, o -> this.connectThreadFactory = (ThreadFactory) o); classnameProperty(props, PROP_CALLBACK_THREAD_FACTORY_CLASS, o -> this.callbackThreadFactory = (ThreadFactory) o); - return this; } @@ -1551,6 +1558,17 @@ public Builder connectionListener(ConnectionListener listener) { return this; } + /** + * Sets a listener to be notified on incoming protocol/message + * + * @param readListener the listener + * @return the Builder for chaining + */ + public Builder readListener(ReadListener readListener) { + this.readListener = readListener; + return this; + } + /** * Set the {@link StatisticsCollector StatisticsCollector} to collect connection metrics. *

@@ -1960,6 +1978,7 @@ public Builder(Options o) { this.errorListener = o.errorListener; this.timeTraceLogger = o.timeTraceLogger; this.connectionListener = o.connectionListener; + this.readListener = o.readListener; this.statisticsCollector = o.statisticsCollector; this.dataPortType = o.dataPortType; this.trackAdvancedStats = o.trackAdvancedStats; @@ -2027,6 +2046,7 @@ private Options(Builder b) { this.errorListener = b.errorListener; this.timeTraceLogger = b.timeTraceLogger; this.connectionListener = b.connectionListener; + this.readListener = b.readListener; this.statisticsCollector = b.statisticsCollector; this.dataPortType = b.dataPortType; this.trackAdvancedStats = b.trackAdvancedStats; @@ -2112,6 +2132,13 @@ public ConnectionListener getConnectionListener() { return this.connectionListener; } + /** + * @return the read listener, or null, see {@link Builder#readListener(ReadListener) readListener()} in the builder doc + */ + public ReadListener getReadListener() { + return this.readListener; + } + /** * @return the statistics collector, or null, see {@link Builder#statisticsCollector(StatisticsCollector) statisticsCollector()} in the builder doc */ diff --git a/src/main/java/io/nats/client/ReadListener.java b/src/main/java/io/nats/client/ReadListener.java new file mode 100644 index 000000000..5439e6d2e --- /dev/null +++ b/src/main/java/io/nats/client/ReadListener.java @@ -0,0 +1,19 @@ +// Copyright 2015-2018 The NATS Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at: +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package io.nats.client; + +public interface ReadListener { + void protocol(String op, String string); + void message(String op, Message message); +} diff --git a/src/main/java/io/nats/client/impl/NatsConnectionReader.java b/src/main/java/io/nats/client/impl/NatsConnectionReader.java index 7278a0754..dc65fe5e4 100644 --- a/src/main/java/io/nats/client/impl/NatsConnectionReader.java +++ b/src/main/java/io/nats/client/impl/NatsConnectionReader.java @@ -13,6 +13,7 @@ package io.nats.client.impl; +import io.nats.client.ReadListener; import io.nats.client.support.IncomingHeadersProcessor; import java.io.IOException; @@ -68,6 +69,7 @@ enum Mode { private final AtomicBoolean running; private final boolean utf8Mode; + private final ReadListener readListener; NatsConnectionReader(NatsConnection connection) { this.connection = connection; @@ -83,6 +85,7 @@ enum Mode { this.bufferPosition = 0; this.utf8Mode = connection.getOptions().supportUTF8Subjects(); + readListener = connection.getOptions().getReadListener(); } // Should only be called if the current thread has exited. @@ -346,7 +349,11 @@ void gatherMessageData(int maxPos) throws IOException { if (gotCR) { if (b == LF) { incoming.setData(msgData); - this.connection.deliverMessage(incoming.getMessage()); + NatsMessage m = incoming.getMessage(); + this.connection.deliverMessage(m); + if (readListener != null) { + readListener.message(op, m); + } msgData = null; msgDataPosition = 0; incoming = null; @@ -544,34 +551,50 @@ void parseProtocolMessage() throws IOException { break; case OP_OK: this.connection.processOK(); + if (readListener != null) { + readListener.protocol(op, null); + } this.op = UNKNOWN_OP; this.mode = Mode.GATHER_OP; break; case OP_ERR: String errorText = StandardCharsets.UTF_8.decode(protocolBuffer).toString().replace("'", ""); this.connection.processError(errorText); + if (readListener != null) { + readListener.protocol(op, errorText); + } this.op = UNKNOWN_OP; this.mode = Mode.GATHER_OP; break; case OP_PING: this.connection.sendPong(); + if (readListener != null) { + readListener.protocol(op, null); + } this.op = UNKNOWN_OP; this.mode = Mode.GATHER_OP; break; case OP_PONG: this.connection.handlePong(); + if (readListener != null) { + readListener.protocol(op, null); + } this.op = UNKNOWN_OP; this.mode = Mode.GATHER_OP; break; case OP_INFO: String info = StandardCharsets.UTF_8.decode(protocolBuffer).toString(); this.connection.handleInfo(info); + if (readListener != null) { + readListener.protocol(op, info); + } this.op = UNKNOWN_OP; this.mode = Mode.GATHER_OP; break; default: throw new IllegalStateException("Unknown protocol operation "+op); } + } catch (IllegalStateException | NumberFormatException | NullPointerException ex) { this.encounteredProtocolError(ex); } diff --git a/src/main/java/io/nats/client/impl/NatsConnectionWriter.java b/src/main/java/io/nats/client/impl/NatsConnectionWriter.java index aff60d61c..c096b4196 100644 --- a/src/main/java/io/nats/client/impl/NatsConnectionWriter.java +++ b/src/main/java/io/nats/client/impl/NatsConnectionWriter.java @@ -151,7 +151,7 @@ void sendMessageBatch(NatsMessage msg, DataPort dataPort, StatisticsCollector st sendBuffer[sendPosition++] = CR; sendBuffer[sendPosition++] = LF; - if (!msg.isProtocol()) { + if (!msg.isProtocol()) { // because a protocol message does not have headers sendPosition += msg.copyNotEmptyHeaders(sendPosition, sendBuffer); byte[] bytes = msg.getData(); // guaranteed to not be null From 4d23323e1f277ef6d4792fd4d1f6a4f84da022df Mon Sep 17 00:00:00 2001 From: scottf Date: Mon, 6 Jan 2025 19:01:17 -0500 Subject: [PATCH 2/7] Reader Listener A way to debug incoming messages before they are processed --- src/main/java/io/nats/client/Options.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/main/java/io/nats/client/Options.java b/src/main/java/io/nats/client/Options.java index 42ac19ce5..d59986e72 100644 --- a/src/main/java/io/nats/client/Options.java +++ b/src/main/java/io/nats/client/Options.java @@ -518,8 +518,8 @@ public class Options { */ public static final String PROP_CALLBACK_THREAD_FACTORY_CLASS = "callback.thread.factory.class"; /** - * Property used to set class name for the Callback Thread Factory - * {@link Builder#callbackThreadFactory(ThreadFactory) callbackThreadFactory}. + * Property used to set class name for the ReaderListener implementations + * {@link Builder#readListener(ReadListener) readListener}. */ public static final String PROP_READ_LISTENER_CLASS = "read.listener.class"; From 2cd23c0db957ff461e7b198105c58d1d49600666 Mon Sep 17 00:00:00 2001 From: scottf Date: Mon, 6 Jan 2025 19:01:49 -0500 Subject: [PATCH 3/7] Reader Listener A way to debug incoming messages before they are processed --- src/main/java/io/nats/client/Options.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/main/java/io/nats/client/Options.java b/src/main/java/io/nats/client/Options.java index d59986e72..9fdd00149 100644 --- a/src/main/java/io/nats/client/Options.java +++ b/src/main/java/io/nats/client/Options.java @@ -518,7 +518,7 @@ public class Options { */ public static final String PROP_CALLBACK_THREAD_FACTORY_CLASS = "callback.thread.factory.class"; /** - * Property used to set class name for the ReaderListener implementations + * Property used to set class name for the ReaderListener implementation * {@link Builder#readListener(ReadListener) readListener}. */ public static final String PROP_READ_LISTENER_CLASS = "read.listener.class"; From 107fef9e9c47187e11c9632d5a54adcf9365da76 Mon Sep 17 00:00:00 2001 From: scottf Date: Tue, 7 Jan 2025 12:50:12 -0500 Subject: [PATCH 4/7] update copyright --- src/main/java/io/nats/client/ReadListener.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/main/java/io/nats/client/ReadListener.java b/src/main/java/io/nats/client/ReadListener.java index 5439e6d2e..c2b40e012 100644 --- a/src/main/java/io/nats/client/ReadListener.java +++ b/src/main/java/io/nats/client/ReadListener.java @@ -1,4 +1,4 @@ -// Copyright 2015-2018 The NATS Authors +// Copyright 2024 The NATS Authors // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at: From cafa6bb715d84827b3e29c6343bfa7147f62f640 Mon Sep 17 00:00:00 2001 From: scottf Date: Tue, 7 Jan 2025 16:32:17 -0500 Subject: [PATCH 5/7] added benchmark --- CHANGELOG.md | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 21724176a..3de1175a5 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -17,6 +17,19 @@ ### Tests * Unit test coverage #1252 @scottf +### Benchmark + +``` +┌─────────────────────┬───────────────────┬─────────────────┬──────────────────────────┬──────────────────┐ +│ │ count │ time │ msgs/sec │ bytes/sec │ +├─────────────────────┼───────────────────┼─────────────────┼──────────────────────────┼──────────────────┤ +│ Pubish Async │ 5,000,000 msgs │ 3:10.530 │ 26,242.586 msgs/sec │ 25.03 mb/sec │ +├─────────────────────┼───────────────────┼─────────────────┼──────────────────────────┼──────────────────┤ +│ Simplified Fetch │ 5,000,000 msgs │ 5:08.620 │ 16,201.154 msgs/sec │ 15.45 mb/sec │ +├─────────────────────┼───────────────────┼─────────────────┼──────────────────────────┼──────────────────┤ +│ Simplified Iterate │ 5,000,000 msgs │ 2:16.095 │ 36,739.043 msgs/sec │ 35.04 mb/sec │ +└─────────────────────┴───────────────────┴─────────────────┴──────────────────────────┴──────────────────┘ +``` ## 2.20.4 From 86ed32c5389c9cf99e8725a612c7d72ebc5e1e68 Mon Sep 17 00:00:00 2001 From: scottf Date: Tue, 7 Jan 2025 17:18:14 -0500 Subject: [PATCH 6/7] added benchmark --- CHANGELOG.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 3de1175a5..1bf92868b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -23,7 +23,7 @@ ┌─────────────────────┬───────────────────┬─────────────────┬──────────────────────────┬──────────────────┐ │ │ count │ time │ msgs/sec │ bytes/sec │ ├─────────────────────┼───────────────────┼─────────────────┼──────────────────────────┼──────────────────┤ -│ Pubish Async │ 5,000,000 msgs │ 3:10.530 │ 26,242.586 msgs/sec │ 25.03 mb/sec │ +│ Publish Async │ 5,000,000 msgs │ 3:10.530 │ 26,242.586 msgs/sec │ 25.03 mb/sec │ ├─────────────────────┼───────────────────┼─────────────────┼──────────────────────────┼──────────────────┤ │ Simplified Fetch │ 5,000,000 msgs │ 5:08.620 │ 16,201.154 msgs/sec │ 15.45 mb/sec │ ├─────────────────────┼───────────────────┼─────────────────┼──────────────────────────┼──────────────────┤ From 6df79bc7ea74de4129130237ce6ea5c4545da707 Mon Sep 17 00:00:00 2001 From: scottf Date: Wed, 8 Jan 2025 15:18:18 -0500 Subject: [PATCH 7/7] updated benchmark --- CHANGELOG.md | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 1bf92868b..4b2e26fa1 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -23,11 +23,11 @@ ┌─────────────────────┬───────────────────┬─────────────────┬──────────────────────────┬──────────────────┐ │ │ count │ time │ msgs/sec │ bytes/sec │ ├─────────────────────┼───────────────────┼─────────────────┼──────────────────────────┼──────────────────┤ -│ Publish Async │ 5,000,000 msgs │ 3:10.530 │ 26,242.586 msgs/sec │ 25.03 mb/sec │ +│ PubAsync │ 50,000,000 msgs │ 28:37.522 │ 29,111.709 msgs/sec │ 6.94 mb/sec │ ├─────────────────────┼───────────────────┼─────────────────┼──────────────────────────┼──────────────────┤ -│ Simplified Fetch │ 5,000,000 msgs │ 5:08.620 │ 16,201.154 msgs/sec │ 15.45 mb/sec │ +│ SubFetch │ 50,000,000 msgs │ 35:23.774 │ 23,542.995 msgs/sec │ 5.61 mb/sec │ ├─────────────────────┼───────────────────┼─────────────────┼──────────────────────────┼──────────────────┤ -│ Simplified Iterate │ 5,000,000 msgs │ 2:16.095 │ 36,739.043 msgs/sec │ 35.04 mb/sec │ +│ SubIterate │ 50,000,000 msgs │ 17:10.329 │ 48,528.189 msgs/sec │ 11.57 mb/sec │ └─────────────────────┴───────────────────┴─────────────────┴──────────────────────────┴──────────────────┘ ```