Skip to content
This repository has been archived by the owner on May 22, 2019. It is now read-only.

Commit

Permalink
Handle executor rejections after client shutdown
Browse files Browse the repository at this point in the history
  • Loading branch information
Matthias247 committed May 17, 2017
1 parent 12d8f22 commit 00cfb02
Show file tree
Hide file tree
Showing 5 changed files with 61 additions and 34 deletions.
29 changes: 25 additions & 4 deletions jawampa-core/src/main/java/ws/wamp/jawampa/WampClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -247,7 +247,7 @@ public Observable<Long> publish(final String topic, final EnumSet<PublishFlags>
return resultSubject;
}

stateController.scheduler().execute(new Runnable() {
boolean wasScheduled = stateController.tryScheduleAction(new Runnable() {
@Override
public void run() {
if (!(stateController.currentState() instanceof SessionEstablishedState)) {
Expand All @@ -260,6 +260,11 @@ public void run() {

}
});

if (!wasScheduled) {
resultSubject.onError(
new ApplicationError(ApplicationError.CLIENT_CLOSED));
}
return resultSubject;
}

Expand Down Expand Up @@ -336,7 +341,7 @@ public void call(final Subscriber<? super Request> subscriber) {
return;
}

stateController.scheduler().execute(new Runnable() {
boolean wasScheduled = stateController.tryScheduleAction(new Runnable() {
@Override
public void run() {
// If the Subscriber unsubscribed in the meantime we return early
Expand All @@ -351,6 +356,11 @@ public void run() {
curState.performRegisterProcedure(topic, flags, subscriber);
}
});

if (!wasScheduled) {
subscriber.onError(
new ApplicationError(ApplicationError.CLIENT_CLOSED));
}
}
});
}
Expand Down Expand Up @@ -544,7 +554,7 @@ public void call(final Subscriber<? super PubSubData> subscriber) {
return;
}

stateController.scheduler().execute(new Runnable() {
boolean wasScheduled = stateController.tryScheduleAction(new Runnable() {
@Override
public void run() {
// If the Subscriber unsubscribed in the meantime we return early
Expand All @@ -559,6 +569,11 @@ public void run() {
curState.performSubscription(topic, flags, subscriber);
}
});

if (!wasScheduled) {
subscriber.onError(
new ApplicationError(ApplicationError.CLIENT_CLOSED));
}
}
});
}
Expand Down Expand Up @@ -615,7 +630,7 @@ public Observable<Reply> call(final String procedure,
return resultSubject;
}

stateController.scheduler().execute(new Runnable() {
boolean wasScheduled = stateController.tryScheduleAction(new Runnable() {
@Override
public void run() {
if (!(stateController.currentState() instanceof SessionEstablishedState)) {
Expand All @@ -628,6 +643,12 @@ public void run() {
curState.performCall(procedure, flags, arguments, argumentsKw, resultSubject);
}
});

if (!wasScheduled) {
resultSubject.onError(
new ApplicationError(ApplicationError.CLIENT_CLOSED));
}

return resultSubject;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,6 @@

package ws.wamp.jawampa.client;

import java.util.concurrent.RejectedExecutionException;

import ws.wamp.jawampa.WampClient;
import ws.wamp.jawampa.connection.IConnectionController;
import ws.wamp.jawampa.connection.IPendingWampConnection;
Expand Down Expand Up @@ -81,27 +79,29 @@ public void onLeave(ClientState newState) {

@Override
public void connectSucceeded(final IWampConnection connection) {
try {
stateController.scheduler().execute(new Runnable() {
@Override
public void run() {
if (!isCancelled) {
// Our new channel is connected
connectionController.setConnection(connection);
HandshakingState newState = new HandshakingState(stateController, connectionController, nrConnectAttempts);
stateController.setState(newState);
} else {
// We we're connected but aren't interested in the channel anymore
// The client should close
// Therefore we close the new channel
stateController.setExternalState(new WampClient.DisconnectedState(null));
WaitingForDisconnectState newState = new WaitingForDisconnectState(stateController, nrConnectAttempts);
connection.close(false, newState.closePromise());
stateController.setState(newState);
}
boolean wasScheduled = stateController.tryScheduleAction(new Runnable() {
@Override
public void run() {
if (!isCancelled) {
// Our new channel is connected
connectionController.setConnection(connection);
HandshakingState newState = new HandshakingState(stateController, connectionController, nrConnectAttempts);
stateController.setState(newState);
} else {
// We we're connected but aren't interested in the channel anymore
// The client should close
// Therefore we close the new channel
stateController.setExternalState(new WampClient.DisconnectedState(null));
WaitingForDisconnectState newState = new WaitingForDisconnectState(stateController, nrConnectAttempts);
connection.close(false, newState.closePromise());
stateController.setState(newState);
}
});
} catch (RejectedExecutionException e) {
}
});

if (!wasScheduled) {
// If the client was closed before the connection
// succeeds, close the connection
connection.close(false, IWampConnectionPromise.Empty);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map.Entry;
import java.util.concurrent.RejectedExecutionException;

import rx.Subscriber;
import rx.functions.Action0;
Expand Down Expand Up @@ -536,7 +537,7 @@ public void call(Throwable t1) {
new RequestMapEntry(RegisterMessage.ID, registerFuture));
connectionController.sendMessage(msg, IWampConnectionPromise.Empty);
}

/**
* Add an action that is added to the subscriber which is executed
* if unsubscribe is called on a registered procedure.<br>
Expand All @@ -549,7 +550,7 @@ private void attachCancelRegistrationAction(final Subscriber<? super Request> su
subscriber.add(Subscriptions.create(new Action0() {
@Override
public void call() {
stateController.scheduler().execute(new Runnable() {
stateController.tryScheduleAction(new Runnable() {
@Override
public void run() {
if (mapEntry.state != RegistrationState.Registered) return;
Expand Down Expand Up @@ -668,7 +669,7 @@ public void call(Throwable t1) {
connectionController.sendMessage(msg, IWampConnectionPromise.Empty);
}
}

/**
* Add an action that is added to the subscriber which is executed
* if unsubscribe is called. This action will lead to the unsubscription at the
Expand All @@ -681,7 +682,7 @@ private void attachPubSubCancellationAction(final Subscriber<? super PubSubData>
subscriber.add(Subscriptions.create(new Action0() {
@Override
public void call() {
stateController.scheduler().execute(new Runnable() {
stateController.tryScheduleAction(new Runnable() {
@Override
public void run() {
mapEntry.subscribers.remove(subscriber);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,11 +80,17 @@ public void setCloseError(Throwable closeError) {
* Rejected executions will be suppressed.
*
* @param action The action to schedule.
* @return true if the Runnable could be scheduled, false otherwise
*/
public void tryScheduleAction(Runnable action) {
public boolean tryScheduleAction(Runnable action) {
try {
scheduler.submit(action);
} catch (RejectedExecutionException e) {}
scheduler.execute(action);
// Ignore this exception
// The scheduling will be performed with best effort
return true;
} catch (RejectedExecutionException e) {
return false;
}
}

public ClientState currentState() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@
import io.netty.handler.codec.http.FullHttpResponse;
import io.netty.handler.codec.http.HttpHeaderNames;
import io.netty.handler.codec.http.HttpHeaderValues;
import io.netty.handler.codec.http.HttpHeaders;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.netty.handler.codec.http.websocketx.CloseWebSocketFrame;
import io.netty.handler.codec.http.websocketx.PingWebSocketFrame;
Expand Down

0 comments on commit 00cfb02

Please sign in to comment.