Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Message Headers Improvement #1054

Merged
merged 5 commits into from
Dec 18, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 7 additions & 7 deletions src/main/java/io/nats/client/api/MessageInfo.java
Original file line number Diff line number Diff line change
Expand Up @@ -64,21 +64,21 @@ public MessageInfo(Message msg, String streamName, boolean direct) {
this.direct = direct;

if (direct) {
this.headers = msg.getHeaders();
this.subject = headers.getLast(NATS_SUBJECT);
Headers msgHeaders = msg.getHeaders();
this.subject = msgHeaders.getLast(NATS_SUBJECT);
this.data = msg.getData();
seq = Long.parseLong(headers.getFirst(NATS_SEQUENCE));
time = DateTimeUtils.parseDateTime(headers.getFirst(NATS_TIMESTAMP));
stream = headers.getFirst(NATS_STREAM);
String temp = headers.getFirst(NATS_LAST_SEQUENCE);
seq = Long.parseLong(msgHeaders.getFirst(NATS_SEQUENCE));
time = DateTimeUtils.parseDateTime(msgHeaders.getFirst(NATS_TIMESTAMP));
stream = msgHeaders.getFirst(NATS_STREAM);
String temp = msgHeaders.getFirst(NATS_LAST_SEQUENCE);
if (temp == null) {
lastSeq = -1;
}
else {
lastSeq = JsonUtils.safeParseLong(temp, -1);
}
// these are control headers, not real headers so don't give them to the user.
headers.remove(NATS_SUBJECT, NATS_SEQUENCE, NATS_TIMESTAMP, NATS_STREAM, NATS_LAST_SEQUENCE);
headers = new Headers(msgHeaders, true, MESSAGE_INFO_HEADERS);
}
else if (hasError()) {
subject = null;
Expand Down
92 changes: 63 additions & 29 deletions src/main/java/io/nats/client/impl/Headers.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
* An object that represents a map of keys to a list of values. It does not accept
* null or invalid keys. It ignores null values, accepts empty string as a value
* and rejects invalid values.
*
* !!!
* THIS CLASS IS NOT THREAD SAFE
*/
public class Headers {
Expand All @@ -36,21 +36,47 @@ public class Headers {

private final Map<String, List<String>> valuesMap;
private final Map<String, Integer> lengthMap;
private final boolean readOnly;
private byte[] serialized;
private int dataLength;

public Headers() {
valuesMap = new HashMap<>();
lengthMap = new HashMap<>();
this(null, false, null);
}

public Headers(Headers headers) {
this();
this(headers, false, null);
}

public Headers(Headers headers, boolean readOnly) {
this(headers, readOnly, null);
}

public Headers(Headers headers, boolean readOnly, String[] keysNotToCopy) {
Map<String, List<String>> tempValuesMap = new HashMap<>();
Map<String, Integer> tempLengthMap = new HashMap<>();
if (headers != null) {
valuesMap.putAll(headers.valuesMap);
lengthMap.putAll(headers.lengthMap);
tempValuesMap.putAll(headers.valuesMap);
tempLengthMap.putAll(headers.lengthMap);
dataLength = headers.dataLength;
serialized = null;
if (keysNotToCopy != null) {
for (String key : keysNotToCopy) {
if (key != null) {
if (tempValuesMap.remove(key) != null) {
dataLength -= tempLengthMap.remove(key);
}
}
}
}
}
this.readOnly = readOnly;
if (readOnly) {
valuesMap = Collections.unmodifiableMap(tempValuesMap);
Copy link

@sdklib sdklib Jan 4, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Collections$UnmodifiableMap throws an UnsupportedOperationException when computeIfAbsent is called which is happening in the _add method. Unsure if this is really intended

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@sdklib Are you trying to modify the headers after the message is created? This PR was intended to prevent this.

Copy link

@sdklib sdklib Jan 4, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yup, we're using AOP to add a correlation id for proper logging across microservices to the headers.

After updating to the newest version, some unit tests fail, due to readOnly being true and the AOP Pointcut failing to add the correlation ID to the header.

The fix was easy, I was just wondering if the response should really be UnsupportedOperationException or maybe better returning a Headers.java (e.g. ReadOnlyHeaders.java) clone that doesn't have the add method in the first place. Or something similar~ :-)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would have to change the api to return a different object, and this would require a major version. Also, before, since I did not effectively copy the headers, the original headers object could have been modified. I'm updating the readme with a note about the change for the 2.17.2 version.

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Feared so, never the less, thank you so much! 👍

lengthMap = Collections.unmodifiableMap(tempLengthMap);
}
else {
valuesMap = tempValuesMap;
lengthMap = tempLengthMap;
}
}

Expand Down Expand Up @@ -155,7 +181,7 @@ public Headers put(Map<String, List<String>> map) {

// the put delegate that all puts call
private void _put(String key, Collection<String> values) {
if (key == null || key.length() == 0) {
if (key == null || key.isEmpty()) {
throw new IllegalArgumentException("Key cannot be null or empty.");
}
if (values != null) {
Expand Down Expand Up @@ -203,7 +229,7 @@ private void _remove(String key) {
}

/**
* Returns the number of keys (case sensitive) in the header.
* Returns the number of keys (case-sensitive) in the header.
*
* @return the number of keys
*/
Expand All @@ -221,7 +247,7 @@ public boolean isEmpty() {
}

/**
* Removes all of the keys The object map will be empty after this call returns.
* Removes all the keys The object map will be empty after this call returns.
*/
public void clear() {
valuesMap.clear();
Expand All @@ -231,20 +257,20 @@ public void clear() {
}

/**
* Returns <tt>true</tt> if key (case sensitive) is present (has values)
* Returns <tt>true</tt> if key (case-sensitive) is present (has values)
*
* @param key key whose presence is to be tested
* @return <tt>true</tt> if the key (case sensitive) is present (has values)
* @return <tt>true</tt> if the key (case-sensitive) is present (has values)
*/
public boolean containsKey(String key) {
return valuesMap.containsKey(key);
}

/**
* Returns <tt>true</tt> if key (case insensitive) is present (has values)
* Returns <tt>true</tt> if key (case-insensitive) is present (has values)
*
* @param key exact key whose presence is to be tested
* @return <tt>true</tt> if the key (case insensitive) is present (has values)
* @return <tt>true</tt> if the key (case-insensitive) is present (has values)
*/
public boolean containsKeyIgnoreCase(String key) {
for (String k : valuesMap.keySet()) {
Expand All @@ -256,7 +282,7 @@ public boolean containsKeyIgnoreCase(String key) {
}

/**
* Returns a {@link Set} view of the keys (case sensitive) contained in the object.
* Returns a {@link Set} view of the keys (case-sensitive) contained in the object.
*
* @return a read-only set the keys contained in this map
*/
Expand All @@ -265,7 +291,7 @@ public Set<String> keySet() {
}

/**
* Returns a {@link Set} view of the keys (case insensitive) contained in the object.
* Returns a {@link Set} view of the keys (case-insensitive) contained in the object.
*
* @return a read-only set of keys (in lowercase) contained in this map
*/
Expand All @@ -278,43 +304,43 @@ public Set<String> keySetIgnoreCase() {
}

/**
* Returns a {@link List} view of the values for the specific (case sensitive) key.
* Returns a {@link List} view of the values for the specific (case-sensitive) key.
* Will be {@code null} if the key is not found.
*
* @return a read-only list of the values for the case sensitive key.
* @return a read-only list of the values for the case-sensitive key.
*/
public List<String> get(String key) {
List<String> values = valuesMap.get(key);
return values == null ? null : Collections.unmodifiableList(values);
}

/**
* Returns the first value for the specific (case sensitive) key.
* Returns the first value for the specific (case-sensitive) key.
* Will be {@code null} if the key is not found.
*
* @return the first value for the case sensitive key.
* @return the first value for the case-sensitive key.
*/
public String getFirst(String key) {
List<String> values = valuesMap.get(key);
return values == null ? null : values.get(0);
}

/**
* Returns the last value for the specific (case sensitive) key.
* Returns the last value for the specific (case-sensitive) key.
* Will be {@code null} if the key is not found.
*
* @return the last value for the case sensitive key.
* @return the last value for the case-sensitive key.
*/
public String getLast(String key) {
List<String> values = valuesMap.get(key);
return values == null ? null : values.get(values.size() - 1);
}

/**
* Returns a {@link List} view of the values for the specific (case insensitive) key.
* Returns a {@link List} view of the values for the specific (case-insensitive) key.
* Will be {@code null} if the key is not found.
*
* @return a read-only list of the values for the case insensitive key.
* @return a read-only list of the values for the case-insensitive key.
*/
public List<String> getIgnoreCase(String key) {
List<String> values = new ArrayList<>();
Expand All @@ -323,11 +349,11 @@ public List<String> getIgnoreCase(String key) {
values.addAll(valuesMap.get(k));
}
}
return values.size() == 0 ? null : Collections.unmodifiableList(values);
return values.isEmpty() ? null : Collections.unmodifiableList(values);
}

/**
* Performs the given action for each header entry (case sensitive keys) until all entries
* Performs the given action for each header entry (case-sensitive keys) until all entries
* have been processed or the action throws an exception.
* Any attempt to modify the values will throw an exception.
*
Expand All @@ -341,7 +367,7 @@ public void forEach(BiConsumer<String, List<String>> action) {
}

/**
* Returns a {@link Set} read only view of the mappings contained in the header (case sensitive keys).
* Returns a {@link Set} read only view of the mappings contained in the header (case-sensitive keys).
* The set is not modifiable and any attempt to modify will throw an exception.
*
* @return a set view of the mappings contained in this map
Expand Down Expand Up @@ -447,7 +473,7 @@ public int serializeToArray(int destPosition, byte[] dest) {
*/
private void checkKey(String key) {
// key cannot be null or empty and contain only printable characters except colon
if (key == null || key.length() == 0) {
if (key == null || key.isEmpty()) {
throw new IllegalArgumentException(KEY_CANNOT_BE_EMPTY_OR_NULL);
}

Expand Down Expand Up @@ -500,10 +526,18 @@ private class Checker {
}

boolean hasValues() {
return list.size() > 0;
return !list.isEmpty();
}
}

/**
* Whether the entire Headers is read only
* @return the read only state
*/
public boolean isReadOnly() {
return readOnly;
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
Expand Down
18 changes: 9 additions & 9 deletions src/main/java/io/nats/client/impl/NatsMessage.java
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ public NatsMessage(String subject, String replyTo, Headers headers, byte[] data)
this(data);
this.subject = validateSubject(subject, true);
this.replyTo = validateReplyTo(replyTo, false);
this.headers = headers;
this.headers = readOnlyOf(headers);
this.utf8mode = false;
finishConstruct();
}
Expand All @@ -99,11 +99,18 @@ public NatsMessage(Message message) {
this(message.getData());
this.subject = message.getSubject();
this.replyTo = message.getReplyTo();
this.headers = message.getHeaders();
this.headers = readOnlyOf(message.getHeaders());
this.utf8mode = message.isUtf8mode();
finishConstruct();
}

private static Headers readOnlyOf(Headers headers) {
if (headers == null || headers.isReadOnly()) {
return headers;
}
return new Headers(headers, true, null);
}

protected void finishConstruct() {
int replyToLen = replyTo == null ? 0 : replyTo.length();

Expand Down Expand Up @@ -167,13 +174,6 @@ int getControlLineLength() {
return controlLineLength;
}

Headers getOrCreateHeaders() {
if (headers == null) {
headers = new Headers();
}
return headers;
}

void setSubscription(NatsSubscription sub) {
subscription = sub;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@ public interface NatsJetStreamConstants {
String NATS_TIMESTAMP = "Nats-Time-Stamp";
String NATS_SUBJECT = "Nats-Subject";
String NATS_LAST_SEQUENCE = "Nats-Last-Sequence";
String[] MESSAGE_INFO_HEADERS = new String[]{NATS_SUBJECT, NATS_SEQUENCE, NATS_TIMESTAMP, NATS_STREAM, NATS_LAST_SEQUENCE};

String NATS_PENDING_MESSAGES = "Nats-Pending-Messages";
String NATS_PENDING_BYTES = "Nats-Pending-Bytes";
Expand Down
Loading