Skip to content
This repository was archived by the owner on May 22, 2019. It is now read-only.

Commit

Permalink
Merge pull request #106 from Darkless012/master
Browse files Browse the repository at this point in the history
Updated Netty version (current is buggy)
  • Loading branch information
Matthias247 authored May 17, 2017
2 parents 04a558e + 664dfb5 commit e98c2f3
Show file tree
Hide file tree
Showing 2 changed files with 57 additions and 57 deletions.
4 changes: 2 additions & 2 deletions jawampa-netty/pom.xml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>

<parent>
<groupId>ws.wamp.jawampa</groupId>
<artifactId>jawampa-parent</artifactId>
Expand Down Expand Up @@ -48,7 +48,7 @@
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-codec-http</artifactId>
<version>4.0.24.Final</version>
<version>4.1.11.Final</version>
</dependency>
</dependencies>
</project>
Original file line number Diff line number Diff line change
Expand Up @@ -55,12 +55,12 @@
* A websocket server adapter for WAMP that integrates into a Netty pipeline.
*/
public class WampServerWebsocketHandler extends ChannelInboundHandlerAdapter {

final String websocketPath;
final WampRouter router;
final IWampConnectionAcceptor connectionAcceptor;
final List<WampSerialization> supportedSerializations;

WampSerialization serialization = WampSerialization.Invalid;
boolean handshakeInProgress = false;

Expand All @@ -75,18 +75,18 @@ public WampServerWebsocketHandler(String websocketPath, WampRouter router,
this.connectionAcceptor = router.connectionAcceptor();
this.supportedSerializations = supportedSerializations;
}

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
FullHttpRequest request = (msg instanceof FullHttpRequest) ? (FullHttpRequest) msg : null;

// Check for invalid http messages during handshake
if (request != null && handshakeInProgress) {
request.release();
sendBadRequestAndClose(ctx, null);
return;
}

// Transform this when we have an upgrade for our path,
// otherwise pass the message
if (request != null && isUpgradeRequest(request)) {
Expand All @@ -99,17 +99,17 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception
ctx.fireChannelRead(msg);
}
}

private boolean isUpgradeRequest(FullHttpRequest request) {
if (!request.getDecoderResult().isSuccess()) {
return false;
}

String connectionHeaderValue = request.headers().get(HttpHeaders.Names.CONNECTION);
if (connectionHeaderValue == null) {
return false;
}
String[] connectionHeaderFields = StringUtil.split(connectionHeaderValue.toLowerCase(), ',');
String[] connectionHeaderFields = connectionHeaderValue.toLowerCase().split(",");
boolean hasUpgradeField = false;
for (String s : connectionHeaderFields) {
if (s.trim().equals(HttpHeaders.Values.UPGRADE.toLowerCase())) {
Expand All @@ -120,36 +120,36 @@ private boolean isUpgradeRequest(FullHttpRequest request) {
if (!hasUpgradeField) {
return false;
}

if (!request.headers().contains(HttpHeaders.Names.UPGRADE, HttpHeaders.Values.WEBSOCKET, true)){
return false;
}

return request.getUri().equals(websocketPath);
}

// All methods inside the connection will be called from the WampRouters thread
// This causes no problems on the ordering since they all will be called from
// the same thread. And Netty is threadsafe
static class WampServerConnection implements IWampConnection {
final WampSerialization serialization;

final WampSerialization serialization;
ChannelHandlerContext ctx;

public WampServerConnection(WampSerialization serialization) {
this.serialization = serialization;
}

@Override
public WampSerialization serialization() {
return serialization;
}

@Override
public boolean isSingleWriteOnly() {
return false;
}

@Override
public void sendMessage(WampMessage message, final IWampConnectionPromise<Void> promise) {
ChannelFuture f = ctx.writeAndFlush(message);
Expand All @@ -163,7 +163,7 @@ public void operationComplete(ChannelFuture future) throws Exception {
}
});
}

@Override
public void close(boolean sendRemaining, final IWampConnectionPromise<Void> promise) {
ctx.writeAndFlush(Unpooled.EMPTY_BUFFER)
Expand All @@ -185,7 +185,7 @@ public void operationComplete(ChannelFuture future) throws Exception {
});
}
}

