Skip to content

Commit

Permalink
Fix concurrent modification exception
Browse files Browse the repository at this point in the history
  • Loading branch information
NaikSoftware committed Dec 25, 2017
1 parent 885c8ce commit ccb87f6
Show file tree
Hide file tree
Showing 2 changed files with 34 additions and 29 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import android.util.Log;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
Expand Down Expand Up @@ -33,13 +34,11 @@

private WebSocket openedSocked;

private final Object mLifecycleLock = new Object();


/* package */ OkHttpConnectionProvider(String uri, Map<String, String> connectHttpHeaders, OkHttpClient okHttpClient) {
mUri = uri;
mConnectHttpHeaders = connectHttpHeaders != null ? connectHttpHeaders : new HashMap<>();
mLifecycleEmitters = new ArrayList<>();
mLifecycleEmitters = Collections.synchronizedList(new ArrayList<>());
mMessagesEmitters = new ArrayList<>();
mOkHttpClient = okHttpClient;
}
Expand Down Expand Up @@ -133,7 +132,7 @@ public Flowable<Void> send(String stompMessage) {
public Flowable<LifecycleEvent> getLifecycleReceiver() {
return Flowable.<LifecycleEvent>create(mLifecycleEmitters::add, BackpressureStrategy.BUFFER)
.doOnCancel(() -> {
synchronized (mLifecycleLock) {
synchronized (mLifecycleEmitters) {
Iterator<FlowableEmitter<? super LifecycleEvent>> iterator = mLifecycleEmitters.iterator();
while (iterator.hasNext()) {
if (iterator.next().isCancelled()) iterator.remove();
Expand All @@ -158,7 +157,7 @@ private void addConnectionHeadersToBuilder(Request.Builder requestBuilder, Map<S
}

private void emitLifecycleEvent(LifecycleEvent lifecycleEvent) {
synchronized (mLifecycleLock) {
synchronized (mLifecycleEmitters) {
Log.d(TAG, "Emit lifecycle event: " + lifecycleEvent.getType().name());
for (FlowableEmitter<? super LifecycleEvent> subscriber : mLifecycleEmitters) {
subscriber.onNext(lifecycleEvent);
Expand Down
54 changes: 30 additions & 24 deletions lib/src/main/java/ua/naiksoftware/stomp/client/StompClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ public class StompClient {

private Disposable mMessagesDisposable;
private Disposable mLifecycleDisposable;
private Map<String, Set<FlowableEmitter<? super StompMessage>>> mEmitters = new ConcurrentHashMap<>();
private Map<String, Set<FlowableEmitter<? super StompMessage>>> mEmitters = Collections.synchronizedMap(new HashMap<>());
private List<ConnectableFlowable<Void>> mWaitConnectionFlowables;
private final ConnectionProvider mConnectionProvider;
private HashMap<String, String> mTopics;
Expand Down Expand Up @@ -141,12 +141,14 @@ public Flowable<Void> send(StompMessage stompMessage) {

private void callSubscribers(StompMessage stompMessage) {
String messageDestination = stompMessage.findHeader(StompHeader.DESTINATION);
for (String dest : mEmitters.keySet()) {
if (dest.equals(messageDestination)) {
for (FlowableEmitter<? super StompMessage> subscriber : mEmitters.get(dest)) {
subscriber.onNext(stompMessage);
synchronized (mEmitters) {
for (String dest : mEmitters.keySet()) {
if (dest.equals(messageDestination)) {
for (FlowableEmitter<? super StompMessage> subscriber : mEmitters.get(dest)) {
subscriber.onNext(stompMessage);
}
return;
}
return;
}
}
}
Expand All @@ -167,27 +169,31 @@ public Flowable<StompMessage> topic(String destinationPath) {

public Flowable<StompMessage> topic(String destinationPath, List<StompHeader> headerList) {
return Flowable.<StompMessage>create(emitter -> {
Set<FlowableEmitter<? super StompMessage>> emittersSet = mEmitters.get(destinationPath);
if (emittersSet == null) {
emittersSet = new HashSet<>();
mEmitters.put(destinationPath, emittersSet);
subscribePath(destinationPath, headerList).subscribe();
synchronized (mEmitters) {
Set<FlowableEmitter<? super StompMessage>> emittersSet = mEmitters.get(destinationPath);
if (emittersSet == null) {
emittersSet = new HashSet<>();
mEmitters.put(destinationPath, emittersSet);
subscribePath(destinationPath, headerList).subscribe();
}
emittersSet.add(emitter);
}
emittersSet.add(emitter);
}, BackpressureStrategy.BUFFER)
.doOnCancel(() -> {
Iterator<String> mapIterator = mEmitters.keySet().iterator();
while (mapIterator.hasNext()) {
String destinationUrl = mapIterator.next();
Set<FlowableEmitter<? super StompMessage>> set = mEmitters.get(destinationUrl);
Iterator<FlowableEmitter<? super StompMessage>> setIterator = set.iterator();
while (setIterator.hasNext()) {
FlowableEmitter<? super StompMessage> subscriber = setIterator.next();
if (subscriber.isCancelled()) {
setIterator.remove();
if (set.size() < 1) {
mapIterator.remove();
unsubscribePath(destinationUrl).subscribe();
synchronized (mEmitters) {
Iterator<String> mapIterator = mEmitters.keySet().iterator();
while (mapIterator.hasNext()) {
String destinationUrl = mapIterator.next();
Set<FlowableEmitter<? super StompMessage>> set = mEmitters.get(destinationUrl);
Iterator<FlowableEmitter<? super StompMessage>> setIterator = set.iterator();
while (setIterator.hasNext()) {
FlowableEmitter<? super StompMessage> subscriber = setIterator.next();
if (subscriber.isCancelled()) {
setIterator.remove();
if (set.size() < 1) {
mapIterator.remove();
unsubscribePath(destinationUrl).subscribe();
}
}
}
}
Expand Down

0 comments on commit ccb87f6

Please sign in to comment.