Skip to content

Commit

Permalink
Merge pull request #146 from Jordan-Turgeon/multiple-topic-subscriptions
Browse files Browse the repository at this point in the history
#142 Fixed multiple topic subscription errors
  • Loading branch information
NaikSoftware authored Jun 25, 2020
2 parents b59b1e7 + a8cf433 commit 518eebe
Showing 1 changed file with 10 additions and 3 deletions.
13 changes: 10 additions & 3 deletions lib/src/main/java/ua/naiksoftware/stomp/StompClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -242,11 +242,12 @@ public Flowable<StompMessage> topic(@NonNull String destPath, List<StompHeader>
return Flowable.error(new IllegalArgumentException("Topic path cannot be null"));
else if (!streamMap.containsKey(destPath))
streamMap.put(destPath,
subscribePath(destPath, headerList).andThen(
Completable.defer(() -> subscribePath(destPath, headerList)).andThen(
getMessageStream()
.filter(msg -> pathMatcher.matches(destPath, msg))
.toFlowable(BackpressureStrategy.BUFFER)
.share()).doFinally(() -> unsubscribePath(destPath).subscribe())
.doFinally(() -> unsubscribePath(destPath).subscribe())
.share())
);
return streamMap.get(destPath);
}
Expand All @@ -269,14 +270,20 @@ private Completable subscribePath(String destinationPath, @Nullable List<StompHe
headers.add(new StompHeader(StompHeader.ACK, DEFAULT_ACK));
if (headerList != null) headers.addAll(headerList);
return send(new StompMessage(StompCommand.SUBSCRIBE,
headers, null));
headers, null))
.doOnError(throwable -> unsubscribePath(destinationPath).subscribe());
}


private Completable unsubscribePath(String dest) {
streamMap.remove(dest);

String topicId = topics.get(dest);

if (topicId == null) {
return Completable.complete();
}

topics.remove(dest);

Log.d(TAG, "Unsubscribe path: " + dest + " id: " + topicId);
Expand Down

0 comments on commit 518eebe

Please sign in to comment.