Skip to content

Commit

Permalink
Fixes #200 #122
Browse files Browse the repository at this point in the history
`InterceptorExecutor` was not adding unsubscribe hooks to the child when there were no interceptors.
  • Loading branch information
Nitesh Kant committed Mar 19, 2015
1 parent 908d8a1 commit 9d13790
Show file tree
Hide file tree
Showing 5 changed files with 36 additions and 24 deletions.
4 changes: 2 additions & 2 deletions gradle.properties
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,9 @@
#
org.gradle.daemon=true

rxnetty_version=0.4.4
rxnetty_version=0.4.7
jersey_version=1.18.1
governator_version=1.3.3
pytheas_version=1.25
apache_httpclient_version=4.2.1
eureka_version=1.1.147
eureka_version=1.1.150
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,10 @@ public Observable<Void> execute(final I request, final O response, C keyEvaluati
@Override
public Subscriber<? super Void> call(Subscriber<? super Void> child) {
SerialSubscription subscription = new SerialSubscription();
return new ChainSubscriber(subscription, context, request, response, child);
ChainSubscriber chainSubscriber = new ChainSubscriber(subscription, context, request, response, child);
subscription.set(chainSubscriber);
child.add(subscription);
return chainSubscriber;
}
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,22 @@ public void testOutbound() throws Exception {
Assert.assertTrue("Tail interceptor did not get invoked.", router.isReceivedACall());
}

@Test
public void testUnsubscribe() throws Exception {

TestableRequestRouter<ByteBuf, ByteBuf> router = new TestableRequestRouter<ByteBuf, ByteBuf>();

InterceptorSupport<ByteBuf, ByteBuf, KeyEvaluationContext> support = new InterceptorSupport<ByteBuf, ByteBuf, KeyEvaluationContext>();

InterceptorExecutor<ByteBuf, ByteBuf, KeyEvaluationContext> executor =
new InterceptorExecutor<ByteBuf, ByteBuf, KeyEvaluationContext>(support, router);

executeAndAwait(executor);

Assert.assertTrue("Router did not get invoked.", router.isReceivedACall());
Assert.assertTrue("Router did not get unsubscribed.", router.isUnsubscribed());
}

protected void executeAndAwait(InterceptorExecutor<ByteBuf, ByteBuf, KeyEvaluationContext> executor)
throws InterruptedException {
final CountDownLatch completionLatch = new CountDownLatch(1);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,21 +2,32 @@

import io.reactivex.netty.channel.Handler;
import rx.Observable;
import rx.functions.Action0;

/**
* @author Nitesh Kant
*/
class TestableRequestRouter<I, O> implements Handler<I, O> {

private volatile boolean called;
private volatile boolean unsubscribed;

public boolean isReceivedACall() {
return called;
}

public boolean isUnsubscribed() {
return unsubscribed;
}

@Override
public Observable<Void> handle(I input, O output) {
called = true;
return Observable.empty();
return Observable.<Void>empty().doOnUnsubscribe(new Action0() {
@Override
public void call() {
unsubscribed = true;
}
});
}
}
Original file line number Diff line number Diff line change
@@ -1,35 +1,19 @@
package netflix.karyon.examples.hellonoss.server.rxnetty;

import com.google.inject.AbstractModule;
import netflix.adminresources.resources.KaryonWebAdminModule;
import netflix.karyon.Karyon;
import netflix.karyon.KaryonBootstrapModule;
import netflix.karyon.ShutdownModule;
import netflix.karyon.archaius.ArchaiusBootstrapModule;
import netflix.karyon.examples.hellonoss.common.health.HealthCheck;
import netflix.karyon.health.HealthCheckHandler;
import netflix.karyon.servo.KaryonServoModule;
import netflix.karyon.transport.http.health.HealthCheckEndpoint;

import javax.ws.rs.core.Response;

/**
* @author Nitesh Kant
*/
public class MyApplicationRunner {

public static class HealthCheckHandlerModule extends AbstractModule {
@Override
protected void configure() {
bind(HealthCheckHandler.class).toInstance(new HealthCheckHandler() {
@Override
public int getStatus() {
return Response.Status.OK.getStatusCode();
}
});
}
}

public static void main(String[] args) {
HealthCheck healthCheckHandler = new HealthCheck();
Karyon.forRequestHandler(8888,
Expand All @@ -40,9 +24,7 @@ public static void main(String[] args) {
// KaryonEurekaModule.asBootstrapModule(), /* Uncomment if you need eureka */
Karyon.toBootstrapModule(KaryonWebAdminModule.class),
ShutdownModule.asBootstrapModule(),
KaryonServoModule.asBootstrapModule(),
Karyon.toBootstrapModule(HealthCheckHandlerModule.class)
)
.startAndWaitTillShutdown();
KaryonServoModule.asBootstrapModule()
).startAndWaitTillShutdown();
}
}

0 comments on commit 9d13790

Please sign in to comment.