Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

optimizing NIO processing with reactor thread model #1254

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/main/java/org/java_websocket/AbstractWebSocket.java
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ protected void stopConnectionLostTimer() {
*
* @since 1.3.4
*/
protected void startConnectionLostTimer() {
public void startConnectionLostTimer() {
synchronized (syncConnectionLost) {
if (this.connectionLostTimeout <= 0) {
log.trace("Connection lost timer deactivated");
Expand Down
122 changes: 119 additions & 3 deletions src/main/java/org/java_websocket/WebSocketImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.ByteChannel;
import java.nio.channels.SelectableChannel;
import java.nio.channels.SelectionKey;
import java.util.ArrayList;
import java.util.Collection;
Expand Down Expand Up @@ -59,6 +60,7 @@
import org.java_websocket.handshake.ServerHandshakeBuilder;
import org.java_websocket.interfaces.ISSLChannel;
import org.java_websocket.protocols.IProtocol;
import org.java_websocket.server.WebSocketServer;
import org.java_websocket.server.WebSocketServer.WebSocketWorker;
import org.java_websocket.util.Charsetfunctions;
import org.slf4j.Logger;
Expand All @@ -69,7 +71,7 @@
* "handshake" phase, then allows for easy sending of text frames, and receiving frames through an
* event-based model.
*/
public class WebSocketImpl implements WebSocket {
public class WebSocketImpl implements WebSocket, Runnable {

/**
* The default port of WebSockets, as defined in the spec. If the nullary constructor is used,
Expand Down Expand Up @@ -914,6 +916,120 @@ public WebSocketWorker getWorkerThread() {
public void setWorkerThread(WebSocketWorker workerThread) {
this.workerThread = workerThread;
}



@Override
public void run() {
if( key.isValid() ) {
WebSocketImpl conn = (WebSocketImpl) key.attachment();
if (key.isReadable()) {
ByteBuffer buf = null;
try {
synchronized (this.wsl) {
buf = ((WebSocketServer) wsl).takeBuffer();
}
} catch (InterruptedException e) {
log.error(e.getMessage(), e);
}
if(conn.getChannel() == null){
key.cancel();
handleIOException( key, conn, new IOException() );
}
try {
if( SocketChannelIOHelper.read(buf, conn, conn.getChannel())) {
if(buf.hasRemaining()) {
conn.inQueue.put( buf );
synchronized (this.wsl) {
((WebSocketServer) wsl).queue( conn );
}
if(conn.getChannel() instanceof WrappedByteChannel && ((WrappedByteChannel) conn.getChannel()).isNeedRead()) {
synchronized (this.wsl) {
((WebSocketServer) wsl).getIqueue().add( conn );
}
}
} else {
synchronized (this.wsl) {
((WebSocketServer) wsl).pushBuffer(buf);
}
}
} else {
synchronized (this.wsl) {
((WebSocketServer) wsl).pushBuffer( buf );
}
}
} catch ( IOException e ) {
try {
synchronized (this.wsl) {
((WebSocketServer) wsl).pushBuffer( buf );
}
} catch (InterruptedException e1) {
log.error(e1.getMessage(), e1);
}
} catch (InterruptedException e) {
log.error(e.getMessage(), e);
}
} else if(key.isWritable()) {
try {
if(SocketChannelIOHelper.batch(conn, conn.getChannel()) && key.isValid()) {
key.interestOps(SelectionKey.OP_READ);
}
} catch (IOException e) {
log.error(e.getMessage(), e);
}
}
try {
synchronized (this.wsl) {
doAdditionalRead();
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} catch (IOException e) {
if( key != null )
key.cancel();
handleIOException( key, conn, e );
}
}
}

private void handleIOException(SelectionKey key, WebSocket conn, IOException ex) {
// onWebsocketError( conn, ex );// conn may be null here
if (key != null) {
key.cancel();
}
if (conn != null) {
conn.closeConnection(CloseFrame.ABNORMAL_CLOSE, ex.getMessage());
} else if (key != null) {
SelectableChannel channel = key.channel();
if (channel != null && channel
.isOpen()) { // this could be the case if the IOException ex is a SSLException
try {
channel.close();
} catch (IOException e) {
// there is nothing that must be done here
}
log.trace("Connection closed because of exception", ex);
}
}
}

private void doAdditionalRead() throws InterruptedException, IOException {
WebSocketImpl conn;
while ( !((WebSocketServer) wsl).getIqueue().isEmpty() ) {
conn = ((WebSocketServer) wsl).getIqueue().remove( 0 );
WrappedByteChannel c = ( (WrappedByteChannel) conn.getChannel() );
ByteBuffer buf = ((WebSocketServer) wsl).takeBuffer();
try {
if( SocketChannelIOHelper.readMore( buf, conn, c ) )
((WebSocketServer) wsl).getIqueue().add( conn );
if( buf.hasRemaining() ) {
conn.inQueue.put( buf );
((WebSocketServer) wsl).queue( conn );
} else {
((WebSocketServer) wsl).pushBuffer( buf );
}
} catch ( IOException e ) {
((WebSocketServer) wsl).pushBuffer( buf );
throw e;
}
}
}
}
111 changes: 111 additions & 0 deletions src/main/java/org/java_websocket/reactor/Acceptor.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
package org.java_websocket.reactor;

import java.io.IOException;
import java.net.Socket;
import java.nio.ByteBuffer;
import java.nio.channels.SelectableChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;

import org.java_websocket.WebSocket;
import org.java_websocket.WebSocketImpl;
import org.java_websocket.framing.CloseFrame;
import org.java_websocket.server.WebSocketServer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* acceptor process new client connections and dispatch requests to the processor chain
*/
public class Acceptor implements Runnable {
private static final Logger log = LoggerFactory.getLogger(Acceptor.class);
private final ServerSocketChannel ssc; // socket channel monitored by mainReactor
private final WebSocketServer wss;
private final int cores = Runtime.getRuntime().availableProcessors(); // get the number of CPU cores
private final Selector[] selectors = new Selector[cores]; // create several core selectors for subReactor
private int selIdx = 0; // currently available subreactor indexes
private TCPSubReactor[] r = new TCPSubReactor[cores]; // subReactor thread
private Thread[] t = new Thread[cores]; // subReactor thread

public Acceptor(ServerSocketChannel s, WebSocketServer server) throws IOException {
this.ssc = s;
this.wss = server;
// Create multiple selectors and multiple subReactor threads
for (int i = 0; i < cores; i++) {
selectors[i] = Selector.open();
r[i] = new TCPSubReactor(selectors[i], s, i);
t[i] = new Thread(r[i]);
t[i].start();
}
}

@Override
public void run() {
try {
SocketChannel sc = ssc.accept(); // receive client connection request
if (sc != null) {
log.trace(sc.socket().getRemoteSocketAddress().toString() + " is connected.");
log.trace("selIdx is {}", selIdx);
sc.configureBlocking(false); // set non blocking
Socket socket = sc.socket();
socket.setTcpNoDelay(this.wss.isTcpNoDelay());
socket.setKeepAlive(true);
r[selIdx].setRestart(true); // pause thread
selectors[selIdx].wakeup(); // causes a blocked selector operation to return immediately
WebSocketImpl w = this.wss.getWsf().createWebSocket(this.wss, this.wss.getDrafts());
SelectionKey sk = sc.register(selectors[selIdx], SelectionKey.OP_READ, w);
w.setSelectionKey(sk);
try {
w.setChannel(this.wss.getWsf().wrapChannel(sc, w.getSelectionKey()));
allocateBuffers(w);
} catch (IOException ex) {
log.error(ex.getMessage(), ex);
if (w.getSelectionKey() != null)
w.getSelectionKey().cancel();
handleIOException(w.getSelectionKey(), null, ex);
} catch (InterruptedException e) {
log.error(e.getMessage(), e);
}
selectors[selIdx].wakeup(); // causes a blocked selector operation to return immediately
r[selIdx].setRestart(false); // restart thread
if (++selIdx == selectors.length)
selIdx = 0;
}
} catch (IOException e) {
log.error(e.getMessage(), e);
}
}

protected void allocateBuffers(WebSocket c) throws InterruptedException {
synchronized (this.wss) {
if (this.wss.getQueuesize().get() >= 2 * this.wss.getDecoders().size() + 1) {
return;
}
this.wss.getQueuesize().incrementAndGet();
this.wss.getBuffers().put(createBuffer());
}
}

public ByteBuffer createBuffer() {
return ByteBuffer.allocate(WebSocketImpl.RCVBUF);
}

private void handleIOException(SelectionKey key, WebSocket conn, IOException ex) {
// onWebsocketError( conn, ex );// conn may be null here
if (conn != null) {
conn.closeConnection(CloseFrame.ABNORMAL_CLOSE, ex.getMessage());
} else if (key != null) {
SelectableChannel channel = key.channel();
if (channel != null && channel.isOpen()) { // this could be the case if the IOException ex is a SSLException
try {
channel.close();
} catch (IOException e) {
// there is nothing that must be done here
}
log.trace("Connection closed because of exception", ex);
}
}
}
}
79 changes: 79 additions & 0 deletions src/main/java/org/java_websocket/reactor/TCPReactor.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
package org.java_websocket.reactor;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.SocketException;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.ClosedSelectorException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.util.Iterator;
import java.util.Set;

import org.java_websocket.server.WebSocketServer;
import org.java_websocket.server.WebSocketServer.WebSocketWorker;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* TCPReactor is the mainReactor, process new client connections
*/
public class TCPReactor implements Runnable {
private static final Logger log = LoggerFactory.getLogger(TCPReactor.class);

private ServerSocketChannel ssc;
private Selector selector;

public TCPReactor(InetSocketAddress address, ServerSocketChannel server, WebSocketServer wss,
Selector s) {
try {
this.ssc = server;
this.selector = s;
Acceptor acceptor = new Acceptor(server, wss);
SelectionKey sk = server.register(selector, SelectionKey.OP_ACCEPT);
sk.attach(acceptor);
wss.startConnectionLostTimer();
for (WebSocketWorker ex : wss.getDecoders()) {
ex.start();
}
wss.onStart();
} catch (SocketException e) {
log.error(e.getMessage(), e);
} catch (ClosedChannelException e) {
log.error(e.getMessage(), e);
} catch (IOException e) {
log.error(e.getMessage(), e);
}
}

@Override
public void run() {
while (!Thread.interrupted()) {
log.trace("mainReactor waiting for new event on port: " + ssc.socket().getLocalPort()
+ "...");
try {
if (selector.select() == 0) {// if no event is ready, do not proceed
continue;
}
Set<SelectionKey> selectedKeys = selector.selectedKeys();
Iterator<SelectionKey> it = selectedKeys.iterator();
while (it.hasNext()) {
dispatch((SelectionKey) (it.next()));
it.remove();
}
} catch (IOException e) {
log.error(e.getMessage(), e);
} catch (ClosedSelectorException e) {
log.error(e.getMessage(), e);
}
}
}

private void dispatch(SelectionKey key) {
Runnable r = (Runnable) (key.attachment());
if (r != null)
r.run();
}

}
Loading