Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Reader Listener #1265

Merged
merged 7 commits into from
Jan 9, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,19 @@
### Tests
* Unit test coverage #1252 @scottf

### Benchmark

```
┌─────────────────────┬───────────────────┬─────────────────┬──────────────────────────┬──────────────────┐
│ │ count │ time │ msgs/sec │ bytes/sec │
├─────────────────────┼───────────────────┼─────────────────┼──────────────────────────┼──────────────────┤
│ PubAsync │ 50,000,000 msgs │ 28:37.522 │ 29,111.709 msgs/sec │ 6.94 mb/sec │
├─────────────────────┼───────────────────┼─────────────────┼──────────────────────────┼──────────────────┤
│ SubFetch │ 50,000,000 msgs │ 35:23.774 │ 23,542.995 msgs/sec │ 5.61 mb/sec │
├─────────────────────┼───────────────────┼─────────────────┼──────────────────────────┼──────────────────┤
│ SubIterate │ 50,000,000 msgs │ 17:10.329 │ 48,528.189 msgs/sec │ 11.57 mb/sec │
└─────────────────────┴───────────────────┴─────────────────┴──────────────────────────┴──────────────────┘
```

## 2.20.4

Expand Down
29 changes: 28 additions & 1 deletion src/main/java/io/nats/client/Options.java
Original file line number Diff line number Diff line change
Expand Up @@ -517,6 +517,11 @@ public class Options {
* {@link Builder#callbackThreadFactory(ThreadFactory) callbackThreadFactory}.
*/
public static final String PROP_CALLBACK_THREAD_FACTORY_CLASS = "callback.thread.factory.class";
/**
* Property used to set class name for the ReaderListener implementation
* {@link Builder#readListener(ReadListener) readListener}.
*/
public static final String PROP_READ_LISTENER_CLASS = "read.listener.class";

// ----------------------------------------------------------------------------------------------------
// PROTOCOL CONNECT OPTION CONSTANTS
Expand Down Expand Up @@ -658,6 +663,7 @@ public class Options {
private final ErrorListener errorListener;
private final TimeTraceLogger timeTraceLogger;
private final ConnectionListener connectionListener;
private ReadListener readListener;
private final StatisticsCollector statisticsCollector;
private final String dataPortType;

Expand Down Expand Up @@ -779,6 +785,7 @@ public static class Builder {
private ErrorListener errorListener = null;
private TimeTraceLogger timeTraceLogger = null;
private ConnectionListener connectionListener = null;
private ReadListener readListener = null;
private StatisticsCollector statisticsCollector = null;
private String dataPortType = DEFAULT_DATA_PORT_TYPE;
private ExecutorService executor;
Expand Down Expand Up @@ -896,6 +903,7 @@ public Builder properties(Properties props) {
classnameProperty(props, PROP_ERROR_LISTENER, o -> this.errorListener = (ErrorListener) o);
classnameProperty(props, PROP_TIME_TRACE_LOGGER, o -> this.timeTraceLogger = (TimeTraceLogger) o);
classnameProperty(props, PROP_CONNECTION_CB, o -> this.connectionListener = (ConnectionListener) o);
classnameProperty(props, PROP_READ_LISTENER_CLASS, o -> this.readListener = (ReadListener) o);
classnameProperty(props, PROP_STATISTICS_COLLECTOR, o -> this.statisticsCollector = (StatisticsCollector) o);

stringProperty(props, PROP_DATA_PORT_TYPE, s -> this.dataPortType = s);
Expand All @@ -914,7 +922,6 @@ public Builder properties(Properties props) {
classnameProperty(props, PROP_EXECUTOR_SERVICE_CLASS, o -> this.executor = (ExecutorService) o);
classnameProperty(props, PROP_CONNECT_THREAD_FACTORY_CLASS, o -> this.connectThreadFactory = (ThreadFactory) o);
classnameProperty(props, PROP_CALLBACK_THREAD_FACTORY_CLASS, o -> this.callbackThreadFactory = (ThreadFactory) o);

return this;
}

Expand Down Expand Up @@ -1551,6 +1558,17 @@ public Builder connectionListener(ConnectionListener listener) {
return this;
}

/**
* Sets a listener to be notified on incoming protocol/message
*
* @param readListener the listener
* @return the Builder for chaining
*/
public Builder readListener(ReadListener readListener) {
this.readListener = readListener;
return this;
}

/**
* Set the {@link StatisticsCollector StatisticsCollector} to collect connection metrics.
* <p>
Expand Down Expand Up @@ -1960,6 +1978,7 @@ public Builder(Options o) {
this.errorListener = o.errorListener;
this.timeTraceLogger = o.timeTraceLogger;
this.connectionListener = o.connectionListener;
this.readListener = o.readListener;
this.statisticsCollector = o.statisticsCollector;
this.dataPortType = o.dataPortType;
this.trackAdvancedStats = o.trackAdvancedStats;
Expand Down Expand Up @@ -2027,6 +2046,7 @@ private Options(Builder b) {
this.errorListener = b.errorListener;
this.timeTraceLogger = b.timeTraceLogger;
this.connectionListener = b.connectionListener;
this.readListener = b.readListener;
this.statisticsCollector = b.statisticsCollector;
this.dataPortType = b.dataPortType;
this.trackAdvancedStats = b.trackAdvancedStats;
Expand Down Expand Up @@ -2112,6 +2132,13 @@ public ConnectionListener getConnectionListener() {
return this.connectionListener;
}

/**
* @return the read listener, or null, see {@link Builder#readListener(ReadListener) readListener()} in the builder doc
*/
public ReadListener getReadListener() {
return this.readListener;
}

/**
* @return the statistics collector, or null, see {@link Builder#statisticsCollector(StatisticsCollector) statisticsCollector()} in the builder doc
*/
Expand Down
19 changes: 19 additions & 0 deletions src/main/java/io/nats/client/ReadListener.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
// Copyright 2024 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at:
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package io.nats.client;

public interface ReadListener {
void protocol(String op, String string);
void message(String op, Message message);
}
25 changes: 24 additions & 1 deletion src/main/java/io/nats/client/impl/NatsConnectionReader.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@

package io.nats.client.impl;

import io.nats.client.ReadListener;
import io.nats.client.support.IncomingHeadersProcessor;

import java.io.IOException;
Expand Down Expand Up @@ -68,6 +69,7 @@ enum Mode {
private final AtomicBoolean running;

private final boolean utf8Mode;
private final ReadListener readListener;

NatsConnectionReader(NatsConnection connection) {
this.connection = connection;
Expand All @@ -83,6 +85,7 @@ enum Mode {
this.bufferPosition = 0;

this.utf8Mode = connection.getOptions().supportUTF8Subjects();
readListener = connection.getOptions().getReadListener();
}

// Should only be called if the current thread has exited.
Expand Down Expand Up @@ -346,7 +349,11 @@ void gatherMessageData(int maxPos) throws IOException {
if (gotCR) {
if (b == LF) {
incoming.setData(msgData);
this.connection.deliverMessage(incoming.getMessage());
NatsMessage m = incoming.getMessage();
this.connection.deliverMessage(m);
if (readListener != null) {
readListener.message(op, m);
}
msgData = null;
msgDataPosition = 0;
incoming = null;
Expand Down Expand Up @@ -544,34 +551,50 @@ void parseProtocolMessage() throws IOException {
break;
case OP_OK:
this.connection.processOK();
if (readListener != null) {
readListener.protocol(op, null);
}
this.op = UNKNOWN_OP;
this.mode = Mode.GATHER_OP;
break;
case OP_ERR:
String errorText = StandardCharsets.UTF_8.decode(protocolBuffer).toString().replace("'", "");
this.connection.processError(errorText);
if (readListener != null) {
readListener.protocol(op, errorText);
}
this.op = UNKNOWN_OP;
this.mode = Mode.GATHER_OP;
break;
case OP_PING:
this.connection.sendPong();
if (readListener != null) {
readListener.protocol(op, null);
}
this.op = UNKNOWN_OP;
this.mode = Mode.GATHER_OP;
break;
case OP_PONG:
this.connection.handlePong();
if (readListener != null) {
readListener.protocol(op, null);
}
this.op = UNKNOWN_OP;
this.mode = Mode.GATHER_OP;
break;
case OP_INFO:
String info = StandardCharsets.UTF_8.decode(protocolBuffer).toString();
this.connection.handleInfo(info);
if (readListener != null) {
readListener.protocol(op, info);
}
this.op = UNKNOWN_OP;
this.mode = Mode.GATHER_OP;
break;
default:
throw new IllegalStateException("Unknown protocol operation "+op);
}

} catch (IllegalStateException | NumberFormatException | NullPointerException ex) {
this.encounteredProtocolError(ex);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ void sendMessageBatch(NatsMessage msg, DataPort dataPort, StatisticsCollector st
sendBuffer[sendPosition++] = CR;
sendBuffer[sendPosition++] = LF;

if (!msg.isProtocol()) {
if (!msg.isProtocol()) { // because a protocol message does not have headers
sendPosition += msg.copyNotEmptyHeaders(sendPosition, sendBuffer);

byte[] bytes = msg.getData(); // guaranteed to not be null
Expand Down
Loading