private void tryWebsocketHandshake(final ChannelHandlerContext ctx, FullHttpRequest request) {
String wsLocation = getWebSocketLocation(ctx, request);
String subProtocols = WampSerialization.makeWebsocketSubprotocolList(supportedSerializations);
Expand All @@ -195,7 +195,7 @@ private void tryWebsocketHandshake(final ChannelHandlerContext ctx, FullHttpRequ
false,
WampHandlerConfiguration.MAX_WEBSOCKET_FRAME_SIZE)
.newHandshaker(request);

if (handshaker == null) {
WebSocketServerHandshakerFactory.sendUnsupportedVersionResponse(ctx.channel());
} else {
Expand All @@ -205,84 +205,84 @@ private void tryWebsocketHandshake(final ChannelHandlerContext ctx, FullHttpRequ
final ChannelFuture handshakeFuture = handshaker.handshake(ctx.channel(), request);
String actualProtocol = handshaker.selectedSubprotocol();
serialization = WampSerialization.fromString(actualProtocol);

// In case of unsupported websocket subprotocols we close the connection.
// Won't help us when the client will ignore our protocol response and send
// invalid packets anyway
if (serialization == WampSerialization.Invalid) {
handshakeFuture.addListener(ChannelFutureListener.CLOSE);
return;
}

// Remove all handlers after this one - we don't need them anymore since we switch to WAMP
ChannelHandler last = ctx.pipeline().last();
while (last != null && last != this) {
ctx.pipeline().removeLast();
last = ctx.pipeline().last();
}

if (last == null) {
throw new IllegalStateException("Can't find the WAMP server handler in the pipeline");
}

// Remove the WampServerWebSocketHandler and replace it with the protocol handler
// which processes pings and closes
ProtocolHandler protocolHandler = new ProtocolHandler();
ctx.pipeline().replace(this, "wamp-websocket-protocol-handler", protocolHandler);
final ChannelHandlerContext protocolHandlerCtx = ctx.pipeline().context(protocolHandler);

// Handle websocket fragmentation before the deserializer
protocolHandlerCtx.pipeline().addLast(new WebSocketFrameAggregator(WampHandlerConfiguration.MAX_WEBSOCKET_FRAME_SIZE));

// Install the serializer and deserializer
protocolHandlerCtx.pipeline().addLast("wamp-serializer",
protocolHandlerCtx.pipeline().addLast("wamp-serializer",
new WampSerializationHandler(serialization));
protocolHandlerCtx.pipeline().addLast("wamp-deserializer",
protocolHandlerCtx.pipeline().addLast("wamp-deserializer",
new WampDeserializationHandler(serialization));

// Retrieve a listener for this new connection
final IWampConnectionListener connectionListener = connectionAcceptor.createNewConnectionListener();

// Create a Wamp connection interface on top of that
final WampServerConnection connection = new WampServerConnection(serialization);

ChannelHandler routerHandler = new SimpleChannelInboundHandler<WampMessage> () {
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
// Gets called once the channel gets added to the pipeline
connection.ctx = ctx;
}

@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
connectionAcceptor.acceptNewConnection(connection, connectionListener);
}

@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
connectionListener.transportClosed();
}

@Override
protected void channelRead0(ChannelHandlerContext ctx, WampMessage msg) throws Exception {
connectionListener.messageReceived(msg);
}

@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
ctx.close();
connectionListener.transportError(cause);
}
};

// Install the router in the pipeline
protocolHandlerCtx.pipeline().addLast("wamp-router", routerHandler);

handshakeFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (!future.isSuccess()) {
// The handshake was not successful.
// The handshake was not successful.
// Close the channel without registering
ctx.fireExceptionCaught(future.cause()); // TODO: This is a race condition if the router did not yet accept the connection
} else {
Expand All @@ -292,12 +292,12 @@ public void operationComplete(ChannelFuture future) throws Exception {
}
}
});

// TODO: Maybe there are frames incoming before the handshakeFuture is resolved
// This might lead to frames getting sent to the router before it is activated
}
}

private String getWebSocketLocation(ChannelHandlerContext ctx, FullHttpRequest req) {
String location = req.headers().get(HOST) + websocketPath;
if (ctx.pipeline().get(SslHandler.class) != null) {
Expand All @@ -306,7 +306,7 @@ private String getWebSocketLocation(ChannelHandlerContext ctx, FullHttpRequest r
return "ws://" + location;
}
}

@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
if (cause instanceof WebSocketHandshakeException) {
Expand All @@ -315,51 +315,51 @@ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws E
ctx.close();
}
}

private static void sendBadRequestAndClose(ChannelHandlerContext ctx, String message) {
FullHttpResponse response;
if (message != null) {
response = new DefaultFullHttpResponse(HTTP_1_1, HttpResponseStatus.BAD_REQUEST,
response = new DefaultFullHttpResponse(HTTP_1_1, HttpResponseStatus.BAD_REQUEST,
Unpooled.wrappedBuffer(message.getBytes()));
} else {
response = new DefaultFullHttpResponse(HTTP_1_1, HttpResponseStatus.BAD_REQUEST);
}
ctx.channel().writeAndFlush(response).addListener(ChannelFutureListener.CLOSE);
}

public static class ProtocolHandler extends ChannelInboundHandlerAdapter {

enum ReadState {
Closed,
Reading,
Error
}

ReadState readState = ReadState.Reading;

@Override
public void handlerAdded(ChannelHandlerContext ctx) {
}

@Override
public void channelActive(ChannelHandlerContext ctx) {
ctx.fireChannelActive();
}

@Override
public void channelInactive(ChannelHandlerContext ctx) {
readState = ReadState.Closed;
ctx.fireChannelInactive();
}

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
// Discard messages when we are not reading
if (readState != ReadState.Reading) {
ReferenceCountUtil.release(msg);
return;
}

// We might receive http requests here when the whe clients sends something after the upgrade
// request but we have not fully sent out the response and the http codec is still installed.
// However that would be an error.
Expand All @@ -368,7 +368,7 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception
WampServerWebsocketHandler.sendBadRequestAndClose(ctx, null);
return;
}

if (msg instanceof PingWebSocketFrame) {
// Respond to Pings with Pongs
try {
Expand All @@ -380,15 +380,15 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception
// Echo the close and close the connection
readState = ReadState.Closed;
ctx.writeAndFlush(msg).addListener(ChannelFutureListener.CLOSE);

} else {
ctx.fireChannelRead(msg);
}
}

@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
// Will be called either through an exception in channelRead
// Will be called either through an exception in channelRead
// or when the websocket handshake fails
readState = ReadState.Error;
ctx.writeAndFlush(Unpooled.EMPTY_BUFFER).addListener(ChannelFutureListener.CLOSE);
Expand Down

0 comments on commit e98c2f3

Please sign in to comment.