diff --git a/lib/src/main/java/ua/naiksoftware/stomp/client/StompClient.java b/lib/src/main/java/ua/naiksoftware/stomp/client/StompClient.java index bd5300e..a7da924 100644 --- a/lib/src/main/java/ua/naiksoftware/stomp/client/StompClient.java +++ b/lib/src/main/java/ua/naiksoftware/stomp/client/StompClient.java @@ -147,46 +147,48 @@ public void disconnect() { mConnected = false; } - public Observable topic(String destinationPath) { - return Observable.create(subscriber -> { - - Set> subscribersSet = mSubscribers.get(destinationPath); - if (subscribersSet == null) { - subscribersSet = new HashSet<>(); - mSubscribers.put(destinationPath, subscribersSet); - subscribePath(destinationPath); - } - subscribersSet.add(subscriber); - - }).doOnUnsubscribe(() -> { - for (String dest : mSubscribers.keySet()) { - Set> set = mSubscribers.get(dest); - for (Subscriber subscriber : set) { - if (subscriber.isUnsubscribed()) { - set.remove(subscriber); - if (set.size() < 1) { - mSubscribers.remove(dest); - unsubscribePath(dest); - } - } - } - } - }); - } - - private void subscribePath(String destinationPath) { - if (destinationPath == null) return; - String topicId = UUID.randomUUID().toString(); - Log.d(TAG, "Subscribe path: " + destinationPath + " id: " + topicId); - - if (mTopics == null) mTopics = new HashMap<>(); - mTopics.put(destinationPath, topicId); - send(new StompMessage(StompCommand.SUBSCRIBE, - Arrays.asList( - new StompHeader(StompHeader.ID, topicId), - new StompHeader(StompHeader.DESTINATION, destinationPath), - new StompHeader(StompHeader.ACK, DEFAULT_ACK)), null)); - } + public Observable topic(String destinationPath, List headerList) { + return Observable.create(subscriber -> { + Set> subscribersSet = mSubscribers.get(destinationPath); + if (subscribersSet == null) { + subscribersSet = new HashSet<>(); + mSubscribers.put(destinationPath, subscribersSet); + subscribePath(destinationPath, headerList); + } + subscribersSet.add(subscriber); + + }).doOnUnsubscribe(() -> { + for (String dest : mSubscribers.keySet()) { + Set> set = mSubscribers.get(dest); + for (Subscriber subscriber : set) { + if (subscriber.isUnsubscribed()) { + set.remove(subscriber); + if (set.size() < 1) { + mSubscribers.remove(dest); + unsubscribePath(dest); + } + } + } + } + }); + } + + private void subscribePath(String destinationPath, List headerList) { + if (destinationPath == null) return; + String topicId = UUID.randomUUID().toString(); + + if (mTopics == null) mTopics = new HashMap<>(); + mTopics.put(destinationPath, topicId); + List headers = new ArrayList<>(); + headers.add(new StompHeader(StompHeader.ID, topicId)); + headers.add(new StompHeader(StompHeader.DESTINATION, destinationPath)); + headers.add(new StompHeader(StompHeader.ACK, DEFAULT_ACK)); + for(StompHeader header : headerList){ + headers.add(header); + } + send(new StompMessage(StompCommand.SUBSCRIBE, + headers, null)); + } private void unsubscribePath(String dest) {