Skip to content
This repository has been archived by the owner on May 22, 2019. It is now read-only.

Commit

Permalink
Fix error on closing the client and router in case it was auto-closed…
Browse files Browse the repository at this point in the history
… before
  • Loading branch information
Matthias247 committed Nov 11, 2014
1 parent b187d5b commit bc3e9e6
Show file tree
Hide file tree
Showing 4 changed files with 38 additions and 33 deletions.
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@

<groupId>ws.wamp.jawampa</groupId>
<artifactId>jawampa</artifactId>
<version>0.1.0</version>
<version>0.1.1</version>

<name>jawampa</name>
<url></url>
Expand Down
2 changes: 1 addition & 1 deletion readme.md
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ and declare the following dependency:
<dependency>
<groupId>ws.wamp.jawampa</groupId>
<artifactId>jawampa</artifactId>
<version>0.1.0</version>
<version>0.1.1</version>
</dependency>


Expand Down
65 changes: 34 additions & 31 deletions src/main/java/ws/wamp/jawampa/WampClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@
import java.util.concurrent.Future;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;

import rx.Observable;
import rx.Observable.OnSubscribe;
import rx.Observer;
Expand Down Expand Up @@ -126,8 +125,7 @@ public String realm() {
}

final boolean closeClientOnErrors;
boolean isDisposed = false;
Exception closeError;
boolean isCompleted = false;

/** The factory which is used to create new transports to the remote peer */
final WampClientChannelFactory channelFactory;
Expand Down Expand Up @@ -233,18 +231,14 @@ public Thread newThread(Runnable r) {
this.reconnectInterval = reconnectInterval;
}

private void performDispose(Exception e) {
if (isDisposed) return;

closeError = e;
isDisposed = true;
private void completeStatus(Exception e) {
if (isCompleted) return;
isCompleted = true;

if (e != null)
statusObservable.onError(e);
else
statusObservable.onCompleted();

eventLoop.shutdownGracefully();
}

/**
Expand All @@ -260,7 +254,7 @@ public void open() {
eventLoop.execute(new Runnable() {
@Override
public void run() {
if (isDisposed) return;
if (isCompleted) return;
// Reset the number of reconnects
// This happens in both connecting and disconnected case
remainingNrReconnects = totalNrReconnects;
Expand Down Expand Up @@ -415,7 +409,7 @@ public void operationComplete(ChannelFuture f) throws Exception {
// Therefore we don't trigger retries
closeCurrentTransport();
statusObservable.onNext(status);
performDispose(e);
completeStatus(e);
}
}

Expand Down Expand Up @@ -453,31 +447,40 @@ private void closeCurrentTransport() {
/**
* Closes the session.<br>
* It will not be possible to open the session again with {@link #open} for safety
* reasons. If a new session is required a new {@link WampClient} shoule be built
* reasons. If a new session is required a new {@link WampClient} should be built
* through the used {@link WampClientBuilder}.
*/
public void close() {

// Avoid crashes on multiple/concurrent shutdowns
if (eventLoop.isShuttingDown() || eventLoop.isShutdown()) return;

eventLoop.execute(new Runnable() {
@Override
public void run() {
if (isDisposed) return; // Already closed

if (status == Status.Connected) {
// Send goodbye to the remote
GoodbyeMessage msg = new GoodbyeMessage(null,
ApplicationError.SYSTEM_SHUTDOWN);
channel.writeAndFlush(msg);
}

if (status != Status.Disconnected) {
// Close the connection (or connection attempt)
remainingNrReconnects = 0; // Don't reconnect
closeCurrentTransport();
statusObservable.onNext(status);
if (!isCompleted) // Check if already closed
{
if (status == Status.Connected) {
// Send goodbye to the remote
GoodbyeMessage msg = new GoodbyeMessage(null,
ApplicationError.SYSTEM_SHUTDOWN);
channel.writeAndFlush(msg);
}

if (status != Status.Disconnected) {
// Close the connection (or connection attempt)
remainingNrReconnects = 0; // Don't reconnect
closeCurrentTransport();
statusObservable.onNext(status);
}

// Normal close without an error
completeStatus(null);
}

// Normal close without an error
performDispose(null);
// Shut down the eventLoop if it didn't happen before
if (eventLoop.isShuttingDown() || eventLoop.isShutdown()) return;
eventLoop.shutdownGracefully();
}
});
}
Expand All @@ -500,7 +503,7 @@ private void onSessionError(ApplicationError error) {

if (closeClientOnErrors) {
remainingNrReconnects = 0;
performDispose(error);
completeStatus(error);
}
else {
if (mayReconnect()) {
Expand Down Expand Up @@ -1099,7 +1102,7 @@ public void call(Long t1) {
newEntry.subscriptionId = t1;
subscriptionsBySubscriptionId.put(t1, newEntry);
// Add the cancellation functionality to all subscribers
// If one is already usubscribed this will immediatly call
// If one is already unsubscribed this will immediately call
// the cancellation function for this subscriber
for (Subscriber<? super PubSubData> s : newEntry.subscribers) {
attachPubSubCancelationAction(s, newEntry, topic);
Expand Down
2 changes: 2 additions & 0 deletions src/main/java/ws/wamp/jawampa/WampRouter.java
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,8 @@ public Thread newThread(Runnable r) {
* as far as possible.
*/
public void close() {
if (eventLoop.isShuttingDown() || eventLoop.isShutdown()) return;

eventLoop.execute(new Runnable() {
@Override
public void run() {
Expand Down

0 comments on commit bc3e9e6

Please sign in to comment.