diff --git a/lib/src/main/java/ua/naiksoftware/stomp/OkHttpConnectionProvider.java b/lib/src/main/java/ua/naiksoftware/stomp/OkHttpConnectionProvider.java index fab4803..5b5b765 100644 --- a/lib/src/main/java/ua/naiksoftware/stomp/OkHttpConnectionProvider.java +++ b/lib/src/main/java/ua/naiksoftware/stomp/OkHttpConnectionProvider.java @@ -3,6 +3,7 @@ import android.util.Log; import java.util.ArrayList; +import java.util.Collections; import java.util.HashMap; import java.util.Iterator; import java.util.List; @@ -33,13 +34,11 @@ private WebSocket openedSocked; - private final Object mLifecycleLock = new Object(); - /* package */ OkHttpConnectionProvider(String uri, Map connectHttpHeaders, OkHttpClient okHttpClient) { mUri = uri; mConnectHttpHeaders = connectHttpHeaders != null ? connectHttpHeaders : new HashMap<>(); - mLifecycleEmitters = new ArrayList<>(); + mLifecycleEmitters = Collections.synchronizedList(new ArrayList<>()); mMessagesEmitters = new ArrayList<>(); mOkHttpClient = okHttpClient; } @@ -133,7 +132,7 @@ public Flowable send(String stompMessage) { public Flowable getLifecycleReceiver() { return Flowable.create(mLifecycleEmitters::add, BackpressureStrategy.BUFFER) .doOnCancel(() -> { - synchronized (mLifecycleLock) { + synchronized (mLifecycleEmitters) { Iterator> iterator = mLifecycleEmitters.iterator(); while (iterator.hasNext()) { if (iterator.next().isCancelled()) iterator.remove(); @@ -158,7 +157,7 @@ private void addConnectionHeadersToBuilder(Request.Builder requestBuilder, Map subscriber : mLifecycleEmitters) { subscriber.onNext(lifecycleEvent); diff --git a/lib/src/main/java/ua/naiksoftware/stomp/client/StompClient.java b/lib/src/main/java/ua/naiksoftware/stomp/client/StompClient.java index f9605f1..dcad677 100644 --- a/lib/src/main/java/ua/naiksoftware/stomp/client/StompClient.java +++ b/lib/src/main/java/ua/naiksoftware/stomp/client/StompClient.java @@ -35,7 +35,7 @@ public class StompClient { private Disposable mMessagesDisposable; private Disposable mLifecycleDisposable; - private Map>> mEmitters = new ConcurrentHashMap<>(); + private Map>> mEmitters = Collections.synchronizedMap(new HashMap<>()); private List> mWaitConnectionFlowables; private final ConnectionProvider mConnectionProvider; private HashMap mTopics; @@ -141,12 +141,14 @@ public Flowable send(StompMessage stompMessage) { private void callSubscribers(StompMessage stompMessage) { String messageDestination = stompMessage.findHeader(StompHeader.DESTINATION); - for (String dest : mEmitters.keySet()) { - if (dest.equals(messageDestination)) { - for (FlowableEmitter subscriber : mEmitters.get(dest)) { - subscriber.onNext(stompMessage); + synchronized (mEmitters) { + for (String dest : mEmitters.keySet()) { + if (dest.equals(messageDestination)) { + for (FlowableEmitter subscriber : mEmitters.get(dest)) { + subscriber.onNext(stompMessage); + } + return; } - return; } } } @@ -167,27 +169,31 @@ public Flowable topic(String destinationPath) { public Flowable topic(String destinationPath, List headerList) { return Flowable.create(emitter -> { - Set> emittersSet = mEmitters.get(destinationPath); - if (emittersSet == null) { - emittersSet = new HashSet<>(); - mEmitters.put(destinationPath, emittersSet); - subscribePath(destinationPath, headerList).subscribe(); + synchronized (mEmitters) { + Set> emittersSet = mEmitters.get(destinationPath); + if (emittersSet == null) { + emittersSet = new HashSet<>(); + mEmitters.put(destinationPath, emittersSet); + subscribePath(destinationPath, headerList).subscribe(); + } + emittersSet.add(emitter); } - emittersSet.add(emitter); }, BackpressureStrategy.BUFFER) .doOnCancel(() -> { - Iterator mapIterator = mEmitters.keySet().iterator(); - while (mapIterator.hasNext()) { - String destinationUrl = mapIterator.next(); - Set> set = mEmitters.get(destinationUrl); - Iterator> setIterator = set.iterator(); - while (setIterator.hasNext()) { - FlowableEmitter subscriber = setIterator.next(); - if (subscriber.isCancelled()) { - setIterator.remove(); - if (set.size() < 1) { - mapIterator.remove(); - unsubscribePath(destinationUrl).subscribe(); + synchronized (mEmitters) { + Iterator mapIterator = mEmitters.keySet().iterator(); + while (mapIterator.hasNext()) { + String destinationUrl = mapIterator.next(); + Set> set = mEmitters.get(destinationUrl); + Iterator> setIterator = set.iterator(); + while (setIterator.hasNext()) { + FlowableEmitter subscriber = setIterator.next(); + if (subscriber.isCancelled()) { + setIterator.remove(); + if (set.size() < 1) { + mapIterator.remove(); + unsubscribePath(destinationUrl).subscribe(); + } } } }