diff --git a/lib/src/main/java/ua/naiksoftware/stomp/AbstractConnectionProvider.java b/lib/src/main/java/ua/naiksoftware/stomp/AbstractConnectionProvider.java index 4f089e2..bd1e2f8 100644 --- a/lib/src/main/java/ua/naiksoftware/stomp/AbstractConnectionProvider.java +++ b/lib/src/main/java/ua/naiksoftware/stomp/AbstractConnectionProvider.java @@ -4,14 +4,8 @@ import android.support.annotation.Nullable; import android.util.Log; -import java.util.concurrent.TimeUnit; - -import io.reactivex.BackpressureStrategy; import io.reactivex.Completable; -import io.reactivex.CompletableSource; -import io.reactivex.Flowable; import io.reactivex.Observable; -import io.reactivex.subjects.BehaviorSubject; import io.reactivex.subjects.PublishSubject; /** @@ -28,12 +22,10 @@ abstract class AbstractConnectionProvider implements ConnectionProvider { private final PublishSubject mLifecycleStream; @NonNull private final PublishSubject mMessagesStream; - final BehaviorSubject mConnectionStream; AbstractConnectionProvider() { mLifecycleStream = PublishSubject.create(); mMessagesStream = PublishSubject.create(); - mConnectionStream = BehaviorSubject.createDefault(false); } @NonNull @@ -54,27 +46,13 @@ public Observable messages() { @Override public Completable disconnect() { - CompletableSource ex = Completable.error(new IllegalStateException("Attempted to disconnect when already disconnected")); - - Completable block = mConnectionStream - .filter(connected -> connected).firstOrError().toCompletable() - .timeout(1, TimeUnit.SECONDS, ex); - return Completable - .fromAction(this::rawDisconnect) - .startWith(block); + .fromAction(this::rawDisconnect); } private Completable initSocket() { - CompletableSource ex = Completable.error(new IllegalStateException("Attempted to connect when already connected")); - - Completable block = mConnectionStream - .filter(connected -> !connected).firstOrError().toCompletable() - .timeout(1, TimeUnit.SECONDS, ex); - return Completable - .fromAction(this::createWebSocketConnection) - .startWith(block); + .fromAction(this::createWebSocketConnection); } // Doesn't do anything at all, only here as a stub @@ -130,8 +108,6 @@ public Completable send(String stompMessage) { void emitLifecycleEvent(@NonNull LifecycleEvent lifecycleEvent) { Log.d(TAG, "Emit lifecycle event: " + lifecycleEvent.getType().name()); mLifecycleStream.onNext(lifecycleEvent); - if (lifecycleEvent.getType().equals(LifecycleEvent.Type.CLOSED)) - mConnectionStream.onNext(false); } void emitMessage(String stompMessage) { @@ -144,9 +120,4 @@ void emitMessage(String stompMessage) { public Observable lifecycle() { return mLifecycleStream; } - - @Override - public Flowable connected() { - return mConnectionStream.toFlowable(BackpressureStrategy.LATEST); - } } diff --git a/lib/src/main/java/ua/naiksoftware/stomp/ConnectionProvider.java b/lib/src/main/java/ua/naiksoftware/stomp/ConnectionProvider.java index 505c033..58fb826 100644 --- a/lib/src/main/java/ua/naiksoftware/stomp/ConnectionProvider.java +++ b/lib/src/main/java/ua/naiksoftware/stomp/ConnectionProvider.java @@ -33,6 +33,4 @@ public interface ConnectionProvider { Completable disconnect(); Completable setHeartbeat(int ms); - - Flowable connected(); } diff --git a/lib/src/main/java/ua/naiksoftware/stomp/OkHttpConnectionProvider.java b/lib/src/main/java/ua/naiksoftware/stomp/OkHttpConnectionProvider.java index 9046400..a7be2e3 100644 --- a/lib/src/main/java/ua/naiksoftware/stomp/OkHttpConnectionProvider.java +++ b/lib/src/main/java/ua/naiksoftware/stomp/OkHttpConnectionProvider.java @@ -93,7 +93,6 @@ public void onClosing(final WebSocket webSocket, final int code, final String re } ); - mConnectionStream.onNext(true); } @Override diff --git a/lib/src/main/java/ua/naiksoftware/stomp/WebSocketsConnectionProvider.java b/lib/src/main/java/ua/naiksoftware/stomp/WebSocketsConnectionProvider.java index 9ee4b16..eeadc81 100644 --- a/lib/src/main/java/ua/naiksoftware/stomp/WebSocketsConnectionProvider.java +++ b/lib/src/main/java/ua/naiksoftware/stomp/WebSocketsConnectionProvider.java @@ -110,7 +110,6 @@ public void onError(Exception ex) { mWebSocketClient.connect(); haveConnection = true; - mConnectionStream.onNext(true); } @Override diff --git a/lib/src/main/java/ua/naiksoftware/stomp/client/StompClient.java b/lib/src/main/java/ua/naiksoftware/stomp/client/StompClient.java index 9f7a8f7..ff4c199 100644 --- a/lib/src/main/java/ua/naiksoftware/stomp/client/StompClient.java +++ b/lib/src/main/java/ua/naiksoftware/stomp/client/StompClient.java @@ -15,6 +15,7 @@ import io.reactivex.CompletableSource; import io.reactivex.Flowable; import io.reactivex.disposables.Disposable; +import io.reactivex.subjects.BehaviorSubject; import io.reactivex.subjects.PublishSubject; import ua.naiksoftware.stomp.ConnectionProvider; import ua.naiksoftware.stomp.LifecycleEvent; @@ -39,6 +40,7 @@ public class StompClient { private PublishSubject mMessageStream; private ConcurrentHashMap> mStreamMap; + private final BehaviorSubject mConnectionStream; private Parser parser; private Disposable mLifecycleDisposable; private Disposable mMessagesDisposable; @@ -49,6 +51,7 @@ public StompClient(ConnectionProvider connectionProvider) { mConnectionProvider = connectionProvider; mMessageStream = PublishSubject.create(); mStreamMap = new ConcurrentHashMap<>(); + mConnectionStream = BehaviorSubject.createDefault(false); parser = Parser.NONE; } @@ -113,12 +116,12 @@ public void connect(@Nullable List _headers) { break; case CLOSED: - mConnected = false; + setConnected(false); isConnecting = false; break; case ERROR: - mConnected = false; + setConnected(false); isConnecting = false; break; } @@ -130,17 +133,24 @@ public void connect(@Nullable List _headers) { .doOnNext(this::callSubscribers) .filter(msg -> msg.getStompCommand().equals(StompCommand.CONNECTED)) .subscribe(stompMessage -> { - mConnected = true; + setConnected(true); isConnecting = false; + }); } + private void setConnected(boolean connected) { + mConnected = connected; + mConnectionStream.onNext(mConnected); + } + /** * Disconnect from server, and then reconnect with the last-used headers */ public void reconnect() { - disconnect(); - connect(mHeaders); + disconnectCompletable() + .subscribe(() -> connect(mHeaders), + e -> Log.e(tag, "Disconnect error", e)); } public Completable send(String destination) { @@ -156,7 +166,7 @@ public Completable send(String destination, String data) { public Completable send(@NonNull StompMessage stompMessage) { Completable completable = mConnectionProvider.send(stompMessage.compile(legacyWhitespace)); - CompletableSource connectionComplete = mConnectionProvider.connected() + CompletableSource connectionComplete = mConnectionStream .filter(isConnected -> isConnected) .firstOrError().toCompletable(); return completable @@ -172,9 +182,14 @@ public Flowable lifecycle() { } public void disconnect() { + disconnectCompletable().subscribe(() -> {}, e -> Log.e(tag, "Disconnect error", e)); + } + + public Completable disconnectCompletable() { mLifecycleDisposable.dispose(); mMessagesDisposable.dispose(); - mConnectionProvider.disconnect().subscribe(() -> mConnected = false, e -> Log.e(tag, "Disconnect error", e)); + return mConnectionProvider.disconnect() + .doOnComplete(() -> setConnected(false)); } public Flowable topic(String destinationPath) {